Files
iddaai-be/ai-engine/models/calibration.py
T
fahricansecer 671979b07d
Deploy Iddaai Backend / build-and-deploy (push) Successful in 35s
gg2
2026-05-29 13:35:17 +03:00

570 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Calibration Module for XGBoost Models
=====================================
Calibrates raw probabilities from XGBoost models using Isotonic Regression.
Ensures that a predicted probability of 70% actually corresponds to a 70% win rate.
Usage:
from ai_engine.models.calibration import Calibrator
calibrator = Calibrator()
calibrated_prob = calibrator.calibrate("ms", raw_prob)
# Training new calibration models:
calibrator.train_calibration(valid_df, market="ou25", prob_col="ou25_over_prob", actual_col="ou25_over_actual")
"""
import os
import pickle
import json
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from sklearn.isotonic import IsotonicRegression
from sklearn.calibration import calibration_curve
from sklearn.metrics import brier_score_loss
# Directory two levels above this file (dirname applied twice to the absolute path).
AI_ENGINE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Directory where "<market>_calibrator.pkl" / "<market>_metrics.json" files are kept.
CALIBRATION_DIR = os.path.join(AI_ENGINE_DIR, "models", "calibration")
# Import-time side effect: create the directory if it does not exist yet.
os.makedirs(CALIBRATION_DIR, exist_ok=True)
# Supported markets for calibration.
# Calibrator._load_calibrators() iterates this list to find per-market files on disk,
# and Calibrator.get_calibration_report() iterates it to build its report.
SUPPORTED_MARKETS = [
"ms", # Match Result (1X2) - multi-class, calibrated per class
"ms_home", # Standard Home win probability
"ms_home_heavy_fav", # Context: home odds <= 1.40
"ms_home_fav", # Context: 1.40 < home odds <= 1.80
"ms_home_balanced", # Context: 1.80 < home odds <= 2.50
"ms_home_underdog", # Context: home odds > 2.50
"ms_draw", # Draw probability
"ms_away", # Away win probability
"ou15", # Over/Under 1.5
"ou25", # Over/Under 2.5
"ou35", # Over/Under 3.5
"btts", # Both Teams to Score
"ht_ft", # Half-Time/Full-Time
"dc", # Double Chance
"ht", # Half-Time Result
"ht_home", # Half-Time Home win
"ht_draw", # Half-Time Draw
"ht_away", # Half-Time Away win
]
class CalibrationMetrics:
    """Container for the quality metrics recorded for one market's calibrator.

    All fields start at neutral defaults and are filled in by whoever trains
    or loads the calibrator.
    """

    def __init__(self):
        # Bookkeeping about the training run.
        self.sample_count: int = 0
        self.last_trained: str = ""
        # Quality scores of the calibrated output.
        self.brier_score: float = 0.0
        self.calibration_error: float = 0.0
        # Averages of the raw predictions and of the observed outcomes.
        self.mean_predicted: float = 0.0
        self.mean_actual: float = 0.0

    def to_dict(self) -> Dict:
        """Return a JSON-friendly dict; float metrics are rounded to 4 decimals."""
        payload: Dict = {}
        payload["brier_score"] = round(self.brier_score, 4)
        payload["calibration_error"] = round(self.calibration_error, 4)
        payload["sample_count"] = self.sample_count
        payload["last_trained"] = self.last_trained
        payload["mean_predicted"] = round(self.mean_predicted, 4)
        payload["mean_actual"] = round(self.mean_actual, 4)
        return payload
class Calibrator:
    """
    Probability calibration using Isotonic Regression.

    Isotonic Regression is a non-parametric method that fits a piecewise
    constant function that is monotonically increasing. It's ideal for
    calibrating probabilities because:
    1. It preserves ranking (if P(A) > P(B) before, P(A) >= P(B) after)
    2. It doesn't assume a specific distribution shape
    3. It can correct systematic over/under-confidence

    Example:
        # Before calibration: model predicts 70% but actual win rate is 60%
        # After calibration: model predicts 70% → calibrated to 60%
    """

    # How much a trained isotonic model is trusted, based on the sample_count
    # stored in its metrics:
    #   * n <  HARD_MIN_SAMPLES                 -> model bypassed, raw_prob returned
    #   * HARD_MIN_SAMPLES .. TRUSTED_SAMPLE_FLOOR   -> isotonic weight ramps 0% -> 25%
    #   * TRUSTED_SAMPLE_FLOOR .. TRUSTED_SAMPLE_CEILING -> weight ramps 25% -> 100%
    #   * n >= TRUSTED_SAMPLE_CEILING           -> isotonic output used in full
    # Rationale: 12-sample calibrators are statistical noise; even 30%
    # blending on them propagates the noise into the confidence value the
    # betting_brain reads downstream.
    HARD_MIN_SAMPLES = 50
    TRUSTED_SAMPLE_FLOOR = 100
    TRUSTED_SAMPLE_CEILING = 400
    # Hard cap on how far calibration can move probability in either direction.
    MAX_DELTA = 0.20

    def __init__(self):
        """Set up empty registries and fallback factors, then load models from disk."""
        self.calibrators: Dict[str, IsotonicRegression] = {}
        self.metrics: Dict[str, CalibrationMetrics] = {}
        # Less aggressive shrinkage — only meaningful overconfident bands are pulled.
        # Default raised from ~0.85-0.90 to 0.95+ since the orchestrator and config
        # already apply market-level multipliers; double-shrinkage was the root cause
        # of 24-35pt avg calibrated-vs-raw drops in production traces.
        self.heuristic_fallback: Dict[str, float] = {
            "ms": 0.96,
            "ms_home": 0.96,
            "ms_home_heavy_fav": 0.98,
            "ms_home_fav": 0.96,
            "ms_home_balanced": 0.94,
            "ms_home_underdog": 0.92,
            "ms_draw": 0.94,
            "ms_away": 0.96,
            "ou15": 0.96,
            "ou25": 0.96,
            "ou35": 0.94,
            "btts": 0.96,
            "ht_ft": 0.92,
            "dc": 0.97,
            "ht": 0.92,
            "ht_home": 0.92,
            "ht_draw": 0.92,
            "ht_away": 0.92,
        }
        self._load_calibrators()

    def _load_calibrators(self):
        """Load trained calibrators and their metrics for each supported market.

        A failure to read either file is logged and skipped. Note that a
        calibrator loaded without metrics has an effective sample count of 0
        in calibrate() and is therefore bypassed there.
        """
        for market in SUPPORTED_MARKETS:
            model_path = os.path.join(CALIBRATION_DIR, f"{market}_calibrator.pkl")
            metrics_path = os.path.join(CALIBRATION_DIR, f"{market}_metrics.json")

            if os.path.exists(model_path):
                try:
                    with open(model_path, "rb") as f:
                        # SECURITY: pickle.load executes arbitrary code embedded
                        # in the file. Only safe while CALIBRATION_DIR contains
                        # nothing but files written by _save_calibration().
                        self.calibrators[market] = pickle.load(f)
                    print(f"[Calibrator] Loaded calibration model for {market}")
                except Exception as e:
                    print(f"[Calibrator] Warning: Failed to load {market}: {e}")

            if os.path.exists(metrics_path):
                try:
                    with open(metrics_path, "r") as f:
                        data = json.load(f)
                    metrics = CalibrationMetrics()
                    metrics.brier_score = data.get("brier_score", 0.0)
                    metrics.calibration_error = data.get("calibration_error", 0.0)
                    metrics.sample_count = data.get("sample_count", 0)
                    metrics.last_trained = data.get("last_trained", "")
                    metrics.mean_predicted = data.get("mean_predicted", 0.0)
                    metrics.mean_actual = data.get("mean_actual", 0.0)
                    self.metrics[market] = metrics
                except Exception as e:
                    print(f"[Calibrator] Warning: Failed to load metrics for {market}: {e}")

    def calibrate(self, market_type: str, raw_prob: float, odds_val: Optional[float] = None) -> float:
        """
        Calibrate a raw probability using Isotonic Regression with safeguards.

        Args:
            market_type (str): 'ms_home', 'ou25', 'btts', 'ht_ft', etc.
                Case-insensitive; '-' is treated as '_'.
            raw_prob (float): The raw probability from XGBoost (0.0 - 1.0)
            odds_val (float, optional): The pre-match odds, used for
                context-aware bucket mapping of 'ms_home'.

        Returns:
            float: Calibrated probability. The isotonic path clips the result
            to [0.01, 0.99]; the heuristic fallback path does not clip.

        Safeguards:
            * Models with fewer than HARD_MIN_SAMPLES samples are bypassed.
            * Low-sample trained models are blended with raw_prob to dampen overfit.
            * MAX_DELTA caps the per-call adjustment (prevents 40pp swings).
        """
        # Normalize market type
        market_key = market_type.lower().replace("-", "_")

        # Route ms_home to an odds bucket, but only when that bucket has a
        # trained model; otherwise keep the generic ms_home key.
        if market_key == "ms_home" and odds_val is not None and odds_val > 1.0:
            if odds_val <= 1.40:
                bucket_key = "ms_home_heavy_fav"
            elif odds_val <= 1.80:
                bucket_key = "ms_home_fav"
            elif odds_val <= 2.50:
                bucket_key = "ms_home_balanced"
            else:
                bucket_key = "ms_home_underdog"
            if bucket_key in self.calibrators:
                market_key = bucket_key

        # If we have a trained Isotonic Regression model, use it (with safeguards)
        if market_key in self.calibrators:
            try:
                # Predict first so that a broken model falls through to the
                # heuristic regardless of its sample count.
                iso_pred = float(self.calibrators[market_key].predict([raw_prob])[0])

                # Sample-count weighted blend with raw probability.
                # Sparse models barely move probability; mature models dominate.
                metrics = self.metrics.get(market_key)
                n_samples = metrics.sample_count if metrics else 0

                if n_samples < self.HARD_MIN_SAMPLES:
                    # Too few samples for a reliable isotonic fit: bypass it
                    # and return the clipped raw probability. No heuristic
                    # shrinkage is applied on this path.
                    return float(np.clip(raw_prob, 0.01, 0.99))

                if n_samples >= self.TRUSTED_SAMPLE_CEILING:
                    iso_weight = 1.0
                elif n_samples <= self.TRUSTED_SAMPLE_FLOOR:
                    # Linear ramp from 0% at HARD_MIN_SAMPLES to 25% at FLOOR
                    span = self.TRUSTED_SAMPLE_FLOOR - self.HARD_MIN_SAMPLES
                    iso_weight = 0.25 * (n_samples - self.HARD_MIN_SAMPLES) / span
                else:
                    # Linearly ramp 25% → 100% between floor and ceiling
                    span = self.TRUSTED_SAMPLE_CEILING - self.TRUSTED_SAMPLE_FLOOR
                    iso_weight = 0.25 + 0.75 * (n_samples - self.TRUSTED_SAMPLE_FLOOR) / span

                blended = iso_weight * iso_pred + (1.0 - iso_weight) * raw_prob

                # Cap delta to avoid huge swings on noisy calibrators
                delta = blended - raw_prob
                if delta > self.MAX_DELTA:
                    blended = raw_prob + self.MAX_DELTA
                elif delta < -self.MAX_DELTA:
                    blended = raw_prob - self.MAX_DELTA

                return float(np.clip(blended, 0.01, 0.99))
            except Exception as e:
                print(f"[Calibrator] Warning: Isotonic failed for {market_key}: {e}")
                # Fall through to heuristic

        # Fallback to heuristic calibration
        return self._heuristic_calibrate(market_key, raw_prob)

    def _heuristic_calibrate(self, market_type: str, raw_prob: float) -> float:
        """
        Heuristic calibration fallback when no trained model exists.

        Applies the per-market factor from self.heuristic_fallback (0.90 when
        the market has no entry):
        - MS markets: shrink towards 0.33
        - Binary markets (OU, BTTS): shrink towards 0.5
        - 'ht_ft' and 'ht': plain multiplication by the factor
        - 'dc': shrink towards 0.66
        - anything else: raw_prob is returned unchanged
        """
        # Get shrinkage factor for this market
        shrinkage = self.heuristic_fallback.get(market_type, 0.90)

        if market_type in ["ms", "ms_home", "ms_home_heavy_fav", "ms_home_fav",
                           "ms_home_balanced", "ms_home_underdog", "ms_draw", "ms_away"]:
            # Pull towards 0.33 (uniform for 3-class)
            return (raw_prob * shrinkage) + (0.33 * (1.0 - shrinkage))
        elif market_type in ["ou15", "ou25", "ou35", "btts"]:
            # Pull towards 0.5 (uniform for binary)
            return (raw_prob * shrinkage) + (0.5 * (1.0 - shrinkage))
        elif market_type in ["ht_ft", "ht"]:
            # Stronger shrinkage for high-variance markets
            return raw_prob * shrinkage
        elif market_type == "dc":
            # Double chance is more reliable
            return (raw_prob * shrinkage) + (0.66 * (1.0 - shrinkage))

        # NOTE(review): 'ht_home', 'ht_draw' and 'ht_away' have factors in
        # heuristic_fallback but match no branch above, so their factors are
        # never applied and raw_prob is returned as-is. Confirm whether they
        # should share the 'ht' branch before changing production numbers.
        return raw_prob

    def train_calibration(
        self,
        df: pd.DataFrame,
        market: str,
        prob_col: str,
        actual_col: str,
        min_samples: int = 100,
        save: bool = True,
    ) -> "CalibrationMetrics":
        """
        Train an Isotonic Regression calibration model for a specific market.

        Args:
            df: DataFrame with predictions and actual outcomes
            market: Market identifier (e.g., 'ms_home', 'ou25', 'btts')
            prob_col: Column name for raw probabilities
            actual_col: Column name for actual outcomes (0 or 1)
            min_samples: Minimum samples required to train
            save: Whether to save the model to disk

        Returns:
            CalibrationMetrics with quality metrics. When fewer than
            min_samples valid rows exist, nothing is trained or stored and
            the returned metrics carry only sample_count.
        """
        # Local import: the file-level import only brings in `datetime`.
        from datetime import timezone

        # Filter valid data
        valid_df = df[[prob_col, actual_col]].dropna()
        n_samples = len(valid_df)
        if n_samples < min_samples:
            print(f"[Calibrator] Warning: Only {n_samples} samples for {market}, "
                  f"need at least {min_samples}")
            metrics = CalibrationMetrics()
            metrics.sample_count = n_samples
            return metrics

        # Extract arrays
        raw_probs = valid_df[prob_col].values
        actuals = valid_df[actual_col].values

        # Train Isotonic Regression
        iso = IsotonicRegression(out_of_bounds="clip", increasing=True)
        iso.fit(raw_probs, actuals)

        # Calculate calibrated probabilities (in-sample)
        calibrated_probs = iso.predict(raw_probs)

        # Calculate metrics; stored as plain floats so they serialise predictably.
        metrics = CalibrationMetrics()
        metrics.sample_count = n_samples
        # Same naive-UTC ISO string that the deprecated datetime.utcnow() produced.
        metrics.last_trained = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        metrics.brier_score = float(brier_score_loss(actuals, calibrated_probs))
        metrics.mean_predicted = float(np.mean(raw_probs))
        metrics.mean_actual = float(np.mean(actuals))

        # Calculate Expected Calibration Error (ECE)
        metrics.calibration_error = self._calculate_ece(
            calibrated_probs, actuals, n_bins=10
        )

        # Store in memory
        self.calibrators[market] = iso
        self.metrics[market] = metrics

        # Save to disk
        if save:
            self._save_calibration(market, iso, metrics)

        print(f"[Calibrator] Trained {market}: "
              f"Brier={metrics.brier_score:.4f}, "
              f"ECE={metrics.calibration_error:.4f}, "
              f"n={n_samples}")
        return metrics

    def train_all_markets(
        self,
        df: pd.DataFrame,
        market_config: Dict[str, Tuple[str, str]],
        min_samples: int = 100,
    ) -> Dict[str, "CalibrationMetrics"]:
        """
        Train calibration models for multiple markets at once.

        Args:
            df: DataFrame with all predictions and outcomes
            market_config: Dict mapping market -> (prob_col, actual_col)
                e.g., {'ou25': ('ou25_over_prob', 'ou25_over_actual')}
            min_samples: Minimum samples per market

        Returns:
            Dict of market -> CalibrationMetrics. Markets whose training
            raised are logged and omitted from the result.
        """
        results = {}
        for market, (prob_col, actual_col) in market_config.items():
            print(f"\n[Calibrator] Training {market}...")
            try:
                metrics = self.train_calibration(
                    df=df,
                    market=market,
                    prob_col=prob_col,
                    actual_col=actual_col,
                    min_samples=min_samples,
                    save=True,
                )
                results[market] = metrics
            except Exception as e:
                print(f"[Calibrator] Failed to train {market}: {e}")
        return results

    def _calculate_ece(
        self,
        probs: np.ndarray,
        actuals: np.ndarray,
        n_bins: int = 10
    ) -> float:
        """
        Calculate Expected Calibration Error (ECE).

        ECE = sum(|bin_accuracy - bin_confidence| * bin_weight)
        Lower is better. Perfect calibration = 0.

        Bins are equal-width over [0, 1]. Every bin is half-open [lo, hi)
        except the last, which is closed so that probabilities equal to 1.0
        are counted. Returns 0.0 for empty input.
        """
        probs = np.asarray(probs, dtype=float)
        actuals = np.asarray(actuals, dtype=float)
        if probs.size == 0:
            return 0.0

        bin_boundaries = np.linspace(0, 1, n_bins + 1)
        ece = 0.0
        for i in range(n_bins):
            lower = bin_boundaries[i]
            upper = bin_boundaries[i + 1]
            if i == n_bins - 1:
                # Closed right edge: isotonic output frequently equals 1.0.
                in_bin = (probs >= lower) & (probs <= upper)
            else:
                in_bin = (probs >= lower) & (probs < upper)
            prop_in_bin = np.mean(in_bin)
            if prop_in_bin > 0:
                accuracy_in_bin = np.mean(actuals[in_bin])
                avg_confidence_in_bin = np.mean(probs[in_bin])
                ece += np.abs(accuracy_in_bin - avg_confidence_in_bin) * prop_in_bin
        return float(ece)

    def _save_calibration(
        self,
        market: str,
        calibrator: "IsotonicRegression",
        metrics: "CalibrationMetrics"
    ):
        """Save calibration model (pickle) and metrics (JSON) to CALIBRATION_DIR."""
        # Save model
        model_path = os.path.join(CALIBRATION_DIR, f"{market}_calibrator.pkl")
        with open(model_path, "wb") as f:
            pickle.dump(calibrator, f)

        # Save metrics
        metrics_path = os.path.join(CALIBRATION_DIR, f"{market}_metrics.json")
        with open(metrics_path, "w") as f:
            json.dump(metrics.to_dict(), f, indent=2)

        print(f"[Calibrator] Saved {market} to {CALIBRATION_DIR}")

    def get_calibration_report(self) -> Dict[str, Any]:
        """Generate a summary report of all calibration models.

        Returns a dict with 'trained_markets' (keys of loaded calibrators),
        'metrics' (per-market metric dicts) and 'heuristic_only' (supported
        markets that have neither metrics nor a calibrator).
        """
        report = {
            "trained_markets": list(self.calibrators.keys()),
            "metrics": {},
            "heuristic_only": [],
        }
        for market in SUPPORTED_MARKETS:
            if market in self.metrics:
                report["metrics"][market] = self.metrics[market].to_dict()
            elif market not in self.calibrators:
                report["heuristic_only"].append(market)
        return report

    def get_calibrated_probabilities(
        self,
        market: str,
        raw_probs: np.ndarray
    ) -> np.ndarray:
        """
        Batch calibration for array of probabilities.

        Each element goes through calibrate() without odds context.

        Args:
            market: Market type
            raw_probs: Array of raw probabilities

        Returns:
            Array of calibrated probabilities
        """
        return np.array([self.calibrate(market, p) for p in raw_probs])
# Process-wide Calibrator, built lazily on the first get_calibrator() call.
_calibrator_instance: Optional[Calibrator] = None


def get_calibrator() -> Calibrator:
    """Return the shared Calibrator, constructing it on first use."""
    global _calibrator_instance
    if _calibrator_instance is not None:
        return _calibrator_instance
    _calibrator_instance = Calibrator()
    return _calibrator_instance
# ── FINAL-OUTPUT RECALIBRATION LAYER (V31e) ─────────────────────────────────
# A thin, LAST-STEP per-market map: production calibrated_confidence -> reality.
# Built from a 60-day backtest (scripts/fit_recalibrators.py); inference is a
# pure np.interp over a 99-point monotone grid — NO sklearn needed at runtime.
#
# WHY THIS EXISTS:
# The upstream chain (temperature scaling T=1.5 -> per-outcome isotonic ->
# POST_CAL_TRUST blend) crushes high-base-rate binary markets toward 0.5,
# so "system says 51%" can really hit 70%. MS survives (near-uniform picks),
# which is why MS is already well-calibrated and OU/HT-OU markets are not.
#
# SAFETY / "DO NO HARM":
# * Only markets whose fit-time ECE >= 5.0 carry a map (currently OU15, OU35,
# HT_OU05, HT_OU15). MS and every already-good market have NO map ->
# recalibrate_conf() returns the input UNCHANGED -> guaranteed no regression.
# * Out-of-sample validated (fit=older 65%, test=unseen 35%):
# MS ECE 1.1 -> 1.3 (flat, safe)
# HT_OU15 29.2 -> 0.8
# OU15 19.0 -> 3.3
# OU35 13.9 -> 4.3
# HT_OU05 11.5 -> 2.4
# * Adjusts ONLY the displayed confidence number. All rich analysis payload
# (probabilities, edges, vetoes, tiers, bands) is preserved untouched, and
# the pre-recalibration value is kept for audit by the caller.
# JSON file with the per-market final-output maps; read by FinalRecalibrator._load().
FINAL_RECALIBRATOR_PATH = os.path.join(CALIBRATION_DIR, "final_recalibrators.json")
class FinalRecalibrator:
    """Per-market final-output recalibration via piecewise-linear interpolation.

    Loads a compact JSON of lookup grids (x=calibrated_confidence/100,
    y=reality). Markets absent from the file pass through as identity.

    Expected file layout, as read by _load():
        {"_meta": {"grid": [...]}, "<MARKET>": {"y": [...]}, ...}
    where every "y" list has the same length as the grid.
    """

    def __init__(self, path: Optional[str] = None):
        """Load the maps from *path*.

        Args:
            path: JSON file to read. None (the default) resolves to
                FINAL_RECALIBRATOR_PATH at call time.
        """
        if path is None:
            path = FINAL_RECALIBRATOR_PATH
        self.grid: Optional[np.ndarray] = None
        self.maps: Dict[str, np.ndarray] = {}
        self.source_path = path
        self._load(path)

    def _load(self, path: str) -> None:
        """Populate self.grid / self.maps from *path*.

        Any problem (missing file, missing or invalid grid, parse error)
        leaves the instance in pass-through mode instead of raising.
        """
        if not os.path.exists(path):
            print(f"[FinalRecalibrator] No map file at {path} — pass-through mode (all markets unchanged)")
            return
        try:
            with open(path, "r") as f:
                data = json.load(f)
            meta = data.get("_meta", {})
            grid = meta.get("grid")
            if not grid:
                print("[FinalRecalibrator] Map file missing _meta.grid — pass-through mode")
                return

            grid_arr = np.asarray(grid, dtype=float)
            # np.interp silently returns nonsense unless the x-grid is increasing.
            if (
                grid_arr.ndim != 1
                or not np.all(np.isfinite(grid_arr))
                or np.any(np.diff(grid_arr) < 0)
            ):
                print("[FinalRecalibrator] Map file has an invalid or non-increasing _meta.grid — pass-through mode")
                return
            self.grid = grid_arr

            for market, m in data.items():
                if market == "_meta" or not isinstance(m, dict):
                    continue
                y = m.get("y")
                if y and len(y) == len(self.grid):
                    # Keys are stored upper-case; lookups upper-case the market too.
                    self.maps[str(market).upper()] = np.asarray(y, dtype=float)
                else:
                    print(f"[FinalRecalibrator] Skipped {market}: grid/y length mismatch")
            print(f"[FinalRecalibrator] Loaded reality maps for {sorted(self.maps.keys())} "
                  f"(everything else, incl. MS, passes through unchanged)")
        except Exception as e:
            print(f"[FinalRecalibrator] Warning: failed to load {path}: {e} — pass-through mode")
            self.grid = None
            self.maps = {}

    def has_map(self, market: str) -> bool:
        """Return True when a map was loaded for *market* (case-insensitive)."""
        return bool(self.maps) and (market or "").upper() in self.maps

    def recalibrate_conf(self, market: str, calibrated_conf: float) -> float:
        """Map a 0-100 confidence to its reality-aligned value.

        Markets without a trained map (including MS and all already-good
        markets) return the input UNCHANGED. Any failure also returns the
        input unchanged so this layer can never regress production. This
        includes a NaN input or a NaN interpolation result, which would
        otherwise be turned into 99.0 by the final min/max clamp.

        Returns:
            The mapped confidence clamped to [1.0, 99.0], or calibrated_conf
            itself on pass-through.
        """
        try:
            key = (market or "").upper()
            if self.grid is None or key not in self.maps:
                return calibrated_conf

            conf = float(calibrated_conf)
            if np.isnan(conf):
                return calibrated_conf

            x = min(max(conf / 100.0, 0.0), 1.0)
            y = float(np.interp(x, self.grid, self.maps[key]))
            if np.isnan(y):
                return calibrated_conf
            return max(1.0, min(99.0, y * 100.0))
        except Exception:
            return calibrated_conf
# Process-wide FinalRecalibrator, built lazily on the first accessor call.
_final_recalibrator_instance: Optional[FinalRecalibrator] = None


def get_final_recalibrator() -> FinalRecalibrator:
    """Return the shared FinalRecalibrator, constructing it on first use."""
    global _final_recalibrator_instance
    if _final_recalibrator_instance is not None:
        return _final_recalibrator_instance
    _final_recalibrator_instance = FinalRecalibrator()
    return _final_recalibrator_instance