256 lines
11 KiB
Python
256 lines
11 KiB
Python
"""Guarded self-correction loop — fits the market-anchor correction table.
|
|
|
|
What it does (the "tablo üreteci" of the feedback loop):
|
|
|
|
1. MEASURE: on settled real-odds matches, per implied-probability band, the
|
|
gap between the RAW de-vigged probability and the actual rate — for BOTH
|
|
the home side (ms_home) and the away side (ms_away).
|
|
2. BRAKE: a band only earns a correction if it passes the safety gates —
|
|
* min sample (>= MIN_N matches in the band, fitted on TRAIN window)
|
|
* shrinkage (delta = SHRINK x measured gap — never the full gap)
|
|
* clipping (|delta| <= CLIP)
|
|
* materiality (|delta| >= MIN_DELTA, else 0 — don't chase noise)
|
|
3. PROVE: the candidate table must beat the CURRENTLY ACTIVE corrections
|
|
out-of-sample (most recent TEST_DAYS, never seen during fitting) on
|
|
combined home+away ECE. If it doesn't, nothing is written.
|
|
4. WRITE: versioned artifact `config/market_anchor_corrections.json`
|
|
(+ timestamped copy under `config/history/`). The engine reads the table
|
|
at runtime (models/market_anchor.py) — the loop never modifies code.
|
|
|
|
Run weekly (cron) or manually after big data ingests:
|
|
python scripts/fit_anchor_corrections.py [--days 540] [--test-days 90]
|
|
python scripts/fit_anchor_corrections.py --dry-run # measure only
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
|
|
import psycopg2 # noqa: E402
|
|
from psycopg2.extras import RealDictCursor # noqa: E402
|
|
|
|
from data.db import get_clean_dsn # noqa: E402
|
|
from models.market_anchor import ( # noqa: E402
|
|
away_favorite_delta,
|
|
home_favorite_delta,
|
|
)
|
|
|
|
# ── safety gates ─────────────────────────────────────────────────────
# Applied per band by fit_candidate(); ACCEPT_MARGIN is applied in main().
MIN_N = 1500  # band needs this many TRAIN matches to earn a correction
SHRINK = 0.5  # apply only half of the measured gap
CLIP = 0.05  # never correct more than 5 points
MIN_DELTA = 0.004  # below this the correction is noise — emit 0
ACCEPT_MARGIN = 0.0002  # candidate must beat active combined ECE by this

# Implied-probability bands as half-open [lo, hi) intervals (see band_of()).
# The last upper bound is 1.01 so that p == 1.0 still lands in a band;
# probabilities below 0.05 belong to no band and are never corrected.
BANDS: Tuple[Tuple[float, float], ...] = (
    (0.05, 0.15), (0.15, 0.25), (0.25, 0.35), (0.35, 0.45),
    (0.45, 0.55), (0.55, 0.65), (0.65, 0.75), (0.75, 0.85), (0.85, 1.01),
)

# fetch() keeps only rows whose odds_overround is strictly above this value.
# NOTE(review): presumably this separates real bookmaker odds from synthetic
# ones — confirm against the feature pipeline.
REAL_ODDS_MIN_OVERROUND = 0.05
|
|
|
|
|
|
def fetch(days: int) -> List[Dict[str, Any]]:
    """Load settled football matches with usable implied probabilities.

    Rows are streamed through a named (server-side) cursor so the full result
    set is never buffered by the driver at once.

    Args:
        days: Lookback window; only matches whose ``mst_utc`` (compared as a
            millisecond epoch timestamp) is within the last ``days`` days are
            selected.

    Returns:
        One dict per kept match with keys ``p1`` / ``p2`` (home / away implied
        probability), ``y1`` / ``y2`` (1 if home / away won, else 0) and
        ``mst_utc``. Rows with a missing probability, or whose three implied
        probabilities do not sum to 1 within 0.02, are dropped.

    Raises:
        psycopg2.Error: on connection or query failure (not handled here).
    """
    since_ms = int((time.time() - days * 86400) * 1000)
    sql = """
        SELECT f.implied_home AS p1, f.implied_draw AS px, f.implied_away AS p2,
               m.mst_utc,
               (m.winner = 'home')::int AS home_won,
               (m.winner = 'away')::int AS away_won
        FROM football_ai_features f
        JOIN matches m ON m.id = f.match_id
        WHERE m.sport = 'football'
          AND m.winner IN ('home', 'away', 'draw')
          AND f.odds_overround > %s
          AND m.mst_utc >= %s
    """
    out: List[Dict[str, Any]] = []
    conn = psycopg2.connect(get_clean_dsn())
    try:
        # `with conn` only scopes the transaction (commit / rollback); it does
        # NOT close the connection, hence the explicit close() in `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute("SET statement_timeout = '120s'")
            with conn.cursor("fit_stream", cursor_factory=RealDictCursor) as cur:
                cur.itersize = 5000
                cur.execute(sql, (REAL_ODDS_MIN_OVERROUND, since_ms))
                for r in cur:
                    p1, px, p2 = r["p1"], r["px"], r["p2"]
                    if p1 is None or px is None or p2 is None:
                        continue
                    home_p, draw_p, away_p = float(p1), float(px), float(p2)
                    # Skip rows whose de-vigged probabilities are not a
                    # proper distribution (sum too far from 1).
                    if abs(home_p + draw_p + away_p - 1.0) > 0.02:
                        continue
                    out.append({
                        "p1": home_p, "p2": away_p,
                        "y1": int(r["home_won"]), "y2": int(r["away_won"]),
                        "mst_utc": int(r["mst_utc"]),
                    })
    finally:
        conn.close()
    return out
|
|
|
|
|
|
def band_of(p: float) -> Optional[int]:
    """Return the index of the BANDS entry whose half-open range [lo, hi)
    contains ``p``, or ``None`` when ``p`` falls outside every band."""
    return next(
        (idx for idx, bounds in enumerate(BANDS) if bounds[0] <= p < bounds[1]),
        None,
    )
|
|
|
|
|
|
def fit_candidate(
    train: List[Dict[str, Any]], pkey: str, ykey: str
) -> List[Dict[str, Any]]:
    """Fit one side's candidate correction table from the training rows.

    Args:
        train: Rows to fit on; each must provide ``pkey`` and ``ykey``.
        pkey: Key of the raw probability in each row.
        ykey: Key of the 0/1 outcome in each row.

    Returns:
        One entry per band that holds at least MIN_N rows, in BANDS order,
        with keys ``lo``, ``hi``, ``delta``, ``n`` and ``raw_gap``. ``delta``
        is SHRINK x the measured gap, clipped to +/-CLIP and set to 0.0 when
        its magnitude is below MIN_DELTA.
    """
    band_count = len(BANDS)
    counts = [0] * band_count
    prob_sums = [0.0] * band_count
    outcome_sums = [0] * band_count

    # Accumulate per-band sample size, probability mass and outcome count.
    for row in train:
        prob = row[pkey]
        idx = band_of(prob)
        if idx is not None:
            counts[idx] += 1
            prob_sums[idx] += prob
            outcome_sums[idx] += row[ykey]

    table: List[Dict[str, Any]] = []
    for idx, (lo, hi) in enumerate(BANDS):
        size = counts[idx]
        if size < MIN_N:
            # gate: not enough evidence — no correction for this band
            continue
        # Measured gap = observed rate minus mean raw probability.
        gap = (outcome_sums[idx] / size) - (prob_sums[idx] / size)
        clipped = max(-CLIP, min(CLIP, SHRINK * gap))
        delta = 0.0 if abs(clipped) < MIN_DELTA else clipped
        table.append({
            "lo": lo,
            "hi": hi,
            "delta": round(delta, 4),
            "n": size,
            "raw_gap": round(gap, 4),
        })
    return table
|
|
|
|
|
|
def table_delta_fn(table: List[Dict[str, Any]]) -> Callable[[float], float]:
    """Wrap a correction table in a ``p -> delta`` lookup function.

    The returned function yields the ``delta`` of the first table entry whose
    half-open range [lo, hi) contains ``p``, or 0.0 when no entry matches.
    """
    def lookup(p: float) -> float:
        return next(
            (band["delta"] for band in table if band["lo"] <= p < band["hi"]),
            0.0,
        )

    return lookup
|
|
|
|
|
|
def ece(rows: List[Dict[str, Any]], pkey: str, ykey: str,
        delta_fn: Callable[[float], float]) -> float:
    """Expected calibration error of the corrected probabilities.

    Each row's probability is shifted by ``delta_fn`` and capped at 0.98, then
    grouped into 20 bins of width 0.05. The result is the sample-weighted mean
    of |mean corrected probability - observed rate| over the non-empty bins.

    Args:
        rows: Rows providing ``pkey`` (raw probability) and ``ykey`` (0/1 outcome).
        pkey: Key of the raw probability.
        ykey: Key of the outcome.
        delta_fn: Correction to add to a raw probability.

    Returns:
        The ECE as a fraction; 0.0 when ``rows`` is empty.
    """
    # bin index -> [sample count, sum of corrected probabilities, sum of outcomes]
    stats: Dict[int, List[Any]] = {}
    for row in rows:
        raw_p = row[pkey]
        corrected = min(0.98, raw_p + delta_fn(raw_p))
        bucket = min(19, int(corrected * 20))
        acc = stats.setdefault(bucket, [0, 0.0, 0])
        acc[0] += 1
        acc[1] += corrected
        acc[2] += row[ykey]

    total = sum(acc[0] for acc in stats.values())
    if total == 0:
        return 0.0
    weighted_gap = sum(
        cnt * abs(p_sum / cnt - y_sum / cnt)
        for cnt, p_sum, y_sum in stats.values()
    )
    return weighted_gap / total
|
|
|
|
|
|
def print_bands(title: str, bands: List[Dict[str, Any]]) -> None:
    """Print a fitted correction table to stdout, one row per band.

    ``raw_gap`` and ``delta`` are shown multiplied by 100 (percentage points).
    """
    print(f"\ncandidate bands — {title} (after gates):")
    print(f"{'band':>12} {'n':>8} {'raw_gap_pt':>11} {'delta_pt':>9}")
    for band in bands:
        span = f"{band['lo']:>5.2f}-{band['hi']:<5.2f}"
        gap_pt = 100 * band["raw_gap"]
        delta_pt = 100 * band["delta"]
        print(f"{span} {band['n']:>8} {gap_pt:>11.2f} {delta_pt:>9.2f}")
|
|
|
|
|
|
def _write_artifact_file(path: str, hist_dir: str, artifact: Dict[str, Any]) -> None:
    """Back up the current artifact (if any) into ``hist_dir``, then replace it atomically."""
    os.makedirs(hist_dir, exist_ok=True)
    if os.path.exists(path):
        shutil.copy2(path, os.path.join(
            hist_dir, f"market_anchor_corrections-{int(time.time())}.json"))
    # Write to a sibling temp file and swap it in, so a reader never sees a
    # half-written JSON document if this process dies mid-write.
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(artifact, fh, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def _upsert_app_settings(artifact: Dict[str, Any]) -> None:
    """Store the artifact as JSON under app_settings['market_anchor_corrections']."""
    conn = psycopg2.connect(get_clean_dsn())
    try:
        # `with conn` commits on success / rolls back on error but does not
        # close the connection — close() is done explicitly below.
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO app_settings (key, value, updated_at)
                    VALUES ('market_anchor_corrections', %s, now())
                    ON CONFLICT (key)
                    DO UPDATE SET value = EXCLUDED.value, updated_at = now()
                    """,
                    (json.dumps(artifact, ensure_ascii=False),),
                )
    finally:
        conn.close()


def main() -> None:
    """CLI entry point: fit candidate tables, validate them out-of-sample, publish.

    Steps:
      1. Fetch ``--days`` of matches and split them at ``now - --test-days``
         into a TRAIN window and a held-out TEST window.
      2. Abort (writing nothing) if TRAIN has fewer than 10 * MIN_N rows or
         TEST has fewer than 2000 rows.
      3. Fit home and away candidate tables on TRAIN and print them.
      4. Print TEST-window ECE for raw, active and candidate corrections.
      5. Unless ``--dry-run``: reject the candidate when its combined
         home+away ECE is not at least ACCEPT_MARGIN below the active one;
         otherwise write the JSON artifact (backing up the previous file under
         ``config/history/``) and upsert it into ``app_settings``.

    A failure of the ``app_settings`` upsert is reported as a warning only;
    the file artifact has already been written at that point.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--days", type=int, default=540, help="total lookback")
    ap.add_argument("--test-days", type=int, default=90,
                    help="most recent window held out for acceptance")
    ap.add_argument("--dry-run", action="store_true",
                    help="measure and report only — never write")
    args = ap.parse_args()

    rows = fetch(args.days)
    cutoff_ms = int((time.time() - args.test_days * 86400) * 1000)
    train = [r for r in rows if r["mst_utc"] < cutoff_ms]
    test = [r for r in rows if r["mst_utc"] >= cutoff_ms]
    print(f"matches: total={len(rows)} train={len(train)} test(OOS)={len(test)}")
    if len(train) < 10 * MIN_N or len(test) < 2000:
        print("ABORT: not enough data for a safe fit — keeping active table.")
        return

    cand_home = fit_candidate(train, "p1", "y1")
    cand_away = fit_candidate(train, "p2", "y2")
    print_bands("ms_home", cand_home)
    print_bands("ms_away", cand_away)

    # active = whatever the engine currently loads (artifact or fallback)
    ece_h_act = ece(test, "p1", "y1", home_favorite_delta)
    ece_a_act = ece(test, "p2", "y2", away_favorite_delta)
    ece_h_cand = ece(test, "p1", "y1", table_delta_fn(cand_home))
    ece_a_cand = ece(test, "p2", "y2", table_delta_fn(cand_away))
    ece_h_raw = ece(test, "p1", "y1", lambda _p: 0.0)
    ece_a_raw = ece(test, "p2", "y2", lambda _p: 0.0)

    combined_act = ece_h_act + ece_a_act
    combined_cand = ece_h_cand + ece_a_cand

    print("\nOOS ECE (home/away/combined):")
    print(f" raw (devig only) : {100 * ece_h_raw:.3f}% / {100 * ece_a_raw:.3f}% "
          f"/ {100 * (ece_h_raw + ece_a_raw):.3f}%")
    print(f" ACTIVE table : {100 * ece_h_act:.3f}% / {100 * ece_a_act:.3f}% "
          f"/ {100 * combined_act:.3f}%")
    print(f" CANDIDATE table : {100 * ece_h_cand:.3f}% / {100 * ece_a_cand:.3f}% "
          f"/ {100 * combined_cand:.3f}%")

    if args.dry_run:
        print("\n(dry-run: nothing written)")
        return

    # Acceptance gate: the candidate must win by at least ACCEPT_MARGIN.
    if combined_cand > combined_act - ACCEPT_MARGIN:
        print("\nREJECTED: candidate does not beat the active table "
              "out-of-sample. Active corrections stay. (Bu fren tasarim geregi:"
              " kanitlayamayan tablo yazilmaz.)")
        return

    cfg_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config"))
    path = os.path.join(cfg_dir, "market_anchor_corrections.json")
    artifact = {
        "version": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "fitted_on": {"days": args.days, "test_days": args.test_days,
                      "n_train": len(train), "n_test": len(test)},
        "validated": {
            "ece_home": {"raw": round(ece_h_raw, 5), "active_before": round(ece_h_act, 5),
                         "candidate_oos": round(ece_h_cand, 5)},
            "ece_away": {"raw": round(ece_a_raw, 5), "active_before": round(ece_a_act, 5),
                         "candidate_oos": round(ece_a_cand, 5)},
        },
        "gates": {"min_n": MIN_N, "shrink": SHRINK, "clip": CLIP,
                  "min_delta": MIN_DELTA},
        "corrections": {"ms_home": cand_home, "ms_away": cand_away},
    }
    _write_artifact_file(path, os.path.join(cfg_dir, "history"), artifact)
    print(f"\nACCEPTED: wrote {path}")

    # The deployed ai-engine container has NO volume mounts, so the file above
    # is invisible to it — app_settings is the shared medium. Running engines
    # re-read it within ~10 minutes (TTL in models/market_anchor.py).
    try:
        _upsert_app_settings(artifact)
        print("ACCEPTED: upserted app_settings['market_anchor_corrections'] "
              "(live engines refresh within ~10 min)")
    except Exception as exc:  # file artifact still written — warn only
        print(f"WARN: app_settings upsert failed: {exc}")
|
|
|
|
|
|
# Script entry point: run the fit only when executed directly, not on import.
if __name__ == "__main__":
    main()
|