from __future__ import annotations
"""Stacking Ensemble: Multi-level model stacking."""
from typing import Any
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.model_selection import KFold, StratifiedKFold, cross_val_predict
from endgame.core.base import BaseEnsemble
[docs]
class StackingEnsemble(BaseEnsemble):
"""Multi-level stacking with out-of-fold prediction handling.
Level 1: Diverse base models (GBDTs, NNs, etc.)
Level 2: Meta-learner (typically Ridge/Linear Regression)
The meta-learner is trained on out-of-fold predictions from Level 1
to prevent overfitting.
Parameters
----------
base_estimators : List[estimator]
Level 1 models.
meta_estimator : estimator, optional
Level 2 model. Default: Ridge for regression, LogisticRegression for classification.
cv : int or CV splitter, default=5
Cross-validation strategy for OOF predictions.
passthrough : bool, default=False
Whether to include original features in Level 2.
use_proba : bool, default=True
Use predict_proba for classification (if available).
stack_method : str, default='auto'
Method for stacking: 'auto', 'predict', 'predict_proba'.
random_state : int, optional
Random seed.
verbose : bool, default=False
Enable verbose output.
Attributes
----------
base_estimators_ : List[estimator]
Fitted Level 1 models.
meta_estimator_ : estimator
Fitted Level 2 model.
oof_predictions_ : ndarray
Out-of-fold predictions used for meta-learner training.
Examples
--------
>>> from endgame.ensemble import StackingEnsemble
>>> base_models = [LGBMWrapper(), XGBWrapper(), CatBoostWrapper()]
>>> stacker = StackingEnsemble(base_estimators=base_models)
>>> stacker.fit(X_train, y_train)
>>> predictions = stacker.predict(X_test)
"""
def __init__(
self,
base_estimators: list[BaseEstimator] | None = None,
meta_estimator: BaseEstimator | None = None,
cv: int | Any = 5,
passthrough: bool = False,
use_proba: bool = True,
stack_method: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(
estimators=base_estimators,
random_state=random_state,
verbose=verbose,
)
self.base_estimators = base_estimators or []
self.meta_estimator = meta_estimator
self.cv = cv
self.passthrough = passthrough
self.use_proba = use_proba
self.stack_method = stack_method
self.base_estimators_: list[BaseEstimator] = []
self.meta_estimator_: BaseEstimator | None = None
self.oof_predictions_: np.ndarray | None = None
self._is_classifier: bool = False
self._n_features_in: int = 0
def _get_default_meta_estimator(self) -> BaseEstimator:
"""Get default meta-estimator based on task."""
if self._is_classifier:
return LogisticRegression(
C=1.0,
max_iter=1000,
random_state=self.random_state,
)
return Ridge(alpha=1.0, random_state=self.random_state)
def _get_stack_method(self, estimator: BaseEstimator) -> str:
"""Determine stacking method for an estimator."""
if self.stack_method != "auto":
return self.stack_method
if self._is_classifier and self.use_proba and hasattr(estimator, "predict_proba"):
return "predict_proba"
return "predict"
def _get_cv_splitter(self, y: np.ndarray) -> Any:
"""Get cross-validation splitter."""
if isinstance(self.cv, int):
if self._is_classifier:
return StratifiedKFold(
n_splits=self.cv,
shuffle=True,
random_state=self.random_state,
)
return KFold(
n_splits=self.cv,
shuffle=True,
random_state=self.random_state,
)
return self.cv
[docs]
def fit(
self,
X,
y,
sample_weight: np.ndarray | None = None,
**fit_params,
) -> StackingEnsemble:
"""Fit the stacking ensemble.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target values.
sample_weight : array-like, optional
Sample weights.
**fit_params
Additional parameters.
Returns
-------
self
"""
X = np.asarray(X)
y = np.asarray(y)
self._n_features_in = X.shape[1]
self._is_classifier = len(np.unique(y)) <= 20
if self._is_classifier:
self.classes_ = np.unique(y)
self.n_classes_ = len(self.classes_)
cv = self._get_cv_splitter(y)
self._log(f"Fitting {len(self.base_estimators)} base estimators...")
# Generate OOF predictions for each base estimator
oof_list = []
for i, estimator in enumerate(self.base_estimators):
self._log(f" Fitting base estimator {i + 1}/{len(self.base_estimators)}")
method = self._get_stack_method(estimator)
try:
if method == "predict_proba":
oof_pred = cross_val_predict(
estimator,
X,
y,
cv=cv,
method="predict_proba",
)
# For binary classification, use probability of positive class
if oof_pred.ndim == 2 and oof_pred.shape[1] == 2:
oof_pred = oof_pred[:, 1]
else:
oof_pred = cross_val_predict(
estimator,
X,
y,
cv=cv,
method="predict",
)
except Exception as e:
self._log(f" Warning: {e}, falling back to predict", level="warn")
oof_pred = cross_val_predict(
estimator,
X,
y,
cv=cv,
method="predict",
)
if oof_pred.ndim == 1:
oof_pred = oof_pred.reshape(-1, 1)
oof_list.append(oof_pred)
# Stack OOF predictions
self.oof_predictions_ = np.hstack(oof_list)
# Add original features if passthrough
if self.passthrough:
meta_features = np.hstack([self.oof_predictions_, X])
else:
meta_features = self.oof_predictions_
# Fit meta-estimator
self._log("Fitting meta-estimator...")
self.meta_estimator_ = self.meta_estimator or self._get_default_meta_estimator()
self.meta_estimator_ = clone(self.meta_estimator_)
if sample_weight is not None:
self.meta_estimator_.fit(meta_features, y, sample_weight=sample_weight)
else:
self.meta_estimator_.fit(meta_features, y)
# Fit base estimators on full data for prediction
self._log("Fitting base estimators on full data...")
self.base_estimators_ = []
for i, estimator in enumerate(self.base_estimators):
fitted = clone(estimator)
if sample_weight is not None:
fitted.fit(X, y, sample_weight=sample_weight)
else:
fitted.fit(X, y)
self.base_estimators_.append(fitted)
self._is_fitted = True
return self
def _get_base_predictions(self, X: np.ndarray) -> np.ndarray:
"""Get predictions from base estimators."""
pred_list = []
for estimator in self.base_estimators_:
method = self._get_stack_method(estimator)
if method == "predict_proba" and hasattr(estimator, "predict_proba"):
pred = estimator.predict_proba(X)
if pred.ndim == 2 and pred.shape[1] == 2:
pred = pred[:, 1]
else:
pred = estimator.predict(X)
if pred.ndim == 1:
pred = pred.reshape(-1, 1)
pred_list.append(pred)
return np.hstack(pred_list)
[docs]
def predict(self, X) -> np.ndarray:
"""Predict using the stacking ensemble.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
ndarray
Predictions.
"""
self._check_is_fitted()
X = np.asarray(X)
# Get base predictions
base_predictions = self._get_base_predictions(X)
# Add original features if passthrough
if self.passthrough:
meta_features = np.hstack([base_predictions, X])
else:
meta_features = base_predictions
return self.meta_estimator_.predict(meta_features)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
ndarray of shape (n_samples, n_classes)
Class probabilities.
"""
self._check_is_fitted()
if not self._is_classifier:
raise ValueError("predict_proba only available for classification")
X = np.asarray(X)
# Get base predictions
base_predictions = self._get_base_predictions(X)
# Add original features if passthrough
if self.passthrough:
meta_features = np.hstack([base_predictions, X])
else:
meta_features = base_predictions
if hasattr(self.meta_estimator_, "predict_proba"):
return self.meta_estimator_.predict_proba(meta_features)
# Fall back to decision function if available
if hasattr(self.meta_estimator_, "decision_function"):
decision = self.meta_estimator_.decision_function(meta_features)
# Convert to probabilities using sigmoid
proba = 1 / (1 + np.exp(-decision))
if proba.ndim == 1:
return np.vstack([1 - proba, proba]).T
return proba
raise ValueError("Meta-estimator doesn't support probability predictions")
[docs]
def score(self, X, y, sample_weight=None) -> float:
"""Return the mean accuracy on the given test data and labels.
For classification, this is the accuracy score. For regression,
this is the R^2 score.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
y : array-like of shape (n_samples,)
True labels for classification, true values for regression.
sample_weight : array-like of shape (n_samples,), optional
Sample weights.
Returns
-------
float
Score of the predictions.
"""
self._check_is_fitted()
if self._is_classifier:
from sklearn.metrics import accuracy_score
y_pred = self.predict(X)
return accuracy_score(y, y_pred, sample_weight=sample_weight)
else:
from sklearn.metrics import r2_score
y_pred = self.predict(X)
return r2_score(y, y_pred, sample_weight=sample_weight)