This commit is contained in:
@@ -0,0 +1,413 @@
|
||||
"""
|
||||
Calibration Module for XGBoost Models
|
||||
=====================================
|
||||
Calibrates raw probabilities from XGBoost models using Isotonic Regression.
|
||||
Ensures that a predicted probability of 70% actually corresponds to a 70% win rate.
|
||||
|
||||
Usage:
|
||||
from ai_engine.models.calibration import Calibrator
|
||||
calibrator = Calibrator()
|
||||
calibrated_prob = calibrator.calibrate("ms", raw_prob)
|
||||
|
||||
# Training new calibration models:
|
||||
calibrator.train_calibration(valid_df, market="ms")
|
||||
"""
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import json
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from sklearn.isotonic import IsotonicRegression
|
||||
from sklearn.calibration import calibration_curve
|
||||
from sklearn.metrics import brier_score_loss
|
||||
|
||||
AI_ENGINE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
CALIBRATION_DIR = os.path.join(AI_ENGINE_DIR, "models", "calibration")
|
||||
|
||||
os.makedirs(CALIBRATION_DIR, exist_ok=True)
|
||||
|
||||
# Supported markets for calibration
|
||||
SUPPORTED_MARKETS = [
|
||||
"ms", # Match Result (1X2) - multi-class, calibrated per class
|
||||
"ms_home", # Standard Home win probability
|
||||
"ms_home_heavy_fav", # Context: home odds <= 1.40
|
||||
"ms_home_fav", # Context: 1.40 < home odds <= 1.80
|
||||
"ms_home_balanced", # Context: 1.80 < home odds <= 2.50
|
||||
"ms_home_underdog", # Context: home odds > 2.50
|
||||
"ms_draw", # Draw probability
|
||||
"ms_away", # Away win probability
|
||||
"ou15", # Over/Under 1.5
|
||||
"ou25", # Over/Under 2.5
|
||||
"ou35", # Over/Under 3.5
|
||||
"btts", # Both Teams to Score
|
||||
"ht_ft", # Half-Time/Full-Time
|
||||
"dc", # Double Chance
|
||||
"ht", # Half-Time Result
|
||||
]
|
||||
|
||||
|
||||
class CalibrationMetrics:
|
||||
"""Stores calibration quality metrics for a market."""
|
||||
|
||||
def __init__(self):
|
||||
self.brier_score: float = 0.0
|
||||
self.calibration_error: float = 0.0
|
||||
self.sample_count: int = 0
|
||||
self.last_trained: str = ""
|
||||
self.mean_predicted: float = 0.0
|
||||
self.mean_actual: float = 0.0
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
return {
|
||||
"brier_score": round(self.brier_score, 4),
|
||||
"calibration_error": round(self.calibration_error, 4),
|
||||
"sample_count": self.sample_count,
|
||||
"last_trained": self.last_trained,
|
||||
"mean_predicted": round(self.mean_predicted, 4),
|
||||
"mean_actual": round(self.mean_actual, 4),
|
||||
}
|
||||
|
||||
|
||||
class Calibrator:
|
||||
"""
|
||||
Probability calibration using Isotonic Regression.
|
||||
|
||||
Isotonic Regression is a non-parametric method that fits a piecewise
|
||||
constant function that is monotonically increasing. It's ideal for
|
||||
calibrating probabilities because:
|
||||
|
||||
1. It preserves ranking (if P(A) > P(B) before, P(A) > P(B) after)
|
||||
2. It doesn't assume a specific distribution shape
|
||||
3. It can correct systematic over/under-confidence
|
||||
|
||||
Example:
|
||||
# Before calibration: model predicts 70% but actual win rate is 60%
|
||||
# After calibration: model predicts 70% → calibrated to 60%
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.calibrators: Dict[str, IsotonicRegression] = {}
|
||||
self.metrics: Dict[str, CalibrationMetrics] = {}
|
||||
self.heuristic_fallback: Dict[str, float] = {
|
||||
"ms": 0.90,
|
||||
"ms_home": 0.90,
|
||||
"ms_home_heavy_fav": 0.95,
|
||||
"ms_home_fav": 0.90,
|
||||
"ms_home_balanced": 0.85,
|
||||
"ms_home_underdog": 0.80,
|
||||
"ms_draw": 0.90,
|
||||
"ms_away": 0.90,
|
||||
"ou15": 0.90,
|
||||
"ou25": 0.90,
|
||||
"ou35": 0.90,
|
||||
"btts": 0.90,
|
||||
"ht_ft": 0.85,
|
||||
"dc": 0.93,
|
||||
"ht": 0.85,
|
||||
}
|
||||
self._load_calibrators()
|
||||
|
||||
def _load_calibrators(self):
|
||||
"""Load trained calibrators for each market from disk."""
|
||||
for market in SUPPORTED_MARKETS:
|
||||
model_path = os.path.join(CALIBRATION_DIR, f"{market}_calibrator.pkl")
|
||||
metrics_path = os.path.join(CALIBRATION_DIR, f"{market}_metrics.json")
|
||||
|
||||
if os.path.exists(model_path):
|
||||
try:
|
||||
with open(model_path, "rb") as f:
|
||||
self.calibrators[market] = pickle.load(f)
|
||||
print(f"[Calibrator] Loaded calibration model for {market}")
|
||||
except Exception as e:
|
||||
print(f"[Calibrator] Warning: Failed to load {market}: {e}")
|
||||
|
||||
if os.path.exists(metrics_path):
|
||||
try:
|
||||
with open(metrics_path, "r") as f:
|
||||
data = json.load(f)
|
||||
metrics = CalibrationMetrics()
|
||||
metrics.brier_score = data.get("brier_score", 0.0)
|
||||
metrics.calibration_error = data.get("calibration_error", 0.0)
|
||||
metrics.sample_count = data.get("sample_count", 0)
|
||||
metrics.last_trained = data.get("last_trained", "")
|
||||
metrics.mean_predicted = data.get("mean_predicted", 0.0)
|
||||
metrics.mean_actual = data.get("mean_actual", 0.0)
|
||||
self.metrics[market] = metrics
|
||||
except Exception as e:
|
||||
print(f"[Calibrator] Warning: Failed to load metrics for {market}: {e}")
|
||||
|
||||
def calibrate(self, market_type: str, raw_prob: float, odds_val: Optional[float] = None) -> float:
|
||||
"""
|
||||
Calibrate a raw probability using Isotonic Regression.
|
||||
|
||||
Args:
|
||||
market_type (str): 'ms_home', 'ou25', 'btts', 'ht_ft', etc.
|
||||
raw_prob (float): The raw probability from XGBoost (0.0 - 1.0)
|
||||
odds_val (float, optional): The pre-match odds, used for context-aware bucket mapping
|
||||
|
||||
Returns:
|
||||
float: Calibrated probability (0.0 - 1.0)
|
||||
"""
|
||||
# Normalize market type
|
||||
market_key = market_type.lower().replace("-", "_")
|
||||
|
||||
# Route to bucket if ms_home and odds provided
|
||||
if market_key == "ms_home" and odds_val is not None and odds_val > 1.0:
|
||||
if odds_val <= 1.40:
|
||||
bucket_key = "ms_home_heavy_fav"
|
||||
elif odds_val <= 1.80:
|
||||
bucket_key = "ms_home_fav"
|
||||
elif odds_val <= 2.50:
|
||||
bucket_key = "ms_home_balanced"
|
||||
else:
|
||||
bucket_key = "ms_home_underdog"
|
||||
|
||||
if bucket_key in self.calibrators:
|
||||
market_key = bucket_key
|
||||
|
||||
# If we have a trained Isotonic Regression model, use it
|
||||
if market_key in self.calibrators:
|
||||
try:
|
||||
calibrated = self.calibrators[market_key].predict([raw_prob])[0]
|
||||
# Ensure output is valid probability
|
||||
return float(np.clip(calibrated, 0.01, 0.99))
|
||||
except Exception as e:
|
||||
print(f"[Calibrator] Warning: Isotonic failed for {market_key}: {e}")
|
||||
# Fall through to heuristic
|
||||
|
||||
# Fallback to heuristic calibration
|
||||
return self._heuristic_calibrate(market_key, raw_prob)
|
||||
|
||||
def _heuristic_calibrate(self, market_type: str, raw_prob: float) -> float:
|
||||
"""
|
||||
Heuristic calibration fallback when no trained model exists.
|
||||
|
||||
This applies a conservative shrinkage towards the mean:
|
||||
- Binary markets (OU, BTTS): shrink towards 0.5
|
||||
- Multi-class (MS): shrink towards 0.33
|
||||
- HT/FT: stronger shrinkage due to higher variance
|
||||
"""
|
||||
# Get shrinkage factor for this market
|
||||
shrinkage = self.heuristic_fallback.get(market_type, 0.90)
|
||||
|
||||
if market_type in ["ms", "ms_home", "ms_home_heavy_fav", "ms_home_fav", "ms_home_balanced", "ms_home_underdog", "ms_draw", "ms_away"]:
|
||||
# Pull towards 0.33 (uniform for 3-class)
|
||||
return (raw_prob * shrinkage) + (0.33 * (1.0 - shrinkage))
|
||||
|
||||
elif market_type in ["ou15", "ou25", "ou35", "btts"]:
|
||||
# Pull towards 0.5 (uniform for binary)
|
||||
return (raw_prob * shrinkage) + (0.5 * (1.0 - shrinkage))
|
||||
|
||||
elif market_type in ["ht_ft", "ht"]:
|
||||
# Stronger shrinkage for high-variance markets
|
||||
return raw_prob * shrinkage
|
||||
|
||||
elif market_type == "dc":
|
||||
# Double chance is more reliable
|
||||
return (raw_prob * shrinkage) + (0.66 * (1.0 - shrinkage))
|
||||
|
||||
return raw_prob
|
||||
|
||||
def train_calibration(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
market: str,
|
||||
prob_col: str,
|
||||
actual_col: str,
|
||||
min_samples: int = 100,
|
||||
save: bool = True,
|
||||
) -> CalibrationMetrics:
|
||||
"""
|
||||
Train an Isotonic Regression calibration model for a specific market.
|
||||
|
||||
Args:
|
||||
df: DataFrame with predictions and actual outcomes
|
||||
market: Market identifier (e.g., 'ms_home', 'ou25', 'btts')
|
||||
prob_col: Column name for raw probabilities
|
||||
actual_col: Column name for actual outcomes (0 or 1)
|
||||
min_samples: Minimum samples required to train
|
||||
save: Whether to save the model to disk
|
||||
|
||||
Returns:
|
||||
CalibrationMetrics with quality metrics
|
||||
"""
|
||||
# Filter valid data
|
||||
valid_df = df[[prob_col, actual_col]].dropna()
|
||||
n_samples = len(valid_df)
|
||||
|
||||
if n_samples < min_samples:
|
||||
print(f"[Calibrator] Warning: Only {n_samples} samples for {market}, "
|
||||
f"need at least {min_samples}")
|
||||
metrics = CalibrationMetrics()
|
||||
metrics.sample_count = n_samples
|
||||
return metrics
|
||||
|
||||
# Extract arrays
|
||||
raw_probs = valid_df[prob_col].values
|
||||
actuals = valid_df[actual_col].values
|
||||
|
||||
# Train Isotonic Regression
|
||||
iso = IsotonicRegression(out_of_bounds="clip", increasing=True)
|
||||
iso.fit(raw_probs, actuals)
|
||||
|
||||
# Calculate calibrated probabilities
|
||||
calibrated_probs = iso.predict(raw_probs)
|
||||
|
||||
# Calculate metrics
|
||||
metrics = CalibrationMetrics()
|
||||
metrics.sample_count = n_samples
|
||||
metrics.last_trained = datetime.utcnow().isoformat()
|
||||
metrics.brier_score = brier_score_loss(actuals, calibrated_probs)
|
||||
metrics.mean_predicted = np.mean(raw_probs)
|
||||
metrics.mean_actual = np.mean(actuals)
|
||||
|
||||
# Calculate Expected Calibration Error (ECE)
|
||||
metrics.calibration_error = self._calculate_ece(
|
||||
calibrated_probs, actuals, n_bins=10
|
||||
)
|
||||
|
||||
# Store in memory
|
||||
self.calibrators[market] = iso
|
||||
self.metrics[market] = metrics
|
||||
|
||||
# Save to disk
|
||||
if save:
|
||||
self._save_calibration(market, iso, metrics)
|
||||
|
||||
print(f"[Calibrator] Trained {market}: "
|
||||
f"Brier={metrics.brier_score:.4f}, "
|
||||
f"ECE={metrics.calibration_error:.4f}, "
|
||||
f"n={n_samples}")
|
||||
|
||||
return metrics
|
||||
|
||||
def train_all_markets(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
market_config: Dict[str, Tuple[str, str]],
|
||||
min_samples: int = 100,
|
||||
) -> Dict[str, CalibrationMetrics]:
|
||||
"""
|
||||
Train calibration models for multiple markets at once.
|
||||
|
||||
Args:
|
||||
df: DataFrame with all predictions and outcomes
|
||||
market_config: Dict mapping market -> (prob_col, actual_col)
|
||||
e.g., {'ou25': ('ou25_over_prob', 'ou25_over_actual')}
|
||||
min_samples: Minimum samples per market
|
||||
|
||||
Returns:
|
||||
Dict of market -> CalibrationMetrics
|
||||
"""
|
||||
results = {}
|
||||
|
||||
for market, (prob_col, actual_col) in market_config.items():
|
||||
print(f"\n[Calibrator] Training {market}...")
|
||||
try:
|
||||
metrics = self.train_calibration(
|
||||
df=df,
|
||||
market=market,
|
||||
prob_col=prob_col,
|
||||
actual_col=actual_col,
|
||||
min_samples=min_samples,
|
||||
save=True,
|
||||
)
|
||||
results[market] = metrics
|
||||
except Exception as e:
|
||||
print(f"[Calibrator] Failed to train {market}: {e}")
|
||||
|
||||
return results
|
||||
|
||||
def _calculate_ece(
|
||||
self,
|
||||
probs: np.ndarray,
|
||||
actuals: np.ndarray,
|
||||
n_bins: int = 10
|
||||
) -> float:
|
||||
"""
|
||||
Calculate Expected Calibration Error (ECE).
|
||||
|
||||
ECE = sum(|bin_accuracy - bin_confidence| * bin_weight)
|
||||
|
||||
Lower is better. Perfect calibration = 0.
|
||||
"""
|
||||
bin_boundaries = np.linspace(0, 1, n_bins + 1)
|
||||
ece = 0.0
|
||||
|
||||
for i in range(n_bins):
|
||||
in_bin = (probs >= bin_boundaries[i]) & (probs < bin_boundaries[i + 1])
|
||||
prop_in_bin = np.mean(in_bin)
|
||||
|
||||
if prop_in_bin > 0:
|
||||
accuracy_in_bin = np.mean(actuals[in_bin])
|
||||
avg_confidence_in_bin = np.mean(probs[in_bin])
|
||||
ece += np.abs(accuracy_in_bin - avg_confidence_in_bin) * prop_in_bin
|
||||
|
||||
return ece
|
||||
|
||||
def _save_calibration(
|
||||
self,
|
||||
market: str,
|
||||
calibrator: IsotonicRegression,
|
||||
metrics: CalibrationMetrics
|
||||
):
|
||||
"""Save calibration model and metrics to disk."""
|
||||
# Save model
|
||||
model_path = os.path.join(CALIBRATION_DIR, f"{market}_calibrator.pkl")
|
||||
with open(model_path, "wb") as f:
|
||||
pickle.dump(calibrator, f)
|
||||
|
||||
# Save metrics
|
||||
metrics_path = os.path.join(CALIBRATION_DIR, f"{market}_metrics.json")
|
||||
with open(metrics_path, "w") as f:
|
||||
json.dump(metrics.to_dict(), f, indent=2)
|
||||
|
||||
print(f"[Calibrator] Saved {market} to {CALIBRATION_DIR}")
|
||||
|
||||
def get_calibration_report(self) -> Dict[str, Any]:
|
||||
"""Generate a summary report of all calibration models."""
|
||||
report = {
|
||||
"trained_markets": list(self.calibrators.keys()),
|
||||
"metrics": {},
|
||||
"heuristic_only": [],
|
||||
}
|
||||
|
||||
for market in SUPPORTED_MARKETS:
|
||||
if market in self.metrics:
|
||||
report["metrics"][market] = self.metrics[market].to_dict()
|
||||
elif market not in self.calibrators:
|
||||
report["heuristic_only"].append(market)
|
||||
|
||||
return report
|
||||
|
||||
def get_calibrated_probabilities(
|
||||
self,
|
||||
market: str,
|
||||
raw_probs: np.ndarray
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Batch calibration for array of probabilities.
|
||||
|
||||
Args:
|
||||
market: Market type
|
||||
raw_probs: Array of raw probabilities
|
||||
|
||||
Returns:
|
||||
Array of calibrated probabilities
|
||||
"""
|
||||
return np.array([self.calibrate(market, p) for p in raw_probs])
|
||||
|
||||
|
||||
# Singleton instance
|
||||
_calibrator_instance: Optional[Calibrator] = None
|
||||
|
||||
|
||||
def get_calibrator() -> Calibrator:
|
||||
"""Get or create the global Calibrator instance."""
|
||||
global _calibrator_instance
|
||||
if _calibrator_instance is None:
|
||||
_calibrator_instance = Calibrator()
|
||||
return _calibrator_instance
|
||||
Reference in New Issue
Block a user