# Source code for endgame.preprocessing.temporal

from __future__ import annotations

"""Temporal feature extraction for time series and datetime columns."""

from collections.abc import Sequence
from typing import Any

import numpy as np

from endgame.core.base import PolarsTransformer
from endgame.core.polars_ops import HAS_POLARS

if HAS_POLARS:
    import polars as pl


class TemporalFeatures(PolarsTransformer):
    """Extract calendar features from datetime columns.

    For every target datetime column a set of integer/boolean calendar
    features is appended, optionally followed by sin/cos (cyclical)
    encodings of the periodic components.

    Features available:

    - Basic: ``year``, ``month``, ``day``, ``dayofweek``, ``hour``,
      ``minute``, ``second``
    - Boolean (stored as Int8): ``is_weekend``, ``is_month_start``,
      ``is_month_end``, ``is_year_start``, ``is_year_end``
    - Derived: ``quarter``, ``week_of_year``, ``day_of_year``
    - Cyclical: sin/cos encodings for month, day of week, hour and
      day of month

    ``dayofweek`` follows the Polars / ISO-8601 convention:
    Monday = 1 ... Sunday = 7.

    Parameters
    ----------
    datetime_cols : list of str, optional
        Datetime columns to extract features from. If None, columns with a
        Polars ``Datetime`` or ``Date`` dtype are auto-detected in ``fit``.
        Names that are not present in the fitted data are ignored.
    features : list of str, optional
        Features to extract. If None (or empty), the default set is used:
        'year', 'month', 'day', 'dayofweek', 'hour', 'is_weekend',
        'quarter', 'week_of_year', 'day_of_year'.
        Additional options: 'minute', 'second', 'is_month_start',
        'is_month_end', 'is_year_start', 'is_year_end', 'cyclical'.
        Unknown names are ignored.
    cyclical : bool, default=True
        Whether to add cyclical (sin/cos) encodings. They are also added
        when 'cyclical' is listed in ``features``.
    drop_original : bool, default=False
        Whether to drop the original datetime columns.
    output_format, random_state, verbose
        Forwarded unchanged to ``PolarsTransformer``.

    Examples
    --------
    >>> tf = TemporalFeatures(cyclical=True)
    >>> X_temporal = tf.fit_transform(X)
    """

    # Features produced when ``features`` is not supplied.
    _DEFAULT_FEATURES = (
        "year", "month", "day", "dayofweek", "hour",
        "is_weekend", "quarter", "week_of_year", "day_of_year",
    )

    def __init__(
        self,
        datetime_cols: list[str] | None = None,
        features: list[str] | None = None,
        cyclical: bool = True,
        drop_original: bool = False,
        output_format: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(
            output_format=output_format,
            random_state=random_state,
            verbose=verbose,
        )
        self.datetime_cols = datetime_cols
        self.features = features
        self.cyclical = cyclical
        self.drop_original = drop_original
        # Columns resolved in ``fit`` and names generated by the last
        # ``transform`` call.
        self._target_cols: list[str] = []
        self._new_feature_names: list[str] = []

    def _detect_datetime_cols(self, df: pl.DataFrame) -> list[str]:
        """Return the names of all ``Datetime`` / ``Date`` columns in ``df``."""
        return [
            name
            for name, dtype in zip(df.columns, df.dtypes)
            if dtype in (pl.Datetime, pl.Date)
        ]

    @staticmethod
    def _calendar_exprs(col: str) -> dict[str, pl.Expr]:
        """Map each feature name to its (un-aliased) expression for ``col``.

        The insertion order of the mapping is the output column order.
        """
        dt = pl.col(col).dt
        month = dt.month()
        day = dt.day()
        return {
            # Basic components
            "year": dt.year(),
            "month": month,
            "day": day,
            "dayofweek": dt.weekday(),
            "hour": dt.hour(),
            "minute": dt.minute(),
            "second": dt.second(),
            # Boolean flags, stored as Int8.
            # Polars weekday is ISO (Mon=1 .. Sun=7): weekend is Sat/Sun,
            # i.e. >= 6 (``>= 5`` would wrongly include Friday).
            "is_weekend": (dt.weekday() >= 6).cast(pl.Int8),
            "is_month_start": (day == 1).cast(pl.Int8),
            # Exact last day of month (handles 28/29/30/31-day months).
            "is_month_end": (day == dt.month_end().dt.day()).cast(pl.Int8),
            "is_year_start": ((month == 1) & (day == 1)).cast(pl.Int8),
            "is_year_end": ((month == 12) & (day == 31)).cast(pl.Int8),
            # Derived components
            "quarter": dt.quarter(),
            "week_of_year": dt.week(),
            "day_of_year": dt.ordinal_day(),
        }

    @staticmethod
    def _cyclical_exprs(col: str) -> list[tuple[str, pl.Expr]]:
        """Return ``(output_name, expr)`` pairs of sin/cos encodings for ``col``."""
        dt = pl.col(col).dt
        components = (
            ("month", dt.month(), 12),   # 1-12
            ("dow", dt.weekday(), 7),    # ISO 1-7; 7 wraps onto 0
            ("hour", dt.hour(), 24),     # 0-23
            ("day", dt.day(), 31),       # 1-31 (fixed period of 31)
        )
        encoded: list[tuple[str, pl.Expr]] = []
        for label, values, period in components:
            angle = 2 * np.pi * values.cast(pl.Float64) / period
            encoded.append((f"{col}_{label}_sin", angle.sin()))
            encoded.append((f"{col}_{label}_cos", angle.cos()))
        return encoded

    def fit(self, X, y=None, **fit_params) -> TemporalFeatures:
        """Identify the datetime columns to expand.

        Parameters
        ----------
        X : frame-like
            Input data; converted with ``_to_lazyframe``.
        y : ignored

        Returns
        -------
        TemporalFeatures
            The fitted transformer (``self``).
        """
        df = self._to_lazyframe(X, store_metadata=True).collect()

        if self.datetime_cols is None:
            self._target_cols = self._detect_datetime_cols(df)
        else:
            # Requested columns that are absent from the data are skipped.
            self._target_cols = [c for c in self.datetime_cols if c in df.columns]

        self._log(f"Found {len(self._target_cols)} datetime columns")
        self._is_fitted = True
        return self

    def transform(self, X) -> Any:
        """Append the temporal features for every fitted datetime column.

        New columns are named ``"{column}_{feature}"``; cyclical encodings
        are named ``"{column}_{month|dow|hour|day}_{sin|cos}"``. The
        generated names are stored in ``self._new_feature_names``.

        Returns
        -------
        Any
            The transformed data as produced by ``_from_lazyframe``.
        """
        self._check_is_fitted()
        lf = self._to_lazyframe(X)

        requested = self.features or list(self._DEFAULT_FEATURES)
        add_cyclical = self.cyclical or "cyclical" in requested

        # NOTE(review): 'hour' / 'minute' / 'second' are requested on
        # auto-detected ``pl.Date`` columns too -- confirm the installed
        # Polars version supports time components on Date.
        new_cols = []
        new_names: list[str] = []
        for col in self._target_cols:
            for feature, expr in self._calendar_exprs(col).items():
                if feature in requested:
                    out_name = f"{col}_{feature}"
                    new_cols.append(expr.alias(out_name))
                    new_names.append(out_name)

            if add_cyclical:
                for out_name, expr in self._cyclical_exprs(col):
                    new_cols.append(expr.alias(out_name))
                    new_names.append(out_name)

        self._new_feature_names = new_names

        result = lf.with_columns(new_cols) if new_cols else lf
        if self.drop_original:
            result = result.drop(self._target_cols)

        return self._from_lazyframe(result)
class LagFeatures(PolarsTransformer):
    """Generate lag features for time series data.

    Creates shifted copies of columns (``"{col}_lag_{lag}"``) to capture
    temporal dependencies. Rows are shifted in their existing order, so the
    data is expected to be sorted chronologically (within each group when
    ``group_cols`` is given).

    Parameters
    ----------
    cols : list of str, optional
        Columns to create lags for. If None, numeric columns are inferred
        in ``fit``. Names absent from the fitted data are ignored.
    lags : sequence of int, default=(1, 2, 3)
        Lag periods to create.
    group_cols : list of str, optional
        Columns to group by when computing lags, so values never shift
        across group boundaries.
    fill_value : float, optional
        Value used to replace nulls in the lagged columns. If None, the
        nulls introduced by shifting are kept.
    output_format, random_state, verbose
        Forwarded unchanged to ``PolarsTransformer``.

    Examples
    --------
    >>> lf = LagFeatures(cols=['price'], lags=[1, 7, 30])
    >>> X_lagged = lf.fit_transform(X)
    """

    def __init__(
        self,
        cols: list[str] | None = None,
        lags: Sequence[int] = (1, 2, 3),
        group_cols: list[str] | None = None,
        fill_value: float | None = None,
        output_format: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(
            output_format=output_format,
            random_state=random_state,
            verbose=verbose,
        )
        self.cols = cols
        self.lags = list(lags)
        self.group_cols = group_cols
        self.fill_value = fill_value
        # Columns resolved in ``fit``.
        self._target_cols: list[str] = []

    def fit(self, X, y=None, **fit_params) -> LagFeatures:
        """Identify the columns to lag.

        Parameters
        ----------
        X : frame-like
            Input data; converted with ``_to_lazyframe``.
        y : ignored

        Returns
        -------
        LagFeatures
            The fitted transformer (``self``).
        """
        df = self._to_lazyframe(X, store_metadata=True).collect()

        if self.cols is None:
            from endgame.core.polars_ops import infer_numeric_columns

            # Inspect the converted Polars frame rather than the raw input:
            # ``transform`` addresses columns by their Polars names.
            self._target_cols = infer_numeric_columns(df)
        else:
            self._target_cols = [c for c in self.cols if c in df.columns]

        self._is_fitted = True
        return self

    def transform(self, X) -> Any:
        """Append one ``"{col}_lag_{lag}"`` column per target column and lag.

        Returns
        -------
        Any
            The transformed data as produced by ``_from_lazyframe``.
        """
        self._check_is_fitted()
        lf = self._to_lazyframe(X)

        lag_cols = []
        for col in self._target_cols:
            for lag in self.lags:
                shifted = pl.col(col).shift(lag)
                if self.group_cols:
                    # Shift inside each group only.
                    shifted = shifted.over(self.group_cols)
                if self.fill_value is not None:
                    shifted = shifted.fill_null(self.fill_value)
                lag_cols.append(shifted.alias(f"{col}_lag_{lag}"))

        result = lf.with_columns(lag_cols) if lag_cols else lf
        return self._from_lazyframe(result)
class RollingFeatures(PolarsTransformer):
    """Generate rolling window statistics.

    Creates rolling aggregations (``"{col}_rolling_{method}_{window}"``) for
    time series data. Windows run over rows in their existing order, so the
    data is expected to be sorted chronologically (within each group when
    ``group_cols`` is given).

    Parameters
    ----------
    cols : list of str, optional
        Columns to compute rolling stats for. If None, numeric columns are
        inferred in ``fit``. Names absent from the fitted data are ignored.
    windows : sequence of int, default=(3, 7, 14)
        Window sizes (in rows).
    methods : sequence of str, default=('mean', 'std')
        Aggregation methods: 'mean', 'std', 'min', 'max', 'sum'.
    group_cols : list of str, optional
        Columns to group by; each window is evaluated inside its own group.
    min_periods : int, default=1
        Minimum observations in window required.
    output_format, random_state, verbose
        Forwarded unchanged to ``PolarsTransformer``.

    Examples
    --------
    >>> rf = RollingFeatures(cols=['price'], windows=[7, 30])
    >>> X_rolling = rf.fit_transform(X)
    """

    # Each entry maps to the Polars expression method ``rolling_<name>``.
    _SUPPORTED_METHODS = ("mean", "std", "min", "max", "sum")

    def __init__(
        self,
        cols: list[str] | None = None,
        windows: Sequence[int] = (3, 7, 14),
        methods: Sequence[str] = ("mean", "std"),
        group_cols: list[str] | None = None,
        min_periods: int = 1,
        output_format: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(
            output_format=output_format,
            random_state=random_state,
            verbose=verbose,
        )
        self.cols = cols
        self.windows = list(windows)
        self.methods = list(methods)
        self.group_cols = group_cols
        self.min_periods = min_periods
        # Columns resolved in ``fit``.
        self._target_cols: list[str] = []

    def fit(self, X, y=None, **fit_params) -> RollingFeatures:
        """Identify the columns for rolling statistics.

        Parameters
        ----------
        X : frame-like
            Input data; converted with ``_to_lazyframe``.
        y : ignored

        Returns
        -------
        RollingFeatures
            The fitted transformer (``self``).
        """
        df = self._to_lazyframe(X, store_metadata=True).collect()

        if self.cols is None:
            from endgame.core.polars_ops import infer_numeric_columns

            # Inspect the converted Polars frame rather than the raw input:
            # ``transform`` addresses columns by their Polars names.
            self._target_cols = infer_numeric_columns(df)
        else:
            self._target_cols = [c for c in self.cols if c in df.columns]

        self._is_fitted = True
        return self

    def transform(self, X) -> Any:
        """Append one rolling column per (column, window, method) combination.

        Returns
        -------
        Any
            The transformed data as produced by ``_from_lazyframe``.

        Raises
        ------
        ValueError
            If ``methods`` contains a name other than 'mean', 'std', 'min',
            'max' or 'sum'.
        """
        self._check_is_fitted()
        lf = self._to_lazyframe(X)

        for method in self.methods:
            if method not in self._SUPPORTED_METHODS:
                raise ValueError(f"Unknown method: {method}")

        rolling_cols = []
        for col in self._target_cols:
            for window in self.windows:
                for method in self.methods:
                    # NOTE(review): recent Polars releases rename
                    # ``min_periods`` to ``min_samples`` -- confirm against
                    # the pinned Polars version.
                    rolled = getattr(pl.col(col), f"rolling_{method}")(
                        window_size=window,
                        min_periods=self.min_periods,
                    )
                    if self.group_cols:
                        # ``over`` must wrap the rolling expression; applying
                        # it to the bare column first would let the window
                        # run across group boundaries.
                        rolled = rolled.over(self.group_cols)
                    rolling_cols.append(
                        rolled.alias(f"{col}_rolling_{method}_{window}")
                    )

        result = lf.with_columns(rolling_cols) if rolling_cols else lf
        return self._from_lazyframe(result)