Source code for endgame.ensemble.bayesian_averaging

"""Bayesian Model Averaging: Weight models by posterior probability.

Combines predictions from multiple fitted models by weighting each
model's contribution proportional to its posterior probability given
the data. Uses BIC (Bayesian Information Criterion) or AIC to
approximate the marginal likelihood.

Example
-------
>>> from endgame.ensemble import BayesianModelAveraging
>>> bma = BayesianModelAveraging(criterion="bic")
>>> bma.fit(
...     estimators=[rf, xgb, lr],
...     X_val=X_val,
...     y_val=y_val,
... )
>>> bma.predict(X_test)
"""

from __future__ import annotations

from collections.abc import Sequence

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.metrics import log_loss, mean_squared_error


[docs] class BayesianModelAveraging(BaseEstimator): """Bayesian Model Averaging using information-criterion weights. Parameters ---------- criterion : {'bic', 'aic', 'aic_c'}, default='bic' - ``'bic'``: Bayesian Information Criterion (penalizes complexity more). - ``'aic'``: Akaike Information Criterion. - ``'aic_c'``: Corrected AIC for small samples. prior : {'uniform', 'complexity'} or array-like, optional Prior over models. Default is uniform. task : {'auto', 'classification', 'regression'}, default='auto' Attributes ---------- weights_ : ndarray Posterior model weights summing to 1. ic_scores_ : ndarray Information criterion values for each model. estimators_ : list of estimator Fitted estimators (references, not clones). """ def __init__( self, criterion: str = "bic", prior: str | Sequence[float] = "uniform", task: str = "auto", ): self.criterion = criterion self.prior = prior self.task = task
[docs] def fit( self, estimators: list[BaseEstimator], X_val: np.ndarray, y_val: np.ndarray, ) -> BayesianModelAveraging: """Compute posterior weights from validation data. Parameters ---------- estimators : list of fitted estimators Already-fitted models. X_val : array-like Validation features. y_val : array-like Validation target. Returns ------- self """ X_val = np.asarray(X_val) y_val = np.asarray(y_val) n = len(y_val) self.estimators_ = list(estimators) m = len(estimators) # Auto-detect task if self.task == "auto": unique = np.unique(y_val) self._is_classifier = len(unique) <= 30 and (np.issubdtype(y_val.dtype, np.integer) or len(unique) <= 10) else: self._is_classifier = self.task == "classification" if self._is_classifier: self.classes_ = np.unique(y_val) # Compute negative log-likelihood for each model nll = np.zeros(m) n_params = np.zeros(m) for i, est in enumerate(estimators): if self._is_classifier: nll[i], n_params[i] = self._classification_nll(est, X_val, y_val, n) else: nll[i], n_params[i] = self._regression_nll(est, X_val, y_val, n) # Compute IC if self.criterion == "bic": self.ic_scores_ = 2 * nll + n_params * np.log(n) elif self.criterion == "aic": self.ic_scores_ = 2 * nll + 2 * n_params elif self.criterion == "aic_c": self.ic_scores_ = 2 * nll + 2 * n_params + ( 2 * n_params * (n_params + 1) / np.maximum(n - n_params - 1, 1) ) else: raise ValueError(f"Unknown criterion: {self.criterion}") # Convert IC to posterior weights via exp(-0.5 * delta_IC) delta = self.ic_scores_ - self.ic_scores_.min() log_weights = -0.5 * delta # Apply prior if isinstance(self.prior, str): if self.prior == "uniform": pass # uniform = no change elif self.prior == "complexity": # Favor simpler models log_weights -= 0.01 * n_params else: log_prior = np.log(np.asarray(self.prior, dtype=float) + 1e-15) log_weights += log_prior # Softmax normalization log_weights -= log_weights.max() raw_weights = np.exp(log_weights) self.weights_ = raw_weights / raw_weights.sum() return self
def _classification_nll(self, est, X, y, n): """Negative log-likelihood for classification.""" try: proba = est.predict_proba(X) nll = n * log_loss(y, proba) except Exception: y_pred = est.predict(X) err_rate = float(np.mean(y_pred != y)) nll = -n * ( (1 - err_rate) * np.log(max(1 - err_rate, 1e-15)) + err_rate * np.log(max(err_rate, 1e-15)) ) n_params = self._estimate_params(est) return float(nll), float(n_params) def _regression_nll(self, est, X, y, n): """Negative log-likelihood for regression (Gaussian).""" y_pred = est.predict(X) mse = mean_squared_error(y, y_pred) sigma2 = max(mse, 1e-15) nll = 0.5 * n * (np.log(2 * np.pi * sigma2) + 1.0) n_params = self._estimate_params(est) return float(nll), float(n_params) def _estimate_params(self, est): """Estimate effective number of parameters.""" name = type(est).__name__ # Known models if hasattr(est, "coef_"): coef = np.asarray(est.coef_).ravel() return len(coef) + (1 if hasattr(est, "intercept_") else 0) if hasattr(est, "n_estimators") and hasattr(est, "max_depth"): depth = est.max_depth or 6 return getattr(est, "n_estimators", 100) * (2 ** depth) if hasattr(est, "tree_"): return est.tree_.node_count if hasattr(est, "n_estimators"): return getattr(est, "n_estimators", 100) * 10 return 10 # fallback
[docs] def predict(self, X): X = np.asarray(X) if self._is_classifier: proba = self.predict_proba(X) return self.classes_[np.argmax(proba, axis=1)] # Regression: weighted average pred = np.zeros(X.shape[0]) for est, w in zip(self.estimators_, self.weights_): pred += w * est.predict(X) return pred
[docs] def predict_proba(self, X): if not self._is_classifier: raise ValueError("predict_proba only for classification.") X = np.asarray(X) n = X.shape[0] n_classes = len(self.classes_) proba = np.zeros((n, n_classes)) for est, w in zip(self.estimators_, self.weights_): if hasattr(est, "predict_proba"): p = est.predict_proba(X) proba[:, :p.shape[1]] += w * p else: preds = est.predict(X) le = {c: i for i, c in enumerate(self.classes_)} for j, pred in enumerate(preds): if pred in le: proba[j, le[pred]] += w # Normalize row_sums = proba.sum(axis=1, keepdims=True) row_sums = np.where(row_sums == 0, 1, row_sums) return proba / row_sums