"""GritBot: Rule-based anomaly detection via recursive partitioning.
GritBot finds anomalies by partitioning data into homogeneous subsets
and identifying values that are surprising given the subset context.
This is a Python reimplementation of Quinlan's GritBot algorithm with
numba JIT compilation for performance.
Reference:
Quinlan, J.R. (2010). GritBot GPL Edition. Rulequest Research.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.utils.validation import check_array, check_is_fitted
try:
from numba import jit, prange
HAS_NUMBA = True
except ImportError:
HAS_NUMBA = False
def jit(*args, **kwargs):
def decorator(func):
return func
return decorator
prange = range
# GritBot constants (from original implementation)
MAXFRAC = 0.01 # max proportion of outliers in a group
MAXNORM = 2.67 # max SDs for non-outlier
MAXTAIL = 5.34 # max SDs for 'ordinary' tail
MINCONTEXT = 25 # min cases to detect other difference
[docs]
@dataclass
class AnomalyContext:
"""Context conditions that define when an anomaly occurs."""
conditions: list[tuple[int, str, Any, Any]] = field(default_factory=list)
# Each condition: (feature_idx, op, value1, value2)
# op: 'le', 'gt', 'in_range', 'eq', 'in_set'
[docs]
@dataclass
class Anomaly:
"""Detected anomaly with context."""
case_idx: int
feature_idx: int
value: Any
score: float # Lower = more anomalous (probability-like)
context: AnomalyContext
group_size: int
group_mean: float = 0.0
group_std: float = 0.0
expected_value: Any = None
@jit(nopython=True, cache=True)
def _trimmed_mean_std(values: np.ndarray, tail_frac: float = 0.01) -> tuple[float, float]:
"""Compute trimmed mean and SD, excluding extreme tails.
Uses GritBot's robust estimation approach: exclude tail cases
and adjust SD by factor (N + 3*Tail) / (N + Tail).
"""
n = len(values)
if n < 10:
return np.mean(values), np.std(values) + 1e-10
# Sort for trimming
sorted_vals = np.sort(values)
# Compute tail size (same as GritBot's MaxAnoms formula)
tail = int(tail_frac * n + 2 * np.sqrt(n * tail_frac * (1 - tail_frac)) + 1)
tail = min(tail, n // 4) # Don't trim more than 25%
if tail >= n // 2:
return np.mean(values), np.std(values) + 1e-10
# Trimmed statistics
trimmed = sorted_vals[tail:n-tail]
cases = len(trimmed)
if cases < 5:
return np.mean(values), np.std(values) + 1e-10
mean = np.mean(trimmed)
std = np.std(trimmed)
# Adjust SD by factor (N + 3*Tail) / (N + Tail) per GritBot
if cases > tail:
std = std * (cases + 3.0 * tail) / (cases + tail)
return mean, max(std, 1e-10)
@jit(nopython=True, cache=True)
def _find_continuous_outliers(
values: np.ndarray,
indices: np.ndarray,
max_norm: float,
min_abnorm: float,
max_frac: float,
) -> tuple[np.ndarray, np.ndarray, float, float]:
"""Find continuous outliers in a subset.
Returns:
outlier_indices: indices of outliers
outlier_scores: anomaly scores (lower = more anomalous)
mean: group mean
std: group std
"""
n = len(values)
if n < 35:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), 0.0, 1.0
# Robust mean/std estimation
mean, std = _trimmed_mean_std(values, max_frac)
if std < 1e-10:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), mean, std
# Compute Z-scores
z_scores = np.abs(values - mean) / std
# Maximum allowed anomalies
max_anoms = int(max_frac * n + 2 * np.sqrt(n * max_frac * (1 - max_frac)) + 1)
# Sort by Z-score descending
sorted_idx = np.argsort(-z_scores)
# Find outliers: must have Z > min_abnorm and be separated from normal cases
outlier_mask = np.zeros(n, dtype=np.bool_)
n_outliers = 0
for i in range(min(max_anoms, n)):
idx = sorted_idx[i]
z = z_scores[idx]
if z < min_abnorm:
break
# Check gap from next case (simplified gap check)
if i + 1 < n:
next_z = z_scores[sorted_idx[i + 1]]
if z - next_z < min_abnorm - max_norm:
break
outlier_mask[idx] = True
n_outliers += 1
if n_outliers == 0:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), mean, std
# Collect outliers
outlier_indices = np.empty(n_outliers, dtype=np.int64)
outlier_scores = np.empty(n_outliers, dtype=np.float64)
j = 0
for i in range(n):
if outlier_mask[i]:
outlier_indices[j] = indices[i]
# Score using Chebyshev bound: P(|X-mu| >= k*sigma) <= 1/k^2
z = z_scores[i]
outlier_scores[j] = 1.0 / (z * z)
j += 1
return outlier_indices, outlier_scores, mean, std
@jit(nopython=True, cache=True)
def _find_discrete_outliers(
values: np.ndarray,
indices: np.ndarray,
priors: np.ndarray,
min_abnorm: float,
max_frac: float,
) -> tuple[np.ndarray, np.ndarray, int]:
"""Find discrete outliers in a subset.
Anomalies are minority class values that are surprising given the
class distribution in the subset.
Returns:
outlier_indices: indices of outliers
outlier_scores: anomaly scores
majority_class: most common value
"""
n = len(values)
if n < 35:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), 0
# Count frequencies
n_classes = len(priors)
freqs = np.zeros(n_classes, dtype=np.int64)
for v in values:
if 0 <= v < n_classes:
freqs[v] += 1
# Find majority class
majority = 0
for i in range(n_classes):
if freqs[i] > freqs[majority]:
majority = i
n_anoms = n - freqs[majority]
max_anoms = int(max_frac * n + 2 * np.sqrt(n * max_frac * (1 - max_frac)) + 1)
if n_anoms > max_anoms or n_anoms == 0:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), majority
# Threshold for surprise: impurity / prior < 1 / (MINABNORM^2)
threshold = 1.0 / (min_abnorm * min_abnorm)
# Find surprising minority cases
outlier_list = []
score_list = []
for i in range(n):
v = values[i]
if v == majority:
continue
# Compute surprise score: (anoms / n) / prior
prior = priors[v] if 0 <= v < n_classes else 1.0 / n_classes
score = (n_anoms / n) / max(prior, 1e-10)
if score <= threshold:
outlier_list.append(indices[i])
score_list.append(score)
n_out = len(outlier_list)
if n_out == 0:
return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float64), majority
outlier_indices = np.empty(n_out, dtype=np.int64)
outlier_scores = np.empty(n_out, dtype=np.float64)
for i in range(n_out):
outlier_indices[i] = outlier_list[i]
outlier_scores[i] = score_list[i]
return outlier_indices, outlier_scores, majority
@jit(nopython=True, cache=True, parallel=True)
def _compute_split_gain_continuous(
target: np.ndarray,
feature: np.ndarray,
valid_mask: np.ndarray,
) -> tuple[float, float]:
"""Find best split point for continuous feature to minimize target variance."""
valid_idx = np.where(valid_mask)[0]
n = len(valid_idx)
if n < 70: # Need enough cases for split
return -1.0, 0.0
# Sort by feature value
order = np.argsort(feature[valid_idx])
sorted_target = target[valid_idx[order]]
sorted_feature = feature[valid_idx[order]]
# Compute cumulative stats for efficient variance calculation
cum_sum = np.cumsum(sorted_target)
cum_sum_sq = np.cumsum(sorted_target ** 2)
total_sum = cum_sum[-1]
total_sum_sq = cum_sum_sq[-1]
total_var = total_sum_sq / n - (total_sum / n) ** 2
if total_var < 1e-10:
return -1.0, 0.0
best_gain = -1.0
best_cut = 0.0
min_leaf = max(35, n // 10)
for i in range(min_leaf, n - min_leaf):
# Skip if same value
if sorted_feature[i] == sorted_feature[i-1]:
continue
# Left variance
n_left = i
left_sum = cum_sum[i-1]
left_sum_sq = cum_sum_sq[i-1]
left_var = left_sum_sq / n_left - (left_sum / n_left) ** 2
# Right variance
n_right = n - i
right_sum = total_sum - left_sum
right_sum_sq = total_sum_sq - left_sum_sq
right_var = right_sum_sq / n_right - (right_sum / n_right) ** 2
# Weighted variance reduction
weighted_var = (n_left * left_var + n_right * right_var) / n
gain = total_var - weighted_var
if gain > best_gain:
best_gain = gain
best_cut = (sorted_feature[i-1] + sorted_feature[i]) / 2
return best_gain, best_cut
[docs]
class GritBotDetector(BaseEstimator, OutlierMixin):
"""GritBot-style anomaly detection via recursive partitioning.
GritBot finds anomalies by recursively partitioning data to find
homogeneous subsets, then identifying values that are surprising
given the subset context. This approach is particularly effective for:
- Data with mixed attribute types
- Context-dependent anomalies (value is only anomalous in certain contexts)
- Interpretable anomaly explanations
Parameters
----------
max_conditions : int, default=4
Maximum number of conditions (splits) defining a subset context.
filtering_level : float, default=50.0
Controls sensitivity (0-100). Higher = fewer but more confident anomalies.
- 0: MINABNORM=4 (more sensitive)
- 50: MINABNORM=8 (default)
- 100: MINABNORM=20 (very conservative)
contamination : float, default=0.01
Maximum expected proportion of anomalies.
min_cases : int or None, default=None
Minimum cases in a subset to check for anomalies.
None uses max(35, 0.5% of data).
categorical_features : list or None, default=None
Indices of categorical features. If None, auto-detected.
n_jobs : int, default=1
Parallel jobs (currently not used, reserved for future).
random_state : int or None, default=None
Random seed for reproducibility.
Attributes
----------
anomalies_ : list[Anomaly]
Detected anomalies with full context.
anomaly_indices_ : np.ndarray
Indices of detected anomaly cases.
anomaly_scores_ : np.ndarray
Scores for each sample (higher = more anomalous).
References
----------
Quinlan, J.R. (2010). GritBot GPL Edition. Rulequest Research.
Examples
--------
>>> from endgame.anomaly import GritBotDetector
>>> detector = GritBotDetector(filtering_level=50)
>>> detector.fit(X_train)
>>> scores = detector.decision_function(X_test)
>>> labels = detector.predict(X_test) # 1 = anomaly
>>>
>>> # Get interpretable anomaly explanations
>>> for anomaly in detector.anomalies_[:5]:
... print(f"Case {anomaly.case_idx}: feature {anomaly.feature_idx}")
... print(f" Value: {anomaly.value}, Expected: {anomaly.expected_value}")
... print(f" Context: {anomaly.context.conditions}")
"""
def __init__(
self,
max_conditions: int = 4,
filtering_level: float = 50.0,
contamination: float = 0.01,
min_cases: int | None = None,
categorical_features: list | None = None,
n_jobs: int = 1,
random_state: int | None = None,
):
self.max_conditions = max_conditions
self.filtering_level = filtering_level
self.contamination = contamination
self.min_cases = min_cases
self.categorical_features = categorical_features
self.n_jobs = n_jobs
self.random_state = random_state
def _get_minabnorm(self) -> float:
"""Compute MINABNORM from filtering level (per GritBot formula)."""
cf = np.clip(self.filtering_level, 0, 100)
if cf < 50:
return 0.08 * cf + 4
else:
return 0.24 * cf - 4
def _detect_categorical(self, X: np.ndarray) -> np.ndarray:
"""Auto-detect categorical features."""
n_samples, n_features = X.shape
is_categorical = np.zeros(n_features, dtype=bool)
for j in range(n_features):
col = X[:, j]
unique = np.unique(col[~np.isnan(col)])
# Heuristic: categorical if few unique values or all integers
if len(unique) <= 20 or len(unique) <= 0.05 * n_samples and np.allclose(col[~np.isnan(col)], col[~np.isnan(col)].astype(int)):
is_categorical[j] = True
return is_categorical
def _compute_priors(self, X: np.ndarray, is_categorical: np.ndarray) -> list[np.ndarray]:
"""Compute prior probabilities for categorical features."""
n_samples, n_features = X.shape
priors = []
for j in range(n_features):
if is_categorical[j]:
col = X[:, j].astype(int)
max_val = int(np.nanmax(col)) + 1
counts = np.zeros(max_val)
for v in col:
if not np.isnan(v) and 0 <= v < max_val:
counts[int(v)] += 1
priors.append(counts / n_samples)
else:
priors.append(np.array([1.0]))
return priors
[docs]
def fit(self, X: ArrayLike, y=None) -> GritBotDetector:
"""Fit the GritBot detector and find anomalies.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : ignored
Not used.
Returns
-------
self : GritBotDetector
Fitted detector.
"""
X = check_array(X, accept_sparse=False, ensure_all_finite='allow-nan')
n_samples, n_features = X.shape
self.n_features_in_ = n_features
# Set minimum cases
if self.min_cases is None:
self._min_cases = max(35, int(0.005 * n_samples))
else:
self._min_cases = self.min_cases
# Detect categorical features
if self.categorical_features is not None:
self._is_categorical = np.zeros(n_features, dtype=bool)
self._is_categorical[self.categorical_features] = True
else:
self._is_categorical = self._detect_categorical(X)
# Compute priors for categorical features
self._priors = self._compute_priors(X, self._is_categorical)
# Get parameters
minabnorm = self._get_minabnorm()
# Store training data for anomaly detection
self._X_train = X.copy()
# Precompute statistics for each feature (for fast scoring)
self._feature_stats = []
for j in range(n_features):
col = X[:, j]
valid = col[~np.isnan(col)]
if len(valid) >= self._min_cases:
mean, std = _trimmed_mean_std(valid, MAXFRAC)
self._feature_stats.append((mean, std, len(valid)))
else:
self._feature_stats.append((0.0, 1.0, 0))
# Find anomalies using recursive partitioning
self.anomalies_ = []
self._anomaly_scores = np.zeros(n_samples)
# Check each feature as the target attribute
for target_att in range(n_features):
self._check_attribute(
X,
np.arange(n_samples),
target_att,
[], # conditions
minabnorm,
)
# Collect results
self.anomaly_indices_ = np.array([a.case_idx for a in self.anomalies_])
# Compute threshold for predict
if len(self.anomalies_) > 0:
scores = self._anomaly_scores.copy()
# Set threshold based on contamination
self.threshold_ = np.percentile(
scores[scores > 0],
100 * (1 - self.contamination)
) if (scores > 0).sum() > 0 else 0.5
else:
self.threshold_ = 0.5
return self
def _check_attribute(
self,
X: np.ndarray,
indices: np.ndarray,
target_att: int,
conditions: list,
minabnorm: float,
):
"""Check an attribute for anomalies within a subset."""
n = len(indices)
if n < self._min_cases:
return
# Get target values (exclude missing)
target_vals = X[indices, target_att]
valid_mask = ~np.isnan(target_vals)
valid_indices = indices[valid_mask]
valid_vals = target_vals[valid_mask]
if len(valid_vals) < self._min_cases:
return
# Find outliers in this subset
if self._is_categorical[target_att]:
outlier_idx, scores, majority = _find_discrete_outliers(
valid_vals.astype(np.int64),
valid_indices,
self._priors[target_att],
minabnorm,
MAXFRAC,
)
mean, std = float(majority), 0.0
expected = int(majority)
else:
outlier_idx, scores, mean, std = _find_continuous_outliers(
valid_vals,
valid_indices,
MAXNORM,
minabnorm,
MAXFRAC,
)
expected = mean
# Record anomalies
for i, (idx, score) in enumerate(zip(outlier_idx, scores)):
# Only record if this is the best score for this case/feature
if score < self._anomaly_scores[idx] or self._anomaly_scores[idx] == 0:
self._anomaly_scores[idx] = 1.0 - score # Convert to higher=more anomalous
ctx = AnomalyContext(conditions=conditions.copy())
anomaly = Anomaly(
case_idx=int(idx),
feature_idx=target_att,
value=X[idx, target_att],
score=score,
context=ctx,
group_size=len(valid_vals),
group_mean=mean,
group_std=std,
expected_value=expected,
)
self.anomalies_.append(anomaly)
# Recursive partitioning if we can add more conditions
if len(conditions) >= self.max_conditions:
return
# Find best split on other attributes
best_gain = 0.0
best_att = -1
best_cut = 0.0
for split_att in range(X.shape[1]):
if split_att == target_att:
continue
split_vals = X[valid_indices, split_att]
split_mask = ~np.isnan(split_vals)
if split_mask.sum() < 2 * self._min_cases:
continue
if self._is_categorical[split_att]:
# For categorical, use information gain (simplified)
continue # Skip for now, focus on continuous splits
else:
gain, cut = _compute_split_gain_continuous(
valid_vals[split_mask],
split_vals[split_mask],
np.ones(split_mask.sum(), dtype=bool),
)
if gain > best_gain:
best_gain = gain
best_att = split_att
best_cut = cut
# Recursively check if good split found
if best_gain > 0 and best_att >= 0:
split_vals = X[valid_indices, best_att]
# Left branch (<=)
left_mask = split_vals <= best_cut
if left_mask.sum() >= self._min_cases:
left_cond = conditions + [(best_att, 'le', best_cut, None)]
self._check_attribute(
X, valid_indices[left_mask], target_att, left_cond, minabnorm
)
# Right branch (>)
right_mask = split_vals > best_cut
if right_mask.sum() >= self._min_cases:
right_cond = conditions + [(best_att, 'gt', best_cut, None)]
self._check_attribute(
X, valid_indices[right_mask], target_att, right_cond, minabnorm
)
[docs]
def decision_function(self, X: ArrayLike) -> np.ndarray:
"""Compute anomaly scores for samples.
Higher scores indicate more anomalous samples.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to score.
Returns
-------
scores : ndarray of shape (n_samples,)
Anomaly scores.
"""
check_is_fitted(self, ["anomalies_", "_feature_stats"])
X = check_array(X, accept_sparse=False, ensure_all_finite='allow-nan')
n_samples, n_features = X.shape
minabnorm = self._get_minabnorm()
# Vectorized scoring using precomputed statistics
scores = np.zeros(n_samples)
for j in range(n_features):
mean, std, n_valid = self._feature_stats[j]
if n_valid < self._min_cases:
continue
vals = X[:, j]
valid_mask = ~np.isnan(vals)
if self._is_categorical[j]:
# Score based on rarity
priors = self._priors[j]
for i in np.where(valid_mask)[0]:
v = int(vals[i])
if 0 <= v < len(priors):
score = 1.0 - priors[v]
else:
score = 0.99
scores[i] = max(scores[i], score)
else:
# Vectorized Z-score calculation
z = np.abs(vals - mean) / max(std, 1e-10)
# Compute scores vectorized
feat_scores = np.zeros(n_samples)
high_z = z > minabnorm
mid_z = (z > MAXNORM) & ~high_z
feat_scores[high_z] = 1.0 - 1.0 / (z[high_z] ** 2)
feat_scores[mid_z] = 0.5 * (z[mid_z] - MAXNORM) / (minabnorm - MAXNORM)
# Apply mask and take max
feat_scores[~valid_mask] = 0.0
scores = np.maximum(scores, feat_scores)
return scores
[docs]
def predict(self, X: ArrayLike) -> np.ndarray:
"""Predict anomaly labels.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to classify.
Returns
-------
labels : ndarray of shape (n_samples,)
1 for anomalies, 0 for normal samples.
"""
scores = self.decision_function(X)
return (scores >= self.threshold_).astype(int)
[docs]
def fit_predict(self, X: ArrayLike, y=None) -> np.ndarray:
"""Fit and return anomaly labels for training data."""
self.fit(X)
# For training data, use the stored scores
return (self._anomaly_scores >= self.threshold_).astype(int)
[docs]
def get_anomaly_report(self, max_anomalies: int = 10) -> str:
"""Generate a human-readable anomaly report.
Parameters
----------
max_anomalies : int, default=10
Maximum anomalies to include in report.
Returns
-------
report : str
Formatted anomaly report.
"""
check_is_fitted(self, ["anomalies_"])
if not self.anomalies_:
return "No anomalies detected."
# Sort by score (most anomalous first)
sorted_anoms = sorted(self.anomalies_, key=lambda a: a.score)[:max_anomalies]
lines = [f"GritBot Anomaly Report ({len(self.anomalies_)} total anomalies)"]
lines.append("=" * 60)
for i, anom in enumerate(sorted_anoms, 1):
lines.append(f"\n{i}. Case {anom.case_idx} [score: {anom.score:.4f}]")
lines.append(f" Feature {anom.feature_idx}: {anom.value:.4f}" if isinstance(anom.value, float) else f" Feature {anom.feature_idx}: {anom.value}")
if anom.expected_value is not None:
lines.append(f" Expected: {anom.expected_value:.4f}" if isinstance(anom.expected_value, float) else f" Expected: {anom.expected_value}")
lines.append(f" Group: {anom.group_size} cases, mean={anom.group_mean:.3f}, std={anom.group_std:.3f}")
if anom.context.conditions:
lines.append(" Context:")
for feat, op, val1, val2 in anom.context.conditions:
if op == 'le':
lines.append(f" Feature {feat} <= {val1:.3f}")
elif op == 'gt':
lines.append(f" Feature {feat} > {val1:.3f}")
elif op == 'in_range':
lines.append(f" Feature {feat} in [{val1:.3f}, {val2:.3f}]")
return "\n".join(lines)