Files
iddaai-be/ai-engine/scripts/train_score_model.py
T
fahricansecer 244d8f5366
Deploy Iddaai Backend / build-and-deploy (push) Successful in 6s
feat(ai): expand training to 68K+ matches, add score model, backfill implied odds
- extract_training_data.py: switch from top_leagues.json (23) to qualified_leagues.json (265)
- update_implied_odds.py: new script to backfill implied odds from real market data
- train_score_model.py: rewrite with v25 102-feature set + temporal split
- single_match_orchestrator.py: integrate ML score model with heuristic fallback
2026-05-05 16:04:00 +03:00

272 lines
8.1 KiB
Python
Executable File

"""
V25-Compatible Score Prediction Model Trainer
===============================================
Trains 4 independent XGBoost regression models for:
- FT Home Goals
- FT Away Goals
- HT Home Goals
- HT Away Goals
Uses the same 102-feature set as v25_ensemble for full compatibility.
Temporal train/test split (80/20) to avoid future leakage.
Usage:
python3 scripts/train_score_model.py
"""
import os
import sys
import pickle
import numpy as np
import pandas as pd
import xgboost as xgb
from datetime import datetime
from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Config
AI_ENGINE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_PATH = os.path.join(AI_ENGINE_DIR, "data", "training_data.csv")
MODEL_PATH = os.path.join(AI_ENGINE_DIR, "models", "xgb_score.pkl")
# Import the EXACT same feature set as v25 market models
from train_v25_clean import FEATURES
TARGETS = ["score_home", "score_away", "ht_score_home", "ht_score_away"]
# Model hyperparameters (tuned for goal count regression)
XGB_PARAMS = {
"objective": "reg:squarederror",
"n_estimators": 1200,
"learning_rate": 0.02,
"max_depth": 6,
"subsample": 0.8,
"colsample_bytree": 0.7,
"min_child_weight": 5,
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"n_jobs": -1,
"random_state": 42,
}
def load_data() -> pd.DataFrame:
"""Load and validate training data."""
if not os.path.exists(DATA_PATH):
print(f"❌ Data file not found: {DATA_PATH}")
print(" Run extract_training_data.py first")
sys.exit(1)
print(f"📦 Loading data from {DATA_PATH}...")
df = pd.read_csv(DATA_PATH)
# Fill feature NaNs with 0 (same as v25 training)
for col in FEATURES:
if col in df.columns:
df[col] = df[col].fillna(0)
# Backward-compatible: add odds presence flags if missing
odds_base_columns = [
"odds_ms_h", "odds_ms_d", "odds_ms_a",
"odds_ht_ms_h", "odds_ht_ms_d", "odds_ht_ms_a",
"odds_ou05_o", "odds_ou05_u",
"odds_ou15_o", "odds_ou15_u",
"odds_ou25_o", "odds_ou25_u",
"odds_ou35_o", "odds_ou35_u",
"odds_ht_ou05_o", "odds_ht_ou05_u",
"odds_ht_ou15_o", "odds_ht_ou15_u",
"odds_btts_y", "odds_btts_n",
]
for base_col in odds_base_columns:
pres_col = f"{base_col}_present"
if pres_col not in df.columns and base_col in df.columns:
df[pres_col] = (df[base_col] > 1.0).astype(int)
# Drop rows where any target is missing
df = df.dropna(subset=TARGETS)
# Filter: at least MS odds must be present
df = df[df["odds_ms_h"] > 1.0].copy()
# Ensure all features exist
missing = [f for f in FEATURES if f not in df.columns]
if missing:
print(f"⚠️ Missing {len(missing)} features, filling with 0: {missing[:5]}...")
for f in missing:
df[f] = 0
return df
def temporal_split(df: pd.DataFrame, train_ratio: float = 0.80):
"""
Temporal train/test split by match date.
Ensures no future information leaks into training.
"""
if "match_date" in df.columns:
df = df.sort_values("match_date").reset_index(drop=True)
elif "round" in df.columns:
df = df.sort_values("round").reset_index(drop=True)
split_idx = int(len(df) * train_ratio)
return df.iloc[:split_idx].copy(), df.iloc[split_idx:].copy()
def train_single_model(X_train, y_train, X_test, y_test, name: str):
"""Train a single XGBoost regression model with early stopping."""
print(f"\n🏗️ Training {name} model...")
model = xgb.XGBRegressor(**XGB_PARAMS)
model.fit(
X_train, y_train,
eval_set=[(X_test, y_test)],
verbose=False,
)
preds = model.predict(X_test)
mae = mean_absolute_error(y_test, preds)
rmse = np.sqrt(mean_squared_error(y_test, preds))
r2 = r2_score(y_test, preds)
print(f" MAE: {mae:.4f} goals")
print(f" RMSE: {rmse:.4f}")
print(f" R²: {r2:.4f}")
return model, {"mae": mae, "rmse": rmse, "r2": r2}
def evaluate_combined(models: dict, X_test, y_test_dict: dict):
"""Evaluate combined score accuracy (FT and HT)."""
print("\n🎯 Combined Score Evaluation (Test Set):")
# FT Score
ft_h_preds = models["ft_home"].predict(X_test)
ft_a_preds = models["ft_away"].predict(X_test)
y_ft_h = y_test_dict["score_home"].values
y_ft_a = y_test_dict["score_away"].values
exact = 0
close = 0
result_correct = 0
total = len(X_test)
for h_true, a_true, h_pred, a_pred in zip(y_ft_h, y_ft_a, ft_h_preds, ft_a_preds):
hp = max(0, round(h_pred))
ap = max(0, round(a_pred))
# Exact score
if hp == h_true and ap == a_true:
exact += 1
# Close (±1 each)
if abs(hp - h_true) <= 1 and abs(ap - a_true) <= 1:
close += 1
# Result direction (1X2)
true_result = 1 if h_true > a_true else (0 if h_true == a_true else -1)
pred_result = 1 if hp > ap else (0 if hp == ap else -1)
if true_result == pred_result:
result_correct += 1
print(f" FT Exact Score: {exact / total * 100:.2f}% ({exact}/{total})")
print(f" FT Close (±1): {close / total * 100:.2f}% ({close}/{total})")
print(f" FT Result (1X2): {result_correct / total * 100:.2f}% ({result_correct}/{total})")
# HT Score
ht_h_preds = models["ht_home"].predict(X_test)
ht_a_preds = models["ht_away"].predict(X_test)
y_ht_h = y_test_dict["ht_score_home"].values
y_ht_a = y_test_dict["ht_score_away"].values
ht_exact = 0
ht_total = len(X_test)
for h_true, a_true, h_pred, a_pred in zip(y_ht_h, y_ht_a, ht_h_preds, ht_a_preds):
hp = max(0, round(h_pred))
ap = max(0, round(a_pred))
if hp == h_true and ap == a_true:
ht_exact += 1
print(f" HT Exact Score: {ht_exact / ht_total * 100:.2f}% ({ht_exact}/{ht_total})")
return {
"ft_exact_pct": exact / total * 100,
"ft_close_pct": close / total * 100,
"ft_result_pct": result_correct / total * 100,
"ht_exact_pct": ht_exact / ht_total * 100,
}
def train():
"""Main training pipeline."""
print("🚀 Score Prediction Model Trainer (V25-Compatible)")
print(f" Feature count: {len(FEATURES)}")
print("=" * 60)
# Load data
df = load_data()
print(f" Total valid rows: {len(df)}")
# Temporal split
train_df, test_df = temporal_split(df)
print(f" Training set: {len(train_df)} matches")
print(f" Test set: {len(test_df)} matches (temporally after training)")
X_train = train_df[FEATURES]
X_test = test_df[FEATURES]
# Train 4 models
models = {}
metrics = {}
for target_name, model_key in [
("score_home", "ft_home"),
("score_away", "ft_away"),
("ht_score_home", "ht_home"),
("ht_score_away", "ht_away"),
]:
model, metric = train_single_model(
X_train, train_df[target_name],
X_test, test_df[target_name],
model_key,
)
models[model_key] = model
metrics[model_key] = metric
# Combined evaluation
y_test_dict = {t: test_df[t] for t in TARGETS}
combined = evaluate_combined(models, X_test, y_test_dict)
# Save
print(f"\n💾 Saving to {MODEL_PATH}...")
model_data = {
"home_model": models["ft_home"],
"away_model": models["ft_away"],
"ht_home_model": models["ht_home"],
"ht_away_model": models["ht_away"],
"features": FEATURES,
"meta": {
**{f"{k}_{mk}": mv for k, m in metrics.items() for mk, mv in m.items()},
**combined,
"trained_at": datetime.now().isoformat(),
"feature_count": len(FEATURES),
"train_size": len(train_df),
"test_size": len(test_df),
},
}
with open(MODEL_PATH, "wb") as f:
pickle.dump(model_data, f)
print("\n✅ Score model training complete!")
print(f" Saved: {MODEL_PATH}")
if __name__ == "__main__":
train()