Source code for endgame.anomaly.lof

"""Local Outlier Factor detector with competition-tuned defaults.

LOF measures local density deviation to identify samples that are substantially
less dense than their neighbors - making it effective for detecting local anomalies
that exist in low-density regions relative to their k-nearest neighbors.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.neighbors import LocalOutlierFactor
from sklearn.utils.validation import check_array, check_is_fitted


class LocalOutlierFactorDetector(BaseEstimator, OutlierMixin):
    """Local Outlier Factor with competition-tuned defaults.

    LOF compares the local density of a point with that of its neighbors.
    Points with substantially lower density are considered outliers.
    Effective for detecting local anomalies in non-uniform distributions.

    This class wraps :class:`sklearn.neighbors.LocalOutlierFactor` and flips
    its output conventions so that *higher scores / label 1* mean "anomaly".

    Parameters
    ----------
    n_neighbors : int, default=20
        Number of neighbors for density estimation. Higher values make
        the detector more robust but may miss small local anomalies.
    contamination : float or 'auto', default='auto'
        Expected proportion of anomalies. Used for threshold setting.
    algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, default='auto'
        Algorithm for nearest neighbor queries.
    leaf_size : int, default=30
        Leaf size for tree algorithms.
    metric : str or callable, default='minkowski'
        Distance metric for neighbor queries.
    p : int, default=2
        Power parameter for Minkowski metric (2 = Euclidean).
    novelty : bool, default=True
        Whether to use LOF for novelty detection (scoring new samples).
        True enables predict() and decision_function() on unseen data.
        Only ``fit`` honours this flag; ``fit_predict`` always stores a
        novelty-enabled model so that later scoring calls work.
    n_jobs : int, default=-1
        Parallel jobs for neighbor queries. -1 uses all cores.

    Attributes
    ----------
    model_ : LocalOutlierFactor
        Fitted sklearn LOF instance.
    threshold_ : float
        Decision threshold, copied from the fitted model's ``offset_``.
        It lives on the ``score_samples`` scale (negative LOF): a sample is
        flagged as an anomaly when ``score_samples(X) < threshold_``. On the
        ``decision_function`` scale of this class the cut-off is ``0``
        (values above zero are anomalies).
    n_features_in_ : int
        Number of features seen during fitting.

    Examples
    --------
    >>> from endgame.anomaly import LocalOutlierFactorDetector
    >>> detector = LocalOutlierFactorDetector(contamination=0.1)
    >>> detector = detector.fit(X_train)
    >>> scores = detector.decision_function(X_test)  # Higher = more anomalous
    >>> labels = detector.predict(X_test)  # 1 = anomaly, 0 = normal
    """

    def __init__(
        self,
        n_neighbors: int = 20,
        contamination: float | str = "auto",
        algorithm: str = "auto",
        leaf_size: int = 30,
        metric: str = "minkowski",
        p: int = 2,
        novelty: bool = True,
        n_jobs: int = -1,
    ):
        # Store hyper-parameters verbatim (no validation here) so that
        # sklearn's get_params / set_params / clone keep working.
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.metric = metric
        self.p = p
        self.novelty = novelty
        self.n_jobs = n_jobs

    def _build_model(self, novelty: bool) -> LocalOutlierFactor:
        """Create an unfitted sklearn LOF configured from this detector's params."""
        return LocalOutlierFactor(
            n_neighbors=self.n_neighbors,
            contamination=self.contamination,
            algorithm=self.algorithm,
            leaf_size=self.leaf_size,
            metric=self.metric,
            p=self.p,
            novelty=novelty,
            n_jobs=self.n_jobs,
        )

    def _fit_model(self, X: np.ndarray, novelty: bool) -> None:
        """Fit a fresh LOF on validated ``X`` and publish the fitted attributes."""
        model = self._build_model(novelty)
        model.fit(X)
        # Publish only after a successful fit: if fit() raises, no
        # half-initialised ``model_`` is left behind to fool check_is_fitted.
        self.model_ = model
        self.n_features_in_ = X.shape[1]
        self.threshold_ = model.offset_

    def fit(self, X: ArrayLike, y=None) -> LocalOutlierFactorDetector:
        """Fit the LOF model on training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data (assumed to be mostly normal).
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        self : LocalOutlierFactorDetector
            Fitted detector.
        """
        X = check_array(X, accept_sparse=False)
        self._fit_model(X, novelty=self.novelty)
        return self

    def decision_function(self, X: ArrayLike) -> np.ndarray:
        """Compute anomaly scores for samples.

        Higher scores indicate more anomalous samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Anomaly scores. Higher = more anomalous.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        # Negate sklearn's scores so higher = more anomalous
        return -self.model_.decision_function(X)

    def predict(self, X: ArrayLike) -> np.ndarray:
        """Predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to classify.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        # Convert sklearn's {-1, 1} to {1, 0}
        sklearn_labels = self.model_.predict(X)
        return (sklearn_labels == -1).astype(int)

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit and predict anomaly labels on training data.

        Note: For LOF, this uses the transductive scores computed during
        fit, not the inductive scores from predict().

        The stored ``model_`` is always novelty-enabled (regardless of the
        ``novelty`` parameter) so that ``predict``/``decision_function`` can
        be used on new data afterwards.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training samples.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        X = check_array(X, accept_sparse=False)
        # A single fit is enough: the ``novelty`` flag does not influence what
        # LOF learns, and the fitted model already holds the training-set
        # scores. Flagging samples whose negative outlier factor falls below
        # the offset is the same rule a non-novelty fit_predict applies, so
        # the second (transductive) fit the neighbour search would otherwise
        # pay for is unnecessary.
        self._fit_model(X, novelty=True)
        is_outlier = self.model_.negative_outlier_factor_ < self.threshold_
        return is_outlier.astype(int)

    def score_samples(self, X: ArrayLike) -> np.ndarray:
        """Return negative LOF scores (sklearn convention).

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Negative LOF scores (higher = more normal).
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        return self.model_.score_samples(X)