"""Graph/spectral clustering: Spectral Clustering and Affinity Propagation.
Both wrap sklearn with competition-tuned defaults.
"""
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_array, check_is_fitted
# --- Spectral clustering -----------------------------------------------------
class SpectralClusterer(BaseEstimator, ClusterMixin):
    """Thin wrapper around scikit-learn's ``SpectralClustering``.

    A similarity graph is built over the samples, the eigenvectors of the
    graph Laplacian provide a spectral embedding, and cluster labels are
    assigned inside that embedding. Well suited to non-convex cluster
    shapes such as concentric circles or spirals.

    Parameters
    ----------
    n_clusters : int, default=8
        Number of clusters to form.
    affinity : str, default='rbf'
        How the similarity graph is built: 'rbf', 'nearest_neighbors' or
        'precomputed'.
    gamma : float or None, default=None
        Bandwidth of the RBF kernel. ``None`` is resolved to
        ``1 / n_features`` when ``fit`` is called.
    n_neighbors : int, default=10
        Neighbour count used by the 'nearest_neighbors' affinity.
    n_init : int, default=10
        Number of k-means initialisations run in the spectral embedding.
    assign_labels : str, default='kmeans'
        Label assignment strategy: 'kmeans' or 'discretize'.
    random_state : int or None, default=None
        Seed forwarded to the wrapped estimator.
    n_jobs : int, default=-1
        Number of parallel jobs forwarded to the wrapped estimator.

    Attributes
    ----------
    model_ : sklearn.cluster.SpectralClustering
        The fitted wrapped estimator.
    labels_ : ndarray of shape (n_samples,)
        Cluster label of every training sample.
    affinity_matrix_ : ndarray of shape (n_samples, n_samples)
        Affinity matrix taken from the wrapped estimator.
    n_clusters_ : int
        Number of clusters (a copy of ``n_clusters``).
    n_features_in_ : int
        Number of columns of the array passed to ``fit``.
    """

    def __init__(
        self,
        n_clusters: int = 8,
        affinity: str = "rbf",
        gamma: float | None = None,
        n_neighbors: int = 10,
        n_init: int = 10,
        assign_labels: str = "kmeans",
        random_state: int | None = None,
        n_jobs: int = -1,
    ):
        # Hyper-parameters are stored verbatim; nothing is validated here.
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.assign_labels = assign_labels
        self.n_init = n_init
        self.n_neighbors = n_neighbors
        self.gamma = gamma
        self.affinity = affinity
        self.n_clusters = n_clusters

    def fit(self, X: ArrayLike, y=None) -> SpectralClusterer:
        """Run spectral clustering on ``X`` and store the results.

        Parameters
        ----------
        X : array-like
            Input data; validated with ``check_array``.
        y : ignored
            Accepted only for API compatibility.

        Returns
        -------
        SpectralClusterer
            The fitted estimator itself.
        """
        from sklearn.cluster import SpectralClustering

        X = check_array(X)
        n_features = X.shape[1]

        # The wrapped estimator needs a concrete float for gamma whatever the
        # affinity is, so an unset value is replaced by 1 / n_features.
        resolved_gamma = 1.0 / n_features if self.gamma is None else self.gamma

        estimator_kwargs = {
            "n_clusters": self.n_clusters,
            "affinity": self.affinity,
            "gamma": resolved_gamma,
            "n_neighbors": self.n_neighbors,
            "n_init": self.n_init,
            "assign_labels": self.assign_labels,
            "random_state": self.random_state,
            "n_jobs": self.n_jobs,
        }
        estimator = SpectralClustering(**estimator_kwargs)

        # Expose the wrapped estimator before fitting, then mirror its results.
        self.model_ = estimator
        self.labels_ = estimator.fit_predict(X)
        self.affinity_matrix_ = estimator.affinity_matrix_
        self.n_clusters_ = self.n_clusters
        self.n_features_in_ = n_features
        return self

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit on ``X`` and return the resulting cluster labels.

        ``y`` is accepted for API compatibility and is not used.
        """
        return self.fit(X).labels_
# --- Affinity propagation ----------------------------------------------------
class AffinityPropagationClusterer(BaseEstimator, ClusterMixin):
    """Affinity Propagation clustering via message passing.

    Simultaneously chooses exemplars and assigns points via responsibility
    and availability messages. No k required. The work is delegated to
    scikit-learn's ``AffinityPropagation``.

    Parameters
    ----------
    damping : float, default=0.5
        Damping factor (0.5 to 1). Higher = more stable but slower.
    max_iter : int, default=200
        Maximum message-passing iterations.
    convergence_iter : int, default=15
        Iterations without change for convergence.
    preference : float or array-like or None, default=None
        Preference for each point to be an exemplar. Larger = more clusters.
        None uses the median of the similarity matrix.
    affinity : str, default='euclidean'
        Affinity type: 'euclidean' or 'precomputed'.
    random_state : int or None, default=None
        Random seed.

    Attributes
    ----------
    model_ : sklearn.cluster.AffinityPropagation
        The fitted wrapped estimator.
    labels_ : ndarray of shape (n_samples,)
        Cluster labels.
    cluster_centers_indices_ : ndarray
        Indices of exemplar points.
    cluster_centers_ : ndarray of shape (n_clusters, n_features)
        Exemplar coordinates. Only present when the wrapped estimator
        provides them; scikit-learn does not set this attribute when
        ``affinity='precomputed'``.
    n_clusters_ : int
        Number of clusters found.
    n_iter_ : int
        Iterations run.
    n_features_in_ : int
        Number of columns of the array passed to ``fit``.
    """

    def __init__(
        self,
        damping: float = 0.5,
        max_iter: int = 200,
        convergence_iter: int = 15,
        preference: float | np.ndarray | None = None,
        affinity: str = "euclidean",
        random_state: int | None = None,
    ):
        self.damping = damping
        self.max_iter = max_iter
        self.convergence_iter = convergence_iter
        self.preference = preference
        self.affinity = affinity
        self.random_state = random_state

    def fit(self, X: ArrayLike, y=None) -> AffinityPropagationClusterer:
        """Fit Affinity Propagation.

        Parameters
        ----------
        X : array-like
            Input data, or a similarity matrix when
            ``affinity='precomputed'``; validated with ``check_array``.
        y : ignored
            Accepted only for API compatibility.

        Returns
        -------
        AffinityPropagationClusterer
            The fitted estimator itself.
        """
        from sklearn.cluster import AffinityPropagation

        X = check_array(X)
        self.model_ = AffinityPropagation(
            damping=self.damping,
            max_iter=self.max_iter,
            convergence_iter=self.convergence_iter,
            preference=self.preference,
            affinity=self.affinity,
            random_state=self.random_state,
        )
        self.model_.fit(X)

        self.labels_ = self.model_.labels_
        self.cluster_centers_indices_ = self.model_.cluster_centers_indices_

        # scikit-learn only defines ``cluster_centers_`` when the affinity is
        # not 'precomputed'. Reading it unconditionally made ``fit`` raise
        # AttributeError for precomputed similarities, so mirror it only when
        # it exists and drop any stale copy left over from an earlier fit.
        if hasattr(self.model_, "cluster_centers_"):
            self.cluster_centers_ = self.model_.cluster_centers_
        elif hasattr(self, "cluster_centers_"):
            del self.cluster_centers_

        self.n_clusters_ = len(self.cluster_centers_indices_)
        self.n_iter_ = self.model_.n_iter_
        self.n_features_in_ = X.shape[1]
        return self

    def predict(self, X: ArrayLike) -> np.ndarray:
        """Predict cluster labels for new data.

        Delegates to the wrapped estimator's ``predict`` after checking that
        ``fit`` has been called and validating ``X`` with ``check_array``.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X)
        return self.model_.predict(X)

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit and return cluster labels.

        ``y`` is accepted for API compatibility and is not used.
        """
        self.fit(X)
        return self.labels_