# Source code for endgame.anomaly.pyod_wrapper

"""PyOD integration wrapper for unified anomaly detection.

This module provides a universal wrapper around PyOD's 40+ anomaly detection
algorithms with a consistent sklearn-compatible interface. Supports algorithm
selection by name and automatic hyperparameter configuration.
"""

from __future__ import annotations

import warnings
from typing import Any

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.utils.validation import check_array, check_is_fitted

# Registry mapping each public algorithm name to a
# ``(module path, class name)`` pair. ``_import_pyod_class`` resolves an entry
# lazily with ``importlib``, so nothing from PyOD is imported until a name is
# actually requested.
#
# Several names point at the same class ("FastABOD", "AvgKNN", "MedKNN"):
# these are variants that differ only in the ``method`` argument, which
# ``PyODDetector._get_model_params`` fills in.
PYOD_ALGORITHMS = {
    # Probabilistic
    "ECOD": ("pyod.models.ecod", "ECOD"),
    "COPOD": ("pyod.models.copod", "COPOD"),
    "ABOD": ("pyod.models.abod", "ABOD"),
    "FastABOD": ("pyod.models.abod", "ABOD"),  # same class, method='fast'

    # Linear Models
    "PCA": ("pyod.models.pca", "PCA"),
    "MCD": ("pyod.models.mcd", "MCD"),
    "OCSVM": ("pyod.models.ocsvm", "OCSVM"),
    "LMDD": ("pyod.models.lmdd", "LMDD"),

    # Proximity-Based
    "LOF": ("pyod.models.lof", "LOF"),
    "COF": ("pyod.models.cof", "COF"),
    "CBLOF": ("pyod.models.cblof", "CBLOF"),
    "LOCI": ("pyod.models.loci", "LOCI"),
    "HBOS": ("pyod.models.hbos", "HBOS"),
    "KNN": ("pyod.models.knn", "KNN"),
    "AvgKNN": ("pyod.models.knn", "KNN"),  # same class, method='mean'
    "MedKNN": ("pyod.models.knn", "KNN"),  # same class, method='median'
    "SOD": ("pyod.models.sod", "SOD"),
    "ROD": ("pyod.models.rod", "ROD"),

    # Outlier Ensembles
    "IForest": ("pyod.models.iforest", "IForest"),
    "INNE": ("pyod.models.inne", "INNE"),
    "FB": ("pyod.models.feature_bagging", "FeatureBagging"),
    "LSCP": ("pyod.models.lscp", "LSCP"),
    "XGBOD": ("pyod.models.xgbod", "XGBOD"),
    "LODA": ("pyod.models.loda", "LODA"),
    "SUOD": ("pyod.models.suod", "SUOD"),

    # Neural Networks
    "AutoEncoder": ("pyod.models.auto_encoder", "AutoEncoder"),
    "VAE": ("pyod.models.vae", "VAE"),
    "SO_GAAL": ("pyod.models.so_gaal", "SO_GAAL"),
    "MO_GAAL": ("pyod.models.mo_gaal", "MO_GAAL"),
    "DeepSVDD": ("pyod.models.deep_svdd", "DeepSVDD"),
    "AnoGAN": ("pyod.models.anogan", "AnoGAN"),
    "ALAD": ("pyod.models.alad", "ALAD"),

    # Graph-Based
    "LUNAR": ("pyod.models.lunar", "LUNAR"),

    # Statistical
    "MAD": ("pyod.models.mad", "MAD"),
    "SOS": ("pyod.models.sos", "SOS"),
    "QMCD": ("pyod.models.qmcd", "QMCD"),
    "KDE": ("pyod.models.kde", "KDE"),
    "Sampling": ("pyod.models.sampling", "Sampling"),
    "GMM": ("pyod.models.gmm", "GMM"),
}


# Default hyperparameters for common algorithms.
#
# ``PyODDetector._get_model_params`` starts from the entry for the requested
# algorithm and lets user-supplied keyword arguments override it. The lookup
# is by exact algorithm name, so variant names such as "AvgKNN" or "MedKNN"
# do not inherit the "KNN" entry. ``PyODDetector.fit`` drops any key that the
# model constructor does not declare, so an entry here is only a suggestion.
_DEFAULT_PARAMS = {
    "ECOD": {},  # parameter-free
    "COPOD": {},  # parameter-free
    "IForest": {"n_estimators": 200, "max_samples": "auto"},
    "LOF": {"n_neighbors": 20},
    "KNN": {"n_neighbors": 10},
    "HBOS": {"n_bins": 20},
    "PCA": {"n_components": None},  # auto
    "OCSVM": {"kernel": "rbf", "nu": 0.5},
    "CBLOF": {"n_clusters": 8, "alpha": 0.9, "beta": 5},
    "AutoEncoder": {"hidden_neurons": [64, 32, 32, 64], "epochs": 100},
    "VAE": {"encoder_neurons": [64, 32], "decoder_neurons": [32, 64], "epochs": 100},
}


def _import_pyod_class(algorithm: str):
    """Dynamically import a PyOD algorithm class.

    Parameters
    ----------
    algorithm : str
        Key of ``PYOD_ALGORITHMS`` identifying the detector to load.

    Returns
    -------
    type
        The class object named by the registry entry.

    Raises
    ------
    ValueError
        If ``algorithm`` is not a key of ``PYOD_ALGORITHMS``.
    ImportError
        If the module listed for ``algorithm`` cannot be imported. The
        original import failure is attached as ``__cause__``.
    """
    if algorithm not in PYOD_ALGORITHMS:
        raise ValueError(
            f"Unknown algorithm: {algorithm}. "
            f"Available: {list(PYOD_ALGORITHMS.keys())}"
        )

    module_path, class_name = PYOD_ALGORITHMS[algorithm]

    try:
        import importlib
        module = importlib.import_module(module_path)
        return getattr(module, class_name)
    except ImportError as e:
        # Chain explicitly so the traceback reads "caused by" the real import
        # failure instead of "during handling of the above exception".
        raise ImportError(
            f"Could not import {algorithm}. "
            f"Make sure PyOD is installed: pip install pyod\n"
            f"Original error: {e}"
        ) from e


class PyODDetector(BaseEstimator, OutlierMixin):
    """Universal wrapper for PyOD anomaly detection algorithms.

    This wrapper provides a unified sklearn-compatible interface to all PyOD
    algorithms, with consistent scoring conventions and automatic
    hyperparameter defaults.

    Parameters
    ----------
    algorithm : str, default='ECOD'
        Name of the PyOD algorithm. See PYOD_ALGORITHMS for available options.
        Popular choices:

        - 'ECOD': Empirical Cumulative Distribution (fast, parameter-free)
        - 'COPOD': Copula-Based (fast, parameter-free)
        - 'IForest': Isolation Forest
        - 'LOF': Local Outlier Factor
        - 'KNN': K-Nearest Neighbors
        - 'HBOS': Histogram-Based (very fast)
        - 'PCA': Principal Component Analysis
        - 'AutoEncoder': Deep learning autoencoder
    contamination : float, default=0.1
        Expected proportion of anomalies.
    random_state : int or None, default=None
        Random seed for reproducibility. When not None it is forwarded to the
        underlying model if that model's constructor declares a
        ``random_state`` parameter, and ignored otherwise.
    **kwargs : dict
        Additional algorithm-specific parameters passed to the PyOD model.
        They are reported by ``get_params`` and accepted by ``set_params``.
        Names the model constructor does not declare are dropped at fit time
        with a ``UserWarning``.

    Attributes
    ----------
    model_ : PyOD model
        Fitted PyOD detector instance.
    threshold_ : float
        Decision threshold for binary classification, copied from the fitted
        model's ``threshold_``.
    n_features_in_ : int
        Number of columns in the data passed to ``fit``.

    Examples
    --------
    >>> from endgame.anomaly import PyODDetector, PYOD_ALGORITHMS
    >>>
    >>> # List available algorithms
    >>> print(list(PYOD_ALGORITHMS.keys()))
    >>>
    >>> # Fast parameter-free detection
    >>> detector = PyODDetector(algorithm='ECOD')
    >>> detector.fit(X_train)
    >>> scores = detector.decision_function(X_test)
    >>>
    >>> # KNN-based detection
    >>> detector = PyODDetector(algorithm='KNN', n_neighbors=15)
    >>> detector.fit(X_train)
    >>> labels = detector.predict(X_test)
    >>>
    >>> # Deep learning detector
    >>> detector = PyODDetector(
    ...     algorithm='AutoEncoder',
    ...     hidden_neurons=[128, 64, 64, 128],
    ...     epochs=50
    ... )
    >>> detector.fit(X_train)
    """

    def __init__(
        self,
        algorithm: str = "ECOD",
        contamination: float = 0.1,
        random_state: int | None = None,
        **kwargs: Any,
    ):
        self.algorithm = algorithm
        self.contamination = contamination
        self.random_state = random_state
        self.kwargs = kwargs

    def get_params(self, deep: bool = True) -> dict:
        """Return constructor parameters, including the extra ``**kwargs``.

        ``BaseEstimator.get_params`` only reports named ``__init__``
        parameters, so without this override ``sklearn.base.clone`` would
        rebuild the detector without its algorithm-specific settings.

        Parameters
        ----------
        deep : bool, default=True
            Passed through to ``BaseEstimator.get_params``.

        Returns
        -------
        params : dict
            Named parameters merged with the stored keyword arguments.
        """
        params = super().get_params(deep=deep)
        params.update(self.kwargs)
        return params

    def set_params(self, **params: Any) -> PyODDetector:
        """Set constructor parameters, routing unknown names into ``kwargs``.

        Parameters
        ----------
        **params : dict
            Named parameters (``algorithm``, ``contamination``,
            ``random_state``) are set as attributes; every other name is
            stored as an algorithm-specific keyword argument.

        Returns
        -------
        self : PyODDetector
            This detector.
        """
        if not params:
            return self

        named = super().get_params(deep=False)
        own = {k: v for k, v in params.items() if k in named}
        extra = {k: v for k, v in params.items() if k not in named}

        if own:
            super().set_params(**own)
        if extra:
            # Rebind instead of mutating so a dict shared with another
            # object is never changed behind its back.
            self.kwargs = {**self.kwargs, **extra}
        return self

    def _get_model_params(self) -> dict:
        """Get candidate parameters for the underlying PyOD model.

        The result may contain names a particular model does not accept;
        ``fit`` filters it against the model constructor's signature.

        Returns
        -------
        params : dict
            Algorithm defaults, overridden by user kwargs, plus
            ``contamination``, ``random_state`` (when set) and the ``method``
            value required by variant algorithm names.
        """
        # Start with algorithm defaults
        params = _DEFAULT_PARAMS.get(self.algorithm, {}).copy()

        # Override with user kwargs
        params.update(self.kwargs)

        # Always set contamination
        params["contamination"] = self.contamination

        # Offer the seed to every algorithm: fit() keeps it only when the
        # model constructor declares ``random_state``, so no per-algorithm
        # allow-list is needed here.
        if self.random_state is not None:
            params["random_state"] = self.random_state

        # Handle special algorithm variants
        if self.algorithm == "FastABOD":
            params["method"] = "fast"
        elif self.algorithm == "AvgKNN":
            params["method"] = "mean"
        elif self.algorithm == "MedKNN":
            params["method"] = "median"

        return params

    def fit(self, X: ArrayLike, y=None) -> PyODDetector:
        """Fit the PyOD detector on training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        self : PyODDetector
            Fitted detector.

        Warns
        -----
        UserWarning
            If any user-supplied keyword argument is not a parameter of the
            selected model's constructor and is therefore ignored.
        """
        X = check_array(X, accept_sparse=False, dtype=np.float64)

        # Import and instantiate the PyOD model
        model_class = _import_pyod_class(self.algorithm)
        params = self._get_model_params()

        # Filter params to only those accepted by the model
        import inspect
        valid_params = inspect.signature(model_class.__init__).parameters.keys()
        filtered_params = {k: v for k, v in params.items() if k in valid_params}

        # Surface dropped user settings (e.g. a misspelled hyperparameter)
        # instead of silently fitting with different values than requested.
        ignored = sorted(k for k in self.kwargs if k not in valid_params)
        if ignored:
            warnings.warn(
                f"{self.algorithm} does not accept parameter(s) {ignored}; "
                f"they were ignored.",
                UserWarning,
                stacklevel=2,
            )

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=UserWarning)
            self.model_ = model_class(**filtered_params)
            self.model_.fit(X)

        self.n_features_in_ = X.shape[1]
        self.threshold_ = self.model_.threshold_

        return self

    def decision_function(self, X: ArrayLike) -> np.ndarray:
        """Compute anomaly scores for samples.

        Higher scores indicate more anomalous samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Anomaly scores. Higher = more anomalous.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False, dtype=np.float64)

        # PyOD already uses higher = more anomalous
        return self.model_.decision_function(X)

    def predict(self, X: ArrayLike) -> np.ndarray:
        """Predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to classify.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False, dtype=np.float64)

        # PyOD uses 1 for anomaly, 0 for normal (our convention)
        return self.model_.predict(X)

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit and predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data to fit on and then label.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            Result of ``predict(X)`` after fitting on ``X``.
        """
        self.fit(X)
        return self.predict(X)

    def predict_proba(self, X: ArrayLike) -> np.ndarray:
        """Predict anomaly probabilities.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to classify.

        Returns
        -------
        proba : ndarray of shape (n_samples, 2)
            Probabilities for [normal, anomaly] classes.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False, dtype=np.float64)

        if hasattr(self.model_, "predict_proba"):
            return self.model_.predict_proba(X)
        else:
            # Fallback: min-max scale the scores of this batch into [0, 1].
            # The result is relative to the samples in X, not calibrated.
            scores = self.decision_function(X)
            # The small epsilon avoids division by zero when all scores
            # are equal.
            scores_normalized = (scores - scores.min()) / (scores.max() - scores.min() + 1e-10)
            return np.column_stack([1 - scores_normalized, scores_normalized])

    def predict_confidence(self, X: ArrayLike) -> np.ndarray:
        """Return prediction confidence scores.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        confidence : ndarray of shape (n_samples,)
            Confidence scores (higher = more confident prediction).
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False, dtype=np.float64)

        if hasattr(self.model_, "predict_confidence"):
            return self.model_.predict_confidence(X)
        else:
            # Fallback: use distance from threshold
            scores = self.decision_function(X)
            return np.abs(scores - self.threshold_)

    @property
    def available_algorithms(self) -> list[str]:
        """List of available PyOD algorithms."""
        return list(PYOD_ALGORITHMS.keys())
def create_detector_ensemble(
    algorithms: list[str] | None = None,
    contamination: float = 0.1,
    random_state: int | None = None,
) -> list[PyODDetector]:
    """Build a list of unfitted PyOD detectors sharing common settings.

    Parameters
    ----------
    algorithms : list of str or None, default=None
        Algorithm names, one detector per entry. When None, the default
        diverse set is used:
        ['ECOD', 'COPOD', 'IForest', 'LOF', 'KNN', 'HBOS']
    contamination : float, default=0.1
        Expected proportion of anomalies, given to every detector.
    random_state : int or None, default=None
        Random seed for reproducibility, given to every detector.

    Returns
    -------
    detectors : list of PyODDetector
        Configured detectors, in the order of ``algorithms``, ready for
        fitting.

    Examples
    --------
    >>> from endgame.anomaly import create_detector_ensemble
    >>> detectors = create_detector_ensemble(contamination=0.05)
    >>> for det in detectors:
    ...     det.fit(X_train)
    >>> # Combine scores
    >>> scores = np.mean([d.decision_function(X_test) for d in detectors], axis=0)
    """
    names = (
        ["ECOD", "COPOD", "IForest", "LOF", "KNN", "HBOS"]
        if algorithms is None
        else algorithms
    )

    detectors = []
    for name in names:
        detector = PyODDetector(
            algorithm=name,
            contamination=contamination,
            random_state=random_state,
        )
        detectors.append(detector)
    return detectors