from __future__ import annotations
"""Oblique Random Forest: Random Forest with linear combination splits.
Standard Random Forests use axis-aligned splits — each split considers only a
single feature (e.g., "x₃ > 5.2"). Oblique Random Forests use oblique splits —
linear combinations of features (e.g., "0.3x₁ + 0.7x₃ - 0.2x₅ > 2.1"). This
allows the forest to capture linear decision boundaries that would require many
axis-aligned splits to approximate.
Why this matters for competitive ML:
- Provides ensemble diversity (errors uncorrelated with axis-aligned trees/GBDTs)
- Often outperforms standard RF on data with linear structure
- Valuable as a diverse base learner in stacking ensembles
References
----------
Menze, B. H., et al. (2011). "On Oblique Random Forests."
Machine Learning and Knowledge Discovery in Databases, ECML PKDD.
Breiman, L. (2001). "Random Forests." Machine Learning, 45(1), 5-32.
"""
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y
from endgame.models.trees.oblique_tree import (
ObliqueDecisionTreeClassifier,
ObliqueDecisionTreeRegressor,
)
def _fit_single_tree_classifier(
tree_idx: int,
X: np.ndarray,
y: np.ndarray,
sample_weight: np.ndarray | None,
bootstrap: bool,
base_seed: int,
tree_params: dict,
n_samples: int,
) -> tuple[ObliqueDecisionTreeClassifier, np.ndarray]:
"""Fit a single classifier tree (for parallel execution).
Returns the fitted tree and the bootstrap indices (for OOB calculation).
"""
rng = np.random.RandomState(base_seed + tree_idx)
# Bootstrap sampling
if bootstrap:
indices = rng.choice(n_samples, size=n_samples, replace=True)
X_sample = X[indices]
y_sample = y[indices]
if sample_weight is not None:
weight_sample = sample_weight[indices]
else:
weight_sample = None
else:
indices = np.arange(n_samples)
X_sample = X
y_sample = y
weight_sample = sample_weight
# Create and fit tree
tree = ObliqueDecisionTreeClassifier(
random_state=rng.randint(2**31),
**tree_params
)
tree.fit(X_sample, y_sample, sample_weight=weight_sample)
return tree, indices
def _fit_single_tree_regressor(
tree_idx: int,
X: np.ndarray,
y: np.ndarray,
sample_weight: np.ndarray | None,
bootstrap: bool,
base_seed: int,
tree_params: dict,
n_samples: int,
) -> tuple[ObliqueDecisionTreeRegressor, np.ndarray]:
"""Fit a single regressor tree (for parallel execution)."""
rng = np.random.RandomState(base_seed + tree_idx)
if bootstrap:
indices = rng.choice(n_samples, size=n_samples, replace=True)
X_sample = X[indices]
y_sample = y[indices]
if sample_weight is not None:
weight_sample = sample_weight[indices]
else:
weight_sample = None
else:
indices = np.arange(n_samples)
X_sample = X
y_sample = y
weight_sample = sample_weight
tree = ObliqueDecisionTreeRegressor(
random_state=rng.randint(2**31),
**tree_params
)
tree.fit(X_sample, y_sample, sample_weight=weight_sample)
return tree, indices
class ObliqueRandomForestClassifier(ClassifierMixin, BaseEstimator):
"""Random Forest with oblique (linear combination) splits.
Unlike standard Random Forest which uses axis-aligned splits on single
features, Oblique Random Forest finds linear combinations of features
for each split. This enables better decision boundaries for data with
linear structure while maintaining the ensemble benefits of RF.
Parameters
----------
n_estimators : int, default=100
Number of trees in the forest.
oblique_method : str, default='ridge'
Method for finding oblique split directions:
- 'ridge': Ridge regression on class labels (recommended)
- 'pca': Principal Component Analysis
- 'lda': Linear Discriminant Analysis (classification only)
- 'random': Random projections (fastest)
- 'svm': Linear SVM hyperplane
- 'householder': Householder reflections
criterion : str, default='gini'
Splitting criterion: 'gini' or 'entropy'.
max_depth : int, default=None
Maximum depth of each tree. None means nodes expand until
all leaves are pure or contain < min_samples_split samples.
min_samples_split : int or float, default=2
Minimum samples required to split an internal node.
If float, interpreted as fraction of total samples.
min_samples_leaf : int or float, default=1
Minimum samples required at a leaf node.
If float, interpreted as fraction of total samples.
max_features : int, float, str, or None, default='sqrt'
Number of features to consider for each oblique split:
- int: Use exactly max_features
- float: Use max_features * n_features (fraction)
- 'sqrt': Use sqrt(n_features)
- 'log2': Use log2(n_features)
- None: Use all features
max_leaf_nodes : int, default=None
Maximum number of leaf nodes per tree. None means unlimited.
(Not yet implemented - placeholder for sklearn compatibility)
min_impurity_decrease : float, default=0.0
Minimum impurity decrease required for a split.
bootstrap : bool, default=True
Whether to use bootstrap sampling for each tree.
oob_score : bool, default=False
Whether to compute out-of-bag score.
n_jobs : int, default=None
Number of parallel jobs for fitting trees.
None means 1, -1 means all processors.
random_state : int, RandomState, or None, default=None
Random seed for reproducibility.
verbose : int, default=0
Verbosity level.
warm_start : bool, default=False
If True, reuse previous fit and add more trees.
class_weight : dict, 'balanced', 'balanced_subsample', or None, default=None
Weights for classes. 'balanced' adjusts weights inversely
proportional to class frequencies.
(Not yet implemented - placeholder for sklearn compatibility)
feature_combinations : int, default=2
Number of features to combine in each oblique split candidate.
Higher values allow more complex splits but increase computation.
Only used when oblique_method='random'.
ridge_alpha : float, default=1.0
Regularization strength for ridge regression method.
Attributes
----------
estimators_ : list of ObliqueDecisionTreeClassifier
The fitted tree estimators.
classes_ : ndarray of shape (n_classes,)
Unique class labels.
n_classes_ : int
Number of classes.
n_features_in_ : int
Number of features seen during fit.
feature_names_in_ : ndarray of shape (n_features_in_,)
Feature names seen during fit (if provided).
feature_importances_ : ndarray of shape (n_features_in_,)
Impurity-based feature importances.
oob_score_ : float
Out-of-bag accuracy score (if oob_score=True).
oob_decision_function_ : ndarray of shape (n_samples, n_classes)
OOB predicted class probabilities (if oob_score=True).
Examples
--------
>>> from endgame.models.trees import ObliqueRandomForestClassifier
>>> from sklearn.datasets import make_classification
>>> X, y = make_classification(n_samples=1000, n_features=10,
... n_informative=5, random_state=42)
>>> clf = ObliqueRandomForestClassifier(n_estimators=100, random_state=42)
>>> clf.fit(X, y)
>>> print(clf.score(X, y))
Notes
-----
Oblique Random Forests typically outperform axis-aligned Random Forests
when the true decision boundary is linear or approximately linear. They
provide valuable diversity when ensembled with standard GBDTs.
"""
_estimator_type = "classifier"
def __init__(
self,
n_estimators: int = 100,
oblique_method: str = "ridge",
criterion: str = "gini",
max_depth: int | None = None,
min_samples_split: int | float = 2,
min_samples_leaf: int | float = 1,
max_features: int | float | str | None = "sqrt",
max_leaf_nodes: int | None = None,
min_impurity_decrease: float = 0.0,
bootstrap: bool = True,
oob_score: bool = False,
n_jobs: int | None = None,
random_state: int | np.random.RandomState | None = None,
verbose: int = 0,
warm_start: bool = False,
class_weight: dict | str | None = None,
feature_combinations: int = 2,
ridge_alpha: float = 1.0,
):
self.n_estimators = n_estimators
self.oblique_method = oblique_method
self.criterion = criterion
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_features = max_features
self.max_leaf_nodes = max_leaf_nodes
self.min_impurity_decrease = min_impurity_decrease
self.bootstrap = bootstrap
self.oob_score = oob_score
self.n_jobs = n_jobs
self.random_state = random_state
self.verbose = verbose
self.warm_start = warm_start
self.class_weight = class_weight
self.feature_combinations = feature_combinations
self.ridge_alpha = ridge_alpha
def fit(
self,
X: np.ndarray,
y: np.ndarray,
sample_weight: np.ndarray | None = None,
) -> ObliqueRandomForestClassifier:
"""Build an oblique random forest from the training data.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target class labels.
sample_weight : array-like of shape (n_samples,), default=None
Sample weights for fitting.
Returns
-------
self : object
Fitted estimator.
"""
X, y = check_X_y(X, y)
n_samples, n_features = X.shape
self.n_features_in_ = n_features
# Encode classes
self._label_encoder = LabelEncoder()
y_encoded = self._label_encoder.fit_transform(y)
self.classes_ = self._label_encoder.classes_
self.n_classes_ = len(self.classes_)
# Tree parameters to pass to base estimator
tree_params = {
"oblique_method": self.oblique_method,
"criterion": self.criterion,
"max_depth": self.max_depth,
"min_samples_split": self.min_samples_split,
"min_samples_leaf": self.min_samples_leaf,
"max_features": self.max_features,
"min_impurity_decrease": self.min_impurity_decrease,
"ridge_alpha": self.ridge_alpha,
"feature_combinations": self.feature_combinations,
}
# Random state handling
random_state = check_random_state(self.random_state)
base_seed = random_state.randint(2**31)
# Handle warm start
if self.warm_start and hasattr(self, "estimators_"):
n_existing = len(self.estimators_)
if n_existing >= self.n_estimators:
return self
n_new = self.n_estimators - n_existing
else:
self.estimators_ = []
self._bootstrap_indices = []
n_existing = 0
n_new = self.n_estimators
# Fit trees in parallel
if self.verbose > 0:
print(f"Fitting {n_new} oblique trees...")
results = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
delayed(_fit_single_tree_classifier)(
i + n_existing, X, y_encoded, sample_weight,
self.bootstrap, base_seed, tree_params, n_samples
)
for i in range(n_new)
)
# Collect results
for tree, indices in results:
self.estimators_.append(tree)
self._bootstrap_indices.append(indices)
# Compute OOB score if requested
if self.oob_score:
self._compute_oob_score(X, y_encoded)
# Compute feature importances
self._compute_feature_importances()
return self
def _compute_oob_score(self, X: np.ndarray, y: np.ndarray) -> None:
"""Compute out-of-bag score."""
n_samples = X.shape[0]
oob_decision_function = np.zeros((n_samples, self.n_classes_))
n_oob_predictions = np.zeros(n_samples)
for tree, indices in zip(self.estimators_, self._bootstrap_indices):
# Find samples not in bootstrap
unsampled_mask = np.ones(n_samples, dtype=bool)
unsampled_mask[indices] = False
unsampled_indices = np.where(unsampled_mask)[0]
if len(unsampled_indices) == 0:
continue
# Get OOB predictions
proba = tree.predict_proba(X[unsampled_indices])
oob_decision_function[unsampled_indices] += proba
n_oob_predictions[unsampled_indices] += 1
# Normalize
valid_mask = n_oob_predictions > 0
if np.sum(valid_mask) == 0:
self.oob_score_ = 0.0
self.oob_decision_function_ = oob_decision_function
return
oob_decision_function[valid_mask] /= n_oob_predictions[valid_mask, np.newaxis]
self.oob_decision_function_ = oob_decision_function
# Compute accuracy
oob_predictions = np.argmax(oob_decision_function[valid_mask], axis=1)
self.oob_score_ = np.mean(oob_predictions == y[valid_mask])
def _compute_feature_importances(self) -> None:
"""Compute feature importances from all trees."""
importances = np.zeros(self.n_features_in_)
for tree in self.estimators_:
importances += tree.feature_importances_
# Normalize
importances = importances / len(self.estimators_)
total = np.sum(importances)
if total > 0:
importances = importances / total
self.feature_importances_ = importances
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict class labels for samples in X.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
y_pred : ndarray of shape (n_samples,)
Predicted class labels.
"""
proba = self.predict_proba(X)
return self.classes_[np.argmax(proba, axis=1)]
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""Predict class probabilities for samples in X.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
proba : ndarray of shape (n_samples, n_classes)
Class probabilities.
"""
check_is_fitted(self, ["estimators_", "classes_"])
X = check_array(X)
n_samples = X.shape[0]
proba = np.zeros((n_samples, self.n_classes_))
for tree in self.estimators_:
tree_proba = tree.predict_proba(X)
proba += tree_proba
proba /= len(self.estimators_)
return proba
def apply(self, X: np.ndarray) -> np.ndarray:
"""Apply trees to X, return leaf indices.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input samples.
Returns
-------
X_leaves : ndarray of shape (n_samples, n_estimators)
Leaf indices for each sample in each tree.
"""
check_is_fitted(self, ["estimators_"])
X = check_array(X)
n_samples = X.shape[0]
leaves = np.zeros((n_samples, len(self.estimators_)), dtype=np.int64)
for i, tree in enumerate(self.estimators_):
leaves[:, i] = tree.apply(X)
return leaves
def decision_path(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Return decision path through the forest.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input samples.
Returns
-------
indicator : list of ndarrays
Decision path indicators for each tree.
n_nodes_ptr : ndarray of shape (n_estimators + 1,)
Cumulative node count for each tree.
"""
check_is_fitted(self, ["estimators_"])
X = check_array(X)
indicators = []
n_nodes = [0]
for tree in self.estimators_:
indicator = tree.decision_path(X)
indicators.append(indicator)
n_nodes.append(n_nodes[-1] + tree.n_nodes_)
return indicators, np.array(n_nodes)
@property
def n_estimators_(self) -> int:
"""Number of fitted estimators."""
return len(self.estimators_) if hasattr(self, "estimators_") else 0
[docs]
class ObliqueRandomForestRegressor(BaseEstimator, RegressorMixin):
"""Oblique Random Forest for regression.
Same as ObliqueRandomForestClassifier but for continuous targets.
Uses variance reduction (MSE) as the splitting criterion.
Parameters
----------
n_estimators : int, default=100
Number of trees in the forest.
oblique_method : str, default='ridge'
Method for finding oblique split directions:
- 'ridge': Ridge regression (recommended)
- 'pca': Principal Component Analysis
- 'random': Random projections (fastest)
- 'householder': Householder reflections
Note: 'lda' and 'svm' fall back to 'ridge' for regression.
criterion : str, default='squared_error'
Splitting criterion:
- 'squared_error': Mean squared error (variance reduction)
- 'absolute_error': Mean absolute error
max_depth : int, default=None
Maximum depth of each tree.
min_samples_split : int or float, default=2
Minimum samples required to split a node.
min_samples_leaf : int or float, default=1
Minimum samples required at a leaf.
max_features : int, float, str, or None, default='sqrt'
Features to consider per split.
max_leaf_nodes : int, default=None
Maximum leaf nodes per tree.
min_impurity_decrease : float, default=0.0
Minimum impurity decrease for split.
bootstrap : bool, default=True
Whether to use bootstrap sampling.
oob_score : bool, default=False
Whether to compute out-of-bag R² score.
n_jobs : int, default=None
Number of parallel jobs.
random_state : int, RandomState, or None, default=None
Random seed.
verbose : int, default=0
Verbosity level.
warm_start : bool, default=False
If True, reuse previous fit and add more trees.
feature_combinations : int, default=2
Features per random combination.
ridge_alpha : float, default=1.0
Ridge regularization strength.
Attributes
----------
estimators_ : list of ObliqueDecisionTreeRegressor
The fitted tree estimators.
n_features_in_ : int
Number of features seen during fit.
feature_importances_ : ndarray of shape (n_features_in_,)
Impurity-based feature importances.
oob_score_ : float
Out-of-bag R² score (if oob_score=True).
oob_prediction_ : ndarray of shape (n_samples,)
OOB predictions (if oob_score=True).
Examples
--------
>>> from endgame.models.trees import ObliqueRandomForestRegressor
>>> from sklearn.datasets import make_regression
>>> X, y = make_regression(n_samples=1000, n_features=10, random_state=42)
>>> reg = ObliqueRandomForestRegressor(n_estimators=100, random_state=42)
>>> reg.fit(X, y)
>>> print(reg.score(X, y))
"""
_estimator_type = "regressor"
def __init__(
self,
n_estimators: int = 100,
oblique_method: str = "ridge",
criterion: str = "squared_error",
max_depth: int | None = None,
min_samples_split: int | float = 2,
min_samples_leaf: int | float = 1,
max_features: int | float | str | None = "sqrt",
max_leaf_nodes: int | None = None,
min_impurity_decrease: float = 0.0,
bootstrap: bool = True,
oob_score: bool = False,
n_jobs: int | None = None,
random_state: int | np.random.RandomState | None = None,
verbose: int = 0,
warm_start: bool = False,
feature_combinations: int = 2,
ridge_alpha: float = 1.0,
):
self.n_estimators = n_estimators
self.oblique_method = oblique_method
self.criterion = criterion
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_features = max_features
self.max_leaf_nodes = max_leaf_nodes
self.min_impurity_decrease = min_impurity_decrease
self.bootstrap = bootstrap
self.oob_score = oob_score
self.n_jobs = n_jobs
self.random_state = random_state
self.verbose = verbose
self.warm_start = warm_start
self.feature_combinations = feature_combinations
self.ridge_alpha = ridge_alpha
[docs]
def fit(
self,
X: np.ndarray,
y: np.ndarray,
sample_weight: np.ndarray | None = None,
) -> ObliqueRandomForestRegressor:
"""Build an oblique random forest from the training data.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target values.
sample_weight : array-like of shape (n_samples,), default=None
Sample weights.
Returns
-------
self : object
Fitted estimator.
"""
X, y = check_X_y(X, y)
y = y.astype(np.float64)
n_samples, n_features = X.shape
self.n_features_in_ = n_features
# Tree parameters
tree_params = {
"oblique_method": self.oblique_method,
"criterion": self.criterion,
"max_depth": self.max_depth,
"min_samples_split": self.min_samples_split,
"min_samples_leaf": self.min_samples_leaf,
"max_features": self.max_features,
"min_impurity_decrease": self.min_impurity_decrease,
"ridge_alpha": self.ridge_alpha,
"feature_combinations": self.feature_combinations,
}
# Random state
random_state = check_random_state(self.random_state)
base_seed = random_state.randint(2**31)
# Handle warm start
if self.warm_start and hasattr(self, "estimators_"):
n_existing = len(self.estimators_)
if n_existing >= self.n_estimators:
return self
n_new = self.n_estimators - n_existing
else:
self.estimators_ = []
self._bootstrap_indices = []
n_existing = 0
n_new = self.n_estimators
# Fit trees in parallel
if self.verbose > 0:
print(f"Fitting {n_new} oblique trees...")
results = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
delayed(_fit_single_tree_regressor)(
i + n_existing, X, y, sample_weight,
self.bootstrap, base_seed, tree_params, n_samples
)
for i in range(n_new)
)
# Collect results
for tree, indices in results:
self.estimators_.append(tree)
self._bootstrap_indices.append(indices)
# Compute OOB score if requested
if self.oob_score:
self._compute_oob_score(X, y)
# Compute feature importances
self._compute_feature_importances()
return self
def _compute_oob_score(self, X: np.ndarray, y: np.ndarray) -> None:
"""Compute out-of-bag R² score."""
n_samples = X.shape[0]
oob_prediction = np.zeros(n_samples)
n_oob_predictions = np.zeros(n_samples)
for tree, indices in zip(self.estimators_, self._bootstrap_indices):
unsampled_mask = np.ones(n_samples, dtype=bool)
unsampled_mask[indices] = False
unsampled_indices = np.where(unsampled_mask)[0]
if len(unsampled_indices) == 0:
continue
predictions = tree.predict(X[unsampled_indices])
oob_prediction[unsampled_indices] += predictions
n_oob_predictions[unsampled_indices] += 1
# Normalize
valid_mask = n_oob_predictions > 0
if np.sum(valid_mask) == 0:
self.oob_score_ = 0.0
self.oob_prediction_ = oob_prediction
return
oob_prediction[valid_mask] /= n_oob_predictions[valid_mask]
self.oob_prediction_ = oob_prediction
# Compute R²
y_valid = y[valid_mask]
pred_valid = oob_prediction[valid_mask]
ss_res = np.sum((y_valid - pred_valid) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
self.oob_score_ = 1 - ss_res / ss_tot if ss_tot > 0 else 0.0
def _compute_feature_importances(self) -> None:
"""Compute feature importances from all trees."""
importances = np.zeros(self.n_features_in_)
for tree in self.estimators_:
importances += tree.feature_importances_
importances = importances / len(self.estimators_)
total = np.sum(importances)
if total > 0:
importances = importances / total
self.feature_importances_ = importances
[docs]
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict target values for samples in X.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
y_pred : ndarray of shape (n_samples,)
Predicted values.
"""
check_is_fitted(self, ["estimators_"])
X = check_array(X)
predictions = np.zeros(X.shape[0])
for tree in self.estimators_:
predictions += tree.predict(X)
predictions /= len(self.estimators_)
return predictions
[docs]
def apply(self, X: np.ndarray) -> np.ndarray:
"""Apply trees to X, return leaf indices.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input samples.
Returns
-------
X_leaves : ndarray of shape (n_samples, n_estimators)
Leaf indices for each sample in each tree.
"""
check_is_fitted(self, ["estimators_"])
X = check_array(X)
n_samples = X.shape[0]
leaves = np.zeros((n_samples, len(self.estimators_)), dtype=np.int64)
for i, tree in enumerate(self.estimators_):
leaves[:, i] = tree.apply(X)
return leaves
@property
def n_estimators_(self) -> int:
"""Number of fitted estimators."""
return len(self.estimators_) if hasattr(self, "estimators_") else 0