from __future__ import annotations
"""Unified data loading utilities for AutoML.
This module provides utilities for loading data from various sources
(CSV, Parquet, DataFrames) and inferring task types.
"""
import logging
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
[docs]
def load_data(
data: str | Path | pd.DataFrame | np.ndarray,
label: str | None = None,
**kwargs,
) -> tuple[pd.DataFrame, pd.Series | None]:
"""Load data from various sources.
Parameters
----------
data : str, Path, DataFrame, or ndarray
Input data. Can be:
- Path to CSV or Parquet file
- pandas DataFrame
- numpy array (label must be last column or specified separately)
label : str, optional
Name of the target column.
**kwargs
Additional arguments passed to pandas read functions.
Returns
-------
tuple of (DataFrame, Series or None)
Features and target (if label specified).
Examples
--------
>>> X, y = load_data("train.csv", label="target")
>>> X, y = load_data(df, label="price")
>>> X, _ = load_data("test.csv") # No label for test data
"""
# Load from file path
if isinstance(data, (str, Path)):
data = Path(data)
if not data.exists():
raise FileNotFoundError(f"Data file not found: {data}")
suffix = data.suffix.lower()
if suffix == ".csv":
df = pd.read_csv(data, **kwargs)
elif suffix in (".parquet", ".pq"):
df = pd.read_parquet(data, **kwargs)
elif suffix in (".feather", ".ftr"):
df = pd.read_feather(data, **kwargs)
elif suffix == ".json":
df = pd.read_json(data, **kwargs)
elif suffix in (".xlsx", ".xls"):
df = pd.read_excel(data, **kwargs)
elif suffix == ".pkl":
df = pd.read_pickle(data, **kwargs)
else:
# Try CSV as default
logger.warning(f"Unknown file extension {suffix}, trying CSV")
df = pd.read_csv(data, **kwargs)
# Convert numpy array to DataFrame
elif isinstance(data, np.ndarray):
n_cols = data.shape[1] if data.ndim > 1 else 1
if label is not None and isinstance(label, int):
# Label is column index
columns = [f"feature_{i}" for i in range(n_cols)]
columns[label] = "target"
label = "target"
else:
columns = [f"feature_{i}" for i in range(n_cols)]
if label is None:
# Assume last column is target
columns[-1] = "target"
label = "target"
df = pd.DataFrame(data, columns=columns)
# Already a DataFrame
elif isinstance(data, pd.DataFrame):
df = data.copy()
else:
raise TypeError(f"Unsupported data type: {type(data)}")
# Split features and target
if label is not None and label in df.columns:
y = df[label]
X = df.drop(columns=[label])
return X, y
else:
return df, None
def split_features_target(
df: pd.DataFrame,
label: str,
) -> tuple[pd.DataFrame, pd.Series]:
"""Split DataFrame into features and target.
Parameters
----------
df : DataFrame
Input DataFrame.
label : str
Name of the target column.
Returns
-------
tuple of (DataFrame, Series)
Features and target.
"""
if label not in df.columns:
raise ValueError(f"Label column '{label}' not found in DataFrame")
y = df[label]
X = df.drop(columns=[label])
return X, y
[docs]
def infer_task_type(
y: pd.Series | np.ndarray,
threshold: int = 10,
) -> str:
"""Infer task type from target variable.
Parameters
----------
y : array-like
Target variable.
threshold : int, default=10
Maximum number of unique values to consider as classification.
Returns
-------
str
Task type: "binary", "multiclass", or "regression".
Examples
--------
>>> infer_task_type(pd.Series([0, 1, 0, 1]))
'binary'
>>> infer_task_type(pd.Series([0, 1, 2, 3]))
'multiclass'
>>> infer_task_type(pd.Series([1.5, 2.3, 4.1, 5.7]))
'regression'
"""
if isinstance(y, pd.Series):
y_arr = y.values
else:
y_arr = np.asarray(y)
# Check for string/object types -> classification
if y_arr.dtype == object or isinstance(y_arr[0], str):
n_unique = len(np.unique(y_arr))
return "binary" if n_unique == 2 else "multiclass"
# Check for boolean -> binary classification
if y_arr.dtype == bool:
return "binary"
# Check number of unique values
n_unique = len(np.unique(y_arr[~np.isnan(y_arr)] if np.issubdtype(y_arr.dtype, np.floating) else y_arr))
# Few unique values -> classification
if n_unique <= threshold:
return "binary" if n_unique == 2 else "multiclass"
# Check if all values are integers
if np.issubdtype(y_arr.dtype, np.integer):
# Could still be classification with many classes
if n_unique <= 100:
return "binary" if n_unique == 2 else "multiclass"
# Default to regression
return "regression"
def infer_column_types(
df: pd.DataFrame,
max_unique_ratio: float = 0.05,
max_unique_absolute: int = 50,
) -> dict[str, list[str]]:
"""Infer column types for preprocessing.
Parameters
----------
df : DataFrame
Input DataFrame.
max_unique_ratio : float, default=0.05
Maximum ratio of unique values to consider as categorical.
max_unique_absolute : int, default=50
Maximum absolute number of unique values for categorical.
Returns
-------
dict
Dictionary with keys: "numeric", "categorical", "datetime", "text".
Examples
--------
>>> types = infer_column_types(df)
>>> print(types["numeric"])
['age', 'income', 'balance']
>>> print(types["categorical"])
['gender', 'country', 'product_type']
"""
column_types = {
"numeric": [],
"categorical": [],
"datetime": [],
"text": [],
"boolean": [],
}
n_rows = len(df)
for col in df.columns:
dtype = df[col].dtype
n_unique = df[col].nunique()
unique_ratio = n_unique / max(n_rows, 1)
# Boolean columns
if dtype == bool or (n_unique == 2 and set(df[col].dropna().unique()).issubset({0, 1})): # noqa: E721
column_types["boolean"].append(col)
# Datetime columns
elif pd.api.types.is_datetime64_any_dtype(dtype):
column_types["datetime"].append(col)
# Object/string columns
elif dtype == object: # noqa: E721
# Check if it looks like datetime
sample = df[col].dropna().head(100)
try:
pd.to_datetime(sample)
column_types["datetime"].append(col)
continue
except (ValueError, TypeError):
pass
# Check if it's categorical or text
avg_len = sample.astype(str).str.len().mean()
if avg_len > 50 or n_unique > max_unique_absolute:
column_types["text"].append(col)
else:
column_types["categorical"].append(col)
# Numeric columns
elif pd.api.types.is_numeric_dtype(dtype):
# Check if it should be categorical
if unique_ratio <= max_unique_ratio and n_unique <= max_unique_absolute:
column_types["categorical"].append(col)
else:
column_types["numeric"].append(col)
else:
# Default to numeric
column_types["numeric"].append(col)
return column_types
[docs]
class DataLoader:
"""Unified data loader for AutoML.
Handles loading from multiple sources and caches loaded data.
Parameters
----------
label : str
Name of the target column.
problem_type : str, default="auto"
Problem type: "auto", "binary", "multiclass", "regression".
sample_size : int, optional
If set, sample this many rows for faster development.
random_state : int, default=42
Random seed for sampling.
Attributes
----------
train_data_ : DataFrame
Loaded training data.
X_train_ : DataFrame
Training features.
y_train_ : Series
Training target.
column_types_ : dict
Inferred column types.
task_type_ : str
Inferred or specified task type.
Examples
--------
>>> loader = DataLoader(label="target")
>>> loader.load_train("train.csv")
>>> X, y = loader.get_train()
>>> X_test = loader.load_test("test.csv")
"""
def __init__(
self,
label: str,
problem_type: str = "auto",
sample_size: int | None = None,
random_state: int = 42,
):
self.label = label
self.problem_type = problem_type
self.sample_size = sample_size
self.random_state = random_state
# State
self.train_data_: pd.DataFrame | None = None
self.X_train_: pd.DataFrame | None = None
self.y_train_: pd.Series | None = None
self.test_data_: pd.DataFrame | None = None
self.X_test_: pd.DataFrame | None = None
self.column_types_: dict[str, list[str]] | None = None
self.task_type_: str | None = None
[docs]
def load_train(
self,
data: str | Path | pd.DataFrame | np.ndarray,
**kwargs,
) -> DataLoader:
"""Load training data.
Parameters
----------
data : str, Path, DataFrame, or ndarray
Training data source.
**kwargs
Additional arguments for data loading.
Returns
-------
self
"""
X, y = load_data(data, label=self.label, **kwargs)
if y is None:
raise ValueError(f"Label column '{self.label}' not found in training data")
# Sample if requested
if self.sample_size is not None and len(X) > self.sample_size:
np.random.seed(self.random_state)
indices = np.random.choice(len(X), self.sample_size, replace=False)
X = X.iloc[indices].reset_index(drop=True)
y = y.iloc[indices].reset_index(drop=True)
logger.info(f"Sampled {self.sample_size} rows from training data")
self.train_data_ = pd.concat([X, y.rename(self.label)], axis=1)
self.X_train_ = X
self.y_train_ = y
# Infer column types
self.column_types_ = infer_column_types(X)
# Infer task type
if self.problem_type == "auto":
self.task_type_ = infer_task_type(y)
else:
self.task_type_ = self.problem_type
logger.info(
f"Loaded training data: {X.shape[0]} rows, {X.shape[1]} features, "
f"task_type={self.task_type_}"
)
return self
[docs]
def load_test(
self,
data: str | Path | pd.DataFrame | np.ndarray,
**kwargs,
) -> pd.DataFrame:
"""Load test data.
Parameters
----------
data : str, Path, DataFrame, or ndarray
Test data source.
**kwargs
Additional arguments for data loading.
Returns
-------
DataFrame
Test features.
"""
X, _ = load_data(data, label=None, **kwargs)
# Remove label column if present
if self.label in X.columns:
X = X.drop(columns=[self.label])
self.test_data_ = X
self.X_test_ = X
logger.info(f"Loaded test data: {X.shape[0]} rows, {X.shape[1]} features")
return X
[docs]
def get_train(self) -> tuple[pd.DataFrame, pd.Series]:
"""Get training features and target.
Returns
-------
tuple of (DataFrame, Series)
Training features and target.
"""
if self.X_train_ is None:
raise ValueError("Training data not loaded. Call load_train() first.")
return self.X_train_, self.y_train_
[docs]
def get_test(self) -> pd.DataFrame:
"""Get test features.
Returns
-------
DataFrame
Test features.
"""
if self.X_test_ is None:
raise ValueError("Test data not loaded. Call load_test() first.")
return self.X_test_
[docs]
def get_validation_split(
self,
val_frac: float = 0.2,
stratify: bool = True,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
"""Split training data into train and validation sets.
Parameters
----------
val_frac : float, default=0.2
Fraction of data for validation.
stratify : bool, default=True
Whether to use stratified split for classification.
Returns
-------
tuple
(X_train, X_val, y_train, y_val)
"""
from sklearn.model_selection import train_test_split
if self.X_train_ is None:
raise ValueError("Training data not loaded. Call load_train() first.")
stratify_arg = self.y_train_ if (stratify and self.task_type_ != "regression") else None
X_train, X_val, y_train, y_val = train_test_split(
self.X_train_,
self.y_train_,
test_size=val_frac,
random_state=self.random_state,
stratify=stratify_arg,
)
return X_train, X_val, y_train, y_val
[docs]
def get_summary(self) -> dict[str, Any]:
"""Get summary of loaded data.
Returns
-------
dict
Data summary.
"""
summary = {
"label": self.label,
"task_type": self.task_type_,
"problem_type": self.problem_type,
}
if self.X_train_ is not None:
summary.update({
"n_train_samples": len(self.X_train_),
"n_features": self.X_train_.shape[1],
"column_types": {k: len(v) for k, v in self.column_types_.items()},
})
if self.task_type_ in ("binary", "multiclass"):
summary["n_classes"] = self.y_train_.nunique()
summary["class_distribution"] = self.y_train_.value_counts().to_dict()
if self.X_test_ is not None:
summary["n_test_samples"] = len(self.X_test_)
return summary