"""AutoCluster: automatic clustering method selection based on data properties.
Selects the best clustering algorithm and parameters based on dataset
characteristics (n, d, expected k, noise detection needs).
"""
from __future__ import annotations
from typing import Any
import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_array
[docs]
class AutoCluster(BaseEstimator, ClusterMixin):
"""Automatic clustering with method selection based on data properties.
Selects the best clustering algorithm based on:
- Dataset size (n)
- Dimensionality (d)
- Whether k is specified
- Whether noise detection is needed
Parameters
----------
n_clusters : int or 'auto', default='auto'
Number of clusters. 'auto' uses algorithms that determine k
automatically (HDBSCAN, k*-Means, or GMM with BIC).
detect_noise : bool, default=False
Whether to detect noise/outlier points (label -1). If True,
prefers density-based methods (HDBSCAN, DBSCAN).
prefer : str or None, default=None
Override automatic selection: 'centroid', 'density',
'hierarchical', 'distribution', 'spectral'. If None, auto-selects.
random_state : int or None, default=None
Random seed.
verbose : bool, default=False
Enable verbose output.
**kwargs
Additional parameters passed to the selected clusterer.
Attributes
----------
labels_ : ndarray of shape (n_samples,)
Cluster labels.
selected_method_ : str
Name of the selected algorithm.
clusterer_ : BaseEstimator
The fitted clusterer instance.
n_clusters_ : int
Number of clusters found.
Examples
--------
>>> from endgame.clustering import AutoCluster
>>> ac = AutoCluster(n_clusters='auto', detect_noise=True)
>>> labels = ac.fit_predict(X)
>>> print(f"Selected: {ac.selected_method_}, k={ac.n_clusters_}")
"""
def __init__(
self,
n_clusters: int | str = "auto",
detect_noise: bool = False,
prefer: str | None = None,
random_state: int | None = None,
verbose: bool = False,
**kwargs,
):
self.n_clusters = n_clusters
self.detect_noise = detect_noise
self.prefer = prefer
self.random_state = random_state
self.verbose = verbose
self.kwargs = kwargs
def _log(self, msg: str):
if self.verbose:
print(f"[AutoCluster] {msg}")
def _select_method(self, n: int, d: int) -> tuple[str, dict]:
"""Select clustering method based on data characteristics.
Returns (method_name, params_dict).
"""
k = self.n_clusters
auto_k = (k == "auto")
if self.prefer is not None:
return self._select_from_preference(self.prefer, n, d, k, auto_k)
# Noise detection requested -> density-based
if self.detect_noise:
if auto_k:
return "hdbscan", {"min_cluster_size": max(15, n // 100)}
else:
return "dbscan", {"eps": 0.5, "min_samples": 5}
# Auto-k requested
if auto_k:
if n > 50000:
# Large dataset: use k*-means (fast, auto-k)
return "kstar_means", {"k_max": min(50, n // 100)}
elif n > 10000:
return "hdbscan", {"min_cluster_size": max(15, n // 100)}
else:
# Small-medium: k*-means for auto-k
return "kstar_means", {"k_max": min(30, n // 10)}
# k is specified
k = int(k)
# Large dataset
if n > 100000:
return "minibatch_kmeans", {"n_clusters": k}
# Small dataset with low d -> spectral can shine
if n < 5000 and d < 50:
return "kmeans", {"n_clusters": k}
# Default: K-Means
return "kmeans", {"n_clusters": k}
def _select_from_preference(
self, prefer: str, n: int, d: int, k: Any, auto_k: bool
) -> tuple[str, dict]:
"""Select based on user preference category."""
if prefer == "centroid":
if auto_k:
return "kstar_means", {}
return "kmeans", {"n_clusters": int(k)}
if prefer == "density":
if auto_k:
return "hdbscan", {"min_cluster_size": max(15, n // 100)}
return "dbscan", {"eps": 0.5, "min_samples": 5}
if prefer == "hierarchical":
if auto_k:
return "agglomerative", {"n_clusters": None, "distance_threshold": 1.0}
return "agglomerative", {"n_clusters": int(k)}
if prefer == "distribution":
if auto_k:
return "gmm_auto", {}
return "gmm", {"n_components": int(k)}
if prefer == "spectral":
k_val = max(2, int(k)) if not auto_k else 8
return "spectral", {"n_clusters": k_val}
raise ValueError(
f"Unknown preference '{prefer}'. "
"Expected: 'centroid', 'density', 'hierarchical', 'distribution', 'spectral'."
)
def _build_clusterer(self, method: str, params: dict) -> BaseEstimator:
"""Instantiate the selected clusterer."""
# Merge user kwargs (overrides auto-selected params)
merged = {**params}
merged.update(self.kwargs)
# Add random_state if the clusterer supports it
if "random_state" not in merged and self.random_state is not None:
merged["random_state"] = self.random_state
if method == "kmeans":
from endgame.clustering.centroid import KMeansClusterer
return KMeansClusterer(**merged)
if method == "minibatch_kmeans":
from endgame.clustering.centroid import MiniBatchKMeansClusterer
return MiniBatchKMeansClusterer(**merged)
if method == "kstar_means":
from endgame.clustering.centroid import KStarMeansClusterer
return KStarMeansClusterer(**merged)
if method == "dbscan":
merged.pop("random_state", None)
from endgame.clustering.density import DBSCANClusterer
return DBSCANClusterer(**merged)
if method == "hdbscan":
merged.pop("random_state", None)
from endgame.clustering.density import HDBSCANClusterer
return HDBSCANClusterer(**merged)
if method == "agglomerative":
merged.pop("random_state", None)
from endgame.clustering.hierarchical import AgglomerativeClusterer
return AgglomerativeClusterer(**merged)
if method == "gmm":
from endgame.clustering.distribution import GaussianMixtureClusterer
return GaussianMixtureClusterer(**merged)
if method == "gmm_auto":
# Will select n_components via BIC in fit()
from endgame.clustering.distribution import GaussianMixtureClusterer
return GaussianMixtureClusterer(**merged)
if method == "spectral":
from endgame.clustering.graph import SpectralClusterer
return SpectralClusterer(**merged)
raise ValueError(f"Unknown method: {method}")
[docs]
def fit(self, X: ArrayLike, y=None) -> AutoCluster:
"""Fit the auto-selected clusterer.
Parameters
----------
X : array-like of shape (n_samples, n_features)
y : ignored
Returns
-------
self
"""
X = check_array(X)
n, d = X.shape
method, params = self._select_method(n, d)
self.selected_method_ = method
self._log(f"Selected method: {method} (n={n}, d={d})")
# Special case: GMM with auto k selection
if method == "gmm_auto":
from endgame.clustering.distribution import GaussianMixtureClusterer
gmm = GaussianMixtureClusterer(
random_state=self.random_state, **self.kwargs
)
best_k = gmm.select_n_components(X)
self._log(f"GMM BIC selected k={best_k}")
self.clusterer_ = GaussianMixtureClusterer(
n_components=best_k,
random_state=self.random_state,
**self.kwargs,
)
else:
self.clusterer_ = self._build_clusterer(method, params)
self.clusterer_.fit(X)
self.labels_ = self.clusterer_.labels_
self.n_clusters_ = len(set(self.labels_)) - (1 if -1 in self.labels_ else 0)
self.n_features_in_ = d
return self
[docs]
def predict(self, X: ArrayLike) -> np.ndarray:
"""Predict cluster labels for new data (if supported).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Returns
-------
ndarray of shape (n_samples,)
"""
if not hasattr(self, "clusterer_"):
raise RuntimeError("AutoCluster has not been fitted.")
if hasattr(self.clusterer_, "predict"):
return self.clusterer_.predict(X)
raise NotImplementedError(
f"{self.selected_method_} does not support predict() on new data. "
"Use fit_predict() instead."
)
[docs]
def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
"""Fit and return cluster labels."""
self.fit(X)
return self.labels_