113 lines
4.8 KiB
Python
113 lines
4.8 KiB
Python
"""
|
||
Calibration Report — are the model's probabilities "kusursuz"?
|
||
=============================================================
|
||
"Flawless probability" has a precise technical meaning: CALIBRATION. When the
|
||
model says 60%, the event must happen ~60% of the time. This measures exactly
|
||
that for the leak-free MS (1X2) model, and shows how much isotonic calibration
|
||
improves it.
|
||
|
||
Metrics:
|
||
* Reliability table: bin predicted prob -> avg predicted vs ACTUAL frequency.
|
||
Calibrated = avg_pred ≈ actual in every bin (gap ≈ 0).
|
||
* ECE (Expected Calibration Error): weighted mean |pred - actual|. Lower=better.
|
||
* Brier score, Log-loss: overall probability accuracy. Lower=better.
|
||
|
||
Time-split (no leakage): train 70% -> fit isotonic on next 15% -> test last 15%.
|
||
|
||
Usage: python scripts/calibration_report.py
|
||
"""
|
||
from __future__ import annotations
|
||
import os, sys
|
||
import numpy as np, pandas as pd, xgboost as xgb
|
||
from sklearn.isotonic import IsotonicRegression
|
||
|
||
# Switch stdout to UTF-8 so the report's non-ASCII characters (e.g. "≈" and
# Turkish letters in the printed text) do not depend on the console's default
# encoding. sys.stdout may be None or lack reconfigure(), hence the guards.
if sys.stdout and hasattr(sys.stdout, "reconfigure"):
    try:
        sys.stdout.reconfigure(encoding="utf-8")
    except Exception:
        # Deliberate best effort: a stream that refuses reconfiguration must
        # not prevent the report from running.
        pass
|
||
|
||
# Directory two levels above this file; the training CSV lives in its data/ folder.
AI_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CSV = os.path.join(AI_DIR, "data", "training_data_v27.csv")

# Identifier, kickoff-time and score columns; main() never uses them as features.
META = {
    "match_id", "home_team_id", "away_team_id", "league_id", "mst_utc",
    "score_home", "score_away", "ht_score_home", "ht_score_away",
}

# Columns main() also drops from the feature set.
# NOTE(review): presumably these encode post-match information — confirm.
LEAKY = {
    "home_goals_form", "away_goals_form", "total_goals", "ht_total_goals",
    "squad_diff", "home_squad_quality", "away_squad_quality",
    "referee_home_bias", "referee_avg_goals",
}

# XGBoost booster parameters: 3-class soft-probability objective.
PARAMS = {
    "objective": "multi:softprob",
    "num_class": 3,
    "max_depth": 5,
    "eta": 0.05,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "tree_method": "hist",
    "verbosity": 0,
}
|
||
|
||
|
||
def reliability(probs, y, nbins=10):
    """Build a pooled one-vs-rest reliability table and compute the ECE.

    Every (sample, class) probability is treated as one binary prediction
    whose outcome is 1 when that class is the sample's label and 0 otherwise.
    The pooled predictions are grouped into ``nbins`` equal-width bins over
    [0, 1]; the last bin also includes its upper edge.

    Args:
        probs: 2-D array-like, one row per sample and one column per class.
        y: integer class labels, one per row, used as column indices.
        nbins: number of equal-width probability bins.

    Returns:
        ``(rows, ece)``. ``rows`` holds one tuple per non-empty bin:
        ``(label, count, mean_predicted, actual_frequency,
        actual_frequency - mean_predicted)``. ``ece`` is the count-weighted
        mean of ``|mean_predicted - actual_frequency|`` over those bins.
    """
    probs = np.asarray(probs, dtype=float)
    y = np.asarray(y, dtype=int)

    flat_pred = probs.reshape(-1)
    onehot = np.zeros((len(y), probs.shape[1]))
    onehot[np.arange(len(y)), y] = 1.0
    flat_hit = onehot.reshape(-1)

    edges = np.linspace(0, 1, nbins + 1)
    total = len(flat_pred)
    rows, ece = [], 0.0
    for i in range(nbins):
        lo, hi = edges[i], edges[i + 1]
        if i < nbins - 1:
            mask = (flat_pred >= lo) & (flat_pred < hi)
        else:
            # Close the last bin on the right so a probability of 1.0 is counted.
            mask = (flat_pred >= lo) & (flat_pred <= hi)
        count = int(mask.sum())
        if count == 0:
            continue
        avg_pred, actual = flat_pred[mask].mean(), flat_hit[mask].mean()
        # Round rather than truncate: an edge such as 0.29 * 100 evaluates to
        # 28.999999999999996, which int() would mislabel as 28.
        label = f"{int(round(float(lo) * 100))}-{int(round(float(hi) * 100))}%"
        rows.append((label, count, avg_pred, actual, actual - avg_pred))
        ece += (count / total) * abs(avg_pred - actual)
    return rows, ece
|
||
|
||
|
||
def brier(probs, y):
    """Return the multi-class Brier score.

    For each row the squared differences between the predicted probabilities
    and the one-hot encoding of the label are summed over classes; the result
    is the mean of those sums over rows. Lower is better.

    Args:
        probs: 2-D array-like, one row per sample and one column per class.
        y: integer class labels, one per row, used as column indices.

    Returns:
        The score as a Python float.
    """
    probs = np.asarray(probs, dtype=float)
    y = np.asarray(y, dtype=int)
    onehot = np.zeros_like(probs)
    onehot[np.arange(len(y)), y] = 1.0
    return float(np.mean(np.sum((probs - onehot) ** 2, axis=1)))
|
||
|
||
|
||
def logloss(probs, y):
    """Return the mean negative log-probability assigned to the true class.

    Args:
        probs: 2-D array-like, one row per sample and one column per class.
        y: integer class labels, one per row, used as column indices.

    Returns:
        The log-loss as a Python float. Lower is better.
    """
    probs = np.asarray(probs, dtype=float)
    y = np.asarray(y, dtype=int)
    # Clip to [1e-9, 1] so a true-class probability of 0 gives a finite loss.
    true_class_prob = np.clip(probs[np.arange(len(y)), y], 1e-9, 1)
    return float(-np.mean(np.log(true_class_prob)))
|
||
|
||
|
||
def main():
    """Train the 1X2 model on a time-ordered split and print calibration reports.

    Steps:
      1. Load the CSV, sort by ``mst_utc`` and keep rows with numeric scores.
      2. Derive the target (0 = home win, 1 = draw, 2 = away win) and the
         feature matrix (all columns except META, LEAKY and ``label_*``).
      3. Split positionally: first 70% train, next 15% calibration, rest test.
      4. Train XGBoost on the train part, fit one isotonic regressor per class
         on the calibration part, apply to the test part and renormalize.
      5. Print reliability table, ECE, Brier score and log-loss for both the
         raw and the calibrated test probabilities.

    Raises:
        ValueError: if any of the three splits would be empty.
    """
    # Sorting by kickoff time makes the positional split below a time split.
    df = pd.read_csv(CSV, low_memory=False).sort_values("mst_utc").reset_index(drop=True)

    # Keep only rows whose final scores both parse as numbers.
    score_home = pd.to_numeric(df["score_home"], errors="coerce")
    score_away = pd.to_numeric(df["score_away"], errors="coerce")
    has_score = score_home.notna() & score_away.notna()
    df = df[has_score].reset_index(drop=True)
    score_home = score_home[has_score.values].values
    score_away = score_away[has_score.values].values

    # Target classes: 0 = home win, 1 = draw, 2 = away win.
    y = np.where(score_home > score_away, 0, np.where(score_home == score_away, 1, 2))

    feats = [
        c for c in df.columns
        if c not in META and not c.startswith("label_") and c not in LEAKY
    ]
    # Non-numeric and missing feature values become 0.0.
    X = df[feats].apply(pd.to_numeric, errors="coerce").fillna(0.0).values

    n = len(df)
    train_end, calib_end = int(n * 0.70), int(n * 0.85)
    Xtr, ytr = X[:train_end], y[:train_end]
    Xca, yca = X[train_end:calib_end], y[train_end:calib_end]
    Xte, yte = X[calib_end:], y[calib_end:]
    print(f"{n:,} matches | train {len(ytr):,} / calib {len(yca):,} / test {len(yte):,} (time-split)")

    # Fail early with a clear message instead of an opaque error further down.
    if min(len(ytr), len(yca), len(yte)) == 0:
        raise ValueError(
            f"not enough scored matches ({n}) for a 70/15/15 train/calib/test split"
        )

    bst = xgb.train(PARAMS, xgb.DMatrix(Xtr, label=ytr), num_boost_round=300)
    raw_ca = bst.predict(xgb.DMatrix(Xca))
    raw_te = bst.predict(xgb.DMatrix(Xte))

    # Isotonic regression per class: fit on the calibration slice only, apply
    # to the test slice, then renormalize each row to sum to 1.
    n_classes = raw_ca.shape[1]
    isos = []
    for k in range(n_classes):
        ir = IsotonicRegression(out_of_bounds="clip", y_min=0, y_max=1)
        ir.fit(raw_ca[:, k], (yca == k).astype(float))
        isos.append(ir)
    cal_te = np.column_stack([isos[k].predict(raw_te[:, k]) for k in range(n_classes)])
    # Floor at 1e-6 so no row sums to zero before the renormalization.
    cal_te = np.clip(cal_te, 1e-6, 1)
    cal_te = cal_te / cal_te.sum(axis=1, keepdims=True)

    # Report headings are in Turkish: "uncalibrated" vs "isotonic calibrated".
    for name, test_probs in (("RAW (kalibrasyonsuz)", raw_te), ("ISOTONIC KALİBRELİ", cal_te)):
        rows, ece = reliability(test_probs, yte)
        print(f"\n{'='*64}\n{name}\n{'='*64}")
        # Columns: predicted band, count, mean predicted, actual, gap.
        print(f" {'tahmin bandı':<12}{'n':>7}{'ort.tahmin':>12}{'gerçek':>9}{'fark':>8}")
        for band, nn, ap, af, gap in rows:
            print(f" {band:<12}{nn:>7}{100*ap:>11.1f}%{100*af:>8.1f}%{100*gap:>+7.1f}")
        print(f" ECE={100*ece:.2f}% Brier={brier(test_probs,yte):.4f} LogLoss={logloss(test_probs,yte):.4f}")

    # Reading guide (Turkish): a gap near 0 means well-calibrated probabilities;
    # lower ECE/Brier/LogLoss after calibration means the calibration helped.
    print("\nOKUMA: 'fark' ≈ 0 ise olasılıklar KUSURSUZ (söylediği %X gerçekten %X).")
    print("ECE/Brier/LogLoss düştüyse kalibrasyon işe yaradı. Bu kalibre olasılıklar,")
    print("maçın olası sonuçlarını dürüstçe gösterir — kayıp-minimizasyonun temeli budur.")
|
||
|
||
|
||
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|