""" Diagnostic Backtest =================== Run the full V28 orchestrator (in-process — no HTTP) on a window of completed matches, capture the recommendation + key signal features + the actual outcome, and produce a *diagnostic* report: not just "what was the hit rate" but "which feature clusters drive the losing bets". Outputs: - reports/diagnostic_backtest_YYYYMMDD.csv (per-bet detail) - reports/diagnostic_backtest_YYYYMMDD.json (aggregate metrics) - reports/diagnostic_backtest_YYYYMMDD.txt (human-readable summary) Usage: python scripts/diagnostic_backtest.py --days 14 --max-matches 2000 python scripts/diagnostic_backtest.py --start 2026-05-10 --end 2026-05-24 """ from __future__ import annotations import argparse import json import os import sys import time import traceback from collections import defaultdict, Counter from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple import psycopg2 from psycopg2.extras import RealDictCursor # Path bootstrap so we can import the ai-engine package from anywhere SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) AI_ENGINE_DIR = os.path.dirname(SCRIPT_DIR) sys.path.insert(0, AI_ENGINE_DIR) from data.db import get_clean_dsn from services.single_match_orchestrator import get_single_match_orchestrator REPORTS_DIR = os.path.join(AI_ENGINE_DIR, "reports") os.makedirs(REPORTS_DIR, exist_ok=True) # Days with confirmed feeder gaps — exclude from sample EXCLUDED_DATES = {"2026-05-03", "2026-04-29"} # ── Outcome resolution ──────────────────────────────────────────────── def _norm_pick(pick: Optional[str]) -> str: return str(pick or "").strip().casefold() def resolve_outcome(market: str, pick: str, sh: int, sa: int, htsh: Optional[int], htsa: Optional[int]) -> Optional[bool]: """Mirror of prediction-settlement.market-resolver.ts (TS side). Returns True/False on settle, None if cannot resolve.""" m = (market or "").upper().replace(" ", "").replace("-", "_") p = _norm_pick(pick) if m in ("MS", "ML", "1X2"): outcome = "1" if sh > sa else "2" if sa > sh else "x" return p in {outcome, outcome.upper(), outcome.lower(), "0" if outcome == "x" else outcome} if m in ("HT", "IY"): if htsh is None or htsa is None: return None outcome = "1" if htsh > htsa else "2" if htsa > htsh else "x" return p in {outcome, "0" if outcome == "x" else outcome} if m in ("OU05", "OU15", "OU25", "OU35", "OU45", "TOTAL"): line = {"OU05": 0.5, "OU15": 1.5, "OU25": 2.5, "OU35": 3.5, "OU45": 4.5, "TOTAL": 2.5}[m] total = sh + sa if total == line: return None is_over = total > line if "over" in p or "üst" in p or "ust" in p: return is_over if "alt" in p or "under" in p: return not is_over return None if m in ("OU05_HT", "OU15_HT", "OU25_HT", "HT_OU05", "HT_OU15", "HT_OU25"): if htsh is None or htsa is None: return None line = {"OU05_HT": 0.5, "OU15_HT": 1.5, "OU25_HT": 2.5, "HT_OU05": 0.5, "HT_OU15": 1.5, "HT_OU25": 2.5}[m] total = htsh + htsa if total == line: return None is_over = total > line if "over" in p or "üst" in p or "ust" in p: return is_over if "alt" in p or "under" in p: return not is_over return None if m in ("BTTS", "KG"): both = sh > 0 and sa > 0 if "yes" in p or "var" in p: return both if "no" in p or "yok" in p: return not both return None if m in ("HTFT", "IYMS"): if htsh is None or htsa is None or "/" not in p: return None ht_p, ft_p = p.split("/", 1) ht_actual = "1" if htsh > htsa else "2" if htsa > htsh else "x" ft_actual = "1" if sh > sa else "2" if sa > sh else "x" return ht_p.strip() == ht_actual and ft_p.strip() == ft_actual if m in ("DC", "CIFTE_SANS"): ft = "1" if sh > sa else "2" if sa > sh else "X" raw = p.upper().replace("-", "").replace("/", "") if raw in ("1X", "X1"): pair = ["1", "X"] elif raw in ("X2", "2X"): pair = ["X", "2"] elif raw in ("12", "21"): pair = ["1", "2"] else: return None return ft in pair if m in ("OE", "TEKCIFT"): is_odd = (sh + sa) % 2 == 1 if "tek" in p or "odd" in p: return is_odd if "cift" in p or "çift" in p or "even" in p: return not is_odd return None return None def compute_unit_profit(won: Optional[bool], stake: float, odds: Optional[float]) -> float: if won is None: return 0.0 if not won: return -abs(stake) if stake else -1.0 if not odds or odds <= 1.0: return 0.0 return round(stake * (odds - 1.0), 4) # ── Data fetch ──────────────────────────────────────────────────────── def fetch_match_window(args) -> List[Dict]: dsn = get_clean_dsn() if "?schema=" in dsn: dsn = dsn.split("?schema=")[0] if args.start and args.end: start = datetime.strptime(args.start, "%Y-%m-%d") end = datetime.strptime(args.end, "%Y-%m-%d") + timedelta(days=1) else: end = datetime.now(timezone.utc).replace(tzinfo=None) start = end - timedelta(days=args.days) start_ms = int(start.timestamp() * 1000) end_ms = int(end.timestamp() * 1000) excluded = sorted(EXCLUDED_DATES) excluded_clause = "" if excluded: ex_csv = ",".join(f"'{d}'" for d in excluded) excluded_clause = ( f" AND to_timestamp(mst_utc/1000)::date " f"NOT IN ({ex_csv})" ) with psycopg2.connect(dsn) as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute( f""" SELECT id AS match_id, score_home, score_away, ht_score_home, ht_score_away, league_id, to_timestamp(mst_utc/1000)::date AS match_date FROM matches WHERE sport='football' AND status='FT' AND score_home IS NOT NULL AND score_away IS NOT NULL AND mst_utc >= %s AND mst_utc < %s {excluded_clause} ORDER BY mst_utc DESC LIMIT %s """, (start_ms, end_ms, args.max_matches), ) return cur.fetchall() # ── Per-bet capture ─────────────────────────────────────────────────── def capture_bet_row(match: Dict, package: Dict) -> Dict[str, Any]: """Distill orchestrator response + ground truth into one analytic row.""" main = package.get("main_pick") or {} bb = main.get("betting_brain") or {} advice = package.get("bet_advice") or {} v27 = package.get("v27_engine") or {} triple = (v27.get("triple_value") or {}) risk = package.get("risk") or {} quality = package.get("data_quality") or {} htft_payload = ((package.get("market_board") or {}).get("HTFT") or {}) htft_probs = htft_payload.get("probs") or {} sh, sa = match["score_home"], match["score_away"] htsh, htsa = match["ht_score_home"], match["ht_score_away"] market = main.get("market") pick = main.get("pick") odds_val = _f(main.get("odds")) stake = _f(main.get("stake_units"), 1.0) playable = bool(main.get("playable")) and bool(advice.get("playable")) won = resolve_outcome(market, pick, sh, sa, htsh, htsa) if market and pick else None profit = compute_unit_profit(won, stake, odds_val) if playable else 0.0 # Reversal context (only meaningful for MS picks) rev_prob = None if market == "MS" and pick in ("1", "2"): if pick == "1": rev_prob = _f(htft_probs.get("1/2"), 0.0) + _f(htft_probs.get("1/X"), 0.0) else: rev_prob = _f(htft_probs.get("2/1"), 0.0) + _f(htft_probs.get("2/X"), 0.0) return { "match_id": match["match_id"], "match_date": str(match["match_date"]), "league_id": match.get("league_id"), "score_home": sh, "score_away": sa, "ht_score_home": htsh, "ht_score_away": htsa, "market": market, "pick": pick, "odds": odds_val, "stake_units": stake, "playable": playable, "won": won, "unit_profit": profit, "raw_confidence": _f(main.get("raw_confidence")), "calibrated_confidence": _f(main.get("calibrated_confidence")), "play_score": _f(main.get("play_score")), "ev_edge": _f(main.get("ev_edge")), "bet_grade": main.get("bet_grade"), "is_value_sniper": bool(main.get("is_value_sniper")), "bb_score": _f(bb.get("score")), "bb_action": bb.get("action"), "bb_vetoes": ";".join(bb.get("vetoes") or []), "bb_issues": ";".join(bb.get("issues") or []), "bb_positives": ";".join(bb.get("positives") or []), "bb_model_prob": _f(bb.get("model_prob")), "bb_implied_prob": _f(bb.get("implied_prob")), "bb_model_market_gap": _f(bb.get("model_market_gap")), "bb_divergence": _f(bb.get("divergence")), "bb_trap_market": bool(bb.get("trap_market_flag")), "v27_consensus": v27.get("consensus"), "data_quality_score": _f(quality.get("score")), "data_quality_flags": ";".join(quality.get("flags") or []), "risk_level": (risk.get("level") if isinstance(risk, dict) else None), "odds_reliability": _f(main.get("odds_reliability")), "htft_reversal_prob": rev_prob, "htft_top_pick": _argmax(htft_probs), "league_name": (package.get("match_info") or {}).get("league_name"), "is_cup": _is_cup((package.get("match_info") or {}).get("league_name") or ""), "model_version": package.get("model_version"), "decision_reason": main.get("pick_reason") or advice.get("reason"), } def _f(x: Any, default: Optional[float] = None) -> Optional[float]: try: return float(x) if x is not None else default except (TypeError, ValueError): return default def _argmax(d: Dict[str, Any]) -> Optional[str]: best, val = None, -1.0 for k, v in d.items(): fv = _f(v, 0.0) or 0.0 if fv > val: best, val = k, fv return best _CUP_KEYWORDS = ("kupa", "cup", "coupe", "copa", "coppa", "pokal", "trophy", "shield", "ziraat", "süper kupa", "super cup", "beker", "taça", "taca") def _is_cup(name: str) -> bool: n = (name or "").lower() return any(kw in n for kw in _CUP_KEYWORDS) # ── Aggregation helpers ──────────────────────────────────────────────── def _bucket(value: Optional[float], edges: List[float]) -> Optional[str]: if value is None: return None for i, edge in enumerate(edges): if value < edge: if i == 0: return f"<{edge}" return f"{edges[i-1]}-{edge}" return f">={edges[-1]}" def _summary_stats(rows: List[Dict]) -> Dict[str, Any]: if not rows: return {"n": 0} settled = [r for r in rows if r["playable"] and r["won"] is not None] won = sum(1 for r in settled if r["won"]) lost = sum(1 for r in settled if not r["won"]) profit = sum(float(r["unit_profit"]) for r in settled) staked = sum(float(r["stake_units"]) for r in settled) return { "n_total": len(rows), "n_playable_settled": len(settled), "wins": won, "losses": lost, "hit_rate_pct": round(100.0 * won / len(settled), 2) if settled else None, "unit_profit": round(profit, 3), "staked": round(staked, 3), "roi_pct": round(100.0 * profit / staked, 2) if staked else None, } def aggregate(rows: List[Dict]) -> Dict[str, Any]: out: Dict[str, Any] = {"overall": _summary_stats(rows)} by = lambda key_fn: defaultdict(list) market_buckets = by(None) conf_buckets = by(None) odds_buckets = by(None) grade_buckets = by(None) cup_buckets = by(None) motivation_buckets = by(None) for r in rows: if r["playable"]: market_buckets[r["market"] or "?"].append(r) conf_buckets[_bucket(r["calibrated_confidence"], [45, 50, 55, 60, 65, 70, 80])].append(r) odds_buckets[_bucket(r["odds"], [1.3, 1.5, 1.8, 2.2, 3.0, 5.0])].append(r) grade_buckets[r["bet_grade"] or "?"].append(r) cup_buckets["cup" if r["is_cup"] else "league"].append(r) out["by_market"] = {k: _summary_stats(v) for k, v in market_buckets.items()} out["by_confidence"] = {k: _summary_stats(v) for k, v in conf_buckets.items() if k} out["by_odds"] = {k: _summary_stats(v) for k, v in odds_buckets.items() if k} out["by_grade"] = {k: _summary_stats(v) for k, v in grade_buckets.items()} out["by_competition"] = {k: _summary_stats(v) for k, v in cup_buckets.items()} return out def loss_diagnostics(rows: List[Dict]) -> Dict[str, Any]: losses = [r for r in rows if r["playable"] and r["won"] is False] if not losses: return {"n_losses": 0} n = len(losses) def share(predicate) -> Tuple[int, float]: c = sum(1 for r in losses if predicate(r)) return c, round(100.0 * c / n, 2) diagnostics = { "n_losses": n, "total_loss_units": round(sum(float(r["unit_profit"]) for r in losses), 3), "patterns": { "high_htft_reversal_prob (>=0.20)": share( lambda r: (r.get("htft_reversal_prob") or 0) >= 0.20 ), "cup_match": share(lambda r: r["is_cup"]), "low_league_reliability (<0.45)": share( lambda r: (r.get("odds_reliability") or 1) < 0.45 ), "v27_disagree": share(lambda r: r.get("v27_consensus") == "DISAGREE"), "trap_market_flagged": share(lambda r: r.get("bb_trap_market")), "low_calibrated_conf (<55)": share( lambda r: (r.get("calibrated_confidence") or 0) < 55 ), "high_odds_underdog (>=2.5)": share( lambda r: (r.get("odds") or 0) >= 2.5 ), "low_data_quality (<0.55)": share( lambda r: (r.get("data_quality_score") or 1) < 0.55 ), "high_risk_level": share( lambda r: r.get("risk_level") in ("HIGH", "EXTREME") ), "inferred_features": share( lambda r: "ai_features_inferred_from_history" in (r.get("data_quality_flags") or "") ), }, "by_market": Counter(r["market"] for r in losses).most_common(), "by_league": Counter(r.get("league_name") for r in losses).most_common(10), } # Top issue tags from betting_brain across losses issue_counter = Counter() veto_counter = Counter() for r in losses: for tag in (r.get("bb_issues") or "").split(";"): if tag: issue_counter[tag] += 1 for tag in (r.get("bb_vetoes") or "").split(";"): if tag: veto_counter[tag] += 1 diagnostics["top_bb_issues_in_losses"] = issue_counter.most_common(15) diagnostics["top_bb_vetoes_in_losses"] = veto_counter.most_common(15) return diagnostics # ── Recommendations ──────────────────────────────────────────────────── def make_recommendations(rows: List[Dict], agg: Dict[str, Any], diag: Dict[str, Any]) -> List[Dict[str, Any]]: recs: List[Dict[str, Any]] = [] overall = agg.get("overall") or {} if not overall.get("n_playable_settled"): return recs # Cross-reference market hit rate vs overall — flag chronic losers. overall_hit = overall.get("hit_rate_pct") or 0.0 for market, stats in (agg.get("by_market") or {}).items(): n = stats.get("n_playable_settled") or 0 hit = stats.get("hit_rate_pct") roi = stats.get("roi_pct") if n < 30: continue if hit is not None and roi is not None and roi < -10 and hit < overall_hit - 10: recs.append({ "type": "drop_market", "market": market, "evidence": f"hit={hit}%, roi={roi}%, n={n} — chronic loser", "suggested_fix": f"Add veto in betting_brain when market=={market} unless overwhelming evidence", "estimated_loss_prevented_units": round(-(stats.get("unit_profit") or 0), 2), }) # Confidence band tuning — flag bands where ROI < 0 despite passing eşik for band, stats in (agg.get("by_confidence") or {}).items(): n = stats.get("n_playable_settled") or 0 roi = stats.get("roi_pct") if n >= 40 and roi is not None and roi < -8: recs.append({ "type": "raise_confidence_threshold", "confidence_band": band, "evidence": f"n={n}, roi={roi}%", "suggested_fix": f"Raise MIN_BET_SCORE or market_min_conf above {band.split('-')[0]}", }) # Loss diagnostic — if cup matches dominate losses, recommend cup-aware filter patterns = (diag.get("patterns") or {}) cup_share = patterns.get("cup_match", (0, 0))[1] if cup_share >= 25: recs.append({ "type": "cup_match_filter", "evidence": f"{cup_share}% of losses are cup matches", "suggested_fix": "Tighten betting_brain thresholds for is_cup_match=True picks", }) rev_share = patterns.get("high_htft_reversal_prob (>=0.20)", (0, 0))[1] if rev_share >= 15: recs.append({ "type": "tighten_reversal_check", "evidence": f"{rev_share}% of losses had HTFT reversal prob >=0.20 (already partial fix)", "suggested_fix": "Lower reversal threshold in betting_brain from 0.25 to 0.20 for veto trigger", }) rel_share = patterns.get("low_league_reliability (<0.45)", (0, 0))[1] if rel_share >= 20: recs.append({ "type": "league_reliability_filter", "evidence": f"{rel_share}% of losses in low-reliability leagues (<0.45)", "suggested_fix": "Add hard veto when odds_reliability<0.45 for non-value-sniper picks", }) return recs # ── CSV / report writers ─────────────────────────────────────────────── def write_csv(rows: List[Dict], path: str): if not rows: return import csv fields = list(rows[0].keys()) with open(path, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for r in rows: w.writerow(r) def write_text_summary(rows: List[Dict], agg: Dict, diag: Dict, recs: List[Dict], path: str, args): lines: List[str] = [] push = lines.append push("=" * 78) push("DIAGNOSTIC BACKTEST REPORT") push("=" * 78) push(f"Generated: {datetime.now().isoformat(timespec='seconds')}") push(f"Sample window: start={args.start or f'-{args.days}d'}, end={args.end or 'now'}") push(f"Max matches: {args.max_matches}") push(f"Excluded days: {sorted(EXCLUDED_DATES)}") push("") push("OVERALL") push("-" * 78) overall = agg.get("overall") or {} for k in ("n_total", "n_playable_settled", "wins", "losses", "hit_rate_pct", "unit_profit", "staked", "roi_pct"): push(f" {k:25}: {overall.get(k)}") push("") push("PER MARKET") push("-" * 78) push(f" {'market':<8} {'n':>6} {'hit%':>7} {'profit':>9} {'roi%':>7}") for market, s in sorted((agg.get("by_market") or {}).items(), key=lambda kv: -(kv[1].get("n_playable_settled") or 0)): push(f" {market:<8} {s.get('n_playable_settled',0):>6} " f"{str(s.get('hit_rate_pct','')):>7} " f"{str(s.get('unit_profit','')):>9} " f"{str(s.get('roi_pct','')):>7}") push("") push("PER CALIBRATED CONFIDENCE BAND") push("-" * 78) push(f" {'band':<10} {'n':>6} {'hit%':>7} {'roi%':>7}") for band, s in sorted((agg.get("by_confidence") or {}).items()): push(f" {band:<10} {s.get('n_playable_settled',0):>6} " f"{str(s.get('hit_rate_pct','')):>7} " f"{str(s.get('roi_pct','')):>7}") push("") push("PER ODDS BAND") push("-" * 78) push(f" {'band':<10} {'n':>6} {'hit%':>7} {'roi%':>7}") for band, s in sorted((agg.get("by_odds") or {}).items()): push(f" {band:<10} {s.get('n_playable_settled',0):>6} " f"{str(s.get('hit_rate_pct','')):>7} " f"{str(s.get('roi_pct','')):>7}") push("") push("LEAGUE vs CUP") push("-" * 78) for k, s in (agg.get("by_competition") or {}).items(): push(f" {k:<8} n={s.get('n_playable_settled',0):>4} " f"hit={s.get('hit_rate_pct','-')}% roi={s.get('roi_pct','-')}%") push("") push("LOSS DIAGNOSTICS") push("-" * 78) push(f" total losses: {diag.get('n_losses')}") push(f" total lost units: {diag.get('total_loss_units')}") push(f" By market: {diag.get('by_market')}") push(" Loss patterns (count, % of losses):") for pattern, (c, pct) in (diag.get("patterns") or {}).items(): push(f" {pattern:<55} {c:>4} ({pct}%)") push(" Top betting_brain issues seen in losses:") for issue, c in (diag.get("top_bb_issues_in_losses") or []): push(f" {issue:<55} {c}") push(" Top betting_brain vetoes (in losses — i.e. veto fired but bet still went through value-sniper override):") for veto, c in (diag.get("top_bb_vetoes_in_losses") or []): push(f" {veto:<55} {c}") push("") push("RECOMMENDATIONS") push("-" * 78) if not recs: push(" (none surfaced — sample too small or no clear pattern)") for r in recs: push(f" • [{r['type']}]") for k, v in r.items(): if k == "type": continue push(f" {k}: {v}") push("") push("=" * 78) with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) # ── Main loop ───────────────────────────────────────────────────────── def _checkpoint_paths(args) -> Tuple[str, str]: """Stable checkpoint paths derived from the run's date window so a re-run with the same args picks up the same checkpoint.""" key = f"{args.start or 'd' + str(args.days)}_{args.end or 'now'}_{args.max_matches}" key = key.replace("-", "").replace(":", "") ckpt_csv = os.path.join(REPORTS_DIR, f"_checkpoint_{key}.csv") ckpt_state = os.path.join(REPORTS_DIR, f"_checkpoint_{key}.state") return ckpt_csv, ckpt_state def _load_checkpoint(args) -> Tuple[List[Dict], set]: """Read partial CSV + processed-IDs set if a previous run was interrupted.""" ckpt_csv, _ = _checkpoint_paths(args) if not os.path.exists(ckpt_csv): return [], set() import csv rows: List[Dict] = [] seen: set = set() try: with open(ckpt_csv, "r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) for row in reader: rows.append(row) seen.add(str(row.get("match_id") or "")) except Exception as e: print(f" checkpoint read failed ({e}); starting fresh") return [], set() return rows, seen def _flush_checkpoint(args, rows: List[Dict]) -> None: """Atomic-ish overwrite of the partial CSV. Cheap enough at every 100 rows.""" if not rows: return ckpt_csv, _ = _checkpoint_paths(args) import csv tmp = ckpt_csv + ".tmp" fields = list(rows[0].keys()) with open(tmp, "w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for r in rows: w.writerow(r) os.replace(tmp, ckpt_csv) def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--days", type=int, default=14, help="Backwards window from now (default 14)") parser.add_argument("--max-matches", type=int, default=2000, help="Hard cap on matches processed (default 2000)") parser.add_argument("--start", help="Start date YYYY-MM-DD (overrides --days)") parser.add_argument("--end", help="End date YYYY-MM-DD") parser.add_argument("--progress-interval", type=int, default=50) parser.add_argument("--checkpoint-every", type=int, default=100, help="Flush partial CSV every N matches (default 100)") parser.add_argument("--no-resume", action="store_true", help="Ignore any prior checkpoint and start fresh") args = parser.parse_args() print("=" * 70) print("DIAGNOSTIC BACKTEST") print("=" * 70) print(f"Loading orchestrator...") orch = get_single_match_orchestrator() # Warm V25 + V27 + basketball loaders so the first match doesn't pay it try: orch._get_v25_predictor() except Exception as e: print(f" v25 warmup: {e}") try: orch._get_v27_predictor() except Exception as e: print(f" v27 warmup: {e}") print(f"Fetching match window...") matches = fetch_match_window(args) n = len(matches) print(f" {n} matches selected") if not matches: print("No matches to process. Exiting.") return # ── Resume from prior checkpoint if available ── rows: List[Dict[str, Any]] = [] seen_ids: set = set() if not args.no_resume: rows, seen_ids = _load_checkpoint(args) if rows: print(f" Resuming from checkpoint: {len(rows)} matches already done") errors: List[Tuple[str, str]] = [] t0 = time.time() for i, m in enumerate(matches, start=1): mid = str(m["match_id"]) if mid in seen_ids: continue try: pkg = orch.analyze_match(mid) if pkg is None: continue row = capture_bet_row(m, pkg) rows.append(row) except KeyboardInterrupt: print("\nInterrupted, flushing checkpoint...") _flush_checkpoint(args, rows) break except Exception as e: errors.append((mid, str(e))) if len(errors) <= 5: traceback.print_exc() # ── Periodic checkpoint flush so a crash doesn't lose everything ── if i % args.checkpoint_every == 0: _flush_checkpoint(args, rows) if i % args.progress_interval == 0: elapsed = time.time() - t0 rate = i / elapsed eta = (n - i) / rate if rate else 0 playable_so_far = sum(1 for r in rows if r["playable"]) print(f" [{i}/{n}] rate={rate:.1f}/s eta={eta/60:.1f}min " f"playable={playable_so_far} errors={len(errors)} " f"(checkpoint at every {args.checkpoint_every})") print(f"\nProcessed {len(rows)} rows in {(time.time()-t0):.1f}s " f"({len(errors)} errors)") # Aggregate print("Aggregating...") agg = aggregate(rows) diag = loss_diagnostics(rows) recs = make_recommendations(rows, agg, diag) stamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.csv") json_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.json") txt_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.txt") write_csv(rows, csv_path) with open(json_path, "w", encoding="utf-8") as f: json.dump({"args": vars(args), "aggregate": agg, "loss_diagnostics": diag, "recommendations": recs, "errors_sample": errors[:20]}, f, indent=2, default=str) write_text_summary(rows, agg, diag, recs, txt_path, args) print(f"\nOutputs:") print(f" CSV: {csv_path}") print(f" JSON: {json_path}") print(f" TXT: {txt_path}") print("\nOverall:", agg.get("overall")) if __name__ == "__main__": main()