# Source code for endgame.ensemble.multi_output

from __future__ import annotations

"""Multi-output wrappers: parallel and chained multi-target estimation.

Provides MultiOutputClassifier, MultiOutputRegressor, ClassifierChain,
and RegressorChain for multi-target learning tasks where Y has shape
(n_samples, n_outputs).
"""


import numpy as np
from sklearn.base import (
    BaseEstimator,
    ClassifierMixin,
    RegressorMixin,
    clone,
)

try:
    from joblib import Parallel, delayed
except ImportError:
    Parallel = None
    delayed = None


def _fit_single_estimator(estimator, X, y, sample_weight=None):
    """Fit a single estimator on (X, y).

    Parameters
    ----------
    estimator : estimator
        The estimator to fit.
    X : ndarray of shape (n_samples, n_features)
        Training features.
    y : ndarray of shape (n_samples,)
        Target values for a single output.
    sample_weight : ndarray, optional
        Sample weights.

    Returns
    -------
    estimator
        Fitted estimator.
    """
    if sample_weight is not None:
        try:
            estimator.fit(X, y, sample_weight=sample_weight)
        except TypeError:
            estimator.fit(X, y)
    else:
        estimator.fit(X, y)
    return estimator


class MultiOutputClassifier(BaseEstimator, ClassifierMixin):
    """Multi-output classification via one independent classifier per column.

    The base classifier is cloned once for every column of ``Y`` and each
    clone is trained on that column alone. Training can be dispatched
    through joblib when it is installed and ``n_jobs`` requests it.

    Parameters
    ----------
    estimator : estimator
        Base classifier that is cloned per output. It needs ``fit`` and
        ``predict`` (and ``predict_proba`` if probabilities are requested).
    n_jobs : int, optional
        Number of joblib jobs used for fitting. ``None`` or ``1`` fits
        sequentially; other values are handed to ``joblib.Parallel``.
    random_state : int, optional
        Seed written to each clone's ``random_state`` attribute when the
        clone has one.
    verbose : bool, default=False
        Stored on the instance; not read by any method of this class.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted classifiers, one per output column.
    classes_ : list of ndarray
        Sorted unique labels of each output column.
    n_outputs_ : int
        Number of output columns seen in ``fit``.

    Examples
    --------
    >>> from endgame.ensemble.multi_output import MultiOutputClassifier
    >>> from sklearn.tree import DecisionTreeClassifier
    >>> import numpy as np
    >>> X = np.random.randn(100, 5)
    >>> Y = np.random.randint(0, 3, size=(100, 3))
    >>> clf = MultiOutputClassifier(DecisionTreeClassifier(), n_jobs=-1)
    >>> clf = clf.fit(X, Y)
    >>> clf.predict(X).shape
    (100, 3)
    """

    def __init__(
        self,
        estimator: BaseEstimator = None,
        n_jobs: int | None = None,
        random_state: int | None = None,
        verbose: bool = False,
    ):
        self.estimator = estimator
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def _make_estimator(self):
        """Return a fresh clone of the base estimator, seeded when possible."""
        fresh = clone(self.estimator)
        seed = self.random_state
        if seed is None or not hasattr(fresh, "random_state"):
            return fresh
        fresh.random_state = seed
        return fresh

    def fit(self, X, Y, sample_weight=None):
        """Train one classifier for every column of ``Y``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features.
        Y : array-like of shape (n_samples, n_outputs)
            Target matrix; a 1-D ``Y`` is treated as a single column.
        sample_weight : array-like of shape (n_samples,), optional
            Sample weights handed to every per-output fit.

        Returns
        -------
        self
        """
        features = np.asarray(X)
        targets = np.asarray(Y)
        if targets.ndim == 1:
            targets = targets.reshape(-1, 1)

        n_cols = targets.shape[1]
        self.n_outputs_ = n_cols
        self.classes_ = [np.unique(targets[:, col]) for col in range(n_cols)]

        clones = [self._make_estimator() for _ in range(n_cols)]

        sequential = Parallel is None or self.n_jobs is None or self.n_jobs == 1
        if sequential:
            fitted = []
            for col, model in enumerate(clones):
                fitted.append(
                    _fit_single_estimator(
                        model, features, targets[:, col], sample_weight
                    )
                )
            self.estimators_ = fitted
        else:
            self.estimators_ = Parallel(n_jobs=self.n_jobs)(
                delayed(_fit_single_estimator)(
                    model, features, targets[:, col], sample_weight
                )
                for col, model in enumerate(clones)
            )
        return self

    def predict(self, X):
        """Predict a label for every output column.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        ndarray of shape (n_samples, n_outputs)
            Column ``k`` holds the predictions of the k-th classifier.
        """
        self._check_fitted()
        features = np.asarray(X)
        per_output = [model.predict(features) for model in self.estimators_]
        return np.column_stack(per_output)

    def predict_proba(self, X):
        """Predict class probabilities for every output column.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        list of ndarray
            One entry per output, each being whatever the corresponding
            classifier's ``predict_proba`` returns.
        """
        self._check_fitted()
        features = np.asarray(X)
        probabilities = []
        for model in self.estimators_:
            probabilities.append(model.predict_proba(features))
        return probabilities

    def score(self, X, Y, sample_weight=None):
        """Return accuracy averaged over the output columns.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.
        Y : array-like of shape (n_samples, n_outputs)
            True labels; a 1-D ``Y`` is treated as a single column.
        sample_weight : array-like, optional
            Sample weights forwarded to ``accuracy_score``.

        Returns
        -------
        float
            Mean of the per-output accuracy scores.
        """
        from sklearn.metrics import accuracy_score

        truth = np.asarray(Y)
        if truth.ndim == 1:
            truth = truth.reshape(-1, 1)
        predicted = self.predict(X)

        per_output = []
        for col in range(self.n_outputs_):
            per_output.append(
                accuracy_score(
                    truth[:, col], predicted[:, col], sample_weight=sample_weight
                )
            )
        return float(np.mean(per_output))

    def _check_fitted(self):
        """Raise ``RuntimeError`` unless ``fit`` has populated ``estimators_``."""
        if getattr(self, "estimators_", None) is None:
            raise RuntimeError(
                f"{self.__class__.__name__} has not been fitted. "
                "Call 'fit' before using this estimator."
            )
class MultiOutputRegressor(BaseEstimator, RegressorMixin):
    """Multi-output regression via one independent regressor per column.

    The base regressor is cloned once for every column of ``Y`` and each
    clone is trained on that column alone. Training can be dispatched
    through joblib when it is installed and ``n_jobs`` requests it.

    Parameters
    ----------
    estimator : estimator
        Base regressor that is cloned per output. It needs ``fit`` and
        ``predict``.
    n_jobs : int, optional
        Number of joblib jobs used for fitting. ``None`` or ``1`` fits
        sequentially; other values are handed to ``joblib.Parallel``.
    random_state : int, optional
        Seed written to each clone's ``random_state`` attribute when the
        clone has one.
    verbose : bool, default=False
        Stored on the instance; not read by any method of this class.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted regressors, one per output column.
    n_outputs_ : int
        Number of output columns seen in ``fit``.

    Examples
    --------
    >>> from endgame.ensemble.multi_output import MultiOutputRegressor
    >>> from sklearn.linear_model import Ridge
    >>> import numpy as np
    >>> X = np.random.randn(100, 5)
    >>> Y = np.random.randn(100, 3)
    >>> reg = MultiOutputRegressor(Ridge(), n_jobs=-1)
    >>> reg = reg.fit(X, Y)
    >>> reg.predict(X).shape
    (100, 3)
    """

    def __init__(
        self,
        estimator: BaseEstimator = None,
        n_jobs: int | None = None,
        random_state: int | None = None,
        verbose: bool = False,
    ):
        self.estimator = estimator
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def _make_estimator(self):
        """Return a fresh clone of the base estimator, seeded when possible."""
        fresh = clone(self.estimator)
        seed = self.random_state
        if seed is None or not hasattr(fresh, "random_state"):
            return fresh
        fresh.random_state = seed
        return fresh

    def fit(self, X, Y, sample_weight=None):
        """Train one regressor for every column of ``Y``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features.
        Y : array-like of shape (n_samples, n_outputs)
            Target matrix; a 1-D ``Y`` is treated as a single column.
        sample_weight : array-like of shape (n_samples,), optional
            Sample weights handed to every per-output fit.

        Returns
        -------
        self
        """
        features = np.asarray(X)
        targets = np.asarray(Y)
        if targets.ndim == 1:
            targets = targets.reshape(-1, 1)

        n_cols = targets.shape[1]
        self.n_outputs_ = n_cols

        clones = [self._make_estimator() for _ in range(n_cols)]

        sequential = Parallel is None or self.n_jobs is None or self.n_jobs == 1
        if sequential:
            fitted = []
            for col, model in enumerate(clones):
                fitted.append(
                    _fit_single_estimator(
                        model, features, targets[:, col], sample_weight
                    )
                )
            self.estimators_ = fitted
        else:
            self.estimators_ = Parallel(n_jobs=self.n_jobs)(
                delayed(_fit_single_estimator)(
                    model, features, targets[:, col], sample_weight
                )
                for col, model in enumerate(clones)
            )
        return self

    def predict(self, X):
        """Predict a value for every output column.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        ndarray of shape (n_samples, n_outputs)
            Column ``k`` holds the predictions of the k-th regressor.
        """
        self._check_fitted()
        features = np.asarray(X)
        per_output = [model.predict(features) for model in self.estimators_]
        return np.column_stack(per_output)

    @property
    def feature_importances_(self):
        """Feature importances averaged over the per-output regressors.

        Returns
        -------
        ndarray of shape (n_features,)
            Element-wise mean of each fitted estimator's
            ``feature_importances_``.

        Raises
        ------
        AttributeError
            If a fitted estimator has no ``feature_importances_``.
        """
        self._check_fitted()
        collected = []
        for model in self.estimators_:
            if hasattr(model, "feature_importances_"):
                collected.append(model.feature_importances_)
                continue
            raise AttributeError(
                f"Base estimator {type(model).__name__} does not provide "
                "feature_importances_."
            )
        return np.mean(collected, axis=0)

    def score(self, X, Y, sample_weight=None):
        """Return the R^2 score averaged over the output columns.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.
        Y : array-like of shape (n_samples, n_outputs)
            True targets; a 1-D ``Y`` is treated as a single column.
        sample_weight : array-like, optional
            Sample weights forwarded to ``r2_score``.

        Returns
        -------
        float
            Mean of the per-output R^2 scores.
        """
        from sklearn.metrics import r2_score

        truth = np.asarray(Y)
        if truth.ndim == 1:
            truth = truth.reshape(-1, 1)
        predicted = self.predict(X)

        per_output = []
        for col in range(self.n_outputs_):
            per_output.append(
                r2_score(
                    truth[:, col], predicted[:, col], sample_weight=sample_weight
                )
            )
        return float(np.mean(per_output))

    def _check_fitted(self):
        """Raise ``RuntimeError`` unless ``fit`` has populated ``estimators_``."""
        if getattr(self, "estimators_", None) is None:
            raise RuntimeError(
                f"{self.__class__.__name__} has not been fitted. "
                "Call 'fit' before using this estimator."
            )
def _determine_chain_order(Y, order, random_state=None): """Determine the chain ordering for output columns. Parameters ---------- Y : ndarray of shape (n_samples, n_outputs) Target matrix. order : str or list of int Chain ordering strategy: - ``'auto'``: order by descending absolute pairwise correlation so that highly correlated outputs are adjacent. - ``'random'``: random permutation. - list of int: explicit ordering. random_state : int, optional Random seed for ``'random'`` ordering. Returns ------- list of int Column indices in chain order. """ n_outputs = Y.shape[1] if isinstance(order, (list, np.ndarray)): order_list = list(order) if sorted(order_list) != list(range(n_outputs)): raise ValueError( f"order must be a permutation of range({n_outputs}), " f"got {order_list}" ) return order_list if order == "random": rng = np.random.RandomState(random_state) return list(rng.permutation(n_outputs)) if order == "auto": # Greedy nearest-neighbour walk on the correlation matrix. # Start with the output that has highest mean absolute correlation # (a well-connected hub), then always pick the unvisited output most # correlated with the current one. corr = np.abs(np.corrcoef(Y.T)) np.fill_diagonal(corr, 0.0) remaining = set(range(n_outputs)) # Start with highest mean correlation column start = int(np.argmax(corr.mean(axis=1))) chain = [start] remaining.remove(start) while remaining: current = chain[-1] # Among remaining, pick the one most correlated with current best = max(remaining, key=lambda j: corr[current, j]) chain.append(best) remaining.remove(best) return chain raise ValueError( f"order must be 'auto', 'random', or a list of int, got {order!r}" )
class ClassifierChain(BaseEstimator, ClassifierMixin):
    """Chain classifiers where each uses previous outputs as extra features.

    Each classifier in the chain receives the original feature matrix X
    augmented with one column per preceding output. During ``fit`` those
    columns hold the true labels (teacher forcing); during prediction they
    hold the chain's own predictions. Numeric labels are appended as-is;
    non-numeric labels (e.g. strings) are appended as their index into the
    sorted ``classes_`` of that output so the feature matrix stays numeric.

    Parameters
    ----------
    estimator : estimator
        The base classifier to clone for each link in the chain.
    order : str or list of int, default='auto'
        Chain ordering strategy:

        - ``'auto'``: greedy ordering by pairwise correlation so that
          adjacent outputs in the chain are maximally correlated.
        - ``'random'``: random permutation (seeded by ``random_state``).
        - list of int: explicit column ordering.
    n_jobs : int, optional
        Not used (the chain is inherently sequential); stored for API
        consistency.
    random_state : int, optional
        Random seed for random ordering; also written to each clone's
        ``random_state`` attribute when the clone has one.
    verbose : bool, default=False
        Stored on the instance; not read by any method of this class.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted classifiers in chain order.
    order_ : list of int
        The resolved output ordering.
    classes_ : list of ndarray
        Class labels for each output (in original column order).
    n_outputs_ : int
        Number of output columns.

    Examples
    --------
    >>> from endgame.ensemble.multi_output import ClassifierChain
    >>> from sklearn.linear_model import LogisticRegression
    >>> import numpy as np
    >>> X = np.random.randn(200, 5)
    >>> Y = np.random.randint(0, 2, size=(200, 3))
    >>> chain = ClassifierChain(LogisticRegression(), order='auto')
    >>> chain = chain.fit(X, Y)
    >>> preds = chain.predict(X)
    >>> preds.shape
    (200, 3)
    """

    def __init__(
        self,
        estimator: BaseEstimator = None,
        order: str | list[int] = "auto",
        n_jobs: int | None = None,
        random_state: int | None = None,
        verbose: bool = False,
    ):
        self.estimator = estimator
        self.order = order
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def _make_estimator(self):
        """Clone the base estimator and inject random_state if supported."""
        est = clone(self.estimator)
        if self.random_state is not None and hasattr(est, "random_state"):
            est.random_state = self.random_state
        return est

    def _chain_feature(self, idx, labels):
        """Return the labels of output ``idx`` as a numeric feature column.

        Numeric labels (bool/int/uint/float dtypes) are returned unchanged.
        Any other dtype is mapped to the position of each label in the
        sorted ``classes_[idx]`` array, which is the same mapping at fit
        and at prediction time.
        """
        classes = self.classes_[idx]
        if classes.dtype.kind in "biuf":
            return labels
        return np.searchsorted(classes, labels).astype(np.float64)

    def fit(self, X, Y, sample_weight=None):
        """Fit the classifier chain.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features.
        Y : array-like of shape (n_samples, n_outputs)
            Multi-output target matrix; a 1-D ``Y`` is treated as a single
            column.
        sample_weight : array-like of shape (n_samples,), optional
            Sample weights.

        Returns
        -------
        self
        """
        X = np.asarray(X, dtype=np.float64)
        Y = np.asarray(Y)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        self.n_outputs_ = Y.shape[1]
        self.classes_ = [np.unique(Y[:, i]) for i in range(self.n_outputs_)]

        # Numeric view of the targets used for chain ordering and feature
        # augmentation. Numeric targets are used directly; other dtypes are
        # replaced by class indices so correlation and stacking stay numeric.
        if Y.dtype.kind in "biuf":
            Y_numeric = Y
        else:
            Y_numeric = np.empty(Y.shape, dtype=np.float64)
            for i in range(self.n_outputs_):
                Y_numeric[:, i] = self._chain_feature(i, Y[:, i])

        self.order_ = _determine_chain_order(
            Y_numeric, self.order, self.random_state
        )

        self.estimators_ = []
        X_aug = X.copy()

        for idx in self.order_:
            est = self._make_estimator()
            _fit_single_estimator(est, X_aug, Y[:, idx], sample_weight)
            self.estimators_.append(est)
            # Augment X with the true labels for this output (teacher forcing)
            X_aug = np.column_stack([X_aug, Y_numeric[:, idx]])

        return self

    def predict(self, X):
        """Predict class labels for each output.

        At prediction time, the chain uses its own predictions (rather than
        ground truth) for augmentation.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        ndarray of shape (n_samples, n_outputs)
            Predicted class labels in original column order.
        """
        self._check_fitted()
        X = np.asarray(X, dtype=np.float64)

        # Keyed by original column index so the result can be reassembled
        # in the caller's column order regardless of chain order.
        predictions_by_column = {}
        X_aug = X.copy()

        for k, idx in enumerate(self.order_):
            pred = self.estimators_[k].predict(X_aug)
            predictions_by_column[idx] = pred
            X_aug = np.column_stack([X_aug, self._chain_feature(idx, pred)])

        return np.column_stack(
            [predictions_by_column[i] for i in range(self.n_outputs_)]
        )

    def predict_proba(self, X):
        """Predict class probabilities for each output.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        list of ndarray
            List of length ``n_outputs_`` (in original column order), where
            each element is the ``predict_proba`` output of the classifier
            for that column.
        """
        self._check_fitted()
        X = np.asarray(X, dtype=np.float64)

        probas_by_column = {}
        X_aug = X.copy()

        for k, idx in enumerate(self.order_):
            est = self.estimators_[k]
            probas_by_column[idx] = est.predict_proba(X_aug)
            # The chain is extended with hard predictions, matching predict().
            pred = est.predict(X_aug)
            X_aug = np.column_stack([X_aug, self._chain_feature(idx, pred)])

        return [probas_by_column[i] for i in range(self.n_outputs_)]

    def score(self, X, Y, sample_weight=None):
        """Return the mean accuracy across all outputs.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.
        Y : array-like of shape (n_samples, n_outputs)
            True labels.
        sample_weight : array-like, optional
            Sample weights forwarded to ``accuracy_score``.

        Returns
        -------
        float
            Mean of per-output accuracy scores.
        """
        from sklearn.metrics import accuracy_score

        Y = np.asarray(Y)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        preds = self.predict(X)
        scores = [
            accuracy_score(Y[:, i], preds[:, i], sample_weight=sample_weight)
            for i in range(self.n_outputs_)
        ]
        return float(np.mean(scores))

    def _check_fitted(self):
        """Raise if the estimator has not been fitted."""
        if not hasattr(self, "estimators_") or self.estimators_ is None:
            raise RuntimeError(
                f"{self.__class__.__name__} has not been fitted. "
                "Call 'fit' before using this estimator."
            )
class RegressorChain(BaseEstimator, RegressorMixin):
    """Sequential multi-output regression with outputs fed forward.

    Every link of the chain is a clone of the base regressor. A link sees
    the original features plus one extra column for each output that comes
    earlier in the chain: the true target values while fitting, and the
    chain's own predictions while predicting.

    Parameters
    ----------
    estimator : estimator
        Base regressor cloned for each link of the chain.
    order : str or list of int, default='auto'
        How the output columns are arranged in the chain:

        - ``'auto'``: greedy ordering driven by pairwise correlation.
        - ``'random'``: random permutation seeded by ``random_state``.
        - list of int: explicit column ordering.
    n_jobs : int, optional
        Stored only; the chain is fitted sequentially.
    random_state : int, optional
        Seed for ``'random'`` ordering; also written to each clone's
        ``random_state`` attribute when the clone has one.
    verbose : bool, default=False
        Stored on the instance; not read by any method of this class.

    Attributes
    ----------
    estimators_ : list of estimator
        Fitted regressors in chain order.
    order_ : list of int
        The resolved output ordering.
    n_outputs_ : int
        Number of output columns seen in ``fit``.

    Examples
    --------
    >>> from endgame.ensemble.multi_output import RegressorChain
    >>> from sklearn.linear_model import Ridge
    >>> import numpy as np
    >>> X = np.random.randn(200, 5)
    >>> Y = np.random.randn(200, 3)
    >>> chain = RegressorChain(Ridge(), order='auto')
    >>> chain = chain.fit(X, Y)
    >>> chain.predict(X).shape
    (200, 3)
    """

    def __init__(
        self,
        estimator: BaseEstimator = None,
        order: str | list[int] = "auto",
        n_jobs: int | None = None,
        random_state: int | None = None,
        verbose: bool = False,
    ):
        self.estimator = estimator
        self.order = order
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def _make_estimator(self):
        """Return a fresh clone of the base estimator, seeded when possible."""
        fresh = clone(self.estimator)
        seed = self.random_state
        if seed is None or not hasattr(fresh, "random_state"):
            return fresh
        fresh.random_state = seed
        return fresh

    def fit(self, X, Y, sample_weight=None):
        """Fit every link of the chain in order.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features (converted to float64).
        Y : array-like of shape (n_samples, n_outputs)
            Target matrix (converted to float64); a 1-D ``Y`` is treated
            as a single column.
        sample_weight : array-like of shape (n_samples,), optional
            Sample weights handed to every link's fit.

        Returns
        -------
        self
        """
        features = np.asarray(X, dtype=np.float64)
        targets = np.asarray(Y, dtype=np.float64)
        if targets.ndim == 1:
            targets = targets.reshape(-1, 1)

        self.n_outputs_ = targets.shape[1]
        self.order_ = _determine_chain_order(
            targets, self.order, self.random_state
        )

        self.estimators_ = []
        augmented = features.copy()
        for col in self.order_:
            column = targets[:, col]
            link = self._make_estimator()
            _fit_single_estimator(link, augmented, column, sample_weight)
            self.estimators_.append(link)
            # Teacher forcing: later links train on the true value of this
            # output, not on this link's prediction.
            augmented = np.column_stack([augmented, column])
        return self

    def predict(self, X):
        """Predict every output, feeding predictions down the chain.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Samples to predict.

        Returns
        -------
        ndarray of shape (n_samples, n_outputs)
            Predicted values arranged in the original column order.
        """
        self._check_fitted()
        augmented = np.asarray(X, dtype=np.float64).copy()

        by_column = {}
        for position, col in enumerate(self.order_):
            estimate = self.estimators_[position].predict(augmented)
            by_column[col] = estimate
            augmented = np.column_stack([augmented, estimate])

        ordered = [by_column[col] for col in range(self.n_outputs_)]
        return np.column_stack(ordered)

    @property
    def feature_importances_(self):
        """Importances of the original features, averaged over the chain.

        The number of original features is taken from the length of the
        first link's ``feature_importances_``; every link's importances are
        truncated to that length (dropping the chained columns) before
        averaging.

        Returns
        -------
        ndarray of shape (n_features,)
            Mean importances of the original features.

        Raises
        ------
        AttributeError
            If the first fitted estimator has no ``feature_importances_``.
        """
        self._check_fitted()
        head = self.estimators_[0]
        if not hasattr(head, "feature_importances_"):
            raise AttributeError(
                f"Base estimator {type(head).__name__} does not provide "
                "feature_importances_."
            )
        width = len(head.feature_importances_)

        truncated = [
            link.feature_importances_[:width] for link in self.estimators_
        ]
        return np.mean(truncated, axis=0)

    def score(self, X, Y, sample_weight=None):
        """Return the R^2 score averaged over the output columns.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.
        Y : array-like of shape (n_samples, n_outputs)
            True targets; a 1-D ``Y`` is treated as a single column.
        sample_weight : array-like, optional
            Sample weights forwarded to ``r2_score``.

        Returns
        -------
        float
            Mean of the per-output R^2 scores.
        """
        from sklearn.metrics import r2_score

        truth = np.asarray(Y)
        if truth.ndim == 1:
            truth = truth.reshape(-1, 1)
        predicted = self.predict(X)

        per_output = []
        for col in range(self.n_outputs_):
            per_output.append(
                r2_score(
                    truth[:, col], predicted[:, col], sample_weight=sample_weight
                )
            )
        return float(np.mean(per_output))

    def _check_fitted(self):
        """Raise ``RuntimeError`` unless ``fit`` has populated ``estimators_``."""
        if getattr(self, "estimators_", None) is None:
            raise RuntimeError(
                f"{self.__class__.__name__} has not been fitted. "
                "Call 'fit' before using this estimator."
            )