Source code for endgame.ensemble.blending

from __future__ import annotations

"""Blending methods for ensemble combination."""

from collections.abc import Callable
from typing import Any

import numpy as np
from scipy import stats
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.model_selection import train_test_split

from endgame.core.base import BaseEnsemble


[docs] class BlendingEnsemble(BaseEnsemble, ClassifierMixin): """Blending Ensemble using hold-out set for meta-learner training. Unlike stacking which uses cross-validation, blending uses a hold-out portion of the training data to generate meta-features for the second-level learner. Parameters ---------- base_estimators : List[estimator] Level 1 models. meta_estimator : estimator, optional Level 2 model. Default: LogisticRegression for classification. blend_fraction : float, default=0.2 Fraction of training data to use for blending (meta-learner training). use_proba : bool, default=True Use predict_proba for classification (if available). passthrough : bool, default=False Whether to include original features in Level 2. cv : int, optional Ignored. For API compatibility with StackingEnsemble. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Attributes ---------- base_estimators_ : List[estimator] Fitted Level 1 models. meta_estimator_ : estimator Fitted Level 2 model. classes_ : ndarray Unique class labels (for classification). Examples -------- >>> from endgame.ensemble import BlendingEnsemble >>> base_models = [RandomForestClassifier(), GradientBoostingClassifier()] >>> blender = BlendingEnsemble(base_estimators=base_models) >>> blender.fit(X_train, y_train) >>> predictions = blender.predict(X_test) """ def __init__( self, base_estimators: list[BaseEstimator] | None = None, meta_estimator: BaseEstimator | None = None, blend_fraction: float = 0.2, use_proba: bool = True, passthrough: bool = False, cv: int | None = None, # Ignored, for compatibility random_state: int | None = None, verbose: bool = False, ): super().__init__( estimators=base_estimators, random_state=random_state, verbose=verbose, ) self.base_estimators = base_estimators or [] self.meta_estimator = meta_estimator self.blend_fraction = blend_fraction self.use_proba = use_proba self.passthrough = passthrough self.cv = cv # Ignored self.base_estimators_: list[BaseEstimator] = [] self.meta_estimator_: BaseEstimator | None = None self._is_classifier: bool = True self._n_features_in: int = 0
[docs] def fit( self, X, y, sample_weight: np.ndarray | None = None, **fit_params, ) -> BlendingEnsemble: """Fit the blending ensemble. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. y : array-like of shape (n_samples,) Target values. sample_weight : array-like, optional Sample weights. Returns ------- self """ X = np.asarray(X) y = np.asarray(y) self._n_features_in = X.shape[1] self._is_classifier = len(np.unique(y)) <= 20 if self._is_classifier: self.classes_ = np.unique(y) self.n_classes_ = len(self.classes_) # Split into train and blend sets if sample_weight is not None: X_train, X_blend, y_train, y_blend, w_train, w_blend = train_test_split( X, y, sample_weight, test_size=self.blend_fraction, random_state=self.random_state, stratify=y if self._is_classifier else None, ) else: X_train, X_blend, y_train, y_blend = train_test_split( X, y, test_size=self.blend_fraction, random_state=self.random_state, stratify=y if self._is_classifier else None, ) w_train = w_blend = None self._log(f"Training {len(self.base_estimators)} base estimators...") self._log(f"Train set: {len(X_train)}, Blend set: {len(X_blend)}") # Fit base estimators on training portion self.base_estimators_ = [] blend_predictions = [] for i, estimator in enumerate(self.base_estimators): self._log(f" Fitting base estimator {i + 1}/{len(self.base_estimators)}") fitted = clone(estimator) if w_train is not None: fitted.fit(X_train, y_train, sample_weight=w_train) else: fitted.fit(X_train, y_train) self.base_estimators_.append(fitted) # Get predictions on blend set if self._is_classifier and self.use_proba and hasattr(fitted, "predict_proba"): pred = fitted.predict_proba(X_blend) if pred.ndim == 2 and pred.shape[1] == 2: pred = pred[:, 1:2] else: pred = fitted.predict(X_blend) if pred.ndim == 1: pred = pred.reshape(-1, 1) blend_predictions.append(pred) # Stack blend predictions meta_features = np.hstack(blend_predictions) if self.passthrough: meta_features = np.hstack([meta_features, X_blend]) # Fit meta-estimator self._log("Fitting meta-estimator...") if self.meta_estimator is None: if self._is_classifier: self.meta_estimator_ = LogisticRegression( C=1.0, max_iter=1000, random_state=self.random_state ) else: self.meta_estimator_ = Ridge(alpha=1.0, random_state=self.random_state) else: self.meta_estimator_ = clone(self.meta_estimator) if w_blend is not None: self.meta_estimator_.fit(meta_features, y_blend, sample_weight=w_blend) else: self.meta_estimator_.fit(meta_features, y_blend) # Refit base estimators on full data for better test predictions self._log("Refitting base estimators on full data...") self.base_estimators_ = [] for i, estimator in enumerate(self.base_estimators): fitted = clone(estimator) if sample_weight is not None: fitted.fit(X, y, sample_weight=sample_weight) else: fitted.fit(X, y) self.base_estimators_.append(fitted) self._is_fitted = True return self
def _get_meta_features(self, X: np.ndarray) -> np.ndarray: """Get meta-features from base estimator predictions.""" predictions = [] for fitted in self.base_estimators_: if self._is_classifier and self.use_proba and hasattr(fitted, "predict_proba"): pred = fitted.predict_proba(X) if pred.ndim == 2 and pred.shape[1] == 2: pred = pred[:, 1:2] else: pred = fitted.predict(X) if pred.ndim == 1: pred = pred.reshape(-1, 1) predictions.append(pred) meta_features = np.hstack(predictions) if self.passthrough: meta_features = np.hstack([meta_features, X]) return meta_features
[docs] def predict(self, X) -> np.ndarray: """Predict using the blending ensemble. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray Predictions. """ self._check_is_fitted() X = np.asarray(X) meta_features = self._get_meta_features(X) return self.meta_estimator_.predict(meta_features)
[docs] def predict_proba(self, X) -> np.ndarray: """Predict class probabilities. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray of shape (n_samples, n_classes) Class probabilities. """ self._check_is_fitted() if not self._is_classifier: raise ValueError("predict_proba only available for classification") X = np.asarray(X) meta_features = self._get_meta_features(X) if hasattr(self.meta_estimator_, "predict_proba"): return self.meta_estimator_.predict_proba(meta_features) if hasattr(self.meta_estimator_, "decision_function"): decision = self.meta_estimator_.decision_function(meta_features) proba = 1 / (1 + np.exp(-decision)) if proba.ndim == 1: return np.vstack([1 - proba, proba]).T return proba raise ValueError("Meta-estimator doesn't support probability predictions")
[docs] def score(self, X, y, sample_weight=None) -> float: """Return accuracy score on the given data. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. y : array-like of shape (n_samples,) True labels. sample_weight : array-like, optional Sample weights. Returns ------- float Accuracy score. """ from sklearn.metrics import accuracy_score y_pred = self.predict(X) return accuracy_score(y, y_pred, sample_weight=sample_weight)
[docs] class OptimizedBlender(BaseEnsemble): """Optuna-powered blend weight optimization. Uses Bayesian optimization to find optimal weights for combining model predictions. Parameters ---------- metric : str or callable Metric to optimize: 'roc_auc', 'rmse', 'mae', etc. n_trials : int, default=100 Number of optimization trials. weight_bounds : Tuple[float, float], default=(0, 1) Bounds for individual model weights. normalize : bool, default=True Whether weights must sum to 1. maximize : bool, default=True Whether to maximize or minimize the metric. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Attributes ---------- weights_ : Dict[int, float] Optimized model weights. best_score_ : float Best score achieved. study_ : optuna.Study Optuna study object for further analysis. Examples -------- >>> blender = OptimizedBlender(metric='roc_auc', n_trials=100) >>> blender.fit(oof_predictions, y_train) >>> final_pred = blender.predict(test_predictions) """ def __init__( self, metric: str | Callable = "roc_auc", n_trials: int = 100, weight_bounds: tuple[float, float] = (0, 1), normalize: bool = True, maximize: bool = True, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.metric = metric self.n_trials = n_trials self.weight_bounds = weight_bounds self.normalize = normalize self.maximize = maximize self.best_score_: float = 0.0 self.study_: Any | None = None def _get_metric_func(self) -> Callable: """Get the metric function.""" if callable(self.metric): return self.metric from sklearn.metrics import ( log_loss, mean_absolute_error, mean_squared_error, r2_score, roc_auc_score, ) metric_map = { "roc_auc": roc_auc_score, "log_loss": log_loss, "rmse": lambda y, p: np.sqrt(mean_squared_error(y, p)), "mse": mean_squared_error, "mae": mean_absolute_error, "r2": r2_score, } if self.metric.lower() not in metric_map: raise ValueError(f"Unknown metric: {self.metric}") return metric_map[self.metric.lower()]
[docs] def fit( self, predictions: list[np.ndarray], y_true: np.ndarray, ) -> OptimizedBlender: """Optimize blend weights using Optuna. Parameters ---------- predictions : List of arrays Out-of-fold predictions from each model. y_true : array-like True target values. Returns ------- self """ try: import optuna except ImportError: raise ImportError("Optuna is required for OptimizedBlender. Install with: pip install optuna") predictions = self._validate_predictions(predictions, y_true) y_true = np.asarray(y_true) n_models = len(predictions) metric_func = self._get_metric_func() # Determine optimization direction direction = "maximize" if self.maximize else "minimize" # Suppress Optuna logging if not verbose if not self.verbose: optuna.logging.set_verbosity(optuna.logging.WARNING) def objective(trial: optuna.Trial) -> float: # Sample weights weights = [] for i in range(n_models): w = trial.suggest_float( f"weight_{i}", self.weight_bounds[0], self.weight_bounds[1], ) weights.append(w) # Normalize if requested if self.normalize: total = sum(weights) if total > 0: weights = [w / total for w in weights] else: weights = [1.0 / n_models] * n_models # Compute blended prediction blended = np.zeros_like(predictions[0], dtype=np.float64) for w, pred in zip(weights, predictions): blended += w * pred # Evaluate try: score = metric_func(y_true, blended) except Exception: return float("-inf") if self.maximize else float("inf") return score # Create and run study self.study_ = optuna.create_study( direction=direction, sampler=optuna.samplers.TPESampler(seed=self.random_state), ) self.study_.optimize( objective, n_trials=self.n_trials, show_progress_bar=self.verbose, ) # Extract best weights best_params = self.study_.best_params weights = [best_params[f"weight_{i}"] for i in range(n_models)] if self.normalize: total = sum(weights) if total > 0: weights = [w / total for w in weights] self.weights_ = {i: w for i, w in enumerate(weights)} self.best_score_ = self.study_.best_value self._log(f"Best score: {self.best_score_:.4f}") self._log(f"Best weights: {self.weights_}") self._is_fitted = True return self
[docs] def predict(self, predictions: list[np.ndarray]) -> np.ndarray: """Apply optimized weights. Parameters ---------- predictions : List of arrays Predictions from each model. Returns ------- ndarray Blended prediction. """ self._check_is_fitted() predictions = self._validate_predictions(predictions) return self._weighted_average(predictions, self.weights_)
[docs] class RankAverageBlender(BaseEnsemble): """Rank-based blending for submissions. Converts predictions to ranks before averaging. Robust to different prediction scales across models. Parameters ---------- method : str, default='average' Rank method: 'average', 'min', 'max', 'dense', 'ordinal'. normalize : bool, default=True Whether to normalize ranks to [0, 1]. weights : Dict[int, float], optional Optional model weights. If None, uniform weights. Examples -------- >>> blender = RankAverageBlender() >>> final_pred = blender.blend(test_predictions) """ def __init__( self, method: str = "average", normalize: bool = True, weights: dict[int, float] | None = None, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.method = method self.normalize = normalize self.weights = weights
[docs] def fit( self, predictions: list[np.ndarray] | None = None, y_true: np.ndarray | None = None, ) -> RankAverageBlender: """Fit the blender (stores weights if provided). Parameters ---------- predictions : ignored y_true : ignored Returns ------- self """ if self.weights is not None: self.weights_ = self.weights.copy() else: self.weights_ = None self._is_fitted = True return self
[docs] def blend(self, predictions: list[np.ndarray]) -> np.ndarray: """Blend predictions using rank averaging. Parameters ---------- predictions : List of arrays Predictions from each model. Returns ------- ndarray Rank-averaged prediction. """ predictions = self._validate_predictions(predictions) n_models = len(predictions) # Convert to ranks ranked_predictions = [] for pred in predictions: ranks = stats.rankdata(pred, method=self.method) if self.normalize: ranks = ranks / len(ranks) ranked_predictions.append(ranks) # Weighted average of ranks if self.weights_ is None: weights = {i: 1.0 / n_models for i in range(n_models)} else: weights = self.weights_ return self._weighted_average(ranked_predictions, weights)
[docs] def predict(self, predictions: list[np.ndarray]) -> np.ndarray: """Alias for blend().""" return self.blend(predictions)
[docs] class PowerBlender(BaseEnsemble): """Power-weighted blending based on individual scores. Weights models by their validation scores raised to a power. Higher power = more weight to best models. Parameters ---------- scores : List[float] Validation scores for each model. power : float, default=2.0 Power to raise scores to (higher = more aggressive weighting). higher_is_better : bool, default=True Whether higher scores are better. Examples -------- >>> scores = [0.85, 0.87, 0.86] >>> blender = PowerBlender(scores=scores, power=3.0) >>> final_pred = blender.predict(test_predictions) """ def __init__( self, scores: list[float] | None = None, power: float = 2.0, higher_is_better: bool = True, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.scores = scores self.power = power self.higher_is_better = higher_is_better
[docs] def fit( self, predictions: list[np.ndarray] | None = None, y_true: np.ndarray | None = None, scores: list[float] | None = None, ) -> PowerBlender: """Compute power-weighted blending weights. Parameters ---------- predictions : ignored y_true : ignored scores : List[float], optional Model scores (overrides constructor scores). Returns ------- self """ if scores is not None: self.scores = scores if self.scores is None: raise ValueError("scores must be provided") scores_arr = np.array(self.scores) # Invert if lower is better if not self.higher_is_better: scores_arr = 1.0 / (scores_arr + 1e-10) # Apply power weights = scores_arr ** self.power # Normalize weights = weights / weights.sum() self.weights_ = {i: w for i, w in enumerate(weights)} self._log(f"Power weights: {self.weights_}") self._is_fitted = True return self
[docs] def predict(self, predictions: list[np.ndarray]) -> np.ndarray: """Apply power weights. Parameters ---------- predictions : List of arrays Predictions from each model. Returns ------- ndarray Power-weighted prediction. """ self._check_is_fitted() predictions = self._validate_predictions(predictions) return self._weighted_average(predictions, self.weights_)