from __future__ import annotations
"""Knockoff filter for FDR-controlled feature selection."""
from typing import Literal
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LassoCV
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y
[docs]
class KnockoffSelector(TransformerMixin, BaseEstimator):
"""Knockoff filter for feature selection with FDR control.
The knockoff filter creates "knockoff" copies of features that have
the same correlation structure but are independent of the target.
Features are selected if they are more important than their knockoffs.
Provides rigorous statistical guarantees on False Discovery Rate.
Based on Barber & Candes (2015) and Candes et al. (2018).
Parameters
----------
fdr : float, default=0.1
Target false discovery rate.
method : str, default='equicorrelated'
Knockoff generation method:
- 'equicorrelated': Equicorrelated knockoffs (faster)
- 'sdp': SDP knockoffs (more powerful, requires cvxpy)
- 'gaussian': Model-X Gaussian knockoffs
statistic : str, default='lasso_cv'
Feature statistic:
- 'lasso_cv': LASSO with CV selection
- 'lasso_fixed': LASSO with fixed lambda
- 'ridge': Ridge coefficients
offset : int, default=1
Knockoff+ offset (0 for original knockoff).
random_state : int, optional
Random seed.
verbose : bool, default=False
Whether to print progress.
Attributes
----------
selected_features_ : ndarray
Indices of selected features.
statistics_ : ndarray
Knockoff statistics W_j for each feature.
threshold_ : float
Selection threshold.
knockoffs_ : ndarray
Generated knockoff features.
Example
-------
>>> from endgame.feature_selection import KnockoffSelector
>>> selector = KnockoffSelector(fdr=0.1)
>>> X_selected = selector.fit_transform(X, y)
"""
def __init__(
self,
fdr: float = 0.1,
method: Literal["equicorrelated", "sdp", "gaussian"] = "equicorrelated",
statistic: Literal["lasso_cv", "lasso_fixed", "ridge"] = "lasso_cv",
offset: int = 1,
random_state: int | None = None,
verbose: bool = False,
):
self.fdr = fdr
self.method = method
self.statistic = statistic
self.offset = offset
self.random_state = random_state
self.verbose = verbose
def _generate_knockoffs_equicorrelated(
self, X: np.ndarray, rng
) -> np.ndarray:
"""Generate equicorrelated knockoffs.
X_knockoff has same covariance structure but is independent of y.
"""
n_samples, n_features = X.shape
# Center and scale
X_centered = X - X.mean(axis=0)
X_scaled = X_centered / (X_centered.std(axis=0) + 1e-10)
# Compute correlation matrix
Sigma = np.corrcoef(X_scaled, rowvar=False)
Sigma = np.nan_to_num(Sigma, nan=0.0)
# Regularize
Sigma = Sigma + 0.1 * np.eye(n_features)
# Compute s (knockoff variance parameter)
min_eigenvalue = np.linalg.eigvalsh(Sigma).min()
s = min(1.0, 2 * min_eigenvalue) * np.ones(n_features)
# Generate knockoffs
# X_knockoff = X - X @ Sigma^{-1} @ diag(s) + Z @ C
# where C'C = 2*diag(s) - diag(s) @ Sigma^{-1} @ diag(s)
Sigma_inv = np.linalg.inv(Sigma)
S = np.diag(s)
# Compute covariance of knockoff residual
C_squared = 2 * S - S @ Sigma_inv @ S
C_squared = 0.5 * (C_squared + C_squared.T) # Ensure symmetric
# Eigendecomposition
eigenvalues, eigenvectors = np.linalg.eigh(C_squared)
eigenvalues = np.maximum(eigenvalues, 0) # Handle numerical issues
C = eigenvectors @ np.diag(np.sqrt(eigenvalues)) @ eigenvectors.T
# Generate knockoffs
Z = rng.randn(n_samples, n_features)
X_knockoff = X_scaled - X_scaled @ Sigma_inv @ S + Z @ C
# Rescale
X_knockoff = X_knockoff * X_centered.std(axis=0) + X.mean(axis=0)
return X_knockoff
def _generate_knockoffs_gaussian(
self, X: np.ndarray, rng
) -> np.ndarray:
"""Generate Model-X Gaussian knockoffs."""
n_samples, n_features = X.shape
# Estimate mean and covariance
mu = X.mean(axis=0)
Sigma = np.cov(X, rowvar=False)
# Regularize
Sigma = Sigma + 0.1 * np.eye(n_features)
# Use equicorrelated construction with estimated parameters
return self._generate_knockoffs_equicorrelated(X, rng)
def _compute_statistics_lasso(
self, X: np.ndarray, X_knockoff: np.ndarray, y: np.ndarray
) -> np.ndarray:
"""Compute knockoff statistics using LASSO."""
n_features = X.shape[1]
# Combine original and knockoff features
X_combined = np.hstack([X, X_knockoff])
# Fit LASSO
if self.statistic == "lasso_cv":
model = LassoCV(cv=5, random_state=self.random_state)
else:
model = LassoCV(cv=5, random_state=self.random_state)
model.fit(X_combined, y)
coef = np.abs(model.coef_)
# Compute W statistics
# W_j = |Z_j| - |Z_{j+p}| where Z are coefficients
W = coef[:n_features] - coef[n_features:]
return W
def _compute_statistics_ridge(
self, X: np.ndarray, X_knockoff: np.ndarray, y: np.ndarray
) -> np.ndarray:
"""Compute knockoff statistics using Ridge."""
from sklearn.linear_model import RidgeCV
n_features = X.shape[1]
X_combined = np.hstack([X, X_knockoff])
model = RidgeCV(cv=5)
model.fit(X_combined, y)
coef = np.abs(model.coef_)
W = coef[:n_features] - coef[n_features:]
return W
def _compute_threshold(self, W: np.ndarray) -> float:
"""Compute knockoff threshold for FDR control."""
# Sort absolute values
W_abs = np.abs(W)
W_sorted = np.sort(W_abs)[::-1]
threshold = np.inf
for t in W_sorted:
if t == 0:
continue
# Count selections
n_positive = np.sum(t <= W)
n_negative = np.sum(-t >= W)
# Estimate FDP
if n_positive == 0:
continue
fdp = (n_negative + self.offset) / n_positive
if fdp <= self.fdr:
threshold = t
break
return threshold if threshold != np.inf else W_sorted[0] + 1
[docs]
def fit(self, X, y):
"""Fit the knockoff selector.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target values.
Returns
-------
self : KnockoffSelector
"""
X, y = check_X_y(X, y)
n_samples, n_features = X.shape
self.n_features_in_ = n_features
rng = np.random.RandomState(self.random_state)
if self.verbose:
print("Generating knockoff features...")
# Generate knockoffs
if self.method == "equicorrelated":
self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng)
elif self.method == "gaussian":
self.knockoffs_ = self._generate_knockoffs_gaussian(X, rng)
elif self.method == "sdp":
# Fall back to equicorrelated if cvxpy not available
try:
import cvxpy
# SDP implementation would go here
self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng)
except ImportError:
self.knockoffs_ = self._generate_knockoffs_equicorrelated(X, rng)
else:
raise ValueError(f"Unknown method: {self.method}")
if self.verbose:
print("Computing knockoff statistics...")
# Compute statistics
if self.statistic in ("lasso_cv", "lasso_fixed"):
self.statistics_ = self._compute_statistics_lasso(
X, self.knockoffs_, y
)
elif self.statistic == "ridge":
self.statistics_ = self._compute_statistics_ridge(
X, self.knockoffs_, y
)
else:
raise ValueError(f"Unknown statistic: {self.statistic}")
# Compute threshold
self.threshold_ = self._compute_threshold(self.statistics_)
if self.verbose:
print(f"Threshold: {self.threshold_:.4f}")
# Select features
self._support_mask = self.statistics_ >= self.threshold_
self.selected_features_ = np.where(self._support_mask)[0]
self.n_features_ = len(self.selected_features_)
return self
[docs]
def get_support(self, indices: bool = False) -> np.ndarray:
"""Get mask or indices of selected features."""
check_is_fitted(self, "_support_mask")
if indices:
return self.selected_features_
return self._support_mask
[docs]
def get_statistics(self) -> np.ndarray:
"""Get knockoff W statistics."""
check_is_fitted(self, "statistics_")
return self.statistics_