Add backtest pipeline, betting_brain filters, score coherence + social v3

betting_brain.py:
- HARD_MIN_SAMPLES=50 floor for calibrator bypass
- Hard vetoes when ev_edge < 0 (negative edge) or ev_edge >= 0.20 (suspiciously high edge, likely a trap)
- BTTS muted (grid search found no profitable config)
- Per-market optimal envelopes (MS, OU25)
- Score coherence filter: main_pick must agree with score prediction
- HTFT reversal cross-check for MS picks

feature_builder.py / data_loader.py:
- Real home/away_position from data (was hardcoded 10)
- Cup detection wired into UpsetEngine
- _estimate_league_position with 300-day season filter

New scripts:
- diagnostic_backtest.py: per-bet diagnostic backtest with loss patterns
- optimize_filters.py: grid search per-market optimal thresholds
- analyze_backtest_csv.py: root-cause hypothesis testing on CSV
- compare_backtests.py: side-by-side validation with verdict
- test_score_coherence.py: smoke test for coherence filter (20/20 pass)

Reports:
- diagnostic_backtest_20260525_024437 (50-match smoke)
- diagnostic_backtest_20260525_035649 (1000-match in-sample)
- filter_optimization_patch.json (grid search winners per market)

Social poster v3:
- satori + resvg HTML/CSS rendering pipeline
- Twemoji football/basketball + flag SVGs
- caption SEO: 12 curated hashtags per post
- image SEO: descriptive filenames + .json metadata sidecar
- /health, /preview-png, /run-now endpoints

Docs:
- mds/SESSION_HANDOFF.md: full session state for cross-machine continuity
- mds/SOCIAL_POSTER_SETUP.md: API keys + test commands

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 20:43:28 +03:00
parent b619c2454a
commit 988ee2f50d
36 changed files with 5268 additions and 46 deletions
+227
View File
@@ -0,0 +1,227 @@
"""
Deep root-cause analysis on diagnostic_backtest CSV.
Tests specific hypotheses with hard numbers and proposes actionable
filter rules with estimated impact (units saved, ROI shift).
"""
import sys, os, glob
import pandas as pd
import numpy as np
REPORTS_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "reports")
def latest_csv():
    """Return the most recently modified diagnostic backtest CSV path.

    Searches ``REPORTS_DIR`` for ``diagnostic_backtest_*.csv`` files and
    returns ``None`` when no such file exists.
    """
    pattern = os.path.join(REPORTS_DIR, "diagnostic_backtest_*.csv")
    candidates = glob.glob(pattern)
    if not candidates:
        return None
    # Newest first by modification time; the sort is stable, so files with
    # equal mtimes keep the order glob returned them in.
    candidates.sort(key=os.path.getmtime, reverse=True)
    return candidates[0]
def fmt_pct(x):
    """Format *x* as a right-aligned two-decimal percentage.

    Missing values (anything ``pd.notna`` rejects) render as a dash
    placeholder instead.
    """
    if not pd.notna(x):
        return " ----"
    return f"{x:>6.2f}%"
def cell(df, label, mask):
    """Summarise the rows of *df* selected by *mask* as one report line.

    The line carries the row count, the hit rate over rows whose ``won``
    value is exactly True or False, the summed ``unit_profit`` and the ROI
    relative to the summed ``stake_units``. An empty selection yields a
    short ``n=0`` line.
    """
    subset = df[mask]
    count = len(subset)
    if count == 0:
        return f" {label:<60} n=0"

    outcome = subset["won"]
    # Element-wise comparisons: rows that are neither True nor False
    # (e.g. missing) are excluded from the hit-rate denominator.
    wins = (outcome == True).sum()  # noqa: E712
    losses = (outcome == False).sum()  # noqa: E712
    decided = wins + losses
    hit_rate = 0
    if decided:
        hit_rate = 100.0 * wins / decided

    profit = subset["unit_profit"].sum()
    staked = subset["stake_units"].sum()
    roi = 0
    if staked:
        roi = 100.0 * profit / staked

    head = f" {label:<60} n={count:>4} hit={hit_rate:>6.2f}% "
    tail = f"profit={profit:>+7.2f}u roi={roi:>+7.2f}%"
    return head + tail
def hypothesis_block(title, rows):
    """Print a titled section followed by one line per entry in *rows*."""
    # NOTE(review): the rule character is an empty string in this source, so
    # each "rule" prints as a blank line — presumably a box-drawing character
    # was lost somewhere; confirm against the original file.
    rule = f"{'' * 78}"
    for text in (f"\n{rule}", f" {title}", rule, *rows):
        print(text)
def main():
    """Run every hypothesis check against the newest diagnostic backtest CSV.

    Loads the CSV returned by ``latest_csv()``, keeps only rows that are
    playable and settled, then prints:

    * H1-H6: hit rate / profit / ROI split by a single signal
      (triple-value confirmation, trap flag, v27 consensus, odds
      reliability band, calibrator delta band, EV edge band);
    * H7: bet count and ROI% pivoted by market x odds band;
    * H8: what-if simulations of individual and stacked reject rules.

    Banding for H4, H6 and H7 is left-closed (``right=False``) so that a
    boundary value falls in the band its label names (">=0.55 high",
    "0-5%", ">20%") and agrees with the ``>=`` comparisons used by the H8
    filters. Returns early with a message when there is no CSV or no
    playable + settled bet.
    """
    csv_path = latest_csv()
    if not csv_path:
        print("No backtest CSV found")
        return
    print(f"Reading {csv_path}")
    df = pd.read_csv(csv_path)
    print(f"Loaded {len(df)} rows")

    # Filter only playable + settled
    pdf = df[(df["playable"] == True) & (df["won"].notna())].copy()  # noqa: E712
    pdf["won"] = pdf["won"].astype(bool)
    print(f"Playable + settled: {len(pdf)}")
    if pdf.empty:
        # Without this guard the overall figures come out as NaN and the
        # remaining sections only print empty tables.
        print("No playable + settled bets found, nothing to analyse")
        return

    total_staked = pdf["stake_units"].sum()
    overall_hr = (pdf["won"].sum() / len(pdf)) * 100
    # Same zero-stake convention as cell(): report 0 instead of NaN/inf.
    overall_roi = 100.0 * pdf["unit_profit"].sum() / total_staked if total_staked else 0.0
    print(f"\nOVERALL: hit={overall_hr:.2f}% roi={overall_roi:.2f}%")

    # NOTE(review): the rule character is an empty string in this source, so
    # the rules below print as blank lines — presumably a box-drawing
    # character was lost; confirm against the original file.
    rule = "" * 78

    # ─────────────────────────────────────────────────────────────────────
    # H1: TRIPLE VALUE CONFIRMATION
    # ─────────────────────────────────────────────────────────────────────
    triple_confirmed_mask = ~pdf["bb_issues"].fillna("").str.contains(
        "triple_value_not_confirmed", na=False
    )
    hypothesis_block(
        "H1: TRIPLE VALUE CONFIRMED vs NOT CONFIRMED",
        [
            cell(pdf, "triple_value CONFIRMED", triple_confirmed_mask),
            cell(pdf, "triple_value NOT CONFIRMED", ~triple_confirmed_mask),
        ]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H2: TRAP MARKET FLAG
    # ─────────────────────────────────────────────────────────────────────
    trap_mask = pdf["bb_trap_market"] == True  # noqa: E712
    hypothesis_block(
        "H2: TRAP MARKET FLAG (model says band rate < implied → market overpriced)",
        [
            cell(pdf, "trap_market_flag = TRUE (model warned)", trap_mask),
            cell(pdf, "trap_market_flag = FALSE", ~trap_mask),
        ]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H3: V25/V27 CONSENSUS
    # ─────────────────────────────────────────────────────────────────────
    agree_mask = pdf["v27_consensus"] == "AGREE"
    disagree_mask = pdf["v27_consensus"] == "DISAGREE"
    hypothesis_block(
        "H3: V25 ↔ V27 CONSENSUS",
        [
            cell(pdf, "AGREE", agree_mask),
            cell(pdf, "DISAGREE", disagree_mask),
            cell(pdf, "neither/null", ~(agree_mask | disagree_mask)),
        ]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H4: ODDS RELIABILITY (league quality)
    # ─────────────────────────────────────────────────────────────────────
    # Left-closed bins: 0.55 belongs to ">=0.55 high" and 0.45 to
    # "0.45-0.55 mid", matching the ">= 0.45" filter simulated in H8. The
    # open upper edge keeps a reliability of exactly 1.0 in the top band.
    pdf["rel_band"] = pd.cut(
        pdf["odds_reliability"].fillna(0.35),
        [0, 0.30, 0.45, 0.55, np.inf],
        labels=["<0.30 verylow", "0.30-0.45 low", "0.45-0.55 mid", ">=0.55 high"],
        right=False,
    )
    hypothesis_block(
        "H4: LEAGUE ODDS RELIABILITY",
        [cell(pdf, str(b), pdf["rel_band"] == b) for b in pdf["rel_band"].cat.categories]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H5: CALIBRATOR IMPACT (raw vs calibrated)
    # ─────────────────────────────────────────────────────────────────────
    pdf["calib_delta"] = pdf["calibrated_confidence"] - pdf["raw_confidence"]
    # These labels do not state which side of an edge is inclusive, so the
    # pandas default (right-closed) is kept here.
    pdf["delta_band"] = pd.cut(
        pdf["calib_delta"].fillna(0),
        [-100, -10, -3, 3, 10, 100],
        labels=["cal<<raw (-10+)", "cal<raw (-3..-10)", "≈equal (±3)",
                "cal>raw (3..10)", "cal>>raw (+10+)"]
    )
    hypothesis_block(
        "H5: CALIBRATOR DELTA (calibrated_conf - raw_conf)",
        [cell(pdf, str(b), pdf["delta_band"] == b) for b in pdf["delta_band"].cat.categories]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H6: EV EDGE
    # ─────────────────────────────────────────────────────────────────────
    # Left-closed bins: an edge of exactly 0 (including missing edges filled
    # with 0) is reported under "0-5%", consistent with the H8 rule that
    # keeps ev_edge >= 0, and 0.20 falls under ">20%".
    pdf["edge_band"] = pd.cut(
        pdf["ev_edge"].fillna(0),
        [-10, -0.05, 0.0, 0.05, 0.10, 0.20, 10],
        labels=["edge<-5%", "-5%-0%", "0-5%", "5-10%", "10-20%", ">20%"],
        right=False,
    )
    hypothesis_block(
        "H6: EV EDGE (model_prob - implied_prob)",
        [cell(pdf, str(b), pdf["edge_band"] == b) for b in pdf["edge_band"].cat.categories]
    )

    # ─────────────────────────────────────────────────────────────────────
    # H7: ODDS x MARKET cross
    # ─────────────────────────────────────────────────────────────────────
    # Left-closed bins so 1.80 and 2.20 land where the H8 "odds in
    # 1.80-2.20" rule puts them. Missing odds are left as NaN (no band)
    # rather than filled with 0, which would file them under "<1.30".
    pdf["odds_band"] = pd.cut(
        pdf["odds"],
        [0, 1.30, 1.50, 1.80, 2.20, 3.00, np.inf],
        labels=["<1.30", "1.30-1.50", "1.50-1.80", "1.80-2.20", "2.20-3.00", ">3.00"],
        right=False,
    )
    print(f"\n{rule}")
    print(f" H7: ODDS BAND × MARKET (per cell hit% / roi% / n)")
    print(f"{rule}")
    pivot_n = pdf.pivot_table(index="market", columns="odds_band",
                              values="match_id", aggfunc="count", fill_value=0,
                              observed=False)
    pivot_roi = pdf.pivot_table(index="market", columns="odds_band",
                                values="unit_profit", aggfunc="sum", fill_value=0,
                                observed=False)
    pivot_stake = pdf.pivot_table(index="market", columns="odds_band",
                                  values="stake_units", aggfunc="sum", fill_value=0,
                                  observed=False)
    # Zero stake becomes NaN so empty cells show as NaN instead of dividing by 0.
    pivot_roi_pct = (100.0 * pivot_roi / pivot_stake.replace(0, np.nan)).round(1)
    print("\n Bet count per cell:")
    print(pivot_n.to_string())
    print("\n ROI% per cell:")
    print(pivot_roi_pct.to_string())

    # ─────────────────────────────────────────────────────────────────────
    # H8: COMBINED FILTER SIMULATION
    # ─────────────────────────────────────────────────────────────────────
    print(f"\n{rule}")
    print(" H8: COMBINED FILTER SIMULATION (what if we add rules)")
    print(f"{rule}")

    def simulate(filter_name, keep_mask):
        """Describe the bets kept by *keep_mask* and the units the rejects cost."""
        kept = pdf[keep_mask]
        rejected = pdf[~keep_mask]
        if len(kept) == 0:
            return f" {filter_name:<55} → 0 bet remain"
        kept_hr = 100.0 * kept["won"].sum() / len(kept)
        kept_profit = kept["unit_profit"].sum()
        kept_staked = kept["stake_units"].sum()
        kept_roi = 100.0 * kept_profit / kept_staked if kept_staked else 0
        saved = -rejected["unit_profit"].sum()  # money we WOULD HAVE LOST
        return (f" {filter_name:<55} keep={len(kept):>3} hit={kept_hr:>5.1f}% "
                f"roi={kept_roi:>+6.2f}% saved={saved:>+6.2f}u")

    # Keep-masks shared by the single-rule and stacked simulations.
    keep_triple_ok = ~pdf["bb_issues"].fillna("").str.contains("triple_value_not_confirmed")
    keep_no_trap = ~(pdf["bb_trap_market"] == True)  # noqa: E712
    keep_not_disagree = pdf["v27_consensus"] != "DISAGREE"
    keep_reliable = pdf["odds_reliability"].fillna(1.0) >= 0.45
    keep_odds_outside = (pdf["odds"].fillna(0) < 1.80) | (pdf["odds"].fillna(0) >= 2.20)
    keep_edge_nonneg = pdf["ev_edge"].fillna(0) >= 0

    print(simulate("BASELINE (no extra filter)", pd.Series([True] * len(pdf), index=pdf.index)))
    print(simulate("REJECT triple_value_not_confirmed", keep_triple_ok))
    print(simulate("REJECT trap_market_flag", keep_no_trap))
    print(simulate("REJECT v27 DISAGREE", keep_not_disagree))
    print(simulate("REJECT odds_reliability < 0.45", keep_reliable))
    print(simulate("REJECT odds in 1.80-2.20", keep_odds_outside))
    print(simulate("REJECT ev_edge < 0", keep_edge_nonneg))
    print(simulate("REJECT ev_edge < 0.05",
                   pdf["ev_edge"].fillna(0) >= 0.05))
    print()
    print(" COMBINED rules:")
    # Stack 1: drop triple_not_confirmed + trap_market + DISAGREE
    s1 = keep_triple_ok & keep_no_trap & keep_not_disagree
    print(simulate("STACK1: !triple_not_conf & !trap & !disagree", s1))
    # Stack 2: + edge>=0
    s2 = s1 & keep_edge_nonneg
    print(simulate("STACK2: STACK1 + edge >= 0", s2))
    # Stack 3: + reliability>=0.45
    s3 = s2 & keep_reliable
    print(simulate("STACK3: STACK2 + reliability >= 0.45", s3))
    # Stack 4: + odds outside 1.80-2.20
    s4 = s3 & keep_odds_outside
    print(simulate("STACK4: STACK3 + odds NOT in 1.80-2.20", s4))
    print(f"\n{rule}")
    print("DONE.")


if __name__ == "__main__":
    main()
+134
View File
@@ -0,0 +1,134 @@
"""
Compare two diagnostic_backtest CSV outputs side-by-side.
Used to validate that a filter change actually improved ROI vs the
baseline run — and to detect overfitting (in-sample success but
out-of-sample collapse).
Usage:
python scripts/compare_backtests.py <baseline.csv> <validation.csv>
python scripts/compare_backtests.py (auto-picks 2 most recent CSVs)
"""
import sys, os, glob
import pandas as pd
from typing import Dict
REPORTS_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "reports")
def load(path: str) -> pd.DataFrame:
    """Read a backtest CSV and add a normalised ``won_bool`` column.

    ``won`` values of True/False, the strings "True"/"False" and 1/0 map to
    booleans; any other value (including missing) becomes NaN in
    ``won_bool``.
    """
    outcome_lookup = {
        True: True, False: False,
        "True": True, "False": False,
        1: True, 0: False,
    }
    frame = pd.read_csv(path)
    frame["won_bool"] = frame["won"].map(outcome_lookup)
    return frame
def stats(df: pd.DataFrame, mask=None) -> Dict:
    """Compute headline betting metrics for *df*, optionally filtered by *mask*.

    Only rows that are playable and have a known ``won_bool`` count towards
    wins, losses, hit rate, profit, stake and ROI; ``n_total`` counts every
    row after the mask is applied.

    Returns a dict with the keys ``n_total``, ``n_playable``, ``wins``,
    ``losses``, ``hit``, ``profit``, ``staked`` and ``roi``. The same keys
    are present when no playable row exists (all metrics zero), so callers
    can index the result without checking which branch produced it.
    """
    if mask is not None:
        df = df[mask]
    playable = df[(df["playable"] == True) & (df["won_bool"].notna())]  # noqa: E712
    n_playable = len(playable)
    if n_playable == 0:
        return {"n_total": len(df), "n_playable": 0, "wins": 0, "losses": 0,
                "hit": 0, "profit": 0, "staked": 0, "roi": 0}
    wins = playable["won_bool"].sum()
    profit = playable["unit_profit"].sum()
    staked = playable["stake_units"].sum()
    return {
        "n_total": int(len(df)),
        "n_playable": int(n_playable),
        "wins": int(wins),
        "losses": int(n_playable - wins),
        "hit": round(100.0 * wins / n_playable, 2),
        "profit": round(profit, 2),
        "staked": round(staked, 2),
        # Zero total stake reports 0 rather than dividing by zero.
        "roi": round(100.0 * profit / staked, 2) if staked else 0,
    }
def line(label: str, a: Dict, b: Dict, suffix: str = ""):
    """Print one comparison row: *label*, then each metric from *a* and *b*.

    Metrics missing from either dict are shown as "-". *suffix* is appended
    verbatim at the end of the row.
    """
    columns = ("n_total", "n_playable", "hit", "profit", "staked", "roi")
    cells = [f"{label:<28}"]
    cells.extend(
        f"{name}: {str(a.get(name, '-')):>8}{str(b.get(name, '-')):>8}"
        for name in columns
    )
    print(" " + " | ".join(cells) + suffix)
def main():
    """Compare a baseline (A) and a validation (B) backtest CSV and print a verdict.

    With exactly two command-line arguments they are used as the A and B
    paths. Otherwise the two most recently modified
    ``diagnostic_backtest_*.csv`` files in ``REPORTS_DIR`` are used, the
    newest one as B. Prints overall and per-market metrics, how often each
    new veto name appears in ``bb_vetoes``, the number of playable BTTS
    bets, and a verdict derived from the ROI difference B - A.
    """
    if len(sys.argv) == 3:
        a_path, b_path = sys.argv[1], sys.argv[2]
    else:
        pattern = os.path.join(REPORTS_DIR, "diagnostic_backtest_*.csv")
        newest_first = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
        if len(newest_first) < 2:
            print("Need at least 2 backtest CSVs in reports/. Pass paths manually.")
            return
        # The newest file plays the "validation" role, the one before it the baseline.
        b_path, a_path = newest_first[0], newest_first[1]

    print(f"Baseline A: {os.path.basename(a_path)}")
    print(f"Validation B: {os.path.basename(b_path)}")
    a = load(a_path)
    b = load(b_path)
    a_s = stats(a)
    b_s = stats(b)

    heavy_rule = "=" * 100
    # NOTE(review): the thin-rule character is an empty string in this source,
    # so thin rules print as blank lines — presumably a box-drawing character
    # was lost; confirm against the original file.
    thin_rule = "" * 100

    def section(title, rule):
        """Print a section title framed by *rule* above and below."""
        print(f"\n{rule}")
        print(title)
        print(rule)

    section(" OVERALL", heavy_rule)
    line("ALL", a_s, b_s)

    section(" PER MARKET", thin_rule)
    known_markets = set(a["market"].dropna().unique())
    known_markets |= set(b["market"].dropna().unique())
    for m in sorted(known_markets):
        line(f"market={m}",
             stats(a, a["market"] == m),
             stats(b, b["market"] == m))

    # How often does each newly introduced veto show up in either run?
    section(" NEW VETO IMPACT (look for new veto names in betting_brain.vetoes)", thin_rule)
    new_vetoes = ["market_muted_by_backtest", "negative_ev_edge", "ev_edge_too_high_trap",
                  "outside_envelope_edge_low", "outside_envelope_edge_high",
                  "outside_envelope_odds_low", "outside_envelope_v27_must_agree"]
    a_veto_text = a["bb_vetoes"].fillna("")
    b_veto_text = b["bb_vetoes"].fillna("")
    for veto in new_vetoes:
        a_hits = a_veto_text.str.contains(veto).sum()
        b_hits = b_veto_text.str.contains(veto).sum()
        print(f" {veto:<45} A={a_hits:>4} B={b_hits:>4}")

    # Playable BTTS bets remaining in each run.
    section(" BTTS MUTE CHECK — should be ~0 playable in validation", thin_rule)
    a_btts_play = ((a["market"] == "BTTS") & (a["playable"] == True)).sum()  # noqa: E712
    b_btts_play = ((b["market"] == "BTTS") & (b["playable"] == True)).sum()  # noqa: E712
    print(f" BTTS playable bets: A={a_btts_play} → B={b_btts_play} "
          f"(should be 0 in B if MUTE works)")

    # Verdict: sample-size check first, then ROI delta thresholds in order.
    print(f"\n{heavy_rule}")
    roi_delta = b_s["roi"] - a_s["roi"]
    if b_s["n_playable"] < 20:
        verdict = "TOO FEW BETS — sample insufficient"
    elif roi_delta > 5:
        if b_s["roi"] > 0:
            verdict = "✅ FILTERS WORK — ROI improved AND positive"
        else:
            verdict = "🟡 PARTIAL — ROI improved but still negative"
    elif roi_delta > 0:
        verdict = "🟡 SLIGHT IMPROVEMENT"
    elif roi_delta < -5:
        verdict = "❌ OVERFITTING — validation ROI collapsed"
    else:
        verdict = "❌ NO MATERIAL CHANGE"
    print(f" VERDICT: {verdict}")
    print(f" ROI: {a_s['roi']}% → {b_s['roi']}% (Δ {roi_delta:+.2f}pp)")
    print(heavy_rule)


if __name__ == "__main__":
    main()
+674
View File
@@ -0,0 +1,674 @@
"""
Diagnostic Backtest
===================
Run the full V28 orchestrator (in-process — no HTTP) on a window of completed
matches, capture the recommendation + key signal features + the actual outcome,
and produce a *diagnostic* report: not just "what was the hit rate" but
"which feature clusters drive the losing bets".
Outputs:
- reports/diagnostic_backtest_YYYYMMDD.csv (per-bet detail)
- reports/diagnostic_backtest_YYYYMMDD.json (aggregate metrics)
- reports/diagnostic_backtest_YYYYMMDD.txt (human-readable summary)
Usage:
python scripts/diagnostic_backtest.py --days 14 --max-matches 2000
python scripts/diagnostic_backtest.py --start 2026-05-10 --end 2026-05-24
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import traceback
from collections import defaultdict, Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import psycopg2
from psycopg2.extras import RealDictCursor
# Path bootstrap so we can import the ai-engine package from anywhere
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
AI_ENGINE_DIR = os.path.dirname(SCRIPT_DIR)
sys.path.insert(0, AI_ENGINE_DIR)
from data.db import get_clean_dsn
from services.single_match_orchestrator import get_single_match_orchestrator
REPORTS_DIR = os.path.join(AI_ENGINE_DIR, "reports")
os.makedirs(REPORTS_DIR, exist_ok=True)
# Days with confirmed feeder gaps — exclude from sample
EXCLUDED_DATES = {"2026-05-03", "2026-04-29"}
# ── Outcome resolution ────────────────────────────────────────────────
def _norm_pick(pick: Optional[str]) -> str:
    """Return *pick* as a stripped, case-folded string ('' for falsy input)."""
    text = str(pick) if pick else ""
    return text.strip().casefold()


# Goal lines per totals market code (full time / half time).
_FT_TOTAL_LINES = {"OU05": 0.5, "OU15": 1.5, "OU25": 2.5, "OU35": 3.5,
                   "OU45": 4.5, "TOTAL": 2.5}
_HT_TOTAL_LINES = {"OU05_HT": 0.5, "OU15_HT": 1.5, "OU25_HT": 2.5,
                   "HT_OU05": 0.5, "HT_OU15": 1.5, "HT_OU25": 2.5}

# Double-chance picks (separators removed, upper-cased) -> covered results.
_DOUBLE_CHANCE = {
    "1X": ("1", "X"), "X1": ("1", "X"),
    "X2": ("X", "2"), "2X": ("X", "2"),
    "12": ("1", "2"), "21": ("1", "2"),
}


def _result_sign(home: int, away: int) -> str:
    """Return "1" for a home lead, "2" for an away lead, "x" for level scores."""
    if home > away:
        return "1"
    if away > home:
        return "2"
    return "x"


def _settle_total(choice: str, total, threshold: float) -> Optional[bool]:
    """Settle an over/under pick; None for a push or an unrecognised pick."""
    if total == threshold:
        return None
    went_over = total > threshold
    if any(word in choice for word in ("over", "üst", "ust")):
        return went_over
    if any(word in choice for word in ("alt", "under")):
        return not went_over
    return None


def resolve_outcome(market: str, pick: str, sh: int, sa: int,
                    htsh: Optional[int], htsa: Optional[int]) -> Optional[bool]:
    """Mirror of prediction-settlement.market-resolver.ts (TS side).

    Settles *pick* on *market* against the full-time score (*sh*, *sa*) and,
    for half-time markets, the half-time score (*htsh*, *htsa*).

    The market code is upper-cased with spaces removed and "-" turned into
    "_"; the pick is stripped and case-folded. Supported codes: MS/ML/1X2,
    HT/IY, the OUxx totals (full and half time), BTTS/KG, HTFT/IYMS,
    DC/CIFTE_SANS and OE/TEKCIFT.

    Returns True/False on settle, None if cannot resolve (unknown market,
    unrecognised pick wording, or a missing half-time score).
    """
    code = (market or "").upper().replace(" ", "").replace("-", "_")
    choice = _norm_pick(pick)
    ht_missing = htsh is None or htsa is None

    if code in ("MS", "ML", "1X2"):
        sign = _result_sign(sh, sa)
        # A draw may be written as "x" or "0".
        return choice in ({"x", "0"} if sign == "x" else {sign})

    if code in ("HT", "IY"):
        if ht_missing:
            return None
        sign = _result_sign(htsh, htsa)
        return choice in ({"x", "0"} if sign == "x" else {sign})

    if code in _FT_TOTAL_LINES:
        return _settle_total(choice, sh + sa, _FT_TOTAL_LINES[code])

    if code in _HT_TOTAL_LINES:
        if ht_missing:
            return None
        return _settle_total(choice, htsh + htsa, _HT_TOTAL_LINES[code])

    if code in ("BTTS", "KG"):
        both_scored = sh > 0 and sa > 0
        if "yes" in choice or "var" in choice:
            return both_scored
        if "no" in choice or "yok" in choice:
            return not both_scored
        return None

    if code in ("HTFT", "IYMS"):
        if ht_missing or "/" not in choice:
            return None
        ht_part, ft_part = choice.split("/", 1)
        ht_sign = _result_sign(htsh, htsa)
        ft_sign = _result_sign(sh, sa)
        # NOTE(review): unlike MS/HT above, "0" is not accepted as a draw
        # here — confirm whether the TS resolver behaves the same way.
        return ht_part.strip() == ht_sign and ft_part.strip() == ft_sign

    if code in ("DC", "CIFTE_SANS"):
        ft_sign = _result_sign(sh, sa).upper()
        compact = choice.upper().replace("-", "").replace("/", "")
        covered = _DOUBLE_CHANCE.get(compact)
        if covered is None:
            return None
        return ft_sign in covered

    if code in ("OE", "TEKCIFT"):
        total_is_odd = (sh + sa) % 2 == 1
        if "tek" in choice or "odd" in choice:
            return total_is_odd
        if "cift" in choice or "çift" in choice or "even" in choice:
            return not total_is_odd
        return None

    return None
def compute_unit_profit(won: Optional[bool], stake: float, odds: Optional[float]) -> float:
    """Return the profit in units for one bet.

    * unsettled (``won is None``): 0.0
    * lost: minus the absolute stake, or -1.0 when the stake is falsy
    * won: ``stake * (odds - 1)`` rounded to 4 decimals, or 0.0 when the
      odds are missing or not above 1.0
    """
    if won is None:
        return 0.0
    if won:
        no_payout = (not odds) or odds <= 1.0
        return 0.0 if no_payout else round(stake * (odds - 1.0), 4)
    # NOTE(review): a zero/None stake costs a full unit on a loss but pays
    # nothing on a win — confirm this asymmetry is intended.
    return -abs(stake) if stake else -1.0
# ── Data fetch ────────────────────────────────────────────────────────
def fetch_match_window(args) -> List[Dict]:
    """Fetch finished football matches inside the requested time window.

    The window is ``[--start, --end + 1 day)`` when both dates are given,
    otherwise the last ``args.days`` days up to now. Matches must have
    status 'FT' and non-null scores; dates in ``EXCLUDED_DATES`` are left
    out. At most ``args.max_matches`` rows are returned, newest first, as
    dict-like rows (``RealDictCursor``).
    """
    dsn = get_clean_dsn()
    if "?schema=" in dsn:
        # Everything from "?schema=" onwards is dropped before connecting.
        dsn = dsn.split("?schema=")[0]

    if args.start and args.end:
        # NOTE(review): these are naive datetimes, so .timestamp() below
        # interprets them as local midnight — confirm local vs UTC day
        # boundaries are what callers expect.
        start = datetime.strptime(args.start, "%Y-%m-%d")
        end = datetime.strptime(args.end, "%Y-%m-%d") + timedelta(days=1)
    else:
        # Stay timezone-aware: stripping tzinfo from a UTC "now" makes
        # .timestamp() read the UTC wall clock as local time, shifting the
        # whole window by the machine's UTC offset.
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=args.days)
    start_ms = int(start.timestamp() * 1000)
    end_ms = int(end.timestamp() * 1000)

    excluded = sorted(EXCLUDED_DATES)
    excluded_clause = ""
    if excluded:
        # Values come from the module constant, not from user input.
        ex_csv = ",".join(f"'{d}'" for d in excluded)
        excluded_clause = (
            f" AND to_timestamp(mst_utc/1000)::date "
            f"NOT IN ({ex_csv})"
        )

    conn = psycopg2.connect(dsn)
    try:
        # "with conn" only ends the transaction; the connection itself is
        # closed explicitly in the finally block.
        with conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    f"""
                    SELECT id AS match_id,
                           score_home, score_away,
                           ht_score_home, ht_score_away,
                           league_id,
                           to_timestamp(mst_utc/1000)::date AS match_date
                    FROM matches
                    WHERE sport='football'
                      AND status='FT'
                      AND score_home IS NOT NULL
                      AND score_away IS NOT NULL
                      AND mst_utc >= %s
                      AND mst_utc < %s
                      {excluded_clause}
                    ORDER BY mst_utc DESC
                    LIMIT %s
                    """,
                    (start_ms, end_ms, args.max_matches),
                )
                return cur.fetchall()
    finally:
        conn.close()
# ── Per-bet capture ───────────────────────────────────────────────────
def capture_bet_row(match: Dict, package: Dict) -> Dict[str, Any]:
    """Distill orchestrator response + ground truth into one analytic row.

    *match* supplies the identifiers and the final / half-time scores;
    *package* is the orchestrator response. The row records the main pick,
    whether it was playable (both the pick and the bet advice must say so),
    the settled outcome, the unit profit (0.0 for non-playable picks) and a
    set of signal fields taken from the pick, its ``betting_brain`` section,
    the v27 engine, data quality, risk and the HTFT market board.
    """
    pick_info = package.get("main_pick") or {}
    brain = pick_info.get("betting_brain") or {}
    advice = package.get("bet_advice") or {}
    v27 = package.get("v27_engine") or {}
    risk = package.get("risk") or {}
    quality = package.get("data_quality") or {}
    board = package.get("market_board") or {}
    htft_probs = (board.get("HTFT") or {}).get("probs") or {}
    league_name = (package.get("match_info") or {}).get("league_name")

    sh = match["score_home"]
    sa = match["score_away"]
    htsh = match["ht_score_home"]
    htsa = match["ht_score_away"]

    market = pick_info.get("market")
    pick = pick_info.get("pick")
    odds_val = _f(pick_info.get("odds"))
    stake = _f(pick_info.get("stake_units"), 1.0)
    playable = bool(pick_info.get("playable")) and bool(advice.get("playable"))

    won = None
    if market and pick:
        won = resolve_outcome(market, pick, sh, sa, htsh, htsa)
    profit = 0.0
    if playable:
        profit = compute_unit_profit(won, stake, odds_val)

    # Reversal context (only meaningful for MS picks): summed HTFT
    # probability of the picked side leading at half time but not winning.
    reversal_keys = {"1": ("1/2", "1/X"), "2": ("2/1", "2/X")}
    rev_prob = None
    if market == "MS" and pick in ("1", "2"):
        first, second = reversal_keys[pick]
        rev_prob = _f(htft_probs.get(first), 0.0) + _f(htft_probs.get(second), 0.0)

    row: Dict[str, Any] = {}
    # Ground truth
    row["match_id"] = match["match_id"]
    row["match_date"] = str(match["match_date"])
    row["league_id"] = match.get("league_id")
    row["score_home"] = sh
    row["score_away"] = sa
    row["ht_score_home"] = htsh
    row["ht_score_away"] = htsa
    # Pick and settlement
    row["market"] = market
    row["pick"] = pick
    row["odds"] = odds_val
    row["stake_units"] = stake
    row["playable"] = playable
    row["won"] = won
    row["unit_profit"] = profit
    # Pick-level signals
    row["raw_confidence"] = _f(pick_info.get("raw_confidence"))
    row["calibrated_confidence"] = _f(pick_info.get("calibrated_confidence"))
    row["play_score"] = _f(pick_info.get("play_score"))
    row["ev_edge"] = _f(pick_info.get("ev_edge"))
    row["bet_grade"] = pick_info.get("bet_grade")
    row["is_value_sniper"] = bool(pick_info.get("is_value_sniper"))
    # betting_brain section (list fields flattened with ";")
    row["bb_score"] = _f(brain.get("score"))
    row["bb_action"] = brain.get("action")
    row["bb_vetoes"] = ";".join(brain.get("vetoes") or [])
    row["bb_issues"] = ";".join(brain.get("issues") or [])
    row["bb_positives"] = ";".join(brain.get("positives") or [])
    row["bb_model_prob"] = _f(brain.get("model_prob"))
    row["bb_implied_prob"] = _f(brain.get("implied_prob"))
    row["bb_model_market_gap"] = _f(brain.get("model_market_gap"))
    row["bb_divergence"] = _f(brain.get("divergence"))
    row["bb_trap_market"] = bool(brain.get("trap_market_flag"))
    # Context
    row["v27_consensus"] = v27.get("consensus")
    row["data_quality_score"] = _f(quality.get("score"))
    row["data_quality_flags"] = ";".join(quality.get("flags") or [])
    row["risk_level"] = risk.get("level") if isinstance(risk, dict) else None
    row["odds_reliability"] = _f(pick_info.get("odds_reliability"))
    row["htft_reversal_prob"] = rev_prob
    row["htft_top_pick"] = _argmax(htft_probs)
    row["league_name"] = league_name
    row["is_cup"] = _is_cup(league_name or "")
    row["model_version"] = package.get("model_version")
    row["decision_reason"] = pick_info.get("pick_reason") or advice.get("reason")
    return row
def _f(x: Any, default: Optional[float] = None) -> Optional[float]:
    """Convert *x* to float; return *default* for None or unconvertible input."""
    if x is None:
        return default
    try:
        return float(x)
    except (TypeError, ValueError):
        return default
def _argmax(d: Dict[str, Any]) -> Optional[str]:
    """Return the key whose value is largest after float conversion.

    Values that cannot be converted count as 0.0. Ties keep the first key
    seen. Returns None for an empty mapping, or when no value exceeds the
    -1.0 starting floor.
    """
    winner: Optional[str] = None
    high = -1.0
    for key in d:
        candidate = _f(d[key], 0.0) or 0.0
        if not candidate > high:
            continue
        winner, high = key, candidate
    return winner
# Lower-case substrings that mark a competition name as a cup.
_CUP_KEYWORDS = (
    "kupa", "cup", "coupe", "copa", "coppa", "pokal", "trophy",
    "shield", "ziraat", "süper kupa", "super cup", "beker", "taça", "taca",
)


def _is_cup(name: str) -> bool:
    """Return True when the lower-cased *name* contains any cup keyword."""
    lowered = (name or "").lower()
    for keyword in _CUP_KEYWORDS:
        if keyword in lowered:
            return True
    return False
# ── Aggregation helpers ────────────────────────────────────────────────
def _bucket(value: Optional[float], edges: List[float]) -> Optional[str]:
    """Label the band of *edges* that *value* falls into.

    Bands are left-closed: the label is "<e0" below the first edge,
    "a-b" for a <= value < b, and ">=eN" at or above the last edge.
    Returns None when *value* is None.
    """
    if value is None:
        return None
    # Index of the first edge strictly above the value, if any.
    upper = next((i for i, edge in enumerate(edges) if value < edge), None)
    if upper is None:
        return f">={edges[-1]}"
    if upper == 0:
        return f"<{edges[0]}"
    return f"{edges[upper - 1]}-{edges[upper]}"
def _summary_stats(rows: List[Dict]) -> Dict[str, Any]:
    """Aggregate hit rate, profit and ROI over the playable, settled rows.

    ``n_total`` counts every row passed in; all other figures use only rows
    that are playable and have a non-None ``won``. ``hit_rate_pct`` is None
    when no row is settled and ``roi_pct`` is None when the total stake is 0.
    """
    if not rows:
        # NOTE(review): this branch returns only {"n": 0}, not the key set
        # used below — consumers must read the result with .get().
        return {"n": 0}

    settled = [r for r in rows if r["playable"] and r["won"] is not None]
    n_settled = len(settled)
    wins = len([r for r in settled if r["won"]])
    losses = n_settled - wins
    profit = sum(map(float, (r["unit_profit"] for r in settled)))
    staked = sum(map(float, (r["stake_units"] for r in settled)))

    hit_rate = round(100.0 * wins / n_settled, 2) if settled else None
    roi = round(100.0 * profit / staked, 2) if staked else None
    return {
        "n_total": len(rows),
        "n_playable_settled": n_settled,
        "wins": wins,
        "losses": losses,
        "hit_rate_pct": hit_rate,
        "unit_profit": round(profit, 3),
        "staked": round(staked, 3),
        "roi_pct": roi,
    }
def aggregate(rows: List[Dict]) -> Dict[str, Any]:
    """Group playable rows by several dimensions and summarise each group.

    Returns a dict with ``overall`` (all rows) plus ``by_market``,
    ``by_confidence``, ``by_odds``, ``by_grade`` and ``by_competition``,
    each mapping a group label to its ``_summary_stats``. Rows without a
    calibrated confidence or odds value are left out of the respective
    banded breakdown.
    """
    out: Dict[str, Any] = {"overall": _summary_stats(rows)}

    market_buckets: Dict[Any, List[Dict]] = defaultdict(list)
    conf_buckets: Dict[Any, List[Dict]] = defaultdict(list)
    odds_buckets: Dict[Any, List[Dict]] = defaultdict(list)
    grade_buckets: Dict[Any, List[Dict]] = defaultdict(list)
    cup_buckets: Dict[Any, List[Dict]] = defaultdict(list)

    for r in rows:
        if not r["playable"]:
            continue
        market_buckets[r["market"] or "?"].append(r)
        conf_buckets[_bucket(r["calibrated_confidence"],
                             [45, 50, 55, 60, 65, 70, 80])].append(r)
        odds_buckets[_bucket(r["odds"], [1.3, 1.5, 1.8, 2.2, 3.0, 5.0])].append(r)
        grade_buckets[r["bet_grade"] or "?"].append(r)
        cup_buckets["cup" if r["is_cup"] else "league"].append(r)

    out["by_market"] = {k: _summary_stats(v) for k, v in market_buckets.items()}
    # _bucket returns None for missing values; that group is dropped here.
    out["by_confidence"] = {k: _summary_stats(v) for k, v in conf_buckets.items() if k}
    out["by_odds"] = {k: _summary_stats(v) for k, v in odds_buckets.items() if k}
    out["by_grade"] = {k: _summary_stats(v) for k, v in grade_buckets.items()}
    out["by_competition"] = {k: _summary_stats(v) for k, v in cup_buckets.items()}
    return out
def loss_diagnostics(rows: List[Dict]) -> Dict[str, Any]:
    """Profile the playable bets that lost.

    Returns ``{"n_losses": 0}`` when there are no losses. Otherwise returns
    the loss count, the summed ``unit_profit`` of the losses, a ``patterns``
    dict mapping each pattern label to ``(count, percent_of_losses)``, loss
    counts per market and per league (top 10), and the 15 most common
    ``bb_issues`` / ``bb_vetoes`` tags seen across the losses.

    For the reliability and data-quality patterns a missing value is not
    counted as low, but a real 0.0 is.
    """
    losses = [r for r in rows if r["playable"] and r["won"] is False]
    if not losses:
        return {"n_losses": 0}
    n = len(losses)

    def share(predicate) -> Tuple[int, float]:
        """Count losses matching *predicate* and their percentage of all losses."""
        c = sum(1 for r in losses if predicate(r))
        return c, round(100.0 * c / n, 2)

    def known_below(key: str, threshold: float):
        """Build a predicate: the row has a value for *key* and it is below *threshold*.

        An explicit None check is used instead of ``value or 1`` so that a
        genuine 0.0 score is still recognised as low.
        """
        def predicate(r: Dict) -> bool:
            value = r.get(key)
            return value is not None and value < threshold
        return predicate

    diagnostics = {
        "n_losses": n,
        "total_loss_units": round(sum(float(r["unit_profit"]) for r in losses), 3),
        "patterns": {
            "high_htft_reversal_prob (>=0.20)": share(
                lambda r: (r.get("htft_reversal_prob") or 0) >= 0.20
            ),
            "cup_match": share(lambda r: r["is_cup"]),
            "low_league_reliability (<0.45)": share(
                known_below("odds_reliability", 0.45)
            ),
            "v27_disagree": share(lambda r: r.get("v27_consensus") == "DISAGREE"),
            "trap_market_flagged": share(lambda r: r.get("bb_trap_market")),
            # A missing confidence is treated as 0 and therefore counts as low.
            "low_calibrated_conf (<55)": share(
                lambda r: (r.get("calibrated_confidence") or 0) < 55
            ),
            "high_odds_underdog (>=2.5)": share(
                lambda r: (r.get("odds") or 0) >= 2.5
            ),
            "low_data_quality (<0.55)": share(
                known_below("data_quality_score", 0.55)
            ),
            "high_risk_level": share(
                lambda r: r.get("risk_level") in ("HIGH", "EXTREME")
            ),
            "inferred_features": share(
                lambda r: "ai_features_inferred_from_history" in (r.get("data_quality_flags") or "")
            ),
        },
        "by_market": Counter(r["market"] for r in losses).most_common(),
        "by_league": Counter(r.get("league_name") for r in losses).most_common(10),
    }

    # Top issue / veto tags from betting_brain across losses (";"-joined fields).
    issue_counter = Counter(
        tag
        for r in losses
        for tag in (r.get("bb_issues") or "").split(";")
        if tag
    )
    veto_counter = Counter(
        tag
        for r in losses
        for tag in (r.get("bb_vetoes") or "").split(";")
        if tag
    )
    diagnostics["top_bb_issues_in_losses"] = issue_counter.most_common(15)
    diagnostics["top_bb_vetoes_in_losses"] = veto_counter.most_common(15)
    return diagnostics
# ── Recommendations ────────────────────────────────────────────────────
def make_recommendations(rows: List[Dict], agg: Dict[str, Any],
                         diag: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn aggregate and loss-pattern figures into suggested filter changes.

    Returns an empty list when nothing playable was settled. Otherwise the
    list contains, in this order: one ``drop_market`` entry per chronically
    losing market, one ``raise_confidence_threshold`` entry per losing
    confidence band, then the cup, HTFT-reversal and league-reliability
    entries whose share of losses reaches its threshold. *rows* is accepted
    but not read.
    """
    overall = agg.get("overall") or {}
    if not overall.get("n_playable_settled"):
        return []

    recs: List[Dict[str, Any]] = []

    # Cross-reference market hit rate vs overall — flag chronic losers:
    # at least 30 bets, ROI below -10% and hit rate 10+ points under overall.
    baseline_hit = overall.get("hit_rate_pct") or 0.0
    for market, stats in (agg.get("by_market") or {}).items():
        n = stats.get("n_playable_settled") or 0
        hit = stats.get("hit_rate_pct")
        roi = stats.get("roi_pct")
        if n < 30 or hit is None or roi is None:
            continue
        if not (roi < -10 and hit < baseline_hit - 10):
            continue
        recs.append({
            "type": "drop_market",
            "market": market,
            "evidence": f"hit={hit}%, roi={roi}%, n={n} — chronic loser",
            "suggested_fix": f"Add veto in betting_brain when market=={market} unless overwhelming evidence",
            "estimated_loss_prevented_units": round(-(stats.get("unit_profit") or 0), 2),
        })

    # Confidence band tuning — flag bands where ROI < 0 despite passing the
    # threshold (40+ bets, ROI below -8%).
    for band, stats in (agg.get("by_confidence") or {}).items():
        n = stats.get("n_playable_settled") or 0
        roi = stats.get("roi_pct")
        if n < 40 or roi is None:
            continue
        if not roi < -8:
            continue
        recs.append({
            "type": "raise_confidence_threshold",
            "confidence_band": band,
            "evidence": f"n={n}, roi={roi}%",
            "suggested_fix": f"Raise MIN_BET_SCORE or market_min_conf above {band.split('-')[0]}",
        })

    # Loss diagnostics — each pattern maps to (count, percent of losses).
    patterns = (diag.get("patterns") or {})

    def pct_of_losses(label: str):
        """Return the percentage component of a pattern, 0 when it is absent."""
        return patterns.get(label, (0, 0))[1]

    cup_share = pct_of_losses("cup_match")
    if cup_share >= 25:
        recs.append({
            "type": "cup_match_filter",
            "evidence": f"{cup_share}% of losses are cup matches",
            "suggested_fix": "Tighten betting_brain thresholds for is_cup_match=True picks",
        })

    rev_share = pct_of_losses("high_htft_reversal_prob (>=0.20)")
    if rev_share >= 15:
        recs.append({
            "type": "tighten_reversal_check",
            "evidence": f"{rev_share}% of losses had HTFT reversal prob >=0.20 (already partial fix)",
            "suggested_fix": "Lower reversal threshold in betting_brain from 0.25 to 0.20 for veto trigger",
        })

    rel_share = pct_of_losses("low_league_reliability (<0.45)")
    if rel_share >= 20:
        recs.append({
            "type": "league_reliability_filter",
            "evidence": f"{rel_share}% of losses in low-reliability leagues (<0.45)",
            "suggested_fix": "Add hard veto when odds_reliability<0.45 for non-value-sniper picks",
        })
    return recs
# ── CSV / report writers ───────────────────────────────────────────────
def write_csv(rows: List[Dict], path: str):
    """Write *rows* to *path* as UTF-8 CSV; no file is written for no rows.

    The column order comes from the keys of the first row.
    """
    import csv

    if not rows:
        return
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
def write_text_summary(rows: List[Dict], agg: Dict, diag: Dict,
                       recs: List[Dict], path: str, args):
    """Write the human-readable backtest report to *path* (UTF-8).

    Sections, in order: header (generation time, sample window, match cap,
    excluded days), overall metrics, per-market table, per-confidence-band
    and per-odds-band tables, league vs cup, loss diagnostics and
    recommendations. *rows* is accepted but not read.
    """
    report: List[str] = []
    emit = report.append
    heavy = "=" * 78
    light = "-" * 78

    def heading(title: str) -> None:
        """Start a new section: blank line, title, thin rule."""
        emit("")
        emit(title)
        emit(light)

    # ── Header ──
    window_start = args.start or f"-{args.days}d"
    window_end = args.end or "now"
    emit(heavy)
    emit("DIAGNOSTIC BACKTEST REPORT")
    emit(heavy)
    emit(f"Generated: {datetime.now().isoformat(timespec='seconds')}")
    emit(f"Sample window: start={window_start}, end={window_end}")
    emit(f"Max matches: {args.max_matches}")
    emit(f"Excluded days: {sorted(EXCLUDED_DATES)}")

    # ── Overall ──
    heading("OVERALL")
    overall = agg.get("overall") or {}
    overall_keys = ("n_total", "n_playable_settled", "wins", "losses",
                    "hit_rate_pct", "unit_profit", "staked", "roi_pct")
    for k in overall_keys:
        emit(f" {k:25}: {overall.get(k)}")

    # ── Per market, largest settled sample first ──
    heading("PER MARKET")
    emit(f" {'market':<8} {'n':>6} {'hit%':>7} {'profit':>9} {'roi%':>7}")

    def by_sample_size_desc(item):
        return -(item[1].get("n_playable_settled") or 0)

    for market, s in sorted((agg.get("by_market") or {}).items(), key=by_sample_size_desc):
        emit(f" {market:<8} {s.get('n_playable_settled',0):>6} "
             f"{str(s.get('hit_rate_pct','')):>7} "
             f"{str(s.get('unit_profit','')):>9} "
             f"{str(s.get('roi_pct','')):>7}")

    # ── Banded tables share one layout; bands are sorted by label ──
    banded_sections = (
        ("PER CALIBRATED CONFIDENCE BAND", "by_confidence"),
        ("PER ODDS BAND", "by_odds"),
    )
    for title, agg_key in banded_sections:
        heading(title)
        emit(f" {'band':<10} {'n':>6} {'hit%':>7} {'roi%':>7}")
        for band, s in sorted((agg.get(agg_key) or {}).items()):
            emit(f" {band:<10} {s.get('n_playable_settled',0):>6} "
                 f"{str(s.get('hit_rate_pct','')):>7} "
                 f"{str(s.get('roi_pct','')):>7}")

    # ── League vs cup ──
    heading("LEAGUE vs CUP")
    for k, s in (agg.get("by_competition") or {}).items():
        emit(f" {k:<8} n={s.get('n_playable_settled',0):>4} "
             f"hit={s.get('hit_rate_pct','-')}% roi={s.get('roi_pct','-')}%")

    # ── Loss diagnostics ──
    heading("LOSS DIAGNOSTICS")
    emit(f" total losses: {diag.get('n_losses')}")
    emit(f" total lost units: {diag.get('total_loss_units')}")
    emit(f" By market: {diag.get('by_market')}")
    emit(" Loss patterns (count, % of losses):")
    for pattern, (c, pct) in (diag.get("patterns") or {}).items():
        emit(f" {pattern:<55} {c:>4} ({pct}%)")
    emit(" Top betting_brain issues seen in losses:")
    for issue, c in (diag.get("top_bb_issues_in_losses") or []):
        emit(f" {issue:<55} {c}")
    emit(" Top betting_brain vetoes (in losses — i.e. veto fired but bet still went through value-sniper override):")
    for veto, c in (diag.get("top_bb_vetoes_in_losses") or []):
        emit(f" {veto:<55} {c}")

    # ── Recommendations ──
    heading("RECOMMENDATIONS")
    if not recs:
        emit(" (none surfaced — sample too small or no clear pattern)")
    for r in recs:
        emit(f" • [{r['type']}]")
        for k, v in r.items():
            if k != "type":
                emit(f" {k}: {v}")

    emit("")
    emit(heavy)
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(report))
# ── Main loop ─────────────────────────────────────────────────────────
def main():
    """Run the diagnostic backtest over a match window and write reports.

    Parses CLI arguments, warms the orchestrator's predictors, analyses every
    match returned by ``fetch_match_window`` and captures one row per match
    via ``capture_bet_row``. The rows are then aggregated and written as
    timestamped CSV, JSON and TXT files under ``REPORTS_DIR``.

    Per-match failures are collected (tracebacks printed for the first five)
    instead of aborting the run; Ctrl-C stops the loop and still writes the
    partial results.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--days", type=int, default=14,
                        help="Backwards window from now (default 14)")
    parser.add_argument("--max-matches", type=int, default=2000,
                        help="Hard cap on matches processed (default 2000)")
    parser.add_argument("--start", help="Start date YYYY-MM-DD (overrides --days)")
    parser.add_argument("--end", help="End date YYYY-MM-DD")
    parser.add_argument("--progress-interval", type=int, default=50)
    args = parser.parse_args()

    print("=" * 70)
    print("DIAGNOSTIC BACKTEST")
    print("=" * 70)

    print("Loading orchestrator...")
    orch = get_single_match_orchestrator()
    # Warm V25 + V27 + basketball loaders so the first match doesn't pay it
    try:
        orch._get_v25_predictor()
    except Exception as e:
        print(f" v25 warmup: {e}")
    try:
        orch._get_v27_predictor()
    except Exception as e:
        print(f" v27 warmup: {e}")

    print("Fetching match window...")
    matches = fetch_match_window(args)
    n = len(matches)
    print(f" {n} matches selected")
    if not matches:
        print("No matches to process. Exiting.")
        return

    rows: List[Dict[str, Any]] = []
    errors: List[Tuple[str, str]] = []
    t0 = time.time()
    for i, m in enumerate(matches, start=1):
        mid = str(m["match_id"])
        try:
            pkg = orch.analyze_match(mid)
            # A None package means "nothing to record"; fall through (rather
            # than `continue`) so the progress report below is not skipped.
            if pkg is not None:
                rows.append(capture_bet_row(m, pkg))
        except KeyboardInterrupt:
            print("\nInterrupted, writing partial results...")
            break
        except Exception as e:
            errors.append((mid, str(e)))
            if len(errors) <= 5:
                traceback.print_exc()

        # Guard both divisions: a zero interval or a zero elapsed time must
        # not abort the run (this code sits outside the try block above).
        if args.progress_interval > 0 and i % args.progress_interval == 0:
            elapsed = time.time() - t0
            rate = i / elapsed if elapsed > 0 else 0.0
            eta = (n - i) / rate if rate else 0
            playable_so_far = sum(1 for r in rows if r["playable"])
            print(f" [{i}/{n}] rate={rate:.1f}/s eta={eta/60:.1f}min "
                  f"playable={playable_so_far} errors={len(errors)}")

    print(f"\nProcessed {len(rows)} rows in {(time.time()-t0):.1f}s "
          f"({len(errors)} errors)")

    # Aggregate
    print("Aggregating...")
    agg = aggregate(rows)
    diag = loss_diagnostics(rows)
    recs = make_recommendations(rows, agg, diag)

    # Make sure the output directory exists so the results of a long run are
    # not lost to a FileNotFoundError at write time.
    os.makedirs(REPORTS_DIR, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.csv")
    json_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.json")
    txt_path = os.path.join(REPORTS_DIR, f"diagnostic_backtest_{stamp}.txt")

    write_csv(rows, csv_path)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"args": vars(args), "aggregate": agg, "loss_diagnostics": diag,
                   "recommendations": recs, "errors_sample": errors[:20]},
                  f, indent=2, default=str)
    write_text_summary(rows, agg, diag, recs, txt_path, args)

    print("\nOutputs:")
    print(f" CSV: {csv_path}")
    print(f" JSON: {json_path}")
    print(f" TXT: {txt_path}")
    print("\nOverall:", agg.get("overall"))


if __name__ == "__main__":
    main()
+254
View File
@@ -0,0 +1,254 @@
"""
Filter Optimizer
================
Grid-search over filter thresholds (per market) using the existing
diagnostic_backtest CSV. Finds the (confidence, edge, odds, reliability)
combination that maximizes ROI while keeping bet volume reasonable.
No re-prediction needed — pure offline simulation on the bets already
captured. Output: per-market optimal thresholds + projected ROI lift +
JSON patch ready to drop into config/market_thresholds.json.
Usage:
python scripts/optimize_filters.py
python scripts/optimize_filters.py --csv reports/diagnostic_backtest_X.csv
python scripts/optimize_filters.py --min-bets 20 --apply
"""
import argparse
import json
import os
import sys
import glob
import itertools
from typing import List, Dict, Tuple, Optional
import pandas as pd
import numpy as np
# Directory that contains this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Parent of the script directory; placed first on sys.path.
# NOTE(review): presumably so project packages resolve when the script is run
# directly — confirm against the project layout.
AI_ENGINE_DIR = os.path.dirname(SCRIPT_DIR)
sys.path.insert(0, AI_ENGINE_DIR)
# Where diagnostic_backtest_*.csv files are looked up and where the
# filter_optimization_patch.json output is written.
REPORTS_DIR = os.path.join(AI_ENGINE_DIR, "reports")
# Location of the thresholds config the patch is meant for; the visible code
# only defines this path and never reads or writes the file.
CONFIG_PATH = os.path.join(AI_ENGINE_DIR, "config", "market_thresholds.json")
def latest_csv() -> Optional[str]:
    """Return the most recently modified diagnostic backtest CSV.

    Looks for ``diagnostic_backtest_*.csv`` inside ``REPORTS_DIR`` and picks
    the one with the newest modification time. Returns ``None`` when no such
    file exists.
    """
    pattern = os.path.join(REPORTS_DIR, "diagnostic_backtest_*.csv")
    found = glob.glob(pattern)
    if not found:
        return None
    # max() keeps the first of equally-new files, matching a stable
    # newest-first sort followed by taking element 0.
    return max(found, key=os.path.getmtime)
def load_backtest(path: str) -> pd.DataFrame:
    """Load a diagnostic backtest CSV and keep only playable, settled bets.

    Args:
        path: Location of the CSV file to read.

    Returns:
        A copy of the rows where ``playable`` is True and ``won`` is not
        missing. ``won`` is cast to bool and missing filter inputs are filled
        with defaults: 0 for ``calibrated_confidence``, ``ev_edge`` and
        ``odds``; 0.35 for ``odds_reliability``.
    """
    raw = pd.read_csv(path)

    # Only bets the system actually placed and that have an outcome.
    is_playable = raw["playable"] == True
    is_settled = raw["won"].notna()
    bets = raw.loc[is_playable & is_settled].copy()

    bets["won"] = bets["won"].astype(bool)

    fill_defaults = {
        "calibrated_confidence": 0,
        "ev_edge": 0,
        "odds": 0,
        "odds_reliability": 0.35,
    }
    for column, default in fill_defaults.items():
        bets[column] = bets[column].fillna(default)
    return bets
def evaluate(pdf: pd.DataFrame, mask) -> Dict:
    """Summarise the bets selected by ``mask``.

    Args:
        pdf: Bets with ``won``, ``unit_profit`` and ``stake_units`` columns.
        mask: Boolean row selector applied as ``pdf[mask]``.

    Returns:
        Dict with ``n`` (bet count), ``hit_pct`` (win rate in percent),
        ``profit`` and ``staked`` (summed units) and ``roi_pct`` (profit over
        stake in percent, 0 when nothing was staked). All values are 0 when
        the mask selects no rows.
    """
    selected = pdf[mask]
    count = len(selected)
    if not count:
        return {"n": 0, "hit_pct": 0, "profit": 0, "staked": 0, "roi_pct": 0}

    n_won = selected["won"].sum()
    total_profit = selected["unit_profit"].sum()
    total_staked = selected["stake_units"].sum()

    if total_staked:
        roi = round(100.0 * total_profit / total_staked, 2)
    else:
        roi = 0

    return {
        "n": int(count),
        "hit_pct": round(100.0 * n_won / count, 2),
        "profit": round(total_profit, 3),
        "staked": round(total_staked, 3),
        "roi_pct": roi,
    }
def grid_search_market(
    market_df: pd.DataFrame,
    market: str,
    min_bets: int = 15,
) -> List[Dict]:
    """Grid-search filter thresholds for a single market.

    Tries every (min_conf, min_edge, max_edge, min_odds, max_odds,
    min_reliability, consensus) combination from the option lists below.

    Each individual threshold comparison is computed once up front and kept
    as a boolean array; the grid loop only ANDs those arrays together. This
    avoids rebuilding the same pandas comparisons for each of the tens of
    thousands of combinations, and ``evaluate`` is only called for
    combinations that keep enough bets.

    Args:
        market_df: Bets of one market, with ``calibrated_confidence``,
            ``ev_edge``, ``odds``, ``odds_reliability`` and ``v27_consensus``
            columns plus whatever ``evaluate`` reads.
        market: Market name, copied into every candidate.
        min_bets: Minimum number of bets a combination must keep.

    Returns:
        All candidates with ``n >= min_bets`` as dicts (thresholds merged
        with the ``evaluate`` result), sorted by ROI and then bet count,
        both descending.
    """
    conf_options = [0, 45, 50, 55, 60, 65, 70]
    min_edge_options = [-1.0, -0.05, 0.0, 0.03, 0.05, 0.08]
    max_edge_options = [10.0, 0.30, 0.20, 0.15, 0.10]
    min_odds_options = [1.20, 1.30, 1.40, 1.50, 1.60, 1.80]
    max_odds_options = [10.0, 3.0, 2.5, 2.2, 2.0]
    rel_options = [0.0, 0.30, 0.45, 0.55]
    consensus_options = ["any", "agree_or_null"]

    conf = market_df["calibrated_confidence"]
    edge = market_df["ev_edge"]
    odds = market_df["odds"]
    reliability = market_df["odds_reliability"]

    # Loop-invariant work: one boolean array per threshold value.
    conf_masks = {v: (conf >= v).to_numpy() for v in conf_options}
    min_edge_masks = {v: (edge >= v).to_numpy() for v in min_edge_options}
    max_edge_masks = {v: (edge <= v).to_numpy() for v in max_edge_options}
    min_odds_masks = {v: (odds >= v).to_numpy() for v in min_odds_options}
    max_odds_masks = {v: (odds <= v).to_numpy() for v in max_odds_options}
    rel_masks = {v: (reliability >= v).to_numpy() for v in rel_options}
    # "agree_or_null" keeps every row that is not an explicit DISAGREE.
    not_disagree = (market_df["v27_consensus"] != "DISAGREE").to_numpy()

    candidates: List[Dict] = []
    for mc, mine, maxe, mino, maxo, mrel, cons in itertools.product(
        conf_options, min_edge_options, max_edge_options,
        min_odds_options, max_odds_options, rel_options, consensus_options,
    ):
        # Skip empty/inverted windows.
        if mine >= maxe or mino >= maxo:
            continue
        mask = (
            conf_masks[mc]
            & min_edge_masks[mine]
            & max_edge_masks[maxe]
            & min_odds_masks[mino]
            & max_odds_masks[maxo]
            & rel_masks[mrel]
        )
        if cons == "agree_or_null":
            mask = mask & not_disagree
        # The number of True entries is exactly the "n" evaluate() would
        # report, so under-sized combinations can be dropped without slicing
        # the DataFrame.
        if np.count_nonzero(mask) < min_bets:
            continue
        result = evaluate(market_df, mask)
        candidates.append({
            "market": market,
            "min_conf": mc,
            "min_edge": mine,
            "max_edge": maxe,
            "min_odds": mino,
            "max_odds": maxo,
            "min_reliability": mrel,
            "consensus": cons,
            **result,
        })
    candidates.sort(key=lambda r: (r["roi_pct"], r["n"]), reverse=True)
    return candidates
def baseline(pdf: pd.DataFrame, market: str) -> Dict:
    """Return the unfiltered ``evaluate`` summary for one market.

    Selects every row of ``pdf`` whose ``market`` column equals ``market``
    and evaluates them with an all-True mask.
    """
    market_rows = pdf.loc[pdf["market"] == market]
    keep_everything = pd.Series(True, index=market_rows.index)
    return evaluate(market_rows, keep_everything)
def main():
    """CLI entry point: grid-search filters per market and save a JSON patch.

    Loads the backtest CSV (``--csv`` or the newest one in ``REPORTS_DIR``),
    runs ``grid_search_market`` for every market with at least
    ``2 * --min-bets`` bets, prints the top configurations, prints the
    aggregate impact of applying each market's best configuration, and
    writes ``filter_optimization_patch.json`` into ``REPORTS_DIR``.

    The best configuration of a market is recorded even when its ROI is not
    positive (a warning is printed). ``--apply`` only prints a notice:
    patching the config file is not implemented.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--csv", default=None, help="Override CSV path")
    parser.add_argument("--min-bets", type=int, default=15,
                        help="Min bet count to consider a config valid")
    parser.add_argument("--top-k", type=int, default=3,
                        help="Show top K configs per market")
    parser.add_argument("--apply", action="store_true",
                        help="Patch config/market_thresholds.json with winners")
    args = parser.parse_args()

    csv_path = args.csv or latest_csv()
    if not csv_path or not os.path.exists(csv_path):
        print("No backtest CSV found.")
        return

    print(f"Loading: {csv_path}")
    pdf = load_backtest(csv_path)
    print(f"Playable + settled bets: {len(pdf)}")

    markets = sorted(pdf["market"].dropna().unique())
    print(f"Markets: {markets}\n")

    all_winners: Dict[str, Dict] = {}
    # Baseline per market, kept so the aggregate section can reuse it.
    baselines: Dict[str, Dict] = {}
    for market in markets:
        market_df = pdf[pdf["market"] == market]
        n_total = len(market_df)
        base = baseline(pdf, market)
        baselines[market] = base
        print(f"\n{'=' * 78}")
        print(f"MARKET: {market} (n={n_total} baseline_roi={base['roi_pct']}%)")
        print(f"{'=' * 78}")

        if n_total < args.min_bets * 2:
            print(f" Sample too small to grid-search reliably (n={n_total}). Skip.")
            continue

        candidates = grid_search_market(market_df, market, args.min_bets)
        if not candidates:
            print(f" No config kept >= {args.min_bets} bets. Skip.")
            continue

        # Pareto-ish: show top-K by ROI but also one that keeps higher bet count
        winners = candidates[:args.top_k]
        keep_high_volume = None
        for c in candidates:
            if c["n"] >= max(40, n_total // 3) and c["roi_pct"] > base["roi_pct"]:
                keep_high_volume = c
                break

        print(f" {'rank':<5}{'n':>5}{'hit%':>7}{'roi%':>8} "
              f"{'min_conf':>9}{'min_edge':>10}{'max_edge':>10}"
              f"{'min_odds':>10}{'max_odds':>10}{'min_rel':>9}{'cons':>15}")
        for i, w in enumerate(winners, 1):
            print(f" {i:<5}{w['n']:>5}{w['hit_pct']:>7}{w['roi_pct']:>+8}"
                  f" {w['min_conf']:>9}{w['min_edge']:>+10.3f}{w['max_edge']:>+10.3f}"
                  f"{w['min_odds']:>10.2f}{w['max_odds']:>10.2f}"
                  f"{w['min_reliability']:>9.2f}{w['consensus']:>15}")
        if keep_high_volume and keep_high_volume not in winners:
            print(f" high {keep_high_volume['n']:>5}{keep_high_volume['hit_pct']:>7}"
                  f"{keep_high_volume['roi_pct']:>+8}"
                  f" {keep_high_volume['min_conf']:>9}"
                  f"{keep_high_volume['min_edge']:>+10.3f}"
                  f"{keep_high_volume['max_edge']:>+10.3f}"
                  f"{keep_high_volume['min_odds']:>10.2f}"
                  f"{keep_high_volume['max_odds']:>10.2f}"
                  f"{keep_high_volume['min_reliability']:>9.2f}"
                  f"{keep_high_volume['consensus']:>15}")

        # Pick a "good" recommendation: best ROI with n >= min_bets
        # If best ROI is still negative, flag the market as unprofitable.
        # Taken from `candidates`, not the displayed `winners` slice, so a
        # --top-k of 0 (or a negative value) cannot raise IndexError here.
        best = candidates[0]
        all_winners[market] = best
        if best["roi_pct"] <= 0:
            print(f" ⚠️ Best config still loses money (ROI={best['roi_pct']}%) "
                  f"— consider muting this market entirely.")
        else:
            print(f" ✅ Best config: ROI={best['roi_pct']}% on {best['n']} bets "
                  f"(vs baseline {base['roi_pct']}% on {n_total}).")

    # ─── Aggregate impact ────────────────────────────────────────────────
    print(f"\n{'=' * 78}")
    print("AGGREGATE IMPACT (if we apply each market's best config)")
    print(f"{'=' * 78}")
    total_old_bets = total_old_profit = total_old_staked = 0
    total_new_bets = total_new_profit = total_new_staked = 0
    for market, win in all_winners.items():
        base = baselines[market]
        total_old_bets += base["n"]
        total_old_profit += base["profit"]
        total_old_staked += base["staked"]
        total_new_bets += win["n"]
        total_new_profit += win["profit"]
        total_new_staked += win["staked"]
    base_roi = 100.0 * total_old_profit / total_old_staked if total_old_staked else 0
    new_roi = 100.0 * total_new_profit / total_new_staked if total_new_staked else 0
    print(f" Baseline: {total_old_bets:>4} bets, "
          f"profit={total_old_profit:+.2f}u, ROI={base_roi:+.2f}%")
    print(f" Optimized: {total_new_bets:>4} bets, "
          f"profit={total_new_profit:+.2f}u, ROI={new_roi:+.2f}%")
    print(f" Δ: {total_new_bets - total_old_bets:+d} bets, "
          f"{total_new_profit - total_old_profit:+.2f}u, "
          f"{new_roi - base_roi:+.2f}pp")

    # ─── Write JSON patch ────────────────────────────────────────────────
    # With --csv pointing elsewhere the reports directory may not exist yet;
    # create it so the finished search is not lost at write time.
    os.makedirs(REPORTS_DIR, exist_ok=True)
    patch_path = os.path.join(REPORTS_DIR, "filter_optimization_patch.json")
    patch = {market: {
        "min_calibrated_confidence": win["min_conf"],
        "min_ev_edge": win["min_edge"],
        "max_ev_edge": win["max_edge"],
        "min_odds": win["min_odds"],
        "max_odds": win["max_odds"],
        "min_odds_reliability": win["min_reliability"],
        "require_v27_agree": win["consensus"] == "agree_or_null",
        "expected_n_bets": win["n"],
        "expected_hit_pct": win["hit_pct"],
        "expected_roi_pct": win["roi_pct"],
    } for market, win in all_winners.items()}
    with open(patch_path, "w", encoding="utf-8") as f:
        json.dump(patch, f, indent=2)
    print(f"\nPatch saved: {patch_path}")

    if args.apply:
        print("\n--apply flag set. Patching not implemented yet — "
              "review the patch JSON and update config/market_thresholds.json manually.")


if __name__ == "__main__":
    main()
+54
View File
@@ -0,0 +1,54 @@
"""Smoke test for the score-coherence filter using the LAFC vs Sounders
1-0 scenario from production. Verifies that markets that contradict the
predicted score are correctly excluded from the coherent set, and that
the markets the model got right are all included.
"""
import os, sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from services.betting_brain import BettingBrain
brain = BettingBrain()

# Production scenario: predicted full-time score 1-0, half-time 0-0.
pkg = {
    "score_prediction": {"ft": "1-0", "ht": "0-0"},
}
coh = brain._score_consistent_markets(pkg)
print(f"Predicted: 1-0 (HT 0-0)")
print(f"Coherent set size: {len(coh)}")
print()

# Each pick the system actually offered for the LAFC match, with whether
# it was the *actual* winning pick.
test_picks = [
    ("MS", "1", True, "correct"),
    ("MS", "2", False, "wrong"),
    ("MS", "X", False, "wrong"),
    ("DC", "1X", True, "correct"),
    ("DC", "12", True, "correct"),
    ("DC", "X2", False, "wrong"),
    ("OU25", "Üst", False, "WRONG — system featured this"),
    ("OU25", "Alt", True, "correct"),
    ("OU35", "Alt", True, "correct"),
    ("OU35", "Üst", False, "wrong"),
    ("BTTS", "Var", False, "wrong"),
    ("BTTS", "Yok", True, "correct"),
    ("HT", "X", True, "correct"),
    ("HT", "1", False, "wrong"),
    ("HTFT", "X/1", True, "correct"),
    ("HTFT", "1/1", False, "wrong (HT was 0-0)"),
    ("HT_OU05", "Üst", False, "wrong"),
    ("HT_OU05", "Alt", True, "correct"),
    ("OE", "Çift", False, "wrong (1 is odd)"),
    ("OE", "Tek", True, "correct"),
]

print(f"{'market':<10}{'pick':<10}{'real-win?':<12}{'in-coherent?':<14}{'match?'}")
print("-" * 60)
ok = 0
for market, pick, would_win, note in test_picks:
    # A pick is classified correctly when its membership in the coherent set
    # equals whether it would really have won.
    in_coh = (market, pick) in coh
    classified_ok = in_coh == would_win
    match = "" if classified_ok else "✗ MISMATCH"
    if classified_ok:
        ok += 1
    print(f"{market:<10}{pick:<10}{str(would_win):<12}{str(in_coh):<14}{match} {note}")
print()
print(f"Result: {ok}/{len(test_picks)} picks correctly classified")

# A smoke test has to be able to fail: signal any mismatch through the exit
# status instead of only printing it.
if ok != len(test_picks):
    sys.exit(1)