from __future__ import annotations
"""Missing data imputation transformers.
Provides sklearn-compatible imputers with competition-winning defaults:
- SimpleImputer: Mean/median/mode/constant with better defaults than sklearn
- IndicatorImputer: Adds binary missing-indicator columns alongside imputed values
- KNNImputer: K-Nearest Neighbors imputation with competition defaults
- MICEImputer: Multiple Imputation by Chained Equations (IterativeImputer)
- MissForestImputer: Random Forest-based iterative imputation
- AutoImputer: Automatic strategy selection based on missingness patterns
All imputers accept numpy arrays and pandas DataFrames, preserving
column names when possible.
Examples
--------
>>> import numpy as np
>>> from endgame.preprocessing.imputation import AutoImputer
>>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan]])
>>> imputer = AutoImputer()
>>> X_imputed = imputer.fit_transform(X)
"""
from typing import Any
import numpy as np
from sklearn.base import TransformerMixin
from endgame.core.base import EndgameEstimator
try:
import pandas as pd
HAS_PANDAS = True
except ImportError:
HAS_PANDAS = False
def _to_numpy(X: Any) -> np.ndarray:
"""Convert input to numpy array, handling pandas DataFrames."""
if isinstance(X, np.ndarray):
return X
if HAS_PANDAS and isinstance(X, pd.DataFrame):
return X.values
return np.asarray(X, dtype=np.float64)
def _extract_column_names(X: Any) -> list[str] | None:
    """Return the column labels of ``X`` as a list, or ``None``.

    Labels are only available when pandas is importable and ``X`` is a
    ``pandas.DataFrame``; every other input yields ``None``.
    """
    if not (HAS_PANDAS and isinstance(X, pd.DataFrame)):
        return None
    return list(X.columns)
def _restore_dataframe(
X_out: np.ndarray,
columns: list[str] | None,
was_dataframe: bool,
) -> Any:
"""Restore pandas DataFrame if the input was a DataFrame."""
if was_dataframe and HAS_PANDAS and columns is not None:
return pd.DataFrame(X_out, columns=columns)
return X_out
[docs]
class SimpleImputer(EndgameEstimator, TransformerMixin):
    """Simple imputation with mean, median, mode, or constant fill.

    Thin wrapper around sklearn.impute.SimpleImputer with better defaults
    for competition settings (median instead of mean, which is more robust
    to outliers).

    Parameters
    ----------
    strategy : str, default='median'
        Imputation strategy:

        - 'mean': Replace with column mean
        - 'median': Replace with column median (default, outlier-robust)
        - 'most_frequent': Replace with mode
        - 'constant': Replace with ``fill_value``
    fill_value : float or str, optional
        Value to use when ``strategy='constant'``. Default is 0.
    add_indicator : bool, default=False
        If True, append binary missing-indicator columns.
    copy : bool, default=True
        If True, create a copy of X before imputing.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    statistics_ : ndarray of shape (n_features,)
        The imputation fill value for each feature.
    indicator_ : MissingIndicator or None
        Indicator used to add binary indicators for missing values.
    n_features_in_ : int
        Number of features seen during fit.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import SimpleImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan]])
    >>> imp = SimpleImputer(strategy='median')
    >>> imp.fit_transform(X)
    array([[1. , 2. ],
           [4. , 3. ],
           [7. , 2.5]])
    """

    def __init__(
        self,
        strategy: str = "median",
        fill_value: float | str | None = None,
        add_indicator: bool = False,
        copy: bool = True,
        verbose: bool = False,
    ):
        super().__init__(verbose=verbose)
        self.strategy = strategy
        self.fill_value = fill_value
        self.add_indicator = add_indicator
        self.copy = copy

    def fit(self, X, y=None, **fit_params) -> SimpleImputer:
        """Fit the imputer on training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data with missing values (np.nan).
        y : ignored

        Returns
        -------
        self
        """
        from sklearn.impute import SimpleImputer as _SklearnSimpleImputer

        # Remember how the input was labelled so names can be reported later.
        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)

        # An unset fill_value falls back to 0, matching the documented default.
        fill_value = self.fill_value if self.fill_value is not None else 0
        self._imputer = _SklearnSimpleImputer(
            strategy=self.strategy,
            fill_value=fill_value,
            add_indicator=self.add_indicator,
            copy=self.copy,
        )
        self._imputer.fit(_to_numpy(X))

        # Mirror the fitted state of the wrapped sklearn imputer.
        self.statistics_ = self._imputer.statistics_
        self.n_features_in_ = self._imputer.n_features_in_
        self.indicator_ = getattr(self._imputer, "indicator_", None)
        self._is_fitted = True
        self._log(f"Fitted SimpleImputer with strategy='{self.strategy}'")
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names.

        Parameters
        ----------
        input_features : array-like of str, optional
            Names of the input features. When omitted (or empty), the column
            names captured from a DataFrame during ``fit`` are used.

        Returns
        -------
        list of str
            Input feature names, followed by ``"<name>_missing"`` entries when
            ``add_indicator=True``. Indicator names follow
            ``indicator_.features_`` (the feature indices the fitted sklearn
            ``MissingIndicator`` reports it will emit) instead of assuming one
            indicator per input feature. Empty when no names are known.

        Notes
        -----
        NOTE(review): sklearn documents that columns which were entirely
        missing at fit time may be dropped on transform; such columns are
        still named here. Confirm against ``transform``.
        """
        self._check_is_fitted()

        # Compare against None / length explicitly: sklearn passes feature
        # names around as numpy arrays, whose truth value is ambiguous
        # under ``or``.
        if input_features is not None and len(input_features) > 0:
            names = list(input_features)
        else:
            names = list(self._column_names or [])

        if not (self.add_indicator and names):
            return names

        flagged = getattr(getattr(self, "indicator_", None), "features_", None)
        if flagged is None:
            # No fitted indicator to consult; keep one flag name per feature.
            flagged = range(len(names))
        return names + [f"{names[int(i)]}_missing" for i in flagged]
[docs]
class IndicatorImputer(EndgameEstimator, TransformerMixin):
    """Imputer that adds binary missing-indicator columns alongside imputed values.

    For each feature with missing values, appends a binary column indicating
    which rows were originally missing. This is a common Kaggle trick that
    lets tree-based models learn different splits for missing vs. non-missing.

    Parameters
    ----------
    base_strategy : str, default='median'
        Strategy for filling missing values: 'mean', 'median', 'most_frequent',
        'constant'.
    fill_value : float, optional
        Fill value when base_strategy='constant'.
    only_missing : bool, default=True
        If True, only add indicators for features that have missing values
        in the training data. If False, add indicators for all features.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    statistics_ : ndarray of shape (n_features,)
        The imputation fill value for each feature.
    missing_features_ : list of int
        Indices of features that had missing values during fit (all feature
        indices when ``only_missing=False``).
    n_features_in_ : int
        Number of features seen during fit.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import IndicatorImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan]])
    >>> imp = IndicatorImputer(base_strategy='median')
    >>> X_out = imp.fit_transform(X)
    >>> X_out.shape
    (3, 4)
    """

    def __init__(
        self,
        base_strategy: str = "median",
        fill_value: float | None = None,
        only_missing: bool = True,
        verbose: bool = False,
    ):
        super().__init__(verbose=verbose)
        self.base_strategy = base_strategy
        self.fill_value = fill_value
        self.only_missing = only_missing

    def fit(self, X, y=None, **fit_params) -> IndicatorImputer:
        """Fit the indicator imputer.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored

        Returns
        -------
        self
        """
        from sklearn.impute import SimpleImputer as _SklearnSimpleImputer

        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)
        X_np = _to_numpy(X)
        self.n_features_in_ = X_np.shape[1]

        # Identify features with missing values. Indices are stored as plain
        # Python ints so the attribute matches its documented type.
        missing_mask = np.isnan(X_np)
        if self.only_missing:
            self.missing_features_ = [
                int(j) for j in np.where(missing_mask.any(axis=0))[0]
            ]
        else:
            self.missing_features_ = list(range(X_np.shape[1]))

        # Fit the base imputer; an unset fill_value falls back to 0.
        fill_value = self.fill_value if self.fill_value is not None else 0
        self._imputer = _SklearnSimpleImputer(
            strategy=self.base_strategy,
            fill_value=fill_value,
        )
        self._imputer.fit(X_np)
        self.statistics_ = self._imputer.statistics_
        self._is_fitted = True
        self._log(
            f"Fitted IndicatorImputer: {len(self.missing_features_)} features "
            f"with missing values"
        )
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names.

        Parameters
        ----------
        input_features : array-like of str, optional
            Names of the input features. When omitted (or empty), the column
            names captured from a DataFrame during ``fit`` are used.

        Returns
        -------
        list of str
            Input feature names followed by one ``"<name>_missing"`` entry per
            index in ``missing_features_``. Empty when no names are known.
        """
        self._check_is_fitted()

        # Compare against None / length explicitly: sklearn passes feature
        # names around as numpy arrays, whose truth value is ambiguous
        # under ``or``.
        if input_features is not None and len(input_features) > 0:
            names = list(input_features)
        else:
            names = list(self._column_names or [])

        if not names:
            return []
        indicator_names = [f"{names[i]}_missing" for i in self.missing_features_]
        return names + indicator_names
[docs]
class KNNImputer(EndgameEstimator, TransformerMixin):
    """K-Nearest Neighbors imputation with competition defaults.

    Wraps sklearn.impute.KNNImputer with defaults tuned for tabular
    competitions: n_neighbors=5, uniform weights, nan_euclidean distance.

    Parameters
    ----------
    n_neighbors : int, default=5
        Number of nearest neighbors to use.
    weights : str, default='uniform'
        Weight function for prediction: 'uniform' or 'distance'.
    metric : str, default='nan_euclidean'
        Distance metric for finding neighbors.
    add_indicator : bool, default=False
        If True, append binary missing-indicator columns.
    copy : bool, default=True
        If True, create a copy of X.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import KNNImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, 6], [5, np.nan]])
    >>> imp = KNNImputer(n_neighbors=2)
    >>> imp.fit_transform(X)
    array([[1. , 2. ],
           [3. , 3. ],
           [7. , 6. ],
           [5. , 4. ]])
    """

    def __init__(
        self,
        n_neighbors: int = 5,
        weights: str = "uniform",
        metric: str = "nan_euclidean",
        add_indicator: bool = False,
        copy: bool = True,
        verbose: bool = False,
    ):
        super().__init__(verbose=verbose)
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.metric = metric
        self.add_indicator = add_indicator
        self.copy = copy

    def fit(self, X, y=None, **fit_params) -> KNNImputer:
        """Fit the KNN imputer.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored

        Returns
        -------
        self
        """
        from sklearn.impute import KNNImputer as _SklearnKNNImputer

        # Remember how the input was labelled so names can be reported later.
        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)

        self._imputer = _SklearnKNNImputer(
            n_neighbors=self.n_neighbors,
            weights=self.weights,
            metric=self.metric,
            add_indicator=self.add_indicator,
            copy=self.copy,
        )
        self._imputer.fit(_to_numpy(X))
        self.n_features_in_ = self._imputer.n_features_in_
        self._is_fitted = True
        self._log(f"Fitted KNNImputer with n_neighbors={self.n_neighbors}")
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names.

        Parameters
        ----------
        input_features : array-like of str, optional
            Names of the input features. When omitted (or empty), the column
            names captured from a DataFrame during ``fit`` are used.

        Returns
        -------
        list of str
            Input feature names, followed by ``"<name>_missing"`` entries when
            ``add_indicator=True``. Indicator names follow the ``features_``
            of the wrapped imputer's fitted ``indicator_`` (the feature
            indices sklearn's ``MissingIndicator`` reports it will emit)
            instead of assuming one indicator per input feature. Empty when
            no names are known.
        """
        self._check_is_fitted()

        # Compare against None / length explicitly: sklearn passes feature
        # names around as numpy arrays, whose truth value is ambiguous
        # under ``or``.
        if input_features is not None and len(input_features) > 0:
            names = list(input_features)
        else:
            names = list(self._column_names or [])

        if not (self.add_indicator and names):
            return names

        indicator = getattr(self._imputer, "indicator_", None)
        flagged = getattr(indicator, "features_", None)
        if flagged is None:
            # No fitted indicator to consult; keep one flag name per feature.
            flagged = range(len(names))
        return names + [f"{names[int(i)]}_missing" for i in flagged]
[docs]
class MICEImputer(EndgameEstimator, TransformerMixin):
    """Multiple Imputation by Chained Equations.

    Uses sklearn.impute.IterativeImputer with BayesianRidge as the default
    estimator, which is the standard MICE implementation. Iteratively models
    each feature as a function of all other features.

    Parameters
    ----------
    estimator : estimator, optional
        The estimator to predict each feature from all others. Default is
        BayesianRidge, which provides the standard MICE formulation.
    max_iter : int, default=10
        Maximum number of imputation rounds.
    tol : float, default=1e-3
        Convergence tolerance.
    initial_strategy : str, default='median'
        Strategy for initial imputation before iterating: 'mean', 'median',
        'most_frequent', 'constant'.
    sample_posterior : bool, default=False
        If True, sample from the predictive posterior for each imputation.
        Provides proper multiple imputations when True.
    random_state : int, default=42
        Random seed for reproducibility. Default set for deterministic results
        in competition settings.
    add_indicator : bool, default=False
        If True, append binary missing-indicator columns.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    n_iter_ : int
        Number of iterations performed.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import MICEImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan], [5, 4]])
    >>> imp = MICEImputer(max_iter=10, random_state=42)
    >>> X_imputed = imp.fit_transform(X)
    """

    def __init__(
        self,
        estimator: Any | None = None,
        max_iter: int = 10,
        tol: float = 1e-3,
        initial_strategy: str = "median",
        sample_posterior: bool = False,
        random_state: int | None = 42,
        add_indicator: bool = False,
        verbose: bool = False,
    ):
        super().__init__(random_state=random_state, verbose=verbose)
        self.estimator = estimator
        self.max_iter = max_iter
        self.tol = tol
        self.initial_strategy = initial_strategy
        self.sample_posterior = sample_posterior
        self.add_indicator = add_indicator

    def fit(self, X, y=None, **fit_params) -> MICEImputer:
        """Fit the MICE imputer.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored

        Returns
        -------
        self
        """
        # The experimental import must run first: it is what makes
        # IterativeImputer importable from sklearn.impute.
        from sklearn.experimental import enable_iterative_imputer  # noqa: F401
        from sklearn.impute import IterativeImputer

        # Remember how the input was labelled so names can be reported later.
        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)

        estimator = self.estimator
        if estimator is None:
            from sklearn.linear_model import BayesianRidge

            estimator = BayesianRidge()

        self._imputer = IterativeImputer(
            estimator=estimator,
            max_iter=self.max_iter,
            tol=self.tol,
            initial_strategy=self.initial_strategy,
            sample_posterior=self.sample_posterior,
            random_state=self.random_state,
            add_indicator=self.add_indicator,
        )
        self._imputer.fit(_to_numpy(X))
        self.n_features_in_ = self._imputer.n_features_in_
        self.n_iter_ = self._imputer.n_iter_
        self._is_fitted = True
        self._log(
            f"Fitted MICEImputer: converged in {self.n_iter_} iterations"
        )
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names.

        Parameters
        ----------
        input_features : array-like of str, optional
            Names of the input features. When omitted (or empty), the column
            names captured from a DataFrame during ``fit`` are used.

        Returns
        -------
        list of str
            Input feature names, followed by ``"<name>_missing"`` entries when
            ``add_indicator=True``. Indicator names follow the ``features_``
            of the wrapped imputer's fitted ``indicator_`` (the feature
            indices sklearn's ``MissingIndicator`` reports it will emit)
            instead of assuming one indicator per input feature. Empty when
            no names are known.
        """
        self._check_is_fitted()

        # Compare against None / length explicitly: sklearn passes feature
        # names around as numpy arrays, whose truth value is ambiguous
        # under ``or``.
        if input_features is not None and len(input_features) > 0:
            names = list(input_features)
        else:
            names = list(self._column_names or [])

        if not (self.add_indicator and names):
            return names

        indicator = getattr(self._imputer, "indicator_", None)
        flagged = getattr(indicator, "features_", None)
        if flagged is None:
            # No fitted indicator to consult; keep one flag name per feature.
            flagged = range(len(names))
        return names + [f"{names[int(i)]}_missing" for i in flagged]
[docs]
class MissForestImputer(EndgameEstimator, TransformerMixin):
    """Random Forest-based iterative imputation (MissForest algorithm).

    Uses sklearn.impute.IterativeImputer with a RandomForestRegressor
    as the base estimator. This non-parametric approach handles non-linear
    relationships and interactions between features effectively.

    Parameters
    ----------
    n_estimators : int, default=100
        Number of trees in the random forest estimator.
    max_iter : int, default=10
        Maximum number of imputation rounds.
    max_depth : int or None, default=None
        Maximum depth of each tree. None means nodes are expanded until
        all leaves are pure or contain fewer than min_samples_split samples.
    max_features : str or float, default='sqrt'
        Number of features considered at each split.
    initial_strategy : str, default='median'
        Strategy for initial imputation before iterating.
    random_state : int, default=42
        Random seed for reproducibility.
    n_jobs : int, default=-1
        Number of parallel jobs for the random forest. -1 uses all cores.
    add_indicator : bool, default=False
        If True, append binary missing-indicator columns.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    n_features_in_ : int
        Number of features seen during fit.
    n_iter_ : int
        Number of iterations performed.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import MissForestImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan], [5, 4]])
    >>> imp = MissForestImputer(n_estimators=50, random_state=42)
    >>> X_imputed = imp.fit_transform(X)
    """

    def __init__(
        self,
        n_estimators: int = 100,
        max_iter: int = 10,
        max_depth: int | None = None,
        max_features: str | float = "sqrt",
        initial_strategy: str = "median",
        random_state: int | None = 42,
        n_jobs: int = -1,
        add_indicator: bool = False,
        verbose: bool = False,
    ):
        super().__init__(random_state=random_state, verbose=verbose)
        self.n_estimators = n_estimators
        self.max_iter = max_iter
        self.max_depth = max_depth
        self.max_features = max_features
        self.initial_strategy = initial_strategy
        self.n_jobs = n_jobs
        self.add_indicator = add_indicator

    def fit(self, X, y=None, **fit_params) -> MissForestImputer:
        """Fit the MissForest imputer.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored

        Returns
        -------
        self
        """
        from sklearn.ensemble import RandomForestRegressor
        # The experimental import must run first: it is what makes
        # IterativeImputer importable from sklearn.impute.
        from sklearn.experimental import enable_iterative_imputer  # noqa: F401
        from sklearn.impute import IterativeImputer

        # Remember how the input was labelled so names can be reported later.
        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)

        # The same seed drives both the forest and the iterative imputer.
        rf_estimator = RandomForestRegressor(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            max_features=self.max_features,
            random_state=self.random_state,
            n_jobs=self.n_jobs,
        )
        self._imputer = IterativeImputer(
            estimator=rf_estimator,
            max_iter=self.max_iter,
            initial_strategy=self.initial_strategy,
            random_state=self.random_state,
            add_indicator=self.add_indicator,
        )
        self._imputer.fit(_to_numpy(X))
        self.n_features_in_ = self._imputer.n_features_in_
        self.n_iter_ = self._imputer.n_iter_
        self._is_fitted = True
        self._log(
            f"Fitted MissForestImputer with {self.n_estimators} trees, "
            f"converged in {self.n_iter_} iterations"
        )
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names.

        Parameters
        ----------
        input_features : array-like of str, optional
            Names of the input features. When omitted (or empty), the column
            names captured from a DataFrame during ``fit`` are used.

        Returns
        -------
        list of str
            Input feature names, followed by ``"<name>_missing"`` entries when
            ``add_indicator=True``. Indicator names follow the ``features_``
            of the wrapped imputer's fitted ``indicator_`` (the feature
            indices sklearn's ``MissingIndicator`` reports it will emit)
            instead of assuming one indicator per input feature. Empty when
            no names are known.
        """
        self._check_is_fitted()

        # Compare against None / length explicitly: sklearn passes feature
        # names around as numpy arrays, whose truth value is ambiguous
        # under ``or``.
        if input_features is not None and len(input_features) > 0:
            names = list(input_features)
        else:
            names = list(self._column_names or [])

        if not (self.add_indicator and names):
            return names

        indicator = getattr(self._imputer, "indicator_", None)
        flagged = getattr(indicator, "features_", None)
        if flagged is None:
            # No fitted indicator to consult; keep one flag name per feature.
            flagged = range(len(names))
        return names + [f"{names[int(i)]}_missing" for i in flagged]
[docs]
class AutoImputer(EndgameEstimator, TransformerMixin):
    """Pick and fit an imputer based on how much of the data is missing.

    In ``'auto'`` mode the overall fraction of missing cells decides which
    imputer is used:

    - below ``low_threshold`` -> SimpleImputer (fast, sufficient for low
      missingness)
    - from ``low_threshold`` up to (not including) ``high_threshold`` ->
      KNNImputer (captures local structure)
    - ``high_threshold`` and above -> MICEImputer (models complex
      dependencies)

    An approximate Little's MCAR test is also run to label the missingness
    mechanism (MCAR, MAR, or MNAR). The label is recorded and logged; it does
    not influence which imputer is chosen.

    Parameters
    ----------
    strategy : str, default='auto'
        Imputation strategy:

        - 'auto': Automatically select based on missingness percentage
        - 'simple': Force SimpleImputer
        - 'knn': Force KNNImputer
        - 'mice': Force MICEImputer
        - 'missforest': Force MissForestImputer
    low_threshold : float, default=0.05
        Missingness fraction below which SimpleImputer is used (in auto mode).
    high_threshold : float, default=0.30
        Missingness fraction at or above which MICEImputer is used (in auto
        mode).
    random_state : int, default=42
        Random seed for reproducibility.
    add_indicator : bool, default=False
        If True, append binary missing-indicator columns.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    missingness_fraction_ : float
        Overall fraction of missing values in the training data.
    missingness_type_ : str
        Detected missingness mechanism: 'MCAR', 'MAR', or 'MNAR'.
    selected_strategy_ : str
        The imputation strategy that was selected.
    imputer_ : estimator
        The fitted imputer instance.
    n_features_in_ : int
        Number of features seen during fit.

    Examples
    --------
    >>> import numpy as np
    >>> from endgame.preprocessing.imputation import AutoImputer
    >>> X = np.array([[1, 2], [np.nan, 3], [7, np.nan], [5, 4]])
    >>> imp = AutoImputer(strategy='auto', random_state=42)
    >>> X_imputed = imp.fit_transform(X)
    >>> imp.selected_strategy_
    'knn'
    """

    def __init__(
        self,
        strategy: str = "auto",
        low_threshold: float = 0.05,
        high_threshold: float = 0.30,
        random_state: int | None = 42,
        add_indicator: bool = False,
        verbose: bool = False,
    ):
        super().__init__(random_state=random_state, verbose=verbose)
        self.strategy = strategy
        self.low_threshold = low_threshold
        self.high_threshold = high_threshold
        self.add_indicator = add_indicator

    @staticmethod
    def _littles_mcar_test_approx(X: np.ndarray) -> tuple[str, float]:
        """Classify the missingness mechanism with a correlation heuristic.

        This is a cheap stand-in for Little's MCAR test, not the chi-squared
        test itself. For every feature that is partially missing, its 0/1
        missingness indicator is correlated with each other feature, using
        only the rows where that other feature is observed. The absolute
        correlations are then summarised by their mean and maximum.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Data with missing values (np.nan).

        Returns
        -------
        missingness_type : str
            One of 'MCAR', 'MAR', 'MNAR'.
        test_statistic : float
            Mean absolute correlation (0.0 when nothing could be computed).
            Higher values indicate structured missingness.
        """
        n_rows, n_cols = X.shape
        is_nan = np.isnan(X)

        # Only partially-missing columns carry a non-constant indicator.
        miss_counts = is_nan.sum(axis=0)
        partial_cols = [
            j for j in range(n_cols) if 0 < miss_counts[j] < n_rows
        ]
        if not partial_cols:
            return "MCAR", 0.0

        scores = []
        for j in partial_cols:
            flag = is_nan[:, j].astype(np.float64)
            for k in (c for c in range(n_cols) if c != j):
                # Restrict to rows where the comparison feature is observed;
                # fewer than 10 such rows is treated as too little evidence.
                seen = ~is_nan[:, k]
                if seen.sum() < 10:
                    continue
                values = X[seen, k]
                flag_seen = flag[seen]
                # A correlation needs variance on both sides.
                if np.std(values) < 1e-12 or np.std(flag_seen) < 1e-12:
                    continue
                strength = np.abs(np.corrcoef(values, flag_seen)[0, 1])
                if np.isnan(strength):
                    continue
                scores.append(strength)

        if not scores:
            return "MCAR", 0.0

        avg = float(np.mean(scores))
        peak = float(np.max(scores))

        # Heuristic cut-offs:
        #   avg < 0.05                 -> MCAR (no systematic relationship)
        #   avg < 0.15 and peak < 0.30 -> MAR  (mild dependence)
        #   anything else              -> MNAR (strong dependence)
        if avg < 0.05:
            label = "MCAR"
        elif avg < 0.15 and peak < 0.30:
            label = "MAR"
        else:
            label = "MNAR"
        return label, avg

    def fit(self, X, y=None, **fit_params) -> AutoImputer:
        """Analyse missingness, choose a strategy, and fit the chosen imputer.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : ignored

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the resolved strategy is not one of 'simple', 'knn', 'mice',
            or 'missforest'.
        """
        self._column_names = _extract_column_names(X)
        self._was_dataframe = HAS_PANDAS and isinstance(X, pd.DataFrame)
        data = _to_numpy(X)
        n_samples, n_features = data.shape
        self.n_features_in_ = n_features

        # Overall share of missing cells (0.0 for an empty matrix).
        cell_count = n_samples * n_features
        missing_count = np.isnan(data).sum()
        if cell_count > 0:
            self.missingness_fraction_ = float(missing_count / cell_count)
        else:
            self.missingness_fraction_ = 0.0

        # Label the missingness mechanism with the approximate MCAR test.
        self.missingness_type_, self._test_statistic = (
            self._littles_mcar_test_approx(data)
        )

        # Resolve the strategy: an explicit choice wins, otherwise threshold
        # the missingness fraction.
        if self.strategy == "auto":
            fraction = self.missingness_fraction_
            if fraction < self.low_threshold:
                chosen = "simple"
            elif fraction < self.high_threshold:
                chosen = "knn"
            else:
                chosen = "mice"
        else:
            chosen = self.strategy
        self.selected_strategy_ = chosen

        self._log(
            f"Missingness: {self.missingness_fraction_:.1%} "
            f"(type={self.missingness_type_}), "
            f"selected strategy: {self.selected_strategy_}"
        )

        # Candidate imputers are wrapped in factories so that only the
        # selected one is ever constructed.
        shared = {"add_indicator": self.add_indicator, "verbose": self.verbose}
        candidates = (
            ("simple", lambda: SimpleImputer(strategy="median", **shared)),
            ("knn", lambda: KNNImputer(n_neighbors=5, **shared)),
            (
                "mice",
                lambda: MICEImputer(
                    max_iter=10, random_state=self.random_state, **shared
                ),
            ),
            (
                "missforest",
                lambda: MissForestImputer(
                    random_state=self.random_state, **shared
                ),
            ),
        )
        for label, build in candidates:
            if self.selected_strategy_ == label:
                self.imputer_ = build()
                break
        else:
            raise ValueError(
                f"Unknown strategy: '{self.strategy}'. "
                "Use 'auto', 'simple', 'knn', 'mice', or 'missforest'."
            )

        self.imputer_.fit(X, y)
        self._is_fitted = True
        return self

    def get_feature_names_out(
        self, input_features: list[str] | None = None,
    ) -> list[str]:
        """Get output feature names by delegating to the fitted imputer."""
        self._check_is_fitted()
        delegate = self.imputer_
        return delegate.get_feature_names_out(input_features)