Source code for endgame.preprocessing.noise_detection

"""Label Noise Detection for data cleaning.

Implements Confident Learning and ensemble-based approaches to identify
mislabeled training examples.

References
----------
- Northcutt et al., 2021 - "Confident Learning: Estimating Uncertainty in
  Dataset Labels" (JAIR)

Example
-------
>>> from endgame.preprocessing import ConfidentLearningFilter
>>> clf = ConfidentLearningFilter(base_estimator='xgboost')
>>> noise_mask = clf.fit_detect(X, y)
>>> X_clean, y_clean = X[~noise_mask], y[~noise_mask]
"""

from __future__ import annotations

from typing import Any

import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_predict


[docs] class ConfidentLearningFilter(BaseEstimator): """Identify mislabeled examples using Confident Learning. Uses cross-validated predicted probabilities to estimate the joint distribution of noisy and true labels, then identifies examples that are likely mislabeled. Parameters ---------- base_estimator : estimator or str, default='rf' Classifier to use for cross-validated probability estimation. Can be 'rf' (RandomForest), 'xgboost', 'lgbm', or any sklearn-compatible classifier with predict_proba. cv : int, default=5 Number of cross-validation folds for probability estimation. threshold : float or str, default='auto' Confidence threshold for identifying noise. If 'auto', uses per-class average predicted probability as threshold. If float, uses the same threshold for all classes. method : str, default='prune_by_class' Method for identifying noisy labels: - 'prune_by_class': Remove examples with low self-confidence - 'prune_by_noise_rate': Remove based on estimated noise rates - 'both': Intersection of both methods (most conservative) n_jobs : int, default=1 Number of parallel jobs for cross-validation. random_state : int or None, default=None Random state for reproducibility. Attributes ---------- noise_mask_ : ndarray of shape (n_samples,) Boolean mask where True indicates suspected noisy labels. noise_indices_ : ndarray Indices of suspected noisy examples. confident_joint_ : ndarray of shape (n_classes, n_classes) Estimated joint distribution of noisy vs. true labels. noise_rate_ : float Estimated overall noise rate. per_class_noise_rate_ : ndarray Estimated noise rate per class. pred_proba_ : ndarray of shape (n_samples, n_classes) Cross-validated predicted probabilities. Example ------- >>> clf = ConfidentLearningFilter(base_estimator='rf', cv=5) >>> noise_mask = clf.fit_detect(X, y) >>> print(f"Found {noise_mask.sum()} noisy labels ({noise_mask.mean():.1%})") >>> X_clean, y_clean = X[~noise_mask], y[~noise_mask] """ def __init__( self, base_estimator: str | Any = "rf", cv: int = 5, threshold: float | str = "auto", method: str = "prune_by_class", n_jobs: int = 1, random_state: int | None = None, ): self.base_estimator = base_estimator self.cv = cv self.threshold = threshold self.method = method self.n_jobs = n_jobs self.random_state = random_state def _get_estimator(self): """Resolve the base estimator.""" if isinstance(self.base_estimator, str): if self.base_estimator == "rf": return RandomForestClassifier( n_estimators=100, max_depth=None, n_jobs=self.n_jobs, random_state=self.random_state, ) elif self.base_estimator == "xgboost": try: from xgboost import XGBClassifier return XGBClassifier( n_estimators=100, max_depth=6, random_state=self.random_state, verbosity=0, ) except ImportError: raise ImportError("xgboost is required for base_estimator='xgboost'") elif self.base_estimator == "lgbm": try: from lightgbm import LGBMClassifier return LGBMClassifier( n_estimators=100, max_depth=6, random_state=self.random_state, verbose=-1, ) except ImportError: raise ImportError("lightgbm is required for base_estimator='lgbm'") else: raise ValueError(f"Unknown estimator string: {self.base_estimator}") return clone(self.base_estimator)
[docs] def fit(self, X, y) -> ConfidentLearningFilter: """Fit the noise detector. Parameters ---------- X : array-like of shape (n_samples, n_features) Training features. y : array-like of shape (n_samples,) Noisy training labels. Returns ------- self """ X = np.asarray(X) y = np.asarray(y) self.classes_ = np.unique(y) n_classes = len(self.classes_) n_samples = len(y) # Map labels to 0..n_classes-1 label_map = {c: i for i, c in enumerate(self.classes_)} y_mapped = np.array([label_map[c] for c in y]) # Cross-validated predicted probabilities estimator = self._get_estimator() cv = StratifiedKFold( n_splits=self.cv, shuffle=True, random_state=self.random_state, ) self.pred_proba_ = cross_val_predict( estimator, X, y_mapped, cv=cv, method="predict_proba", n_jobs=self.n_jobs, ) # Compute per-class thresholds if self.threshold == "auto": thresholds = np.zeros(n_classes) for k in range(n_classes): class_mask = y_mapped == k if class_mask.sum() > 0: thresholds[k] = self.pred_proba_[class_mask, k].mean() else: thresholds[k] = 0.5 else: thresholds = np.full(n_classes, float(self.threshold)) # Compute confident joint matrix self.confident_joint_ = np.zeros((n_classes, n_classes), dtype=int) for i in range(n_samples): given_label = y_mapped[i] proba = self.pred_proba_[i] # Find classes where predicted probability exceeds threshold confident_classes = np.where(proba >= thresholds)[0] if len(confident_classes) == 0: # Use argmax as fallback confident_classes = [np.argmax(proba)] for pred_label in confident_classes: self.confident_joint_[given_label, pred_label] += 1 # Estimate noise rates self.per_class_noise_rate_ = np.zeros(n_classes) for k in range(n_classes): total_k = self.confident_joint_[k].sum() if total_k > 0: self.per_class_noise_rate_[k] = 1.0 - self.confident_joint_[k, k] / total_k self.noise_rate_ = float(np.average( self.per_class_noise_rate_, weights=np.bincount(y_mapped, minlength=n_classes), )) # Identify noisy examples self.noise_mask_ = self._identify_noise(y_mapped, thresholds) self.noise_indices_ = np.where(self.noise_mask_)[0] return self
def _identify_noise(self, y_mapped, thresholds): """Identify noisy examples based on the chosen method.""" n_samples = len(y_mapped) n_classes = len(self.classes_) if self.method == "prune_by_class": return self._prune_by_class(y_mapped, thresholds) elif self.method == "prune_by_noise_rate": return self._prune_by_noise_rate(y_mapped) elif self.method == "both": mask1 = self._prune_by_class(y_mapped, thresholds) mask2 = self._prune_by_noise_rate(y_mapped) return mask1 & mask2 else: raise ValueError(f"Unknown method: {self.method}") def _prune_by_class(self, y_mapped, thresholds): """Prune examples with low self-confidence.""" n_samples = len(y_mapped) noise_mask = np.zeros(n_samples, dtype=bool) for i in range(n_samples): given_label = y_mapped[i] self_confidence = self.pred_proba_[i, given_label] pred_label = np.argmax(self.pred_proba_[i]) # Mark as noisy if self-confidence is below threshold # AND predicted class differs from given label if self_confidence < thresholds[given_label] and pred_label != given_label: noise_mask[i] = True return noise_mask def _prune_by_noise_rate(self, y_mapped): """Prune based on estimated noise rates per class.""" n_samples = len(y_mapped) n_classes = len(self.classes_) noise_mask = np.zeros(n_samples, dtype=bool) for k in range(n_classes): class_mask = y_mapped == k class_indices = np.where(class_mask)[0] if len(class_indices) == 0: continue # Sort by self-confidence (ascending = most likely noisy first) self_confidences = self.pred_proba_[class_indices, k] sorted_idx = np.argsort(self_confidences) # Remove the estimated number of noisy examples n_noisy = int(self.per_class_noise_rate_[k] * len(class_indices)) noisy_in_class = class_indices[sorted_idx[:n_noisy]] noise_mask[noisy_in_class] = True return noise_mask
[docs] def fit_detect(self, X, y) -> np.ndarray: """Fit and return the noise mask. Parameters ---------- X : array-like of shape (n_samples, n_features) Training features. y : array-like of shape (n_samples,) Noisy training labels. Returns ------- noise_mask : ndarray of shape (n_samples,) Boolean mask where True indicates suspected noisy label. """ self.fit(X, y) return self.noise_mask_
[docs] def clean(self, X, y): """Fit and return cleaned data. Parameters ---------- X : array-like Features. y : array-like Labels. Returns ------- X_clean : ndarray Features with noisy examples removed. y_clean : ndarray Labels with noisy examples removed. """ noise_mask = self.fit_detect(X, y) X = np.asarray(X) y = np.asarray(y) return X[~noise_mask], y[~noise_mask]
[docs] class ConsensusFilter(BaseEstimator): """Identify noisy labels via consensus of multiple classifiers. Trains multiple diverse classifiers and identifies examples where the majority disagree with the given label. Parameters ---------- estimators : list of estimators, optional List of classifiers to use. If None, uses a default diverse set. cv : int, default=5 Cross-validation folds for prediction. consensus_threshold : float, default=0.5 Fraction of classifiers that must disagree with the given label for it to be flagged as noisy. n_jobs : int, default=1 Number of parallel jobs. random_state : int or None, default=None Random state for reproducibility. Example ------- >>> from endgame.preprocessing import ConsensusFilter >>> cf = ConsensusFilter(consensus_threshold=0.7) >>> noise_mask = cf.fit_detect(X, y) """ def __init__( self, estimators=None, cv: int = 5, consensus_threshold: float = 0.5, n_jobs: int = 1, random_state: int | None = None, ): self.estimators = estimators self.cv = cv self.consensus_threshold = consensus_threshold self.n_jobs = n_jobs self.random_state = random_state def _get_default_estimators(self): """Get a diverse set of default classifiers.""" estimators = [ RandomForestClassifier( n_estimators=100, random_state=self.random_state, n_jobs=1 ), ] try: from sklearn.linear_model import LogisticRegression estimators.append( LogisticRegression(max_iter=1000, random_state=self.random_state) ) except ImportError: pass try: from sklearn.neighbors import KNeighborsClassifier estimators.append(KNeighborsClassifier(n_neighbors=10)) except ImportError: pass try: from sklearn.ensemble import GradientBoostingClassifier estimators.append( GradientBoostingClassifier( n_estimators=50, max_depth=4, random_state=self.random_state ) ) except ImportError: pass return estimators
[docs] def fit(self, X, y) -> ConsensusFilter: """Fit the consensus noise detector. Parameters ---------- X : array-like of shape (n_samples, n_features) y : array-like of shape (n_samples,) Returns ------- self """ X = np.asarray(X) y = np.asarray(y) estimators = self.estimators or self._get_default_estimators() cv = StratifiedKFold( n_splits=self.cv, shuffle=True, random_state=self.random_state, ) n_samples = len(y) disagreement_counts = np.zeros(n_samples, dtype=int) for est in estimators: try: oof_preds = cross_val_predict(est, X, y, cv=cv, n_jobs=self.n_jobs) disagreement_counts += (oof_preds != y).astype(int) except Exception: continue n_estimators_used = max(len(estimators), 1) self.disagreement_ratio_ = disagreement_counts / n_estimators_used self.noise_mask_ = self.disagreement_ratio_ >= self.consensus_threshold self.noise_indices_ = np.where(self.noise_mask_)[0] self.noise_rate_ = float(self.noise_mask_.mean()) return self
[docs] def fit_detect(self, X, y) -> np.ndarray: """Fit and return noise mask.""" self.fit(X, y) return self.noise_mask_
[docs] def clean(self, X, y): """Fit and return cleaned data.""" noise_mask = self.fit_detect(X, y) X = np.asarray(X) y = np.asarray(y) return X[~noise_mask], y[~noise_mask]
[docs] class CrossValNoiseDetector(BaseEstimator): """Simple cross-validated noise detection. Flags examples that are consistently misclassified across CV folds as potentially noisy. Parameters ---------- base_estimator : estimator, default=None Classifier to use. If None, uses RandomForestClassifier. cv : int, default=5 Number of CV folds. n_repeats : int, default=3 Number of repetitions with different random seeds. misclassification_threshold : float, default=0.5 Fraction of times an example must be misclassified across all folds and repeats to be flagged as noisy. random_state : int or None, default=None Random state. Example ------- >>> detector = CrossValNoiseDetector(n_repeats=5) >>> noise_mask = detector.fit_detect(X, y) """ def __init__( self, base_estimator=None, cv: int = 5, n_repeats: int = 3, misclassification_threshold: float = 0.5, random_state: int | None = None, ): self.base_estimator = base_estimator self.cv = cv self.n_repeats = n_repeats self.misclassification_threshold = misclassification_threshold self.random_state = random_state
[docs] def fit(self, X, y) -> CrossValNoiseDetector: """Fit the noise detector.""" X = np.asarray(X) y = np.asarray(y) estimator = self.base_estimator or RandomForestClassifier( n_estimators=100, random_state=self.random_state ) n_samples = len(y) misclassification_counts = np.zeros(n_samples, dtype=int) total_evaluations = np.zeros(n_samples, dtype=int) rng = np.random.RandomState(self.random_state) for rep in range(self.n_repeats): seed = rng.randint(0, 2**31) cv = StratifiedKFold( n_splits=self.cv, shuffle=True, random_state=seed ) try: oof_preds = cross_val_predict(clone(estimator), X, y, cv=cv) misclassification_counts += (oof_preds != y).astype(int) total_evaluations += 1 except Exception: continue total_evaluations = np.maximum(total_evaluations, 1) self.misclassification_rate_ = misclassification_counts / total_evaluations self.noise_mask_ = self.misclassification_rate_ >= self.misclassification_threshold self.noise_indices_ = np.where(self.noise_mask_)[0] self.noise_rate_ = float(self.noise_mask_.mean()) return self
[docs] def fit_detect(self, X, y) -> np.ndarray: """Fit and return noise mask.""" self.fit(X, y) return self.noise_mask_
[docs] def clean(self, X, y): """Fit and return cleaned data.""" noise_mask = self.fit_detect(X, y) X = np.asarray(X) y = np.asarray(y) return X[~noise_mask], y[~noise_mask]