from __future__ import annotations
"""Hill Climbing Ensemble: Forward selection with replacement."""
from collections.abc import Callable
import numpy as np
from sklearn.metrics import (
accuracy_score,
f1_score,
log_loss,
mean_absolute_error,
mean_squared_error,
r2_score,
roc_auc_score,
)
from endgame.core.base import BaseEnsemble
from endgame.core.types import EnsembleResult
[docs]
class HillClimbingEnsemble(BaseEnsemble):
"""Forward ensemble selection with replacement.
Iteratively adds models that maximize validation metric.
Key technique for non-differentiable metrics (F1, MAP@K).
Algorithm:
1. Start with empty ensemble
2. For each iteration:
a. For each model in pool:
- Compute metric if added to current ensemble
b. Add model that provides best improvement
3. Allow repeating models (weighted averaging)
Parameters
----------
metric : str or callable, default='roc_auc'
Metric to optimize: 'roc_auc', 'log_loss', 'f1', 'accuracy',
'rmse', 'mae', 'r2', or custom callable(y_true, y_pred).
n_iterations : int, default=100
Number of hill climbing iterations.
early_stopping : int, default=20
Stop if no improvement for this many iterations.
maximize : bool, default=True
Whether to maximize or minimize the metric.
init_weights : str, default='best_single'
Initial weight strategy: 'best_single', 'uniform', 'none'.
random_state : int, optional
Random seed for tie-breaking.
verbose : bool, default=False
Enable verbose output.
Attributes
----------
weights_ : Dict[int, float]
Optimized model weights (by model index).
best_score_ : float
Best ensemble score achieved.
selection_history_ : List[int]
Order in which models were selected.
Examples
--------
>>> from endgame.ensemble import HillClimbingEnsemble
>>> ensemble = HillClimbingEnsemble(metric='roc_auc', n_iterations=100)
>>> ensemble.fit(oof_predictions, y_train)
>>> print(f"Weights: {ensemble.weights_}")
>>> test_pred = ensemble.predict(test_predictions)
"""
def __init__(
self,
metric: str | Callable = "roc_auc",
n_iterations: int = 100,
early_stopping: int = 20,
maximize: bool = True,
init_weights: str = "best_single",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(random_state=random_state, verbose=verbose)
self.metric = metric
self.n_iterations = n_iterations
self.early_stopping = early_stopping
self.maximize = maximize
self.init_weights = init_weights
self.weights_: dict[int, float] = {}
self.best_score_: float = 0.0
self.selection_history_: list[int] = []
self._metric_func: Callable | None = None
def _get_metric_func(self) -> Callable:
"""Get the metric function."""
if callable(self.metric):
return self.metric
metric_map = {
"roc_auc": lambda y, p: roc_auc_score(y, p),
"auc": lambda y, p: roc_auc_score(y, p),
"log_loss": lambda y, p: -log_loss(y, p), # Negate for maximization
"logloss": lambda y, p: -log_loss(y, p),
"f1": lambda y, p: f1_score(y, (p >= 0.5).astype(int)),
"f1_macro": lambda y, p: f1_score(y, (p >= 0.5).astype(int), average="macro"),
"accuracy": lambda y, p: accuracy_score(y, (p >= 0.5).astype(int)),
"rmse": lambda y, p: -np.sqrt(mean_squared_error(y, p)), # Negate
"mse": lambda y, p: -mean_squared_error(y, p),
"mae": lambda y, p: -mean_absolute_error(y, p),
"r2": lambda y, p: r2_score(y, p),
}
if self.metric.lower() not in metric_map:
raise ValueError(
f"Unknown metric '{self.metric}'. "
f"Available: {list(metric_map.keys())}"
)
return metric_map[self.metric.lower()]
def _compute_ensemble_prediction(
self,
predictions: list[np.ndarray],
weights: dict[int, float],
) -> np.ndarray:
"""Compute weighted ensemble prediction."""
if not weights:
return np.zeros_like(predictions[0])
total_weight = sum(weights.values())
if total_weight == 0:
return np.zeros_like(predictions[0])
result = np.zeros_like(predictions[0], dtype=np.float64)
for idx, weight in weights.items():
result += weight * predictions[idx]
result /= total_weight
return result
[docs]
def fit(
self,
predictions: list[np.ndarray],
y_true: np.ndarray,
) -> HillClimbingEnsemble:
"""Find optimal ensemble weights via hill climbing.
Parameters
----------
predictions : List of shape (n_models, n_samples, ...)
Out-of-fold predictions from each model.
y_true : array-like
True target values.
Returns
-------
self
"""
predictions = self._validate_predictions(predictions, y_true)
y_true = np.asarray(y_true)
n_models = len(predictions)
self._metric_func = self._get_metric_func()
# Evaluate individual model scores
individual_scores = []
for i, pred in enumerate(predictions):
try:
score = self._metric_func(y_true, pred)
except Exception:
score = -np.inf if self.maximize else np.inf
individual_scores.append(score)
self._log(f"Individual model scores: {[f'{s:.4f}' for s in individual_scores]}")
# Initialize weights
if self.init_weights == "best_single":
best_idx = np.argmax(individual_scores) if self.maximize else np.argmin(individual_scores)
self.weights_ = {best_idx: 1.0}
current_score = individual_scores[best_idx]
elif self.init_weights == "uniform":
self.weights_ = {i: 1.0 / n_models for i in range(n_models)}
current_pred = self._compute_ensemble_prediction(predictions, self.weights_)
current_score = self._metric_func(y_true, current_pred)
else:
self.weights_ = {}
current_score = -np.inf if self.maximize else np.inf
self.best_score_ = current_score
best_weights = self.weights_.copy()
self.selection_history_ = list(self.weights_.keys())
self._log(f"Initial score: {current_score:.4f}")
# Hill climbing iterations
no_improvement_count = 0
rng = np.random.RandomState(self.random_state)
for iteration in range(self.n_iterations):
best_candidate_idx = -1
best_candidate_score = current_score
# Try adding each model
for model_idx in range(n_models):
# Create candidate weights
candidate_weights = self.weights_.copy()
candidate_weights[model_idx] = candidate_weights.get(model_idx, 0) + 1.0
# Compute ensemble prediction
candidate_pred = self._compute_ensemble_prediction(predictions, candidate_weights)
# Evaluate
try:
candidate_score = self._metric_func(y_true, candidate_pred)
except Exception:
continue
# Check if better
is_better = (
(self.maximize and candidate_score > best_candidate_score) or
(not self.maximize and candidate_score < best_candidate_score)
)
if is_better:
best_candidate_idx = model_idx
best_candidate_score = candidate_score
# Update if improvement found
if best_candidate_idx >= 0:
is_improvement = (
(self.maximize and best_candidate_score > current_score) or
(not self.maximize and best_candidate_score < current_score)
)
if is_improvement:
self.weights_[best_candidate_idx] = self.weights_.get(best_candidate_idx, 0) + 1.0
current_score = best_candidate_score
self.selection_history_.append(best_candidate_idx)
no_improvement_count = 0
if self.verbose and (iteration + 1) % 10 == 0:
self._log(f"Iteration {iteration + 1}: score={current_score:.4f}")
# Update best
is_new_best = (
(self.maximize and current_score > self.best_score_) or
(not self.maximize and current_score < self.best_score_)
)
if is_new_best:
self.best_score_ = current_score
best_weights = self.weights_.copy()
else:
no_improvement_count += 1
else:
no_improvement_count += 1
# Early stopping
if no_improvement_count >= self.early_stopping:
self._log(f"Early stopping at iteration {iteration + 1}")
break
# Use best weights found
self.weights_ = best_weights
# Normalize weights
total = sum(self.weights_.values())
if total > 0:
self.weights_ = {k: v / total for k, v in self.weights_.items()}
self._log(f"Final score: {self.best_score_:.4f}")
self._log(f"Final weights: {self.weights_}")
self._is_fitted = True
return self
[docs]
def predict(self, predictions: list[np.ndarray]) -> np.ndarray:
"""Apply learned weights to generate ensemble prediction.
Parameters
----------
predictions : List of shape (n_models, n_samples, ...)
Predictions from each model.
Returns
-------
ndarray
Weighted ensemble prediction.
"""
self._check_is_fitted()
predictions = self._validate_predictions(predictions)
return self._compute_ensemble_prediction(predictions, self.weights_)
[docs]
def get_result(self) -> EnsembleResult:
"""Get ensemble result summary.
Returns
-------
EnsembleResult
Result containing weights, score, and selected models.
"""
self._check_is_fitted()
# Compute improvement over best single model
# (Would need to store individual scores for this)
improvement = 0.0
return EnsembleResult(
weights=self.weights_,
best_score=self.best_score_,
improvement=improvement,
selected_models=list(self.weights_.keys()),
)