"""Guarded self-correction loop — fits the market-anchor correction table.

What it does (the "table generator" of the feedback loop):

1. MEASURE: on settled real-odds matches, per implied-probability band,
   the gap between the RAW de-vigged probability and the actual rate —
   for BOTH the home side (ms_home) and the away side (ms_away).
2. BRAKE: a band only earns a correction if it passes the safety gates —
     * min sample   (>= MIN_N matches in the band, fitted on TRAIN window)
     * shrinkage    (delta = SHRINK x measured gap — never the full gap)
     * clipping     (|delta| <= CLIP)
     * materiality  (|delta| >= MIN_DELTA, else 0 — don't chase noise)
3. PROVE: the candidate table must beat the CURRENTLY ACTIVE corrections
   out-of-sample (most recent TEST_DAYS, never seen during fitting) on
   combined home+away ECE by at least ACCEPT_MARGIN. If it doesn't,
   nothing is written.
4. WRITE: versioned artifact `config/market_anchor_corrections.json`,
   replaced atomically; the previously active file (if any) is first
   archived as a timestamped copy under `config/history/`. The same JSON
   is then upserted (best-effort) into app_settings under the key
   'market_anchor_corrections'. The engine reads the table at runtime
   (models/market_anchor.py) — the loop never modifies code.

Run weekly (cron) or manually after big data ingests:
    python scripts/fit_anchor_corrections.py [--days 540] [--test-days 90]
    python scripts/fit_anchor_corrections.py --dry-run   # measure only
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import sys
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import psycopg2  # noqa: E402
from psycopg2.extras import RealDictCursor  # noqa: E402

from data.db import get_clean_dsn  # noqa: E402
from models.market_anchor import (  # noqa: E402
    away_favorite_delta,
    home_favorite_delta,
)

# ── safety gates ─────────────────────────────────────────────────────
MIN_N = 1500            # band needs this many TRAIN matches to earn a correction
SHRINK = 0.5            # apply only half of the measured gap
CLIP = 0.05             # never correct more than 5 points
MIN_DELTA = 0.004       # below this the correction is noise — emit 0
ACCEPT_MARGIN = 0.0002  # candidate must beat active combined ECE by this
MIN_TEST = 2000         # OOS window needs this many matches to judge a candidate

# Implied-probability bands [lo, hi); the last band's hi > 1 so p == 1.0 fits.
BANDS: Tuple[Tuple[float, float], ...] = (
    (0.05, 0.15),
    (0.15, 0.25),
    (0.25, 0.35),
    (0.35, 0.45),
    (0.45, 0.55),
    (0.55, 0.65),
    (0.65, 0.75),
    (0.75, 0.85),
    (0.85, 1.01),
)

# Only rows whose odds_overround is strictly above this are fetched.
# NOTE(review): presumably lower overrounds indicate non-bookmaker
# (synthetic/placeholder) odds — confirm against the feature pipeline.
REAL_ODDS_MIN_OVERROUND = 0.05


@contextmanager
def _connect() -> Iterator[Any]:
    """Yield a DB connection wrapped in one transaction, and always close it.

    psycopg2's ``with conn:`` only commits (or rolls back on error); it does
    NOT close the connection, so the close is done explicitly here.
    """
    conn = psycopg2.connect(get_clean_dsn())
    try:
        with conn:  # commit on success, roll back on exception
            yield conn
    finally:
        conn.close()


def fetch(days: int) -> List[Dict[str, Any]]:
    """Load settled real-odds football matches from the last *days* days.

    Each returned dict has:
      * ``p1`` / ``p2`` — implied home / away probability (float),
      * ``y1`` / ``y2`` — 1 if home / away won, else 0 (draws give 0/0),
      * ``mst_utc``     — match timestamp, on the same epoch-millisecond
        scale the query filters with.

    Rows with a missing implied probability, or whose home/draw/away
    probabilities do not sum to 1 within 0.02, are skipped.
    """
    since_ms = int((time.time() - days * 86400) * 1000)
    sql = """
        SELECT f.implied_home AS p1, f.implied_draw AS px, f.implied_away AS p2,
               m.mst_utc,
               (m.winner = 'home')::int AS home_won,
               (m.winner = 'away')::int AS away_won
        FROM football_ai_features f
        JOIN matches m ON m.id = f.match_id
        WHERE m.sport = 'football'
          AND m.winner IN ('home', 'away', 'draw')
          AND f.odds_overround > %s
          AND m.mst_utc >= %s
    """
    out: List[Dict[str, Any]] = []
    with _connect() as conn:
        with conn.cursor() as cur:
            cur.execute("SET statement_timeout = '120s'")
        # Named (server-side) cursor: stream rows in chunks instead of
        # materialising the whole result set client-side.
        with conn.cursor("fit_stream", cursor_factory=RealDictCursor) as cur:
            cur.itersize = 5000
            cur.execute(sql, (REAL_ODDS_MIN_OVERROUND, since_ms))
            for r in cur:
                p1, px, p2 = r["p1"], r["px"], r["p2"]
                if p1 is None or px is None or p2 is None:
                    continue
                if abs(float(p1) + float(px) + float(p2) - 1.0) > 0.02:
                    continue
                out.append({
                    "p1": float(p1),
                    "p2": float(p2),
                    "y1": int(r["home_won"]),
                    "y2": int(r["away_won"]),
                    "mst_utc": int(r["mst_utc"]),
                })
    return out


def band_of(p: float) -> Optional[int]:
    """Return the index of the BANDS entry with ``lo <= p < hi``, or None."""
    for i, (lo, hi) in enumerate(BANDS):
        if lo <= p < hi:
            return i
    return None


def fit_candidate(
    train: List[Dict[str, Any]], pkey: str, ykey: str
) -> List[Dict[str, Any]]:
    """Fit one side's correction bands on the TRAIN rows, applying the gates.

    For every band with at least MIN_N rows, ``raw_gap`` is the observed
    rate ``mean(ykey)`` minus the mean raw probability ``mean(pkey)``. The
    emitted ``delta`` is ``SHRINK * raw_gap`` clipped to ``[-CLIP, CLIP]``
    and zeroed when its magnitude is below MIN_DELTA. Bands under MIN_N are
    omitted entirely (``table_delta_fn`` then yields 0 for them).

    Returns a list of ``{"lo", "hi", "delta", "n", "raw_gap"}`` dicts with
    ``delta`` and ``raw_gap`` rounded to 4 decimals.
    """
    n: Dict[int, int] = defaultdict(int)
    sp: Dict[int, float] = defaultdict(float)
    sy: Dict[int, int] = defaultdict(int)
    for r in train:
        b = band_of(r[pkey])
        if b is None:
            continue
        n[b] += 1
        sp[b] += r[pkey]
        sy[b] += r[ykey]

    bands: List[Dict[str, Any]] = []
    for i, (lo, hi) in enumerate(BANDS):
        if n[i] < MIN_N:
            continue  # gate: not enough evidence — no correction for this band
        raw_gap = (sy[i] / n[i]) - (sp[i] / n[i])
        delta = max(-CLIP, min(CLIP, SHRINK * raw_gap))
        if abs(delta) < MIN_DELTA:
            delta = 0.0
        bands.append({"lo": lo, "hi": hi, "delta": round(delta, 4),
                      "n": n[i], "raw_gap": round(raw_gap, 4)})
    return bands


def table_delta_fn(table: List[Dict[str, Any]]) -> Callable[[float], float]:
    """Turn a band table into ``p -> delta`` (0.0 when no band contains p)."""
    def fn(p: float) -> float:
        for b in table:
            if b["lo"] <= p < b["hi"]:
                return b["delta"]
        return 0.0
    return fn


def ece(rows: List[Dict[str, Any]], pkey: str, ykey: str,
        delta_fn: Callable[[float], float]) -> float:
    """Expected calibration error of the corrected probability on *rows*.

    The corrected probability is ``row[pkey] + delta_fn(row[pkey])``,
    capped at 0.98, and rows are grouped into 20 equal-width bins. The
    result is the count-weighted mean of ``|mean prob - mean outcome|``
    per bin; 0.0 for empty input.
    """
    n: Dict[int, int] = defaultdict(int)
    sp: Dict[int, float] = defaultdict(float)
    sy: Dict[int, int] = defaultdict(int)
    for r in rows:
        # NOTE(review): the 0.98 cap presumably mirrors the engine's own
        # cap in models/market_anchor.py — confirm they stay in sync.
        pc = min(0.98, r[pkey] + delta_fn(r[pkey]))
        b = min(19, int(pc * 20))
        n[b] += 1
        sp[b] += pc
        sy[b] += r[ykey]
    total = sum(n.values())
    if not total:
        return 0.0
    return sum(n[b] * abs(sp[b] / n[b] - sy[b] / n[b]) for b in n) / total


def print_bands(title: str, bands: List[Dict[str, Any]]) -> None:
    """Print a fitted band table (gap and delta shown in percentage points)."""
    print(f"\ncandidate bands — {title} (after gates):")
    print(f"{'band':>12} {'n':>8} {'raw_gap_pt':>11} {'delta_pt':>9}")
    for b in bands:
        print(f"{b['lo']:>5.2f}-{b['hi']:<5.2f} {b['n']:>8} "
              f"{100 * b['raw_gap']:>11.2f} {100 * b['delta']:>9.2f}")


def _archive_and_write(path: str, artifact: Dict[str, Any]) -> None:
    """Archive the current artifact under ``history/``, then replace it atomically."""
    hist_dir = os.path.join(os.path.dirname(path), "history")
    os.makedirs(hist_dir, exist_ok=True)
    if os.path.exists(path):
        shutil.copy2(path, os.path.join(
            hist_dir, f"market_anchor_corrections-{int(time.time())}.json"))

    # Write a sibling temp file and rename it over the target: readers never
    # see a half-written JSON file, and a crash mid-write leaves the previous
    # artifact intact (os.replace is atomic within one filesystem).
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(artifact, fh, ensure_ascii=False, indent=2)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def _publish_app_settings(artifact: Dict[str, Any]) -> None:
    """Best-effort upsert of *artifact* into app_settings; warns on failure."""
    try:
        with _connect() as conn:  # commits on successful exit
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO app_settings (key, value, updated_at)
                    VALUES ('market_anchor_corrections', %s, now())
                    ON CONFLICT (key) DO UPDATE
                    SET value = EXCLUDED.value, updated_at = now()
                    """,
                    (json.dumps(artifact, ensure_ascii=False),),
                )
        print("ACCEPTED: upserted app_settings['market_anchor_corrections'] "
              "(live engines refresh within ~10 min)")
    except Exception as exc:  # file artifact still written — warn only
        print(f"WARN: app_settings upsert failed: {exc}")


def main() -> None:
    """CLI entry point: fetch, fit, validate out-of-sample, publish if accepted."""
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--days", type=int, default=540, help="total lookback")
    ap.add_argument("--test-days", type=int, default=90,
                    help="most recent window held out for acceptance")
    ap.add_argument("--dry-run", action="store_true",
                    help="measure and report only — never write")
    args = ap.parse_args()

    rows = fetch(args.days)
    # Time-ordered split: fit on older matches, judge on the newest window.
    cutoff_ms = int((time.time() - args.test_days * 86400) * 1000)
    train = [r for r in rows if r["mst_utc"] < cutoff_ms]
    test = [r for r in rows if r["mst_utc"] >= cutoff_ms]
    print(f"matches: total={len(rows)} train={len(train)} test(OOS)={len(test)}")
    if len(train) < 10 * MIN_N or len(test) < MIN_TEST:
        print("ABORT: not enough data for a safe fit — keeping active table.")
        return

    cand_home = fit_candidate(train, "p1", "y1")
    cand_away = fit_candidate(train, "p2", "y2")
    print_bands("ms_home", cand_home)
    print_bands("ms_away", cand_away)

    # active = whatever the engine currently loads (artifact or fallback)
    ece_h_act = ece(test, "p1", "y1", home_favorite_delta)
    ece_a_act = ece(test, "p2", "y2", away_favorite_delta)
    ece_h_cand = ece(test, "p1", "y1", table_delta_fn(cand_home))
    ece_a_cand = ece(test, "p2", "y2", table_delta_fn(cand_away))
    ece_h_raw = ece(test, "p1", "y1", lambda _p: 0.0)
    ece_a_raw = ece(test, "p2", "y2", lambda _p: 0.0)

    print("\nOOS ECE (home/away/combined):")
    print(f" raw (devig only) : {100 * ece_h_raw:.3f}% / {100 * ece_a_raw:.3f}% "
          f"/ {100 * (ece_h_raw + ece_a_raw):.3f}%")
    print(f" ACTIVE table : {100 * ece_h_act:.3f}% / {100 * ece_a_act:.3f}% "
          f"/ {100 * (ece_h_act + ece_a_act):.3f}%")
    print(f" CANDIDATE table : {100 * ece_h_cand:.3f}% / {100 * ece_a_cand:.3f}% "
          f"/ {100 * (ece_h_cand + ece_a_cand):.3f}%")

    if args.dry_run:
        print("\n(dry-run: nothing written)")
        return

    combined_act = ece_h_act + ece_a_act
    combined_cand = ece_h_cand + ece_a_cand
    if combined_cand > combined_act - ACCEPT_MARGIN:
        # The Turkish tail of the message reads: "This brake is by design:
        # a table that cannot prove itself is not written."
        print("\nREJECTED: candidate does not beat the active table "
              "out-of-sample. Active corrections stay. (Bu fren tasarim geregi:"
              " kanitlayamayan tablo yazilmaz.)")
        return

    cfg_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config"))
    path = os.path.join(cfg_dir, "market_anchor_corrections.json")
    artifact = {
        "version": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "fitted_on": {"days": args.days, "test_days": args.test_days,
                      "n_train": len(train), "n_test": len(test)},
        "validated": {
            "ece_home": {"raw": round(ece_h_raw, 5),
                         "active_before": round(ece_h_act, 5),
                         "candidate_oos": round(ece_h_cand, 5)},
            "ece_away": {"raw": round(ece_a_raw, 5),
                         "active_before": round(ece_a_act, 5),
                         "candidate_oos": round(ece_a_cand, 5)},
        },
        "gates": {"min_n": MIN_N, "shrink": SHRINK, "clip": CLIP,
                  "min_delta": MIN_DELTA},
        "corrections": {"ms_home": cand_home, "ms_away": cand_away},
    }
    _archive_and_write(path, artifact)
    print(f"\nACCEPTED: wrote {path}")

    # The deployed ai-engine container has NO volume mounts, so the file above
    # is invisible to it — app_settings is the shared medium. Running engines
    # re-read it within ~10 minutes (TTL in models/market_anchor.py).
    _publish_app_settings(artifact)


if __name__ == "__main__":
    main()