Files
iddaai-be/ai-engine/scripts/diagnostic_backtest.py
T
fahricansecer 659110c806
Deploy Iddaai Backend / build-and-deploy (push) Successful in 4m32s
Update handoff doc + add backtest checkpoint/resume
2026-05-25 22:29:05 +03:00

739 lines
28 KiB
Python

"""
Diagnostic Backtest
===================
Run the full V28 orchestrator (in-process — no HTTP) on a window of completed
matches, capture the recommendation + key signal features + the actual outcome,
and produce a *diagnostic* report: not just "what was the hit rate" but
"which feature clusters drive the losing bets".
Outputs:
- reports/diagnostic_backtest_YYYYMMDD_HHMMSS.csv (per-bet detail)
- reports/diagnostic_backtest_YYYYMMDD_HHMMSS.json (aggregate metrics)
- reports/diagnostic_backtest_YYYYMMDD_HHMMSS.txt (human-readable summary)
Usage:
python scripts/diagnostic_backtest.py --days 14 --max-matches 2000
python scripts/diagnostic_backtest.py --start 2026-05-10 --end 2026-05-24
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import traceback
from collections import defaultdict, Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import psycopg2
from psycopg2.extras import RealDictCursor
# Path bootstrap so we can import the ai-engine package from anywhere:
# the parent of this script's directory is pushed to the front of sys.path.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
AI_ENGINE_DIR = os.path.dirname(SCRIPT_DIR)
sys.path.insert(0, AI_ENGINE_DIR)
# Project-local imports; they must stay below the sys.path insert above.
from data.db import get_clean_dsn
from services.single_match_orchestrator import get_single_match_orchestrator
# Reports and checkpoint files are written here; the directory is created
# as a side effect of importing this module.
REPORTS_DIR = os.path.join(AI_ENGINE_DIR, "reports")
os.makedirs(REPORTS_DIR, exist_ok=True)
# Days with confirmed feeder gaps — exclude from sample.
# ISO dates (YYYY-MM-DD); compared against the match date in the SQL filter.
EXCLUDED_DATES = {"2026-05-03", "2026-04-29"}
# ── Outcome resolution ────────────────────────────────────────────────
def _norm_pick(pick: Optional[str]) -> str:
    """Return *pick* trimmed and case-folded for comparison ('' for None/empty).

    ``str.casefold`` maps the Turkish capital "İ" to "i" followed by U+0307
    (combining dot above). That mark is dropped so an upper-case pick such as
    "ÇİFT" still contains the plain substring "çift".
    """
    return str(pick or "").strip().casefold().replace("\u0307", "")


# Goal lines per normalised market code.
_FT_TOTAL_LINES = {"OU05": 0.5, "OU15": 1.5, "OU25": 2.5, "OU35": 3.5,
                   "OU45": 4.5, "TOTAL": 2.5}
_HT_TOTAL_LINES = {"OU05_HT": 0.5, "OU15_HT": 1.5, "OU25_HT": 2.5,
                   "HT_OU05": 0.5, "HT_OU15": 1.5, "HT_OU25": 2.5}


def _result_1x2(home: int, away: int) -> str:
    """Return "1" (home ahead), "2" (away ahead) or "x" (level)."""
    if home > away:
        return "1"
    if away > home:
        return "2"
    return "x"


def _matches_1x2(p: str, outcome: str) -> bool:
    """True when normalised pick *p* names *outcome*; "0" is a draw alias."""
    return p == outcome or (outcome == "x" and p == "0")


def _resolve_over_under(total: int, line: float, p: str) -> Optional[bool]:
    """Settle a normalised over/under pick *p* for *total* goals at *line*."""
    if total == line:
        # Push. Unreachable with the half-goal lines above; kept so a whole
        # line would come back as "cannot resolve" rather than a loss.
        return None
    is_over = total > line
    if "over" in p or "üst" in p or "ust" in p:
        return is_over
    if "alt" in p or "under" in p:
        return not is_over
    return None


def resolve_outcome(market: str, pick: str, sh: int, sa: int,
                    htsh: Optional[int], htsa: Optional[int]) -> Optional[bool]:
    """Settle one pick against the final and half-time score.

    Described by the original author as a mirror of
    prediction-settlement.market-resolver.ts (TS side).

    Args:
        market: Market code; upper-cased, spaces removed and "-" turned into
            "_" before matching (MS/ML/1X2, HT/IY, OUxx/TOTAL, OUxx_HT/HT_OUxx,
            BTTS/KG, HTFT/IYMS, DC/CIFTE_SANS, OE/TEKCIFT).
        pick: Pick label, English or Turkish; matched case-insensitively.
        sh, sa: Full-time home / away goals.
        htsh, htsa: Half-time home / away goals, or None when unknown.

    Returns:
        True if the pick won, False if it lost, None if the market is unknown,
        the pick text is not recognised, or a half-time market has no
        half-time score.
    """
    m = (market or "").upper().replace(" ", "").replace("-", "_")
    p = _norm_pick(pick)
    has_ht = htsh is not None and htsa is not None

    if m in ("MS", "ML", "1X2"):
        return _matches_1x2(p, _result_1x2(sh, sa))

    if m in ("HT", "IY"):
        if not has_ht:
            return None
        return _matches_1x2(p, _result_1x2(htsh, htsa))

    if m in _FT_TOTAL_LINES:
        return _resolve_over_under(sh + sa, _FT_TOTAL_LINES[m], p)

    if m in _HT_TOTAL_LINES:
        if not has_ht:
            return None
        return _resolve_over_under(htsh + htsa, _HT_TOTAL_LINES[m], p)

    if m in ("BTTS", "KG"):
        both = sh > 0 and sa > 0
        if "yes" in p or "var" in p:
            return both
        if "no" in p or "yok" in p:
            return not both
        return None

    if m in ("HTFT", "IYMS"):
        if not has_ht or "/" not in p:
            return None
        ht_p, ft_p = (part.strip() for part in p.split("/", 1))
        # Accept "0" for a draw here too, as the MS and HT markets do.
        return (_matches_1x2(ht_p, _result_1x2(htsh, htsa))
                and _matches_1x2(ft_p, _result_1x2(sh, sa)))

    if m in ("DC", "CIFTE_SANS"):
        ft = "1" if sh > sa else "2" if sa > sh else "X"
        raw = p.upper().replace("-", "").replace("/", "")
        if raw in ("1X", "X1"):
            pair = ("1", "X")
        elif raw in ("X2", "2X"):
            pair = ("X", "2")
        elif raw in ("12", "21"):
            pair = ("1", "2")
        else:
            return None
        return ft in pair

    if m in ("OE", "TEKCIFT"):
        is_odd = (sh + sa) % 2 == 1
        if "tek" in p or "odd" in p:
            return is_odd
        if "cift" in p or "çift" in p or "even" in p:
            return not is_odd
        return None

    return None
def compute_unit_profit(won: Optional[bool], stake: float, odds: Optional[float]) -> float:
    """Return the profit in units for one settled bet.

    Args:
        won: True / False for a settled bet, None when it could not be settled.
        stake: Units staked. A missing or zero stake falls back to a flat
            1 unit and the sign is ignored, for wins and losses alike.
        odds: Decimal odds; a win with missing odds or odds <= 1.0 pays 0.

    Returns:
        0.0 for unsettled bets, minus the stake for a loss, and
        stake * (odds - 1) rounded to 4 decimals for a win.
    """
    if won is None:
        return 0.0
    # One fallback for both branches, so a missing stake cannot be charged as
    # a full unit on a loss yet paid as nothing (or crash) on a win.
    units = abs(stake) if stake else 1.0
    if not won:
        return -units
    if not odds or odds <= 1.0:
        return 0.0
    return round(units * (odds - 1.0), 4)
# ── Data fetch ────────────────────────────────────────────────────────
def fetch_match_window(args) -> List[Dict]:
    """Load the finished football matches that fall inside the run's window.

    Window selection:
      * ``--end`` given: the window ends at the start of the following day;
        otherwise it ends now.
      * ``--start`` given: the window starts at that day; otherwise it starts
        ``--days`` before the window end.
    Explicit dates are parsed as naive datetimes, which ``.timestamp()``
    interprets in the machine's local time zone. "Now" is kept timezone-aware
    (UTC) so it converts to the correct epoch value.

    Args:
        args: Parsed CLI namespace; reads ``start``, ``end``, ``days`` and
            ``max_matches``.

    Returns:
        Rows (dict-like, from RealDictCursor) with match_id, the full-time and
        half-time scores, league_id and match_date, newest first, capped at
        ``args.max_matches``. Dates in EXCLUDED_DATES are filtered out.
    """
    dsn = get_clean_dsn()
    if "?schema=" in dsn:
        dsn = dsn.split("?schema=")[0]

    if args.end:
        end = datetime.strptime(args.end, "%Y-%m-%d") + timedelta(days=1)
    else:
        # Stay timezone-aware: stripping tzinfo would make .timestamp() read
        # the UTC wall clock as local time and shift the window.
        end = datetime.now(timezone.utc)
    if args.start:
        start = datetime.strptime(args.start, "%Y-%m-%d")
    else:
        start = end - timedelta(days=args.days)
    start_ms = int(start.timestamp() * 1000)
    end_ms = int(end.timestamp() * 1000)

    sql = """
        SELECT id AS match_id,
               score_home, score_away,
               ht_score_home, ht_score_away,
               league_id,
               to_timestamp(mst_utc/1000)::date AS match_date
        FROM matches
        WHERE sport='football'
          AND status='FT'
          AND score_home IS NOT NULL
          AND score_away IS NOT NULL
          AND mst_utc >= %s
          AND mst_utc < %s
    """
    params: List[Any] = [start_ms, end_ms]
    excluded = tuple(sorted(EXCLUDED_DATES))
    if excluded:
        # psycopg2 adapts a non-empty tuple to a parenthesised value list.
        sql += " AND to_timestamp(mst_utc/1000)::date NOT IN %s"
        params.append(excluded)
    sql += " ORDER BY mst_utc DESC LIMIT %s"
    params.append(args.max_matches)

    # psycopg2's connection context manager only ends the transaction; the
    # connection itself has to be closed explicitly.
    conn = psycopg2.connect(dsn)
    try:
        with conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql, params)
            return cur.fetchall()
    finally:
        conn.close()
# ── Per-bet capture ───────────────────────────────────────────────────
def capture_bet_row(match: Dict, package: Dict) -> Dict[str, Any]:
    """Flatten one orchestrator package plus the real score into a report row.

    Args:
        match: Match record with ``match_id``, ``match_date``, the full-time
            and half-time scores and optionally ``league_id``.
        package: Orchestrator response for that match.

    Returns:
        One flat dict per match. Key order is significant: the CSV writers
        take their column order from it.
    """
    main_pick = package.get("main_pick") or {}
    brain = main_pick.get("betting_brain") or {}
    advice = package.get("bet_advice") or {}
    v27 = package.get("v27_engine") or {}
    risk = package.get("risk") or {}
    quality = package.get("data_quality") or {}
    board = package.get("market_board") or {}
    htft_probs = (board.get("HTFT") or {}).get("probs") or {}
    league_name = (package.get("match_info") or {}).get("league_name")

    home_ft = match["score_home"]
    away_ft = match["score_away"]
    home_ht = match["ht_score_home"]
    away_ht = match["ht_score_away"]

    market = main_pick.get("market")
    pick = main_pick.get("pick")
    odds_val = _f(main_pick.get("odds"))
    stake = _f(main_pick.get("stake_units"), 1.0)
    # A bet counts as playable only when both the pick and the advice say so.
    playable = bool(main_pick.get("playable")) and bool(advice.get("playable"))

    won = None
    if market and pick:
        won = resolve_outcome(market, pick, home_ft, away_ft, home_ht, away_ht)
    # Non-playable picks never contribute profit or loss.
    profit = compute_unit_profit(won, stake, odds_val) if playable else 0.0

    # Reversal context (only meaningful for MS picks): probability that the
    # picked side leads at half-time but does not win.
    reversal_keys = {"1": ("1/2", "1/X"), "2": ("2/1", "2/X")}
    rev_prob = None
    if market == "MS" and pick in ("1", "2"):
        first, second = reversal_keys[pick]
        rev_prob = _f(htft_probs.get(first), 0.0) + _f(htft_probs.get(second), 0.0)

    return {
        "match_id": match["match_id"],
        "match_date": str(match["match_date"]),
        "league_id": match.get("league_id"),
        "score_home": home_ft,
        "score_away": away_ft,
        "ht_score_home": home_ht,
        "ht_score_away": away_ht,
        "market": market,
        "pick": pick,
        "odds": odds_val,
        "stake_units": stake,
        "playable": playable,
        "won": won,
        "unit_profit": profit,
        "raw_confidence": _f(main_pick.get("raw_confidence")),
        "calibrated_confidence": _f(main_pick.get("calibrated_confidence")),
        "play_score": _f(main_pick.get("play_score")),
        "ev_edge": _f(main_pick.get("ev_edge")),
        "bet_grade": main_pick.get("bet_grade"),
        "is_value_sniper": bool(main_pick.get("is_value_sniper")),
        "bb_score": _f(brain.get("score")),
        "bb_action": brain.get("action"),
        "bb_vetoes": ";".join(brain.get("vetoes") or []),
        "bb_issues": ";".join(brain.get("issues") or []),
        "bb_positives": ";".join(brain.get("positives") or []),
        "bb_model_prob": _f(brain.get("model_prob")),
        "bb_implied_prob": _f(brain.get("implied_prob")),
        "bb_model_market_gap": _f(brain.get("model_market_gap")),
        "bb_divergence": _f(brain.get("divergence")),
        "bb_trap_market": bool(brain.get("trap_market_flag")),
        "v27_consensus": v27.get("consensus"),
        "data_quality_score": _f(quality.get("score")),
        "data_quality_flags": ";".join(quality.get("flags") or []),
        "risk_level": (risk.get("level") if isinstance(risk, dict) else None),
        "odds_reliability": _f(main_pick.get("odds_reliability")),
        "htft_reversal_prob": rev_prob,
        "htft_top_pick": _argmax(htft_probs),
        "league_name": league_name,
        "is_cup": _is_cup(league_name or ""),
        "model_version": package.get("model_version"),
        "decision_reason": main_pick.get("pick_reason") or advice.get("reason"),
    }
def _f(x: Any, default: Optional[float] = None) -> Optional[float]:
    """Convert *x* to float; return *default* for None or unconvertible input."""
    if x is None:
        return default
    try:
        return float(x)
    except (TypeError, ValueError):
        return default
def _argmax(d: Dict[str, Any]) -> Optional[str]:
    """Return the key with the largest numeric value, or None.

    Values that cannot be read as numbers count as 0.0. Only values above
    -1.0 qualify, and the first key wins a tie.
    """
    scored = [(_f(value, 0.0) or 0.0, key) for key, value in d.items()]
    candidates = [pair for pair in scored if pair[0] > -1.0]
    if not candidates:
        return None
    # max() keeps the first of several equal maxima, like a strict ">" scan.
    return max(candidates, key=lambda pair: pair[0])[1]
# Lower-case fragments that mark a competition name as a cup.
_CUP_KEYWORDS = (
    "kupa", "cup", "coupe", "copa", "coppa", "pokal", "trophy",
    "shield", "ziraat", "süper kupa", "super cup", "beker", "taça", "taca",
)


def _is_cup(name: str) -> bool:
    """Return True when the lower-cased *name* contains any cup keyword."""
    lowered = (name or "").lower()
    for keyword in _CUP_KEYWORDS:
        if keyword in lowered:
            return True
    return False
# ── Aggregation helpers ────────────────────────────────────────────────
def _bucket(value: Optional[float], edges: List[float]) -> Optional[str]:
    """Label the band of *edges* that *value* falls into.

    Returns None for a None value, "<first" below the first edge,
    "low-high" between two neighbouring edges (low inclusive) and ">=last"
    from the last edge upwards.
    """
    if value is None:
        return None
    first_above = next(
        (idx for idx, edge in enumerate(edges) if value < edge), None
    )
    if first_above is None:
        return f">={edges[-1]}"
    if first_above == 0:
        return f"<{edges[0]}"
    return f"{edges[first_above - 1]}-{edges[first_above]}"
def _summary_stats(rows: List[Dict]) -> Dict[str, Any]:
    """Summarise hit rate and ROI over the playable, settled rows.

    Args:
        rows: Bet rows; each needs ``playable``, ``won``, ``unit_profit`` and
            ``stake_units``.

    Returns:
        Dict with n_total, n_playable_settled, wins, losses, hit_rate_pct,
        unit_profit, staked and roi_pct. The two percentages are None when
        there is nothing to divide by. For empty input the same keys are
        returned (zeroed) plus the legacy ``"n": 0`` marker.
    """
    settled = [r for r in rows if r["playable"] and r["won"] is not None]
    wins = sum(1 for r in settled if r["won"])
    profit = sum(float(r["unit_profit"]) for r in settled)
    staked = sum(float(r["stake_units"]) for r in settled)
    stats: Dict[str, Any] = {
        "n_total": len(rows),
        "n_playable_settled": len(settled),
        "wins": wins,
        "losses": len(settled) - wins,
        "hit_rate_pct": round(100.0 * wins / len(settled), 2) if settled else None,
        "unit_profit": round(profit, 3),
        "staked": round(staked, 3),
        "roi_pct": round(100.0 * profit / staked, 2) if staked else None,
    }
    if not rows:
        # Kept for consumers that looked for the old empty-input shape.
        stats["n"] = 0
    return stats
def aggregate(rows: List[Dict]) -> Dict[str, Any]:
    """Build overall and per-segment summary statistics.

    Only playable rows are segmented; ``overall`` is computed over all rows.
    Rows without a confidence or odds value are left out of the matching
    band table.

    Returns:
        Dict with ``overall`` plus ``by_market``, ``by_confidence``,
        ``by_odds``, ``by_grade`` and ``by_competition``, each mapping a
        segment label to its summary stats.
    """
    conf_edges = [45, 50, 55, 60, 65, 70, 80]
    odds_edges = [1.3, 1.5, 1.8, 2.2, 3.0, 5.0]

    market_buckets = defaultdict(list)
    conf_buckets = defaultdict(list)
    odds_buckets = defaultdict(list)
    grade_buckets = defaultdict(list)
    cup_buckets = defaultdict(list)

    for r in rows:
        if not r["playable"]:
            continue
        market_buckets[r["market"] or "?"].append(r)
        conf_buckets[_bucket(r["calibrated_confidence"], conf_edges)].append(r)
        odds_buckets[_bucket(r["odds"], odds_edges)].append(r)
        grade_buckets[r["bet_grade"] or "?"].append(r)
        cup_buckets["cup" if r["is_cup"] else "league"].append(r)

    return {
        "overall": _summary_stats(rows),
        "by_market": {k: _summary_stats(v) for k, v in market_buckets.items()},
        # "if k" drops the None band that collects rows with a missing value.
        "by_confidence": {k: _summary_stats(v) for k, v in conf_buckets.items() if k},
        "by_odds": {k: _summary_stats(v) for k, v in odds_buckets.items() if k},
        "by_grade": {k: _summary_stats(v) for k, v in grade_buckets.items()},
        "by_competition": {k: _summary_stats(v) for k, v in cup_buckets.items()},
    }
def loss_diagnostics(rows: List[Dict]) -> Dict[str, Any]:
    """Profile the playable bets that lost.

    Args:
        rows: Bet rows as produced by ``capture_bet_row``.

    Returns:
        ``{"n_losses": 0}`` when nothing lost. Otherwise a dict with the loss
        count, the summed (negative) profit, ``patterns`` mapping a label to
        ``(count, percent of losses)``, loss counts by market and by league
        (top 10), and the 15 most frequent betting_brain issue and veto tags.
    """
    losses = [r for r in rows if r["playable"] and r["won"] is False]
    if not losses:
        return {"n_losses": 0}
    n = len(losses)

    def share(predicate) -> Tuple[int, float]:
        c = sum(1 for r in losses if predicate(r))
        return c, round(100.0 * c / n, 2)

    def known_below(key: str, limit: float):
        # A missing score is "unknown", not "low"; a real 0.0 must still
        # count, which an "(value or 1) < limit" test would miss.
        def predicate(r: Dict) -> bool:
            value = r.get(key)
            return value is not None and value < limit
        return predicate

    def tag_counts(key: str) -> Counter:
        return Counter(
            tag
            for r in losses
            for tag in (r.get(key) or "").split(";")
            if tag
        )

    diagnostics: Dict[str, Any] = {
        "n_losses": n,
        "total_loss_units": round(sum(float(r["unit_profit"]) for r in losses), 3),
        "patterns": {
            "high_htft_reversal_prob (>=0.20)": share(
                lambda r: (r.get("htft_reversal_prob") or 0) >= 0.20
            ),
            "cup_match": share(lambda r: r["is_cup"]),
            "low_league_reliability (<0.45)": share(
                known_below("odds_reliability", 0.45)
            ),
            "v27_disagree": share(lambda r: r.get("v27_consensus") == "DISAGREE"),
            "trap_market_flagged": share(lambda r: r.get("bb_trap_market")),
            # A missing confidence is deliberately counted as low here.
            "low_calibrated_conf (<55)": share(
                lambda r: (r.get("calibrated_confidence") or 0) < 55
            ),
            "high_odds_underdog (>=2.5)": share(
                lambda r: (r.get("odds") or 0) >= 2.5
            ),
            "low_data_quality (<0.55)": share(
                known_below("data_quality_score", 0.55)
            ),
            "high_risk_level": share(
                lambda r: r.get("risk_level") in ("HIGH", "EXTREME")
            ),
            "inferred_features": share(
                lambda r: "ai_features_inferred_from_history" in (r.get("data_quality_flags") or "")
            ),
        },
        "by_market": Counter(r["market"] for r in losses).most_common(),
        "by_league": Counter(r.get("league_name") for r in losses).most_common(10),
    }
    # Top issue / veto tags from betting_brain across losses
    diagnostics["top_bb_issues_in_losses"] = tag_counts("bb_issues").most_common(15)
    diagnostics["top_bb_vetoes_in_losses"] = tag_counts("bb_vetoes").most_common(15)
    return diagnostics
# ── Recommendations ────────────────────────────────────────────────────
def make_recommendations(rows: List[Dict], agg: Dict[str, Any],
                         diag: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn aggregate and loss statistics into tuning suggestions.

    Args:
        rows: Bet rows (not consulted; kept for the call signature).
        agg: Output of ``aggregate``.
        diag: Output of ``loss_diagnostics``.

    Returns:
        A list of recommendation dicts, each with a ``type``, an ``evidence``
        string and a ``suggested_fix``. Empty when no playable bet settled.
    """
    overall = agg.get("overall") or {}
    if not overall.get("n_playable_settled"):
        return []

    found: List[Dict[str, Any]] = []

    # Markets that lose money and hit clearly below the overall rate.
    baseline_hit = overall.get("hit_rate_pct") or 0.0
    for market, stats in (agg.get("by_market") or {}).items():
        sample = stats.get("n_playable_settled") or 0
        hit = stats.get("hit_rate_pct")
        roi = stats.get("roi_pct")
        if sample < 30 or hit is None or roi is None:
            continue
        if not (roi < -10 and hit < baseline_hit - 10):
            continue
        found.append({
            "type": "drop_market",
            "market": market,
            "evidence": f"hit={hit}%, roi={roi}%, n={sample} — chronic loser",
            "suggested_fix": f"Add veto in betting_brain when market=={market} unless overwhelming evidence",
            "estimated_loss_prevented_units": round(-(stats.get("unit_profit") or 0), 2),
        })

    # Confidence bands that lose money although they passed the threshold.
    for band, stats in (agg.get("by_confidence") or {}).items():
        sample = stats.get("n_playable_settled") or 0
        roi = stats.get("roi_pct")
        if sample >= 40 and roi is not None and roi < -8:
            found.append({
                "type": "raise_confidence_threshold",
                "confidence_band": band,
                "evidence": f"n={sample}, roi={roi}%",
                "suggested_fix": f"Raise MIN_BET_SCORE or market_min_conf above {band.split('-')[0]}",
            })

    # Loss patterns: each entry is (count, percent of losses).
    patterns = diag.get("patterns") or {}

    def pct_of_losses(label: str) -> float:
        return patterns.get(label, (0, 0))[1]

    cup_share = pct_of_losses("cup_match")
    if cup_share >= 25:
        found.append({
            "type": "cup_match_filter",
            "evidence": f"{cup_share}% of losses are cup matches",
            "suggested_fix": "Tighten betting_brain thresholds for is_cup_match=True picks",
        })

    rev_share = pct_of_losses("high_htft_reversal_prob (>=0.20)")
    if rev_share >= 15:
        found.append({
            "type": "tighten_reversal_check",
            "evidence": f"{rev_share}% of losses had HTFT reversal prob >=0.20 (already partial fix)",
            "suggested_fix": "Lower reversal threshold in betting_brain from 0.25 to 0.20 for veto trigger",
        })

    rel_share = pct_of_losses("low_league_reliability (<0.45)")
    if rel_share >= 20:
        found.append({
            "type": "league_reliability_filter",
            "evidence": f"{rel_share}% of losses in low-reliability leagues (<0.45)",
            "suggested_fix": "Add hard veto when odds_reliability<0.45 for non-value-sniper picks",
        })

    return found
# ── CSV / report writers ───────────────────────────────────────────────
def write_csv(rows: List[Dict], path: str):
    """Write *rows* to *path* as UTF-8 CSV; do nothing when *rows* is empty.

    The header comes from the keys of the first row.
    """
    if not rows:
        return
    import csv
    columns = list(rows[0].keys())
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)
def _band_sort_key(band: str) -> Tuple[int, float]:
    """Order band labels numerically: "<x" first, "a-b" by a, ">=x" last."""
    if band.startswith("<"):
        return (0, 0.0)
    if band.startswith(">="):
        return (2, 0.0)
    try:
        return (1, float(band.split("-", 1)[0]))
    except ValueError:
        return (1, float("inf"))


def write_text_summary(rows: List[Dict], agg: Dict, diag: Dict,
                       recs: List[Dict], path: str, args):
    """Write the human-readable report to *path* (UTF-8).

    Args:
        rows: Bet rows (not consulted; kept for the call signature).
        agg: Output of ``aggregate``.
        diag: Output of ``loss_diagnostics``.
        recs: Output of ``make_recommendations``.
        path: Destination file.
        args: Parsed CLI namespace; ``start``, ``end``, ``days`` and
            ``max_matches`` are echoed in the header.
    """
    lines: List[str] = []
    push = lines.append
    push("=" * 78)
    push("DIAGNOSTIC BACKTEST REPORT")
    push("=" * 78)
    push(f"Generated: {datetime.now().isoformat(timespec='seconds')}")
    push(f"Sample window: start={args.start or f'-{args.days}d'}, end={args.end or 'now'}")
    push(f"Max matches: {args.max_matches}")
    push(f"Excluded days: {sorted(EXCLUDED_DATES)}")
    push("")

    push("OVERALL")
    push("-" * 78)
    overall = agg.get("overall") or {}
    for k in ("n_total", "n_playable_settled", "wins", "losses",
              "hit_rate_pct", "unit_profit", "staked", "roi_pct"):
        push(f" {k:25}: {overall.get(k)}")
    push("")

    push("PER MARKET")
    push("-" * 78)
    push(f" {'market':<8} {'n':>6} {'hit%':>7} {'profit':>9} {'roi%':>7}")
    for market, s in sorted((agg.get("by_market") or {}).items(),
                            key=lambda kv: -(kv[1].get("n_playable_settled") or 0)):
        push(f" {market:<8} {s.get('n_playable_settled',0):>6} "
             f"{str(s.get('hit_rate_pct','')):>7} "
             f"{str(s.get('unit_profit','')):>9} "
             f"{str(s.get('roi_pct','')):>7}")
    push("")

    # Both band tables share one layout. Bands are ordered numerically; a
    # plain string sort would put "<45" after "70-80".
    for title, section in (("PER CALIBRATED CONFIDENCE BAND", "by_confidence"),
                           ("PER ODDS BAND", "by_odds")):
        push(title)
        push("-" * 78)
        push(f" {'band':<10} {'n':>6} {'hit%':>7} {'roi%':>7}")
        for band, s in sorted((agg.get(section) or {}).items(),
                              key=lambda kv: _band_sort_key(kv[0])):
            push(f" {band:<10} {s.get('n_playable_settled',0):>6} "
                 f"{str(s.get('hit_rate_pct','')):>7} "
                 f"{str(s.get('roi_pct','')):>7}")
        push("")

    push("LEAGUE vs CUP")
    push("-" * 78)
    for k, s in (agg.get("by_competition") or {}).items():
        push(f" {k:<8} n={s.get('n_playable_settled',0):>4} "
             f"hit={s.get('hit_rate_pct','-')}% roi={s.get('roi_pct','-')}%")
    push("")

    push("LOSS DIAGNOSTICS")
    push("-" * 78)
    push(f" total losses: {diag.get('n_losses')}")
    push(f" total lost units: {diag.get('total_loss_units')}")
    push(f" By market: {diag.get('by_market')}")
    push(" Loss patterns (count, % of losses):")
    for pattern, (c, pct) in (diag.get("patterns") or {}).items():
        push(f" {pattern:<55} {c:>4} ({pct}%)")
    push(" Top betting_brain issues seen in losses:")
    for issue, c in (diag.get("top_bb_issues_in_losses") or []):
        push(f" {issue:<55} {c}")
    push(" Top betting_brain vetoes (in losses — i.e. veto fired but bet still went through value-sniper override):")
    for veto, c in (diag.get("top_bb_vetoes_in_losses") or []):
        push(f" {veto:<55} {c}")
    push("")

    push("RECOMMENDATIONS")
    push("-" * 78)
    if not recs:
        push(" (none surfaced — sample too small or no clear pattern)")
    for r in recs:
        push(f" • [{r['type']}]")
        for k, v in r.items():
            if k == "type":
                continue
            push(f" {k}: {v}")
        push("")
    push("=" * 78)

    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
# ── Main loop ─────────────────────────────────────────────────────────
def _checkpoint_paths(args) -> Tuple[str, str]:
    """Return the (csv, state) checkpoint paths for this set of arguments.

    The file name is built from ``--start`` (or "d<days>"), ``--end`` (or
    "now") and ``--max-matches``, with "-" and ":" removed, so a re-run with
    the same arguments resolves to the same files.
    """
    start_part = args.start or 'd' + str(args.days)
    end_part = args.end or 'now'
    raw_key = "_".join((str(start_part), str(end_part), str(args.max_matches)))
    key = raw_key.replace("-", "").replace(":", "")
    base = os.path.join(REPORTS_DIR, f"_checkpoint_{key}")
    return base + ".csv", base + ".state"
# Columns of the checkpoint CSV that are not plain text. csv.DictReader
# returns every cell as a string, so they are converted back on load.
_CKPT_BOOL_FIELDS = frozenset({
    "playable", "won", "is_value_sniper", "bb_trap_market", "is_cup",
})
_CKPT_INT_FIELDS = frozenset({
    "score_home", "score_away", "ht_score_home", "ht_score_away",
})
_CKPT_FLOAT_FIELDS = frozenset({
    "odds", "stake_units", "unit_profit", "raw_confidence",
    "calibrated_confidence", "play_score", "ev_edge", "bb_score",
    "bb_model_prob", "bb_implied_prob", "bb_model_market_gap",
    "bb_divergence", "data_quality_score", "odds_reliability",
    "htft_reversal_prob",
})


def _coerce_checkpoint_row(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Restore the Python types of one checkpoint CSV row.

    Empty cells become None (csv writes None as ""). Boolean columns are
    parsed from "True"/"False"; score columns become int and metric columns
    float. Everything else stays text. Raises ValueError on a malformed
    number.
    """
    row: Dict[str, Any] = {}
    for key, text in raw.items():
        if key is None:
            # Surplus cells of an over-long line; not a real column.
            continue
        if text is None or text == "":
            row[key] = None
        elif key in _CKPT_BOOL_FIELDS:
            row[key] = text == "True"
        elif key in _CKPT_INT_FIELDS:
            row[key] = int(float(text))
        elif key in _CKPT_FLOAT_FIELDS:
            row[key] = float(text)
        else:
            row[key] = text
    return row


def _load_checkpoint(args) -> Tuple[List[Dict], set]:
    """Read partial CSV + processed-IDs set if a previous run was interrupted.

    Rows come back with the same value types a fresh run produces, so the
    aggregation code treats resumed and new rows alike. Without that, the
    string "False" would count as a playable, winning bet.

    Returns:
        (rows, seen_match_ids). Both are empty when no checkpoint exists or
        it cannot be read or parsed.
    """
    ckpt_csv, _ = _checkpoint_paths(args)
    if not os.path.exists(ckpt_csv):
        return [], set()
    import csv
    rows: List[Dict] = []
    seen: set = set()
    try:
        with open(ckpt_csv, "r", encoding="utf-8", newline="") as f:
            for raw in csv.DictReader(f):
                row = _coerce_checkpoint_row(raw)
                rows.append(row)
                seen.add(str(row.get("match_id") or ""))
    except Exception as e:
        # Best effort: a damaged checkpoint must not block a fresh run.
        print(f" checkpoint read failed ({e}); starting fresh")
        return [], set()
    return rows, seen
def _flush_checkpoint(args, rows: List[Dict]) -> None:
    """Rewrite the checkpoint CSV with all rows collected so far.

    The data goes to a ".tmp" sibling first and is then moved over the real
    file with os.replace, so a crash mid-write leaves the previous checkpoint
    intact. Does nothing when *rows* is empty.
    """
    if not rows:
        return
    import csv
    target, _ = _checkpoint_paths(args)
    scratch = target + ".tmp"
    with open(scratch, "w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    os.replace(scratch, target)
def main():
    """CLI entry point: analyse the match window and write the three reports.

    Steps: parse arguments, load and warm the orchestrator, fetch the match
    window, optionally resume from a checkpoint, analyse every remaining
    match (flushing a checkpoint periodically), then aggregate and write the
    CSV / JSON / TXT outputs. The checkpoint is deleted once a run finishes
    without being interrupted, so it only ever describes an unfinished run.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--days", type=int, default=14,
                        help="Backwards window from now (default 14)")
    parser.add_argument("--max-matches", type=int, default=2000,
                        help="Hard cap on matches processed (default 2000)")
    parser.add_argument("--start", help="Start date YYYY-MM-DD (overrides --days)")
    parser.add_argument("--end", help="End date YYYY-MM-DD")
    parser.add_argument("--progress-interval", type=int, default=50)
    parser.add_argument("--checkpoint-every", type=int, default=100,
                        help="Flush partial CSV every N matches (default 100)")
    parser.add_argument("--no-resume", action="store_true",
                        help="Ignore any prior checkpoint and start fresh")
    args = parser.parse_args()

    print("=" * 70)
    print("DIAGNOSTIC BACKTEST")
    print("=" * 70)
    print(f"Loading orchestrator...")
    orch = get_single_match_orchestrator()
    # Warm the V25 and V27 predictors so the first match doesn't pay for it.
    # Best effort: a failed warm-up is reported and the run continues.
    try:
        orch._get_v25_predictor()
    except Exception as e:
        print(f" v25 warmup: {e}")
    try:
        orch._get_v27_predictor()
    except Exception as e:
        print(f" v27 warmup: {e}")

    print(f"Fetching match window...")
    matches = fetch_match_window(args)
    n = len(matches)
    print(f" {n} matches selected")
    if not matches:
        print("No matches to process. Exiting.")
        return

    # ── Resume from prior checkpoint if available ──
    rows: List[Dict[str, Any]] = []
    seen_ids: set = set()
    if not args.no_resume:
        rows, seen_ids = _load_checkpoint(args)
        # A "--days" window slides between runs: keep only checkpoint rows
        # whose match is still part of the window fetched above.
        window_ids = {str(m["match_id"]) for m in matches}
        rows = [r for r in rows if str(r.get("match_id") or "") in window_ids]
        seen_ids = seen_ids & window_ids
        if rows:
            print(f" Resuming from checkpoint: {len(rows)} matches already done")

    errors: List[Tuple[str, str]] = []
    interrupted = False
    pending = sum(1 for m in matches if str(m["match_id"]) not in seen_ids)
    attempted = 0
    t0 = time.time()
    for i, m in enumerate(matches, start=1):
        mid = str(m["match_id"])
        if mid in seen_ids:
            continue
        attempted += 1
        try:
            pkg = orch.analyze_match(mid)
            if pkg is not None:
                rows.append(capture_bet_row(m, pkg))
        except KeyboardInterrupt:
            print("\nInterrupted, flushing checkpoint...")
            _flush_checkpoint(args, rows)
            interrupted = True
            break
        except Exception as e:
            errors.append((mid, str(e)))
            if len(errors) <= 5:
                traceback.print_exc()

        # ── Periodic checkpoint flush so a crash doesn't lose everything ──
        if args.checkpoint_every > 0 and i % args.checkpoint_every == 0:
            _flush_checkpoint(args, rows)
        if args.progress_interval > 0 and i % args.progress_interval == 0:
            elapsed = time.time() - t0
            # Rate and ETA are based on matches analysed in this run, not on
            # rows skipped because the checkpoint already had them.
            rate = attempted / elapsed if elapsed > 0 else 0.0
            eta = (pending - attempted) / rate if rate else 0
            playable_so_far = sum(1 for r in rows if r["playable"])
            print(f" [{i}/{n}] rate={rate:.1f}/s eta={eta/60:.1f}min "
                  f"playable={playable_so_far} errors={len(errors)} "
                  f"(checkpoint at every {args.checkpoint_every})")

    print(f"\nProcessed {len(rows)} rows in {(time.time()-t0):.1f}s "
          f"({len(errors)} errors)")

    # Aggregate
    print("Aggregating...")
    agg = aggregate(rows)
    diag = loss_diagnostics(rows)
    recs = make_recommendations(rows, agg, diag)

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.csv")
    json_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.json")
    txt_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.txt")
    write_csv(rows, csv_path)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"args": vars(args), "aggregate": agg, "loss_diagnostics": diag,
                   "recommendations": recs, "errors_sample": errors[:20]},
                  f, indent=2, default=str)
    write_text_summary(rows, agg, diag, recs, txt_path, args)

    if not interrupted:
        # Finished and reported: drop the checkpoint so a later run with the
        # same arguments starts fresh instead of reusing stale rows.
        ckpt_csv, _ = _checkpoint_paths(args)
        try:
            os.remove(ckpt_csv)
        except FileNotFoundError:
            pass

    print(f"\nOutputs:")
    print(f" CSV: {csv_path}")
    print(f" JSON: {json_path}")
    print(f" TXT: {txt_path}")
    print("\nOverall:", agg.get("overall"))
# Run the backtest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()