Source code for endgame.timeseries.baselines

from __future__ import annotations

"""Simple forecasting baselines.

These baseline models are essential for:
1. Establishing benchmark performance
2. Sanity checking more complex models
3. Fast prototyping and feature engineering validation
4. Kaggle competitions where simple models often outperform complex ones

All baselines are implemented from scratch with no external dependencies
beyond numpy, making them lightweight and fast.
"""

from typing import Any, Literal

import numpy as np

from endgame.timeseries.base import (
    BaseForecaster,
    UnivariateForecasterMixin,
    validate_forecast_input,
)


[docs] class NaiveForecaster(BaseForecaster, UnivariateForecasterMixin): """Naive forecaster using simple strategies. Implements several naive forecasting strategies that serve as strong baselines, especially for random walk-like data. Parameters ---------- strategy : str, default="last" Forecasting strategy: - "last": Predict the last observed value (random walk) - "mean": Predict the mean of the training data - "median": Predict the median of the training data random_state : int, optional Random seed (unused, for API consistency). verbose : bool, default=False Enable verbose output. Examples -------- >>> model = NaiveForecaster(strategy="last") >>> model.fit(y_train) >>> forecast = model.predict(horizon=7) Notes ----- The "last" strategy is equivalent to a random walk model without drift and often performs surprisingly well on financial and economic data. """ def __init__( self, strategy: Literal["last", "mean", "median"] = "last", random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.strategy = strategy self._forecast_value: float | None = None
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> NaiveForecaster: """Fit the naive forecaster. Parameters ---------- y : array-like of shape (n_samples,) Training time series. X : ignored Exogenous features (not used). Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) # Compute forecast value based on strategy if self.strategy == "last": self._forecast_value = y[-1] elif self.strategy == "mean": self._forecast_value = np.mean(y) elif self.strategy == "median": self._forecast_value = np.median(y) else: raise ValueError(f"Unknown strategy: {self.strategy}") self._log(f"Fitted with {self.strategy} strategy, value={self._forecast_value:.4f}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate naive forecasts. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Exogenous features (not used). Returns ------- np.ndarray of shape (horizon,) Constant forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) return np.full(horizon, self._forecast_value)
[docs] def predict_interval( self, horizon: int, coverage: float = 0.95, X: Any | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate prediction intervals using residual-based approach. Parameters ---------- horizon : int Forecast horizon. coverage : float, default=0.95 Coverage probability. X : ignored Not used. Returns ------- tuple (point_forecast, lower, upper) """ self._check_is_fitted() horizon = self._validate_horizon(horizon) point = self.predict(horizon) # Estimate variance from in-sample residuals if self.strategy == "last": # For random walk, residuals are first differences residuals = np.diff(self.y_) else: residuals = self.y_ - self._forecast_value std = np.std(residuals) # Prediction intervals widen with horizon for random walk alpha = 1 - coverage z = -np.percentile(np.random.standard_normal(10000), alpha / 2 * 100) if self.strategy == "last": # Variance grows linearly with horizon for random walk horizon_factors = np.sqrt(np.arange(1, horizon + 1)) else: horizon_factors = np.ones(horizon) margin = z * std * horizon_factors lower = point - margin upper = point + margin return point, lower, upper
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values. For naive forecaster, fitted values are lagged actuals (strategy=last) or constant values (strategy=mean/median). """ self._check_is_fitted() if self.strategy == "last": # One-step-ahead forecasts are lagged values fitted = np.empty(self.n_samples_) fitted[0] = np.nan fitted[1:] = self.y_[:-1] else: fitted = np.full(self.n_samples_, self._forecast_value) return fitted
[docs] class SeasonalNaiveForecaster(BaseForecaster, UnivariateForecasterMixin): """Seasonal naive forecaster. Predicts using values from the same season in the previous cycle. Essential baseline for data with strong seasonality. Parameters ---------- seasonal_period : int, default=1 Length of seasonal cycle (e.g., 7 for weekly, 12 for monthly, 24 for hourly). If None, attempts to auto-detect from data. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Examples -------- >>> # Weekly seasonality >>> model = SeasonalNaiveForecaster(seasonal_period=7) >>> model.fit(daily_sales) >>> forecast = model.predict(horizon=14) # Next two weeks """ def __init__( self, seasonal_period: int = 1, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.seasonal_period = seasonal_period self._last_season: np.ndarray | None = None
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> SeasonalNaiveForecaster: """Fit the seasonal naive forecaster. Parameters ---------- y : array-like of shape (n_samples,) Training time series. X : ignored Not used. Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) sp = self.seasonal_period if sp < 1: raise ValueError(f"seasonal_period must be >= 1, got {sp}") if len(y) < sp: raise ValueError( f"Time series length ({len(y)}) must be >= seasonal_period ({sp})" ) # Store the last complete season self._last_season = y[-sp:] self._log(f"Fitted with seasonal_period={sp}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate seasonal naive forecasts. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Not used. Returns ------- np.ndarray of shape (horizon,) Seasonal forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) sp = self.seasonal_period n_cycles = (horizon + sp - 1) // sp # Tile the last season and trim to horizon forecast = np.tile(self._last_season, n_cycles)[:horizon] return forecast
[docs] def predict_interval( self, horizon: int, coverage: float = 0.95, X: Any | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate prediction intervals.""" self._check_is_fitted() horizon = self._validate_horizon(horizon) point = self.predict(horizon) sp = self.seasonal_period # Compute seasonal residuals n_complete = self.n_samples_ - sp residuals = self.y_[sp:] - self.y_[:-sp] std = np.std(residuals) alpha = 1 - coverage z = -np.percentile(np.random.standard_normal(10000), alpha / 2 * 100) # Variance grows with number of complete cycles cycle_idx = np.arange(horizon) // sp + 1 margin = z * std * np.sqrt(cycle_idx) return point, point - margin, point + margin
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values (lag-sp values).""" self._check_is_fitted() sp = self.seasonal_period fitted = np.empty(self.n_samples_) fitted[:sp] = np.nan fitted[sp:] = self.y_[:-sp] return fitted
[docs] class MovingAverageForecaster(BaseForecaster, UnivariateForecasterMixin): """Moving average forecaster. Predicts using simple or weighted moving average of recent observations. Effective for smoothing noise and capturing local trends. Parameters ---------- window : int, default=5 Number of recent observations to average. weights : array-like, optional Custom weights for each lag. If None, uses equal weights. Weights are applied in reverse order (most recent first). center : bool, default=False If True, use centered moving average (for decomposition). Only affects fitted values, not predictions. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Examples -------- >>> # Simple moving average >>> model = MovingAverageForecaster(window=7) >>> model.fit(y_train) >>> forecast = model.predict(horizon=30) >>> # Weighted moving average (recent values weighted more) >>> model = MovingAverageForecaster(window=5, weights=[1, 2, 3, 4, 5]) """ def __init__( self, window: int = 5, weights: list[float] | None = None, center: bool = False, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.window = window self.weights = weights self.center = center self._ma_value: float | None = None self._normalized_weights: np.ndarray | None = None
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> MovingAverageForecaster: """Fit the moving average forecaster. Parameters ---------- y : array-like Training time series. X : ignored Not used. Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) if self.window < 1: raise ValueError(f"window must be >= 1, got {self.window}") if len(y) < self.window: raise ValueError( f"Time series length ({len(y)}) must be >= window ({self.window})" ) # Normalize weights if self.weights is not None: weights = np.asarray(self.weights, dtype=float) if len(weights) != self.window: raise ValueError( f"weights length ({len(weights)}) must equal window ({self.window})" ) self._normalized_weights = weights / weights.sum() else: self._normalized_weights = np.ones(self.window) / self.window # Compute moving average of last window observations self._ma_value = np.dot(self._normalized_weights, y[-self.window:]) self._log(f"Fitted MA({self.window}), value={self._ma_value:.4f}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate moving average forecasts. Note: For multi-step forecasts, this uses a recursive approach where each forecast becomes input for the next. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Not used. Returns ------- np.ndarray of shape (horizon,) Forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) # Use recursive forecasting forecast = np.empty(horizon) history = np.concatenate([self.y_[-self.window:], np.zeros(horizon)]) for h in range(horizon): window_data = history[h:h + self.window] forecast[h] = np.dot(self._normalized_weights, window_data) history[self.window + h] = forecast[h] return forecast
[docs] def predict_interval( self, horizon: int, coverage: float = 0.95, X: Any | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate prediction intervals.""" self._check_is_fitted() horizon = self._validate_horizon(horizon) point = self.predict(horizon) # Compute residuals from in-sample MA fitted = self.get_fitted_values() valid_mask = ~np.isnan(fitted) residuals = self.y_[valid_mask] - fitted[valid_mask] std = np.std(residuals) alpha = 1 - coverage z = -np.percentile(np.random.standard_normal(10000), alpha / 2 * 100) # Intervals widen with horizon (approximate) horizon_factors = np.sqrt(1 + np.arange(horizon) * 0.1) margin = z * std * horizon_factors return point, point - margin, point + margin
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values.""" self._check_is_fitted() fitted = np.empty(self.n_samples_) fitted[:self.window - 1] = np.nan for i in range(self.window - 1, self.n_samples_): window_data = self.y_[i - self.window + 1:i + 1] fitted[i] = np.dot(self._normalized_weights, window_data) return fitted
[docs] class ExponentialSmoothingForecaster(BaseForecaster, UnivariateForecasterMixin): """Simple exponential smoothing forecaster. Implements single exponential smoothing (SES) for level forecasting and double exponential smoothing (Holt's method) for trend. Parameters ---------- alpha : float, default=0.3 Smoothing parameter for level (0 < alpha < 1). Higher values give more weight to recent observations. beta : float, optional Smoothing parameter for trend (0 < beta < 1). If None, uses simple exponential smoothing without trend. optimize : bool, default=False If True, optimize alpha (and beta) using grid search. initial_method : str, default="heuristic" Method for initializing level (and trend): - "heuristic": Use first few observations - "estimated": Estimate from training data random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Examples -------- >>> # Simple exponential smoothing >>> model = ExponentialSmoothingForecaster(alpha=0.3) >>> model.fit(y_train) >>> forecast = model.predict(horizon=10) >>> # Holt's linear trend method >>> model = ExponentialSmoothingForecaster(alpha=0.3, beta=0.1) >>> # Optimize parameters >>> model = ExponentialSmoothingForecaster(optimize=True) """ def __init__( self, alpha: float = 0.3, beta: float | None = None, optimize: bool = False, initial_method: Literal["heuristic", "estimated"] = "heuristic", random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.alpha = alpha self.beta = beta self.optimize = optimize self.initial_method = initial_method # Fitted parameters self.alpha_: float | None = None self.beta_: float | None = None self._level: float | None = None self._trend: float | None = None self._fitted_levels: np.ndarray | None = None self._fitted_trends: np.ndarray | None = None def _initialize_level_trend(self, y: np.ndarray) -> tuple[float, float | None]: """Initialize level and trend components.""" if self.initial_method == "heuristic": level = y[0] trend = (y[min(len(y) - 1, 9)] - y[0]) / min(len(y) - 1, 9) if self.beta is not None else None else: # estimated level = np.mean(y[:min(len(y), 10)]) if self.beta is not None: # Simple trend estimate n = min(len(y), 10) x = np.arange(n) trend = np.polyfit(x, y[:n], 1)[0] else: trend = None return level, trend def _smooth( self, y: np.ndarray, alpha: float, beta: float | None, ) -> tuple[np.ndarray, np.ndarray | None, float, float | None]: """Apply exponential smoothing.""" n = len(y) levels = np.empty(n) trends = np.empty(n) if beta is not None else None level, trend = self._initialize_level_trend(y) for t in range(n): prev_level = level if beta is not None: # Holt's method level = alpha * y[t] + (1 - alpha) * (level + trend) trend = beta * (level - prev_level) + (1 - beta) * trend trends[t] = trend else: # Simple exponential smoothing level = alpha * y[t] + (1 - alpha) * level levels[t] = level return levels, trends, level, trend def _compute_sse( self, y: np.ndarray, alpha: float, beta: float | None, ) -> float: """Compute sum of squared errors for parameter optimization.""" levels, trends, _, _ = self._smooth(y, alpha, beta) # One-step-ahead forecasts fitted = np.empty(len(y)) fitted[0] = y[0] # First fitted value if beta is not None: for t in range(1, len(y)): fitted[t] = levels[t - 1] + trends[t - 1] else: fitted[1:] = levels[:-1] sse = np.sum((y - fitted) ** 2) return sse def _optimize_parameters(self, y: np.ndarray) -> tuple[float, float | None]: """Optimize smoothing parameters via grid search.""" best_alpha = self.alpha best_beta = self.beta best_sse = float('inf') alphas = np.linspace(0.01, 0.99, 20) if self.beta is not None: betas = np.linspace(0.01, 0.99, 20) for a in alphas: for b in betas: sse = self._compute_sse(y, a, b) if sse < best_sse: best_sse = sse best_alpha = a best_beta = b else: for a in alphas: sse = self._compute_sse(y, a, None) if sse < best_sse: best_sse = sse best_alpha = a return best_alpha, best_beta
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> ExponentialSmoothingForecaster: """Fit the exponential smoothing forecaster. Parameters ---------- y : array-like Training time series. X : ignored Not used. Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) # Validate parameters if not 0 < self.alpha < 1: raise ValueError(f"alpha must be in (0, 1), got {self.alpha}") if self.beta is not None and not 0 < self.beta < 1: raise ValueError(f"beta must be in (0, 1), got {self.beta}") # Optimize if requested if self.optimize: self.alpha_, self.beta_ = self._optimize_parameters(y) self._log(f"Optimized: alpha={self.alpha_:.4f}, beta={self.beta_}") else: self.alpha_ = self.alpha self.beta_ = self.beta # Fit model self._fitted_levels, self._fitted_trends, self._level, self._trend = \ self._smooth(y, self.alpha_, self.beta_) self._log(f"Fitted SES(alpha={self.alpha_:.2f}), level={self._level:.4f}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate exponential smoothing forecasts. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Not used. Returns ------- np.ndarray of shape (horizon,) Forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) if self._trend is not None: # Holt's method: forecast = level + h * trend h = np.arange(1, horizon + 1) forecast = self._level + h * self._trend else: # Simple ES: flat forecast forecast = np.full(horizon, self._level) return forecast
[docs] def predict_interval( self, horizon: int, coverage: float = 0.95, X: Any | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate prediction intervals.""" self._check_is_fitted() horizon = self._validate_horizon(horizon) point = self.predict(horizon) # Compute residuals fitted = self.get_fitted_values() valid_mask = ~np.isnan(fitted) residuals = self.y_[valid_mask] - fitted[valid_mask] std = np.std(residuals) alpha = 1 - coverage z = -np.percentile(np.random.standard_normal(10000), alpha / 2 * 100) # Variance formula for SES a = self.alpha_ h = np.arange(1, horizon + 1) if self._trend is None: # SES variance factor var_factor = 1 + (h - 1) * a ** 2 else: # Holt's variance (approximate) b = self.beta_ var_factor = 1 + (h - 1) * (a ** 2 + a * b * h + b ** 2 * h * (2 * h - 1) / 6) margin = z * std * np.sqrt(var_factor) return point, point - margin, point + margin
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values (one-step-ahead forecasts).""" self._check_is_fitted() fitted = np.empty(self.n_samples_) fitted[0] = np.nan if self._fitted_trends is not None: fitted[1:] = self._fitted_levels[:-1] + self._fitted_trends[:-1] else: fitted[1:] = self._fitted_levels[:-1] return fitted
[docs] class DriftForecaster(BaseForecaster, UnivariateForecasterMixin): """Drift (random walk with drift) forecaster. Predicts using the last value plus average historical change. Equivalent to fitting a line between first and last observations. Parameters ---------- random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Examples -------- >>> model = DriftForecaster() >>> model.fit(y_train) >>> forecast = model.predict(horizon=10) Notes ----- The drift method is equivalent to: y_hat[t+h] = y[t] + h * (y[t] - y[1]) / (t - 1) This is a useful baseline for trending data where you expect the trend to continue. """ def __init__( self, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self._drift: float | None = None self._last_value: float | None = None
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> DriftForecaster: """Fit the drift forecaster. Parameters ---------- y : array-like Training time series. X : ignored Not used. Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) if len(y) < 2: raise ValueError("Need at least 2 observations for drift method") # Drift = average change per period self._drift = (y[-1] - y[0]) / (len(y) - 1) self._last_value = y[-1] self._log(f"Fitted with drift={self._drift:.6f}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate drift forecasts. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Not used. Returns ------- np.ndarray of shape (horizon,) Forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) h = np.arange(1, horizon + 1) forecast = self._last_value + h * self._drift return forecast
[docs] def predict_interval( self, horizon: int, coverage: float = 0.95, X: Any | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate prediction intervals.""" self._check_is_fitted() horizon = self._validate_horizon(horizon) point = self.predict(horizon) # Residuals from drift model (detrended differences) fitted = self.get_fitted_values() valid_mask = ~np.isnan(fitted) residuals = self.y_[valid_mask] - fitted[valid_mask] std = np.std(residuals) alpha = 1 - coverage z = -np.percentile(np.random.standard_normal(10000), alpha / 2 * 100) # Variance grows linearly with horizon for drift h = np.arange(1, horizon + 1) n = self.n_samples_ var_factor = np.sqrt(h * (1 + h / n)) margin = z * std * var_factor return point, point - margin, point + margin
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values.""" self._check_is_fitted() fitted = np.empty(self.n_samples_) fitted[0] = np.nan # Each fitted value = previous actual + drift fitted[1:] = self.y_[:-1] + self._drift return fitted
[docs] class ThetaForecaster(BaseForecaster, UnivariateForecasterMixin): """Theta method forecaster. Implements the Theta method which won the M3 forecasting competition. Decomposes series into two theta lines and combines their forecasts. Parameters ---------- theta : float, default=2.0 Theta parameter. theta=0 gives linear regression, theta=2 is the standard Theta method. seasonal_period : int, optional If provided, applies seasonal adjustment before forecasting. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Examples -------- >>> model = ThetaForecaster(theta=2.0) >>> model.fit(y_train) >>> forecast = model.predict(horizon=12) Notes ----- The Theta method: 1. Decomposes series into theta=0 (linear trend) and theta=2 (amplified curvature) 2. Forecasts theta=0 using linear regression 3. Forecasts theta=2 using simple exponential smoothing 4. Combines forecasts (equal weights) This simple method often outperforms complex approaches on many datasets. References ---------- Assimakopoulos, V., & Nikolopoulos, K. (2000). The theta model: a decomposition approach to forecasting. International Journal of Forecasting, 16(4), 521-530. """ def __init__( self, theta: float = 2.0, seasonal_period: int | None = None, random_state: int | None = None, verbose: bool = False, ): super().__init__(random_state=random_state, verbose=verbose) self.theta = theta self.seasonal_period = seasonal_period # Fitted components self._drift: float | None = None self._ses_level: float | None = None self._ses_alpha: float = 0.5 # Could optimize self._seasonal_factors: np.ndarray | None = None self._deseasonalized: np.ndarray | None = None def _deseasonalize(self, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Remove seasonal component using classical decomposition.""" sp = self.seasonal_period n = len(y) # Compute seasonal indices using moving average if n < 2 * sp: return y, np.ones(sp) # Centered moving average for trend ma = np.convolve(y, np.ones(sp) / sp, mode='valid') # Adjust for even seasonal periods if sp % 2 == 0: ma = (ma[:-1] + ma[1:]) / 2 # Pad MA to original length pad_left = (n - len(ma)) // 2 pad_right = n - len(ma) - pad_left trend = np.concatenate([ np.full(pad_left, ma[0]), ma, np.full(pad_right, ma[-1]) ]) # Seasonal = actual / trend (multiplicative) # Use additive for simplicity: seasonal = actual - trend seasonal_component = y - trend # Average seasonal factors factors = np.zeros(sp) counts = np.zeros(sp) for i in range(n): factors[i % sp] += seasonal_component[i] counts[i % sp] += 1 factors = factors / np.maximum(counts, 1) # Normalize to sum to zero factors = factors - np.mean(factors) # Deseasonalize seasonal_indices = np.array([factors[i % sp] for i in range(n)]) deseasonalized = y - seasonal_indices return deseasonalized, factors
[docs] def fit( self, y: Any, X: Any | None = None, **fit_params, ) -> ThetaForecaster: """Fit the Theta forecaster. Parameters ---------- y : array-like Training time series. X : ignored Not used. Returns ------- self Fitted forecaster. """ y, _ = validate_forecast_input(y, X=None) y = self._validate_univariate(y) self.y_ = y.copy() self.n_samples_ = len(y) if len(y) < 3: raise ValueError("Need at least 3 observations for Theta method") # Deseasonalize if needed if self.seasonal_period is not None and self.seasonal_period > 1: y_adj, self._seasonal_factors = self._deseasonalize(y) self._deseasonalized = y_adj else: y_adj = y self._seasonal_factors = None self._deseasonalized = y n = len(y_adj) # Fit linear trend (theta=0 line) x = np.arange(n) slope, intercept = np.polyfit(x, y_adj, 1) self._drift = slope # Fit SES on theta=2 line # theta=2 line: z = theta * y - (theta-1) * linear_trend theta_2_line = self.theta * y_adj - (self.theta - 1) * (intercept + slope * x) # Simple exponential smoothing on theta_2 line level = theta_2_line[0] for t in range(1, n): level = self._ses_alpha * theta_2_line[t] + (1 - self._ses_alpha) * level self._ses_level = level self._log(f"Fitted Theta(theta={self.theta}), drift={self._drift:.6f}") self.is_fitted_ = True return self
[docs] def predict( self, horizon: int, X: Any | None = None, ) -> np.ndarray: """Generate Theta forecasts. Parameters ---------- horizon : int Number of steps ahead to forecast. X : ignored Not used. Returns ------- np.ndarray of shape (horizon,) Forecasts. """ self._check_is_fitted() horizon = self._validate_horizon(horizon) n = self.n_samples_ h = np.arange(1, horizon + 1) # Linear trend forecast (theta=0) x_last = n - 1 y_last = self._deseasonalized[-1] linear_forecast = y_last + self._drift * h # SES forecast (theta=2) - flat ses_forecast = np.full(horizon, self._ses_level) # Combine: average of theta=0 and theta=2 forecasts forecast = (linear_forecast + ses_forecast) / 2 # Re-seasonalize if self._seasonal_factors is not None: sp = self.seasonal_period for i in range(horizon): forecast[i] += self._seasonal_factors[(n + i) % sp] return forecast
[docs] def get_fitted_values(self) -> np.ndarray: """Get in-sample fitted values.""" self._check_is_fitted() # For simplicity, use one-step-ahead from combined model fitted = np.empty(self.n_samples_) fitted[0] = np.nan y_adj = self._deseasonalized # One-step-ahead forecasts level = y_adj[0] for t in range(1, self.n_samples_): # Linear component linear_pred = y_adj[t - 1] + self._drift # SES component theta_2_val = self.theta * y_adj[t - 1] - (self.theta - 1) * ( y_adj[0] + self._drift * (t - 1) ) level = self._ses_alpha * theta_2_val + (1 - self._ses_alpha) * level # Combined fitted[t] = (linear_pred + level) / 2 # Re-seasonalize if self._seasonal_factors is not None: fitted[t] += self._seasonal_factors[t % self.seasonal_period] return fitted