Source code for endgame.calibration.conformal
from __future__ import annotations
"""Conformal prediction for classification and regression.
Conformal prediction provides prediction sets (classification) or intervals
(regression) with guaranteed coverage probability under exchangeability.
References
----------
- Vovk et al. "Algorithmic Learning in a Random World" (2005)
- Romano et al. "Conformalized Quantile Regression" (2019)
- Angelopoulos & Bates "A Gentle Introduction to Conformal Prediction" (2022)
"""
from dataclasses import dataclass
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, clone
from sklearn.model_selection import train_test_split
@dataclass
class ConformalResult:
"""Result container for conformal predictions."""
prediction_sets: list[set[int]] | None = None # For classification
intervals: tuple[np.ndarray, np.ndarray] | None = None # For regression
point_predictions: np.ndarray | None = None
conformity_scores: np.ndarray | None = None
quantile: float | None = None
[docs]
class ConformalClassifier(BaseEstimator, ClassifierMixin):
"""Conformal prediction wrapper for classification.
Provides prediction sets with guaranteed coverage probability.
Under exchangeability, P(y ∈ C(X)) >= 1 - alpha for a new test point.
Parameters
----------
estimator : sklearn-compatible classifier
Base classifier (must have predict_proba).
method : str, default='lac'
Conformal method:
- 'lac': Least Ambiguous set-valued Classifier (adaptive)
- 'aps': Adaptive Prediction Sets (randomized, exact coverage)
- 'raps': Regularized APS (controls set size with penalty)
- 'naive': Simple threshold on class probabilities
alpha : float, default=0.1
Miscoverage rate (1 - alpha = coverage probability).
k_reg : int, default=1
RAPS regularization: penalize sets larger than k_reg.
lambda_reg : float, default=0.01
RAPS regularization strength.
random_state : int, optional
Random seed for reproducibility.
Attributes
----------
estimator_ : estimator
Fitted base classifier.
classes_ : ndarray
Class labels.
n_classes_ : int
Number of classes.
conformity_scores_ : ndarray
Calibration conformity scores.
quantile_ : float
Calibrated quantile threshold.
Examples
--------
>>> from sklearn.linear_model import LogisticRegression
>>> from endgame.calibration import ConformalClassifier
>>>
>>> # Split data: train, calibration, test
>>> X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4)
>>> X_cal, X_test, y_cal, y_test = train_test_split(X_temp, y_temp, test_size=0.5)
>>>
>>> cp = ConformalClassifier(LogisticRegression(), method='aps', alpha=0.1)
>>> cp.fit(X_train, y_train, X_cal, y_cal)
>>> prediction_sets = cp.predict(X_test) # Returns sets with ~90% coverage
>>> print(f"Coverage: {cp.coverage_score(X_test, y_test):.3f}")
"""
_estimator_type = "classifier"
def __init__(
self,
estimator: BaseEstimator,
method: str = "lac",
alpha: float = 0.1,
k_reg: int = 1,
lambda_reg: float = 0.01,
random_state: int | None = None,
):
self.estimator = estimator
self.method = method
self.alpha = alpha
self.k_reg = k_reg
self.lambda_reg = lambda_reg
self.random_state = random_state
self.estimator_: BaseEstimator | None = None
self.classes_: np.ndarray | None = None
self.n_classes_: int = 0
self.conformity_scores_: np.ndarray | None = None
self.quantile_: float | None = None
self._is_fitted: bool = False
[docs]
def fit(
self,
X_train,
y_train,
X_cal: np.ndarray | None = None,
y_cal: np.ndarray | None = None,
cal_size: float = 0.2,
) -> ConformalClassifier:
"""Fit base model and calibrate conformal scores.
Parameters
----------
X_train : array-like of shape (n_train_samples, n_features)
Training features.
y_train : array-like of shape (n_train_samples,)
Training labels.
X_cal : array-like of shape (n_cal_samples, n_features), optional
Calibration features. If None, splits from training data.
y_cal : array-like of shape (n_cal_samples,), optional
Calibration labels. If None, splits from training data.
cal_size : float, default=0.2
Fraction of training data to use for calibration if X_cal not provided.
Returns
-------
self
Fitted conformal classifier.
"""
X_train = np.asarray(X_train)
y_train = np.asarray(y_train)
# Split calibration set if not provided
if X_cal is None or y_cal is None:
X_train, X_cal, y_train, y_cal = train_test_split(
X_train, y_train,
test_size=cal_size,
stratify=y_train,
random_state=self.random_state,
)
else:
X_cal = np.asarray(X_cal)
y_cal = np.asarray(y_cal)
# Store classes
self.classes_ = np.unique(y_train)
self.n_classes_ = len(self.classes_)
# Fit base estimator
self.estimator_ = clone(self.estimator)
self.estimator_.fit(X_train, y_train)
# Get calibration probabilities
cal_proba = self.estimator_.predict_proba(X_cal)
# Compute conformity scores on calibration set
self.conformity_scores_ = self._compute_conformity_scores(cal_proba, y_cal)
# Compute quantile for coverage guarantee
n_cal = len(y_cal)
q_level = np.ceil((n_cal + 1) * (1 - self.alpha)) / n_cal
q_level = min(q_level, 1.0)
self.quantile_ = np.quantile(self.conformity_scores_, q_level, method='higher')
self._is_fitted = True
return self
def _compute_conformity_scores(
self,
proba: np.ndarray,
y: np.ndarray,
) -> np.ndarray:
"""Compute conformity scores based on method."""
n_samples = len(y)
if self.method in ("naive", "lac"):
# Score = 1 - P(true class)
# LAC (Least Ambiguous Classifier, Sadinle et al. 2019):
# includes class y if P(y|x) >= 1 - q_hat
scores = 1 - proba[np.arange(n_samples), y]
elif self.method == "aps":
# Adaptive Prediction Sets (randomized)
rng = np.random.RandomState(self.random_state)
U = rng.uniform(0, 1, n_samples)
sorted_indices = np.argsort(-proba, axis=1)
sorted_proba = np.take_along_axis(proba, sorted_indices, axis=1)
cumsum_proba = np.cumsum(sorted_proba, axis=1)
scores = np.zeros(n_samples)
for i in range(n_samples):
true_class = y[i]
rank = np.where(sorted_indices[i] == true_class)[0][0]
if rank == 0:
scores[i] = U[i] * sorted_proba[i, 0]
else:
scores[i] = cumsum_proba[i, rank - 1] + U[i] * sorted_proba[i, rank]
elif self.method == "raps":
# Regularized APS
rng = np.random.RandomState(self.random_state)
U = rng.uniform(0, 1, n_samples)
sorted_indices = np.argsort(-proba, axis=1)
sorted_proba = np.take_along_axis(proba, sorted_indices, axis=1)
# Add regularization penalty
reg_penalty = self.lambda_reg * np.maximum(
0,
np.arange(1, self.n_classes_ + 1) - self.k_reg
)
sorted_proba_reg = sorted_proba + reg_penalty
cumsum_proba = np.cumsum(sorted_proba_reg, axis=1)
scores = np.zeros(n_samples)
for i in range(n_samples):
true_class = y[i]
rank = np.where(sorted_indices[i] == true_class)[0][0]
if rank == 0:
scores[i] = U[i] * sorted_proba_reg[i, 0]
else:
scores[i] = cumsum_proba[i, rank - 1] + U[i] * sorted_proba_reg[i, rank]
else:
raise ValueError(f"Unknown method: {self.method}")
return scores
[docs]
def predict(self, X) -> list[set[int]]:
"""Return prediction sets for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
List[Set[int]]
Prediction set for each sample (set of class indices).
"""
self._check_is_fitted()
X = np.asarray(X)
proba = self.estimator_.predict_proba(X)
prediction_sets = self._construct_prediction_sets(proba)
return prediction_sets
def _construct_prediction_sets(self, proba: np.ndarray) -> list[set[int]]:
"""Construct prediction sets based on calibrated threshold."""
n_samples = proba.shape[0]
prediction_sets = []
if self.method in ("naive", "lac"):
# Include class y if P(y|x) >= 1 - quantile
for i in range(n_samples):
pred_set = set(np.where(proba[i] >= 1 - self.quantile_)[0])
if len(pred_set) == 0:
pred_set = {np.argmax(proba[i])}
prediction_sets.append(pred_set)
elif self.method in ["aps", "raps"]:
sorted_indices = np.argsort(-proba, axis=1)
sorted_proba = np.take_along_axis(proba, sorted_indices, axis=1)
if self.method == "raps":
reg_penalty = self.lambda_reg * np.maximum(
0,
np.arange(1, self.n_classes_ + 1) - self.k_reg
)
sorted_proba = sorted_proba + reg_penalty
cumsum_proba = np.cumsum(sorted_proba, axis=1)
for i in range(n_samples):
# Include classes in descending probability order until
# cumulative probability exceeds the quantile threshold
exceeds = np.where(cumsum_proba[i] >= self.quantile_)[0]
if len(exceeds) == 0:
# Include all classes if threshold never reached
pred_set = set(sorted_indices[i])
else:
# Include up to and including the first class that
# pushes cumulative probability past the threshold
max_idx = exceeds[0]
pred_set = set(sorted_indices[i, :max_idx + 1])
prediction_sets.append(pred_set)
return prediction_sets
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Return base classifier probabilities.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
ndarray of shape (n_samples, n_classes)
Class probabilities from base classifier.
"""
self._check_is_fitted()
return self.estimator_.predict_proba(X)
[docs]
def predict_point(self, X) -> np.ndarray:
"""Return point predictions (most likely class).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
ndarray of shape (n_samples,)
Predicted class labels.
"""
self._check_is_fitted()
return self.estimator_.predict(X)
[docs]
def coverage_score(self, X, y) -> float:
"""Compute empirical coverage on test data.
Parameters
----------
X : array-like
Test features.
y : array-like
True labels.
Returns
-------
float
Fraction of samples where true label is in prediction set.
"""
prediction_sets = self.predict(X)
y = np.asarray(y)
covered = sum(y[i] in pred_set for i, pred_set in enumerate(prediction_sets))
return covered / len(y)
[docs]
def average_set_size(self, X) -> float:
"""Average size of prediction sets (efficiency metric).
Smaller sets = more informative predictions.
Parameters
----------
X : array-like
Test features.
Returns
-------
float
Average prediction set size.
"""
prediction_sets = self.predict(X)
return np.mean([len(s) for s in prediction_sets])
def _check_is_fitted(self):
"""Check if the model is fitted."""
if not self._is_fitted:
raise RuntimeError("ConformalClassifier has not been fitted.")
[docs]
class ConformalRegressor(BaseEstimator, RegressorMixin):
"""Conformal prediction wrapper for regression.
Provides prediction intervals with guaranteed coverage probability.
Under exchangeability, P(y ∈ [lower, upper]) >= 1 - alpha.
Parameters
----------
estimator : sklearn-compatible regressor
Base regressor.
method : str, default='absolute'
Conformal method:
- 'absolute': Absolute residual method (symmetric intervals)
- 'normalized': Normalized residuals using variance estimate
- 'cqr': Conformalized Quantile Regression (asymmetric intervals)
- 'cqr_asymmetric': CQR with separate lower/upper scores
alpha : float, default=0.1
Miscoverage rate (1 - alpha = coverage probability).
quantile_estimator : estimator, optional
For CQR methods, estimator for quantiles. If None, uses
GradientBoostingRegressor with quantile loss.
symmetry : bool, default=True
For CQR, whether to use symmetric or asymmetric intervals.
random_state : int, optional
Random seed.
Attributes
----------
estimator_ : estimator
Fitted base regressor.
lower_estimator_ : estimator
For CQR, fitted lower quantile estimator.
upper_estimator_ : estimator
For CQR, fitted upper quantile estimator.
conformity_scores_ : ndarray
Calibration conformity scores.
quantile_ : float
Calibrated quantile threshold.
Examples
--------
>>> from sklearn.ensemble import RandomForestRegressor
>>> from endgame.calibration import ConformalRegressor
>>>
>>> cr = ConformalRegressor(RandomForestRegressor(), method='cqr', alpha=0.1)
>>> cr.fit(X_train, y_train, X_cal, y_cal)
>>> lower, upper = cr.predict_interval(X_test)
>>> print(f"Coverage: {cr.coverage_score(X_test, y_test):.3f}")
>>> print(f"Avg width: {cr.average_interval_width(X_test):.3f}")
"""
_estimator_type = "regressor"
def __init__(
self,
estimator: BaseEstimator,
method: str = "absolute",
alpha: float = 0.1,
quantile_estimator: BaseEstimator | None = None,
symmetry: bool = True,
random_state: int | None = None,
):
self.estimator = estimator
self.method = method
self.alpha = alpha
self.quantile_estimator = quantile_estimator
self.symmetry = symmetry
self.random_state = random_state
self.estimator_: BaseEstimator | None = None
self.lower_estimator_: BaseEstimator | None = None
self.upper_estimator_: BaseEstimator | None = None
self.variance_estimator_: BaseEstimator | None = None
self.conformity_scores_: np.ndarray | None = None
self.quantile_: float | None = None
self.lower_quantile_: float | None = None
self.upper_quantile_: float | None = None
self._is_fitted: bool = False
def _get_quantile_estimator(self, quantile: float):
"""Get quantile regressor."""
if self.quantile_estimator is not None:
est = clone(self.quantile_estimator)
if hasattr(est, 'alpha'):
est.alpha = quantile
elif hasattr(est, 'quantile'):
est.quantile = quantile
return est
# Default to GradientBoostingRegressor with quantile loss
try:
from sklearn.ensemble import GradientBoostingRegressor
return GradientBoostingRegressor(
loss='quantile',
alpha=quantile,
n_estimators=100,
max_depth=3,
random_state=self.random_state,
)
except ImportError:
raise ValueError(
"For CQR methods, please provide a quantile_estimator or "
"ensure sklearn.ensemble.GradientBoostingRegressor is available."
)
[docs]
def fit(
self,
X_train,
y_train,
X_cal: np.ndarray | None = None,
y_cal: np.ndarray | None = None,
cal_size: float = 0.2,
) -> ConformalRegressor:
"""Fit base model and calibrate conformal scores.
Parameters
----------
X_train : array-like
Training features.
y_train : array-like
Training targets.
X_cal : array-like, optional
Calibration features.
y_cal : array-like, optional
Calibration targets.
cal_size : float, default=0.2
Fraction for calibration if not provided.
Returns
-------
self
Fitted conformal regressor.
"""
X_train = np.asarray(X_train)
y_train = np.asarray(y_train).ravel()
# Split calibration set if not provided
if X_cal is None or y_cal is None:
X_train, X_cal, y_train, y_cal = train_test_split(
X_train, y_train,
test_size=cal_size,
random_state=self.random_state,
)
else:
X_cal = np.asarray(X_cal)
y_cal = np.asarray(y_cal).ravel()
if self.method in ["absolute", "normalized"]:
# Fit point estimator
self.estimator_ = clone(self.estimator)
self.estimator_.fit(X_train, y_train)
# Get calibration predictions
cal_pred = self.estimator_.predict(X_cal)
residuals = y_cal - cal_pred
if self.method == "absolute":
self.conformity_scores_ = np.abs(residuals)
else:
# Normalized: need variance estimate
self.variance_estimator_ = clone(self.estimator)
self.variance_estimator_.fit(X_train, (y_train - self.estimator_.predict(X_train))**2)
cal_var = np.maximum(self.variance_estimator_.predict(X_cal), 1e-6)
self.conformity_scores_ = np.abs(residuals) / np.sqrt(cal_var)
elif self.method in ["cqr", "cqr_asymmetric"]:
# Conformalized Quantile Regression
lower_q = self.alpha / 2
upper_q = 1 - self.alpha / 2
# Fit point estimator (for predict method)
self.estimator_ = clone(self.estimator)
self.estimator_.fit(X_train, y_train)
# Fit quantile estimators
self.lower_estimator_ = self._get_quantile_estimator(lower_q)
self.upper_estimator_ = self._get_quantile_estimator(upper_q)
self.lower_estimator_.fit(X_train, y_train)
self.upper_estimator_.fit(X_train, y_train)
# Get calibration quantile predictions
cal_lower = self.lower_estimator_.predict(X_cal)
cal_upper = self.upper_estimator_.predict(X_cal)
if self.method == "cqr" or self.symmetry:
# Symmetric CQR: take max of both sides
self.conformity_scores_ = np.maximum(
cal_lower - y_cal,
y_cal - cal_upper
)
else:
# Asymmetric CQR: separate scores
self.lower_scores_ = cal_lower - y_cal
self.upper_scores_ = y_cal - cal_upper
else:
raise ValueError(f"Unknown method: {self.method}")
# Compute quantile(s)
n_cal = len(y_cal)
q_level = np.ceil((n_cal + 1) * (1 - self.alpha)) / n_cal
q_level = min(q_level, 1.0)
if self.method == "cqr_asymmetric" and not self.symmetry:
q_level_half = np.ceil((n_cal + 1) * (1 - self.alpha / 2)) / n_cal
q_level_half = min(q_level_half, 1.0)
self.lower_quantile_ = np.quantile(self.lower_scores_, q_level_half, method='higher')
self.upper_quantile_ = np.quantile(self.upper_scores_, q_level_half, method='higher')
else:
self.quantile_ = np.quantile(self.conformity_scores_, q_level, method='higher')
self._is_fitted = True
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Return point predictions.
Parameters
----------
X : array-like
Test samples.
Returns
-------
ndarray
Point predictions.
"""
self._check_is_fitted()
return self.estimator_.predict(X)
[docs]
def predict_interval(self, X) -> tuple[np.ndarray, np.ndarray]:
"""Return prediction intervals.
Parameters
----------
X : array-like
Test samples.
Returns
-------
lower : ndarray
Lower bounds of prediction intervals.
upper : ndarray
Upper bounds of prediction intervals.
"""
self._check_is_fitted()
X = np.asarray(X)
if self.method == "absolute":
pred = self.estimator_.predict(X)
lower = pred - self.quantile_
upper = pred + self.quantile_
elif self.method == "normalized":
pred = self.estimator_.predict(X)
var = np.maximum(self.variance_estimator_.predict(X), 1e-6)
width = self.quantile_ * np.sqrt(var)
lower = pred - width
upper = pred + width
elif self.method in ["cqr", "cqr_asymmetric"]:
pred_lower = self.lower_estimator_.predict(X)
pred_upper = self.upper_estimator_.predict(X)
if self.method == "cqr" or self.symmetry:
lower = pred_lower - self.quantile_
upper = pred_upper + self.quantile_
else:
lower = pred_lower - self.lower_quantile_
upper = pred_upper + self.upper_quantile_
return lower, upper
[docs]
def coverage_score(self, X, y) -> float:
"""Compute empirical coverage.
Parameters
----------
X : array-like
Test features.
y : array-like
True targets.
Returns
-------
float
Fraction of samples where true value is within interval.
"""
lower, upper = self.predict_interval(X)
y = np.asarray(y).ravel()
covered = np.sum((y >= lower) & (y <= upper))
return covered / len(y)
[docs]
def average_interval_width(self, X) -> float:
"""Average width of prediction intervals.
Parameters
----------
X : array-like
Test features.
Returns
-------
float
Average interval width.
"""
lower, upper = self.predict_interval(X)
return np.mean(upper - lower)
def _check_is_fitted(self):
"""Check if the model is fitted."""
if not self._is_fitted:
raise RuntimeError("ConformalRegressor has not been fitted.")
[docs]
class ConformizedQuantileRegressor(BaseEstimator, RegressorMixin):
"""Conformalized Quantile Regression for adaptive prediction intervals.
CQR combines quantile regression with conformal calibration to produce
prediction intervals that:
1. Adapt to heteroscedasticity (wider where uncertainty is higher)
2. Have guaranteed coverage under exchangeability
3. Are typically narrower than standard conformal methods
The algorithm:
1. Train quantile regressors for lower (α/2) and upper (1-α/2) quantiles
2. On calibration data, compute conformity scores:
E_i = max(q_lower(x_i) - y_i, y_i - q_upper(x_i))
3. Compute the (1-α)(1 + 1/n) quantile of scores
4. At prediction time:
[q_lower(x) - Q, q_upper(x) + Q]
This implementation integrates with QuantileRegressorForest but works
with any regressor that can predict quantiles.
Parameters
----------
quantile_estimator : estimator, optional
A regressor capable of predicting quantiles. Options:
- QuantileRegressorForest (recommended): Native quantile support
- GradientBoostingRegressor: With loss='quantile'
- Any estimator with predict_quantiles(X, quantiles) method
If None, uses QuantileRegressorForest with default settings.
alpha : float, default=0.1
Miscoverage rate. Target coverage is 1 - alpha.
E.g., alpha=0.1 targets 90% coverage.
cv : int or None, default=None
If int, use cross-conformal with cv folds for more efficient
data usage. Each fold serves as calibration for models trained
on other folds. If None, requires separate calibration set.
symmetric : bool, default=True
If True, use symmetric conformity scores:
E = max(q_lower - y, y - q_upper)
If False, use asymmetric scores (separate lower/upper adjustments):
E_lower = q_lower - y
E_upper = y - q_upper
Asymmetric can give tighter intervals but requires more calibration data.
random_state : int, RandomState, or None, default=None
Random seed for reproducibility.
Attributes
----------
quantile_estimator_ : estimator
Fitted quantile estimator (single model predicting both quantiles).
conformity_scores_ : ndarray
Calibration conformity scores.
quantile_ : float
Calibrated quantile threshold for symmetric CQR.
lower_quantile_ : float
Lower threshold for asymmetric CQR.
upper_quantile_ : float
Upper threshold for asymmetric CQR.
n_features_in_ : int
Number of features seen during fit.
Examples
--------
>>> from endgame.models.trees import QuantileRegressorForest
>>> from endgame.calibration import ConformizedQuantileRegressor
>>> from sklearn.model_selection import train_test_split
>>>
>>> # Split data into train and calibration
>>> X_train, X_cal, y_train, y_cal = train_test_split(X, y, test_size=0.2)
>>>
>>> # Create CQR with QRF (default)
>>> cqr = ConformizedQuantileRegressor(alpha=0.1)
>>> cqr.fit(X_train, y_train, X_cal, y_cal)
>>>
>>> # Get prediction intervals
>>> lower, upper = cqr.predict_interval(X_test)
>>> print(f"Coverage: {cqr.coverage_score(X_test, y_test):.3f}")
>>> print(f"Avg width: {np.mean(upper - lower):.3f}")
>>>
>>> # Using cross-conformal (no separate calibration set needed)
>>> cqr_cv = ConformizedQuantileRegressor(alpha=0.1, cv=5)
>>> cqr_cv.fit(X_train, y_train)
>>> lower, upper = cqr_cv.predict_interval(X_test)
Notes
-----
**When to use CQR vs standard conformal regression:**
- CQR produces adaptive intervals that are wider in high-uncertainty
regions and narrower in low-uncertainty regions
- Standard conformal produces constant-width intervals
- CQR is preferred for heteroscedastic data (varying noise)
- Standard conformal is simpler and may suffice for homoscedastic data
**Calibration set size:**
For reliable coverage, use at least 100-500 calibration samples.
With cv > 0, each fold acts as calibration, improving data efficiency.
**Integration with QuantileRegressorForest:**
QRF naturally estimates quantiles by storing leaf distributions.
This makes it ideal for CQR as it provides well-calibrated quantile
estimates out of the box without separate quantile loss training.
References
----------
Romano, Y., Patterson, E., & Candès, E. (2019). "Conformalized Quantile
Regression." NeurIPS.
Sesia, M., & Candès, E. (2020). "A comparison of some conformal quantile
regression methods." Stat, 9(1), e261.
"""
_estimator_type = "regressor"
def __init__(
self,
quantile_estimator: BaseEstimator | None = None,
alpha: float = 0.1,
cv: int | None = None,
symmetric: bool = True,
random_state: int | np.random.RandomState | None = None,
):
self.quantile_estimator = quantile_estimator
self.alpha = alpha
self.cv = cv
self.symmetric = symmetric
self.random_state = random_state
def _get_default_quantile_estimator(self):
"""Get default QuantileRegressorForest."""
try:
from endgame.models.trees import QuantileRegressorForest
return QuantileRegressorForest(
n_estimators=100,
random_state=self.random_state,
)
except ImportError:
# Fallback to GradientBoostingRegressor
from sklearn.ensemble import GradientBoostingRegressor
return GradientBoostingRegressor(
n_estimators=100,
max_depth=3,
random_state=self.random_state,
)
def _predict_quantiles(
self,
estimator: BaseEstimator,
X: np.ndarray,
quantiles: list[float],
) -> np.ndarray:
"""Predict quantiles using the estimator.
Returns array of shape (n_samples, n_quantiles).
"""
# Check if estimator has native quantile support
if hasattr(estimator, 'predict_quantiles'):
result = estimator.predict_quantiles(X, quantiles)
# Ensure 2D output
if result.ndim == 1:
result = result.reshape(-1, 1)
return result
# For QRF with quantiles parameter
if hasattr(estimator, 'quantiles'):
original_quantiles = estimator.quantiles
estimator.quantiles = quantiles
result = estimator.predict(X)
estimator.quantiles = original_quantiles
if result.ndim == 1:
result = result.reshape(-1, 1)
return result
# Fallback: assume it's a GradientBoostingRegressor-style estimator
# Need to fit separate models for each quantile
raise ValueError(
"Estimator must have predict_quantiles method or be a "
"QuantileRegressorForest. For other estimators, use "
"ConformalRegressor with method='cqr' instead."
)
[docs]
def fit(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_cal: np.ndarray | None = None,
y_cal: np.ndarray | None = None,
cal_size: float = 0.2,
) -> ConformizedQuantileRegressor:
"""Fit the CQR model.
Parameters
----------
X_train : array-like of shape (n_train_samples, n_features)
Training features.
y_train : array-like of shape (n_train_samples,)
Training targets.
X_cal : array-like of shape (n_cal_samples, n_features), optional
Calibration features. Required if cv is None.
If None and cv is None, splits from training data.
y_cal : array-like of shape (n_cal_samples,), optional
Calibration targets.
cal_size : float, default=0.2
Fraction of training data for calibration if X_cal not provided.
Returns
-------
self : object
Fitted CQR model.
"""
X_train = np.asarray(X_train)
y_train = np.asarray(y_train).ravel()
self.n_features_in_ = X_train.shape[1]
# Quantiles to predict
lower_q = self.alpha / 2
upper_q = 1 - self.alpha / 2
self._quantiles = [lower_q, upper_q]
if self.cv is not None:
# Cross-conformal: use CV to get conformity scores
return self._fit_cross_conformal(X_train, y_train)
# Standard split-conformal
if X_cal is None or y_cal is None:
X_train, X_cal, y_train, y_cal = train_test_split(
X_train, y_train,
test_size=cal_size,
random_state=self.random_state,
)
else:
X_cal = np.asarray(X_cal)
y_cal = np.asarray(y_cal).ravel()
# Initialize and fit quantile estimator
if self.quantile_estimator is None:
self.quantile_estimator_ = self._get_default_quantile_estimator()
else:
self.quantile_estimator_ = clone(self.quantile_estimator)
self.quantile_estimator_.fit(X_train, y_train)
# Compute conformity scores on calibration set
self._calibrate(X_cal, y_cal)
self._is_fitted = True
return self
def _fit_cross_conformal(
self,
X: np.ndarray,
y: np.ndarray,
) -> ConformizedQuantileRegressor:
"""Fit using cross-conformal for better data efficiency."""
from sklearn.model_selection import KFold
n_samples = len(y)
kf = KFold(n_splits=self.cv, shuffle=True, random_state=self.random_state)
# Store conformity scores from each fold
all_scores = []
all_lower_scores = []
all_upper_scores = []
# For final model, fit on all data
if self.quantile_estimator is None:
self.quantile_estimator_ = self._get_default_quantile_estimator()
else:
self.quantile_estimator_ = clone(self.quantile_estimator)
for train_idx, cal_idx in kf.split(X):
X_train_fold = X[train_idx]
y_train_fold = y[train_idx]
X_cal_fold = X[cal_idx]
y_cal_fold = y[cal_idx]
# Fit on training fold
fold_estimator = clone(self.quantile_estimator_)
fold_estimator.fit(X_train_fold, y_train_fold)
# Get quantile predictions on calibration fold
q_pred = self._predict_quantiles(fold_estimator, X_cal_fold, self._quantiles)
q_lower = q_pred[:, 0]
q_upper = q_pred[:, 1]
# Compute conformity scores
if self.symmetric:
scores = np.maximum(q_lower - y_cal_fold, y_cal_fold - q_upper)
all_scores.extend(scores.tolist())
else:
all_lower_scores.extend((q_lower - y_cal_fold).tolist())
all_upper_scores.extend((y_cal_fold - q_upper).tolist())
# Compute calibrated quantile(s)
n_cal = len(all_scores) if self.symmetric else len(all_lower_scores)
q_level = np.ceil((n_cal + 1) * (1 - self.alpha)) / n_cal
q_level = min(q_level, 1.0)
if self.symmetric:
self.conformity_scores_ = np.array(all_scores)
self.quantile_ = np.quantile(self.conformity_scores_, q_level, method='higher')
else:
q_level_half = np.ceil((n_cal + 1) * (1 - self.alpha / 2)) / n_cal
q_level_half = min(q_level_half, 1.0)
self.lower_scores_ = np.array(all_lower_scores)
self.upper_scores_ = np.array(all_upper_scores)
self.lower_quantile_ = np.quantile(self.lower_scores_, q_level_half, method='higher')
self.upper_quantile_ = np.quantile(self.upper_scores_, q_level_half, method='higher')
# Fit final model on all data
self.quantile_estimator_.fit(X, y)
self._is_fitted = True
return self
def _calibrate(self, X_cal: np.ndarray, y_cal: np.ndarray) -> None:
"""Compute conformity scores and calibrated quantile."""
# Get quantile predictions
q_pred = self._predict_quantiles(self.quantile_estimator_, X_cal, self._quantiles)
q_lower = q_pred[:, 0]
q_upper = q_pred[:, 1]
n_cal = len(y_cal)
q_level = np.ceil((n_cal + 1) * (1 - self.alpha)) / n_cal
q_level = min(q_level, 1.0)
if self.symmetric:
# Symmetric conformity scores
self.conformity_scores_ = np.maximum(
q_lower - y_cal,
y_cal - q_upper
)
self.quantile_ = np.quantile(
self.conformity_scores_, q_level, method='higher'
)
else:
# Asymmetric conformity scores
self.lower_scores_ = q_lower - y_cal
self.upper_scores_ = y_cal - q_upper
q_level_half = np.ceil((n_cal + 1) * (1 - self.alpha / 2)) / n_cal
q_level_half = min(q_level_half, 1.0)
self.lower_quantile_ = np.quantile(
self.lower_scores_, q_level_half, method='higher'
)
self.upper_quantile_ = np.quantile(
self.upper_scores_, q_level_half, method='higher'
)
[docs]
def predict(self, X: np.ndarray) -> np.ndarray:
"""Return point predictions (median).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
y_pred : ndarray of shape (n_samples,)
Point predictions.
"""
self._check_is_fitted()
X = np.asarray(X)
# Predict median
q_pred = self._predict_quantiles(self.quantile_estimator_, X, [0.5])
return q_pred[:, 0]
[docs]
def predict_interval(
self,
X: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Return conformalized prediction intervals.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
lower : ndarray of shape (n_samples,)
Lower bounds of prediction intervals.
upper : ndarray of shape (n_samples,)
Upper bounds of prediction intervals.
"""
self._check_is_fitted()
X = np.asarray(X)
# Get quantile predictions
q_pred = self._predict_quantiles(self.quantile_estimator_, X, self._quantiles)
q_lower = q_pred[:, 0]
q_upper = q_pred[:, 1]
# Apply conformal correction
if self.symmetric:
lower = q_lower - self.quantile_
upper = q_upper + self.quantile_
else:
lower = q_lower - self.lower_quantile_
upper = q_upper + self.upper_quantile_
return lower, upper
[docs]
def predict_quantiles(
self,
X: np.ndarray,
quantiles: list[float] | None = None,
) -> np.ndarray:
"""Predict arbitrary quantiles (uncalibrated).
Note: These are raw quantile predictions without conformal
calibration. For calibrated intervals, use predict_interval().
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
quantiles : list of float, optional
Quantiles to predict. Default: [0.1, 0.5, 0.9]
Returns
-------
ndarray of shape (n_samples, n_quantiles)
Quantile predictions.
"""
self._check_is_fitted()
X = np.asarray(X)
if quantiles is None:
quantiles = [0.1, 0.5, 0.9]
return self._predict_quantiles(self.quantile_estimator_, X, quantiles)
[docs]
def coverage_score(self, X, y) -> float:
"""Compute empirical coverage.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test features.
y : array-like of shape (n_samples,)
True targets.
Returns
-------
float
Fraction of samples where true value is within interval.
"""
lower, upper = self.predict_interval(X)
y = np.asarray(y).ravel()
covered = np.sum((y >= lower) & (y <= upper))
return covered / len(y)
[docs]
def average_interval_width(self, X) -> float:
"""Compute average prediction interval width.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test features.
Returns
-------
float
Average interval width.
"""
lower, upper = self.predict_interval(X)
return np.mean(upper - lower)
[docs]
def interval_width(self, X) -> np.ndarray:
"""Compute prediction interval widths for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test features.
Returns
-------
ndarray of shape (n_samples,)
Interval width for each sample.
"""
lower, upper = self.predict_interval(X)
return upper - lower
[docs]
def score(self, X, y) -> float:
"""Return negative interval width (for sklearn compatibility).
Higher is better (narrower intervals).
Parameters
----------
X : array-like
Test features.
y : array-like
Test targets (unused, for API compatibility).
Returns
-------
float
Negative average interval width.
"""
return -self.average_interval_width(X)
def _check_is_fitted(self):
"""Check if the model is fitted."""
if not hasattr(self, '_is_fitted') or not self._is_fitted:
raise RuntimeError("ConformizedQuantileRegressor has not been fitted.")