Source code for endgame.ensemble.boosting

"""AdaBoost: Adaptive Boosting for classification and regression.

Implements SAMME (Stagewise Additive Modeling using a Multi-class
Exponential loss) for classification and SAMME.R for probability-
capable classifiers, plus AdaBoost.R2 for regression.

Example
-------
>>> from sklearn.tree import DecisionTreeClassifier
>>> from endgame.ensemble import AdaBoostClassifier
>>> boost = AdaBoostClassifier(
...     base_estimator=DecisionTreeClassifier(max_depth=1),
...     n_estimators=50,
...     learning_rate=1.0,
... )
>>> boost.fit(X_train, y_train)
>>> boost.predict(X_test)
"""

from __future__ import annotations

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, clone
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor


class AdaBoostClassifier(BaseEstimator, ClassifierMixin):
    """AdaBoost classifier (SAMME / SAMME.R).

    Parameters
    ----------
    base_estimator : estimator, optional
        Base learner. Default: ``DecisionTreeClassifier(max_depth=1)`` (stump).
    n_estimators : int, default=50
        Maximum number of boosting rounds. Must be at least 1.
    learning_rate : float, default=1.0
        Shrinkage applied to each estimator's contribution. Lower values
        usually require more estimators.
    algorithm : {'SAMME', 'SAMME.R'}, default='SAMME.R'
        - ``'SAMME'``: discrete AdaBoost using class labels.
        - ``'SAMME.R'``: real AdaBoost using class probabilities. If the
          base learner has no ``predict_proba``, fitting and prediction
          fall back to the discrete ``'SAMME'`` path.
    random_state : int or None, default=None
        Seed of the ``RandomState`` that draws a fresh ``random_state`` for
        every cloned base learner exposing that attribute.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted weak learners.
    estimator_weights_ : ndarray
        Vote weight of each estimator. Only meaningful on the SAMME path;
        on the SAMME.R path every entry is the placeholder ``1.0``.
    estimator_errors_ : ndarray
        Weighted training error of each estimator.
    classes_ : ndarray
        Sorted unique labels seen in ``fit``.
    n_classes_ : int
        Number of entries in ``classes_``.
    feature_importances_ : ndarray
        Sum of feature importances weighted by estimator weight.
    """

    _ALGORITHMS = ("SAMME", "SAMME.R")

    def __init__(
        self,
        base_estimator: BaseEstimator | None = None,
        n_estimators: int = 50,
        learning_rate: float = 1.0,
        algorithm: str = "SAMME.R",
        random_state: int | None = None,
    ):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.algorithm = algorithm
        self.random_state = random_state

    @staticmethod
    def _initial_weights(sample_weight, n_samples):
        """Return a fresh, normalized copy of the starting sample weights."""
        if sample_weight is None:
            return np.full(n_samples, 1.0 / n_samples)
        w = np.array(sample_weight, dtype=np.float64).ravel()
        if w.shape[0] != n_samples:
            raise ValueError(
                f"sample_weight has {w.shape[0]} entries, expected {n_samples}"
            )
        if np.any(w < 0):
            raise ValueError("sample_weight must be non-negative")
        total = w.sum()
        if not np.isfinite(total) or total <= 0:
            raise ValueError("sample_weight must have a positive, finite sum")
        return w / total

    def _uses_real_boosting(self):
        """Tell whether the fitted ensemble follows the SAMME.R path."""
        return self.algorithm == "SAMME.R" and hasattr(
            self.estimators_[0], "predict_proba"
        )

    def _vote_scores(self, X):
        """Accumulate the weighted discrete votes into an (n, n_classes) array."""
        n = X.shape[0]
        scores = np.zeros((n, self.n_classes_))
        rows = np.arange(n)
        for est, alpha in zip(self.estimators_, self.estimator_weights_):
            # classes_ comes from np.unique and is therefore sorted, so a
            # binary search maps each predicted label to its column. This
            # relies on the learners only predicting labels seen in fit.
            cols = np.searchsorted(self.classes_, est.predict(X))
            scores[rows, cols] += alpha
        return scores

    def fit(self, X, y, sample_weight=None):
        """Fit the boosted ensemble.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data; converted to a float64 array.
        y : array-like of shape (n_samples,)
            Class labels.
        sample_weight : array-like of shape (n_samples,), optional
            Initial sample weights; uniform when omitted. The input is
            copied and normalized to sum to one.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``algorithm`` is unknown, ``n_estimators`` is below 1, the
            inputs are empty or inconsistent in length, or ``sample_weight``
            is negative or does not have a positive finite sum.
        """
        if self.algorithm not in self._ALGORITHMS:
            raise ValueError(f"Unknown algorithm: {self.algorithm}")
        if self.n_estimators < 1:
            raise ValueError(
                f"n_estimators must be at least 1, got {self.n_estimators}"
            )

        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y).ravel()
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit on an empty training set")
        if y.shape[0] != n_samples:
            raise ValueError(
                f"X has {n_samples} samples but y has {y.shape[0]} labels"
            )

        self.classes_, y_enc = np.unique(y, return_inverse=True)
        self.n_classes_ = len(self.classes_)
        n_classes = self.n_classes_

        # Compare against None explicitly: estimators that define __len__
        # (ensembles, pipelines) are not safe to use in a boolean context.
        if self.base_estimator is None:
            base = DecisionTreeClassifier(max_depth=1)
        else:
            base = self.base_estimator

        w = self._initial_weights(sample_weight, n_samples)
        # Weighted error of uniform random guessing among n_classes labels.
        chance_error = 1.0 - 1.0 / n_classes
        rows = np.arange(n_samples)
        rng = np.random.RandomState(self.random_state)

        estimators = []
        weights = []
        errors = []

        for t in range(self.n_estimators):
            est = clone(base)
            if hasattr(est, "random_state"):
                est.random_state = rng.randint(0, 2**31)
            # Best effort: learners whose fit() rejects sample_weight are
            # trained unweighted instead of aborting the whole fit.
            try:
                est.fit(X, y, sample_weight=w)
            except TypeError:
                est.fit(X, y)

            if self.algorithm == "SAMME.R" and hasattr(est, "predict_proba"):
                # SAMME.R: real-valued boosting using probabilities.
                # Clipping keeps the logarithm finite.
                proba = np.clip(est.predict_proba(X), 1e-15, 1.0 - 1e-15)

                # Error rate, kept for monitoring and the stop test.
                err = float(np.dot(w, np.argmax(proba, axis=1) != y_enc))
                estimators.append(est)
                weights.append(1.0)  # placeholder; SAMME.R has no vote weight
                errors.append(err)
                if err >= chance_error:
                    break

                # NOTE(review): only the true-class log-probability enters
                # this update; the published SAMME.R rule appears to also
                # include the other classes (coded -1/(K-1)) - confirm the
                # simplification is intended.
                exponent = (
                    -1.0
                    * self.learning_rate
                    * ((n_classes - 1.0) / n_classes)
                    * np.log(proba[rows, y_enc])
                )
                # Shifting by the maximum avoids overflow; the constant
                # factor cancels in the normalization below.
                w *= np.exp(exponent - exponent.max())
                w_sum = w.sum()
                if w_sum <= 0:
                    break
                w /= w_sum
            else:
                # SAMME: discrete boosting on predicted labels.
                incorrect = est.predict(X) != y
                err = float(np.dot(w, incorrect))

                if err <= 0:
                    # A perfect learner gets a large fixed weight and ends
                    # boosting; the log-odds formula would be infinite.
                    estimators.append(est)
                    weights.append(10.0)
                    errors.append(0.0)
                    break

                if err >= chance_error:
                    # No better than chance: alpha would be <= 0. Keep the
                    # learner only when it is the sole one available.
                    if t == 0:
                        estimators.append(est)
                        weights.append(1.0)
                        errors.append(err)
                    break

                alpha = self.learning_rate * (
                    np.log((1.0 - err) / err) + np.log(n_classes - 1.0)
                )
                estimators.append(est)
                weights.append(alpha)
                errors.append(err)

                # Up-weight the misclassified samples.
                w *= np.exp(alpha * incorrect)
                w /= w.sum()

        self.estimators_ = estimators
        self.estimator_weights_ = np.asarray(weights, dtype=np.float64)
        self.estimator_errors_ = np.asarray(errors, dtype=np.float64)
        return self

    def predict(self, X):
        """Predict the class label with the highest ensemble score.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        ndarray of shape (n_samples,)
            Entries of ``classes_``.
        """
        X = np.asarray(X, dtype=np.float64)
        if self._uses_real_boosting():
            scores = self.predict_proba(X)
        else:
            # SAMME: weighted vote.
            scores = self._vote_scores(X)
        return self.classes_[np.argmax(scores, axis=1)]

    def predict_proba(self, X):
        """Estimate class probabilities.

        On the SAMME.R path the centred log-probabilities of all learners
        are summed and passed through a softmax. On the SAMME path the
        weighted votes are normalized per sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        ndarray of shape (n_samples, n_classes_)
            Columns follow the order of ``classes_``.
        """
        X = np.asarray(X, dtype=np.float64)

        if self._uses_real_boosting():
            n_classes = self.n_classes_
            decision = np.zeros((X.shape[0], n_classes))
            for est in self.estimators_:
                log_p = np.log(
                    np.clip(est.predict_proba(X), 1e-15, 1.0 - 1e-15)
                )
                decision += self.learning_rate * (
                    (n_classes - 1)
                    * (log_p - log_p.mean(axis=1, keepdims=True))
                )
            # Softmax, shifted by the row maximum for numerical stability.
            decision -= decision.max(axis=1, keepdims=True)
            proba = np.exp(decision)
            proba /= proba.sum(axis=1, keepdims=True)
            return proba

        # SAMME: convert weighted votes to probabilities.
        scores = self._vote_scores(X)
        total = scores.sum(axis=1, keepdims=True)
        # Rows without any vote mass stay all-zero instead of dividing by 0.
        return scores / np.where(total == 0, 1, total)

    @property
    def feature_importances_(self):
        """Estimator-weighted average of the learners' feature importances.

        Raises
        ------
        AttributeError
            If the fitted base learners expose no ``feature_importances_``.
        """
        if not hasattr(self.estimators_[0], "feature_importances_"):
            raise AttributeError("Base estimator has no feature_importances_.")
        w = self.estimator_weights_
        if w.sum() == 0:
            # Degenerate weights: fall back to a plain average.
            w = np.ones(len(self.estimators_))
        norm_w = w / w.sum()
        return sum(
            est.feature_importances_ * nw
            for est, nw in zip(self.estimators_, norm_w)
        )
class AdaBoostRegressor(BaseEstimator, RegressorMixin):
    """AdaBoost.R2 regressor.

    Parameters
    ----------
    base_estimator : estimator, optional
        Default: ``DecisionTreeRegressor(max_depth=3)``.
    n_estimators : int, default=50
        Maximum number of boosting rounds. Must be at least 1.
    learning_rate : float, default=1.0
        Scales each estimator's weight in the final weighted median.
    loss : {'linear', 'square', 'exponential'}, default='linear'
        Loss function for computing sample weights.
    random_state : int or None, default=None
        Seed of the ``RandomState`` that draws a fresh ``random_state`` for
        every cloned base learner exposing that attribute.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted weak learners.
    estimator_weights_ : ndarray
        Weight of each estimator in the weighted median.
    estimator_errors_ : ndarray
        Weighted average loss of each estimator.
    feature_importances_ : ndarray
        Sum of feature importances weighted by estimator weight.
    """

    _LOSSES = ("linear", "square", "exponential")

    def __init__(
        self,
        base_estimator: BaseEstimator | None = None,
        n_estimators: int = 50,
        learning_rate: float = 1.0,
        loss: str = "linear",
        random_state: int | None = None,
    ):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.loss = loss
        self.random_state = random_state

    @staticmethod
    def _initial_weights(sample_weight, n_samples):
        """Return a fresh, normalized copy of the starting sample weights."""
        if sample_weight is None:
            return np.full(n_samples, 1.0 / n_samples)
        w = np.array(sample_weight, dtype=np.float64).ravel()
        if w.shape[0] != n_samples:
            raise ValueError(
                f"sample_weight has {w.shape[0]} entries, expected {n_samples}"
            )
        if np.any(w < 0):
            raise ValueError("sample_weight must be non-negative")
        total = w.sum()
        if not np.isfinite(total) or total <= 0:
            raise ValueError("sample_weight must have a positive, finite sum")
        return w / total

    def fit(self, X, y, sample_weight=None):
        """Fit the boosted ensemble.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data; converted to a float64 array.
        y : array-like of shape (n_samples,)
            Targets; converted to float64 and flattened.
        sample_weight : array-like of shape (n_samples,), optional
            Initial sample weights; uniform when omitted. The input is
            copied and normalized to sum to one.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``loss`` is unknown, ``n_estimators`` is below 1, the inputs
            are empty or inconsistent in length, or ``sample_weight`` is
            negative or does not have a positive finite sum.
        """
        # Validate up front so a bad setting fails before any learner is fit.
        if self.loss not in self._LOSSES:
            raise ValueError(f"Unknown loss: {self.loss}")
        if self.n_estimators < 1:
            raise ValueError(
                f"n_estimators must be at least 1, got {self.n_estimators}"
            )

        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.float64).ravel()
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit on an empty training set")
        if y.shape[0] != n_samples:
            raise ValueError(
                f"X has {n_samples} samples but y has {y.shape[0]} targets"
            )

        # Compare against None explicitly: estimators that define __len__
        # (ensembles, pipelines) are not safe to use in a boolean context.
        if self.base_estimator is None:
            base = DecisionTreeRegressor(max_depth=3)
        else:
            base = self.base_estimator

        rng = np.random.RandomState(self.random_state)
        w = self._initial_weights(sample_weight, n_samples)

        estimators = []
        weights = []
        errors = []

        for t in range(self.n_estimators):
            est = clone(base)
            if hasattr(est, "random_state"):
                est.random_state = rng.randint(0, 2**31)
            # Best effort: learners whose fit() rejects sample_weight are
            # trained unweighted instead of aborting the whole fit.
            try:
                est.fit(X, y, sample_weight=w)
            except TypeError:
                est.fit(X, y)

            abs_error = np.abs(y - est.predict(X))
            max_err = abs_error.max()
            if max_err == 0:
                # Perfect fit: nothing left to boost on.
                estimators.append(est)
                weights.append(1.0)
                errors.append(0.0)
                break

            # Per-sample loss, scaled into [0, 1] by the largest error.
            scaled = abs_error / max_err
            if self.loss == "linear":
                loss_arr = scaled
            elif self.loss == "square":
                loss_arr = scaled**2
            else:  # "exponential"
                loss_arr = 1.0 - np.exp(-scaled)

            avg_loss = float(np.dot(w, loss_arr))

            if avg_loss <= 0:
                # Every mispredicted sample carries zero weight. beta would
                # be 0 and log(1 / beta) undefined, so treat the learner as
                # perfect and stop.
                estimators.append(est)
                weights.append(1.0)
                errors.append(0.0)
                break

            if avg_loss >= 0.5:
                # Too weak to help; keep it only if it is the sole learner.
                if t == 0:
                    estimators.append(est)
                    weights.append(1.0)
                    errors.append(avg_loss)
                break

            beta = avg_loss / (1.0 - avg_loss)
            alpha = self.learning_rate * np.log(1.0 / beta)

            estimators.append(est)
            weights.append(alpha)
            errors.append(avg_loss)

            # beta < 1 here, so well-predicted samples (small loss) shrink
            # the most and hard samples gain relative weight.
            w *= np.power(beta, (1.0 - loss_arr))
            w_sum = w.sum()
            if w_sum <= 0:
                break
            w /= w_sum

        self.estimators_ = estimators
        self.estimator_weights_ = np.asarray(weights, dtype=np.float64)
        self.estimator_errors_ = np.asarray(errors, dtype=np.float64)
        return self

    def predict(self, X):
        """Predict with the estimator-weighted median of the learners.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        ndarray of shape (n_samples,)
            For each sample, the smallest learner prediction whose
            cumulative estimator weight reaches half of the total weight.
            If all estimator weights sum to zero, the plain mean of the
            learner predictions is returned instead.
        """
        X = np.asarray(X, dtype=np.float64)

        # preds[t, i] is learner t's prediction for sample i.
        preds = np.array([est.predict(X) for est in self.estimators_])
        w = self.estimator_weights_

        if w.sum() == 0:
            return preds.mean(axis=0)

        # Sort each sample's predictions and walk the cumulative weight
        # until half of the total weight is reached.
        order = np.argsort(preds, axis=0)
        cum_w = np.cumsum(w[order], axis=0)
        reached = cum_w >= cum_w[-1] / 2.0
        median_pos = reached.argmax(axis=0)

        cols = np.arange(preds.shape[1])
        median_learner = order[median_pos, cols]
        return preds[median_learner, cols].astype(np.float64, copy=False)

    @property
    def feature_importances_(self):
        """Estimator-weighted average of the learners' feature importances.

        Raises
        ------
        AttributeError
            If the fitted base learners expose no ``feature_importances_``.
        """
        if not hasattr(self.estimators_[0], "feature_importances_"):
            raise AttributeError("Base estimator has no feature_importances_.")
        w = self.estimator_weights_
        if w.sum() == 0:
            # Degenerate weights: fall back to a plain average.
            w = np.ones(len(self.estimators_))
        norm_w = w / w.sum()
        return sum(
            est.feature_importances_ * nw
            for est, nw in zip(self.estimators_, norm_w)
        )