from __future__ import annotations
"""Complexity and fractal dimension measures for signal analysis.
Provides sklearn-compatible feature extractors for:
- Fractal dimensions (Higuchi, Petrosian, Katz)
- Hurst exponent
- Detrended Fluctuation Analysis (DFA)
- Lempel-Ziv Complexity
These measures quantify the self-similarity and complexity of signals,
commonly used in EEG/biosignal analysis.
References
----------
- Higuchi (1988): Higuchi fractal dimension
- Petrosian (1995): Petrosian fractal dimension
- Katz (1988): Katz fractal dimension
- Hurst (1951): Hurst exponent
- Peng et al. (1994): Detrended fluctuation analysis
- Lempel & Ziv (1976): Lempel-Ziv complexity
"""
import numpy as np
from endgame.signal.base import (
BaseFeatureExtractor,
ensure_2d_signals,
)
[docs]
def higuchi_fd(x: np.ndarray, kmax: int = 10) -> float:
"""Compute Higuchi fractal dimension of a signal.
The Higuchi fractal dimension (HFD) estimates the fractal dimension
of a time series by analyzing the length of the curve at different
scales.
Parameters
----------
x : np.ndarray
1D signal.
kmax : int, default=10
Maximum scale factor.
Returns
-------
float
Higuchi fractal dimension (typically 1-2).
References
----------
Higuchi, T. (1988). Approach to an irregular time series on the basis
of the fractal theory. Physica D: Nonlinear Phenomena, 31(2), 277-283.
"""
x = np.asarray(x).flatten()
n = len(x)
if kmax > n // 2:
kmax = n // 2
# Compute curve length for each scale k
lk = np.zeros(kmax)
for k in range(1, kmax + 1):
# Average length over all starting points
lm = np.zeros(k)
for m in range(k):
# Number of points in this subseries
num_points = (n - m - 1) // k
if num_points > 0:
# Sum of absolute differences
diff_sum = 0.0
for i in range(1, num_points + 1):
idx1 = m + i * k
idx2 = m + (i - 1) * k
if idx1 < n:
diff_sum += abs(x[idx1] - x[idx2])
# Normalized length
lm[m] = (diff_sum * (n - 1)) / (k * num_points * k)
# Average over starting points
lk[k - 1] = np.mean(lm)
# Linear regression of log(L(k)) vs log(1/k)
k_vals = np.arange(1, kmax + 1)
valid = lk > 0
if np.sum(valid) < 2:
return np.nan
log_k = np.log(1.0 / k_vals[valid])
log_lk = np.log(lk[valid])
# Slope is the fractal dimension
slope, _ = np.polyfit(log_k, log_lk, 1)
return slope
[docs]
def petrosian_fd(x: np.ndarray) -> float:
"""Compute Petrosian fractal dimension of a signal.
The Petrosian fractal dimension (PFD) provides a fast estimate of
the fractal dimension based on the number of sign changes in the
first derivative.
Parameters
----------
x : np.ndarray
1D signal.
Returns
-------
float
Petrosian fractal dimension.
References
----------
Petrosian, A. (1995). Kolmogorov complexity of finite sequences and
recognition of different preictal EEG patterns.
"""
x = np.asarray(x).flatten()
n = len(x)
# First derivative
dx = np.diff(x)
# Number of sign changes (zero crossings of derivative)
n_delta = np.sum(dx[:-1] * dx[1:] < 0)
# Petrosian formula
if n_delta == 0:
return np.log10(n)
return np.log10(n) / (np.log10(n) + np.log10(n / (n + 0.4 * n_delta)))
[docs]
def katz_fd(x: np.ndarray) -> float:
"""Compute Katz fractal dimension of a signal.
The Katz fractal dimension estimates the complexity based on the
ratio of the total path length to the maximum distance from the
first point.
Parameters
----------
x : np.ndarray
1D signal.
Returns
-------
float
Katz fractal dimension.
References
----------
Katz, M. J. (1988). Fractals and the analysis of waveforms.
Computers in Biology and Medicine, 18(3), 145-156.
"""
x = np.asarray(x).flatten()
n = len(x)
# Total path length (sum of distances between consecutive points)
# Using unit time steps
dists = np.sqrt(1 + np.diff(x) ** 2)
L = np.sum(dists)
# Maximum distance from first point
indices = np.arange(n)
distances_from_start = np.sqrt(indices**2 + (x - x[0]) ** 2)
d = np.max(distances_from_start)
# Katz formula
if d == 0:
return np.nan
return np.log10(n - 1) / (np.log10(n - 1) + np.log10(d / L))
[docs]
def hurst_exponent(x: np.ndarray, max_lag: int | None = None) -> float:
"""Compute Hurst exponent of a signal using R/S analysis.
The Hurst exponent (H) measures the long-term memory of a time series:
- H < 0.5: Anti-persistent (mean-reverting)
- H = 0.5: Random walk (no memory)
- H > 0.5: Persistent (trending)
Parameters
----------
x : np.ndarray
1D signal.
max_lag : int, optional
Maximum lag for R/S calculation. If None, uses n//2.
Returns
-------
float
Hurst exponent (typically 0-1).
References
----------
Hurst, H. E. (1951). Long-term storage capacity of reservoirs.
Transactions of the American Society of Civil Engineers, 116, 770-799.
"""
x = np.asarray(x).flatten()
n = len(x)
if max_lag is None:
max_lag = n // 2
# Use powers of 2 for lags
lags = []
lag = 4
while lag <= max_lag:
lags.append(lag)
lag *= 2
if len(lags) < 2:
return np.nan
rs_values = []
for lag in lags:
# Split into non-overlapping segments
n_segments = n // lag
if n_segments == 0:
continue
rs_segment = []
for i in range(n_segments):
segment = x[i * lag : (i + 1) * lag]
# Mean-adjusted cumulative sum
mean_adj = segment - np.mean(segment)
cum_sum = np.cumsum(mean_adj)
# Range
R = np.max(cum_sum) - np.min(cum_sum)
# Standard deviation
S = np.std(segment, ddof=1)
if S > 0:
rs_segment.append(R / S)
if rs_segment:
rs_values.append(np.mean(rs_segment))
else:
rs_values.append(np.nan)
# Remove NaN values
valid = ~np.isnan(rs_values)
if np.sum(valid) < 2:
return np.nan
lags = np.array(lags)[valid]
rs_values = np.array(rs_values)[valid]
# Linear regression of log(R/S) vs log(lag)
log_lag = np.log(lags)
log_rs = np.log(rs_values)
slope, _ = np.polyfit(log_lag, log_rs, 1)
return slope
[docs]
def detrended_fluctuation(
x: np.ndarray,
scale_min: int = 4,
scale_max: int | None = None,
n_scales: int = 10,
) -> float:
"""Compute Detrended Fluctuation Analysis (DFA) exponent.
DFA measures the self-similarity in a non-stationary time series
by analyzing the fluctuations around local linear trends.
Parameters
----------
x : np.ndarray
1D signal.
scale_min : int, default=4
Minimum scale (window size).
scale_max : int, optional
Maximum scale. If None, uses n//4.
n_scales : int, default=10
Number of scales to evaluate.
Returns
-------
float
DFA exponent (alpha):
- alpha < 0.5: Anti-correlated
- alpha = 0.5: White noise
- alpha = 1.0: 1/f noise (pink noise)
- alpha = 1.5: Brownian noise
- alpha > 1.0: Non-stationary, unbounded
References
----------
Peng, C. K., et al. (1994). Mosaic organization of DNA nucleotides.
Physical Review E, 49(2), 1685.
"""
x = np.asarray(x).flatten()
n = len(x)
if scale_max is None:
scale_max = n // 4
if scale_max < scale_min:
return np.nan
# Generate log-spaced scales
scales = np.unique(
np.logspace(np.log10(scale_min), np.log10(scale_max), n_scales).astype(int)
)
# Integrate the signal (cumulative sum of mean-adjusted signal)
y = np.cumsum(x - np.mean(x))
fluctuations = []
for scale in scales:
# Number of segments
n_segments = n // scale
if n_segments < 1:
fluctuations.append(np.nan)
continue
# Compute fluctuation for each segment
f2_segments = []
for i in range(n_segments):
segment = y[i * scale : (i + 1) * scale]
t = np.arange(len(segment))
# Fit linear trend and compute residuals
coeffs = np.polyfit(t, segment, 1)
trend = np.polyval(coeffs, t)
residuals = segment - trend
# Mean square fluctuation
f2_segments.append(np.mean(residuals**2))
# RMS fluctuation for this scale
fluctuations.append(np.sqrt(np.mean(f2_segments)))
# Remove NaN values
valid = ~np.isnan(fluctuations)
if np.sum(valid) < 2:
return np.nan
scales = scales[valid]
fluctuations = np.array(fluctuations)[valid]
# Linear regression of log(F) vs log(scale)
log_scale = np.log(scales)
log_fluct = np.log(fluctuations)
slope, _ = np.polyfit(log_scale, log_fluct, 1)
return slope
[docs]
def lempel_ziv_complexity(x: np.ndarray, threshold: float | None = None) -> float:
"""Compute Lempel-Ziv complexity of a signal.
LZC measures the complexity of a signal by counting the number of
distinct patterns found during sequential parsing.
Parameters
----------
x : np.ndarray
1D signal.
threshold : float, optional
Threshold for binarization. If None, uses median.
Returns
-------
float
Normalized Lempel-Ziv complexity (0-1).
References
----------
Lempel, A., & Ziv, J. (1976). On the complexity of finite sequences.
IEEE Transactions on Information Theory, 22(1), 75-81.
"""
x = np.asarray(x).flatten()
n = len(x)
# Binarize the signal
if threshold is None:
threshold = np.median(x)
binary = (x >= threshold).astype(int)
# Convert to string for easier pattern matching
s = "".join(map(str, binary))
# Lempel-Ziv parsing
complexity = 1
i = 0
prefix_len = 1
while prefix_len + i < n:
# Check if the next substring is in the prefix
if s[i : i + prefix_len + 1] in s[:i + prefix_len]:
prefix_len += 1
else:
complexity += 1
i += prefix_len
prefix_len = 1
# Normalize by theoretical maximum (n / log2(n))
if n > 1:
max_complexity = n / np.log2(n)
normalized = complexity / max_complexity
else:
normalized = 0.0
return normalized
[docs]
class HiguchiFD(BaseFeatureExtractor):
"""Higuchi fractal dimension feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
kmax : int, default=10
Maximum scale factor.
Examples
--------
>>> hfd = HiguchiFD(kmax=10)
>>> features = hfd.fit_transform(signals)
"""
def __init__(self, fs: float | None = None, kmax: int = 10):
super().__init__(fs=fs)
self.kmax = kmax
[docs]
def fit(self, X, y=None, **fit_params) -> HiguchiFD:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["higuchi_fd"]
return self
[docs]
class PetrosianFD(BaseFeatureExtractor):
"""Petrosian fractal dimension feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
Examples
--------
>>> pfd = PetrosianFD()
>>> features = pfd.fit_transform(signals)
"""
def __init__(self, fs: float | None = None):
super().__init__(fs=fs)
[docs]
def fit(self, X, y=None, **fit_params) -> PetrosianFD:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["petrosian_fd"]
return self
[docs]
class KatzFD(BaseFeatureExtractor):
"""Katz fractal dimension feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
Examples
--------
>>> kfd = KatzFD()
>>> features = kfd.fit_transform(signals)
"""
def __init__(self, fs: float | None = None):
super().__init__(fs=fs)
[docs]
def fit(self, X, y=None, **fit_params) -> KatzFD:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["katz_fd"]
return self
[docs]
class HurstExponent(BaseFeatureExtractor):
"""Hurst exponent feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
max_lag : int, optional
Maximum lag for R/S calculation.
Examples
--------
>>> he = HurstExponent()
>>> features = he.fit_transform(signals)
"""
def __init__(self, fs: float | None = None, max_lag: int | None = None):
super().__init__(fs=fs)
self.max_lag = max_lag
[docs]
def fit(self, X, y=None, **fit_params) -> HurstExponent:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["hurst_exponent"]
return self
[docs]
class DFA(BaseFeatureExtractor):
"""Detrended Fluctuation Analysis feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
scale_min : int, default=4
Minimum scale.
scale_max : int, optional
Maximum scale.
n_scales : int, default=10
Number of scales.
Examples
--------
>>> dfa = DFA()
>>> features = dfa.fit_transform(signals)
"""
def __init__(
self,
fs: float | None = None,
scale_min: int = 4,
scale_max: int | None = None,
n_scales: int = 10,
):
super().__init__(fs=fs)
self.scale_min = scale_min
self.scale_max = scale_max
self.n_scales = n_scales
[docs]
def fit(self, X, y=None, **fit_params) -> DFA:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["dfa_alpha"]
return self
[docs]
class LempelZivComplexity(BaseFeatureExtractor):
"""Lempel-Ziv complexity feature extractor.
Parameters
----------
fs : float, optional
Sample rate in Hz.
threshold : float, optional
Binarization threshold.
Examples
--------
>>> lzc = LempelZivComplexity()
>>> features = lzc.fit_transform(signals)
"""
def __init__(self, fs: float | None = None, threshold: float | None = None):
super().__init__(fs=fs)
self.threshold = threshold
[docs]
def fit(self, X, y=None, **fit_params) -> LempelZivComplexity:
X = self._validate_signal(X)
super().fit(X, y, **fit_params)
self.feature_names_ = ["lempel_ziv_complexity"]
return self