""" Train basketball V25-style market models. """ from __future__ import annotations import json import os import sys from datetime import datetime from typing import Any, Dict, List, Tuple import lightgbm as lgb import numpy as np import pandas as pd import xgboost as xgb from sklearn.metrics import accuracy_score, classification_report, log_loss AI_ENGINE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, AI_ENGINE_DIR) from models.basketball_v25_features import DEFAULT_FEATURE_COLS DATA_PATH = os.path.join(AI_ENGINE_DIR, "data", "basketball_training_data_v25.csv") MODELS_DIR = os.path.join(AI_ENGINE_DIR, "models", "basketball_v25") REPORTS_DIR = os.path.join(AI_ENGINE_DIR, "reports", "training_basketball_v25") os.makedirs(MODELS_DIR, exist_ok=True) os.makedirs(REPORTS_DIR, exist_ok=True) MARKETS = [ {"target": "label_ml", "name": "ml"}, {"target": "label_total", "name": "total"}, {"target": "label_spread", "name": "spread"}, ] def load_data() -> pd.DataFrame: if not os.path.exists(DATA_PATH): raise FileNotFoundError(DATA_PATH) frame = pd.read_csv(DATA_PATH) for col in DEFAULT_FEATURE_COLS: if col not in frame.columns: frame[col] = 0.0 frame[DEFAULT_FEATURE_COLS] = frame[DEFAULT_FEATURE_COLS].fillna(0.0) return frame def temporal_split(frame: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: ordered = frame.sort_values("mst_utc").reset_index(drop=True) size = len(ordered) train_end = max(int(size * 0.70), 1) val_end = max(int(size * 0.85), train_end + 1) val_end = min(val_end, size - 1) return ( ordered.iloc[:train_end].copy(), ordered.iloc[train_end:val_end].copy(), ordered.iloc[val_end:].copy(), ) def train_xgb(X_train, y_train, X_val, y_val): dtrain = xgb.DMatrix(X_train, label=y_train) dval = xgb.DMatrix(X_val, label=y_val) params = { "objective": "binary:logistic", "eval_metric": "logloss", "max_depth": 6, "eta": 0.04, "subsample": 0.84, "colsample_bytree": 0.82, "min_child_weight": 4, "gamma": 0.08, "n_jobs": 4, "random_state": 42, } return xgb.train( params, dtrain, num_boost_round=1200, evals=[(dtrain, "train"), (dval, "val")], early_stopping_rounds=60, verbose_eval=100, ) def train_lgb(X_train, y_train, X_val, y_val): train_data = lgb.Dataset(X_train, label=y_train) val_data = lgb.Dataset(X_val, label=y_val, reference=train_data) params = { "objective": "binary", "metric": "binary_logloss", "learning_rate": 0.04, "max_depth": 6, "feature_fraction": 0.82, "bagging_fraction": 0.84, "bagging_freq": 5, "min_child_samples": 24, "n_jobs": 4, "seed": 42, "verbose": -1, } return lgb.train( params, train_data, num_boost_round=1200, valid_sets=[train_data, val_data], valid_names=["train", "val"], callbacks=[ lgb.early_stopping(stopping_rounds=60), lgb.log_evaluation(period=100), ], ) def evaluate_binary(model: Any, X_test, y_test, model_type: str) -> Tuple[np.ndarray, Dict[str, float]]: if model_type == "xgb": probs = model.predict(xgb.DMatrix(X_test)) else: probs = model.predict(X_test, num_iteration=model.best_iteration) probs = np.asarray(probs, dtype=float) probs = np.clip(probs, 1e-6, 1.0 - 1e-6) preds = (probs >= 0.5).astype(int) metrics = { "accuracy": round(float(accuracy_score(y_test, preds)), 4), "logloss": round(float(log_loss(y_test, probs)), 4), } print(classification_report(y_test, preds, zero_division=0)) return probs, metrics def train_market(frame: pd.DataFrame, market_name: str, target_col: str) -> Dict[str, Any]: valid = frame[frame[target_col].notna()].copy() if len(valid) < 400: return {"skipped": True, "reason": "not_enough_samples", "samples": int(len(valid))} train_df, val_df, test_df = temporal_split(valid) X_train = train_df[DEFAULT_FEATURE_COLS].values y_train = train_df[target_col].astype(int).values X_val = val_df[DEFAULT_FEATURE_COLS].values y_val = val_df[target_col].astype(int).values X_test = test_df[DEFAULT_FEATURE_COLS].values y_test = test_df[target_col].astype(int).values print(f"\n[MARKET] {market_name.upper()} samples={len(valid)}") xgb_model = train_xgb(X_train, y_train, X_val, y_val) lgb_model = train_lgb(X_train, y_train, X_val, y_val) xgb_probs, xgb_metrics = evaluate_binary(xgb_model, X_test, y_test, "xgb") lgb_probs, lgb_metrics = evaluate_binary(lgb_model, X_test, y_test, "lgb") ensemble_probs = np.clip((xgb_probs + lgb_probs) / 2.0, 1e-6, 1.0 - 1e-6) ensemble_preds = (ensemble_probs >= 0.5).astype(int) ensemble_metrics = { "accuracy": round(float(accuracy_score(y_test, ensemble_preds)), 4), "logloss": round(float(log_loss(y_test, ensemble_probs)), 4), } xgb_path = os.path.join(MODELS_DIR, f"xgb_basketball_v25_{market_name}.json") lgb_path = os.path.join(MODELS_DIR, f"lgb_basketball_v25_{market_name}.txt") xgb_model.save_model(xgb_path) lgb_model.save_model(lgb_path) return { "skipped": False, "samples": int(len(valid)), "train_samples": int(len(train_df)), "val_samples": int(len(val_df)), "test_samples": int(len(test_df)), "xgb": xgb_metrics, "lgb": lgb_metrics, "ensemble": ensemble_metrics, "xgb_path": xgb_path, "lgb_path": lgb_path, } def main() -> None: print("[INFO] training basketball_v25 started", flush=True) frame = load_data() report: Dict[str, Any] = { "trained_at": datetime.utcnow().isoformat() + "Z", "rows": int(len(frame)), "markets": {}, } for market in MARKETS: report["markets"][market["name"]] = train_market(frame, market["name"], market["target"]) feature_path = os.path.join(MODELS_DIR, "feature_cols.json") with open(feature_path, "w", encoding="utf-8") as handle: json.dump(DEFAULT_FEATURE_COLS, handle, indent=2) report_path = os.path.join(REPORTS_DIR, "basketball_v25_market_metrics.json") with open(report_path, "w", encoding="utf-8") as handle: json.dump(report, handle, indent=2) print(f"[OK] feature_cols={feature_path}", flush=True) print(f"[OK] report={report_path}", flush=True) if __name__ == "__main__": main()