Source code for endgame.feature_selection.knockoff

from __future__ import annotations

"""Knockoff filter for FDR-controlled feature selection."""

from typing import Literal

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LassoCV
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y


[docs] class KnockoffSelector(TransformerMixin, BaseEstimator): """Knockoff filter for feature selection with FDR control. The knockoff filter creates "knockoff" copies of features that have the same correlation structure but are independent of the target. Features are selected if they are more important than their knockoffs. Provides rigorous statistical guarantees on False Discovery Rate. Based on Barber & Candes (2015) and Candes et al. (2018). Parameters ---------- fdr : float, default=0.1 Target false discovery rate. method : str, default='equicorrelated' Knockoff generation method: - 'equicorrelated': Equicorrelated knockoffs (faster) - 'sdp': SDP knockoffs (more powerful, requires cvxpy) - 'gaussian': Model-X Gaussian knockoffs statistic : str, default='lasso_cv' Feature statistic: - 'lasso_cv': LASSO with CV selection - 'lasso_fixed': LASSO with fixed lambda - 'ridge': Ridge coefficients offset : int, default=1 Knockoff+ offset (0 for original knockoff). random_state : int, optional Random seed. verbose : bool, default=False Whether to print progress. Attributes ---------- selected_features_ : ndarray Indices of selected features. statistics_ : ndarray Knockoff statistics W_j for each feature. threshold_ : float Selection threshold. knockoffs_ : ndarray Generated knockoff features. Example ------- >>> from endgame.feature_selection import KnockoffSelector >>> selector = KnockoffSelector(fdr=0.1) >>> X_selected = selector.fit_transform(X, y) """ def __init__( self, fdr: float = 0.1, method: Literal["equicorrelated", "sdp", "gaussian"] = "equicorrelated", statistic: Literal["lasso_cv", "lasso_fixed", "ridge"] = "lasso_cv", offset: int = 1, random_state: int | None = None, verbose: bool = False, ): self.fdr = fdr self.method = method self.statistic = statistic self.offset = offset self.random_state = random_state self.verbose = verbose def _generate_knockoffs_equicorrelated( self, X: np.ndarray, rng ) -> np.ndarray: """Generate equicorrelated knockoffs. X_knockoff has same covariance structure but is independent of y. """ n_samples, n_features = X.shape # Center and scale X_centered = X - X.mean(axis=0) X_scaled = X_centered / (X_centered.std(axis=0) + 1e-10) # Compute correlation matrix Sigma = np.corrcoef(X_scaled, rowvar=False) Sigma = np.nan_to_num(Sigma, nan=0.0) # Regularize Sigma = Sigma + 0.1 * np.eye(n_features) # Compute s (knockoff variance parameter) min_eigenvalue = np.linalg.eigvalsh(Sigma).min() s = min(1.0, 2 * min_eigenvalue) * np.ones(n_features) # Generate knockoffs # X_knockoff = X - X @ Sigma^{-1} @ diag(s) + Z @ C # where C'C = 2*diag(s) - diag(s) @ Sigma^{-1} @ diag(s) Sigma_inv = np.linalg.inv(Sigma) S = np.diag(s) # Compute covariance of knockoff residual C_squared = 2 * S - S @ Sigma_inv @ S C_squared = 0.5 * (C_squared + C_squared.T) # Ensure symmetric # Eigendecomposition eigenvalues, eigenvectors = np.linalg.eigh(C_squared) eigenvalues = np.maximum(eigenvalues, 0) # Handle numerical issues C = eigenvectors @ np.diag(np.sqrt(eigenvalues)) @ eigenvectors.T # Generate knockoffs Z = rng.randn(n_samples, n_features) X_knockoff = X_scaled - X_scaled @ Sigma_inv @ S + Z @ C # Rescale X_knockoff = X_knockoff * X_centered.std(axis=0) + X.mean(axis=0) return X_knockoff def _generate_knockoffs_gaussian( self, X: np.ndarray, rng ) -> np.ndarray: """Generate Model-X Gaussian knockoffs.""" n_samples, n_features = X.shape # Estimate mean and covariance mu = X.mean(axis=0) Sigma = np.cov(X, rowvar=False) # Regularize Sigma = Sigma + 0.1 * np.eye(n_features) # Use equicorrelated construction with estimated parameters return self._generate_knockoffs_equicorrelated(X, rng) def _compute_statistics_lasso( self, X: np.ndarray, X_knockoff: np.ndarray, y: np.ndarray ) -> np.ndarray: """Compute knockoff statistics using LASSO.""" n_features = X.shape[1] # Combine original and knockoff features X_combined = np.hstack([X, X_knockoff]) # Fit LASSO if self.statistic == "lasso_cv": model = LassoCV(cv=5, random_state=self.random_state) else: model = LassoCV(cv=5, random_state=self.random_state) model.fit(X_combined, y) coef = np.abs(model.coef_) # Compute W statistics # W_j = |Z_j| - |Z_{j+p}| where Z are coefficients W = coef[:n_features] - coef[n_features:] return W def _compute_statistics_ridge( self, X: np.ndarray, X_knockoff: np.ndarray, y: np.ndarray ) -> np.ndarray: """Compute knockoff statistics using Ridge.""" from sklearn.linear_model import RidgeCV n_features = X.shape[1] X_combined = np.hstack([X, X_knockoff]) model = RidgeCV(cv=5) model.fit(X_combined, y) coef = np.abs(model.coef_) W = coef[:n_features] - coef[n_features:] return W def _compute_threshold(self, W: np.ndarray) -> float: """Compute knockoff threshold for FDR control.""" # Sort absolute values W_abs = np.abs(W) W_sorted = np.sort(W_abs)[::-1] threshold = np.inf for t in W_sorted: if t == 0: continue # Count selections n_positive = np.sum(t <= W) n_negative = np.sum(-t >= W) # Estimate FDP if n_positive == 0: continue fdp = (n_negative + self.offset) / n_positive if fdp <= self.fdr: threshold = t break return threshold if threshold != np.inf else W_sorted[0] + 1
[docs] def fit(self, X, y): """Fit the knockoff selector. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. y : array-like of shape (n_samples,) Target values. Returns ------- self : KnockoffSelector """ X, y = check_X_y(X, y) n_samples, n_features = X.shape self.n_features_in_ = n_features rng = np.random.RandomState(self.random_state) if self.verbose: print("Generating knockoff features...") # Generate knockoffs if self.method == "equicorrelated": self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng) elif self.method == "gaussian": self.knockoffs_ = self._generate_knockoffs_gaussian(X, rng) elif self.method == "sdp": # Fall back to equicorrelated if cvxpy not available try: import cvxpy # SDP implementation would go here self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng) except ImportError: self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng) else: raise ValueError(f"Unknown method: {self.method}") if self.verbose: print("Computing knockoff statistics...") # Compute statistics if self.statistic in ("lasso_cv", "lasso_fixed"): self.statistics_ = self._compute_statistics_lasso( X, self.knockoffs_, y ) elif self.statistic == "ridge": self.statistics_ = self._compute_statistics_ridge( X, self.knockoffs_, y ) else: raise ValueError(f"Unknown statistic: {self.statistic}") # Compute threshold self.threshold_ = self._compute_threshold(self.statistics_) if self.verbose: print(f"Threshold: {self.threshold_:.4f}") # Select features self._support_mask = self.statistics_ >= self.threshold_ self.selected_features_ = np.where(self._support_mask)[0] self.n_features_ = len(self.selected_features_) return self
[docs] def transform(self, X) -> np.ndarray: """Select features. Parameters ---------- X : array-like of shape (n_samples, n_features) Data to transform. Returns ------- X_selected : ndarray Data with selected features. """ check_is_fitted(self, "selected_features_") X = check_array(X) if len(self.selected_features_) == 0: # Return all features if none selected return X return X[:, self.selected_features_]
[docs] def fit_transform(self, X, y) -> np.ndarray: """Fit and transform.""" self.fit(X, y) return self.transform(X)
[docs] def get_support(self, indices: bool = False) -> np.ndarray: """Get mask or indices of selected features.""" check_is_fitted(self, "_support_mask") if indices: return self.selected_features_ return self._support_mask
[docs] def get_statistics(self) -> np.ndarray: """Get knockoff W statistics.""" check_is_fitted(self, "statistics_") return self.statistics_