Source code for endgame.clustering.distribution

"""Distribution-based clustering: GMM and Fuzzy C-Means.

GaussianMixtureClusterer wraps sklearn's GaussianMixture.
FuzzyCMeansClusterer is implemented from scratch.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_array, check_is_fitted


[docs] class GaussianMixtureClusterer(BaseEstimator, ClusterMixin): """Gaussian Mixture Model clustering. Fits k Gaussians via EM. The probabilistic analog of K-Means — gives soft assignments and handles elliptical clusters. Supports BIC/AIC for model selection. Parameters ---------- n_components : int, default=8 Number of mixture components. covariance_type : str, default='full' Covariance type: 'full', 'tied', 'diag', 'spherical'. n_init : int, default=5 Number of EM initializations. max_iter : int, default=200 Maximum EM iterations. tol : float, default=1e-3 Convergence tolerance. reg_covar : float, default=1e-6 Covariance regularization. init_params : str, default='k-means++' Initialization: 'kmeans', 'k-means++', 'random', 'random_from_data'. random_state : int or None, default=None Random seed. Attributes ---------- labels_ : ndarray of shape (n_samples,) Hard cluster assignments (argmax of responsibilities). probabilities_ : ndarray of shape (n_samples, n_components) Soft assignment probabilities (responsibilities). means_ : ndarray of shape (n_components, n_features) Component means. covariances_ : ndarray Component covariances. weights_ : ndarray of shape (n_components,) Mixing weights. bic_ : float Bayesian Information Criterion of the fitted model. aic_ : float Akaike Information Criterion of the fitted model. """ def __init__( self, n_components: int = 8, covariance_type: str = "full", n_init: int = 5, max_iter: int = 200, tol: float = 1e-3, reg_covar: float = 1e-6, init_params: str = "k-means++", random_state: int | None = None, ): self.n_components = n_components self.covariance_type = covariance_type self.n_init = n_init self.max_iter = max_iter self.tol = tol self.reg_covar = reg_covar self.init_params = init_params self.random_state = random_state
[docs] def fit(self, X: ArrayLike, y=None) -> GaussianMixtureClusterer: """Fit GMM.""" from sklearn.mixture import GaussianMixture X = check_array(X) self.model_ = GaussianMixture( n_components=self.n_components, covariance_type=self.covariance_type, n_init=self.n_init, max_iter=self.max_iter, tol=self.tol, reg_covar=self.reg_covar, init_params=self.init_params, random_state=self.random_state, ) self.model_.fit(X) self.probabilities_ = self.model_.predict_proba(X) self.labels_ = self.probabilities_.argmax(axis=1) self.means_ = self.model_.means_ self.covariances_ = self.model_.covariances_ self.weights_ = self.model_.weights_ self.bic_ = self.model_.bic(X) self.aic_ = self.model_.aic(X) self.n_features_in_ = X.shape[1] return self
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict hard cluster labels.""" check_is_fitted(self, ["model_"]) X = check_array(X) return self.model_.predict(X)
[docs] def predict_proba(self, X: ArrayLike) -> np.ndarray: """Predict soft cluster probabilities.""" check_is_fitted(self, ["model_"]) X = check_array(X) return self.model_.predict_proba(X)
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return hard cluster labels.""" self.fit(X) return self.labels_
[docs] def score(self, X: ArrayLike) -> float: """Return average log-likelihood.""" check_is_fitted(self, ["model_"]) X = check_array(X) return self.model_.score(X)
[docs] def select_n_components( self, X: ArrayLike, k_range: range | None = None, criterion: str = "bic", ) -> int: """Select optimal n_components via BIC or AIC. Parameters ---------- X : array-like Data to evaluate. k_range : range or None, default=None Range of k values. Defaults to range(1, 21). criterion : str, default='bic' Selection criterion: 'bic' or 'aic'. Returns ------- int Optimal number of components. """ from sklearn.mixture import GaussianMixture X = check_array(X) if k_range is None: k_range = range(1, min(21, X.shape[0])) scores = {} for k in k_range: gmm = GaussianMixture( n_components=k, covariance_type=self.covariance_type, n_init=self.n_init, max_iter=self.max_iter, random_state=self.random_state, ) gmm.fit(X) scores[k] = gmm.bic(X) if criterion == "bic" else gmm.aic(X) return min(scores, key=scores.get)
[docs] class FuzzyCMeansClusterer(BaseEstimator, ClusterMixin): """Fuzzy C-Means clustering. Soft version of K-Means where each point has a degree of membership in each cluster. Useful when clusters genuinely overlap. Parameters ---------- n_clusters : int, default=8 Number of clusters. m : float, default=2.0 Fuzziness coefficient (m > 1). Higher values = softer assignments. m = 1 approaches hard K-Means; m >> 1 approaches uniform membership. max_iter : int, default=300 Maximum iterations. tol : float, default=1e-4 Convergence tolerance on membership matrix change. random_state : int or None, default=None Random seed. Attributes ---------- labels_ : ndarray of shape (n_samples,) Hard cluster labels (argmax of membership). membership_ : ndarray of shape (n_samples, n_clusters) Fuzzy membership matrix. cluster_centers_ : ndarray of shape (n_clusters, n_features) Cluster centroids. n_iter_ : int Number of iterations run. """ def __init__( self, n_clusters: int = 8, m: float = 2.0, max_iter: int = 300, tol: float = 1e-4, random_state: int | None = None, ): self.n_clusters = n_clusters self.m = m self.max_iter = max_iter self.tol = tol self.random_state = random_state
[docs] def fit(self, X: ArrayLike, y=None) -> FuzzyCMeansClusterer: """Fit Fuzzy C-Means.""" X = check_array(X, dtype=np.float64) n, d = X.shape k = self.n_clusters rng = np.random.RandomState(self.random_state) # Initialize membership matrix randomly U = rng.dirichlet(np.ones(k), size=n) # (n, k) for iteration in range(self.max_iter): # Compute centres: c_j = sum(u_ij^m * x_i) / sum(u_ij^m) Um = U ** self.m # (n, k) centers = (Um.T @ X) / Um.sum(axis=0)[:, None] # (k, d) # Compute distances: d_ij = ||x_i - c_j|| dists = np.zeros((n, k)) for j in range(k): dists[:, j] = np.linalg.norm(X - centers[j], axis=1) # Avoid division by zero dists = np.maximum(dists, 1e-10) # Update membership power = 2.0 / (self.m - 1) U_new = np.zeros((n, k)) for j in range(k): # u_ij = 1 / sum_l (d_ij/d_il)^(2/(m-1)) ratios = (dists[:, j:j+1] / dists) ** power # (n, k) U_new[:, j] = 1.0 / ratios.sum(axis=1) # Check convergence change = np.max(np.abs(U_new - U)) U = U_new if change < self.tol: break self.membership_ = U self.labels_ = U.argmax(axis=1) self.cluster_centers_ = centers self.n_iter_ = iteration + 1 self.n_features_in_ = d return self
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict hard cluster labels for new data.""" return self.predict_memberships(X).argmax(axis=1)
[docs] def predict_memberships(self, X: ArrayLike) -> np.ndarray: """Predict fuzzy membership for new data. Parameters ---------- X : array-like of shape (n_samples, n_features) Returns ------- ndarray of shape (n_samples, n_clusters) Membership matrix. """ check_is_fitted(self, ["cluster_centers_"]) X = check_array(X, dtype=np.float64) n = X.shape[0] k = self.n_clusters dists = np.zeros((n, k)) for j in range(k): dists[:, j] = np.linalg.norm(X - self.cluster_centers_[j], axis=1) dists = np.maximum(dists, 1e-10) power = 2.0 / (self.m - 1) U = np.zeros((n, k)) for j in range(k): ratios = (dists[:, j:j+1] / dists) ** power U[:, j] = 1.0 / ratios.sum(axis=1) return U
[docs] def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray: """Fit and return hard cluster labels.""" self.fit(X) return self.labels_