481 lines
19 KiB
Python
481 lines
19 KiB
Python
"""
|
|
V27 Value Sniper — PRO Training Script
|
|
========================================
|
|
KEY INSIGHT: Train model WITHOUT odds to get independent probability.
|
|
Then compare with market odds to find genuine value edges.
|
|
|
|
Strategy:
|
|
Stage A: "Fundamentals Model" — odds-free, learns from ELO/form/rolling/H2H
|
|
Stage B: "Value Model" — uses fundamentals + odds disagreement as features
|
|
Stage C: Multi-market — 1X2, O/U 2.5, BTTS
|
|
Stage D: Walk-forward backtest with Kelly sizing
|
|
"""
|
|
import os, sys, json, pickle, time, warnings
|
|
import numpy as np
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
from sklearn.metrics import accuracy_score, log_loss
|
|
from sklearn.isotonic import IsotonicRegression
|
|
|
|
# Suppress every Python warning for the remainder of the process.
warnings.filterwarnings("ignore")

# Paths are resolved relative to this file: AI_DIR is two directory levels up.
AI_DIR = Path(__file__).resolve().parents[1]
DATA_CSV = AI_DIR.joinpath("data", "training_data_v27.csv")
MODELS_DIR = AI_DIR.joinpath("models", "v27")
# Created at import time so the save step can write into it unconditionally.
MODELS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ── Leakage & category definitions ──
# Columns that get_clean_features() always removes from the feature set.
LEAKAGE_COLS = [
    "total_goals",
    "goal_diff",
    "ht_total_goals",
    "ht_goal_diff",
    "score_home",
    "score_away",
    "ht_score_home",
    "ht_score_away",
    "home_goals_form",
    "away_goals_form",
    "home_squad_quality",
    "away_squad_quality",
    "squad_diff",
    "home_key_players",
    "away_key_players",
    "home_missing_impact",
    "away_missing_impact",
    "referee_home_bias",
    "referee_avg_goals",
    "referee_cards_total",
    "referee_avg_yellow",
    "referee_avg_red",
    "referee_penalty_rate",
    "referee_over25_rate",
    "referee_experience",
    "referee_matches",
]

# Placeholder: label columns are discovered at runtime by get_label_cols().
LABEL_COLS = []

# Identifier / descriptive columns that are never model inputs.
META_COLS = ["match_id", "league_name", "home_team", "away_team"]

# Name prefixes that mark a column as bookmaker-derived.
ODDS_COLS_PATTERNS = ["odds_", "implied_"]
|
|
|
|
|
|
def get_odds_cols(df):
    """Return the columns of ``df`` whose name starts with an odds prefix.

    The prefixes come from the module-level ``ODDS_COLS_PATTERNS`` list.
    """
    prefixes = tuple(ODDS_COLS_PATTERNS)
    # str.startswith accepts a tuple, so one call covers every prefix.
    return [col for col in df.columns if col.startswith(prefixes)]
|
|
|
|
|
|
def get_label_cols(df):
    """Return, in column order, every column of ``df`` named ``label_*``."""
    label_cols = []
    for col in df.columns:
        if col.startswith("label_"):
            label_cols.append(col)
    return label_cols
|
|
|
|
|
|
def get_clean_features(df):
    """Select the feature columns that carry neither odds nor leakage.

    A column is rejected when it is an odds/implied column, a label column,
    listed in ``LEAKAGE_COLS`` or ``META_COLS``, or any other ``*_id`` column.
    Of the remaining columns only those where more than 30% of the rows
    parse as numbers are kept.

    Returns:
        list[str]: accepted column names, in the order they appear in ``df``.
    """
    banned = set(LEAKAGE_COLS)
    banned.update(META_COLS)
    banned.update(get_odds_cols(df))
    banned.update(get_label_cols(df))
    # Also exclude ID columns
    banned.update(
        col for col in df.columns
        if col.endswith("_id") and col != "match_id"
    )

    # Keep only numeric: the column must be parseable on > 30% of the rows.
    min_numeric_rows = len(df) * 0.3
    selected = []
    for col in df.columns:
        if col in banned:
            continue
        numeric_rows = pd.to_numeric(df[col], errors="coerce").notna().sum()
        if numeric_rows > min_numeric_rows:
            selected.append(col)
    return selected
|
|
|
|
|
|
def load_data():
    """Load the training CSV and attach margin-free 1X2 implied probabilities.

    Steps:
      1. Read ``DATA_CSV``.
      2. Coerce the three 1X2 odds columns to numbers and drop rows where any
         of them is missing or not above 1.01.
      3. Coerce the O/U 2.5 odds columns (when present) to numbers.
      4. Add ``implied_h`` / ``implied_d`` / ``implied_a``: inverse odds
         normalised so the three sum to 1 per row.

    Returns:
        pandas.DataFrame: the filtered frame, in the CSV's row order.

    Raises:
        KeyError: if one of the 1X2 odds columns is absent from the CSV.
    """
    print(f"Loading {DATA_CSV}...")
    df = pd.read_csv(DATA_CSV, low_memory=False)
    print(f" Raw: {len(df)} rows")

    # Ensure odds exist for value comparison
    ms_odds = ["odds_ms_h", "odds_ms_d", "odds_ms_a"]
    for col in ms_odds:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df = df.dropna(subset=ms_odds)
    # .copy() detaches the filtered rows from the original frame, so the
    # column assignments below never write through a view of the parent.
    df = df[(df.odds_ms_h > 1.01) & (df.odds_ms_d > 1.01) & (df.odds_ms_a > 1.01)].copy()

    # OU25 odds. Both spellings are coerced because backtest_value() reads
    # the short "_o"/"_u" names while the original loader used the long ones.
    for col in ["odds_ou25_over", "odds_ou25_under", "odds_ou25_o", "odds_ou25_u"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Implied probabilities: divide each inverse price by the sum of all three.
    margin = 1 / df.odds_ms_h + 1 / df.odds_ms_d + 1 / df.odds_ms_a
    df["implied_h"] = (1 / df.odds_ms_h) / margin
    df["implied_d"] = (1 / df.odds_ms_d) / margin
    df["implied_a"] = (1 / df.odds_ms_a) / margin

    print(f" After filter: {len(df)} rows")
    return df
|
|
|
|
|
|
def temporal_split(df, val_ratio=0.15, test_ratio=0.10):
    """Cut ``df`` into consecutive train / validation / test slices.

    The split is purely positional: the first rows become the training set,
    the next ``val_ratio`` share the validation set and the final
    ``test_ratio`` share the test set. The function never sorts, so the
    caller is responsible for the row order.

    Returns:
        tuple: ``(train, val, test)`` as independent copies.
    """
    total = len(df)
    train_end = int(total * (1 - val_ratio - test_ratio))
    val_end = int(total * (1 - test_ratio))

    train_part = df.iloc[:train_end].copy()
    val_part = df.iloc[train_end:val_end].copy()
    test_part = df.iloc[val_end:].copy()
    return train_part, val_part, test_part
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# STAGE A: Fundamentals-Only Model (NO ODDS)
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
def train_fundamentals_model(X_tr, y_tr, X_va, y_va, feat_cols, market="ms"):
    """Train ensemble WITHOUT odds features.

    Fits up to three gradient-boosting models (XGBoost, LightGBM, CatBoost)
    on the training arrays, using the validation arrays for early stopping.
    A library that is not installed is skipped with a printed notice instead
    of silently disappearing from the ensemble.

    Args:
        X_tr, y_tr: training feature matrix and integer labels.
        X_va, y_va: validation feature matrix and integer labels.
        feat_cols: feature names, passed to XGBoost as ``feature_names``.
        market: ``"ms"`` trains a 3-class model; any other value trains a
            binary model.

    Returns:
        dict: fitted models keyed ``"xgb"``, ``"lgb"``, ``"cb"`` (only the
        ones whose library could be imported). May be empty.
    """
    n_class = 3 if market == "ms" else 2
    tag = market.upper()

    models = {}
    # Insertion order xgb -> lgb -> cb matches the order callers iterate in.
    for key, fit in (("xgb", _fit_xgb), ("lgb", _fit_lgb), ("cb", _fit_cb)):
        model = fit(X_tr, y_tr, X_va, y_va, feat_cols, n_class, tag)
        if model is not None:
            models[key] = model
    return models


def _print_val_accuracy(y_va, proba, n_class):
    """Print validation accuracy for a probability prediction."""
    # Binary boosters return P(class 1) as a 1-D vector; expand to two columns.
    if n_class == 2 and proba.ndim == 1:
        proba = np.column_stack([1 - proba, proba])
    acc = accuracy_score(y_va, proba.argmax(1))
    print(f" acc={acc:.4f}")


def _fit_xgb(X_tr, y_tr, X_va, y_va, feat_cols, n_class, tag):
    """Train the XGBoost member; return None when xgboost is not installed."""
    # Only the import is guarded: an ImportError raised later, while
    # training, is a real failure and must not be swallowed.
    try:
        import xgboost as xgb
    except ImportError:
        print(" [XGB] xgboost not installed — skipping")
        return None

    print(f" [XGB] Training {tag}...")
    dtrain = xgb.DMatrix(X_tr, label=y_tr, feature_names=feat_cols)
    dval = xgb.DMatrix(X_va, label=y_va, feature_names=feat_cols)
    multiclass = n_class == 3
    params = {
        "objective": "multi:softprob" if multiclass else "binary:logistic",
        "eval_metric": "mlogloss" if multiclass else "logloss",
        "max_depth": 6, "learning_rate": 0.02, "subsample": 0.75,
        "colsample_bytree": 0.75, "min_child_weight": 10,
        "reg_alpha": 0.5, "reg_lambda": 2.0,
        "verbosity": 0, "tree_method": "hist",
    }
    if multiclass:
        params["num_class"] = 3
    booster = xgb.train(params, dtrain, num_boost_round=2000,
                        evals=[(dval, "val")], early_stopping_rounds=80,
                        verbose_eval=False)
    _print_val_accuracy(y_va, booster.predict(dval), n_class)
    return booster


def _fit_lgb(X_tr, y_tr, X_va, y_va, feat_cols, n_class, tag):
    """Train the LightGBM member; return None when lightgbm is not installed."""
    try:
        import lightgbm as lgb
    except ImportError:
        print(" [LGB] lightgbm not installed — skipping")
        return None

    print(f" [LGB] Training {tag}...")
    ds_tr = lgb.Dataset(X_tr, label=y_tr)
    ds_va = lgb.Dataset(X_va, label=y_va, reference=ds_tr)
    multiclass = n_class == 3
    params = {
        "objective": "multiclass" if multiclass else "binary",
        "metric": "multi_logloss" if multiclass else "binary_logloss",
        "num_leaves": 48, "learning_rate": 0.02,
        "feature_fraction": 0.7, "bagging_fraction": 0.7,
        "bagging_freq": 1, "min_child_samples": 30,
        "lambda_l1": 0.5, "lambda_l2": 2.0, "verbose": -1,
    }
    if multiclass:
        params["num_class"] = 3
    booster = lgb.train(params, ds_tr, 2000, valid_sets=[ds_va],
                        callbacks=[lgb.early_stopping(80, verbose=False)])
    _print_val_accuracy(y_va, booster.predict(X_va), n_class)
    return booster


def _fit_cb(X_tr, y_tr, X_va, y_va, feat_cols, n_class, tag):
    """Train the CatBoost member; return None when catboost is not installed."""
    try:
        from catboost import CatBoostClassifier
    except ImportError:
        print(" [CB] catboost not installed — skipping")
        return None

    print(f" [CB] Training {tag}...")
    multiclass = n_class == 3
    clf = CatBoostClassifier(
        iterations=2000, learning_rate=0.02, depth=6,
        l2_leaf_reg=5, loss_function="MultiClass" if multiclass else "Logloss",
        early_stopping_rounds=80, verbose=0, task_type="CPU",
        **({"classes_count": 3} if multiclass else {}),
    )
    clf.fit(X_tr, y_tr, eval_set=(X_va, y_va))
    _print_val_accuracy(y_va, clf.predict_proba(X_va), n_class)
    return clf
|
|
|
|
|
|
def ensemble_predict(models, X, feat_cols, n_class=3):
    """Average the class-probability predictions of every model in ``models``.

    Args:
        models: dict keyed ``"xgb"`` (XGBoost booster), ``"lgb"`` (LightGBM
            booster) and/or ``"cb"`` (object exposing ``predict_proba``).
        X: feature matrix to score.
        feat_cols: feature names, used to build the XGBoost ``DMatrix``.
        n_class: number of classes; with 2, a 1-D positive-class prediction
            is expanded to ``[1 - p, p]`` columns.

    Returns:
        numpy.ndarray: element-wise mean of the per-model probability arrays.

    Raises:
        RuntimeError: if ``models`` is empty.
        ValueError: if ``models`` contains a key other than the three above.
    """
    if not models:
        raise RuntimeError("No models!")

    preds = []
    for name, model in models.items():
        if name == "xgb":
            import xgboost as xgb
            proba = model.predict(xgb.DMatrix(X, feature_names=feat_cols))
        elif name == "lgb":
            proba = model.predict(X)
        elif name == "cb":
            proba = model.predict_proba(X)
        else:
            # Without this branch an unknown key would either hit an unbound
            # variable or silently re-append the previous model's prediction.
            raise ValueError(
                f"Unknown model key {name!r}; expected 'xgb', 'lgb' or 'cb'"
            )

        proba = np.asarray(proba)
        if n_class == 2 and proba.ndim == 1:
            proba = np.column_stack([1 - proba, proba])
        preds.append(proba)

    return np.mean(preds, axis=0)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# STAGE B: Walk-Forward Backtest with Kelly
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
def kelly_fraction(model_prob, odds, fraction=0.25, max_fraction=0.10):
    """Fractional Kelly: f = fraction * (p*odds - 1) / (odds - 1)

    Args:
        model_prob: model probability ``p`` of the outcome.
        odds: decimal odds offered for the outcome.
        fraction: multiplier applied to the full Kelly stake.
        max_fraction: upper bound on the returned bankroll share
            (defaults to the previous hard-coded 10% cap).

    Returns:
        float: share of the bankroll to stake, in ``[0, max_fraction]``;
        ``0.0`` when there is no positive edge, when ``odds <= 1``, or when
        an input is NaN.
    """
    edge = model_prob * odds - 1
    # Written as a positive test so that NaN inputs (every comparison with
    # NaN is False) fall into the "no bet" branch.
    if not (edge > 0 and odds > 1):
        return 0.0
    full_kelly = edge / (odds - 1)
    return float(max(0.0, min(fraction * full_kelly, max_fraction)))
|
|
|
|
|
|
def backtest_value(models, df_test, feat_cols, market="ms",
                   min_edge=0.05, min_odds=1.40, max_odds=4.50,
                   use_kelly=True):
    """Realistic backtest: flat or Kelly sizing, edge filtering.

    Walks ``df_test`` row by row and, for each outcome class, places a bet
    when the model probability exceeds the market-implied probability by at
    least ``min_edge`` and the price lies in ``[min_odds, max_odds]``. The
    bankroll starts at 1000 and is updated after every bet.

    Args:
        models: fitted models, as accepted by ``ensemble_predict``.
        df_test: frame with the feature columns, the market's label column
            and its odds columns.
        feat_cols: feature column names, in training order.
        market: ``"ms"`` (1X2) or ``"ou25"`` (over/under 2.5).
        min_edge: minimum ``model_p - implied_p`` required to bet.
        min_odds, max_odds: inclusive price window.
        use_kelly: stake ``bankroll * kelly_fraction(..., fraction=0.15)``
            when true, otherwise a flat 10.0.

    Returns:
        dict: ``bets`` (one record per bet), ``total``, ``wins``, ``pnl`` and
        ``bankroll_curve``; an empty dict for an unsupported market or when
        the O/U label column is missing.
    """
    # Resolve the market first so an unsupported one returns before any
    # prediction work is done.
    if market == "ms":
        n_class = 3
        y = df_test["label_ms"].values
        odds_arr = df_test[["odds_ms_h", "odds_ms_d", "odds_ms_a"]].values
        implied = df_test[["implied_h", "implied_d", "implied_a"]].values
        class_names = ["Home", "Draw", "Away"]
    elif market == "ou25":
        if "label_ou25" not in df_test.columns:
            return {}
        n_class = 2
        y = df_test["label_ou25"].values
        # Accept both column spellings used in this file.
        o_over = _ou25_odds(df_test, ("odds_ou25_o", "odds_ou25_over"))
        o_under = _ou25_odds(df_test, ("odds_ou25_u", "odds_ou25_under"))
        odds_arr = np.column_stack([o_under, o_over])  # class 0 = Under, 1 = Over
        inv = 1 / odds_arr
        implied = inv / inv.sum(axis=1, keepdims=True)
        class_names = ["Under", "Over"]
    else:
        return {}

    probs = ensemble_predict(models, df_test[feat_cols].values, feat_cols, n_class)

    start_bankroll = 1000.0
    bankroll = start_bankroll
    results = {"bets": [], "total": 0, "wins": 0, "pnl": 0.0,
               "bankroll_curve": [start_bankroll]}

    # Rows without a settled label cannot be graded; betting on them would
    # book every such bet as a loss because NaN never equals a class index.
    settled_rows = np.flatnonzero(~pd.isna(y))

    for i in settled_rows:
        for cls in range(n_class):
            edge = probs[i, cls] - implied[i, cls]
            odds_val = odds_arr[i, cls]

            # FILTERS
            if edge < min_edge:
                continue
            if odds_val < min_odds or odds_val > max_odds:
                continue
            # Don't bet on heavy favorites with tiny edge
            if implied[i, cls] > 0.65 and edge < 0.08:
                continue

            # Sizing
            if use_kelly:
                stake = bankroll * kelly_fraction(probs[i, cls], odds_val, fraction=0.15)
            else:
                stake = 10.0  # flat

            if stake < 1:
                continue

            won = (y[i] == cls)
            pnl = stake * (odds_val - 1) if won else -stake
            bankroll += pnl

            results["bets"].append({
                "edge": float(edge), "odds": float(odds_val),
                "model_p": float(probs[i, cls]), "implied_p": float(implied[i, cls]),
                "won": bool(won), "pnl": float(pnl), "stake": float(stake),
                "class": class_names[cls],
            })
            results["bankroll_curve"].append(bankroll)
            results["total"] += 1
            if won:
                results["wins"] += 1

    results["pnl"] = bankroll - start_bankroll
    return results


def _ou25_odds(df, candidates, default=1.85):
    """Return numeric odds from the first present column, else a constant.

    Missing or unparseable prices are replaced by ``default``; bets on those
    rows are therefore priced synthetically rather than at a quoted price.
    """
    for col in candidates:
        if col in df.columns:
            return pd.to_numeric(df[col], errors="coerce").fillna(default).values
    return np.full(len(df), default)
|
|
|
|
|
|
def print_backtest(results, label=""):
    """Print a summary of a ``backtest_value`` result.

    Reports bet count, hit rate, ROI on the total amount staked, PnL, final
    bankroll, maximum drawdown, and a per-class breakdown.

    Args:
        results: dict with ``total``, ``wins``, ``pnl``, ``bets`` (records
            carrying ``class``, ``won``, ``pnl``, ``stake``) and
            ``bankroll_curve``. An empty dict is treated as "no bets".
        label: heading printed above the summary.
    """
    total = results.get("total", 0)
    if total == 0:
        print(f" {label}: No bets placed")
        return

    bets = results["bets"]
    wins = results["wins"]
    pnl = results["pnl"]
    curve = results["bankroll_curve"]

    hit = wins / total * 100
    staked = sum(b["stake"] for b in bets)
    roi = pnl / staked * 100 if staked > 0 else 0.0

    # Maximum drawdown: worst drop relative to the highest bankroll seen
    # *so far*. Measuring against the global peak would also count points
    # that precede it, reporting a drawdown for a curve that only rises.
    dd = 0.0
    running_peak = float("-inf")
    for value in curve:
        running_peak = max(running_peak, value)
        if running_peak > 0:
            dd = min(dd, (value - running_peak) / running_peak * 100)

    # Per-class breakdown
    by_class = {}
    for b in bets:
        stats = by_class.setdefault(
            b["class"], {"n": 0, "w": 0, "pnl": 0.0, "stake": 0.0}
        )
        stats["n"] += 1
        if b["won"]:
            stats["w"] += 1
        stats["pnl"] += b["pnl"]
        stats["stake"] += b["stake"]

    print(f"\n {label}")
    print(f" Bets: {total} | Hit: {hit:.1f}% | ROI: {roi:+.1f}%")
    print(f" PnL: {pnl:+.0f} | Final: {curve[-1]:.0f} | MaxDD: {dd:.1f}%")
    for cls, d in sorted(by_class.items()):
        # ROI on the amount staked for this class; PnL per bet is not a
        # percentage unless every stake is exactly one unit.
        cls_roi = d["pnl"] / d["stake"] * 100 if d["stake"] > 0 else 0.0
        print(f" {cls:6s}: {d['n']:4d} bets, "
              f"hit={d['w']/d['n']*100:.1f}%, roi={cls_roi:+.1f}%")
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# MAIN
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
def main():
    """Run the full pipeline: load, train, backtest, report and save.

    Sequence:
      1. Load the data and select the odds-free feature columns.
      2. Split positionally into train / validation / test and impute
         missing feature values with medians taken from the training split.
      3. Train the 1X2 ensemble and, when ``label_ou25`` is available with
         more than 1000 labelled training rows, the O/U 2.5 ensemble.
      4. Backtest on the test split over a grid of edge / min-odds settings.
      5. Write feature importances, pickled models, metadata and the
         feature list into ``MODELS_DIR``.
    """
    print("=" * 65)
    print(" V27 VALUE SNIPER — PRO TRAINING (Odds-Free Fundamentals)")
    print("=" * 65)
    t0 = time.time()

    df = load_data()
    clean_feats = get_clean_features(df)
    print(f" Clean features (no odds): {len(clean_feats)}")

    # Numerify
    for c in clean_feats:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Remove constant columns (nunique ignores NaN, so this can run before
    # imputation without changing which columns are dropped).
    clean_feats = [c for c in clean_feats if df[c].nunique() > 1]
    print(f" After removing constants: {len(clean_feats)}")

    # Split
    # NOTE(review): the split is positional; nothing visible here sorts the
    # rows by date, so it is only "temporal" if the CSV is already
    # chronological — confirm against the data export.
    tr, va, te = temporal_split(df)

    # Impute with medians from the training split only. Medians computed
    # over the whole frame would leak validation/test information into
    # training.
    train_medians = tr[clean_feats].median()
    for part in (tr, va, te):
        part[clean_feats] = part[clean_feats].fillna(train_medians)

    print(f" Train: {len(tr)}, Val: {len(va)}, Test: {len(te)}")
    print(f" Target: H={tr.label_ms.eq(0).mean():.1%}, "
          f"D={tr.label_ms.eq(1).mean():.1%}, A={tr.label_ms.eq(2).mean():.1%}")

    X_tr = tr[clean_feats].values
    y_tr = tr["label_ms"].values
    X_va = va[clean_feats].values
    y_va = va["label_ms"].values

    # ── STAGE A: Train fundamentals model (1X2) ──
    print("\n" + "─"*65)
    print(" STAGE A: Fundamentals-Only 1X2 Model")
    print("─"*65)
    ms_models = train_fundamentals_model(X_tr, y_tr, X_va, y_va, clean_feats, "ms")

    val_probs = ensemble_predict(ms_models, X_va, clean_feats, 3)
    val_acc = accuracy_score(y_va, val_probs.argmax(1))
    # Explicit labels keep log_loss well-defined even if one outcome is
    # absent from the validation slice.
    val_ll = log_loss(y_va, val_probs, labels=[0, 1, 2])
    print(f"\n Ensemble Val: acc={val_acc:.4f}, logloss={val_ll:.4f}")

    # Compare with odds baseline
    odds_pred = va[["implied_h", "implied_d", "implied_a"]].values.argmax(1)
    odds_acc = accuracy_score(y_va, odds_pred)
    print(f" Odds baseline: acc={odds_acc:.4f}")
    print(f" Model vs Odds: {val_acc - odds_acc:+.4f}")

    # ── STAGE A.2: O/U 2.5 Model ──
    ou_models = None
    if "label_ou25" in tr.columns:
        print("\n" + "─"*65)
        print(" STAGE A.2: Fundamentals-Only O/U 2.5 Model")
        print("─"*65)
        y_tr_ou = tr["label_ou25"].values
        y_va_ou = va["label_ou25"].values
        # pd.isna also works when the label column has object dtype.
        mask_tr = ~pd.isna(y_tr_ou)
        mask_va = ~pd.isna(y_va_ou)
        if mask_tr.sum() > 1000:
            ou_models = train_fundamentals_model(
                X_tr[mask_tr], y_tr_ou[mask_tr].astype(int),
                X_va[mask_va], y_va_ou[mask_va].astype(int),
                clean_feats, "ou25")

    # ── STAGE B: Backtest ──
    print("\n" + "─"*65)
    print(" STAGE B: Walk-Forward Backtest (Test Set)")
    print("─"*65)

    # Try multiple edge thresholds
    # NOTE(review): the best configuration is chosen on the same test split
    # it is reported on, so the printed "best" ROI is optimistically biased.
    best_roi = -999
    best_cfg = {}
    for min_edge in [0.03, 0.05, 0.07, 0.10, 0.12, 0.15]:
        for min_odds in [1.35, 1.50, 1.70]:
            r = backtest_value(ms_models, te, clean_feats, "ms",
                               min_edge=min_edge, min_odds=min_odds,
                               max_odds=5.0, use_kelly=True)
            if r.get("total", 0) >= 20:
                invested = sum(b["stake"] for b in r["bets"])
                roi = r["pnl"] / invested * 100 if invested > 0 else -100
                if roi > best_roi:
                    best_roi = roi
                    best_cfg = {"edge": min_edge, "min_odds": min_odds, "result": r}

    if best_cfg:
        cfg = best_cfg
        print(f"\n Best 1X2 Config: edge>{cfg['edge']}, odds>{cfg['min_odds']}")
        print_backtest(cfg["result"], "1X2 VALUE")

    # Flat bet comparison
    print("\n --- Flat Bet Comparison ---")
    for edge in [0.05, 0.07, 0.10]:
        r = backtest_value(ms_models, te, clean_feats, "ms",
                           min_edge=edge, min_odds=1.50, max_odds=4.5,
                           use_kelly=False)
        if r.get("total", 0) > 0:
            # Sum the recorded stakes instead of assuming the flat stake size.
            inv = sum(b["stake"] for b in r["bets"])
            roi = r["pnl"] / inv * 100
            print(f" Edge>{edge:.2f}: {r['total']} bets, "
                  f"hit={r['wins']/r['total']*100:.1f}%, ROI={roi:+.1f}%")

    # OU25 backtest
    if ou_models:
        print("\n --- O/U 2.5 Backtest ---")
        for edge in [0.05, 0.07, 0.10]:
            r = backtest_value(ou_models, te, clean_feats, "ou25",
                               min_edge=edge, min_odds=1.50, max_odds=3.0,
                               use_kelly=True)
            if r.get("total", 0) > 0:
                print_backtest(r, f"OU25 edge>{edge}")

    # ── Feature importance ──
    if "lgb" in ms_models:
        imp = ms_models["lgb"].feature_importance(importance_type="gain")
        imp_df = pd.DataFrame({"feature": clean_feats, "importance": imp}
                              ).sort_values("importance", ascending=False)
        print("\n TOP 15 FEATURES (no odds!):")
        for _, row in imp_df.head(15).iterrows():
            print(f" {row['feature']:40s} {row['importance']:.0f}")
        imp_df.to_csv(MODELS_DIR / "v27_feature_importance.csv", index=False)

    # ── Save ──
    print("\n" + "─"*65)
    print(" SAVING MODELS")
    print("─"*65)
    for name, m in ms_models.items():
        p = MODELS_DIR / f"v27_ms_{name}.pkl"
        with open(p, "wb") as f:
            pickle.dump(m, f)
        print(f" ✓ {p.name}")

    if ou_models:
        for name, m in ou_models.items():
            p = MODELS_DIR / f"v27_ou25_{name}.pkl"
            with open(p, "wb") as f:
                pickle.dump(m, f)
            print(f" ✓ {p.name}")

    meta = {
        "version": "v27-pro", "trained_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "approach": "odds-free fundamentals + value edge detection",
        "feature_count": len(clean_feats),
        "total_samples": len(df),
        "val_acc": round(val_acc, 4), "val_ll": round(val_ll, 4),
        "best_config": {k: v for k, v in best_cfg.items() if k != "result"} if best_cfg else {},
        "markets": ["ms"] + (["ou25"] if ou_models else []),
    }
    with open(MODELS_DIR / "v27_metadata.json", "w") as f:
        json.dump(meta, f, indent=2, default=str)
    with open(MODELS_DIR / "v27_feature_cols.json", "w") as f:
        json.dump(clean_feats, f, indent=2)
    print(f" ✓ metadata + feature_cols")

    print(f"\n Total time: {(time.time()-t0)/60:.1f} min")
    print(" DONE!")
|
|
|
|
|
|
# Run the training pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|