"""Super Learner: Cross-validated optimal ensemble combination.
The Super Learner (van der Laan et al., 2007) is an oracle-optimal
ensemble that uses cross-validation to find the best convex
combination of base learners. It minimizes cross-validated risk under
a non-negative least-squares (NNLS) constraint, guaranteeing the
ensemble is asymptotically at least as good as the best single model.
This is the gold-standard ensemble method in biostatistics and is
widely used in Kaggle competitions.
Example
-------
>>> from endgame.ensemble import SuperLearner
>>> from sklearn.ensemble import RandomForestClassifier
>>> from sklearn.linear_model import LogisticRegression
>>> import xgboost as xgb
>>>
>>> sl = SuperLearner(
... base_estimators=[
... ("rf", RandomForestClassifier(n_estimators=100)),
... ("lr", LogisticRegression()),
... ("xgb", xgb.XGBClassifier()),
... ],
... meta_learner="nnls", # non-negative least squares
... cv=5,
... )
>>> sl.fit(X_train, y_train)
>>> sl.predict(X_test)
"""
from __future__ import annotations
from typing import Any
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import KFold, StratifiedKFold, cross_val_predict
[docs]
class SuperLearner(BaseEstimator):
"""Cross-validated Super Learner ensemble.
Parameters
----------
base_estimators : list of (str, estimator) tuples
Named base learners to combine.
meta_learner : {'nnls', 'ridge', 'best'} or estimator, default='nnls'
How to combine OOF predictions:
- ``'nnls'``: Non-negative least squares (convex combination).
- ``'ridge'``: Ridge regression on OOF predictions.
- ``'best'``: Use the single best base learner (no blending).
- An sklearn estimator for custom meta-learning.
cv : int or CV splitter, default=5
Cross-validation strategy for OOF predictions.
use_proba : bool, default=True
Use ``predict_proba`` for classifiers (if available).
include_original_features : bool, default=False
Pass original features to the meta-learner alongside OOF predictions.
random_state : int or None, default=None
verbose : bool, default=False
Attributes
----------
coef_ : ndarray
Meta-learner weights (one per base estimator).
base_estimators_ : list of estimator
Fitted base estimators (on full training data).
oof_predictions_ : ndarray
Out-of-fold predictions used for meta-learning.
cv_scores_ : dict of {name: float}
Per-estimator cross-validated score.
is_classifier_ : bool
References
----------
van der Laan, M.J., Polley, E.C. & Hubbard, A.E. (2007).
Super Learner. *Statistical Applications in Genetics and
Molecular Biology*, 6(1).
"""
def __init__(
self,
base_estimators: list[tuple[str, BaseEstimator]],
meta_learner: str | BaseEstimator = "nnls",
cv: int | Any = 5,
use_proba: bool = True,
include_original_features: bool = False,
random_state: int | None = None,
verbose: bool = False,
):
self.base_estimators = base_estimators
self.meta_learner = meta_learner
self.cv = cv
self.use_proba = use_proba
self.include_original_features = include_original_features
self.random_state = random_state
self.verbose = verbose
def _log(self, msg):
if self.verbose:
print(f"[SuperLearner] {msg}")
[docs]
def fit(self, X, y, sample_weight=None):
"""Fit the Super Learner.
1. Generate OOF predictions for each base estimator.
2. Solve for optimal combination weights.
3. Refit all base estimators on the full training set.
"""
X = np.asarray(X)
y = np.asarray(y)
n_samples = X.shape[0]
# Auto-detect task
self.is_classifier_ = len(np.unique(y)) <= 30 and np.issubdtype(y.dtype, np.integer) or len(np.unique(y)) <= 10
if self.is_classifier_:
self.classes_ = np.unique(y)
self.n_classes_ = len(self.classes_)
# Build CV splitter
if isinstance(self.cv, int):
if self.is_classifier_:
cv = StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state)
else:
cv = KFold(n_splits=self.cv, shuffle=True, random_state=self.random_state)
else:
cv = self.cv
# Step 1: Generate OOF predictions
self._log("Generating out-of-fold predictions...")
oof_list = []
for name, est in self.base_estimators:
self._log(f" CV for {name}...")
method = self._get_predict_method(est)
try:
oof = cross_val_predict(est, X, y, cv=cv, method=method)
except Exception:
oof = cross_val_predict(est, X, y, cv=cv, method="predict")
if oof.ndim == 2 and oof.shape[1] == 2 and self.is_classifier_:
oof = oof[:, 1] # binary: use P(positive)
if oof.ndim == 1:
oof = oof.reshape(-1, 1)
oof_list.append(oof)
self.oof_predictions_ = np.hstack(oof_list)
# Step 2: Solve for meta-learner weights
self._log("Solving for optimal weights...")
Z = self.oof_predictions_
if self.include_original_features:
Z = np.hstack([Z, X])
if self.is_classifier_ and self.n_classes_ == 2:
y_target = (y == self.classes_[1]).astype(float)
else:
y_target = y.astype(float)
self._fit_meta(Z, y_target)
# Step 3: Refit base estimators on full data
self._log("Fitting base estimators on full data...")
self.base_estimators_ = []
for name, est in self.base_estimators:
self._log(f" Fitting {name}...")
fitted = clone(est)
if sample_weight is not None:
try:
fitted.fit(X, y, sample_weight=sample_weight)
except TypeError:
fitted.fit(X, y)
else:
fitted.fit(X, y)
self.base_estimators_.append(fitted)
# CV scores for each base learner
self._compute_cv_scores(y_target)
return self
def _get_predict_method(self, est):
if self.is_classifier_ and self.use_proba and hasattr(est, "predict_proba"):
return "predict_proba"
return "predict"
def _fit_meta(self, Z, y):
"""Fit the meta-learner."""
if isinstance(self.meta_learner, str):
if self.meta_learner == "nnls":
from scipy.optimize import nnls
# Solve: min ||Z @ w - y||^2, w >= 0
coef, _ = nnls(Z, y)
total = coef.sum()
self.coef_ = coef / total if total > 0 else np.ones(Z.shape[1]) / Z.shape[1]
self._meta_fitted = None
elif self.meta_learner == "ridge":
from sklearn.linear_model import Ridge
meta = Ridge(alpha=1.0)
meta.fit(Z, y)
self.coef_ = meta.coef_
self._meta_fitted = meta
elif self.meta_learner == "best":
# Select the single best base learner
n_base = len(self.base_estimators)
scores = []
for i in range(n_base):
col = Z[:, i] if Z.shape[1] > n_base else Z[:, i:i+1].ravel()
mse = float(np.mean((col - y) ** 2))
scores.append(mse)
best = int(np.argmin(scores))
self.coef_ = np.zeros(Z.shape[1])
self.coef_[best] = 1.0
self._meta_fitted = None
else:
raise ValueError(f"Unknown meta_learner: {self.meta_learner}")
else:
# Custom estimator
meta = clone(self.meta_learner)
meta.fit(Z, y)
self.coef_ = getattr(meta, "coef_", np.ones(Z.shape[1]) / Z.shape[1])
self._meta_fitted = meta
def _compute_cv_scores(self, y_target):
self.cv_scores_ = {}
oof = self.oof_predictions_
for i, (name, _) in enumerate(self.base_estimators):
col = oof[:, i] if oof.shape[1] > i else oof[:, 0]
mse = float(np.mean((col - y_target) ** 2))
self.cv_scores_[name] = round(1 - mse / (np.var(y_target) + 1e-15), 4)
def _get_base_predictions(self, X):
"""Get predictions from fitted base estimators."""
X = np.asarray(X)
pred_list = []
for est in self.base_estimators_:
method = self._get_predict_method(est)
if method == "predict_proba" and hasattr(est, "predict_proba"):
pred = est.predict_proba(X)
if pred.ndim == 2 and pred.shape[1] == 2 and self.is_classifier_:
pred = pred[:, 1]
else:
pred = est.predict(X)
if pred.ndim == 1:
pred = pred.reshape(-1, 1)
pred_list.append(pred)
return np.hstack(pred_list)
[docs]
def predict(self, X):
X = np.asarray(X)
Z = self._get_base_predictions(X)
if self.include_original_features:
Z = np.hstack([Z, X])
if self._meta_fitted is not None:
raw = self._meta_fitted.predict(Z)
else:
raw = Z @ self.coef_
if self.is_classifier_:
if self.n_classes_ == 2:
return self.classes_[(raw >= 0.5).astype(int)]
return self.classes_[np.argmax(Z[:, :self.n_classes_], axis=1)]
return raw
[docs]
def predict_proba(self, X):
if not self.is_classifier_:
raise ValueError("predict_proba only for classification tasks.")
X = np.asarray(X)
Z = self._get_base_predictions(X)
if self.include_original_features:
Z = np.hstack([Z, X])
if self._meta_fitted is not None:
raw = self._meta_fitted.predict(Z)
else:
raw = Z @ self.coef_
if self.n_classes_ == 2:
p = np.clip(raw, 0, 1)
return np.column_stack([1 - p, p])
# Multiclass: try averaging probabilities
return Z[:, :self.n_classes_]
@property
def named_estimators(self):
return {name: est for (name, _), est in zip(self.base_estimators, self.base_estimators_)}