192 lines
7.9 KiB
Python
192 lines
7.9 KiB
Python
"""
|
|
Walk-Forward Odds-Blind Experiment — THE pivotal test.
|
|
======================================================
|
|
Question this answers: can a model BEAT THE MARKET out-of-sample, betting only
|
|
on information the price doesn't already contain?
|
|
|
|
Method (no leakage, time-ordered):
|
|
* data sorted by kickoff (mst_utc); train on the past, test on the future,
|
|
rolled over several folds.
|
|
* TWO models on the MS (1X2) market:
|
|
ALL = every feature INCLUDING the bookmaker odds (what the live
|
|
engine does -> it mostly re-learns the price).
|
|
BLIND = identical but odds/implied/_present columns REMOVED, so the
|
|
model must disagree with the market using fundamentals only.
|
|
* For each, an honest value-bet simulation on the test fold using the REAL
|
|
odds payouts (margin included): bet the outcome with the biggest
|
|
model_prob - implied_prob edge above a margin; ROI = realized P/L per 1u.
|
|
|
|
Read: if BLIND's value ROI is consistently > 0 across folds, there is a real,
|
|
exploitable lead. If both are <= 0 (expected), these markets aren't beatable
|
|
with this data and the honest move is to stop staking.
|
|
|
|
Usage:
|
|
python scripts/walkforward_oddsblind.py
|
|
python scripts/walkforward_oddsblind.py --folds 6 --estimators 300
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
def _force_utf8_stdout() -> None:
    """Switch ``sys.stdout`` to UTF-8 when the stream supports it.

    Best effort only: a missing/falsy stdout, a stream without
    ``reconfigure``, or any error raised by ``reconfigure`` leaves the stream
    untouched.
    """
    stream = sys.stdout
    if not stream or not hasattr(stream, "reconfigure"):
        return
    try:
        stream.reconfigure(encoding="utf-8")
    except Exception:
        # Deliberately swallowed: the encoding tweak must never stop the run.
        pass


# NOTE(review): presumably needed so non-ASCII output (e.g. the em dash in the
# module docstring shown by --help) prints on legacy-encoded consoles.
_force_utf8_stdout()
|
|
|
|
# Absolute path of this script; the project root is two directory levels up.
_SCRIPT_PATH = os.path.abspath(__file__)
AI_DIR = os.path.dirname(os.path.dirname(_SCRIPT_PATH))

# Training table read by main().
CSV = os.path.join(AI_DIR, "data", "training_data_v27.csv")
|
|
|
|
import xgboost as xgb # noqa: E402
|
|
|
|
# Identifier, kickoff-time and score columns: excluded from the feature lists.
# The score columns are what the 1X2 label is derived from in main().
META = {
    "match_id",
    "home_team_id",
    "away_team_id",
    "league_id",
    "mst_utc",
    "score_home",
    "score_away",
    "ht_score_home",
    "ht_score_away",
}
|
|
|
|
# Confirmed target leakage: the *_goals_form columns are integer-valued and
# ~0.63 correlated with THIS match's goals; their difference equals the actual
# goal difference 73% of the time. They are excluded so the experiment
# measures genuine pre-match predictive power.

# CONFIRMED (encode the actual match result):
_LEAKY_CONFIRMED = {
    "home_goals_form",   # ~0.63 corr w/ this match's goals
    "away_goals_form",   # ~0.63 corr w/ this match's goals
    "total_goals",       # this match's full-time total
    "ht_total_goals",    # this match's half-time total
}

# STRONG SUSPECTS (dominate importance + high outcome corr; audit extractor):
_LEAKY_SUSPECTED = {
    "squad_diff",
    "home_squad_quality",
    "away_squad_quality",
    "referee_home_bias",
    "referee_avg_goals",
}

# Every column dropped from both the ALL and the BLIND feature sets.
LEAKY = _LEAKY_CONFIRMED | _LEAKY_SUSPECTED
|
|
|
|
|
|
def is_odds_col(c: str) -> bool:
    """Tell whether column name ``c`` carries bookmaker price information.

    A column counts as price-derived when its lower-cased name contains the
    substring ``"odds"`` or ``"implied"``.
    """
    lowered = c.lower()
    return any(marker in lowered for marker in ("odds", "implied"))
|
|
|
|
|
|
def logloss(y: np.ndarray, p: np.ndarray) -> float:
    """Return the mean negative log-probability assigned to the true classes.

    Args:
        y: Integer class index of the true outcome for each row.
        p: Per-row class probabilities, indexed as ``p[row, class]``.

    Returns:
        The multiclass log loss as a plain ``float``. Probabilities are
        clipped to ``[1e-9, 1 - 1e-9]`` first so the logarithm stays finite.
    """
    clipped = np.clip(p, 1e-9, 1 - 1e-9)
    row_idx = np.arange(len(y))
    true_class_prob = clipped[row_idx, y]
    return float(-np.mean(np.log(true_class_prob)))
|
|
|
|
|
|
def value_sim(proba: np.ndarray, y: np.ndarray, odds: np.ndarray,
              margin: float) -> dict:
    """Simulate flat 1-unit value betting and report bets, ROI and hit rate.

    For every row the outcome with the largest ``model_prob - 1/odds`` edge is
    selected, and a 1-unit bet is placed when that edge is strictly greater
    than ``margin``. A winning bet returns ``odds - 1``; a losing bet costs 1.

    Args:
        proba: Model probabilities, indexed as ``proba[row, class]``.
        y: Integer index of the realized class for each row.
        odds: Decimal odds with the same shape as ``proba``. Entries that are
            not greater than 1.0 (including 0.0 placeholders and NaN) are
            treated as "no price available" and are never selected for a bet.
        margin: Minimum edge required to place a bet.

    Returns:
        ``{"n", "roi", "hit"}``: number of bets, profit per unit staked in
        percent (rounded to 2 decimals) and share of winning bets in percent
        (rounded to 1 decimal). ``roi`` and ``hit`` are ``None`` when no bet
        qualifies.
    """
    proba = np.asarray(proba, dtype=float)
    odds = np.asarray(odds, dtype=float)
    y = np.asarray(y)
    rows = np.arange(len(y))

    # Only invert real prices. Dividing unconditionally and masking afterwards
    # raised a divide-by-zero RuntimeWarning for every 0.0 placeholder.
    priced = odds > 1.0
    implied = np.divide(1.0, odds, out=np.full(odds.shape, np.nan),
                        where=priced)

    # -inf (rather than a finite sentinel) guarantees an unpriced outcome can
    # never clear the threshold, whatever margin the caller passes.
    edge = np.where(priced, proba - implied, -np.inf)
    pick = np.argmax(edge, axis=1)
    bet = edge[rows, pick] > margin

    n = int(bet.sum())
    if n == 0:
        return {"n": 0, "roi": None, "hit": None}

    win = (pick == y) & bet
    # Realized P/L per row at the real payout, kept only where a bet was made.
    pnl = np.where(win, odds[rows, pick] - 1.0, -1.0)[bet]
    return {"n": n, "roi": round(100.0 * pnl.sum() / n, 2),
            "hit": round(100.0 * win[bet].sum() / n, 1)}
|
|
|
|
|
|
def train_eval(Xtr, ytr, Xte, yte, odds_te, est, margins):
    """Fit one 3-class XGBoost model and score it on a held-out fold.

    Args:
        Xtr, ytr: Training features and integer class labels.
        Xte, yte: Test-fold features and integer class labels.
        odds_te: Test-fold odds, forwarded unchanged to ``value_sim``.
        est: Number of boosting rounds.
        margins: Edge thresholds; one value-bet simulation is run per entry.

    Returns:
        Dict with ``"logloss"`` (rounded to 4 decimals), ``"acc"`` (accuracy
        in percent, rounded to 1 decimal) and one ``"val@<margin>"`` entry per
        margin holding the ``value_sim`` result.
    """
    booster_params = {
        "objective": "multi:softprob",
        "num_class": 3,
        "max_depth": 5,
        "eta": 0.05,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "tree_method": "hist",
        "verbosity": 0,
    }
    model = xgb.train(booster_params, xgb.DMatrix(Xtr, label=ytr),
                      num_boost_round=est)
    # With multi:softprob the prediction is used below as a
    # (rows, classes) probability matrix.
    probs = model.predict(xgb.DMatrix(Xte))

    hard_pred = probs.argmax(1)
    result = {
        "logloss": round(logloss(yte, probs), 4),
        "acc": round(100.0 * (hard_pred == yte).mean(), 1),
    }
    result.update(
        {f"val@{mg}": value_sim(probs, yte, odds_te, mg) for mg in margins}
    )
    return result
|
|
|
|
|
|
def _parse_args() -> argparse.Namespace:
    """Parse the command line and reject values the experiment cannot run with."""
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's bullet/usage layout in --help; the default
        # formatter re-wraps the whole description into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--folds", type=int, default=5)
    ap.add_argument("--estimators", type=int, default=250)
    ap.add_argument("--test-frac", type=float, default=0.5,
                    help="Fraction at the end used as rolling OOS (default 0.5)")
    args = ap.parse_args()

    if args.folds < 1:
        ap.error("--folds must be >= 1")
    if args.estimators < 1:
        ap.error("--estimators must be >= 1")
    # 0 leaves nothing to test on; 1 leaves nothing to train the first fold on.
    if not 0.0 < args.test_frac < 1.0:
        ap.error("--test-frac must be strictly between 0 and 1")
    return args


def _load_dataset(path: str) -> tuple[pd.DataFrame, pd.DataFrame, np.ndarray, np.ndarray]:
    """Read the CSV in kickoff order and build labels, odds and feature frames.

    Returns ``(Xall, Xblind, y, odds)`` restricted to rows with both scores
    present: the ALL and BLIND feature frames, the 1X2 label
    (0=home, 1=draw, 2=away) and the three MS odds columns as an array, with
    unparseable or missing odds set to 0.0.
    """
    print(f"Loading {path} ...")
    df = pd.read_csv(path, low_memory=False)
    df = df.sort_values("mst_utc").reset_index(drop=True)
    print(f" {len(df)} rows, {df.shape[1]} cols")

    # Derive true MS outcome from scores: 0=home,1=draw,2=away (robust, no label trust)
    sh = pd.to_numeric(df["score_home"], errors="coerce")
    sa = pd.to_numeric(df["score_away"], errors="coerce")
    y = np.where(sh > sa, 0, np.where(sh == sa, 1, 2))
    valid = sh.notna() & sa.notna()
    df, y = df[valid].reset_index(drop=True), y[valid.values]

    odds = df[["odds_ms_h", "odds_ms_d", "odds_ms_a"]].apply(
        pd.to_numeric, errors="coerce").fillna(0.0).values

    feat_all = [c for c in df.columns
                if c not in META and not c.startswith("label_")
                and c not in LEAKY]
    feat_blind = [c for c in feat_all if not is_odds_col(c)]
    print(f" excluded leaky cols: {sorted(LEAKY)}")
    Xall = df[feat_all].apply(pd.to_numeric, errors="coerce").fillna(0.0)
    Xblind = df[feat_blind].apply(pd.to_numeric, errors="coerce").fillna(0.0)
    print(f" features: ALL={len(feat_all)} BLIND={len(feat_blind)} "
          f"(dropped {len(feat_all)-len(feat_blind)} odds cols)")
    return Xall, Xblind, y, odds


def _report_aggregate(agg: dict, margins: list) -> None:
    """Print mean log loss and bet-weighted ROI per model over all folds."""
    print("=" * 70)
    print("AGGREGATE (sum bets, weighted ROI across folds)")
    print("=" * 70)
    for name in ("ALL", "BLIND"):
        losses = agg[name]["logloss"]
        ll = np.mean(losses) if losses else float("nan")
        print(f"\n{name} mean logloss={ll:.4f}")
        for m in margins:
            vs = agg[name][f"val@{m}"]
            tot_n = sum(v["n"] for v in vs)
            # Rebuild each fold's P/L from its ROI so folds are weighted by
            # their number of bets; folds without bets contribute nothing.
            tot_pnl = sum((v["roi"] / 100.0 * v["n"]) for v in vs
                          if v["roi"] is not None)
            roi = round(100.0 * tot_pnl / tot_n, 2) if tot_n else None
            print(f" margin {m}: total_bets={tot_n:>6} ROI(flat1u)={roi}%")
    print("\nREAD: BLIND ROI>0 across margins/folds = real edge. Both <=0 = no")
    print("exploitable edge in MS with this data (stop staking; the -EV is the vig).")


def main() -> int:
    """Run the walk-forward ALL-vs-BLIND comparison and print the results.

    The last ``--test-frac`` share of the kickoff-ordered rows is cut into
    ``--folds`` consecutive test folds; each fold is scored by models trained
    on every row that precedes it (expanding window). Folds with fewer than 50
    test rows, or with no training rows at all, are skipped.

    Returns:
        Process exit code: 0 after a completed run, 1 when the dataset has no
        row with both scores present.
    """
    args = _parse_args()
    Xall, Xblind, y, odds = _load_dataset(CSV)

    n = len(y)
    if n == 0:
        print(" no rows with valid scores; nothing to evaluate")
        return 1
    print(f" base rates: home={100*(y==0).mean():.1f}% draw={100*(y==1).mean():.1f}% "
          f"away={100*(y==2).mean():.1f}%")

    start = int(n * (1 - args.test_frac))
    bounds = np.linspace(start, n, args.folds + 1, dtype=int)
    margins = [0.0, 0.05, 0.10]

    agg = {name: {"logloss": [], **{f"val@{m}": [] for m in margins}}
           for name in ("ALL", "BLIND")}

    print(f"\nWalk-forward: {args.folds} folds, train=expanding, est={args.estimators}\n")
    hdr = f"{'fold':<5}{'model':<7}{'logloss':>9}{'acc%':>7}" + "".join(
        f"{('val@'+str(m)):>22}" for m in margins)
    print(hdr)
    print("-" * len(hdr))
    for i in range(args.folds):
        te0, te1 = bounds[i], bounds[i + 1]
        # Skip folds too small to be informative, and folds that would have
        # an empty training window (possible when test-frac is close to 1).
        if te1 - te0 < 50 or te0 < 1:
            continue
        tr = slice(0, te0)
        te = slice(te0, te1)
        for name, X in (("ALL", Xall), ("BLIND", Xblind)):
            r = train_eval(X.iloc[tr].values, y[tr], X.iloc[te].values, y[te],
                           odds[te], args.estimators, margins)
            agg[name]["logloss"].append(r["logloss"])
            for m in margins:
                agg[name][f"val@{m}"].append(r[f"val@{m}"])
            cells = "".join(
                f"{('n=' + str(r[f'val@{m}']['n']) + ' roi=' + str(r[f'val@{m}']['roi'])):>22}"
                for m in margins)
            print(f"{i:<5}{name:<7}{r['logloss']:>9}{r['acc']:>7}{cells}")
        print()

    _report_aggregate(agg, margins)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|