from __future__ import annotations
"""PRIM: Patient Rule Induction Method for bump hunting.
PRIM is a subgroup discovery algorithm that finds rectangular regions
(boxes) in the feature space where the target variable has unusually
high (or low) mean values. It operates through iterative "peeling" and
optional "pasting" steps.
The algorithm is particularly useful for:
- Finding high-performing customer segments
- Identifying failure modes in manufacturing
- Scenario discovery in policy analysis
- Anomaly detection contexts
References
----------
- Friedman & Fisher, "Bump Hunting in High-Dimensional Data" (1999)
- RAND Corporation sdtoolkit
- Project-Platypus/PRIM Python implementation
"""
from dataclasses import dataclass, field
from typing import Literal
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.preprocessing import LabelEncoder
[docs]
@dataclass
class Box:
"""A rectangular region (box) in feature space.
Attributes
----------
limits : Dict[int, Tuple[float, float]]
Feature index -> (lower, upper) bound.
coverage : float
Fraction of data points inside the box.
density : float
Mean target value inside the box.
support : int
Number of data points inside the box.
"""
limits: dict[int, tuple[float, float]] = field(default_factory=dict)
coverage: float = 1.0
density: float = 0.0
support: int = 0
[docs]
def contains(self, X: np.ndarray) -> np.ndarray:
"""Check which points are inside the box.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Data points to check.
Returns
-------
mask : ndarray of shape (n_samples,)
Boolean mask, True if point is inside box.
"""
if not self.limits:
return np.ones(len(X), dtype=bool)
feat_indices = np.fromiter(self.limits.keys(), dtype=np.intp)
bounds = np.array([self.limits[i] for i in feat_indices])
X_sub = X[:, feat_indices]
return np.all((X_sub >= bounds[:, 0]) & (X_sub <= bounds[:, 1]), axis=1)
[docs]
def to_rules(self, feature_names: list[str] | None = None) -> list[str]:
"""Convert box to human-readable rules.
Parameters
----------
feature_names : list of str, optional
Names of features.
Returns
-------
rules : list of str
List of rule strings.
"""
rules = []
for feat_idx, (lower, upper) in sorted(self.limits.items()):
name = feature_names[feat_idx] if feature_names else f"x{feat_idx}"
rules.append(f"{lower:.4g} <= {name} <= {upper:.4g}")
return rules
def __repr__(self) -> str:
return (
f"Box(coverage={self.coverage:.3f}, density={self.density:.4f}, "
f"support={self.support}, n_restrictions={len(self.limits)})"
)
[docs]
@dataclass
class PRIMResult:
"""Result of PRIM analysis.
Attributes
----------
boxes : List[Box]
Sequence of boxes from peeling trajectory.
peeling_trajectory : List[Dict]
Statistics at each peeling step.
selected_box : Box
The selected box (based on some criterion).
selected_idx : int
Index of selected box in trajectory.
"""
boxes: list[Box] = field(default_factory=list)
peeling_trajectory: list[dict[str, float]] = field(default_factory=list)
selected_box: Box | None = None
selected_idx: int = -1
[docs]
def get_pareto_frontier(self) -> list[int]:
"""Get indices of boxes on the coverage-density Pareto frontier."""
if not self.peeling_trajectory:
return []
coverages = np.array([t["coverage"] for t in self.peeling_trajectory])
densities = np.array([t["density"] for t in self.peeling_trajectory])
# Find Pareto-optimal points (max density for each coverage level)
pareto_indices = []
max_density = -np.inf
for i in range(len(coverages) - 1, -1, -1):
if densities[i] > max_density:
pareto_indices.append(i)
max_density = densities[i]
return sorted(pareto_indices)
[docs]
class PRIMRegressor(RegressorMixin, BaseEstimator):
"""PRIM (Patient Rule Induction Method) for regression/continuous targets.
Finds rectangular regions where the target variable has unusually
high mean values. Uses iterative peeling to shrink boxes while
increasing target density.
Parameters
----------
alpha : float, default=0.05
Peeling fraction - proportion of data removed in each peel.
Smaller values = more "patient" peeling.
threshold_type : str, default='quantile'
How to define "interesting" regions: 'quantile' or 'absolute'.
threshold : float, default=0.9
Threshold for defining interesting regions.
If 'quantile', fraction of top values to consider interesting.
min_support : int or float, default=20
Minimum number of points in a box. If float, interpreted as fraction.
pasting : bool, default=True
Whether to apply pasting (box expansion) after peeling.
paste_alpha : float, default=0.01
Pasting fraction for box expansion.
n_boxes : int, default=1
Number of boxes to find (sequential covering).
Attributes
----------
result_ : PRIMResult
Full PRIM analysis result.
boxes_ : List[Box]
The final boxes found.
feature_names_in_ : ndarray
Names of features.
n_features_in_ : int
Number of features.
Examples
--------
>>> from endgame.models.subgroup import PRIMRegressor
>>> prim = PRIMRegressor(alpha=0.05, min_support=30)
>>> prim.fit(X, y)
>>> print(prim.boxes_[0].to_rules())
>>> mask = prim.predict(X) # Boolean mask of points in box
Notes
-----
PRIM works best when:
1. You're looking for interpretable subgroups
2. The target has heterogeneous behavior across the feature space
3. You want rectangular (axis-aligned) regions
"""
_estimator_type = "regressor"
def __init__(
self,
alpha: float = 0.05,
threshold_type: Literal["quantile", "absolute"] = "quantile",
threshold: float = 0.9,
min_support: int | float = 20,
pasting: bool = True,
paste_alpha: float = 0.01,
n_boxes: int = 1,
):
self.alpha = alpha
self.threshold_type = threshold_type
self.threshold = threshold
self.min_support = min_support
self.pasting = pasting
self.paste_alpha = paste_alpha
self.n_boxes = n_boxes
self.result_: PRIMResult | None = None
self.boxes_: list[Box] = []
self.feature_names_in_: np.ndarray | None = None
self.n_features_in_: int = 0
self._is_fitted: bool = False
[docs]
def fit(self, X, y, feature_names: list[str] | None = None) -> PRIMRegressor:
"""Fit PRIM to find high-density regions.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training features.
y : array-like of shape (n_samples,)
Target values (higher = more interesting).
feature_names : list of str, optional
Names of features for interpretable output.
Returns
-------
self
"""
X = np.asarray(X, dtype=np.float64) if not isinstance(X, np.ndarray) else X.astype(np.float64, copy=False)
y = np.asarray(y, dtype=np.float64) if not isinstance(y, np.ndarray) else y.astype(np.float64, copy=False)
n_samples, n_features = X.shape
self.n_features_in_ = n_features
if feature_names is not None:
self.feature_names_in_ = np.array(feature_names)
else:
self.feature_names_in_ = np.array([f"x{i}" for i in range(n_features)])
# Calculate minimum support
if isinstance(self.min_support, float) and self.min_support < 1:
min_support = int(self.min_support * n_samples)
else:
min_support = int(self.min_support)
min_support = max(min_support, 2)
# Store global mean for predict() fallback
self._y_global_mean = float(y.mean())
# Find boxes sequentially
self.boxes_ = []
remaining_mask = np.ones(n_samples, dtype=bool)
for _ in range(self.n_boxes):
X_remaining = X[remaining_mask]
y_remaining = y[remaining_mask]
if len(y_remaining) < min_support:
break
# Run PRIM on remaining data
result = self._prim_one_box(
X_remaining, y_remaining, min_support
)
if result.selected_box is not None:
self.boxes_.append(result.selected_box)
# Remove covered points for sequential covering
box_mask = result.selected_box.contains(X)
remaining_mask[box_mask] = False
self.result_ = result if self.n_boxes == 1 else None
self._is_fitted = True
return self
def _prim_one_box(
self, X: np.ndarray, y: np.ndarray, min_support: int
) -> PRIMResult:
"""Find one box using PRIM algorithm."""
n_samples, n_features = X.shape
current_limits = {
i: (X[:, i].min(), X[:, i].max()) for i in range(n_features)
}
current_mask = np.ones(n_samples, dtype=bool)
boxes = []
trajectory = []
support = n_samples
y_sum = y.sum()
density = y_sum / support if support > 0 else 0.0
coverage = 1.0
boxes.append(
Box(
limits=current_limits.copy(),
coverage=coverage,
density=density,
support=support,
)
)
trajectory.append({"coverage": coverage, "density": density, "support": support})
X_cols = [X[:, i] for i in range(n_features)]
buf = np.empty(n_samples, dtype=bool)
# Peeling phase
while support > min_support:
best_feat = -1
best_side = ""
best_threshold = 0.0
best_density = density
for feat_idx in range(n_features):
col = X_cols[feat_idx]
x_active = col[current_mask]
n_active = len(x_active)
if n_active <= min_support:
continue
k_lo = max(1, int(self.alpha * n_active))
k_hi = min(n_active - 1, int((1 - self.alpha) * n_active))
partitioned = np.partition(x_active, (k_lo, k_hi))
thresh_lo = partitioned[k_lo]
thresh_hi = partitioned[k_hi]
# Peel from bottom (in-place mask to avoid temporaries)
np.greater(col, thresh_lo, out=buf)
np.logical_and(current_mask, buf, out=buf)
n_lo = buf.sum()
if n_lo >= min_support:
d_lo = np.dot(y, buf) / n_lo
if d_lo > best_density:
best_density = d_lo
best_feat = feat_idx
best_side = "lower"
best_threshold = thresh_lo
# Peel from top (reuse buf)
np.less(col, thresh_hi, out=buf)
np.logical_and(current_mask, buf, out=buf)
n_hi = buf.sum()
if n_hi >= min_support:
d_hi = np.dot(y, buf) / n_hi
if d_hi > best_density:
best_density = d_hi
best_feat = feat_idx
best_side = "upper"
best_threshold = thresh_hi
if best_feat < 0:
break
col = X_cols[best_feat]
if best_side == "lower":
np.greater(col, best_threshold, out=buf)
current_limits[best_feat] = (
best_threshold, current_limits[best_feat][1],
)
else:
np.less(col, best_threshold, out=buf)
current_limits[best_feat] = (
current_limits[best_feat][0], best_threshold,
)
np.logical_and(current_mask, buf, out=current_mask)
support = current_mask.sum()
density = np.dot(y, current_mask) / support
coverage = support / n_samples
boxes.append(
Box(
limits=current_limits.copy(),
coverage=coverage,
density=density,
support=support,
)
)
trajectory.append(
{"coverage": coverage, "density": density, "support": support}
)
# Pasting phase (expand box boundaries if it improves density)
if self.pasting and len(boxes) > 1:
boxes, trajectory = self._paste(X, y, boxes, trajectory, min_support)
# Select best box (maximum density with reasonable support)
selected_idx = self._select_box(trajectory)
return PRIMResult(
boxes=boxes,
peeling_trajectory=trajectory,
selected_box=boxes[selected_idx] if boxes else None,
selected_idx=selected_idx,
)
def _paste(
self,
X: np.ndarray,
y: np.ndarray,
boxes: list[Box],
trajectory: list[dict],
min_support: int,
) -> tuple[list[Box], list[dict]]:
"""Apply pasting to expand box boundaries."""
if not boxes:
return boxes, trajectory
n_samples, n_features = X.shape
X_cols = [X[:, i] for i in range(n_features)]
best_idx = self._select_box(trajectory)
current_box = boxes[best_idx]
current_limits = current_box.limits.copy()
current_mask = current_box.contains(X)
current_n = current_mask.sum()
current_ysum = np.dot(y, current_mask)
improved = True
while improved:
improved = False
current_density = current_ysum / current_n if current_n > 0 else 0.0
for feat_idx in range(n_features):
if feat_idx not in current_limits:
continue
col = X_cols[feat_idx]
lower, upper = current_limits[feat_idx]
# Try expanding lower bound
outside_lower = col < lower
n_outside = outside_lower.sum()
if n_outside > 0:
x_outside = col[outside_lower]
k = min(n_outside - 1, int((1 - self.paste_alpha) * n_outside))
expand_threshold = np.partition(x_outside, k)[k]
added = (col >= expand_threshold) & (col < lower) & ~current_mask
n_added = added.sum()
if n_added > 0:
new_n = current_n + n_added
new_ysum = current_ysum + np.dot(y, added)
new_density = new_ysum / new_n
if new_density > current_density:
current_limits[feat_idx] = (expand_threshold, upper)
current_mask = current_mask | added
current_n = new_n
current_ysum = new_ysum
improved = True
lower, upper = current_limits[feat_idx]
# Try expanding upper bound
outside_upper = col > upper
n_outside = outside_upper.sum()
if n_outside > 0:
x_outside = col[outside_upper]
k = max(0, int(self.paste_alpha * n_outside))
expand_threshold = np.partition(x_outside, k)[k]
added = (col <= expand_threshold) & (col > upper) & ~current_mask
n_added = added.sum()
if n_added > 0:
new_n = current_n + n_added
new_ysum = current_ysum + np.dot(y, added)
new_density = new_ysum / new_n
if new_density > current_density:
current_limits[feat_idx] = (lower, expand_threshold)
current_mask = current_mask | added
current_n = new_n
current_ysum = new_ysum
improved = True
if current_n != current_box.support:
final_density = current_ysum / current_n if current_n > 0 else 0.0
pasted_box = Box(
limits=current_limits.copy(),
coverage=current_n / n_samples,
density=final_density,
support=current_n,
)
boxes.append(pasted_box)
trajectory.append(
{
"coverage": pasted_box.coverage,
"density": pasted_box.density,
"support": pasted_box.support,
}
)
return boxes, trajectory
def _select_box(self, trajectory: list[dict]) -> int:
"""Select the best box from the peeling trajectory.
Maximizes density subject to minimum coverage (0.01).
"""
if not trajectory:
return 0
best_idx = 0
best_density = -np.inf
min_coverage = 0.01
for i, t in enumerate(trajectory):
if t["coverage"] >= min_coverage and t["density"] > best_density:
best_density = t["density"]
best_idx = i
if best_density == -np.inf:
return len(trajectory) - 1
return best_idx
[docs]
def predict(self, X) -> np.ndarray:
"""Predict target values based on box membership.
Points inside a box get that box's mean target density.
Points outside all boxes get the global training mean.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data points.
Returns
-------
predictions : ndarray of shape (n_samples,)
Predicted target values.
"""
if not self._is_fitted:
raise RuntimeError("PRIMRegressor has not been fitted.")
X = np.asarray(X)
predictions = np.full(len(X), self._y_global_mean)
# Assign box density to points inside boxes (last box wins)
for box in self.boxes_:
mask = box.contains(X)
predictions[mask] = box.density
return predictions
[docs]
def predict_membership(self, X) -> np.ndarray:
"""Predict whether points fall in the found box(es).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data points.
Returns
-------
mask : ndarray of shape (n_samples,)
Boolean mask, True if point is in any box.
"""
if not self._is_fitted:
raise RuntimeError("PRIMRegressor has not been fitted.")
X = np.asarray(X)
mask = np.zeros(len(X), dtype=bool)
for box in self.boxes_:
mask |= box.contains(X)
return mask
[docs]
def score(self, X, y) -> float:
"""Score the model: mean target value in predicted boxes.
Parameters
----------
X : array-like
Features.
y : array-like
Target values.
Returns
-------
score : float
Mean target value in boxes minus overall mean.
"""
if not self._is_fitted:
raise RuntimeError("PRIMRegressor has not been fitted.")
X = np.asarray(X)
y = np.asarray(y)
mask = self.predict(X)
if mask.sum() == 0:
return 0.0
return y[mask].mean() - y.mean()
[docs]
def get_rules(self) -> list[list[str]]:
"""Get human-readable rules for all boxes.
Returns
-------
rules : list of list of str
Rules for each box.
"""
if not self._is_fitted:
raise RuntimeError("PRIMRegressor has not been fitted.")
return [
box.to_rules(list(self.feature_names_in_)) for box in self.boxes_
]
[docs]
class PRIMClassifier(ClassifierMixin, BaseEstimator):
"""PRIM for classification via one-vs-rest subgroup discovery.
Trains a PRIM regressor per class (one-vs-rest) on the binary
indicator for each class. At prediction time, the class whose
box gives the highest density for a sample wins; samples not in
any box are assigned the majority class.
Parameters
----------
alpha : float, default=0.05
Peeling fraction.
min_support : int or float, default=20
Minimum number of points in a box.
pasting : bool, default=True
Whether to apply pasting after peeling.
paste_alpha : float, default=0.01
Pasting fraction.
n_boxes : int, default=1
Number of boxes to find per class.
Examples
--------
>>> from endgame.models.subgroup import PRIMClassifier
>>> prim = PRIMClassifier(alpha=0.05)
>>> prim.fit(X, y)
>>> preds = prim.predict(X)
>>> print(prim.get_rules())
"""
_estimator_type = "classifier"
def __init__(
self,
alpha: float = 0.05,
min_support: int | float = 20,
pasting: bool = True,
paste_alpha: float = 0.01,
n_boxes: int = 1,
):
self.alpha = alpha
self.min_support = min_support
self.pasting = pasting
self.paste_alpha = paste_alpha
self.n_boxes = n_boxes
self.classes_: np.ndarray | None = None
self._prim_regressors: list[PRIMRegressor] = []
self._base_rates: np.ndarray | None = None
self._label_encoder: LabelEncoder | None = None
self._majority_class_idx: int = 0
self._is_fitted: bool = False
[docs]
def fit(self, X, y, feature_names: list[str] | None = None) -> PRIMClassifier:
"""Fit one PRIM model per class (one-vs-rest).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training features.
y : array-like of shape (n_samples,)
Target labels.
feature_names : list of str, optional
Names of features.
Returns
-------
self
"""
X = np.asarray(X, dtype=np.float64)
y = np.asarray(y)
self._label_encoder = LabelEncoder()
y_encoded = self._label_encoder.fit_transform(y)
self.classes_ = self._label_encoder.classes_
n_classes = len(self.classes_)
counts = np.bincount(y_encoded, minlength=n_classes)
self._majority_class_idx = int(np.argmax(counts))
self._base_rates = counts / len(y_encoded)
self._prim_regressors = []
for c in range(n_classes):
y_binary = (y_encoded == c).astype(np.float64)
reg = PRIMRegressor(
alpha=self.alpha,
min_support=self.min_support,
pasting=self.pasting,
paste_alpha=self.paste_alpha,
n_boxes=self.n_boxes,
)
reg.fit(X, y_binary, feature_names=feature_names)
self._prim_regressors.append(reg)
self._is_fitted = True
return self
@property
def boxes_(self) -> list[list[Box]]:
"""Get the discovered boxes for each class."""
return [r.boxes_ for r in self._prim_regressors]
@property
def feature_names_in_(self) -> np.ndarray | None:
"""Get feature names."""
if not self._prim_regressors:
return None
return self._prim_regressors[0].feature_names_in_
@property
def n_features_in_(self) -> int:
"""Get number of features."""
if not self._prim_regressors:
return 0
return self._prim_regressors[0].n_features_in_
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Estimate class probabilities based on box densities.
For each sample, the probability for class *c* is the density
of the best box that contains it, or the base rate if no box
contains it. Probabilities are row-normalised.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data points.
Returns
-------
proba : ndarray of shape (n_samples, n_classes)
Class probabilities.
"""
if not self._is_fitted:
raise RuntimeError("PRIMClassifier has not been fitted.")
X = np.asarray(X, dtype=np.float64)
n_samples = len(X)
n_classes = len(self.classes_)
proba = np.tile(self._base_rates, (n_samples, 1)).astype(np.float64)
for c, reg in enumerate(self._prim_regressors):
for box in reg.boxes_:
mask = box.contains(X)
if mask.any():
proba[mask, c] = max(box.density, proba[mask, c].max())
row_sums = proba.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
proba /= row_sums
return proba
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data points.
Returns
-------
labels : ndarray of shape (n_samples,)
Predicted class labels.
"""
proba = self.predict_proba(X)
indices = np.argmax(proba, axis=1)
return self._label_encoder.inverse_transform(indices)
[docs]
def score(self, X, y) -> float:
"""Classification accuracy.
Parameters
----------
X : array-like
Features.
y : array-like
Target labels.
Returns
-------
score : float
Accuracy.
"""
return float(np.mean(self.predict(X) == np.asarray(y)))
[docs]
def get_rules(self) -> list[list[list[str]]]:
"""Get human-readable rules for all classes and boxes.
Returns
-------
rules : list[list[list[str]]]
rules[class_idx][box_idx] is a list of rule strings.
"""
if not self._is_fitted:
raise RuntimeError("PRIMClassifier has not been fitted.")
return [reg.get_rules() for reg in self._prim_regressors]