Source code for endgame.models.trees.rotation_forest

from __future__ import annotations

"""Rotation Forest: Ensemble of trees trained on PCA-rotated feature subsets."""


import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, clone
from sklearn.decomposition import PCA
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y

from endgame.core.base import EndgameEstimator


class BaseRotationForest(EndgameEstimator):
    """Base class for Rotation Forest.

    Rotation Forest is an ensemble method where each tree sees a
    different "view" of the data through PCA rotation of feature subsets.
    This provides ensemble diversity without feature subsampling.

    Algorithm:
    1. For each tree:
       a. Randomly split features into K subsets
       b. Apply PCA to each subset → get rotation matrix components
       c. Combine into full rotation matrix R
       d. Project X → X @ R
       e. Train tree on rotated data
    2. Predictions: Average tree predictions

    Parameters
    ----------
    n_estimators : int, default=10
        Number of trees in the forest.
    n_subsets : int, default=3
        Number of feature subsets per tree.
    max_features : float or int, default=0.5
        Features per subset for PCA rotation.
        If float, fraction of total features.
        If int, absolute number.
    base_estimator : estimator, optional
        Base tree estimator.
    bootstrap : bool, default=True
        Whether to bootstrap samples for each tree.
    random_state : int, optional
        Random seed.
    n_jobs : int, default=1
        Number of parallel jobs.

    References
    ----------
    Rodriguez, J.J., Kuncheva, L.I., and Alonso, C.J., 2006.
    Rotation Forest: A New Classifier Ensemble Method.
    IEEE Transactions on Pattern Analysis and Machine Intelligence.
    """

    def __init__(
        self,
        n_estimators: int = 10,
        n_subsets: int = 3,
        max_features: float = 0.5,
        base_estimator: BaseEstimator | None = None,
        bootstrap: bool = True,
        random_state: int | None = None,
        n_jobs: int = 1,
        verbose: bool = False,
    ):
        super().__init__(random_state=random_state, verbose=verbose)
        self.n_estimators = n_estimators
        self.n_subsets = n_subsets
        self.max_features = max_features
        self.base_estimator = base_estimator
        self.bootstrap = bootstrap
        self.n_jobs = n_jobs

        self.estimators_: list[BaseEstimator] = []
        self.rotation_matrices_: list[np.ndarray] = []
        self.feature_subsets_: list[list[list[int]]] = []

    def _get_base_estimator(self) -> BaseEstimator:
        """Get the base tree estimator."""
        raise NotImplementedError("Subclasses must implement _get_base_estimator")

    def _compute_n_features_per_subset(self, n_features: int) -> int:
        """Compute number of features per subset."""
        if isinstance(self.max_features, float):
            return max(1, int(n_features * self.max_features / self.n_subsets))
        return max(1, self.max_features // self.n_subsets)

    def _create_rotation_matrix(
        self,
        X: np.ndarray,
        rng: np.random.RandomState,
    ) -> tuple[np.ndarray, list[list[int]]]:
        """Create rotation matrix from PCA on feature subsets.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Training data.
        rng : RandomState
            Random number generator.

        Returns
        -------
        rotation_matrix : ndarray of shape (n_features, n_features)
            Full rotation matrix.
        subsets : List[List[int]]
            Feature indices for each subset.
        """
        n_samples, n_features = X.shape
        features_per_subset = self._compute_n_features_per_subset(n_features)

        # Randomly partition features into subsets
        feature_indices = rng.permutation(n_features)
        subsets = []

        start = 0
        for i in range(self.n_subsets):
            if i == self.n_subsets - 1:
                # Last subset gets remaining features
                subset = feature_indices[start:].tolist()
            else:
                end = start + features_per_subset
                subset = feature_indices[start:end].tolist()
                start = end

            if len(subset) > 0:
                subsets.append(subset)

        # Build rotation matrix
        rotation_matrix = np.zeros((n_features, n_features))

        for subset in subsets:
            if len(subset) == 0:
                continue

            # Extract features for this subset
            X_subset = X[:, subset]

            # Bootstrap sample for PCA
            if self.bootstrap:
                boot_idx = rng.choice(n_samples, n_samples, replace=True)
                X_subset = X_subset[boot_idx]

            # Fit PCA
            n_components = min(len(subset), X_subset.shape[0])
            pca = PCA(n_components=n_components)

            try:
                pca.fit(X_subset)
                # Place PCA components in rotation matrix
                components = pca.components_.T  # (n_subset_features, n_components)

                for i, feat_idx in enumerate(subset):
                    for j, comp_idx in enumerate(subset[:n_components]):
                        if j < components.shape[1]:
                            rotation_matrix[feat_idx, comp_idx] = components[i, j]
            except Exception:
                # Fall back to identity for this subset
                for feat_idx in subset:
                    rotation_matrix[feat_idx, feat_idx] = 1.0

        # Fill diagonal for unused features
        for i in range(n_features):
            if np.all(rotation_matrix[i, :] == 0) and np.all(rotation_matrix[:, i] == 0):
                rotation_matrix[i, i] = 1.0

        return rotation_matrix, subsets

    def _fit_single_tree(
        self,
        X: np.ndarray,
        y: np.ndarray,
        base_estimator: BaseEstimator,
        seed: int,
    ) -> tuple[BaseEstimator, np.ndarray, list[list[int]]]:
        """Fit a single rotation tree.

        Parameters
        ----------
        X : ndarray
            Training data.
        y : ndarray
            Target values.
        base_estimator : estimator
            Base tree to clone and fit.
        seed : int
            Random seed for this tree.

        Returns
        -------
        tree : estimator
            Fitted tree.
        rotation_matrix : ndarray
            Rotation matrix for this tree.
        subsets : List[List[int]]
            Feature subsets used.
        """
        n_samples = X.shape[0]
        rng = np.random.RandomState(seed)

        # Create rotation matrix
        rotation_matrix, subsets = self._create_rotation_matrix(X, rng)

        # Rotate data
        X_rotated = X @ rotation_matrix

        # Bootstrap samples
        if self.bootstrap:
            boot_idx = rng.choice(n_samples, n_samples, replace=True)
            X_boot = X_rotated[boot_idx]
            y_boot = y[boot_idx]
        else:
            X_boot = X_rotated
            y_boot = y

        # Train tree
        tree = clone(base_estimator)
        tree.fit(X_boot, y_boot)

        return tree, rotation_matrix, subsets

    def fit(self, X, y, **fit_params) -> BaseRotationForest:
        """Fit the rotation forest.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : array-like of shape (n_samples,)
            Target values.

        Returns
        -------
        self
        """
        X, y = check_X_y(X, y)
        n_samples, n_features = X.shape

        rng = np.random.RandomState(self.random_state)
        # Generate seeds for each tree to ensure reproducibility with parallel execution
        seeds = rng.randint(0, 2**31 - 1, size=self.n_estimators)

        base_estimator = self.base_estimator or self._get_base_estimator()

        self._log(f"Training {self.n_estimators} rotation trees...")

        # For large datasets, use sequential processing to avoid pickling issues
        n_jobs = self.n_jobs
        if n_samples > 100000 and n_jobs != 1:
            n_jobs = 1  # Fall back to sequential for large datasets

        # Fit trees in parallel
        results = Parallel(n_jobs=n_jobs)(
            delayed(self._fit_single_tree)(X, y, base_estimator, seed)
            for seed in seeds
        )

        # Unpack results
        self.estimators_ = [r[0] for r in results]
        self.rotation_matrices_ = [r[1] for r in results]
        self.feature_subsets_ = [r[2] for r in results]

        self._n_features_in = n_features
        self._is_fitted = True
        return self


[docs] class RotationForestClassifier(ClassifierMixin, BaseRotationForest): """Rotation Forest for classification. Parameters ---------- n_estimators : int, default=10 Number of trees. n_subsets : int, default=3 Number of feature subsets per tree. max_features : float, default=0.5 Fraction of features per subset. base_estimator : estimator, optional Base tree. Default: DecisionTreeClassifier. bootstrap : bool, default=True Bootstrap samples. random_state : int, optional Random seed. Examples -------- >>> from endgame.models import RotationForestClassifier >>> clf = RotationForestClassifier(n_estimators=20) >>> clf.fit(X_train, y_train) >>> predictions = clf.predict(X_test) """ _estimator_type = "classifier" def _get_base_estimator(self) -> BaseEstimator: """Get default decision tree classifier.""" return DecisionTreeClassifier(random_state=self.random_state)
[docs] def fit(self, X, y, **fit_params) -> RotationForestClassifier: """Fit the classifier.""" X, y = check_X_y(X, y) self.classes_ = np.unique(y) self.n_classes_ = len(self.classes_) return super().fit(X, y, **fit_params)
[docs] def predict(self, X) -> np.ndarray: """Predict class labels. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray of shape (n_samples,) Predicted class labels. """ proba = self.predict_proba(X) return self.classes_[np.argmax(proba, axis=1)]
def _predict_proba_single_tree( self, X: np.ndarray, tree: BaseEstimator, rotation_matrix: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: """Get predictions from a single tree. Returns ------- tree_proba : ndarray Probability predictions from this tree. tree_classes : ndarray Classes known to this tree. """ X_rotated = X @ rotation_matrix return tree.predict_proba(X_rotated), tree.classes_
[docs] def predict_proba(self, X) -> np.ndarray: """Predict class probabilities. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray of shape (n_samples, n_classes) Class probabilities. """ check_is_fitted(self, ["estimators_", "rotation_matrices_"]) X = check_array(X) n_samples = X.shape[0] proba = np.zeros((n_samples, self.n_classes_)) # For large datasets, use sequential processing to avoid pickling issues # The rotation matrices can be very large and cause memory issues with joblib n_jobs = self.n_jobs if n_samples > 100000 and n_jobs != 1: n_jobs = 1 # Fall back to sequential for large datasets # Get predictions from all trees in parallel results = Parallel(n_jobs=n_jobs)( delayed(self._predict_proba_single_tree)(X, tree, rotation_matrix) for tree, rotation_matrix in zip(self.estimators_, self.rotation_matrices_) ) # Aggregate predictions for tree_proba, tree_classes in results: # Handle case where tree didn't see all classes for i, cls in enumerate(tree_classes): cls_idx = np.where(self.classes_ == cls)[0][0] proba[:, cls_idx] += tree_proba[:, i] proba /= len(self.estimators_) return proba
[docs] class RotationForestRegressor(BaseRotationForest, RegressorMixin): """Rotation Forest for regression. Parameters ---------- n_estimators : int, default=10 Number of trees. n_subsets : int, default=3 Number of feature subsets per tree. max_features : float, default=0.5 Fraction of features per subset. base_estimator : estimator, optional Base tree. Default: DecisionTreeRegressor. bootstrap : bool, default=True Bootstrap samples. random_state : int, optional Random seed. Examples -------- >>> from endgame.models import RotationForestRegressor >>> reg = RotationForestRegressor(n_estimators=20) >>> reg.fit(X_train, y_train) >>> predictions = reg.predict(X_test) """ _estimator_type = "regressor" def _get_base_estimator(self) -> BaseEstimator: """Get default decision tree regressor.""" return DecisionTreeRegressor(random_state=self.random_state) def _predict_single_tree( self, X: np.ndarray, tree: BaseEstimator, rotation_matrix: np.ndarray, ) -> np.ndarray: """Get predictions from a single tree.""" X_rotated = X @ rotation_matrix return tree.predict(X_rotated)
[docs] def predict(self, X) -> np.ndarray: """Predict target values. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray of shape (n_samples,) Predicted values. """ check_is_fitted(self, ["estimators_", "rotation_matrices_"]) X = check_array(X) # For large datasets, use sequential processing to avoid pickling issues n_jobs = self.n_jobs if X.shape[0] > 100000 and n_jobs != 1: n_jobs = 1 # Fall back to sequential for large datasets # Get predictions from all trees in parallel all_predictions = Parallel(n_jobs=n_jobs)( delayed(self._predict_single_tree)(X, tree, rotation_matrix) for tree, rotation_matrix in zip(self.estimators_, self.rotation_matrices_) ) # Average predictions predictions = np.mean(all_predictions, axis=0) return predictions