"""Isolation Forest detectors with competition-tuned defaults.
This module provides sklearn-compatible Isolation Forest implementations:
- IsolationForestDetector: Standard Isolation Forest with optimized defaults
- ExtendedIsolationForest: Extended IF using random hyperplanes (better for clustered anomalies)
"""
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.ensemble import IsolationForest
from sklearn.utils.validation import check_array, check_is_fitted
# -- Standard Isolation Forest (scikit-learn wrapper) -------------------------
class IsolationForestDetector(BaseEstimator, OutlierMixin):
    """Isolation Forest with competition-tuned defaults.

    Thin wrapper around :class:`sklearn.ensemble.IsolationForest` that changes
    a few defaults and flips the scoring convention:

    - Higher ``n_estimators`` (200 vs sklearn's 100)
    - Bootstrap sampling enabled
    - All cores used by default (``n_jobs=-1``)
    - ``max_features`` exposed for tuning on high-dimensional data
    - Consistent scoring convention (higher = more anomalous)

    Parameters
    ----------
    n_estimators : int, default=200
        Number of isolation trees. More trees = more stable anomaly scores.
    contamination : float or 'auto', default='auto'
        Expected proportion of anomalies. Passed through unchanged to
        sklearn's ``IsolationForest``.
    max_samples : int or float or 'auto', default='auto'
        Number of samples to draw for each tree.

        - 'auto': min(256, n_samples)
        - int: exact number of samples
        - float: fraction of samples
    max_features : float or int, default=1.0
        Features to draw for each tree.

        - float: fraction of features
        - int: exact number of features
    bootstrap : bool, default=True
        Whether to bootstrap samples. True improves diversity.
    n_jobs : int, default=-1
        Parallel jobs for fitting trees. -1 uses all cores.
    random_state : int or None, default=None
        Random seed for reproducibility.
    warm_start : bool, default=False
        Reuse trees from the previous ``fit`` call and add more. Only has an
        effect when ``n_estimators`` is increased between calls.

    Attributes
    ----------
    model_ : IsolationForest
        Fitted sklearn IsolationForest instance.
    threshold_ : float
        Copy of ``model_.offset_``. NOTE: this value lives on the scale of
        :meth:`score_samples` (sklearn convention), not on the negated scale
        returned by :meth:`decision_function`.
    n_features_in_ : int
        Number of features seen during ``fit``.

    Examples
    --------
    >>> from endgame.anomaly import IsolationForestDetector
    >>> detector = IsolationForestDetector(contamination=0.1)
    >>> detector.fit(X_train)
    >>> scores = detector.decision_function(X_test)  # Higher = more anomalous
    >>> labels = detector.predict(X_test)  # 1 = anomaly, 0 = normal
    """

    def __init__(
        self,
        n_estimators: int = 200,
        contamination: float | str = "auto",
        max_samples: int | float | str = "auto",
        max_features: float | int = 1.0,
        bootstrap: bool = True,
        n_jobs: int = -1,
        random_state: int | None = None,
        warm_start: bool = False,
    ):
        # sklearn convention: __init__ only stores hyperparameters verbatim.
        self.n_estimators = n_estimators
        self.contamination = contamination
        self.max_samples = max_samples
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.warm_start = warm_start

    def fit(self, X: ArrayLike, y=None) -> IsolationForestDetector:
        """Fit the Isolation Forest on training data.

        When ``warm_start=True`` and the detector has already been fitted, the
        existing forest is updated with the current hyperparameters and refit,
        so that sklearn can keep the old trees and only grow the missing ones.
        Otherwise a fresh forest is built.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        self : IsolationForestDetector
            Fitted detector.
        """
        X = check_array(X, accept_sparse=False)

        forest_params = {
            "n_estimators": self.n_estimators,
            "contamination": self.contamination,
            "max_samples": self.max_samples,
            "max_features": self.max_features,
            "bootstrap": self.bootstrap,
            "n_jobs": self.n_jobs,
            "random_state": self.random_state,
            "warm_start": self.warm_start,
        }

        fitted_forest = getattr(self, "model_", None)
        if self.warm_start and fitted_forest is not None:
            # Re-creating the estimator here would throw away the fitted
            # trees and silently turn warm_start into a no-op.
            fitted_forest.set_params(**forest_params)
        else:
            self.model_ = IsolationForest(**forest_params)

        self.model_.fit(X)

        # Store threshold for predict
        self.threshold_ = self.model_.offset_
        self.n_features_in_ = X.shape[1]
        return self

    def decision_function(self, X: ArrayLike) -> np.ndarray:
        """Compute anomaly scores for samples.

        Higher scores indicate more anomalous samples (opposite of sklearn
        convention).

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Anomaly scores. Higher = more anomalous.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        # Negate sklearn's scores so higher = more anomalous
        return -self.model_.decision_function(X)

    def predict(self, X: ArrayLike) -> np.ndarray:
        """Predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to classify.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        # Convert sklearn's {-1, 1} to {1, 0}
        sklearn_labels = self.model_.predict(X)
        return (sklearn_labels == -1).astype(int)

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit and predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training samples.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        self.fit(X)
        return self.predict(X)

    def score_samples(self, X: ArrayLike) -> np.ndarray:
        """Return sklearn's raw scores, un-negated.

        This is a direct pass-through to ``IsolationForest.score_samples``,
        i.e. the opposite of the anomaly score defined in the original
        Isolation Forest paper. Unlike :meth:`decision_function`, the sign is
        NOT flipped here.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Raw sklearn scores (lower = more anomalous).
        """
        check_is_fitted(self, ["model_"])
        X = check_array(X, accept_sparse=False)
        return self.model_.score_samples(X)
class ExtendedIsolationForest(BaseEstimator, OutlierMixin):
    """Extended Isolation Forest using random hyperplanes.

    Standard Isolation Forest uses axis-parallel splits, which can miss
    anomalies in clustered or elongated distributions. Extended IF uses
    random hyperplane splits, providing better detection for:

    - Clustered anomalies
    - Anomalies along linear subspaces
    - High-dimensional data with correlations

    Parameters
    ----------
    n_estimators : int, default=200
        Number of isolation trees.
    contamination : float, default=0.1
        Expected proportion of anomalies for threshold setting.
    max_samples : int or float or 'auto', default='auto'
        Number of samples to draw (without replacement) for each tree.

        - 'auto': min(256, n_samples)
        - int: exact number of samples
        - float: fraction of samples

        The resolved value is clamped to ``[1, n_samples]``.
    extension_level : int or None, default=None
        Dimensionality of random hyperplanes.

        - None: full dimensionality (n_features - 1)
        - int: specific dimensionality (0 = standard IF); values above
          ``n_features - 1`` are clamped.
    random_state : int or None, default=None
        Random seed for reproducibility.
    n_jobs : int, default=-1
        Stored for API compatibility. This implementation does not currently
        read it; trees are built and evaluated in the calling process.

    Attributes
    ----------
    trees_ : list
        Fitted extended isolation trees (nested dicts).
    threshold_ : float
        Decision threshold for binary classification: the
        ``100 * (1 - contamination)`` percentile of the training scores.
    max_samples_ : int
        Sub-sample size actually used for every tree.
    extension_level_ : int
        Extension level actually used.
    height_limit_ : int
        Maximum tree depth, ``ceil(log2(max_samples_))``.
    n_features_in_ : int
        Number of features seen during ``fit``.

    References
    ----------
    Hariri, S., Kind, M. C., & Brunner, R. J. (2019).
    Extended Isolation Forest. IEEE TKDE.

    Examples
    --------
    >>> from endgame.anomaly import ExtendedIsolationForest
    >>> detector = ExtendedIsolationForest(contamination=0.1)
    >>> detector.fit(X_train)
    >>> scores = detector.decision_function(X_test)
    """

    def __init__(
        self,
        n_estimators: int = 200,
        contamination: float = 0.1,
        max_samples: int | float | str = "auto",
        extension_level: int | None = None,
        random_state: int | None = None,
        n_jobs: int = -1,
    ):
        # sklearn convention: __init__ only stores hyperparameters verbatim;
        # validation happens in fit().
        self.n_estimators = n_estimators
        self.contamination = contamination
        self.max_samples = max_samples
        self.extension_level = extension_level
        self.random_state = random_state
        self.n_jobs = n_jobs

    def fit(self, X: ArrayLike, y=None) -> ExtendedIsolationForest:
        """Fit Extended Isolation Forest.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        self : ExtendedIsolationForest
            Fitted detector.

        Raises
        ------
        ValueError
            If ``n_estimators`` < 1, ``max_samples`` is non-positive or an
            unknown string, or ``extension_level`` is negative.
        """
        X = check_array(X, accept_sparse=False, dtype=np.float64)
        n_samples, n_features = X.shape
        self.n_features_in_ = n_features

        if self.n_estimators < 1:
            raise ValueError(
                f"n_estimators must be >= 1, got {self.n_estimators!r}."
            )

        # Determine sample size
        self.max_samples_ = self._resolve_max_samples(n_samples)

        # Determine extension level
        if self.extension_level is None:
            self.extension_level_ = n_features - 1
        else:
            if self.extension_level < 0:
                raise ValueError(
                    "extension_level must be >= 0 or None, "
                    f"got {self.extension_level!r}."
                )
            self.extension_level_ = min(self.extension_level, n_features - 1)

        # Height limit
        self.height_limit_ = int(np.ceil(np.log2(self.max_samples_)))

        rng = np.random.default_rng(self.random_state)

        # Build trees: each one sees its own sub-sample drawn without
        # replacement.
        self.trees_ = []
        for _ in range(self.n_estimators):
            indices = rng.choice(
                n_samples, size=self.max_samples_, replace=False
            )
            self.trees_.append(self._build_tree(X[indices], 0, rng))

        # Compute threshold from training scores
        train_scores = self.decision_function(X)
        self.threshold_ = np.percentile(
            train_scores, 100 * (1 - self.contamination)
        )
        return self

    def _resolve_max_samples(self, n_samples: int) -> int:
        """Translate the ``max_samples`` hyperparameter into a row count."""
        max_samples = self.max_samples
        if isinstance(max_samples, str):
            if max_samples != "auto":
                raise ValueError(
                    "max_samples must be 'auto', an int or a float, "
                    f"got {max_samples!r}."
                )
            return min(256, n_samples)

        if max_samples <= 0:
            raise ValueError(
                f"max_samples must be positive, got {max_samples!r}."
            )

        if isinstance(max_samples, float):
            resolved = int(max_samples * n_samples)
        else:
            resolved = int(max_samples)

        # Sampling is without replacement, so more than n_samples rows cannot
        # be drawn; a tiny fraction must still yield at least one row.
        return max(1, min(resolved, n_samples))

    def _build_tree(
        self,
        X: np.ndarray,
        depth: int,
        rng: np.random.Generator
    ) -> dict:
        """Recursively build an extended isolation tree.

        Returns a nested dict: leaves are ``{"type": "leaf", "size": int}``
        and internal nodes are ``{"type": "node", "n": normal, "p": split,
        "left": subtree, "right": subtree}``.
        """
        n_samples, n_features = X.shape

        # Termination conditions
        if depth >= self.height_limit_ or n_samples <= 1:
            return {"type": "leaf", "size": n_samples}

        # Random hyperplane: normal = random direction, split = random
        # intercept along that direction.
        normal = np.zeros(n_features)
        if self.extension_level_ == 0:
            # Standard axis-parallel split (single coordinate)
            split_dim = rng.integers(n_features)
            normal[split_dim] = 1.0
        else:
            # Random hyperplane in extension_level + 1 dimensions
            dims = rng.choice(
                n_features, size=self.extension_level_ + 1, replace=False
            )
            normal[dims] = rng.standard_normal(len(dims))
            # Small epsilon guards against a (near-)zero draw.
            normal = normal / (np.linalg.norm(normal) + 1e-10)

        # Project data onto normal
        projections = X @ normal

        # Random split point between min and max
        p_min, p_max = projections.min(), projections.max()
        if p_min == p_max:
            # All points coincide along this direction: cannot split.
            return {"type": "leaf", "size": n_samples}
        split = rng.uniform(p_min, p_max)

        # Split
        left_mask = projections < split
        if not left_mask.any() or left_mask.all():
            # One side would be empty (e.g. split drawn exactly at p_min).
            return {"type": "leaf", "size": n_samples}
        right_mask = ~left_mask

        return {
            "type": "node",
            "n": normal,
            "p": split,
            "left": self._build_tree(X[left_mask], depth + 1, rng),
            "right": self._build_tree(X[right_mask], depth + 1, rng),
        }

    def _path_length(self, x: np.ndarray, tree: dict, depth: int = 0) -> float:
        """Compute path length for a single sample.

        Single-sample counterpart of :meth:`_tree_path_lengths`.
        """
        if tree["type"] == "leaf":
            # Add expected path length for remaining elements
            # (_c returns 0 for leaves holding at most one point).
            return depth + self._c(tree["size"])

        if np.dot(x, tree["n"]) < tree["p"]:
            return self._path_length(x, tree["left"], depth + 1)
        return self._path_length(x, tree["right"], depth + 1)

    def _tree_path_lengths(self, X: np.ndarray, tree: dict) -> np.ndarray:
        """Path length of every row of ``X`` through one tree.

        Walks the tree once, routing index subsets down each branch, instead
        of traversing it separately for every sample.
        """
        lengths = np.zeros(X.shape[0])
        pending = [(tree, np.arange(X.shape[0]), 0)]
        while pending:
            node, rows, depth = pending.pop()
            if rows.size == 0:
                continue
            if node["type"] == "leaf":
                lengths[rows] = depth + self._c(node["size"])
                continue
            goes_left = X[rows] @ node["n"] < node["p"]
            pending.append((node["left"], rows[goes_left], depth + 1))
            pending.append((node["right"], rows[~goes_left], depth + 1))
        return lengths

    @staticmethod
    def _c(n: int) -> float:
        """Average path length of unsuccessful search in BST.

        Uses ``2 * H(n - 1) - 2 * (n - 1) / n`` with the harmonic number
        approximated by ``ln(n - 1) + Euler-Mascheroni``. The approximation
        is poor for ``n == 2`` (it gives ~0.15), so the exact value 1 is
        returned there.
        """
        if n <= 1:
            return 0.0
        if n == 2:
            return 1.0
        return 2.0 * (np.log(n - 1) + 0.5772156649) - 2.0 * (n - 1) / n

    def decision_function(self, X: ArrayLike) -> np.ndarray:
        """Compute anomaly scores.

        Higher scores indicate more anomalous samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to score.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Anomaly scores. Higher = more anomalous.

        Raises
        ------
        ValueError
            If ``X`` does not have the number of features seen in ``fit``.
        """
        check_is_fitted(self, ["trees_"])
        X = check_array(X, accept_sparse=False, dtype=np.float64)
        n_samples = X.shape[0]

        expected_features = getattr(self, "n_features_in_", X.shape[1])
        if X.shape[1] != expected_features:
            raise ValueError(
                f"X has {X.shape[1]} features, but ExtendedIsolationForest "
                f"was fitted with {expected_features} features."
            )

        # Compute average path length for each sample
        path_lengths = np.zeros(n_samples)
        for tree in self.trees_:
            path_lengths += self._tree_path_lengths(X, tree)
        # Divide by the trees actually fitted; n_estimators may have been
        # changed via set_params() after fit.
        path_lengths /= max(len(self.trees_), 1)

        # Convert to anomaly score (shorter paths = higher score)
        # Score = 2^(-E[h(x)] / c(psi)), psi = sub-sample size per tree.
        # Falls back to 256 for objects fitted before max_samples_ existed.
        c_n = self._c(getattr(self, "max_samples_", 256))
        if c_n <= 0:
            # Trees built from a single point carry no information; 0.5 is
            # the neutral score, matching scikit-learn's IsolationForest.
            return np.full(n_samples, 0.5)
        return np.power(2.0, -path_lengths / c_n)

    def predict(self, X: ArrayLike) -> np.ndarray:
        """Predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to classify.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        check_is_fitted(self, ["trees_", "threshold_"])
        scores = self.decision_function(X)
        return (scores >= self.threshold_).astype(int)

    def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
        """Fit and predict anomaly labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training samples.
        y : ignored
            Not used, present for API consistency.

        Returns
        -------
        labels : ndarray of shape (n_samples,)
            1 for anomalies, 0 for normal samples.
        """
        self.fit(X)
        return self.predict(X)