# Source file: iddaai-be/ai-engine/scripts/train_v27_value_sniper.py
# Repository viewer listing: 578 lines, 22 KiB, Python

"""
V27 Value Sniper — PRO Training Script
========================================
KEY INSIGHT: Train model WITHOUT odds to get independent probability.
Then compare with market odds to find genuine value edges.
Strategy:
Stage A: "Fundamentals Model" — odds-free, learns from ELO/form/rolling/H2H
Stage B: "Value Model" — uses fundamentals + odds disagreement as features
Stage C: Multi-market — 1X2, O/U 2.5, BTTS
Stage D: Walk-forward backtest with Kelly sizing
"""
import os, sys, json, pickle, time, warnings
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.metrics import accuracy_score, log_loss
from sklearn.isotonic import IsotonicRegression
warnings.filterwarnings("ignore")
# Directory two levels above this script's resolved location.
AI_DIR = Path(__file__).resolve().parent.parent
# Training CSV read by load_data().
DATA_CSV = AI_DIR / "data" / "training_data.csv"
# Destination for pickled models, metadata and the feature list written by main().
MODELS_DIR = AI_DIR / "models" / "v27"
# Side effect at import time: the output directory (and parents) is created if absent.
MODELS_DIR.mkdir(parents=True, exist_ok=True)
# ── Leakage & category definitions ──
# Columns that get_clean_features() always removes from the feature set.
# NOTE(review): the squad/referee groups are presumably excluded because they
# are derived with post-match information — confirm against the data pipeline.
LEAKAGE_COLS = [
    # goal / score columns
    "total_goals",
    "goal_diff",
    "ht_total_goals",
    "ht_goal_diff",
    "score_home",
    "score_away",
    "ht_score_home",
    "ht_score_away",
    "home_goals_form",
    "away_goals_form",
    # squad columns
    "home_squad_quality",
    "away_squad_quality",
    "squad_diff",
    "home_key_players",
    "away_key_players",
    "home_missing_impact",
    "away_missing_impact",
    # referee columns
    "referee_home_bias",
    "referee_avg_goals",
    "referee_cards_total",
    "referee_avg_yellow",
    "referee_avg_red",
    "referee_penalty_rate",
    "referee_over25_rate",
    "referee_experience",
    "referee_matches",
]

# Starts empty; label columns are discovered per-frame by get_label_cols().
LABEL_COLS = []

# Identifier / descriptive columns that are never used as features.
META_COLS = ["match_id", "league_name", "home_team", "away_team"]

# Name prefixes that mark a column as odds-derived (see get_odds_cols()).
ODDS_COLS_PATTERNS = ["odds_", "implied_"]
def get_odds_cols(df):
    """Return the columns of *df* whose name starts with an odds prefix.

    The prefixes are taken from the module-level ``ODDS_COLS_PATTERNS``; the
    result keeps the column order of *df*.
    """
    prefixes = tuple(ODDS_COLS_PATTERNS)
    # str.startswith accepts a tuple and matches if any prefix matches.
    return [name for name in df.columns if name.startswith(prefixes)]
def get_label_cols(df):
    """Return, in column order, every column of *df* named ``label_*``."""
    prefix = "label_"
    return [name for name in df.columns if name.startswith(prefix)]
def get_clean_features(df):
    """Pick the fundamentals-only feature columns of *df*.

    A column is rejected when it is odds-derived, a label, listed in
    ``LEAKAGE_COLS`` or ``META_COLS``, or an ``*_id`` column.  Of the
    remainder, only columns where more than 30% of the rows parse as a number
    are kept.  The returned list follows the column order of *df*.
    """
    blocked = set(META_COLS)
    blocked.update(LEAKAGE_COLS)
    blocked.update(get_odds_cols(df))
    blocked.update(get_label_cols(df))
    # ID columns ("match_id" itself is already blocked through META_COLS).
    blocked.update(
        name for name in df.columns
        if name.endswith("_id") and name != "match_id"
    )

    min_numeric_rows = len(df) * 0.3
    selected = []
    for name in df.columns:
        if name in blocked:
            continue
        # Count the values that survive numeric coercion (non-numeric -> NaN).
        numeric_rows = pd.to_numeric(df[name], errors="coerce").notna().sum()
        if numeric_rows > min_numeric_rows:
            selected.append(name)
    return selected
def load_data():
    """Load the training CSV and attach normalised 1X2 implied probabilities.

    Steps:
      1. Read ``DATA_CSV``.
      2. Coerce the three 1X2 odds columns to numbers and keep only rows where
         all three are present and greater than 1.01.
      3. Coerce the O/U 2.5 odds columns to numbers when they exist (rows are
         not filtered on them).
      4. Add ``implied_h`` / ``implied_d`` / ``implied_a``: inverse odds
         divided by their sum, so the three values add up to 1 per row.

    Returns:
        The filtered DataFrame (an independent copy) in the CSV's row order.

    Raises:
        KeyError: if any of the 1X2 odds columns is missing from the CSV.

    NOTE(review): rows are not sorted here; the positional split performed
    later presumably relies on the CSV already being chronological — confirm.
    """
    print(f"Loading {DATA_CSV}...")
    df = pd.read_csv(DATA_CSV, low_memory=False)
    print(f" Raw: {len(df)} rows")

    # Ensure odds exist for value comparison
    ms_odds = ["odds_ms_h", "odds_ms_d", "odds_ms_a"]
    missing = [c for c in ms_odds if c not in df.columns]
    if missing:
        raise KeyError(f"training data is missing required odds columns: {missing}")
    for c in ms_odds:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=ms_odds)
    # .copy() so the column assignments below write to an independent frame
    # instead of a slice of the raw one.
    df = df[(df.odds_ms_h > 1.01) & (df.odds_ms_d > 1.01) & (df.odds_ms_a > 1.01)].copy()

    # OU25 odds: the file uses both spellings ("_over"/"_under" here,
    # "_o"/"_u" in the backtest), so coerce whichever ones are present.
    for c in ("odds_ou25_over", "odds_ou25_under", "odds_ou25_o", "odds_ou25_u"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # Implied probabilities: divide each inverse price by the booksum so the
    # bookmaker margin is removed proportionally.
    margin = 1 / df.odds_ms_h + 1 / df.odds_ms_d + 1 / df.odds_ms_a
    df["implied_h"] = (1 / df.odds_ms_h) / margin
    df["implied_d"] = (1 / df.odds_ms_d) / margin
    df["implied_a"] = (1 / df.odds_ms_a) / margin
    print(f" After filter: {len(df)} rows")
    return df
def temporal_split(df, val_ratio=0.15, test_ratio=0.10):
    """Cut *df* by row position into consecutive train / val / test copies.

    The first ``1 - val_ratio - test_ratio`` share of the rows becomes the
    training frame, the last ``test_ratio`` share the test frame, and the rows
    in between the validation frame.  No shuffling or sorting takes place, so
    the caller is responsible for the row order.

    Returns:
        A ``(train, val, test)`` tuple of independent DataFrame copies.
    """
    n_rows = len(df)
    train_end = int(n_rows * (1 - val_ratio - test_ratio))
    val_end = int(n_rows * (1 - test_ratio))
    bounds = ((0, train_end), (train_end, val_end), (val_end, n_rows))
    return tuple(df.iloc[start:stop].copy() for start, stop in bounds)
# ═══════════════════════════════════════════════════════════════════
# STAGE A: Fundamentals-Only Model (NO ODDS)
# ═══════════════════════════════════════════════════════════════════
def _print_val_accuracy(y_va, proba):
    """Print validation accuracy; 1-D binary scores are expanded to two columns."""
    proba = np.asarray(proba)
    if proba.ndim == 1:
        # Binary learners return P(class 1) only.
        proba = np.column_stack([1 - proba, proba])
    acc = accuracy_score(y_va, proba.argmax(1))
    print(f" acc={acc:.4f}")


def _fit_xgb(X_tr, y_tr, X_va, y_va, feat_cols, market, n_class):
    """Train an XGBoost booster with early stopping; None if xgboost is missing."""
    try:
        import xgboost as xgb
    except ImportError:
        print(" [XGB] xgboost not installed - skipped")
        return None
    print(f" [XGB] Training {market.upper()}...")
    multiclass = n_class == 3
    dtrain = xgb.DMatrix(X_tr, label=y_tr, feature_names=feat_cols)
    dval = xgb.DMatrix(X_va, label=y_va, feature_names=feat_cols)
    params = {
        "objective": "multi:softprob" if multiclass else "binary:logistic",
        "eval_metric": "mlogloss" if multiclass else "logloss",
        "max_depth": 6, "learning_rate": 0.02, "subsample": 0.75,
        "colsample_bytree": 0.75, "min_child_weight": 10,
        "reg_alpha": 0.5, "reg_lambda": 2.0,
        "verbosity": 0, "tree_method": "hist",
    }
    if multiclass:
        params["num_class"] = 3
    booster = xgb.train(params, dtrain, num_boost_round=2000,
                        evals=[(dval, "val")], early_stopping_rounds=80,
                        verbose_eval=False)
    _print_val_accuracy(y_va, booster.predict(dval))
    return booster


def _fit_lgb(X_tr, y_tr, X_va, y_va, feat_cols, market, n_class):
    """Train a LightGBM booster with early stopping; None if lightgbm is missing."""
    try:
        import lightgbm as lgb
    except ImportError:
        print(" [LGB] lightgbm not installed - skipped")
        return None
    print(f" [LGB] Training {market.upper()}...")
    multiclass = n_class == 3
    ds_tr = lgb.Dataset(X_tr, label=y_tr)
    ds_va = lgb.Dataset(X_va, label=y_va, reference=ds_tr)
    params = {
        "objective": "multiclass" if multiclass else "binary",
        "metric": "multi_logloss" if multiclass else "binary_logloss",
        "num_leaves": 48, "learning_rate": 0.02,
        "feature_fraction": 0.7, "bagging_fraction": 0.7,
        "bagging_freq": 1, "min_child_samples": 30,
        "lambda_l1": 0.5, "lambda_l2": 2.0, "verbose": -1,
    }
    if multiclass:
        params["num_class"] = 3
    booster = lgb.train(params, ds_tr, 2000, valid_sets=[ds_va],
                        callbacks=[lgb.early_stopping(80, verbose=False)])
    _print_val_accuracy(y_va, booster.predict(X_va))
    return booster


def _fit_catboost(X_tr, y_tr, X_va, y_va, feat_cols, market, n_class):
    """Train a CatBoost classifier with early stopping; None if catboost is missing."""
    try:
        from catboost import CatBoostClassifier
    except ImportError:
        print(" [CB] catboost not installed - skipped")
        return None
    print(f" [CB] Training {market.upper()}...")
    multiclass = n_class == 3
    extra = {"classes_count": 3} if multiclass else {}
    clf = CatBoostClassifier(
        iterations=2000, learning_rate=0.02, depth=6,
        l2_leaf_reg=5, loss_function="MultiClass" if multiclass else "Logloss",
        early_stopping_rounds=80, verbose=0, task_type="CPU",
        **extra,
    )
    clf.fit(X_tr, y_tr, eval_set=(X_va, y_va))
    _print_val_accuracy(y_va, clf.predict_proba(X_va))
    return clf


def train_fundamentals_model(X_tr, y_tr, X_va, y_va, feat_cols, market="ms"):
    """Train the odds-free boosting ensemble for one market.

    Up to three learners are fitted (XGBoost, LightGBM, CatBoost), each with
    at most 2000 rounds and early stopping after 80 rounds without improvement
    on the validation split.  A learner whose library cannot be imported is
    skipped with a printed notice instead of silently disappearing.

    Args:
        X_tr, y_tr: training feature matrix and integer labels.
        X_va, y_va: validation matrix and labels (early stopping + accuracy).
        feat_cols: feature names, passed to XGBoost's DMatrix.
        market: ``"ms"`` trains a 3-class model; any other value a binary one.

    Returns:
        Dict with a subset of the keys ``"xgb"``, ``"lgb"``, ``"cb"`` mapping to
        the fitted models.  Empty when none of the libraries is installed.
    """
    n_class = 3 if market == "ms" else 2
    trainers = (("xgb", _fit_xgb), ("lgb", _fit_lgb), ("cb", _fit_catboost))
    models = {}
    for key, fit in trainers:
        model = fit(X_tr, y_tr, X_va, y_va, feat_cols, market, n_class)
        if model is not None:
            models[key] = model
    return models
def ensemble_predict(models, X, feat_cols, n_class=3):
    """Average the class-probability predictions of all models in *models*.

    Args:
        models: dict keyed by ``"xgb"``, ``"lgb"`` and/or ``"cb"``.
        X: feature matrix to score.
        feat_cols: feature names, used to build the XGBoost DMatrix.
        n_class: number of classes; with 2, a 1-D positive-class score is
            expanded to the two columns ``[1 - p, p]``.

    Returns:
        The element-wise mean of the per-model probability arrays.

    Raises:
        RuntimeError: if *models* is empty.
        ValueError: if a key is not one of the supported model types.  Without
            this check an unknown key would either hit an unbound variable or
            silently re-append the previous model's prediction, giving that
            model double weight in the average.
    """
    if not models:
        raise RuntimeError("No models!")

    preds = []
    for name, model in models.items():
        if name == "xgb":
            import xgboost as xgb
            proba = model.predict(xgb.DMatrix(X, feature_names=feat_cols))
        elif name == "lgb":
            proba = model.predict(X)
        elif name == "cb":
            proba = model.predict_proba(X)
        else:
            raise ValueError(
                f"Unknown model type {name!r}; expected 'xgb', 'lgb' or 'cb'"
            )
        proba = np.asarray(proba)
        if n_class == 2 and proba.ndim == 1:
            proba = np.column_stack([1 - proba, proba])
        preds.append(proba)
    return np.mean(preds, axis=0)
# ═══════════════════════════════════════════════════════════════════
# STAGE B: Walk-Forward Backtest with Kelly
# ═══════════════════════════════════════════════════════════════════
def kelly_fraction(model_prob, odds, fraction=0.25, max_stake=0.10):
    """Return the capped fractional-Kelly stake as a share of bankroll.

    Full Kelly for decimal odds is ``(p * odds - 1) / (odds - 1)``.  The result
    is scaled by *fraction* and then clipped to ``[0, max_stake]``.

    Args:
        model_prob: model probability of the outcome.
        odds: decimal odds offered for the outcome.
        fraction: Kelly multiplier (0.25 = quarter Kelly).
        max_stake: upper bound on the returned share (default 10% of bankroll).

    Returns:
        A float in ``[0, max_stake]``; ``0.0`` when there is no positive edge
        or the odds are not above 1.
    """
    edge = model_prob * odds - 1
    if edge <= 0 or odds <= 1:
        return 0.0
    full_kelly = edge / (odds - 1)
    return max(0.0, min(fraction * full_kelly, max_stake))
def _odds_or_default(df, candidates, default=1.85):
    """Return the first existing column of *candidates* as floats.

    Unparseable or missing prices are replaced by *default*; when none of the
    candidate columns exists a constant array of *default* is returned.
    NOTE(review): bets settled at the default price use odds that were never
    quoted — consider skipping such rows instead.
    """
    for col in candidates:
        if col in df.columns:
            return pd.to_numeric(df[col], errors="coerce").fillna(default).values
    return np.full(len(df), default)


def backtest_value(models, df_test, feat_cols, market="ms",
                   min_edge=0.05, min_odds=1.40, max_odds=4.50,
                   use_kelly=True):
    """Simulate value betting on *df_test* with edge and odds filters.

    For every row and every outcome class, a bet is placed when the model
    probability exceeds the margin-free implied probability by at least
    *min_edge*, the odds lie in ``[min_odds, max_odds]``, and the outcome is
    not a heavy favourite (implied > 0.65) with an edge below 0.08.  The
    bankroll starts at 1000 and is updated after every bet.

    Args:
        models: model dict accepted by ``ensemble_predict``.
        df_test: frame holding *feat_cols*, the market's label column and odds.
        feat_cols: feature columns fed to the models.
        market: ``"ms"`` (1X2) or ``"ou25"`` (over/under 2.5).
        min_edge, min_odds, max_odds: bet filters described above.
        use_kelly: stake ``kelly_fraction(..., fraction=0.15)`` of the current
            bankroll (bets under 1 unit are skipped); otherwise a flat 10.

    Returns:
        Dict with ``bets`` (one record per bet), ``total``, ``wins``, ``pnl``
        and ``bankroll_curve``; an empty dict for an unsupported market or a
        missing ``label_ou25`` column.
    """
    X = df_test[feat_cols].values
    n_class = 3 if market == "ms" else 2
    probs = ensemble_predict(models, X, feat_cols, n_class)

    if market == "ms":
        y = df_test["label_ms"].values
        odds_arr = df_test[["odds_ms_h", "odds_ms_d", "odds_ms_a"]].values
        implied = df_test[["implied_h", "implied_d", "implied_a"]].values
        class_names = ["Home", "Draw", "Away"]
    elif market == "ou25":
        if "label_ou25" not in df_test.columns:
            return {}
        y = df_test["label_ou25"].values
        # Both spellings of the O/U odds columns occur in this file.
        o_over = _odds_or_default(df_test, ("odds_ou25_o", "odds_ou25_over"))
        o_under = _odds_or_default(df_test, ("odds_ou25_u", "odds_ou25_under"))
        odds_arr = np.column_stack([o_under, o_over])
        inv = 1 / odds_arr
        implied = inv / inv.sum(axis=1, keepdims=True)
        class_names = ["Under", "Over"]
    else:
        return {}

    start_bankroll = 1000.0
    bankroll = start_bankroll
    results = {"bets": [], "total": 0, "wins": 0, "pnl": 0.0,
               "bankroll_curve": [start_bankroll]}

    # Rows with an unknown outcome cannot be settled; betting on them would
    # book every such bet as a loss, so they are skipped.
    settled_rows = np.flatnonzero(pd.notna(y))
    for i in settled_rows:
        for cls in range(n_class):
            edge = probs[i, cls] - implied[i, cls]
            odds_val = odds_arr[i, cls]
            # FILTERS
            if edge < min_edge:
                continue
            if odds_val < min_odds or odds_val > max_odds:
                continue
            # Don't bet on heavy favorites with tiny edge
            if implied[i, cls] > 0.65 and edge < 0.08:
                continue
            # Sizing
            if use_kelly:
                frac = kelly_fraction(probs[i, cls], odds_val, fraction=0.15)
                stake = bankroll * frac
            else:
                stake = 10.0  # flat
            if stake < 1:
                continue
            won = (y[i] == cls)
            pnl = stake * (odds_val - 1) if won else -stake
            bankroll += pnl
            results["bets"].append({
                "edge": float(edge), "odds": float(odds_val),
                "model_p": float(probs[i, cls]),
                "implied_p": float(implied[i, cls]),
                "won": bool(won), "pnl": float(pnl), "stake": float(stake),
                "class": class_names[cls],
            })
            results["bankroll_curve"].append(bankroll)
            results["total"] += 1
            if won:
                results["wins"] += 1
    results["pnl"] = bankroll - start_bankroll
    return results
def _max_drawdown_pct(curve):
    """Return the worst decline from a running peak of *curve*, in percent (<= 0)."""
    worst = 0.0
    peak = None
    for value in curve:
        if peak is None or value > peak:
            peak = value
        if peak > 0:
            worst = min(worst, (value - peak) / peak * 100)
    return worst


def print_backtest(results, label=""):
    """Print a summary of a backtest result dict.

    Reports bet count, hit rate, ROI on total stake, PnL, final bankroll,
    maximum drawdown and a per-class breakdown.

    Args:
        results: dict with ``total``, ``wins``, ``pnl``, ``bankroll_curve`` and
            ``bets`` (records carrying ``class``, ``won``, ``pnl``, ``stake``).
        label: heading printed above the summary.

    Drawdown is measured against the running peak of the bankroll curve, so
    growth that precedes the all-time high is not counted as a loss.  The
    per-class figure is ROI on the amount staked in that class.
    """
    total = results.get("total", 0)
    if total == 0:
        print(f" {label}: No bets placed")
        return

    wins = results["wins"]
    pnl = results["pnl"]
    bets = results["bets"]
    curve = results["bankroll_curve"]

    staked = sum(b["stake"] for b in bets)
    hit = wins / total * 100
    roi = pnl / staked * 100 if staked > 0 else 0.0
    dd = _max_drawdown_pct(curve)

    # Per-class breakdown
    by_class = {}
    for b in bets:
        d = by_class.setdefault(
            b["class"], {"n": 0, "w": 0, "pnl": 0.0, "stake": 0.0}
        )
        d["n"] += 1
        if b["won"]:
            d["w"] += 1
        d["pnl"] += b["pnl"]
        d["stake"] += b["stake"]

    print(f"\n {label}")
    print(f" Bets: {total} | Hit: {hit:.1f}% | ROI: {roi:+.1f}%")
    print(f" PnL: {pnl:+.0f} | Final: {curve[-1]:.0f} | MaxDD: {dd:.1f}%")
    for cls, d in sorted(by_class.items()):
        class_roi = d["pnl"] / d["stake"] * 100 if d["stake"] > 0 else 0.0
        print(f" {cls:6s}: {d['n']:4d} bets, "
              f"hit={d['w']/d['n']*100:.1f}%, roi={class_roi:+.1f}%")
# ═══════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════
def _print_section(title):
    """Print *title* framed by two horizontal rules, preceded by a blank line."""
    rule = "-" * 65
    print("\n" + rule)
    print(title)
    print(rule)


def _train_binary_market(tr, va, X_tr, X_va, label_col, feat_cols, market,
                         min_rows=1000):
    """Train a binary market on the rows whose label is known.

    Returns ``(models, X_va_known, y_va_known)``; all three are ``None`` when
    the training split has *min_rows* or fewer labelled rows.
    """
    y_tr = tr[label_col].values
    y_va = va[label_col].values
    known_tr = pd.notna(y_tr)
    known_va = pd.notna(y_va)
    if known_tr.sum() <= min_rows:
        return None, None, None
    y_va_known = y_va[known_va].astype(int)
    models = train_fundamentals_model(
        X_tr[known_tr], y_tr[known_tr].astype(int),
        X_va[known_va], y_va_known,
        feat_cols, market)
    return models, X_va[known_va], y_va_known


def _btts_flat_backtest(btts_models, te, feat_cols, edges=(0.05, 0.07, 0.10)):
    """Print a flat-stake (10 units) BTTS backtest on *te* for each edge threshold."""
    # Matches without a BTTS label cannot be settled; casting their NaN label
    # to int would book every bet on them as a loss.
    te_btts = te.loc[te["label_btts"].notna()].copy()
    if te_btts.empty:
        return
    # NOTE(review): missing prices are replaced by 1.85, i.e. bets may be
    # settled at odds that were never quoted.
    for col in ("odds_btts_y", "odds_btts_n"):
        te_btts[col] = pd.to_numeric(te_btts[col], errors="coerce").fillna(1.85)

    # Predictions and implied probabilities do not depend on the threshold.
    probs = ensemble_predict(btts_models, te_btts[feat_cols].values, feat_cols, 2)
    y_btts = te_btts["label_btts"].values.astype(int)
    odds_arr = te_btts[["odds_btts_n", "odds_btts_y"]].values
    inv = 1 / odds_arr
    impl = inv / inv.sum(axis=1, keepdims=True)
    value = probs - impl
    outcome = y_btts[:, None] == np.arange(2)[None, :]
    odds_ok = ~((odds_arr < 1.50) | (odds_arr > 3.0))

    for edge in edges:
        placed = odds_ok & ~(value < edge)
        total_bets = int(placed.sum())
        if total_bets == 0:
            continue
        won = placed & outcome
        wins = int(won.sum())
        pnl = float((10 * (odds_arr[won] - 1)).sum() - 10 * (total_bets - wins))
        roi = pnl / (total_bets * 10) * 100
        hit = wins / total_bets * 100
        print(
            f' Edge>{edge:.2f}: {total_bets} bets, '
            f'hit={hit:.1f}%, ROI={roi:+.1f}%'
        )


def _save_models(models, market):
    """Pickle every model of one market to ``MODELS_DIR/v27_<market>_<name>.pkl``."""
    for name, m in models.items():
        p = MODELS_DIR / f"v27_{market}_{name}.pkl"
        with open(p, "wb") as f:
            pickle.dump(m, f)
        print(f"{p.name}")


def main():
    """Run the full V27 pipeline: load, train, backtest, save.

    1. Load the data and select odds-free features.
    2. Split by row position into train / val / test, then impute missing
       values with medians fitted on the training split only (so no
       validation or test statistics leak into training).
    3. Train the 1X2 ensemble and, when their labels exist, the O/U 2.5 and
       BTTS ensembles.
    4. Pick the 1X2 betting filter (edge, minimum odds) by ROI on the
       validation split and report it on the untouched test split; selecting
       on the test split itself would report the best of 18 tries.
    5. Print flat-stake, O/U 2.5 and BTTS backtests, feature importance, and
       write models, metadata and the feature list to ``MODELS_DIR``.
    """
    print("=" * 65)
    print(" V27 VALUE SNIPER — PRO TRAINING (Odds-Free Fundamentals)")
    print("=" * 65)
    t0 = time.time()
    df = load_data()
    clean_feats = get_clean_features(df)
    print(f" Clean features (no odds): {len(clean_feats)}")

    # Numerify
    for c in clean_feats:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Split first so imputation statistics come from the training rows only.
    tr, va, te = temporal_split(df)
    train_medians = tr[clean_feats].median()
    for part in (tr, va, te):
        part[clean_feats] = part[clean_feats].fillna(train_medians)

    # Remove columns that are constant (or entirely missing) in training.
    clean_feats = [c for c in clean_feats if tr[c].nunique() > 1]
    print(f" After removing constants: {len(clean_feats)}")
    print(f" Train: {len(tr)}, Val: {len(va)}, Test: {len(te)}")
    print(f" Target: H={tr.label_ms.eq(0).mean():.1%}, "
          f"D={tr.label_ms.eq(1).mean():.1%}, A={tr.label_ms.eq(2).mean():.1%}")

    X_tr = tr[clean_feats].values
    y_tr = tr["label_ms"].values
    X_va = va[clean_feats].values
    y_va = va["label_ms"].values

    # ── STAGE A: Train fundamentals model (1X2) ──
    _print_section(" STAGE A: Fundamentals-Only 1X2 Model")
    ms_models = train_fundamentals_model(X_tr, y_tr, X_va, y_va, clean_feats, "ms")
    val_probs = ensemble_predict(ms_models, X_va, clean_feats, 3)
    val_acc = accuracy_score(y_va, val_probs.argmax(1))
    # Explicit labels keep log_loss defined even if a class is absent from val.
    val_ll = log_loss(y_va, val_probs, labels=[0, 1, 2])
    print(f"\n Ensemble Val: acc={val_acc:.4f}, logloss={val_ll:.4f}")

    # Compare with odds baseline
    odds_pred = va[["implied_h", "implied_d", "implied_a"]].values.argmax(1)
    odds_acc = accuracy_score(y_va, odds_pred)
    print(f" Odds baseline: acc={odds_acc:.4f}")
    print(f" Model vs Odds: {val_acc - odds_acc:+.4f}")

    # ── STAGE A.2: O/U 2.5 Model ──
    ou_models = None
    if "label_ou25" in tr.columns:
        _print_section(" STAGE A.2: Fundamentals-Only O/U 2.5 Model")
        ou_models, _, _ = _train_binary_market(
            tr, va, X_tr, X_va, "label_ou25", clean_feats, "ou25")

    # ── STAGE A.3: BTTS Model ──
    btts_models = None
    if "label_btts" in tr.columns:
        _print_section(" STAGE A.3: Fundamentals-Only BTTS Model")
        btts_models, X_va_btts, y_va_btts = _train_binary_market(
            tr, va, X_tr, X_va, "label_btts", clean_feats, "btts")
        if btts_models:
            # Quick val accuracy
            btts_probs = ensemble_predict(btts_models, X_va_btts, clean_feats, n_class=2)
            btts_acc = accuracy_score(y_va_btts, btts_probs.argmax(1))
            btts_ll = log_loss(y_va_btts, btts_probs, labels=[0, 1])
            print(f'\n BTTS Ensemble Val: acc={btts_acc:.4f}, logloss={btts_ll:.4f}')
            # Compare with naive baseline (always predict majority class)
            btts_majority = y_va_btts.mean()
            baseline = max(btts_majority, 1 - btts_majority)
            print(f' BTTS baseline: {baseline:.4f} (majority class)')
            print(f' Model vs baseline: {btts_acc - baseline:+.4f}')

    # ── STAGE B: Backtest ──
    _print_section(" STAGE B: Walk-Forward Backtest (Test Set)")
    # Choose the filter on the validation split, then report it on test.
    best_roi = -999
    best_cfg = {}
    for min_edge in [0.03, 0.05, 0.07, 0.10, 0.12, 0.15]:
        for min_odds in [1.35, 1.50, 1.70]:
            r = backtest_value(ms_models, va, clean_feats, "ms",
                               min_edge=min_edge, min_odds=min_odds,
                               max_odds=5.0, use_kelly=True)
            if r.get("total", 0) >= 20:
                invested = sum(b["stake"] for b in r["bets"])
                roi = r["pnl"] / invested * 100 if invested > 0 else -100
                if roi > best_roi:
                    best_roi = roi
                    best_cfg = {"edge": min_edge, "min_odds": min_odds}
    if best_cfg:
        cfg = best_cfg
        print(f"\n Best 1X2 Config: edge>{cfg['edge']}, odds>{cfg['min_odds']}")
        print(f" (selected on validation split, val ROI={best_roi:+.1f}%)")
        test_result = backtest_value(ms_models, te, clean_feats, "ms",
                                     min_edge=cfg["edge"], min_odds=cfg["min_odds"],
                                     max_odds=5.0, use_kelly=True)
        print_backtest(test_result, "1X2 VALUE")

    # Flat bet comparison
    print("\n --- Flat Bet Comparison ---")
    for edge in [0.05, 0.07, 0.10]:
        r = backtest_value(ms_models, te, clean_feats, "ms",
                           min_edge=edge, min_odds=1.50, max_odds=4.5,
                           use_kelly=False)
        if r.get("total", 0) > 0:
            inv = r["total"] * 10
            roi = r["pnl"] / inv * 100
            print(f" Edge>{edge:.2f}: {r['total']} bets, "
                  f"hit={r['wins']/r['total']*100:.1f}%, ROI={roi:+.1f}%")

    # OU25 backtest
    if ou_models:
        print('\n --- O/U 2.5 Backtest ---')
        for edge in [0.05, 0.07, 0.10]:
            r = backtest_value(ou_models, te, clean_feats, 'ou25',
                               min_edge=edge, min_odds=1.50, max_odds=3.0,
                               use_kelly=True)
            if r.get('total', 0) > 0:
                print_backtest(r, f'OU25 edge>{edge}')

    # BTTS backtest
    if btts_models and 'label_btts' in te.columns:
        print('\n --- BTTS Backtest ---')
        if 'odds_btts_y' in te.columns and 'odds_btts_n' in te.columns:
            _btts_flat_backtest(btts_models, te, clean_feats)

    # ── Feature importance ──
    if "lgb" in ms_models:
        imp = ms_models["lgb"].feature_importance(importance_type="gain")
        imp_df = pd.DataFrame(
            {"feature": clean_feats, "importance": imp}
        ).sort_values("importance", ascending=False)
        print("\n TOP 15 FEATURES (no odds!):")
        for _, r in imp_df.head(15).iterrows():
            print(f" {r['feature']:40s} {r['importance']:.0f}")
        imp_df.to_csv(MODELS_DIR / "v27_feature_importance.csv", index=False)

    # ── Save ──
    _print_section(" SAVING MODELS")
    _save_models(ms_models, "ms")
    if ou_models:
        _save_models(ou_models, "ou25")
    if btts_models:
        _save_models(btts_models, "btts")

    meta = {
        'version': 'v27-pro',
        'trained_at': time.strftime('%Y-%m-%d %H:%M:%S'),
        'approach': 'odds-free fundamentals + value edge detection',
        'feature_count': len(clean_feats),
        'total_samples': len(df),
        'val_acc': round(val_acc, 4),
        'val_ll': round(val_ll, 4),
        'best_config': {
            k: v for k, v in best_cfg.items() if k != 'result'
        } if best_cfg else {},
        'markets': (
            ['ms']
            + (['ou25'] if ou_models else [])
            + (['btts'] if btts_models else [])
        ),
    }
    with open(MODELS_DIR / 'v27_metadata.json', 'w') as f:
        json.dump(meta, f, indent=2, default=str)
    with open(MODELS_DIR / 'v27_feature_cols.json', 'w') as f:
        json.dump(clean_feats, f, indent=2)
    print(f' ✓ metadata + feature_cols')
    print(f"\n Total time: {(time.time()-t0)/60:.1f} min")
    print(" DONE!")


if __name__ == "__main__":
    main()