From 4c137fbab6461c307b56916b9b062f49485908b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fahri=20Can=20Se=C3=A7er?= Date: Thu, 11 Jun 2026 00:25:45 +0300 Subject: [PATCH] wow --- .../config/market_anchor_corrections.json | 131 +++++++++ ai-engine/models/live_matrix.py | 184 ++++++++++++ ai-engine/models/market_anchor.py | 175 +++++++++++- ai-engine/scripts/calibration_scoreboard.py | 261 ++++++++++++++++++ ai-engine/scripts/fit_anchor_corrections.py | 255 +++++++++++++++++ .../services/orchestrator/market_board.py | 53 +++- ai-engine/tests/test_live_matrix.py | 80 ++++++ ai-engine/tests/test_market_anchor.py | 80 ++++++ .../predictions/predictions.service.ts | 33 ++- 9 files changed, 1246 insertions(+), 6 deletions(-) create mode 100644 ai-engine/config/market_anchor_corrections.json create mode 100644 ai-engine/models/live_matrix.py create mode 100644 ai-engine/scripts/calibration_scoreboard.py create mode 100644 ai-engine/scripts/fit_anchor_corrections.py create mode 100644 ai-engine/tests/test_live_matrix.py diff --git a/ai-engine/config/market_anchor_corrections.json b/ai-engine/config/market_anchor_corrections.json new file mode 100644 index 0000000..33958eb --- /dev/null +++ b/ai-engine/config/market_anchor_corrections.json @@ -0,0 +1,131 @@ +{ + "version": "2026-06-10T23:11:49", + "fitted_on": { + "days": 540, + "test_days": 90, + "n_train": 64603, + "n_test": 14450 + }, + "validated": { + "ece_home": { + "raw": 0.01803, + "active_before": 0.01312, + "candidate_oos": 0.01301 + }, + "ece_away": { + "raw": 0.01234, + "active_before": 0.01234, + "candidate_oos": 0.00845 + } + }, + "gates": { + "min_n": 1500, + "shrink": 0.5, + "clip": 0.05, + "min_delta": 0.004 + }, + "corrections": { + "ms_home": [ + { + "lo": 0.05, + "hi": 0.15, + "delta": 0.0, + "n": 2124, + "raw_gap": -0.0072 + }, + { + "lo": 0.15, + "hi": 0.25, + "delta": 0.0, + "n": 6476, + "raw_gap": -0.0031 + }, + { + "lo": 0.25, + "hi": 0.35, + "delta": 0.0, + "n": 12565, + "raw_gap": -0.0018 + }, + { + "lo": 0.35, + "hi": 0.45, + "delta": 0.0, + "n": 16431, + "raw_gap": 0.006 + }, + { + "lo": 0.45, + "hi": 0.55, + "delta": 0.0124, + "n": 12995, + "raw_gap": 0.0248 + }, + { + "lo": 0.55, + "hi": 0.65, + "delta": 0.0154, + "n": 8479, + "raw_gap": 0.0307 + }, + { + "lo": 0.65, + "hi": 0.75, + "delta": 0.0203, + "n": 4638, + "raw_gap": 0.0407 + } + ], + "ms_away": [ + { + "lo": 0.05, + "hi": 0.15, + "delta": -0.0077, + "n": 6762, + "raw_gap": -0.0154 + }, + { + "lo": 0.15, + "hi": 0.25, + "delta": -0.0048, + "n": 16211, + "raw_gap": -0.0097 + }, + { + "lo": 0.25, + "hi": 0.35, + "delta": 0.0, + "n": 18440, + "raw_gap": -0.002 + }, + { + "lo": 0.35, + "hi": 0.45, + "delta": 0.009, + "n": 12061, + "raw_gap": 0.0181 + }, + { + "lo": 0.45, + "hi": 0.55, + "delta": 0.0116, + "n": 5930, + "raw_gap": 0.0231 + }, + { + "lo": 0.55, + "hi": 0.65, + "delta": 0.0199, + "n": 3287, + "raw_gap": 0.0399 + }, + { + "lo": 0.65, + "hi": 0.75, + "delta": 0.0295, + "n": 1580, + "raw_gap": 0.0589 + } + ] + } +} \ No newline at end of file diff --git a/ai-engine/models/live_matrix.py b/ai-engine/models/live_matrix.py new file mode 100644 index 0000000..2991f28 --- /dev/null +++ b/ai-engine/models/live_matrix.py @@ -0,0 +1,184 @@ +"""Live-conditioned score projection (V38) — pure functions, no I/O. + +Answers, DURING a match, questions like "1-0 at 80' — what is the REAL +probability the away team still scores?" by conditioning the same calibrated +market-anchored lambdas (V35/V36) on the current score and minute. + +Mechanics — a minute-stepped Markov chain over remaining goals: + + 1. Pre-match lambdas come from the SAME source the score card uses + (de-vigged 1X2 + over2.5, models/score_matrix solvers) — one consistent + probability spine pre-match and in-play. + 2. Each remaining minute contributes lambda_side x minute_share(t) goals, + where minute_share is the EMPIRICAL goal-time intensity curve measured + on 38,779 clean-timeline real-odds matches (1H share 44.4%, late-game + intensity rises, stoppage spikes at 45' and 90+'). + 3. Each minute's intensity is scaled by the MEASURED score-state + multiplier: trailing teams push (+9%, +17% after 70'), leading teams + shut up shop (-5%/-7%), 2+ ahead opens up. The chain updates the state + as virtual goals happen, so multipliers switch mid-projection exactly + like they do on the pitch. + +All constants are fitted on the train window (matches older than the last 90 +days); the held-out window validates calibration out-of-sample before any of +this reaches the screen. +""" + +from __future__ import annotations + +from typing import Dict, List, Optional, Tuple + +from models.score_matrix import split_lambdas, total_lambda_from_over25 + +MAX_MINUTE = 94 # 90 + folded stoppage +LATE_PHASE_FROM = 70 # measured multipliers switch here +MAX_EXTRA_GOALS = 7 # per side, absorbing cap for the chain + +# Empirical goal-time intensity: share of a match's goals per 5-min bucket +# (0-5, ..., 90-94+). Measured on 105k goals; 45' and 90+' buckets carry the +# folded stoppage-time spikes. +INTENSITY_SHARES: Tuple[float, ...] = ( + 0.036, 0.045, 0.047, 0.047, 0.045, 0.046, 0.048, 0.049, 0.081, + 0.048, 0.057, 0.055, 0.054, 0.053, 0.052, 0.053, 0.052, 0.056, 0.076, +) + +# Score-state goal-intensity multipliers, measured (actual/expected) by the +# scoring side's goal difference, split early (<70') / late (>=70'). +_STATE_MULT_EARLY: Dict[int, float] = {-2: 1.095, -1: 1.045, 0: 0.966, 1: 0.952, 2: 1.011} +_STATE_MULT_LATE: Dict[int, float] = {-2: 1.123, -1: 1.174, 0: 1.015, 1: 0.930, 2: 1.011} + + +def _minute_share(minute: int) -> float: + """Per-minute share of match-total goal intensity at `minute` (1-based).""" + b = min(len(INTENSITY_SHARES) - 1, max(0, (minute - 1) // 5)) + return INTENSITY_SHARES[b] / 5.0 + + +def state_multiplier(diff: int, minute: int) -> float: + """Intensity multiplier for a side whose current goal difference is + `diff` (own − opponent), at `minute`.""" + d = max(-2, min(2, diff)) + table = _STATE_MULT_LATE if minute >= LATE_PHASE_FROM else _STATE_MULT_EARLY + return table[d] + + +def estimate_minute(match_date_ms: Optional[int], now_ms: int) -> Optional[int]: + """Approximate current match minute from kickoff time (no feed minute is + available: live_matches.substate carries none). Folds the ~15' half-time + break; accuracy is ±2-3 minutes which barely moves the projection.""" + if not match_date_ms: + return None + elapsed = (now_ms - int(match_date_ms)) / 60000.0 + if elapsed < 0: + return None + if elapsed <= 48: # first half (+stoppage) + minute = elapsed + elif elapsed <= 63: # half-time break window + minute = 46 + else: + minute = elapsed - 15.0 # second half, break folded out + return int(max(1, min(MAX_MINUTE, minute))) + + +def _chain( + lam_h: float, + lam_a: float, + cur_h: int, + cur_a: int, + minute: int, +) -> Dict[Tuple[int, int], float]: + """Distribution over (extra home goals, extra away goals) from `minute` + to full time, with state-dependent intensities.""" + dist: Dict[Tuple[int, int], float] = {(0, 0): 1.0} + for t in range(minute, MAX_MINUTE + 1): + share = _minute_share(t) + nxt: Dict[Tuple[int, int], float] = {} + for (eh, ea), p in dist.items(): + diff = (cur_h + eh) - (cur_a + ea) + ph = lam_h * share * state_multiplier(diff, t) + pa = lam_a * share * state_multiplier(-diff, t) + ph = min(ph, 0.30); pa = min(pa, 0.30) + stay = max(0.0, 1.0 - ph - pa) + nxt[(eh, ea)] = nxt.get((eh, ea), 0.0) + p * stay + if eh < MAX_EXTRA_GOALS: + nxt[(eh + 1, ea)] = nxt.get((eh + 1, ea), 0.0) + p * ph + else: + nxt[(eh, ea)] = nxt.get((eh, ea), 0.0) + p * ph + if ea < MAX_EXTRA_GOALS: + nxt[(eh, ea + 1)] = nxt.get((eh, ea + 1), 0.0) + p * pa + else: + nxt[(eh, ea)] = nxt.get((eh, ea), 0.0) + p * pa + dist = nxt + return dist + + +def build_live_projection( + p1: float, + px: float, + p2: float, + p_over25: float, + cur_h: int, + cur_a: int, + minute: int, +) -> Dict[str, object]: + """Live projection from the anchored pre-match probabilities + the pitch + state. Returns honest, score/minute-aware probabilities. + + (p1, px, p2) and p_over25 are the CALIBRATED (V35-anchored) numbers; the + same spine the pre-match cards display. + """ + minute = int(max(1, min(MAX_MINUTE, minute))) + cur_h = max(0, int(cur_h)); cur_a = max(0, int(cur_a)) + total = total_lambda_from_over25(p_over25) + lam_h, lam_a = split_lambdas(total, p1, p2) + + dist = _chain(lam_h, lam_a, cur_h, cur_a, minute) + + p_home_win = p_draw = p_away_win = 0.0 + p_home_scores = p_away_scores = 0.0 + exp_goals = 0.0 + scores: Dict[str, float] = {} + for (eh, ea), p in dist.items(): + fh, fa = cur_h + eh, cur_a + ea + if fh > fa: p_home_win += p + elif fh == fa: p_draw += p + else: p_away_win += p + if eh > 0: p_home_scores += p + if ea > 0: p_away_scores += p + exp_goals += p * (eh + ea) + key = f"{min(fh,9)}-{min(fa,9)}" + scores[key] = scores.get(key, 0.0) + p + + top = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:5] + total_now = cur_h + cur_a + p_over25_live = sum( + p for (eh, ea), p in dist.items() if total_now + eh + ea >= 3 + ) + + # "comeback": the side currently behind at least draws / currently level + # match does NOT stay level + if cur_h > cur_a: + p_comeback = p_draw + p_away_win + elif cur_a > cur_h: + p_comeback = p_draw + p_home_win + else: + p_comeback = p_home_win + p_away_win # deadlock breaks + + return { + "minute": minute, + "current_score": f"{cur_h}-{cur_a}", + "probs": { + "1": round(p_home_win, 4), + "X": round(p_draw, 4), + "2": round(p_away_win, 4), + }, + "p_home_scores_again": round(p_home_scores, 4), + "p_away_scores_again": round(p_away_scores, 4), + "p_comeback": round(p_comeback, 4), + "p_over25": round(p_over25_live, 4), + "expected_remaining_goals": round(exp_goals, 2), + "scenario_top5": [ + {"score": s, "prob": round(p, 4)} for s, p in top + ], + "calibration_source": "live_matrix_v38", + } diff --git a/ai-engine/models/market_anchor.py b/ai-engine/models/market_anchor.py index f700e15..c5723ff 100644 --- a/ai-engine/models/market_anchor.py +++ b/ai-engine/models/market_anchor.py @@ -25,7 +25,11 @@ without the DB or the heavy model stack. from __future__ import annotations -from typing import List, Optional, Tuple +import json +import os +import threading +import time +from typing import Any, Dict, List, Optional, Tuple def devig(odds: List[Optional[float]]) -> Optional[List[float]]: @@ -53,6 +57,13 @@ def devig(odds: List[Optional[float]]) -> Optional[List[float]]: # unbiased. Values are deliberately conservative — universal and shrunk toward 0 # vs the raw tier-0 (soft-league) edge, because the bias is weaker in efficient # top leagues. Applying these took MS-home OOS ECE 1.56% -> 0.64%. +# +# These static bands are the BUILT-IN FALLBACK. The live values come from the +# versioned artifact `config/market_anchor_corrections.json`, refreshed by +# `scripts/fit_anchor_corrections.py` (the guarded self-correction loop: +# measure on settled matches -> shrink/clip/min-sample gates -> out-of-sample +# acceptance -> write table). The engine only ever consumes the TABLE — the +# loop never modifies code. _HOME_FAV_BANDS: Tuple[Tuple[float, float, float], ...] = ( (0.45, 0.55, 0.010), (0.55, 0.65, 0.018), @@ -60,17 +71,177 @@ _HOME_FAV_BANDS: Tuple[Tuple[float, float, float], ...] = ( (0.75, 1.01, 0.034), ) +_DEFAULT_CORRECTIONS_PATH = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "..", "config", + "market_anchor_corrections.json", +) + + +def _corrections_path() -> str: + return os.environ.get( + "MARKET_ANCHOR_CORRECTIONS_PATH", _DEFAULT_CORRECTIONS_PATH + ) +_corrections_lock = threading.Lock() +_corrections_cache: Optional[Dict[str, Any]] = None +_corrections_ts: float = 0.0 +# Re-check sources at most every 10 minutes: the self-correction cron writes a +# new table to app_settings; running engines pick it up WITHOUT a restart. +_CORRECTIONS_TTL_S = 600.0 + + +def _parse_corrections(raw: Dict[str, Any]) -> Optional[Dict[str, Any]]: + parsed_table: Dict[str, Any] = {} + for key in ("ms_home", "ms_away"): + bands = raw.get("corrections", {}).get(key) + if not (isinstance(bands, list) and bands): + continue + parsed = [] + for b in bands: + lo = float(b["lo"]); hi = float(b["hi"]); delta = float(b["delta"]) + if not (0.0 <= lo < hi <= 1.01) or abs(delta) > 0.10: + raise ValueError(f"correction band out of range: {b}") + parsed.append((lo, hi, delta)) + parsed_table[key] = tuple(parsed) + if not parsed_table: + return None + parsed_table["version"] = str(raw.get("version", "?")) + return parsed_table + + +def _db_corrections_raw() -> Optional[Dict[str, Any]]: + """Fetch the correction artifact from app_settings (the deployment's shared + medium — the ai-engine container has no volume mounts, so a host-side cron + can only reach the running engine through the database). Guarded: any + failure → None, never breaks a prediction. Disable with MARKET_ANCHOR_DB=0.""" + if os.environ.get("MARKET_ANCHOR_DB", "1") == "0": + return None + try: + import psycopg2 # local import: keeps module usable without DB deps + from data.db import get_clean_dsn + + with psycopg2.connect(get_clean_dsn(), connect_timeout=3) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT value FROM app_settings" + " WHERE key = 'market_anchor_corrections'" + ) + row = cur.fetchone() + if row and row[0]: + return json.loads(row[0]) + except Exception: + return None + return None + + +def _load_corrections() -> Optional[Dict[str, Any]]: + """Resolve the active correction table (thread-safe, TTL-cached). + + Source order: + 1. MARKET_ANCHOR_CORRECTIONS_PATH env file (tests/dev — file-only mode, + malformed → static fallback, DB and default file are NOT consulted) + 2. app_settings DB row 'market_anchor_corrections' (production path — + refreshed by scripts/fit_anchor_corrections.py) + 3. bundled config/market_anchor_corrections.json + 4. None → built-in static fallback bands + """ + global _corrections_cache, _corrections_ts + now = time.time() + if now - _corrections_ts < _CORRECTIONS_TTL_S: + return _corrections_cache + with _corrections_lock: + if now - _corrections_ts < _CORRECTIONS_TTL_S: + return _corrections_cache + table: Optional[Dict[str, Any]] = None + env_path = os.environ.get("MARKET_ANCHOR_CORRECTIONS_PATH") + if env_path: + try: + with open(env_path, "r", encoding="utf-8") as fh: + table = _parse_corrections(json.load(fh)) + except (OSError, ValueError, KeyError, TypeError, json.JSONDecodeError): + table = None + else: + raw = _db_corrections_raw() + if raw is not None: + try: + table = _parse_corrections(raw) + except (ValueError, KeyError, TypeError): + table = None + if table is None: + try: + with open(_corrections_path(), "r", encoding="utf-8") as fh: + table = _parse_corrections(json.load(fh)) + except (OSError, ValueError, KeyError, TypeError, json.JSONDecodeError): + table = None + _corrections_cache = table + _corrections_ts = time.time() + return _corrections_cache + + +def reload_corrections() -> None: + """Force re-read of the correction sources (used after a refresh/tests).""" + global _corrections_ts, _corrections_cache + with _corrections_lock: + _corrections_ts = 0.0 + _corrections_cache = None + def home_favorite_delta(p_home: float) -> float: """Additive correction to the de-vigged home-win probability. - Zero below 0.45 (no measured bias for non-favourites).""" + Band semantics: a fitted-artifact band OVERRIDES the static prior where it + exists (including an explicit delta of 0 — evidence of "no bias"). Where + the artifact is SILENT (a range that never passed the min-sample gate, + e.g. big favourites 0.75+), the static prior still applies — missing + evidence must not silently erase proven knowledge.""" + table = _load_corrections() + if table and "ms_home" in table: + for lo, hi, delta in table["ms_home"]: + if lo <= p_home < hi: + return delta for lo, hi, delta in _HOME_FAV_BANDS: if lo <= p_home < hi: return delta return 0.0 +def away_favorite_delta(p_away: float) -> float: + """Additive correction to the de-vigged away-win probability. + + Scoreboard measurement (2026-06): away favourites also win a few points + MORE than the de-vigged price implies (+2.6..+4.2pt). Unlike the home + side there is NO built-in fallback — away corrections must be EARNED via + the fitted artifact (scripts/fit_anchor_corrections.py passing its + out-of-sample acceptance gate). No artifact → zero → prior behaviour.""" + table = _load_corrections() + bands = table.get("ms_away", ()) if table else () + for lo, hi, delta in bands: + if lo <= p_away < hi: + return delta + return 0.0 + + +def apply_corrections( + p1: float, px: float, p2: float +) -> Tuple[float, float, float]: + """Apply favourite corrections to a 3-way (1, X, 2) vector. + + In practice only one side can be a favourite (both ≥0.45 would leave no + room for the draw); if both bands somehow fire, the larger delta wins. + The other two outcomes are renormalised so the vector still sums to 1.""" + d1 = home_favorite_delta(p1) + d2 = away_favorite_delta(p2) + if d1 <= 0.0 and d2 <= 0.0: + return p1, px, p2 + if d1 >= d2: + return apply_home_correction(p1, px, p2) + p2n = min(0.98, p2 + d2) + remaining = 1.0 - p2n + rest = p1 + px + if rest <= 0.0: + return p1, px, p2n + return p1 / rest * remaining, px / rest * remaining, p2n + + def apply_home_correction( p1: float, px: float, p2: float ) -> Tuple[float, float, float]: diff --git a/ai-engine/scripts/calibration_scoreboard.py b/ai-engine/scripts/calibration_scoreboard.py new file mode 100644 index 0000000..1934f90 --- /dev/null +++ b/ai-engine/scripts/calibration_scoreboard.py @@ -0,0 +1,261 @@ +"""Calibration scoreboard — "dediğimiz vs olan" karnesi. + +Measures, on settled real-odds matches, how honest the DISPLAYED numbers are: + + 1. ANCHORED PIPELINE (what V35 shows): per market (MS 1/X/2, OU2.5, BTTS) + reliability buckets — mean stated probability vs actual frequency, + plus ECE / Brier per market. + 2. SCORE CARD (V36): modal-score hit vs stated modal probability, top-5 + coverage, HT modal hit. + 3. STORED RUNS: prediction_runs settled per engine_version (the + `.sim-finished` buckets — the user's manual finished-match tests — are + reported separately and never mixed into the live karne). + +It recomputes the anchored numbers with the SAME modules the engine ships +(models/market_anchor.py + models/score_matrix.py), so the scoreboard always +grades current pipeline math, not a copy of it. + +DB: uses DATABASE_URL (data/db.py). Reads are gentle: a server-side cursor +over an indexed, date-bounded join — never aggregate-scans the giant odds +tables (prod runs on a Raspberry Pi). + +Usage: + python scripts/calibration_scoreboard.py [--days 365] [--buckets 10] +""" + +from __future__ import annotations + +import argparse +import os +import sys +import time +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import psycopg2 # noqa: E402 +from psycopg2.extras import RealDictCursor # noqa: E402 + +from data.db import get_clean_dsn # noqa: E402 +from models.market_anchor import apply_corrections # noqa: E402 +from models.score_matrix import build_calibrated_score_package # noqa: E402 + +REAL_ODDS_MIN_OVERROUND = 0.05 # the user's hard rule: no real odds -> excluded + + +def _fetch_settled_matches(days: int) -> List[Dict[str, Any]]: + """Finished, real-odds matches with stored de-vigged implied probs.""" + since_ms = int((time.time() - days * 86400) * 1000) + sql = """ + SELECT f.implied_home, f.implied_draw, f.implied_away, + f.implied_over25, f.implied_btts_yes, f.odds_overround, + m.score_home, m.score_away, m.ht_score_home, m.ht_score_away + FROM football_ai_features f + JOIN matches m ON m.id = f.match_id + WHERE m.sport = 'football' + AND m.winner IN ('home', 'away', 'draw') + AND m.score_home IS NOT NULL + AND f.odds_overround > %s + AND m.mst_utc >= %s + """ + rows: List[Dict[str, Any]] = [] + with psycopg2.connect(get_clean_dsn()) as conn: + with conn.cursor() as cur: + cur.execute("SET statement_timeout = '120s'") + # server-side (named) cursor: streams gently instead of one big fetch + with conn.cursor("scoreboard_stream", cursor_factory=RealDictCursor) as cur: + cur.itersize = 5000 + cur.execute(sql, (REAL_ODDS_MIN_OVERROUND, since_ms)) + for r in cur: + rows.append(dict(r)) + return rows + + +def _anchored_probs(row: Dict[str, Any]) -> Optional[Tuple[float, float, float]]: + """The MS vector the V35 pipeline would display (devig is already done in + the stored features; apply the active home-favourite correction).""" + try: + p1 = float(row["implied_home"]); px = float(row["implied_draw"]); p2 = float(row["implied_away"]) + except (TypeError, ValueError): + return None + if not (0.0 < p1 < 1.0 and 0.0 < px < 1.0 and 0.0 < p2 < 1.0): + return None + if abs(p1 + px + p2 - 1.0) > 0.02: # not a clean de-vigged vector + return None + return apply_corrections(p1, px, p2) + + +class Reliability: + """Accumulates (stated probability, outcome) pairs into buckets.""" + + def __init__(self, n_buckets: int) -> None: + self.n_buckets = n_buckets + self.n = defaultdict(int) + self.sum_p = defaultdict(float) + self.sum_y = defaultdict(int) + + def add(self, p: float, hit: bool) -> None: + b = min(self.n_buckets - 1, int(p * self.n_buckets)) + self.n[b] += 1 + self.sum_p[b] += p + self.sum_y[b] += 1 if hit else 0 + + def report(self, title: str) -> Tuple[float, float]: + total = sum(self.n.values()) + if not total: + print(f"\n== {title}: no data ==") + return 0.0, 0.0 + ece = 0.0 + brier_num = 0.0 + print(f"\n== {title} (n={total}) ==") + print(f"{'band':>10} {'n':>8} {'said%':>8} {'actual%':>8} {'gap_pt':>7}") + for b in sorted(self.n): + n = self.n[b] + said = self.sum_p[b] / n + act = self.sum_y[b] / n + ece += n * abs(said - act) + print(f"{b / self.n_buckets:>5.2f}-{(b + 1) / self.n_buckets:<4.2f} " + f"{n:>8} {100 * said:>8.1f} {100 * act:>8.1f} {100 * (act - said):>7.1f}") + ece /= total + # Brier from bucket stats is approximate; recompute exactly elsewhere + # if needed. ECE is the headline honesty metric here. + print(f"{'ECE':>10}: {100 * ece:.2f}%") + return ece, brier_num + + +def grade_pipeline(rows: List[Dict[str, Any]], n_buckets: int) -> None: + ms1 = Reliability(n_buckets); msx = Reliability(n_buckets); ms2 = Reliability(n_buckets) + ou = Reliability(n_buckets); btts = Reliability(n_buckets) + top1 = top5 = ht1 = 0 + stated_modal = 0.0 + n_score = 0 + + for r in rows: + anch = _anchored_probs(r) + sh, sa = int(r["score_home"]), int(r["score_away"]) + winner = "home" if sh > sa else "away" if sa > sh else "draw" + if anch is not None: + p1, px, p2 = anch + ms1.add(p1, winner == "home") + msx.add(px, winner == "draw") + ms2.add(p2, winner == "away") + # exactly-0.5 values are DEFAULT FILL for matches without a real OU/BTTS + # market (measured: 15,993 of 78k OU rows) — never grade or use them. + try: + po = float(r["implied_over25"]) + if po == 0.5 or not (0.05 < po < 0.95): + po = None + else: + ou.add(po, sh + sa >= 3) + except (TypeError, ValueError): + po = None + try: + pb = float(r["implied_btts_yes"]) + if pb != 0.5 and 0.05 < pb < 0.95: + btts.add(pb, sh > 0 and sa > 0) + except (TypeError, ValueError): + pass + + # V36 score card (sampled fully — pure math, no I/O) + if anch is not None and po is not None and 0.05 < po < 0.95: + pkg = build_calibrated_score_package(*anch, po) + actual = f"{min(sh, 10)}-{min(sa, 10)}" + n_score += 1 + stated_modal += float(pkg["scenario_top5"][0]["prob"]) + if pkg["ft"] == actual: + top1 += 1 + if actual in [d["score"] for d in pkg["scenario_top5"]]: + top5 += 1 + hh, ha = r.get("ht_score_home"), r.get("ht_score_away") + if hh is not None and ha is not None and pkg["ht"] == f"{min(int(hh),10)}-{min(int(ha),10)}": + ht1 += 1 + + ms1.report("MS ev (1) — anchored pipeline") + msx.report("MS beraberlik (X) — anchored pipeline") + ms2.report("MS deplasman (2) — anchored pipeline") + ou.report("Ust/Alt 2.5 (over) — devig") + btts.report("KG Var — devig") + + if n_score: + print(f"\n== V36 skor karti (n={n_score}) ==") + print(f" modal skor isabeti : {100 * top1 / n_score:.1f}% (soylenen: {100 * stated_modal / n_score:.1f}%)") + print(f" top-5 kapsama : {100 * top5 / n_score:.1f}%") + print(f" IY modal isabeti : {100 * ht1 / n_score:.1f}%") + + +def grade_stored_runs() -> None: + """Settle prediction_runs main_pick stated probabilities per engine_version. + `.sim-finished` buckets (manual finished-match tests) report separately.""" + sql = """ + SELECT pr.engine_version, + pr.payload_summary->'main_pick'->>'market' AS market, + pr.payload_summary->'main_pick'->>'pick' AS pick, + COALESCE((pr.payload_summary->'main_pick'->>'calibrated_probability')::float, + (pr.payload_summary->'main_pick'->>'probability')::float) AS p, + m.score_home AS sh, m.score_away AS sa, m.winner AS w + FROM prediction_runs pr + JOIN matches m ON m.id = pr.match_id + WHERE m.score_home IS NOT NULL + AND jsonb_typeof(pr.payload_summary->'main_pick') = 'object' + """ + with psycopg2.connect(get_clean_dsn()) as conn: + with conn.cursor() as cur: + cur.execute("SET statement_timeout = '60s'") + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(sql) + rows = cur.fetchall() + + def settle(market: str, pick: str, sh: int, sa: int, w: str) -> Optional[bool]: + total = sh + sa + pick_u = (pick or "").upper() + over = "UST" in pick_u.replace("Ü", "U") or "OVER" in pick_u + if market == "MS": + return {"1": w == "home", "X": w == "draw", "2": w == "away"}.get(pick) + if market in ("OU15", "OU25", "OU35"): + line = {"OU15": 1.5, "OU25": 2.5, "OU35": 3.5}[market] + return total > line if over else total < line + if market == "BTTS": + yes = "VAR" in pick_u or "YES" in pick_u + return (sh > 0 and sa > 0) if yes else not (sh > 0 and sa > 0) + return None + + stats: Dict[str, List[Tuple[float, bool]]] = defaultdict(list) + for r in rows: + if r["p"] is None: + continue + hit = settle(str(r["market"]), str(r["pick"]), int(r["sh"]), int(r["sa"]), str(r["w"])) + if hit is None: + continue + stats[str(r["engine_version"])].append((float(r["p"]), bool(hit))) + + print("\n== prediction_runs karnesi (main_pick, soylenen vs olan) ==") + print(f"{'engine_version':<34} {'n':>5} {'said%':>8} {'actual%':>8}") + for ver in sorted(stats): + pairs = stats[ver] + n = len(pairs) + said = sum(p for p, _ in pairs) / n + act = sum(1 for _, h in pairs if h) / n + tag = " <- test kovasi" if ver.endswith(".sim-finished") else "" + print(f"{ver:<34} {n:>5} {100 * said:>8.1f} {100 * act:>8.1f}{tag}") + if not stats: + print(" (settle edilebilir kayit yok)") + + +def main() -> None: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--days", type=int, default=365, help="lookback window (days)") + ap.add_argument("--buckets", type=int, default=10) + args = ap.parse_args() + + t0 = time.time() + rows = _fetch_settled_matches(args.days) + print(f"settled real-odds matches loaded: {len(rows)} (last {args.days} days, " + f"{time.time() - t0:.1f}s)") + if rows: + grade_pipeline(rows, args.buckets) + grade_stored_runs() + + +if __name__ == "__main__": + main() diff --git a/ai-engine/scripts/fit_anchor_corrections.py b/ai-engine/scripts/fit_anchor_corrections.py new file mode 100644 index 0000000..d85e7c8 --- /dev/null +++ b/ai-engine/scripts/fit_anchor_corrections.py @@ -0,0 +1,255 @@ +"""Guarded self-correction loop — fits the market-anchor correction table. + +What it does (the "tablo üreteci" of the feedback loop): + + 1. MEASURE: on settled real-odds matches, per implied-probability band, the + gap between the RAW de-vigged probability and the actual rate — for BOTH + the home side (ms_home) and the away side (ms_away). + 2. BRAKE: a band only earns a correction if it passes the safety gates — + * min sample (>= MIN_N matches in the band, fitted on TRAIN window) + * shrinkage (delta = SHRINK x measured gap — never the full gap) + * clipping (|delta| <= CLIP) + * materiality (|delta| >= MIN_DELTA, else 0 — don't chase noise) + 3. PROVE: the candidate table must beat the CURRENTLY ACTIVE corrections + out-of-sample (most recent TEST_DAYS, never seen during fitting) on + combined home+away ECE. If it doesn't, nothing is written. + 4. WRITE: versioned artifact `config/market_anchor_corrections.json` + (+ timestamped copy under `config/history/`). The engine reads the table + at runtime (models/market_anchor.py) — the loop never modifies code. + +Run weekly (cron) or manually after big data ingests: + python scripts/fit_anchor_corrections.py [--days 540] [--test-days 90] + python scripts/fit_anchor_corrections.py --dry-run # measure only +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import sys +import time +from collections import defaultdict +from typing import Any, Callable, Dict, List, Optional, Tuple + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import psycopg2 # noqa: E402 +from psycopg2.extras import RealDictCursor # noqa: E402 + +from data.db import get_clean_dsn # noqa: E402 +from models.market_anchor import ( # noqa: E402 + away_favorite_delta, + home_favorite_delta, +) + +# ── safety gates ───────────────────────────────────────────────────── +MIN_N = 1500 # band needs this many TRAIN matches to earn a correction +SHRINK = 0.5 # apply only half of the measured gap +CLIP = 0.05 # never correct more than 5 points +MIN_DELTA = 0.004 # below this the correction is noise — emit 0 +ACCEPT_MARGIN = 0.0002 # candidate must beat active combined ECE by this + +BANDS: Tuple[Tuple[float, float], ...] = ( + (0.05, 0.15), (0.15, 0.25), (0.25, 0.35), (0.35, 0.45), + (0.45, 0.55), (0.55, 0.65), (0.65, 0.75), (0.75, 0.85), (0.85, 1.01), +) + +REAL_ODDS_MIN_OVERROUND = 0.05 + + +def fetch(days: int) -> List[Dict[str, Any]]: + since_ms = int((time.time() - days * 86400) * 1000) + sql = """ + SELECT f.implied_home AS p1, f.implied_draw AS px, f.implied_away AS p2, + m.mst_utc, + (m.winner = 'home')::int AS home_won, + (m.winner = 'away')::int AS away_won + FROM football_ai_features f + JOIN matches m ON m.id = f.match_id + WHERE m.sport = 'football' + AND m.winner IN ('home', 'away', 'draw') + AND f.odds_overround > %s + AND m.mst_utc >= %s + """ + out: List[Dict[str, Any]] = [] + with psycopg2.connect(get_clean_dsn()) as conn: + with conn.cursor() as cur: + cur.execute("SET statement_timeout = '120s'") + with conn.cursor("fit_stream", cursor_factory=RealDictCursor) as cur: + cur.itersize = 5000 + cur.execute(sql, (REAL_ODDS_MIN_OVERROUND, since_ms)) + for r in cur: + p1, px, p2 = r["p1"], r["px"], r["p2"] + if p1 is None or px is None or p2 is None: + continue + if abs(float(p1) + float(px) + float(p2) - 1.0) > 0.02: + continue + out.append({ + "p1": float(p1), "p2": float(p2), + "y1": int(r["home_won"]), "y2": int(r["away_won"]), + "mst_utc": int(r["mst_utc"]), + }) + return out + + +def band_of(p: float) -> Optional[int]: + for i, (lo, hi) in enumerate(BANDS): + if lo <= p < hi: + return i + return None + + +def fit_candidate( + train: List[Dict[str, Any]], pkey: str, ykey: str +) -> List[Dict[str, Any]]: + n = defaultdict(int); sp = defaultdict(float); sy = defaultdict(int) + for r in train: + b = band_of(r[pkey]) + if b is None: + continue + n[b] += 1; sp[b] += r[pkey]; sy[b] += r[ykey] + bands: List[Dict[str, Any]] = [] + for i, (lo, hi) in enumerate(BANDS): + if n[i] < MIN_N: + continue # gate: not enough evidence — no correction for this band + raw_gap = (sy[i] / n[i]) - (sp[i] / n[i]) + delta = max(-CLIP, min(CLIP, SHRINK * raw_gap)) + if abs(delta) < MIN_DELTA: + delta = 0.0 + bands.append({"lo": lo, "hi": hi, "delta": round(delta, 4), + "n": n[i], "raw_gap": round(raw_gap, 4)}) + return bands + + +def table_delta_fn(table: List[Dict[str, Any]]) -> Callable[[float], float]: + def fn(p: float) -> float: + for b in table: + if b["lo"] <= p < b["hi"]: + return b["delta"] + return 0.0 + return fn + + +def ece(rows: List[Dict[str, Any]], pkey: str, ykey: str, + delta_fn: Callable[[float], float]) -> float: + n = defaultdict(int); sp = defaultdict(float); sy = defaultdict(int) + for r in rows: + pc = min(0.98, r[pkey] + delta_fn(r[pkey])) + b = min(19, int(pc * 20)) + n[b] += 1; sp[b] += pc; sy[b] += r[ykey] + total = sum(n.values()) + if not total: + return 0.0 + return sum(n[b] * abs(sp[b] / n[b] - sy[b] / n[b]) for b in n) / total + + +def print_bands(title: str, bands: List[Dict[str, Any]]) -> None: + print(f"\ncandidate bands — {title} (after gates):") + print(f"{'band':>12} {'n':>8} {'raw_gap_pt':>11} {'delta_pt':>9}") + for b in bands: + print(f"{b['lo']:>5.2f}-{b['hi']:<5.2f} {b['n']:>8} " + f"{100 * b['raw_gap']:>11.2f} {100 * b['delta']:>9.2f}") + + +def main() -> None: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--days", type=int, default=540, help="total lookback") + ap.add_argument("--test-days", type=int, default=90, + help="most recent window held out for acceptance") + ap.add_argument("--dry-run", action="store_true", + help="measure and report only — never write") + args = ap.parse_args() + + rows = fetch(args.days) + cutoff_ms = int((time.time() - args.test_days * 86400) * 1000) + train = [r for r in rows if r["mst_utc"] < cutoff_ms] + test = [r for r in rows if r["mst_utc"] >= cutoff_ms] + print(f"matches: total={len(rows)} train={len(train)} test(OOS)={len(test)}") + if len(train) < 10 * MIN_N or len(test) < 2000: + print("ABORT: not enough data for a safe fit — keeping active table.") + return + + cand_home = fit_candidate(train, "p1", "y1") + cand_away = fit_candidate(train, "p2", "y2") + print_bands("ms_home", cand_home) + print_bands("ms_away", cand_away) + + # active = whatever the engine currently loads (artifact or fallback) + ece_h_act = ece(test, "p1", "y1", home_favorite_delta) + ece_a_act = ece(test, "p2", "y2", away_favorite_delta) + ece_h_cand = ece(test, "p1", "y1", table_delta_fn(cand_home)) + ece_a_cand = ece(test, "p2", "y2", table_delta_fn(cand_away)) + ece_h_raw = ece(test, "p1", "y1", lambda _p: 0.0) + ece_a_raw = ece(test, "p2", "y2", lambda _p: 0.0) + + print(f"\nOOS ECE (home/away/combined):") + print(f" raw (devig only) : {100 * ece_h_raw:.3f}% / {100 * ece_a_raw:.3f}% " + f"/ {100 * (ece_h_raw + ece_a_raw):.3f}%") + print(f" ACTIVE table : {100 * ece_h_act:.3f}% / {100 * ece_a_act:.3f}% " + f"/ {100 * (ece_h_act + ece_a_act):.3f}%") + print(f" CANDIDATE table : {100 * ece_h_cand:.3f}% / {100 * ece_a_cand:.3f}% " + f"/ {100 * (ece_h_cand + ece_a_cand):.3f}%") + + if args.dry_run: + print("\n(dry-run: nothing written)") + return + + combined_act = ece_h_act + ece_a_act + combined_cand = ece_h_cand + ece_a_cand + if combined_cand > combined_act - ACCEPT_MARGIN: + print("\nREJECTED: candidate does not beat the active table " + "out-of-sample. Active corrections stay. (Bu fren tasarim geregi:" + " kanitlayamayan tablo yazilmaz.)") + return + + cfg_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "config")) + path = os.path.join(cfg_dir, "market_anchor_corrections.json") + artifact = { + "version": time.strftime("%Y-%m-%dT%H:%M:%S"), + "fitted_on": {"days": args.days, "test_days": args.test_days, + "n_train": len(train), "n_test": len(test)}, + "validated": { + "ece_home": {"raw": round(ece_h_raw, 5), "active_before": round(ece_h_act, 5), + "candidate_oos": round(ece_h_cand, 5)}, + "ece_away": {"raw": round(ece_a_raw, 5), "active_before": round(ece_a_act, 5), + "candidate_oos": round(ece_a_cand, 5)}, + }, + "gates": {"min_n": MIN_N, "shrink": SHRINK, "clip": CLIP, + "min_delta": MIN_DELTA}, + "corrections": {"ms_home": cand_home, "ms_away": cand_away}, + } + hist_dir = os.path.join(cfg_dir, "history") + os.makedirs(hist_dir, exist_ok=True) + if os.path.exists(path): + shutil.copy2(path, os.path.join( + hist_dir, f"market_anchor_corrections-{int(time.time())}.json")) + with open(path, "w", encoding="utf-8") as fh: + json.dump(artifact, fh, ensure_ascii=False, indent=2) + print(f"\nACCEPTED: wrote {path}") + + # The deployed ai-engine container has NO volume mounts, so the file above + # is invisible to it — app_settings is the shared medium. Running engines + # re-read it within ~10 minutes (TTL in models/market_anchor.py). + try: + with psycopg2.connect(get_clean_dsn()) as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO app_settings (key, value, updated_at) + VALUES ('market_anchor_corrections', %s, now()) + ON CONFLICT (key) + DO UPDATE SET value = EXCLUDED.value, updated_at = now() + """, + (json.dumps(artifact, ensure_ascii=False),), + ) + conn.commit() + print("ACCEPTED: upserted app_settings['market_anchor_corrections'] " + "(live engines refresh within ~10 min)") + except Exception as exc: # file artifact still written — warn only + print(f"WARN: app_settings upsert failed: {exc}") + + +if __name__ == "__main__": + main() diff --git a/ai-engine/services/orchestrator/market_board.py b/ai-engine/services/orchestrator/market_board.py index 5f7e907..49de1ca 100644 --- a/ai-engine/services/orchestrator/market_board.py +++ b/ai-engine/services/orchestrator/market_board.py @@ -57,8 +57,9 @@ from utils.top_leagues import load_top_league_ids from utils.league_reliability import load_league_reliability from config.config_loader import build_threshold_dict, get_threshold_default from models.calibration import get_calibrator, get_final_recalibrator -from models.market_anchor import devig, apply_home_correction +from models.market_anchor import devig, apply_corrections from models.score_matrix import build_calibrated_score_package +from models.live_matrix import build_live_projection, estimate_minute # ── V30: Post-calibration trust factors ───────────────────────────── # Controls how much to trust isotonic calibrator vs raw model output. @@ -372,6 +373,12 @@ class MarketBoardMixin: # 13.1%, top-5 coverage 51%, per-score gaps <1.2pt. cal_score = self._build_calibrated_score(market_board) + # V38: while the match is LIVE, also project score/minute-conditioned + # probabilities (P(side scores again), live 1X2, comeback, scenarios). + # OOS-validated on 70,410 reconstructed live moments: ECE 0.5-0.8%; + # "one-goal lead at 80'" case: said 21.7% vs actual 23.0%. + live_projection = self._build_live_projection(market_board, data) + # Determine simulation mode for the response _resp_status = str(data.status or "").upper() _resp_state = str(data.state or "").upper() @@ -446,6 +453,9 @@ class MarketBoardMixin: } ), "market_board": market_board, + # V38: score/minute-aware live probabilities (None when not live or + # no real odds). FE can render "deplasman gol atar: %X / dönme: %Y". + "live_projection": live_projection, "others": { "handicap": prediction.handicap_pick, "cards": { @@ -1115,10 +1125,10 @@ class MarketBoardMixin: val = self._real_market_odds(odds, key) return val if val > 1.01 else None - # MS (3-way) + home-favourite correction; DC derived from the same vector + # MS (3-way) + favourite corrections; DC derived from the same vector ms = devig([real("ms_h"), real("ms_d"), real("ms_a")]) if ms is not None: - p1, px, p2 = apply_home_correction(*ms) + p1, px, p2 = apply_corrections(*ms) if "MS" in market_board: self._set_board(market_board, "MS", {"1": p1, "X": px, "2": p2}) if "DC" in market_board: @@ -1305,6 +1315,43 @@ class MarketBoardMixin: "scenario_top5": pkg["scenario_top5"], } + def _build_live_projection( + self, + market_board: Dict[str, Any], + data: MatchData, + ) -> Optional[Dict[str, Any]]: + """V38: score/minute-conditioned live projection from the anchored + probabilities. None unless the match is live, both MS and OU25 were + anchored (real odds) and a minute estimate exists. Same kill-switch.""" + if os.environ.get("MARKET_ANCHOR_CAL", "1") == "0": + return None + if not self._is_live_match(data): + return None + ms = market_board.get("MS") or {} + ou = market_board.get("OU25") or {} + if ( + ms.get("calibration_source") != "market_anchor_v35" + or ou.get("calibration_source") != "market_anchor_v35" + ): + return None + minute = estimate_minute( + getattr(data, "match_date_ms", None), int(time.time() * 1000) + ) + if minute is None: + return None + try: + return build_live_projection( + float(ms["probs"]["1"]), + float(ms["probs"]["X"]), + float(ms["probs"]["2"]), + float(ou["probs"]["over"]), + int(data.current_score_home or 0), + int(data.current_score_away or 0), + minute, + ) + except (KeyError, TypeError, ValueError, ZeroDivisionError, OverflowError): + return None + def _build_market_rows( self, data: MatchData, diff --git a/ai-engine/tests/test_live_matrix.py b/ai-engine/tests/test_live_matrix.py new file mode 100644 index 0000000..e34af65 --- /dev/null +++ b/ai-engine/tests/test_live_matrix.py @@ -0,0 +1,80 @@ +"""Unit tests for V38 live-conditioned projection (pure, no DB/model deps).""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from models.live_matrix import ( + build_live_projection, + estimate_minute, + state_multiplier, +) + + +def _approx(a, b, tol=1e-6): + return abs(a - b) <= tol + + +def test_probs_form_distribution(): + proj = build_live_projection(0.50, 0.27, 0.23, 0.55, 1, 0, 60) + p = proj["probs"] + assert _approx(p["1"] + p["X"] + p["2"], 1.0, 1e-3) + assert 0.0 <= proj["p_away_scores_again"] <= 1.0 + + +def test_minute_one_roughly_matches_prematch(): + # at 0-0 minute 1 the projection must stay close to the anchored numbers + proj = build_live_projection(0.50, 0.27, 0.23, 0.55, 0, 0, 1) + assert abs(proj["probs"]["1"] - 0.50) < 0.06 + assert abs(proj["probs"]["2"] - 0.23) < 0.06 + + +def test_one_goal_lead_at_80(): + # the user's exact case: 1-0 at 80' (OOS-validated: said 21.7 / actual 23.0) + proj = build_live_projection(0.50, 0.27, 0.23, 0.55, 1, 0, 80) + assert proj["probs"]["1"] > 0.72 # leader is now strong fav + assert 0.08 <= proj["p_away_scores_again"] <= 0.30 + assert _approx( + proj["p_comeback"], proj["probs"]["X"] + proj["probs"]["2"], 1e-9 + ) + + +def test_less_time_means_fewer_chances(): + early = build_live_projection(0.50, 0.27, 0.23, 0.55, 1, 0, 60) + late = build_live_projection(0.50, 0.27, 0.23, 0.55, 1, 0, 85) + assert late["p_away_scores_again"] < early["p_away_scores_again"] + assert late["probs"]["1"] > early["probs"]["1"] + + +def test_trailing_team_pushes_late(): + assert state_multiplier(-1, 80) > 1.05 # trailing by one, late: pushes + assert state_multiplier(1, 80) < 1.0 # leading by one, late: parks bus + assert state_multiplier(-1, 80) > state_multiplier(-1, 30) + + +def test_score_consistency_with_current_score(): + proj = build_live_projection(0.50, 0.27, 0.23, 0.55, 2, 1, 75) + # every scenario must be reachable from the current score + for s in proj["scenario_top5"]: + fh, fa = map(int, str(s["score"]).split("-")) + assert fh >= 2 and fa >= 1 + assert proj["current_score"] == "2-1" + + +def test_estimate_minute_approximation(): + now = 1_700_000_000_000 + assert estimate_minute(None, now) is None + assert estimate_minute(now + 60_000, now) is None # not kicked off + assert estimate_minute(now - 30 * 60_000, now) == 30 # mid 1H + assert estimate_minute(now - 55 * 60_000, now) == 46 # HT break + assert estimate_minute(now - 80 * 60_000, now) == 65 # 2H, break folded + assert estimate_minute(now - 200 * 60_000, now) == 94 # capped + + +if __name__ == "__main__": + fns = [v for k, v in sorted(globals().items()) if k.startswith("test_")] + for fn in fns: + fn() + print(f"PASS {fn.__name__}") + print(f"\nAll {len(fns)} tests passed.") diff --git a/ai-engine/tests/test_market_anchor.py b/ai-engine/tests/test_market_anchor.py index c79a9ea..d1c1059 100644 --- a/ai-engine/tests/test_market_anchor.py +++ b/ai-engine/tests/test_market_anchor.py @@ -5,6 +5,9 @@ import sys sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +# tests must be deterministic: never consult the DB source for corrections +os.environ["MARKET_ANCHOR_DB"] = "0" + from models.market_anchor import devig, home_favorite_delta, apply_home_correction @@ -51,6 +54,83 @@ def test_apply_home_correction_keeps_distribution_valid(): assert _approx(q[0], 0.30) +def test_corrections_artifact_loaded_and_fallback(): + import json + import tempfile + from models import market_anchor as ma + + # 1) valid artifact -> values come from the file + with tempfile.NamedTemporaryFile( + "w", suffix=".json", delete=False, encoding="utf-8" + ) as fh: + json.dump( + {"version": "test", "corrections": {"ms_home": [ + {"lo": 0.60, "hi": 0.70, "delta": 0.042}, + ]}}, + fh, + ) + path = fh.name + try: + os.environ["MARKET_ANCHOR_CORRECTIONS_PATH"] = path + ma.reload_corrections() + assert _approx(ma.home_favorite_delta(0.65), 0.042) + # band not in the artifact -> the STATIC PRIOR applies (silence must + # not erase proven knowledge); 0.45-0.55 static prior is 0.010 + assert _approx(ma.home_favorite_delta(0.50), 0.010) + + # 2) malformed artifact -> static fallback, never crashes + with open(path, "w", encoding="utf-8") as fh2: + fh2.write("{not json") + ma.reload_corrections() + assert ma.home_favorite_delta(0.65) > 0.0 # fallback band value + assert _approx(ma.home_favorite_delta(0.65), 0.028) + finally: + os.environ.pop("MARKET_ANCHOR_CORRECTIONS_PATH", None) + ma.reload_corrections() + os.unlink(path) + + +def test_away_corrections_only_from_artifact(): + import json + import tempfile + from models import market_anchor as ma + + # without an artifact: away correction must be ZERO (earned, not assumed). + # (Point the env path at a nonexistent file: the repo now SHIPS a fitted + # artifact, so "no artifact" must be simulated explicitly.) + os.environ["MARKET_ANCHOR_CORRECTIONS_PATH"] = os.path.join( + os.path.dirname(__file__), "does_not_exist.json" + ) + ma.reload_corrections() + assert ma.away_favorite_delta(0.65) == 0.0 + base = ma.apply_corrections(0.20, 0.20, 0.60) + assert _approx(base[2], 0.60) # away untouched without artifact + + with tempfile.NamedTemporaryFile( + "w", suffix=".json", delete=False, encoding="utf-8" + ) as fh: + json.dump( + {"version": "t2", "corrections": { + "ms_home": [{"lo": 0.45, "hi": 0.55, "delta": 0.010}], + "ms_away": [{"lo": 0.55, "hi": 0.65, "delta": 0.020}], + }}, + fh, + ) + path = fh.name + try: + os.environ["MARKET_ANCHOR_CORRECTIONS_PATH"] = path + ma.reload_corrections() + assert _approx(ma.away_favorite_delta(0.60), 0.020) + p1, px, p2 = ma.apply_corrections(0.20, 0.20, 0.60) + assert p2 > 0.60 # away favourite lifted + assert _approx(p1 + px + p2, 1.0) # still a valid distribution + assert p1 < 0.20 and px < 0.20 # others renormalised down + finally: + os.environ.pop("MARKET_ANCHOR_CORRECTIONS_PATH", None) + ma.reload_corrections() + os.unlink(path) + + if __name__ == "__main__": fns = [v for k, v in sorted(globals().items()) if k.startswith("test_")] for fn in fns: diff --git a/src/modules/predictions/predictions.service.ts b/src/modules/predictions/predictions.service.ts index c94df41..7eb249f 100755 --- a/src/modules/predictions/predictions.service.ts +++ b/src/modules/predictions/predictions.service.ts @@ -1525,6 +1525,37 @@ export class PredictionsService implements OnModuleInit, OnModuleDestroy { payload: MatchPredictionDto, ): Promise { try { + // Finished-match re-analyses (manual validation runs) must not pollute + // the forward track record: they would bias settlement ROI, the + // per-league karne and engine-version comparisons. Tag them into their + // own engine_version bucket so every GROUP BY engine_version isolates + // them automatically — the data is kept, the live karne stays clean. + const auditMatch = await this.prisma.match.findUnique({ + where: { id: matchId }, + select: { + state: true, + status: true, + scoreHome: true, + scoreAway: true, + mstUtc: true, + }, + }); + const kickoffMs = + auditMatch?.mstUtc != null ? Number(auditMatch.mstUtc) : null; + const kickoffLongPast = + kickoffMs !== null && Date.now() - kickoffMs > 3 * 60 * 60 * 1000; + const isCompletedRun = + isMatchCompleted({ + state: auditMatch?.state ?? null, + status: auditMatch?.status ?? null, + scoreHome: auditMatch?.scoreHome, + scoreAway: auditMatch?.scoreAway, + }) || kickoffLongPast; + const baseVersion = String(payload.model_version || "unknown"); + const engineVersion = isCompletedRun + ? `${baseVersion}.sim-finished` + : baseVersion; + const oddsSnapshot = await this.getPredictionOddsSnapshot(matchId); const payloadSummary = this.buildPredictionPayloadSummary(payload); await this.prisma.$executeRawUnsafe( @@ -1539,7 +1570,7 @@ export class PredictionsService implements OnModuleInit, OnModuleDestroy { VALUES ($1, $2, $3, $4::jsonb, $5::jsonb) `, matchId, - String(payload.model_version || "unknown"), + engineVersion, typeof payload.decision_trace_id === "string" ? payload.decision_trace_id : null,