Source code for endgame.models.rules.furia

"""FURIA (Fuzzy Unordered Rule Induction Algorithm) implementation.

FURIA is a fuzzy extension of RIPPER that:
1. Learns crisp rules using a RIPPER-like algorithm
2. Fuzzifies rules by replacing sharp boundaries with trapezoidal membership functions
3. Uses "rule stretching" to cover instances not covered by any rule

References
----------
- Hühn & Hüllermeier, "FURIA: An Algorithm for Unordered Fuzzy Rule Induction" (2009)

Example
-------
>>> from endgame.models.rules import FURIAClassifier
>>> clf = FURIAClassifier(max_rules=50)
>>> clf.fit(X_train, y_train)
>>> predictions = clf.predict(X_test)
>>> print(clf.get_rules_str())  # Human-readable rules
"""

from __future__ import annotations

from dataclasses import dataclass, field

import numpy as np
from numpy.typing import ArrayLike
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y

try:
    from numba import jit, prange
    HAS_NUMBA = True
except ImportError:
    HAS_NUMBA = False
    # Fallback decorator
    def jit(*args, **kwargs):
        def decorator(func):
            return func
        if len(args) == 1 and callable(args[0]):
            return args[0]
        return decorator
    prange = range


[docs] @dataclass class FuzzyCondition: """A fuzzy condition with trapezoidal membership function. The membership function is defined by four points (a, b, c, d): - membership = 0 for x <= a or x >= d - membership = 1 for b <= x <= c - linear interpolation for a < x < b and c < x < d For crisp conditions, a=b and c=d. Parameters ---------- feature_idx : int Index of the feature. feature_name : str Name of the feature. lower_bound : float or None Lower bound (a, b) of trapezoidal function. None means -inf. upper_bound : float or None Upper bound (c, d) of trapezoidal function. None means +inf. lower_support : float Support point a (where membership starts rising). upper_support : float Support point d (where membership ends). """ feature_idx: int feature_name: str lower_bound: float | None = None # b: core lower (membership = 1) upper_bound: float | None = None # c: core upper (membership = 1) lower_support: float | None = None # a: support lower (membership = 0) upper_support: float | None = None # d: support upper (membership = 0)
[docs] def membership(self, X: np.ndarray) -> np.ndarray: """Compute fuzzy membership degree for samples. Parameters ---------- X : ndarray of shape (n_samples, n_features) Input data. Returns ------- membership : ndarray of shape (n_samples,) Membership degree in [0, 1] for each sample. """ x = X[:, self.feature_idx] n = len(x) membership = np.ones(n) # Lower bound fuzzy region if self.lower_bound is not None: if self.lower_support is not None and self.lower_support < self.lower_bound: # Fuzzy lower bound mask_below = x < self.lower_support mask_fuzzy = (x >= self.lower_support) & (x < self.lower_bound) membership[mask_below] = 0.0 if np.any(mask_fuzzy): denom = self.lower_bound - self.lower_support if denom > 0: membership[mask_fuzzy] = (x[mask_fuzzy] - self.lower_support) / denom else: # Crisp lower bound membership[x < self.lower_bound] = 0.0 # Upper bound fuzzy region if self.upper_bound is not None: if self.upper_support is not None and self.upper_support > self.upper_bound: # Fuzzy upper bound mask_above = x > self.upper_support mask_fuzzy = (x > self.upper_bound) & (x <= self.upper_support) membership[mask_above] = 0.0 if np.any(mask_fuzzy): denom = self.upper_support - self.upper_bound if denom > 0: membership[mask_fuzzy] = (self.upper_support - x[mask_fuzzy]) / denom else: # Crisp upper bound membership[x > self.upper_bound] = 0.0 return membership
def __str__(self) -> str: parts = [] if self.lower_bound is not None: if self.lower_support is not None and self.lower_support < self.lower_bound: parts.append(f"{self.feature_name} >= ~{self.lower_bound:.4g}") else: parts.append(f"{self.feature_name} >= {self.lower_bound:.4g}") if self.upper_bound is not None: if self.upper_support is not None and self.upper_support > self.upper_bound: parts.append(f"{self.feature_name} <= ~{self.upper_bound:.4g}") else: parts.append(f"{self.feature_name} <= {self.upper_bound:.4g}") return " AND ".join(parts) if parts else "TRUE"
[docs] @dataclass class FuzzyRule: """A fuzzy rule consisting of multiple fuzzy conditions. The rule's firing strength is the minimum membership across all conditions (fuzzy AND via t-norm). Parameters ---------- conditions : list of FuzzyCondition The fuzzy conditions that make up this rule. consequent : int The class label this rule predicts. weight : float Rule weight (confidence/accuracy on training data). support : int Number of training instances covered by this rule. """ conditions: list[FuzzyCondition] = field(default_factory=list) consequent: int = 0 weight: float = 1.0 support: int = 0
[docs] def firing_strength(self, X: np.ndarray) -> np.ndarray: """Compute firing strength (membership degree) for samples. Uses minimum t-norm for fuzzy AND. Parameters ---------- X : ndarray of shape (n_samples, n_features) Input data. Returns ------- strength : ndarray of shape (n_samples,) Firing strength in [0, 1] for each sample. """ if not self.conditions: return np.ones(X.shape[0]) # Start with all ones strength = np.ones(X.shape[0]) # Apply minimum t-norm (fuzzy AND) for cond in self.conditions: strength = np.minimum(strength, cond.membership(X)) return strength
[docs] def covers(self, X: np.ndarray, threshold: float = 0.0) -> np.ndarray: """Check which samples are covered by this rule. Parameters ---------- X : ndarray of shape (n_samples, n_features) Input data. threshold : float Minimum firing strength to be considered covered. Returns ------- covered : ndarray of shape (n_samples,) dtype=bool True where sample is covered by rule. """ return self.firing_strength(X) > threshold
def __str__(self) -> str: if not self.conditions: antecedent = "TRUE" else: antecedent = " AND ".join(str(c) for c in self.conditions) return f"IF {antecedent} THEN class={self.consequent} (weight={self.weight:.3f}, support={self.support})"
def _grow_rule( X: np.ndarray, y: np.ndarray, target_class: int, feature_names: list[str], min_support: int = 2, max_conditions: int = 10, ) -> tuple[FuzzyRule, np.ndarray]: """Grow a single rule using RIPPER-like sequential covering. Parameters ---------- X : ndarray of shape (n_samples, n_features) Training data. y : ndarray of shape (n_samples,) Target labels. target_class : int The class this rule should predict. feature_names : list of str Feature names. min_support : int Minimum number of positive examples covered. max_conditions : int Maximum number of conditions in a rule. Returns ------- rule : FuzzyRule The grown rule. covered : ndarray of shape (n_samples,) dtype=bool Which samples are covered by the rule. """ n_samples, n_features = X.shape positive = (y == target_class) # Start with empty rule (covers everything) conditions = [] covered = np.ones(n_samples, dtype=bool) for _ in range(max_conditions): # Find the best condition to add best_gain = -np.inf best_condition = None best_new_covered = None # Current stats n_covered = covered.sum() n_pos_covered = (covered & positive).sum() if n_pos_covered < min_support: break # Try each feature and possible split points for feat_idx in range(n_features): feat_values = X[covered, feat_idx] feat_pos = positive[covered] if len(feat_values) < min_support: continue unique_vals = np.unique(feat_values) if len(unique_vals) < 2: continue # Subsample split candidates for speed on large feature spaces max_splits = 50 if len(unique_vals) - 1 > max_splits: quantile_idx = np.linspace(0, len(unique_vals) - 2, max_splits, dtype=int) candidate_idx = np.unique(quantile_idx) else: candidate_idx = np.arange(len(unique_vals) - 1) for i in candidate_idx: threshold = (unique_vals[i] + unique_vals[i + 1]) / 2 # Try lower bound condition (>=) new_covered_lower = covered & (X[:, feat_idx] >= threshold) n_new_covered = new_covered_lower.sum() n_new_pos = (new_covered_lower & positive).sum() if n_new_covered >= min_support and n_new_pos >= 1: # FOIL-like information gain if n_new_covered > 0 and n_covered > 0: p_old = n_pos_covered / n_covered if n_covered > 0 else 0 p_new = n_new_pos / n_new_covered if n_new_covered > 0 else 0 # Only improve purity, not make it worse if p_new > p_old: gain = n_new_pos * (np.log2(p_new + 1e-10) - np.log2(p_old + 1e-10)) if gain > best_gain: best_gain = gain best_condition = FuzzyCondition( feature_idx=feat_idx, feature_name=feature_names[feat_idx], lower_bound=threshold, upper_bound=None, ) best_new_covered = new_covered_lower # Try upper bound condition (<=) new_covered_upper = covered & (X[:, feat_idx] <= threshold) n_new_covered = new_covered_upper.sum() n_new_pos = (new_covered_upper & positive).sum() if n_new_covered >= min_support and n_new_pos >= 1: if n_new_covered > 0 and n_covered > 0: p_old = n_pos_covered / n_covered if n_covered > 0 else 0 p_new = n_new_pos / n_new_covered if n_new_covered > 0 else 0 if p_new > p_old: gain = n_new_pos * (np.log2(p_new + 1e-10) - np.log2(p_old + 1e-10)) if gain > best_gain: best_gain = gain best_condition = FuzzyCondition( feature_idx=feat_idx, feature_name=feature_names[feat_idx], lower_bound=None, upper_bound=threshold, ) best_new_covered = new_covered_upper if best_condition is None: break conditions.append(best_condition) covered = best_new_covered # Stop if we've covered only positive examples n_neg_covered = (covered & ~positive).sum() if n_neg_covered == 0: break # Compute rule statistics n_covered = covered.sum() n_pos_covered = (covered & positive).sum() weight = n_pos_covered / n_covered if n_covered > 0 else 0.0 rule = FuzzyRule( conditions=conditions, consequent=target_class, weight=weight, support=int(n_pos_covered), ) return rule, covered def _fuzzify_rule( rule: FuzzyRule, X: np.ndarray, y: np.ndarray, ) -> FuzzyRule: """Fuzzify a crisp rule by adding support regions to conditions. Uses the training data to determine appropriate fuzzy boundaries by finding the range of positive examples near each boundary. Parameters ---------- rule : FuzzyRule The crisp rule to fuzzify. X : ndarray of shape (n_samples, n_features) Training data. y : ndarray of shape (n_samples,) Target labels. Returns ------- fuzzy_rule : FuzzyRule The fuzzified rule. """ target_class = rule.consequent positive = (y == target_class) new_conditions = [] for cond in rule.conditions: feat_idx = cond.feature_idx feat_values = X[:, feat_idx] new_cond = FuzzyCondition( feature_idx=cond.feature_idx, feature_name=cond.feature_name, lower_bound=cond.lower_bound, upper_bound=cond.upper_bound, lower_support=cond.lower_support, upper_support=cond.upper_support, ) # Fuzzify lower bound if cond.lower_bound is not None: # Find positive instances below the boundary pos_below = feat_values[(feat_values < cond.lower_bound) & positive] if len(pos_below) > 0: # Extend support to include some positive instances new_cond.lower_support = np.percentile(pos_below, 10) if len(pos_below) > 1 else pos_below.min() else: new_cond.lower_support = cond.lower_bound # Fuzzify upper bound if cond.upper_bound is not None: # Find positive instances above the boundary pos_above = feat_values[(feat_values > cond.upper_bound) & positive] if len(pos_above) > 0: # Extend support to include some positive instances new_cond.upper_support = np.percentile(pos_above, 90) if len(pos_above) > 1 else pos_above.max() else: new_cond.upper_support = cond.upper_bound new_conditions.append(new_cond) return FuzzyRule( conditions=new_conditions, consequent=rule.consequent, weight=rule.weight, support=rule.support, ) def _stretch_rules( rules: list[FuzzyRule], X: np.ndarray, y: np.ndarray, stretch_factor: float = 0.1, ) -> list[FuzzyRule]: """Stretch rules to cover uncovered instances. For instances not covered by any rule, find the nearest rule and stretch its boundaries to include the instance. Parameters ---------- rules : list of FuzzyRule The rules to stretch. X : ndarray of shape (n_samples, n_features) Training data. y : ndarray of shape (n_samples,) Target labels. stretch_factor : float Maximum stretch as fraction of feature range. Returns ------- stretched_rules : list of FuzzyRule Rules with stretched boundaries. """ # This is a simplified version - full FURIA uses more sophisticated stretching return rules # Return as-is for now
[docs] class FURIAClassifier(ClassifierMixin, BaseEstimator): """Fuzzy Unordered Rule Induction Algorithm (FURIA) classifier. FURIA learns a set of fuzzy rules for classification. It extends RIPPER with fuzzy boundaries that allow soft decision regions. Parameters ---------- max_rules : int, default=50 Maximum number of rules to learn. min_support : int, default=2 Minimum number of positive examples a rule must cover. max_conditions : int, default=10 Maximum number of conditions per rule. fuzzify : bool, default=True Whether to fuzzify rules after learning. rule_stretching : bool, default=True Whether to use rule stretching for uncovered instances. random_state : int or None, default=None Random seed for reproducibility. Attributes ---------- classes_ : ndarray Unique class labels. rules_ : list of FuzzyRule Learned fuzzy rules. n_features_in_ : int Number of features seen during fit. feature_names_ : list of str Feature names. Examples -------- >>> from endgame.models.rules import FURIAClassifier >>> clf = FURIAClassifier(max_rules=30) >>> clf.fit(X_train, y_train) >>> predictions = clf.predict(X_test) >>> print(clf.get_rules_str()) """ def __init__( self, max_rules: int = 50, min_support: int = 2, max_conditions: int = 10, fuzzify: bool = True, rule_stretching: bool = True, random_state: int | None = None, ): self.max_rules = max_rules self.min_support = min_support self.max_conditions = max_conditions self.fuzzify = fuzzify self.rule_stretching = rule_stretching self.random_state = random_state
[docs] def fit( self, X: ArrayLike, y: ArrayLike, feature_names: list[str] | None = None, ) -> FURIAClassifier: """Fit the FURIA classifier. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. y : array-like of shape (n_samples,) Target labels. feature_names : list of str, optional Names for the features. Returns ------- self : FURIAClassifier Fitted classifier. """ if feature_names is not None: _names = list(feature_names) elif hasattr(X, "columns"): _names = list(X.columns) else: _names = None X, y = check_X_y(X, y) if self.random_state is not None: np.random.seed(self.random_state) self.n_features_in_ = X.shape[1] self.feature_names_ = _names or [f"X{i}" for i in range(self.n_features_in_)] # Encode labels self._label_encoder = LabelEncoder() y_encoded = self._label_encoder.fit_transform(y) self.classes_ = self._label_encoder.classes_ self.n_classes_ = len(self.classes_) # Learn rules for each class self.rules_: list[FuzzyRule] = [] for class_idx in range(self.n_classes_): # Sequential covering for this class remaining = np.ones(len(y_encoded), dtype=bool) positive = (y_encoded == class_idx) n_positive = positive.sum() class_rules = [] while remaining.sum() > 0 and (remaining & positive).sum() > 0: if len(class_rules) >= self.max_rules // self.n_classes_: break # Grow a rule on remaining data X_remaining = X[remaining] y_remaining = y_encoded[remaining] rule, covered_remaining = _grow_rule( X_remaining, y_remaining, target_class=class_idx, feature_names=self.feature_names_, min_support=self.min_support, max_conditions=self.max_conditions, ) if rule.support < self.min_support: break # Map covered back to original indices remaining_indices = np.where(remaining)[0] covered_original = np.zeros(len(y_encoded), dtype=bool) covered_original[remaining_indices[covered_remaining]] = True # Fuzzify the rule if self.fuzzify: rule = _fuzzify_rule(rule, X, y_encoded) class_rules.append(rule) # Remove covered positive examples remaining[covered_original & positive] = False self.rules_.extend(class_rules) # Rule stretching if self.rule_stretching: self.rules_ = _stretch_rules(self.rules_, X, y_encoded) return self
[docs] def predict_proba(self, X: ArrayLike) -> np.ndarray: """Predict class probabilities. Uses the fuzzy firing strengths to compute class probabilities. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- proba : ndarray of shape (n_samples, n_classes) Class probabilities. """ check_is_fitted(self, 'rules_') X = check_array(X) n_samples = X.shape[0] # Compute weighted vote for each class class_scores = np.zeros((n_samples, self.n_classes_)) for rule in self.rules_: # Get firing strength for each sample strength = rule.firing_strength(X) # Add weighted vote class_scores[:, rule.consequent] += strength * rule.weight # Handle samples with no rule coverage (assign uniform) total_score = class_scores.sum(axis=1, keepdims=True) no_coverage = total_score.ravel() == 0 if np.any(no_coverage): class_scores[no_coverage] = 1.0 / self.n_classes_ total_score[no_coverage] = 1.0 # Normalize to probabilities proba = class_scores / (total_score + 1e-10) return proba
[docs] def predict(self, X: ArrayLike) -> np.ndarray: """Predict class labels. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- y_pred : ndarray of shape (n_samples,) Predicted class labels. """ proba = self.predict_proba(X) return self._label_encoder.inverse_transform(np.argmax(proba, axis=1))
[docs] def get_rules(self) -> list[FuzzyRule]: """Get the learned rules. Returns ------- rules : list of FuzzyRule The learned fuzzy rules. """ check_is_fitted(self, 'rules_') return self.rules_
[docs] def get_rules_str(self) -> str: """Get a human-readable string representation of the rules. Returns ------- rules_str : str String representation of all rules. """ check_is_fitted(self, 'rules_') lines = [f"FURIA Rules ({len(self.rules_)} rules):"] lines.append("-" * 60) for i, rule in enumerate(self.rules_, 1): # Convert class index to label class_label = self._label_encoder.inverse_transform([rule.consequent])[0] if not rule.conditions: antecedent = "TRUE" else: antecedent = " AND ".join(str(c) for c in rule.conditions) lines.append( f"R{i}: IF {antecedent}\n" f" THEN class={class_label} " f"(weight={rule.weight:.3f}, support={rule.support})" ) return "\n".join(lines)
[docs] def get_rule_importance(self) -> np.ndarray: """Get feature importance based on rule usage. Returns ------- importance : ndarray of shape (n_features,) Feature importance scores. """ check_is_fitted(self, 'rules_') importance = np.zeros(self.n_features_in_) for rule in self.rules_: for cond in rule.conditions: # Weight by rule weight and support importance[cond.feature_idx] += rule.weight * rule.support # Normalize if importance.sum() > 0: importance = importance / importance.sum() return importance
@property def feature_importances_(self) -> np.ndarray: """Feature importance scores (sklearn compatible).""" return self.get_rule_importance()