Source code for endgame.clustering.scalable

"""Scalable and miscellaneous clustering: BIRCH, Mean Shift, FINCH.

BIRCH and Mean Shift wrap sklearn. FINCH wraps finch-clust (optional).
"""

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_array, check_is_fitted

try:
    from finch import FINCH as _FINCH
    HAS_FINCH = True
except ImportError:
    HAS_FINCH = False


[docs] class BIRCHClusterer(BaseEstimator, ClusterMixin): """BIRCH incremental hierarchical clustering. Builds a CF-tree (Clustering Feature tree) for incremental clustering. Designed for very large datasets or streaming scenarios. Parameters ---------- n_clusters : int or None, default=3 Final number of clusters. If None, the subclusters from the CF-tree leaf nodes are returned directly. threshold : float, default=0.5 CF-tree leaf radius threshold. branching_factor : int, default=50 Maximum CF entries per node. compute_labels : bool, default=True Whether to compute labels for training data. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels. subcluster_centers_ : ndarray CF-tree subcluster centres. n_clusters_ : int Number of clusters. """ def __init__( self, n_clusters: int | None = 3, threshold: float = 0.5, branching_factor: int = 50, compute_labels: bool = True, ): self.n_clusters = n_clusters self.threshold = threshold self.branching_factor = branching_factor self.compute_labels = compute_labels
[docs] def fit(self, X: ArrayLike, y=None) -> BIRCHClusterer: """Fit BIRCH.""" from sklearn.cluster import Birch X = check_array(X) self.model_ = Birch( n_clusters=self.n_clusters, threshold=self.threshold, branching_factor=self.branching_factor, compute_labels=self.compute_labels, ) self.model_.fit(X) self.labels_ = self.model_.labels_ self.subcluster_centers_ = self.model_.subcluster_centers_ self.n_clusters_ = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0) self.n_features_in_ = X.shape[1] return self
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict cluster labels for new data.""" check_is_fitted(self, ["model_"]) X = check_array(X) return self.model_.predict(X)
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_
[docs] def partial_fit(self, X: ArrayLike, y=None) -> BIRCHClusterer: """Incremental fit on a batch of data.""" X = check_array(X) if not hasattr(self, "model_"): from sklearn.cluster import Birch self.model_ = Birch( n_clusters=self.n_clusters, threshold=self.threshold, branching_factor=self.branching_factor, ) self.model_.partial_fit(X) if hasattr(self.model_, "labels_"): self.labels_ = self.model_.labels_ return self
[docs] class MeanShiftClusterer(BaseEstimator, ClusterMixin): """Mean Shift mode-finding clustering. Non-parametric mode finding via kernel density gradient ascent. Automatically determines k by finding density modes. Parameters ---------- bandwidth : float or None, default=None Kernel bandwidth. If None, estimated automatically. bin_seeding : bool, default=False Speed up by discretising seed points. min_bin_freq : int, default=1 Minimum bin frequency for seeding. cluster_all : bool, default=True If False, orphan points get label -1. n_jobs : int, default=-1 Parallel jobs. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels. cluster_centers_ : ndarray of shape (n_clusters, n_features) Mode locations. n_clusters_ : int Number of clusters found. """ def __init__( self, bandwidth: float | None = None, bin_seeding: bool = False, min_bin_freq: int = 1, cluster_all: bool = True, n_jobs: int = -1, ): self.bandwidth = bandwidth self.bin_seeding = bin_seeding self.min_bin_freq = min_bin_freq self.cluster_all = cluster_all self.n_jobs = n_jobs
[docs] def fit(self, X: ArrayLike, y=None) -> MeanShiftClusterer: """Fit Mean Shift.""" from sklearn.cluster import MeanShift X = check_array(X) self.model_ = MeanShift( bandwidth=self.bandwidth, bin_seeding=self.bin_seeding, min_bin_freq=self.min_bin_freq, cluster_all=self.cluster_all, n_jobs=self.n_jobs, ) self.model_.fit(X) self.labels_ = self.model_.labels_ self.cluster_centers_ = self.model_.cluster_centers_ self.n_clusters_ = len(self.cluster_centers_) self.n_features_in_ = X.shape[1] return self
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict cluster labels for new data.""" check_is_fitted(self, ["model_"]) X = check_array(X) return self.model_.predict(X)
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_
[docs] class FINCHClusterer(BaseEstimator, ClusterMixin): """FINCH: First Integer Neighbour Clustering Hierarchy. Zero-parameter clustering that uses first-neighbour relations to recursively merge clusters in O(n log n) with O(n) memory. Produces a hierarchy of partitions in 4-10 steps. Requires the ``finch-clust`` package. Parameters ---------- req_clust : int or None, default=None Requested number of clusters. If None, returns the partition at the first hierarchy level where all points are in the same cluster (i.e. the finest reasonable partition). distance : str, default='euclidean' Distance metric: 'euclidean' or 'cosine'. verbose : bool, default=False Print hierarchy information. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels at the selected partition level. all_partitions_ : ndarray of shape (n_samples, n_levels) All hierarchy levels. n_clusters_ : int Number of clusters at the selected level. n_levels_ : int Number of hierarchy levels found. References ---------- Sarfraz et al., "Efficient Parameter-Free Clustering Using First Neighbor Relations", CVPR 2019. """ def __init__( self, req_clust: int | None = None, distance: str = "euclidean", verbose: bool = False, ): self.req_clust = req_clust self.distance = distance self.verbose = verbose
[docs] def fit(self, X: ArrayLike, y=None) -> FINCHClusterer: """Fit FINCH.""" if not HAS_FINCH: raise ImportError( "finch-clust is required for FINCHClusterer. " "Install with: pip install finch-clust" ) X = check_array(X, dtype=np.float64) partitions, num_clust, _ = _FINCH( X, req_clust=self.req_clust, distance=self.distance, verbose=self.verbose, ) self.all_partitions_ = partitions self.n_levels_ = partitions.shape[1] # Select the appropriate partition level if self.req_clust is not None: # The library returns partitions; last column is the requested k self.labels_ = partitions[:, -1] else: # Use the first (finest) partition self.labels_ = partitions[:, 0] self.n_clusters_ = len(set(self.labels_)) self.n_features_in_ = X.shape[1] return self
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_