192 lines
7.1 KiB
Python
192 lines
7.1 KiB
Python
"""
League-Specific Model Loader
=============================
Loads per-league XGBoost models + isotonic calibrators trained by
scripts/train_league_models.py and provides a unified prediction interface.

Falls back to general V25 for any market/league without a dedicated model.
"""
|
|
|
|
import os
|
|
import json
|
|
import pickle
|
|
from functools import lru_cache
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import xgboost as xgb
|
|
|
|
# Parent of the directory that contains this file.
AI_ENGINE_DIR = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)

# Root folder holding one sub-directory of model artefacts per league id.
LEAGUE_MODEL_DIR = os.path.join(AI_ENGINE_DIR, "models", "league_specific")
|
|
|
|
# Market file key -> (number of classes, class labels in model output order).
# The file key is the ``<market>`` part of ``xgb_<market>.json``.
MARKET_META: Dict[str, Tuple[int, list]] = {
    # Full-time markets
    "ms": (3, ["1", "X", "2"]),
    "ou15": (2, ["Over", "Under"]),
    "ou25": (2, ["Over", "Under"]),
    "ou35": (2, ["Over", "Under"]),
    "btts": (2, ["Yes", "No"]),
    # Half-time markets
    "ht": (3, ["1", "X", "2"]),
    "ht_ou05": (2, ["Over", "Under"]),
    "ht_ou15": (2, ["Over", "Under"]),
    "htft": (
        9,
        ["1/1", "1/X", "1/2", "X/1", "X/X", "X/2", "2/1", "2/X", "2/2"],
    ),
    # Remaining markets
    "oe": (2, ["Odd", "Even"]),
    "cards": (2, ["Over", "Under"]),
    "handicap": (3, ["1", "X", "2"]),
}
|
|
|
|
# Signal key map: market file key -> uppercase signal key used in
# _get_v25_signal. Every entry is the upper-cased file key except
# "handicap", which maps to "HCAP".
FILE_TO_SIGNAL = {
    "ms": "MS",
    "ou15": "OU15",
    "ou25": "OU25",
    "ou35": "OU35",
    "btts": "BTTS",
    "ht": "HT",
    "ht_ou05": "HT_OU05",
    "ht_ou15": "HT_OU15",
    "htft": "HTFT",
    "oe": "OE",
    "cards": "CARDS",
    "handicap": "HCAP",
}
|
|
|
|
|
|
class LeagueModel:
    """Hold the XGBoost boosters and isotonic calibrators for one league.

    Artefacts are read from ``<LEAGUE_MODEL_DIR>/<league_id>/``:

    * ``feature_cols.json`` -- optional list of feature names,
    * ``xgb_<market>.json`` -- one booster per key of ``MARKET_META``,
    * ``cal_<key>.pkl`` -- pickled calibrators exposing ``predict``; ``<key>``
      is the market key for two-class markets and ``<market>_<class index>``
      for markets with more than two classes.
    """

    def __init__(self, league_id: str):
        self.league_id = league_id
        self.league_dir = os.path.join(LEAGUE_MODEL_DIR, league_id)
        self.models: Dict[str, xgb.Booster] = {}  # market_key -> booster
        self.calibrators: Dict[str, object] = {}  # cal_key -> isotonic
        self.feature_cols: Optional[list] = None
        self._loaded = False

    def load(self) -> bool:
        """Load every artefact found in the league directory.

        Results are staged in local containers and only committed to the
        instance once everything has been read, so a failure half-way through
        never leaves a partially populated model behind.

        Returns:
            True when at least one booster or calibrator was loaded; False
            when the directory is missing, contains nothing usable, or any
            step raised (the error is printed, not propagated).
        """
        if not os.path.isdir(self.league_dir):
            return False

        feature_cols: Optional[list] = None
        models: Dict[str, xgb.Booster] = {}
        calibrators: Dict[str, object] = {}
        try:
            fc_path = os.path.join(self.league_dir, "feature_cols.json")
            if os.path.exists(fc_path):
                # JSON is UTF-8; do not depend on the platform's locale.
                with open(fc_path, encoding="utf-8") as f:
                    feature_cols = json.load(f)

            for mkey in MARKET_META:
                xgb_path = os.path.join(self.league_dir, f"xgb_{mkey}.json")
                # Files of 100 bytes or fewer are skipped -- presumably
                # empty/placeholder dumps; confirm against the trainer.
                if os.path.exists(xgb_path) and os.path.getsize(xgb_path) > 100:
                    booster = xgb.Booster()
                    booster.load_model(xgb_path)
                    models[mkey] = booster

            for fname in os.listdir(self.league_dir):
                if fname.startswith("cal_") and fname.endswith(".pkl"):
                    cal_key = fname[4:-4]  # strip "cal_" and ".pkl"
                    # SECURITY: unpickling executes arbitrary code. The model
                    # directory must only ever contain trusted artefacts.
                    with open(os.path.join(self.league_dir, fname), "rb") as f:
                        calibrators[cal_key] = pickle.load(f)
        except Exception as e:
            # Best effort: a broken league directory must not take the
            # caller down, it just means "no league-specific model".
            print(f"[LeagueModel] Load failed for {self.league_id}: {e}")
            return False

        self.feature_cols = feature_cols
        self.models = models
        self.calibrators = calibrators
        self._loaded = bool(models or calibrators)
        return self._loaded

    def has_market(self, mkey: str) -> bool:
        """Return True when a booster is loaded for market key ``mkey``."""
        return mkey in self.models

    def predict_market(
        self,
        mkey: str,
        feature_row: Dict[str, float],
    ) -> Optional[Dict[str, float]]:
        """Predict one market with the league booster plus calibration.

        Args:
            mkey: Market file key (a key of ``MARKET_META``).
            feature_row: Feature name -> value. Features the model expects
                but the row lacks are filled with ``0.0``.

        Returns:
            ``{label: probability}`` for the market, or None when no booster
            is loaded for ``mkey`` or prediction raised (the error is
            printed, not propagated).
        """
        booster = self.models.get(mkey)
        if booster is None:
            return None

        num_class, labels = MARKET_META[mkey]
        fc = self.feature_cols
        if fc is None:
            # Fall back to whatever feature names the booster itself carries.
            fc = list(booster.feature_names or [])

        try:
            X = pd.DataFrame([{col: feature_row.get(col, 0.0) for col in fc}])
            raw = booster.predict(xgb.DMatrix(X))

            if num_class > 2:
                # One row was passed in; take its per-class probabilities.
                class_probs = raw.reshape(-1, num_class)[0]
                return self._calibrate_multiclass(mkey, labels, class_probs)
            return self._calibrate_binary(mkey, labels, float(raw[0]))
        except Exception as e:
            print(f"[LeagueModel] predict_market({mkey}) failed for {self.league_id}: {e}")
            return None

    def _calibrate_multiclass(self, mkey: str, labels: list, raw_probs) -> Dict[str, float]:
        """Calibrate per-class probabilities and renormalise them to sum to 1.

        Classes without a ``<mkey>_<index>`` calibrator keep their raw value.
        When no class has a calibrator the raw values are returned untouched.
        """
        probs = {label: float(p) for label, p in zip(labels, raw_probs)}
        calibrated_any = False
        for i, label in enumerate(labels):
            calibrator = self.calibrators.get(f"{mkey}_{i}")
            if calibrator is None:
                continue
            p_cal = float(calibrator.predict([raw_probs[i]])[0])
            probs[label] = max(0.01, min(0.99, p_cal))
            calibrated_any = True

        if calibrated_any:
            # Normalise over ALL classes, not only the calibrated ones;
            # otherwise a partially calibrated market would not sum to 1.
            total = sum(probs.values())
            if total > 0:
                probs = {label: p / total for label, p in probs.items()}
        return probs

    def _calibrate_binary(self, mkey: str, labels: list, p: float) -> Dict[str, float]:
        """Calibrate and clamp a single probability for a two-class market.

        ``p`` is assigned to ``labels[0]`` and ``1 - p`` to ``labels[1]``.
        NOTE(review): this assumes the booster's output is the probability of
        ``labels[0]`` -- confirm against the label encoding used in training.
        """
        calibrator = self.calibrators.get(mkey)
        if calibrator is not None:
            p = float(calibrator.predict([p])[0])
        # Keep both outcomes strictly inside (0, 1).
        p = max(0.01, min(0.99, p))
        return {labels[0]: p, labels[1]: 1.0 - p}
|
|
|
|
|
|
class LeagueModelLoader:
    """In-memory, size-bounded cache of league-specific models.

    Entries are kept in least-recently-used order: a cache hit moves the
    league to the "newest" end, and the entry at the "oldest" end is dropped
    when room is needed. Failed loads are cached as ``None`` too, so a league
    without usable artefacts is not re-read from disk on every request (and
    is not retried until its entry is evicted).

    NOTE(review): the cache is a plain dict and no lock is taken. It was
    originally described as safe for single-process async servers
    (FastAPI/uvicorn); confirm before sharing one instance across threads.
    """

    def __init__(self, max_cached: int = 80):
        """Create an empty cache holding at most ``max_cached`` leagues.

        Values below 1 behave like 1: the most recent league is still kept.
        """
        self._cache: Dict[str, Optional[LeagueModel]] = {}
        self._max_cached = max_cached

    def get(self, league_id: str) -> Optional["LeagueModel"]:
        """Return loaded LeagueModel for this league, or None if unavailable."""
        if league_id in self._cache:
            # Re-insert so dict order tracks recency and hot leagues are not
            # the first to be evicted.
            cached = self._cache.pop(league_id)
            self._cache[league_id] = cached
            return cached

        # Evict the least recently used entry if the cache is full. The
        # emptiness check keeps max_cached <= 0 from raising StopIteration.
        if self._cache and len(self._cache) >= self._max_cached:
            oldest = next(iter(self._cache))
            del self._cache[oldest]

        model = LeagueModel(league_id)
        loaded = model.load()
        self._cache[league_id] = model if loaded else None
        if loaded:
            n_models = len(model.models)
            n_cals = len(model.calibrators)
            print(f"[LeagueModel] Loaded {league_id}: {n_models} XGB models, {n_cals} calibrators")
        return self._cache[league_id]

    def available_leagues(self) -> list:
        """List the sub-directory names of ``LEAGUE_MODEL_DIR``.

        Returns an empty list when the directory does not exist. A listed
        league is not guaranteed to load successfully.
        """
        if not os.path.isdir(LEAGUE_MODEL_DIR):
            return []
        return [
            d
            for d in os.listdir(LEAGUE_MODEL_DIR)
            if os.path.isdir(os.path.join(LEAGUE_MODEL_DIR, d))
        ]

    def readiness_summary(self) -> dict:
        """Summarise the model directory and cache for health reporting.

        Returns:
            A dict with the model directory path, the number of league
            directories on disk, and the number of cached leagues whose
            load succeeded (cached failures are not counted).
        """
        leagues = self.available_leagues()
        return {
            "league_specific_dir": LEAGUE_MODEL_DIR,
            "available_leagues": len(leagues),
            "cached": sum(1 for v in self._cache.values() if v is not None),
        }
|
|
|
|
|
|
# ── Singleton ──────────────────────────────────────────────────────
# Process-wide loader instance; created on first request.
_loader: Optional[LeagueModelLoader] = None


def get_league_model_loader() -> LeagueModelLoader:
    """Return the shared LeagueModelLoader, creating it on first use."""
    global _loader
    if _loader is not None:
        return _loader
    _loader = LeagueModelLoader()
    return _loader
|