"""Prediction Mixin — V25 signal extraction and prediction building.

Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
import time
|
||
import math
|
||
import os
|
||
import pickle
|
||
from collections import defaultdict
|
||
from typing import Any, Dict, List, Optional, Set, Tuple, overload
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
import psycopg2
|
||
from psycopg2.extras import RealDictCursor
|
||
|
||
from data.db import get_clean_dsn
|
||
from schemas.prediction import FullMatchPrediction
|
||
from schemas.match_data import MatchData
|
||
from models.v25_ensemble import V25Predictor, get_v25_predictor
|
||
try:
    from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
    # The V27 predictor is optional: when its module cannot be imported,
    # inert stand-ins with the same names are defined so the rest of this
    # module can still be imported and called.

    class V27Predictor:  # type: ignore[no-redef]
        """No-op stand-in: holds no models and returns empty results."""

        def __init__(self):
            self.models = {}

        def load_models(self):
            """Report that nothing was loaded."""
            return False

        def predict_all(self, features):
            """Return an empty prediction mapping."""
            return {}

    def compute_divergence(*args, **kwargs):
        """Stand-in that accepts any arguments and returns an empty dict."""
        return {}

    def compute_value_edge(*args, **kwargs):
        """Stand-in that accepts any arguments and returns an empty dict."""
        return {}
|
||
from features.odds_band_analyzer import OddsBandAnalyzer
|
||
try:
    from models.basketball_v25 import (
        BasketballMatchPrediction,
        get_basketball_v25_predictor,
    )
except ImportError:
    # Basketball support is optional. Keep the names importable, but make
    # any attempt to obtain the predictor fail with the same ImportError type.
    BasketballMatchPrediction = Any  # type: ignore[misc]

    def get_basketball_v25_predictor() -> Any:
        """Always raise ImportError: the basketball module is unavailable."""
        raise ImportError("Basketball predictor is not available")
|
||
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
|
||
from services.feature_enrichment import FeatureEnrichmentService
|
||
from services.betting_brain import BettingBrain
|
||
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
|
||
from services.match_commentary import generate_match_commentary
|
||
from utils.top_leagues import load_top_league_ids
|
||
from utils.league_reliability import load_league_reliability
|
||
from config.config_loader import build_threshold_dict, get_threshold_default, get_config
|
||
from models.calibration import get_calibrator
|
||
from models.league_model import get_league_model_loader, FILE_TO_SIGNAL
|
||
|
||
|
||
class PredictionMixin:
|
||
def _get_score_model(self) -> Optional[Dict]:
|
||
"""Load XGBoost score prediction model (non-fatal)."""
|
||
if hasattr(self, "_score_model_cache"):
|
||
return self._score_model_cache
|
||
score_model_path = os.path.join(
|
||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||
"models", "xgb_score.pkl",
|
||
)
|
||
try:
|
||
if os.path.exists(score_model_path):
|
||
with open(score_model_path, "rb") as f:
|
||
model_data = pickle.load(f)
|
||
if all(k in model_data for k in ("home_model", "away_model", "ht_home_model", "ht_away_model", "features")):
|
||
self._score_model_cache = model_data
|
||
print(f"[SCORE] ✅ Score model loaded ({len(model_data['features'])} features)")
|
||
return self._score_model_cache
|
||
except Exception as e:
|
||
print(f"[SCORE] ⚠ Load failed (non-fatal, using heuristic): {e}")
|
||
self._score_model_cache = None
|
||
return None
|
||
|
||
def _predict_score_with_model(self, features: Dict[str, float]) -> Optional[Dict[str, float]]:
|
||
"""Predict FT/HT scores using XGBoost score model."""
|
||
score_model = self._get_score_model()
|
||
if score_model is None:
|
||
return None
|
||
try:
|
||
import pandas as _pd
|
||
model_features = score_model["features"]
|
||
row = {f: float(features.get(f, 0)) for f in model_features}
|
||
df = _pd.DataFrame([row])
|
||
ft_home = max(0.0, float(score_model["home_model"].predict(df)[0]))
|
||
ft_away = max(0.0, float(score_model["away_model"].predict(df)[0]))
|
||
ht_home = max(0.0, float(score_model["ht_home_model"].predict(df)[0]))
|
||
ht_away = max(0.0, float(score_model["ht_away_model"].predict(df)[0]))
|
||
return {
|
||
"ft_home": round(ft_home, 2),
|
||
"ft_away": round(ft_away, 2),
|
||
"ht_home": round(ht_home, 2),
|
||
"ht_away": round(ht_away, 2),
|
||
}
|
||
except Exception as e:
|
||
print(f"[SCORE] ⚠ Prediction error (fallback to heuristic): {e}")
|
||
return None
|
||
|
||
_V25_KEY_MAP = {
|
||
"ms": "MS",
|
||
"ou15": "OU15",
|
||
"ou25": "OU25",
|
||
"ou35": "OU35",
|
||
"btts": "BTTS",
|
||
"ht_result": "HT",
|
||
"ht_ou05": "HT_OU05",
|
||
"ht_ou15": "HT_OU15",
|
||
"htft": "HTFT",
|
||
"cards_ou45": "CARDS",
|
||
"handicap_ms": "HCAP",
|
||
"odd_even": "OE",
|
||
}
|
||
|
||
def _get_v25_signal(
    self,
    data: MatchData,
    features: Optional[Dict[str, float]] = None,
) -> Dict[str, Any]:
    """
    Get V25 ensemble predictions for all available markets.

    Returns a dict keyed by UPPERCASE market name (MS, OU25, BTTS, etc.)
    each with a 'probs' sub-dict that _prob_map can consume.

    CRITICAL: Keys MUST be uppercase to match _build_v25_prediction lookups.

    Flow:
      1. If a league-specific model exists for ``data.league_id``, its
         markets are written first (tagged ``source="league_specific"``);
         these entries are stored as returned, without temperature scaling
         or calibration.
      2. Any market still missing is predicted by the general V25 model and
         passed through temperature scaling + per-outcome calibration.

    Args:
        data: Match being predicted; only ``league_id`` is read here.
        features: Pre-built feature row. When falsy, the row is built via
            ``self._build_v25_features(data)``.

    Raises:
        RuntimeError: if no market at all could be predicted.
    """
    v25 = self._get_v25_predictor()
    feature_row = features or self._build_v25_features(data)

    signal: Dict[str, Any] = {}

    # ── League-specific model override ─────────────────────────────────
    league_id = getattr(data, "league_id", None)
    league_model = None
    if league_id:
        try:
            league_model = get_league_model_loader().get(league_id)
        except Exception:
            # Best-effort: a loader failure just means "no league model".
            league_model = None

    if league_model:
        # Predict all available markets with league-specific XGBoost
        for mkey, sig_key in FILE_TO_SIGNAL.items():
            probs = league_model.predict_market(mkey, feature_row)
            if probs:
                best_label = max(probs, key=probs.__getitem__)
                signal[sig_key] = {
                    "probs": probs,
                    "raw_probs": probs,
                    "pick": best_label,
                    "probability": float(probs[best_label]),
                    "confidence": round(float(probs[best_label]) * 100.0, 1),
                    "source": "league_specific",
                }
        if signal:
            print(f"  [LEAGUE-MODEL] {league_id}: {len(signal)} markets predicted")
        # Markets the league model did not cover fall through to the
        # general V25 prediction below.

    def _temperature_scale(probs_dict: Dict[str, float], temperature: float = 1.5) -> Dict[str, float]:
        """
        Apply temperature scaling to soften overconfident model outputs.

        Each probability is clamped to (eps, 1 - eps), its log is divided
        by T, and the result is re-normalised with a softmax.
        T=1.0 → no change, T>1 → softer probabilities.

        Standard approach for post-hoc model calibration (Guo et al., 2017).

        V34: Reduced from 2.5 to 1.5 — V25 model is already calibrated via
        odds-aware training. Excessive flattening was destroying signal.
        """
        eps = 1e-7  # numerical stability
        n = len(probs_dict)

        # The effective T depends on the class count and is floored per
        # branch, so a configured temperature below the floor has no effect.
        if n <= 2:
            T = max(temperature, 1.5)  # was 2.0
        elif n == 3:
            T = max(temperature * 0.8, 1.2)  # was 1.5 — 3-way slightly less aggressive
        else:
            T = max(temperature * 0.6, 1.0)  # was 1.3 — 9-way (HTFT) already spread

        labels = list(probs_dict.keys())
        scaled_logs = []
        for label in labels:
            p = max(eps, min(1.0 - eps, float(probs_dict[label])))
            scaled_logs.append(math.log(p) / T)

        # Softmax re-normalisation (shifted by the max for stability)
        peak = max(scaled_logs)
        exp_vals = [math.exp(value - peak) for value in scaled_logs]
        total = sum(exp_vals)
        return {label: exp_val / total for label, exp_val in zip(labels, exp_vals)}

    calibrator = get_calibrator()
    _temperature = float(get_config().get('model_ensemble.temperature', 1.5))

    # Map "<market_key>_<label>" (lowercased) → calibrator market key.
    # The half-time result market reaches _enrich_signal_entry with the
    # model key "ht_result", so it needs "ht_result_*" aliases; the shorter
    # "ht_*" keys alone were never matched.
    _CAL_KEY_MAP: Dict[str, str] = {
        "ms_1": "ms_home", "ms_x": "ms_draw", "ms_2": "ms_away",
        "ou15_over": "ou15", "ou15_under": "ou15",
        "ou25_over": "ou25", "ou25_under": "ou25",
        "ou35_over": "ou35", "ou35_under": "ou35",
        "btts_yes": "btts", "btts_no": "btts",
        "ht_1": "ht_home", "ht_x": "ht_draw", "ht_2": "ht_away",
        "ht_result_1": "ht_home", "ht_result_x": "ht_draw", "ht_result_2": "ht_away",
    }

    def _enrich_signal_entry(probs_dict: Dict[str, float], market_key: str = "") -> Dict[str, Any]:
        """Temperature scaling + Isotonic calibration pipeline."""
        scaled_probs = _temperature_scale(probs_dict, temperature=_temperature)

        # Isotonic calibration per outcome (if trained models exist)
        if market_key:
            calibrated = {}
            for label, prob in scaled_probs.items():
                raw_key = f"{market_key}_{label}".lower().replace(" ", "_")
                cal_key = _CAL_KEY_MAP.get(raw_key, raw_key)
                calibrated[label] = calibrator.calibrate(cal_key, prob)
            total = sum(calibrated.values())
            if total > 0:
                scaled_probs = {k: v / total for k, v in calibrated.items()}
            # else: calibration collapsed every outcome to zero — keep the
            # temperature-scaled distribution instead of an all-zero one.

        best_label = max(scaled_probs, key=scaled_probs.__getitem__)
        best_prob = float(scaled_probs[best_label])
        return {
            "probs": scaled_probs,
            "raw_probs": probs_dict,
            "pick": best_label,
            "probability": best_prob,
            "confidence": round(best_prob * 100.0, 1),
        }

    # Core markets using dedicated methods (skip if league model already covered them)
    if "MS" not in signal:
        h, d, a = v25.predict_ms(feature_row)
        signal["MS"] = _enrich_signal_entry({"1": h, "X": d, "2": a}, "ms")
        print(f"  [V25-SIGNAL] MS → H={h:.4f} D={d:.4f} A={a:.4f}")
    else:
        print(f"  [LEAGUE-MODEL] MS → {signal['MS']['probs']}")

    if "OU25" not in signal:
        over25, under25 = v25.predict_ou25(feature_row)
        signal["OU25"] = _enrich_signal_entry({"Over": over25, "Under": under25}, "ou25")
        print(f"  [V25-SIGNAL] OU25 → O={over25:.4f} U={under25:.4f}")

    if "BTTS" not in signal:
        btts_y, btts_n = v25.predict_btts(feature_row)
        signal["BTTS"] = _enrich_signal_entry({"Yes": btts_y, "No": btts_n}, "btts")
        print(f"  [V25-SIGNAL] BTTS → Y={btts_y:.4f} N={btts_n:.4f}")

    # Additional markets via generic predict_market (skip if league model covered them).
    # label_map: None → 9-way HTFT; otherwise only its keys (output labels,
    # in raw-output order) and its length (2 = binary, 3 = three-way) are used.
    for model_key, label_map in [
        ("ou15", {"Over": 0, "Under": None}),
        ("ou35", {"Over": 0, "Under": None}),
        ("ht_result", {"1": 0, "X": 1, "2": 2}),
        ("ht_ou05", {"Over": 0, "Under": None}),
        ("ht_ou15", {"Over": 0, "Under": None}),
        ("htft", None),
        ("cards_ou45", {"Over": 0, "Under": None}),
        ("handicap_ms", {"1": 0, "X": 1, "2": 2}),
        ("odd_even", {"Odd": 0, "Even": None}),
    ]:
        out_key = str(self._V25_KEY_MAP.get(model_key, model_key.upper()))
        if out_key in signal:
            continue  # already predicted by league-specific model
        if not v25.has_market(model_key):
            continue
        raw = v25.predict_market(model_key, feature_row)
        if raw is None:
            continue

        if label_map is None:
            # HTFT — 9 combinations; missing trailing entries count as 0.0
            htft_labels = ["1/1", "1/X", "1/2", "X/1", "X/X", "X/2", "2/1", "2/X", "2/2"]
            probs_dict = {
                label: float(raw[i]) if i < len(raw) else 0.0
                for i, label in enumerate(htft_labels)
            }
            signal[out_key] = _enrich_signal_entry(probs_dict, model_key)
        elif len(label_map) == 2:
            # Binary market: raw[0] is the first label's probability
            labels = list(label_map.keys())
            p = float(raw[0]) if len(raw) >= 1 else None
            if p is None:
                print(f"  [V25-SIGNAL] {out_key} → EMPTY raw output, skipped")
                continue
            signal[out_key] = _enrich_signal_entry({labels[0]: p, labels[1]: 1.0 - p}, model_key)
        elif len(label_map) == 3:
            # 3-class market
            labels = list(label_map.keys())
            probs_dict = {}
            for i, label in enumerate(labels):
                if i >= len(raw):
                    print(f"  [V25-SIGNAL] {out_key} → insufficient probabilities in raw output")
                    break
                probs_dict[label] = float(raw[i])
            else:
                # for/else: only record the market when all three
                # probabilities were present (no break above).
                signal[out_key] = _enrich_signal_entry(probs_dict, model_key)

        if out_key in signal:
            print(f"  [V25-SIGNAL] {out_key} → {signal[out_key]['probs']}")

    print(f"  [V25-SIGNAL] Total markets with real predictions: {len(signal)}")
    if not signal:
        raise RuntimeError("V25 model produced ZERO market predictions — cannot continue")

    return signal
|
||
|
||
@staticmethod
|
||
def _prob_map(signal: Optional[Dict[str, Any]], market: str, defaults: Dict[str, float]) -> Dict[str, float]:
|
||
"""Extract normalised probabilities from signal.
|
||
|
||
If the signal contains real model output for this market, use it.
|
||
If the market is missing from the signal, log a warning and return
|
||
the defaults as a LAST RESORT (so the pipeline doesn't crash).
|
||
The defaults are ONLY used for non-core / secondary markets that
|
||
may not have a trained model yet (e.g. CARDS, HCAP, OE).
|
||
"""
|
||
market_payload = signal.get(market, {}) if isinstance(signal, dict) else {}
|
||
probs = market_payload.get("probs", {}) if isinstance(market_payload, dict) else {}
|
||
if not isinstance(probs, dict) or not probs:
|
||
print(f" ⚠️ [PROB_MAP] Market '{market}' NOT found in V25 signal — model output missing")
|
||
return dict(defaults)
|
||
out = {key: float(probs.get(key, value)) for key, value in defaults.items()}
|
||
total = sum(out.values())
|
||
if total <= 0:
|
||
print(f" ⚠️ [PROB_MAP] Market '{market}' has zero total probability")
|
||
return dict(defaults)
|
||
return {key: value / total for key, value in out.items()}
|
||
|
||
@staticmethod
|
||
def _is_cup_game(league_name: str) -> bool:
|
||
"""Detect cup/knockout competitions where home advantage is significantly weaker."""
|
||
name = (league_name or "").lower()
|
||
cup_keywords = (
|
||
"kupa", "cup", "coupe", "copa", "coppa", "pokal",
|
||
"trophy", "shield", "challenge",
|
||
"ziraat", "süper kupa", "super cup",
|
||
)
|
||
return any(kw in name for kw in cup_keywords)
|
||
|
||
@staticmethod
|
||
def _best_prob_pick(prob_map: Dict[str, float]) -> Tuple[str, float]:
|
||
if not prob_map:
|
||
return "", 0.0
|
||
pick = max(prob_map, key=prob_map.__getitem__)
|
||
return pick, float(prob_map[pick])
|
||
|
||
@staticmethod
|
||
def _poisson_score_top5(home_xg: float, away_xg: float, max_goals: int = 5) -> List[Dict[str, Any]]:
|
||
def poisson_p(lmbda: float, k: int) -> float:
|
||
return math.exp(-lmbda) * (lmbda ** k) / math.factorial(k)
|
||
|
||
scores: List[Tuple[str, float]] = []
|
||
for home_goals in range(max_goals + 1):
|
||
for away_goals in range(max_goals + 1):
|
||
prob = poisson_p(home_xg, home_goals) * poisson_p(away_xg, away_goals)
|
||
scores.append((f"{home_goals}-{away_goals}", prob))
|
||
scores.sort(key=lambda item: item[1], reverse=True)
|
||
return [
|
||
{"score": score, "prob": round(prob, 4)}
|
||
for score, prob in scores[:5]
|
||
]
|
||
|
||
def _build_v25_prediction(
    self,
    data: MatchData,
    features: Dict[str, float],
    v25_signal: Dict[str, Any],
) -> FullMatchPrediction:
    """Assemble a FullMatchPrediction from the V25 signal.

    Steps, in order:
      1. Read per-market probabilities from ``v25_signal`` via ``_prob_map``
         (uniform-ish defaults when a market is missing).
      2. For cup games, move 8% of the home-win probability to draw/away.
      3. Fill picks and confidences for every market.
      4. Derive xG and scorelines: score model first, heuristic fallback.
      5. Compute risk score/level, warnings, upset and surprise fields.
      6. Fill per-engine confidences, card/corner estimates and
         ``analysis_details``.

    NOTE(review): the source indentation was lost in extraction, so the
    nesting of two sections below is reconstructed and flagged inline.
    """
    prediction = FullMatchPrediction(
        match_id=data.match_id,
        home_team=data.home_team_name,
        away_team=data.away_team_name,
    )

    # Each call returns probabilities normalised over the keys of its defaults.
    ms_probs = self._prob_map(v25_signal, "MS", {"1": 0.33, "X": 0.34, "2": 0.33})
    ou15_probs = self._prob_map(v25_signal, "OU15", {"Under": 0.5, "Over": 0.5})
    ou25_probs = self._prob_map(v25_signal, "OU25", {"Under": 0.5, "Over": 0.5})
    ou35_probs = self._prob_map(v25_signal, "OU35", {"Under": 0.5, "Over": 0.5})
    btts_probs = self._prob_map(v25_signal, "BTTS", {"No": 0.5, "Yes": 0.5})
    ht_probs = self._prob_map(v25_signal, "HT", {"1": 0.33, "X": 0.34, "2": 0.33})
    ht_ou05_probs = self._prob_map(v25_signal, "HT_OU05", {"Under": 0.5, "Over": 0.5})
    ht_ou15_probs = self._prob_map(v25_signal, "HT_OU15", {"Under": 0.5, "Over": 0.5})
    htft_probs = self._prob_map(
        v25_signal,
        "HTFT",
        {"1/1": 1 / 9, "1/X": 1 / 9, "1/2": 1 / 9, "X/1": 1 / 9, "X/X": 1 / 9, "X/2": 1 / 9, "2/1": 1 / 9, "2/X": 1 / 9, "2/2": 1 / 9},
    )
    oe_probs = self._prob_map(v25_signal, "OE", {"Even": 0.5, "Odd": 0.5})
    cards_probs = self._prob_map(v25_signal, "CARDS", {"Under": 0.5, "Over": 0.5})
    hcap_probs = self._prob_map(v25_signal, "HCAP", {"1": 0.33, "X": 0.34, "2": 0.33})

    # Cup game: dampen home advantage — model trained on league data overestimates home edge
    is_cup = self._is_cup_game(getattr(data, "league_name", "") or "")
    if is_cup:
        # Shift 8% of home probability toward away and draw (rotation, neutral venue effect)
        cup_transfer = ms_probs["1"] * 0.08
        ms_probs = {
            "1": ms_probs["1"] - cup_transfer,
            "X": ms_probs["X"] + cup_transfer * 0.4,
            "2": ms_probs["2"] + cup_transfer * 0.6,
        }
        # The 40/60 split conserves the sum; this re-normalisation only
        # absorbs floating-point drift.
        total = sum(ms_probs.values())
        ms_probs = {k: v / total for k, v in ms_probs.items()}

    # ── Match result (MS) ──
    prediction.ms_home_prob = ms_probs["1"]
    prediction.ms_draw_prob = ms_probs["X"]
    prediction.ms_away_prob = ms_probs["2"]
    prediction.ms_pick, ms_top = self._best_prob_pick(ms_probs)
    prediction.ms_confidence = ms_top * 100.0

    # ── Double chance: pairwise sums of the MS probabilities ──
    prediction.dc_1x_prob = prediction.ms_home_prob + prediction.ms_draw_prob
    prediction.dc_x2_prob = prediction.ms_draw_prob + prediction.ms_away_prob
    prediction.dc_12_prob = prediction.ms_home_prob + prediction.ms_away_prob
    dc_probs = {"1X": prediction.dc_1x_prob, "X2": prediction.dc_x2_prob, "12": prediction.dc_12_prob}
    prediction.dc_pick, dc_top = self._best_prob_pick(dc_probs)
    prediction.dc_confidence = dc_top * 100.0

    # ── Over/Under goal lines (ties resolve to Over, "Üst") ──
    prediction.over_15_prob = ou15_probs["Over"]
    prediction.under_15_prob = ou15_probs["Under"]
    prediction.ou15_pick = "1.5 Üst" if prediction.over_15_prob >= prediction.under_15_prob else "1.5 Alt"
    prediction.ou15_confidence = max(prediction.over_15_prob, prediction.under_15_prob) * 100.0

    prediction.over_25_prob = ou25_probs["Over"]
    prediction.under_25_prob = ou25_probs["Under"]
    prediction.ou25_pick = "2.5 Üst" if prediction.over_25_prob >= prediction.under_25_prob else "2.5 Alt"
    prediction.ou25_confidence = max(prediction.over_25_prob, prediction.under_25_prob) * 100.0

    prediction.over_35_prob = ou35_probs["Over"]
    prediction.under_35_prob = ou35_probs["Under"]
    prediction.ou35_pick = "3.5 Üst" if prediction.over_35_prob >= prediction.under_35_prob else "3.5 Alt"
    prediction.ou35_confidence = max(prediction.over_35_prob, prediction.under_35_prob) * 100.0

    # ── Both teams to score ──
    prediction.btts_yes_prob = btts_probs["Yes"]
    prediction.btts_no_prob = btts_probs["No"]
    prediction.btts_pick = "KG Var" if prediction.btts_yes_prob >= prediction.btts_no_prob else "KG Yok"
    prediction.btts_confidence = max(prediction.btts_yes_prob, prediction.btts_no_prob) * 100.0

    # ── Half-time markets ──
    prediction.ht_home_prob = ht_probs["1"]
    prediction.ht_draw_prob = ht_probs["X"]
    prediction.ht_away_prob = ht_probs["2"]
    prediction.ht_pick, ht_top = self._best_prob_pick(ht_probs)
    prediction.ht_confidence = ht_top * 100.0

    prediction.ht_over_05_prob = ht_ou05_probs["Over"]
    prediction.ht_under_05_prob = ht_ou05_probs["Under"]
    prediction.ht_ou_pick = "İY 0.5 Üst" if prediction.ht_over_05_prob >= prediction.ht_under_05_prob else "İY 0.5 Alt"

    prediction.ht_over_15_prob = ht_ou15_probs["Over"]
    prediction.ht_under_15_prob = ht_ou15_probs["Under"]
    prediction.ht_ou15_pick = "İY 1.5 Üst" if prediction.ht_over_15_prob >= prediction.ht_under_15_prob else "İY 1.5 Alt"

    prediction.ht_ft_probs = htft_probs

    # ── Odd/even, cards, handicap ──
    prediction.odd_prob = oe_probs["Odd"]
    prediction.even_prob = oe_probs["Even"]
    prediction.odd_even_pick = "Tek" if prediction.odd_prob >= prediction.even_prob else "Çift"

    prediction.cards_over_prob = cards_probs["Over"]
    prediction.cards_under_prob = cards_probs["Under"]
    prediction.card_pick = "4.5 Üst" if prediction.cards_over_prob >= prediction.cards_under_prob else "4.5 Alt"
    prediction.cards_confidence = max(prediction.cards_over_prob, prediction.cards_under_prob) * 100.0

    prediction.handicap_home_prob = hcap_probs["1"]
    prediction.handicap_draw_prob = hcap_probs["X"]
    prediction.handicap_away_prob = hcap_probs["2"]
    prediction.handicap_pick, hcap_top = self._best_prob_pick(hcap_probs)
    prediction.handicap_confidence = hcap_top * 100.0

    # ── Score Prediction: Model-first, heuristic fallback ──────────
    # ms_edge > 0 means the home side is favoured; reused by the risk logic below.
    ms_edge = prediction.ms_home_prob - prediction.ms_away_prob
    score_result = self._predict_score_with_model(features)
    if score_result is not None:
        # ML model predicted scores
        prediction.home_xg = score_result["ft_home"]
        prediction.away_xg = score_result["ft_away"]
        prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)
        ht_home_xg = score_result["ht_home"]
        ht_away_xg = score_result["ht_away"]
        prediction.predicted_ft_score = f"{int(round(prediction.home_xg))}-{int(round(prediction.away_xg))}"
        prediction.predicted_ht_score = f"{int(round(ht_home_xg))}-{int(round(ht_away_xg))}"
    else:
        # Heuristic fallback (original formula)
        # Base xG per side: mean of own scoring average and the opponent's
        # conceded figure, floored at 0.25.
        base_home_xg = max(0.25, (float(data.home_goals_avg or 1.3) + float(features.get("away_xga", data.away_conceded_avg) or 1.2)) / 2.0)
        base_away_xg = max(0.25, (float(data.away_goals_avg or 1.3) + float(features.get("home_xga", data.home_conceded_avg) or 1.2)) / 2.0)
        # ms_edge already computed above
        # Target total goals, clamped to [1.4, 4.8]: league average (55%),
        # team averages (45%), shifted by the Over/Under 2.5 lean.
        total_target = max(
            1.4,
            min(
                4.8,
                (float(features.get("league_avg_goals", 2.7)) * 0.55)
                + ((float(data.home_goals_avg or 1.3) + float(data.away_goals_avg or 1.3)) * 0.45)
                + ((prediction.over_25_prob - prediction.under_25_prob) * 1.15),
            ),
        )
        home_xg = max(0.2, base_home_xg + (ms_edge * 0.55) + ((prediction.btts_yes_prob - 0.5) * 0.18))
        away_xg = max(0.2, base_away_xg - (ms_edge * 0.55) + ((prediction.btts_yes_prob - 0.5) * 0.18))
        # Rescale both sides so they sum to total_target.
        scale = total_target / max(home_xg + away_xg, 0.1)
        prediction.home_xg = round(home_xg * scale, 2)
        prediction.away_xg = round(away_xg * scale, 2)
        prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)

    # NOTE(review): nesting reconstructed — the cup reduction and the score
    # derivation below are assumed to run for BOTH the model and heuristic
    # paths (so the model path's predicted_*_score above is overwritten here).
    # Confirm against the original file.
    # Cup game: reduce xG by 20% — rotation + lower motivation + defensive tactics
    if is_cup:
        prediction.home_xg = round(prediction.home_xg * 0.80, 2)
        prediction.away_xg = round(prediction.away_xg * 0.80, 2)
        prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)
    prediction.predicted_ft_score = f"{int(round(prediction.home_xg))}-{int(round(prediction.away_xg))}"
    # Half-time score is taken as 45% of the full-time xG per side.
    prediction.predicted_ht_score = f"{int(round(prediction.home_xg * 0.45))}-{int(round(prediction.away_xg * 0.45))}"
    prediction.ft_scores_top5 = self._poisson_score_top5(prediction.home_xg, prediction.away_xg)

    # Score prediction: find the most likely scoreline consistent with the MS pick
    # Instead of just rounding xG (misleading), filter Poisson top scores by result direction
    ms_pick = prediction.ms_pick  # "1", "X", or "2"
    top5 = prediction.ft_scores_top5
    if top5 and ms_pick in ("1", "X", "2"):
        def _result_of(score_str: str) -> str:
            # Map "h-a" to "1"/"X"/"2"; "?" for anything unparsable.
            try:
                h, a = map(int, score_str.split("-"))
                if h > a: return "1"
                if h < a: return "2"
                return "X"
            except Exception:
                return "?"

        # Filter to scorelines matching the predicted result
        matching = [s for s in top5 if _result_of(s["score"]) == ms_pick]
        if matching:
            best = matching[0]  # already sorted by probability desc
            h_str, a_str = best["score"].split("-")
            prediction.predicted_ft_score = best["score"]
            # Recalculate HT score proportionally from the FT pick
            h_val, a_val = int(h_str), int(a_str)
            prediction.predicted_ht_score = f"{int(round(h_val * 0.45))}-{int(round(a_val * 0.45))}"

    # ── Risk scoring ──
    # Base risk is 100 minus the strongest market confidence, plus penalties.
    max_market_conf = max(
        prediction.ms_confidence,
        prediction.ou15_confidence,
        prediction.ou25_confidence,
        prediction.ou35_confidence,
        prediction.btts_confidence,
        prediction.ht_confidence,
        prediction.cards_confidence,
        prediction.handicap_confidence,
    )
    lineup_conf = max(0.0, min(1.0, float(getattr(data, "lineup_confidence", 0.0) or 0.0)))
    # Chained conditional: 12.0 for "none"; for "probable_xi" a penalty of
    # at least 1.5 that grows as lineup_conf drops; otherwise 0.0.
    lineup_penalty = 12.0 if data.lineup_source == "none" else max(1.5, (1.0 - lineup_conf) * 8.0) if data.lineup_source == "probable_xi" else 0.0
    referee_penalty = 6.0 if not data.referee_name else 0.0
    parity_penalty = 8.0 if abs(ms_edge) < 0.08 else 0.0
    # Cup game penalty: model trained on league data has lower reliability for cup matches
    cup_penalty = 10.0 if is_cup else 0.0
    # Bookmaker margin penalty: high margin signals that even the market is uncertain
    bm_margin = 0.0
    odds_data = getattr(data, "odds_data", {}) or {}
    _h, _d, _a = float(odds_data.get("ms_h") or 0), float(odds_data.get("ms_d") or 0), float(odds_data.get("ms_a") or 0)
    # Margin (overround) is only computed when all three 1X2 odds exceed 1.01.
    if _h > 1.01 and _d > 1.01 and _a > 1.01:
        bm_margin = (1 / _h + 1 / _d + 1 / _a) - 1
    bookmaker_penalty = 12.0 if bm_margin > 0.20 else 6.0 if bm_margin > 0.15 else 0.0
    # Final risk score is clamped to [10, 100].
    prediction.risk_score = round(min(100.0, max(10.0, 100.0 - max_market_conf + lineup_penalty + referee_penalty + parity_penalty + cup_penalty + bookmaker_penalty)), 1)
    if prediction.risk_score >= 78:
        prediction.risk_level = "EXTREME"
    elif prediction.risk_score >= 62:
        prediction.risk_level = "HIGH"
    elif prediction.risk_score >= 40:
        prediction.risk_level = "MEDIUM"
    else:
        prediction.risk_level = "LOW"
    prediction.is_surprise_risk = prediction.risk_level in {"HIGH", "EXTREME"} or prediction.ms_draw_prob >= 0.30
    prediction.surprise_type = "balanced_match_risk" if abs(ms_edge) < 0.08 else "draw_pressure" if prediction.ms_draw_prob >= 0.30 else ""
    prediction.risk_warnings = []
    if is_cup:
        prediction.risk_warnings.append("cup_game_home_advantage_reduced")
    if bookmaker_penalty > 0:
        prediction.risk_warnings.append(f"bookmaker_margin_high_{bm_margin*100:.0f}pct")
    if data.lineup_source == "probable_xi":
        prediction.risk_warnings.append("lineup_probable_not_confirmed")
        # NOTE(review): nesting reconstructed — the low-confidence warning is
        # assumed to apply only to probable-XI lineups; confirm.
        if lineup_conf < 0.65:
            prediction.risk_warnings.append("lineup_projection_low_confidence")
    if data.lineup_source == "none":
        prediction.risk_warnings.append("lineup_unavailable")
    if not data.referee_name:
        prediction.risk_warnings.append("missing_referee")
    if prediction.ms_draw_prob >= 0.30:
        prediction.risk_warnings.append("draw_probability_elevated")

    # ── Upset / surprise ──
    # Upset score: draw probability plus the smaller of the home/away win
    # probabilities, as a 0-100 integer.
    prediction.upset_score = int(round(max(0.0, min(100.0, (prediction.ms_draw_prob + min(prediction.ms_home_prob, prediction.ms_away_prob)) * 100.0))))
    prediction.upset_level = "HIGH" if prediction.upset_score >= 65 else "MEDIUM" if prediction.upset_score >= 45 else "LOW"
    prediction.upset_reasons = [prediction.surprise_type] if prediction.surprise_type else []
    surprise = self._build_surprise_profile(data, prediction)
    prediction.surprise_score = surprise["score"]
    prediction.surprise_comment = surprise["comment"]
    prediction.surprise_reasons = surprise["reasons"]
    prediction.surprise_breakdown = surprise.get("breakdown", [])
    # Auto-flag is_surprise_risk when score crosses 45 even if other paths didn't fire
    if surprise["score"] >= 45.0:
        prediction.is_surprise_risk = True

    # ── Per-engine confidences (each clamped to its own range) ──
    prediction.team_confidence = round(max(35.0, min(95.0, 45.0 + (abs(ms_edge) * 85.0) + (abs(float(features.get("form_elo_diff", 0.0))) / 40.0))), 1)
    prediction.player_confidence = round(max(20.0, min(95.0, 38.0 + (float(features.get("home_key_players", 0.0)) + float(features.get("away_key_players", 0.0))) * 2.0 - (float(features.get("home_missing_impact", 0.0)) + float(features.get("away_missing_impact", 0.0))) * 22.0)), 1)
    prediction.odds_confidence = round(max(30.0, min(95.0, float(np.mean([prediction.ms_confidence, prediction.ou25_confidence, prediction.btts_confidence])))), 1)
    prediction.referee_confidence = 62.0 if data.referee_name else 35.0

    # ── Cards / corners estimates and metadata ──
    prediction.total_cards_pred = 4.8 if prediction.cards_over_prob >= prediction.cards_under_prob else 4.1
    prediction.total_corners_pred = round(8.8 + (prediction.over_25_prob - 0.5) * 2.5, 1)
    prediction.corner_pick = "9.5 Üst" if prediction.total_corners_pred >= 9.5 else "9.5 Alt"
    prediction.analysis_details = {
        "primary_model": "v25",
        "features_source": "v25.pre_match",
        "market_count": len([key for key in v25_signal.keys() if key != "value_bets"]),
        "lineup_source": data.lineup_source,
    }
    return prediction
|
||
|
||
def _build_engine_breakdown(self, prediction: FullMatchPrediction) -> Dict[str, Any]:
|
||
"""
|
||
Engine breakdown with backward-compatible flat scores + rich detail siblings.
|
||
|
||
Shape:
|
||
{
|
||
team: 74.1, player: 55.7, odds: 55.2, referee: 62.0, # legacy flat scores
|
||
detail: { team: {score, label, ...}, player: {...}, ... }
|
||
}
|
||
"""
|
||
components = {
|
||
"team": ("Takım modeli", float(prediction.team_confidence)),
|
||
"player": ("Oyuncu / kadro modeli", float(prediction.player_confidence)),
|
||
"odds": ("Oran piyasası", float(prediction.odds_confidence)),
|
||
"referee": ("Hakem etkisi", float(prediction.referee_confidence)),
|
||
}
|
||
flat: Dict[str, Any] = {}
|
||
detail: Dict[str, Any] = {}
|
||
for key, (display, raw) in components.items():
|
||
score = round(raw, 1)
|
||
label, interpretation = self._confidence_label(score)
|
||
flat[key] = score
|
||
detail[key] = {
|
||
"score": score,
|
||
"label": label,
|
||
"display_name": display,
|
||
"interpretation": interpretation,
|
||
}
|
||
flat["detail"] = detail
|
||
return flat
|