Source code for endgame.preprocessing.encoding
from __future__ import annotations
"""Target and categorical encoding transformers."""
from typing import Any
import numpy as np
from sklearn.model_selection import KFold
from endgame.core.base import PolarsTransformer
from endgame.core.polars_ops import (
HAS_PANDAS,
HAS_POLARS,
infer_categorical_columns,
)
if HAS_POLARS:
import polars as pl
if HAS_PANDAS:
pass
[docs]
class SafeTargetEncoder(PolarsTransformer):
"""Target encoding with M-estimate smoothing and inner-fold encoding.
Prevents target leakage through nested cross-validation during fit
and applies smoothing for rare categories.
Implements: S_i = (n_i × μ_i + m × μ_global) / (n_i + m)
Parameters
----------
cols : List[str], optional
Columns to encode. If None, encodes all categorical columns.
smoothing : float, default=10
Smoothing parameter (m) for rare categories.
Higher values = more regularization toward global mean.
cv : int, default=5
Number of folds for inner-fold encoding during fit.
min_samples_leaf : int, default=1
Minimum samples required to compute category statistic.
noise_level : float, default=0.0
Gaussian noise std to add for regularization.
handle_unknown : str, default='global_mean'
Strategy for unseen categories: 'global_mean', 'nan', 'error'.
output_format : str, default='auto'
Output format: 'auto', 'polars', 'pandas', 'numpy'.
random_state : int, optional
Random seed for cross-validation and noise.
Examples
--------
>>> from endgame.preprocessing import SafeTargetEncoder
>>> encoder = SafeTargetEncoder(smoothing=10, cv=5)
>>> X_encoded = encoder.fit_transform(X, y)
"""
def __init__(
self,
cols: list[str] | None = None,
smoothing: float = 10.0,
cv: int = 5,
min_samples_leaf: int = 1,
noise_level: float = 0.0,
handle_unknown: str = "global_mean",
output_format: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(
output_format=output_format,
random_state=random_state,
verbose=verbose,
)
self.cols = cols
self.smoothing = smoothing
self.cv = cv
self.min_samples_leaf = min_samples_leaf
self.noise_level = noise_level
self.handle_unknown = handle_unknown
self._encodings: dict[str, dict[Any, float]] = {}
self._global_mean: float = 0.0
self._target_cols: list[str] = []
[docs]
def fit(self, X, y, **fit_params) -> SafeTargetEncoder:
"""Fit the target encoder.
Uses inner-fold encoding to prevent leakage during training.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,)
Target values.
Returns
-------
self
"""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
self._global_mean = float(np.mean(y))
# Determine columns to encode
if self.cols is None:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
self._log(f"Encoding {len(self._target_cols)} columns with target encoding")
# Compute encodings using full data (for transform)
for col in self._target_cols:
col_data = df[col].to_numpy()
self._encodings[col] = self._compute_encoding(col_data, y)
self._is_fitted = True
return self
def _compute_encoding(
self,
col_data: np.ndarray,
y: np.ndarray,
) -> dict[Any, float]:
"""Compute target encoding for a single column."""
encodings = {}
# Replace None with a sentinel before finding uniques to avoid
# comparison errors between str and NoneType during sorting.
_MISSING = "__MISSING__"
is_none = np.array([v is None or (isinstance(v, float) and np.isnan(v))
for v in col_data])
safe_data = np.where(is_none, _MISSING, col_data)
unique_values = np.unique(safe_data)
for value in unique_values:
mask = safe_data == value
n_samples = mask.sum()
if n_samples < self.min_samples_leaf:
encodings[value] = self._global_mean
else:
category_mean = y[mask].mean()
# M-estimate smoothing
smoothed = (
n_samples * category_mean + self.smoothing * self._global_mean
) / (n_samples + self.smoothing)
encodings[value] = smoothed
return encodings
[docs]
def transform(self, X) -> Any:
"""Transform data using learned encodings.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data to transform.
Returns
-------
X_transformed : array-like
Transformed data with encoded columns.
"""
self._check_is_fitted()
lf = self._to_lazyframe(X)
df = lf.collect()
result_dict = {}
for col in df.columns:
if col in self._target_cols:
# Apply encoding
col_data = df[col].to_numpy()
encoded = np.array([
self._encodings[col].get(v, self._handle_unknown_value(col, v))
for v in col_data
])
# Add noise if specified
if self.noise_level > 0:
rng = np.random.RandomState(self.random_state)
encoded = encoded + rng.normal(0, self.noise_level, len(encoded))
result_dict[col] = encoded
else:
result_dict[col] = df[col].to_numpy()
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
def _handle_unknown_value(self, col: str, value: Any) -> float:
"""Handle unknown category values."""
if self.handle_unknown == "global_mean":
return self._global_mean
elif self.handle_unknown == "nan":
return np.nan
elif self.handle_unknown == "error":
raise ValueError(f"Unknown value '{value}' in column '{col}'")
else:
raise ValueError(f"Unknown handle_unknown strategy: {self.handle_unknown}")
[docs]
def fit_transform(self, X, y, **fit_params) -> Any:
"""Fit and transform with inner-fold encoding to prevent leakage.
During fit_transform, uses cross-validation to compute encodings
without leakage. Each sample is encoded using statistics computed
only from other samples.
Parameters
----------
X : array-like
Training data.
y : array-like
Target values.
Returns
-------
X_transformed : array-like
Transformed training data.
"""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
self._global_mean = float(np.mean(y))
# Determine columns to encode
if self.cols is None:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
n_samples = len(df)
result_dict = {col: np.zeros(n_samples) for col in self._target_cols}
# Copy non-encoded columns
for col in df.columns:
if col not in self._target_cols:
result_dict[col] = df[col].to_numpy()
# Inner-fold encoding
kf = KFold(n_splits=self.cv, shuffle=True, random_state=self.random_state)
for train_idx, val_idx in kf.split(df):
for col in self._target_cols:
col_data = df[col].to_numpy()
# Compute encoding from train fold only
train_encoding = self._compute_encoding(
col_data[train_idx],
y[train_idx],
)
# Apply to validation fold
for idx in val_idx:
value = col_data[idx]
encoded = train_encoding.get(value, self._global_mean)
result_dict[col][idx] = encoded
# Store encodings computed from full data for transform
for col in self._target_cols:
col_data = df[col].to_numpy()
self._encodings[col] = self._compute_encoding(col_data, y)
# Add noise if specified
if self.noise_level > 0:
rng = np.random.RandomState(self.random_state)
for col in self._target_cols:
result_dict[col] = result_dict[col] + rng.normal(
0, self.noise_level, n_samples
)
self._is_fitted = True
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
[docs]
def get_feature_names_out(
self,
input_features: list[str] | None = None,
) -> list[str]:
"""Get output feature names (same as input for target encoding)."""
self._check_is_fitted()
return self._feature_names_in or []
[docs]
class LeaveOneOutEncoder(PolarsTransformer):
"""Leave-One-Out target encoding for online settings.
Each sample's encoding excludes its own target value, preventing
direct leakage while still using all available data.
Parameters
----------
cols : List[str], optional
Columns to encode. If None, encodes all categorical columns.
smoothing : float, default=1.0
Smoothing parameter for regularization.
handle_unknown : str, default='global_mean'
Strategy for unseen categories.
random_state : int, optional
Random seed for reproducibility.
"""
def __init__(
self,
cols: list[str] | None = None,
smoothing: float = 1.0,
handle_unknown: str = "global_mean",
output_format: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(
output_format=output_format,
random_state=random_state,
verbose=verbose,
)
self.cols = cols
self.smoothing = smoothing
self.handle_unknown = handle_unknown
self._encodings: dict[str, dict[Any, tuple]] = {} # value -> (sum, count)
self._global_mean: float = 0.0
self._target_cols: list[str] = []
[docs]
def fit(self, X, y, **fit_params) -> LeaveOneOutEncoder:
"""Fit the LOO encoder."""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
self._global_mean = float(np.mean(y))
if self.cols is None:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
# Store sum and count for each category
for col in self._target_cols:
col_data = df[col].to_numpy()
self._encodings[col] = {}
for value in np.unique(col_data):
mask = col_data == value
self._encodings[col][value] = (y[mask].sum(), mask.sum())
self._is_fitted = True
return self
[docs]
def transform(self, X) -> Any:
"""Transform using stored statistics (no LOO at test time)."""
self._check_is_fitted()
lf = self._to_lazyframe(X)
df = lf.collect()
result_dict = {}
for col in df.columns:
if col in self._target_cols:
col_data = df[col].to_numpy()
encoded = np.zeros(len(col_data))
for i, value in enumerate(col_data):
if value in self._encodings[col]:
total, count = self._encodings[col][value]
encoded[i] = (total + self.smoothing * self._global_mean) / (
count + self.smoothing
)
else:
encoded[i] = self._global_mean
result_dict[col] = encoded
else:
result_dict[col] = df[col].to_numpy()
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
[docs]
def fit_transform(self, X, y, **fit_params) -> Any:
"""Fit and transform with LOO to prevent leakage."""
self.fit(X, y, **fit_params)
lf = self._to_lazyframe(X)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
result_dict = {}
for col in df.columns:
if col in self._target_cols:
col_data = df[col].to_numpy()
encoded = np.zeros(len(col_data))
for i, value in enumerate(col_data):
if value in self._encodings[col]:
total, count = self._encodings[col][value]
# Leave out current sample
loo_total = total - y[i]
loo_count = count - 1
if loo_count > 0:
encoded[i] = (
loo_total + self.smoothing * self._global_mean
) / (loo_count + self.smoothing)
else:
encoded[i] = self._global_mean
else:
encoded[i] = self._global_mean
result_dict[col] = encoded
else:
result_dict[col] = df[col].to_numpy()
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
[docs]
class CatBoostEncoder(PolarsTransformer):
"""CatBoost-style ordered target encoding.
Encodes based only on preceding samples, mimicking CatBoost's
internal target statistic computation. Prevents leakage by
using only "past" information for each sample.
Parameters
----------
cols : List[str], optional
Columns to encode.
smoothing : float, default=1.0
Smoothing parameter.
random_state : int, optional
Random seed for sample ordering.
"""
def __init__(
self,
cols: list[str] | None = None,
smoothing: float = 1.0,
output_format: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(
output_format=output_format,
random_state=random_state,
verbose=verbose,
)
self.cols = cols
self.smoothing = smoothing
self._encodings: dict[str, dict[Any, float]] = {}
self._global_mean: float = 0.0
self._target_cols: list[str] = []
[docs]
def fit(self, X, y, **fit_params) -> CatBoostEncoder:
"""Fit encoder (stores final statistics for transform)."""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
self._global_mean = float(np.mean(y))
if self.cols is None:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
# Compute final encodings for transform
for col in self._target_cols:
col_data = df[col].to_numpy()
self._encodings[col] = {}
for value in np.unique(col_data):
mask = col_data == value
n = mask.sum()
if n > 0:
mean = y[mask].mean()
self._encodings[col][value] = (
n * mean + self.smoothing * self._global_mean
) / (n + self.smoothing)
else:
self._encodings[col][value] = self._global_mean
self._is_fitted = True
return self
[docs]
def transform(self, X) -> Any:
"""Transform using final statistics."""
self._check_is_fitted()
lf = self._to_lazyframe(X)
df = lf.collect()
result_dict = {}
for col in df.columns:
if col in self._target_cols:
col_data = df[col].to_numpy()
encoded = np.array([
self._encodings[col].get(v, self._global_mean)
for v in col_data
])
result_dict[col] = encoded
else:
result_dict[col] = df[col].to_numpy()
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
[docs]
def fit_transform(self, X, y, **fit_params) -> Any:
"""Fit and transform with ordered encoding."""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
y = np.asarray(y).astype(np.float64)
self._global_mean = float(np.mean(y))
if self.cols is None:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
n_samples = len(df)
# Random permutation for ordering
rng = np.random.RandomState(self.random_state)
perm = rng.permutation(n_samples)
inv_perm = np.argsort(perm)
result_dict = {}
for col in df.columns:
if col in self._target_cols:
col_data = df[col].to_numpy()[perm]
y_perm = y[perm]
encoded = np.zeros(n_samples)
# Running statistics
running_sum: dict[Any, float] = {}
running_count: dict[Any, int] = {}
for i in range(n_samples):
value = col_data[i]
# Use running statistics up to this point
if value in running_count and running_count[value] > 0:
prior_sum = running_sum[value]
prior_count = running_count[value]
encoded[i] = (
prior_sum + self.smoothing * self._global_mean
) / (prior_count + self.smoothing)
else:
encoded[i] = self._global_mean
# Update running statistics
if value not in running_sum:
running_sum[value] = 0.0
running_count[value] = 0
running_sum[value] += y_perm[i]
running_count[value] += 1
# Restore original order
result_dict[col] = encoded[inv_perm]
else:
result_dict[col] = df[col].to_numpy()
# Store final encodings for transform
for col in self._target_cols:
col_data = df[col].to_numpy()
self._encodings[col] = {}
for value in np.unique(col_data):
mask = col_data == value
n = mask.sum()
if n > 0:
mean = y[mask].mean()
self._encodings[col][value] = (
n * mean + self.smoothing * self._global_mean
) / (n + self.smoothing)
self._is_fitted = True
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())
[docs]
class FrequencyEncoder(PolarsTransformer):
"""Frequency encoding for categorical features.
Replaces categories with their frequency (count or proportion).
Simple but effective encoding that doesn't require target values.
Parameters
----------
cols : List[str], optional
Columns to encode. If None, encodes all categorical columns.
normalize : bool, default=True
If True, use proportions. If False, use raw counts.
handle_unknown : str, default='zero'
Strategy for unseen categories: 'zero', 'nan', 'error'.
"""
def __init__(
self,
cols: list[str] | None = None,
normalize: bool = True,
handle_unknown: str = "zero",
output_format: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
super().__init__(
output_format=output_format,
random_state=random_state,
verbose=verbose,
)
self.cols = cols
self.normalize = normalize
self.handle_unknown = handle_unknown
self._frequencies: dict[str, dict[Any, float]] = {}
self._target_cols: list[str] = []
self._n_samples: int = 0
[docs]
def fit(self, X, y=None, **fit_params) -> FrequencyEncoder:
"""Compute frequencies from training data."""
lf = self._to_lazyframe(X, store_metadata=True)
df = lf.collect()
self._n_samples = len(df)
if self.cols is None:
# For numpy arrays, encode all columns by default
if isinstance(X, np.ndarray):
self._target_cols = list(df.columns)
else:
self._target_cols = infer_categorical_columns(X)
else:
self._target_cols = [c for c in self.cols if c in df.columns]
for col in self._target_cols:
col_data = df[col].to_numpy().flatten() # Ensure 1D
unique, counts = np.unique(col_data, return_counts=True)
if self.normalize:
frequencies = counts / self._n_samples
else:
frequencies = counts.astype(float)
self._frequencies[col] = dict(zip(unique, frequencies))
self._is_fitted = True
return self
[docs]
def transform(self, X) -> Any:
"""Apply frequency encoding."""
self._check_is_fitted()
lf = self._to_lazyframe(X)
df = lf.collect()
result_dict = {}
for col in df.columns:
if col in self._target_cols:
col_data = df[col].to_numpy().flatten() # Ensure 1D
if self.handle_unknown == "zero":
default = 0.0
elif self.handle_unknown == "nan":
default = np.nan
else:
default = None
encoded = np.zeros(len(col_data), dtype=np.float64)
for i, v in enumerate(col_data):
if v in self._frequencies[col]:
encoded[i] = self._frequencies[col][v]
elif default is not None:
encoded[i] = default
else:
raise ValueError(f"Unknown value '{v}' in column '{col}'")
result_dict[col] = encoded
else:
col_arr = df[col].to_numpy()
# Ensure 1D array
if col_arr.ndim > 1:
col_arr = col_arr.flatten()
result_dict[col] = col_arr
result_df = pl.DataFrame(result_dict)
return self._from_lazyframe(result_df.lazy())