Source code for endgame.clustering.density

"""Density-based clustering: DBSCAN, HDBSCAN, OPTICS, DPC.

DBSCAN, HDBSCAN, and OPTICS wrap sklearn. DPC (Density Peaks Clustering)
is implemented from scratch.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_array


[docs] class DBSCANClusterer(BaseEstimator, ClusterMixin): """DBSCAN density-based clustering with competition defaults. Finds arbitrary-shaped clusters and labels noise points as -1. Parameters ---------- eps : float, default=0.5 Neighbourhood radius. min_samples : int, default=5 Minimum samples in a neighbourhood for a core point. metric : str, default='euclidean' Distance metric. algorithm : str, default='auto' Nearest neighbours algorithm: 'auto', 'ball_tree', 'kd_tree', 'brute'. leaf_size : int, default=30 Leaf size for tree-based algorithms. n_jobs : int, default=-1 Parallel jobs. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels (-1 for noise). core_sample_indices_ : ndarray Indices of core samples. n_clusters_ : int Number of clusters found (excluding noise). """ def __init__( self, eps: float = 0.5, min_samples: int = 5, metric: str = "euclidean", algorithm: str = "auto", leaf_size: int = 30, n_jobs: int = -1, ): self.eps = eps self.min_samples = min_samples self.metric = metric self.algorithm = algorithm self.leaf_size = leaf_size self.n_jobs = n_jobs
[docs] def fit(self, X: ArrayLike, y=None) -> DBSCANClusterer: """Fit DBSCAN.""" from sklearn.cluster import DBSCAN X = check_array(X) self.model_ = DBSCAN( eps=self.eps, min_samples=self.min_samples, metric=self.metric, algorithm=self.algorithm, leaf_size=self.leaf_size, n_jobs=self.n_jobs, ) self.model_.fit(X) self.labels_ = self.model_.labels_ self.core_sample_indices_ = self.model_.core_sample_indices_ self.n_clusters_ = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0) self.n_features_in_ = X.shape[1] return self
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_
[docs] class HDBSCANClusterer(BaseEstimator, ClusterMixin): """HDBSCAN hierarchical density-based clustering. Runs DBSCAN across all eps values simultaneously via mutual reachability MST, extracting the most stable clusters. Only real param is ``min_cluster_size``. Handles variable-density clusters. Uses sklearn's HDBSCAN (>=1.3) with fallback to the ``hdbscan`` package. Parameters ---------- min_cluster_size : int, default=15 Minimum cluster size. min_samples : int or None, default=None Core distance samples. Defaults to ``min_cluster_size``. metric : str, default='euclidean' Distance metric. cluster_selection_method : str, default='eom' Cluster extraction: 'eom' (Excess of Mass) or 'leaf'. cluster_selection_epsilon : float, default=0.0 Distance threshold for merging clusters. alpha : float, default=1.0 Mutual reachability smoothing. allow_single_cluster : bool, default=False Whether to allow a single-cluster result. n_jobs : int, default=-1 Parallel jobs. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels (-1 for noise). probabilities_ : ndarray of shape (n_samples,) Cluster membership probabilities. n_clusters_ : int Number of clusters found. """ def __init__( self, min_cluster_size: int = 15, min_samples: int | None = None, metric: str = "euclidean", cluster_selection_method: str = "eom", cluster_selection_epsilon: float = 0.0, alpha: float = 1.0, allow_single_cluster: bool = False, n_jobs: int = -1, ): self.min_cluster_size = min_cluster_size self.min_samples = min_samples self.metric = metric self.cluster_selection_method = cluster_selection_method self.cluster_selection_epsilon = cluster_selection_epsilon self.alpha = alpha self.allow_single_cluster = allow_single_cluster self.n_jobs = n_jobs def _get_hdbscan(self): """Get HDBSCAN class, preferring sklearn >= 1.3.""" try: from sklearn.cluster import HDBSCAN return HDBSCAN, "sklearn" except ImportError: pass try: import hdbscan return hdbscan.HDBSCAN, "hdbscan" except ImportError: raise ImportError( "HDBSCAN requires sklearn >= 1.3 or the hdbscan package. " "Install with: pip install hdbscan" )
[docs] def fit(self, X: ArrayLike, y=None) -> HDBSCANClusterer: """Fit HDBSCAN.""" X = check_array(X) HDBSCAN, backend = self._get_hdbscan() params = { "min_cluster_size": self.min_cluster_size, "min_samples": self.min_samples, "metric": self.metric, "cluster_selection_method": self.cluster_selection_method, "alpha": self.alpha, "allow_single_cluster": self.allow_single_cluster, } if backend == "sklearn": params["cluster_selection_epsilon"] = self.cluster_selection_epsilon params["n_jobs"] = self.n_jobs else: # hdbscan package uses slightly different params params["cluster_selection_epsilon"] = self.cluster_selection_epsilon self.model_ = HDBSCAN(**params) self.model_.fit(X) self.labels_ = self.model_.labels_ self.probabilities_ = getattr(self.model_, "probabilities_", np.ones(len(X))) self.n_clusters_ = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0) self.n_features_in_ = X.shape[1] return self
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_
[docs] class OPTICSClusterer(BaseEstimator, ClusterMixin): """OPTICS ordering-based clustering. Produces a reachability plot and extracts clusters, generalizing DBSCAN to handle varying density. Parameters ---------- min_samples : int or float, default=5 Core distance parameter. max_eps : float, default=inf Maximum neighbourhood radius. metric : str, default='minkowski' Distance metric. p : float, default=2 Minkowski power (2 = Euclidean). cluster_method : str, default='xi' Extraction method: 'xi' or 'dbscan'. xi : float, default=0.05 Steepness threshold for xi extraction. min_cluster_size : int or float or None, default=None Minimum cluster size for extraction. n_jobs : int, default=-1 Parallel jobs. Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels (-1 for noise). reachability_ : ndarray of shape (n_samples,) Reachability distances. ordering_ : ndarray of shape (n_samples,) OPTICS ordering. n_clusters_ : int Number of clusters found. """ def __init__( self, min_samples: int | float = 5, max_eps: float = np.inf, metric: str = "minkowski", p: float = 2, cluster_method: str = "xi", xi: float = 0.05, min_cluster_size: int | float | None = None, n_jobs: int = -1, ): self.min_samples = min_samples self.max_eps = max_eps self.metric = metric self.p = p self.cluster_method = cluster_method self.xi = xi self.min_cluster_size = min_cluster_size self.n_jobs = n_jobs
[docs] def fit(self, X: ArrayLike, y=None) -> OPTICSClusterer: """Fit OPTICS.""" from sklearn.cluster import OPTICS X = check_array(X) self.model_ = OPTICS( min_samples=self.min_samples, max_eps=self.max_eps, metric=self.metric, p=self.p, cluster_method=self.cluster_method, xi=self.xi, min_cluster_size=self.min_cluster_size, n_jobs=self.n_jobs, ) self.model_.fit(X) self.labels_ = self.model_.labels_ self.reachability_ = self.model_.reachability_ self.ordering_ = self.model_.ordering_ self.n_clusters_ = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0) self.n_features_in_ = X.shape[1] return self
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_
[docs] class DensityPeaksClusterer(BaseEstimator, ClusterMixin): """Density Peaks Clustering (DPC). Cluster centres are points with simultaneously high local density (rho) and large distance to any denser point (delta). Points are assigned by following the chain to the nearest denser neighbour. Parameters ---------- n_clusters : int or None, default=None Number of clusters. If None, auto-select from the decision graph using ``gamma_threshold``. percent : float, default=2.0 Percentage of data to use as cutoff distance (d_c) for density estimation. E.g. 2.0 means d_c is the distance at the 2nd percentile of all pairwise distances. gamma_threshold : float or None, default=None If ``n_clusters`` is None, points with ``rho * delta`` above this threshold are chosen as centres. If None, uses Otsu-like thresholding on gamma values. metric : str, default='euclidean' Distance metric. random_state : int or None, default=None Random seed (for tie-breaking). Attributes ---------- labels_ : ndarray of shape (n_samples,) Cluster labels. rho_ : ndarray of shape (n_samples,) Local densities. delta_ : ndarray of shape (n_samples,) Distance to nearest denser point. centers_ : ndarray of shape (n_centers,) Indices of cluster centres. n_clusters_ : int Number of clusters found. References ---------- Rodriguez & Laio, "Clustering by fast search and find of density peaks", Science, 2014. """ def __init__( self, n_clusters: int | None = None, percent: float = 2.0, gamma_threshold: float | None = None, metric: str = "euclidean", random_state: int | None = None, ): self.n_clusters = n_clusters self.percent = percent self.gamma_threshold = gamma_threshold self.metric = metric self.random_state = random_state
[docs] def fit(self, X: ArrayLike, y=None) -> DensityPeaksClusterer: """Fit DPC.""" from sklearn.metrics import pairwise_distances X = check_array(X, dtype=np.float64) n = X.shape[0] # Compute pairwise distances dist = pairwise_distances(X, metric=self.metric) # Compute cutoff distance d_c tri_idx = np.triu_indices(n, k=1) all_dists = dist[tri_idx] d_c = np.percentile(all_dists, self.percent) d_c = max(d_c, 1e-10) # Compute local density rho (Gaussian kernel) rho = np.sum(np.exp(-(dist / d_c) ** 2), axis=1) - 1.0 self.rho_ = rho # Compute delta: distance to nearest point with higher density delta = np.full(n, np.inf) nearest_denser = np.full(n, -1, dtype=int) # Sort by density descending order = np.argsort(-rho) for i_rank in range(1, n): idx = order[i_rank] # Among all points with higher density, find nearest higher_mask = rho > rho[idx] if not higher_mask.any(): delta[idx] = dist[idx].max() continue higher_dists = dist[idx][higher_mask] nearest_pos = np.argmin(higher_dists) delta[idx] = higher_dists[nearest_pos] nearest_denser[idx] = np.where(higher_mask)[0][nearest_pos] # Point with highest density: delta = max distance top = order[0] delta[top] = dist[top].max() self.delta_ = delta # Select centres gamma = rho * delta if self.n_clusters is not None: # Pick top n_clusters by gamma center_indices = np.argsort(-gamma)[: self.n_clusters] else: # Auto-select via threshold if self.gamma_threshold is not None: center_indices = np.where(gamma > self.gamma_threshold)[0] else: # Otsu-like: threshold at mean + 2*std of gamma threshold = np.mean(gamma) + 2 * np.std(gamma) center_indices = np.where(gamma > threshold)[0] if len(center_indices) < 1: center_indices = np.array([order[0]]) self.centers_ = center_indices self.n_clusters_ = len(center_indices) # Assign labels: follow nearest denser neighbour chain to a centre labels = np.full(n, -1, dtype=int) centre_set = set(center_indices.tolist()) for i, idx in enumerate(center_indices): labels[idx] = i # Assign in order of decreasing density for idx in order: if labels[idx] >= 0: continue # Walk up the chain chain = [idx] current = idx while labels[current] < 0 and nearest_denser[current] >= 0: current = nearest_denser[current] chain.append(current) if labels[current] >= 0: for c in chain: labels[c] = labels[current] else: # Assign to nearest centre centre_dists = dist[idx][center_indices] labels[idx] = np.argmin(centre_dists) self.labels_ = labels self.n_features_in_ = X.shape[1] return self
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return cluster labels.""" self.fit(X) return self.labels_