main
Deploy Iddaai Backend / build-and-deploy (push) Successful in 37s

This commit is contained in:
2026-05-17 02:17:22 +03:00
parent 17ace9bd12
commit 94c7a4481a
53 changed files with 29602 additions and 7832 deletions
@@ -0,0 +1,28 @@
"""Orchestrator package — mixin modules split from the original 5786-line
monolithic SingleMatchOrchestrator. Behaviour is identical to the pre-refactor
version; only file layout has changed.
"""
from services.orchestrator.data_loader import DataLoaderMixin
from services.orchestrator.feature_builder import FeatureBuilderMixin
from services.orchestrator.prediction import PredictionMixin
from services.orchestrator.basketball import BasketballMixin
from services.orchestrator.upper_brain import UpperBrainMixin
from services.orchestrator.htms import HtmsMixin
from services.orchestrator.coupon import CouponMixin
from services.orchestrator.reversal import ReversalMixin
from services.orchestrator.market_board import MarketBoardMixin
from services.orchestrator.utils import UtilsMixin
__all__ = [
"DataLoaderMixin",
"FeatureBuilderMixin",
"PredictionMixin",
"BasketballMixin",
"UpperBrainMixin",
"HtmsMixin",
"CouponMixin",
"ReversalMixin",
"MarketBoardMixin",
"UtilsMixin",
]
@@ -0,0 +1,538 @@
"""Basketball Mixin — basketball-specific market construction.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class BasketballMixin:
def _build_basketball_prediction_package(
self,
data: MatchData,
prediction: Dict[str, Any],
) -> Dict[str, Any]:
quality = self._compute_data_quality(data)
raw_market_rows = self._build_basketball_market_rows(data, prediction)
market_rows = [
self._decorate_basketball_market_row(data, prediction, quality, row)
for row in raw_market_rows
]
market_rows.sort(
key=lambda row: (
1 if row.get("playable") else 0,
float(row.get("play_score", 0.0)),
),
reverse=True,
)
playable_rows = [row for row in market_rows if row.get("playable")]
MIN_ODDS = 1.30
playable_with_odds = [
row for row in playable_rows
if float(row.get("odds", 0.0)) >= MIN_ODDS
]
if playable_with_odds:
playable_with_odds.sort(
key=lambda r: (
float(r.get("ev_edge", 0.0)),
float(r.get("play_score", 0.0)),
),
reverse=True,
)
main_pick = playable_with_odds[0]
main_pick["is_guaranteed"] = False
main_pick["pick_reason"] = "positive_ev_pick"
else:
fallback_with_odds = [r for r in market_rows if float(r.get("odds", 0.0)) > 1.0]
fallback_with_odds.sort(key=lambda r: float(r.get("play_score", 0.0)), reverse=True)
main_pick = fallback_with_odds[0] if fallback_with_odds else (market_rows[0] if market_rows else None)
if main_pick:
main_pick["is_guaranteed"] = False
main_pick["playable"] = False
main_pick["stake_units"] = 0.0
main_pick["bet_grade"] = "PASS"
main_pick["pick_reason"] = "no_playable_value_found"
supporting: List[Dict[str, Any]] = []
for row in market_rows:
if main_pick and row["market"] == main_pick["market"] and row["pick"] == main_pick["pick"]:
continue
supporting.append(row)
supporting = supporting[:5]
bet_summary = [self._to_bet_summary_item(row) for row in market_rows]
scenarios = self._build_basketball_scenarios(prediction)
reasons = self._build_basketball_reasoning_factors(data, prediction, quality)
aggressive_pick: Optional[Dict[str, Any]] = None
risk_level = prediction.get("risk_level", "MEDIUM")
risk_score = float(prediction.get("risk_score", 50.0) or 50.0)
# Build aggressive pick if available from Spreak in market_board
board = prediction.get("market_board", {})
if risk_level in ("LOW", "MEDIUM") and "Spread" in board:
spr_data = board["Spread"]
probs = list(spr_data.values())
keys = list(spr_data.keys())
if len(probs) >= 2:
prob_a = float(str(probs[0]).replace('%', '')) / 100.0
prob_h = float(str(probs[1]).replace('%', '')) / 100.0
max_prob = max(prob_a, prob_h)
spr_pick = "Home" if prob_h >= prob_a else "Away"
conf = 50.0
line_str = "Spread"
for b in prediction.get("bet_summary", []):
if b["market"] == "Spread":
conf = float(b["confidence"])
line_str = b["pick"]
aggressive_pick = {
"market": "SPREAD",
"pick": line_str,
"probability": round(max_prob, 4),
"confidence": round(conf, 1),
"odds": round(
float(
data.odds_data.get(
"spread_h" if spr_pick == "Home" else "spread_a", 0.0
)
),
2,
),
}
scores = prediction.get("score_prediction", {})
home_score = scores.get("home_expected", 80.0)
away_score = scores.get("away_expected", 80.0)
total_score = scores.get("total_expected", 160.0)
mb_out = {
"PLAYER_TOP": board.get("PLAYER_TOP", []),
}
if "ML" in board:
ml_data = board["ML"]
keys = list(ml_data.keys())
if len(keys) >= 2:
mb_out["ML"] = {
"pick": prediction.get("main_pick", ""),
"confidence": 60.0,
"probs": {
"1": round(float(str(ml_data[keys[0]]).replace('%', '')) / 100.0, 4),
"2": round(float(str(ml_data[keys[1]]).replace('%', '')) / 100.0, 4),
},
}
if "Totals" in board:
tot_data = board["Totals"]
keys = list(tot_data.keys())
if len(keys) >= 2:
mb_out["TOTAL"] = {
"line": 160.5,
"pick": prediction.get("main_pick", ""),
"confidence": 60.0,
"probs": {
"under": round(float(str(tot_data[keys[0]]).replace('%', '')) / 100.0, 4),
"over": round(float(str(tot_data[keys[1]]).replace('%', '')) / 100.0, 4),
},
}
if "Spread" in board:
spr_data = board["Spread"]
keys = list(spr_data.keys())
if len(keys) >= 2:
mb_out["SPREAD"] = {
"line_home": 0.0,
"pick": prediction.get("main_pick", ""),
"confidence": 60.0,
"probs": {
"away_cover": round(float(str(spr_data[keys[0]]).replace('%', '')) / 100.0, 4),
"home_cover": round(float(str(spr_data[keys[1]]).replace('%', '')) / 100.0, 4),
},
}
return {
"model_version": str(prediction.get("engine_version") or "v28.main.basketball"),
"match_info": {
"match_id": data.match_id,
"match_name": f"{data.home_team_name} vs {data.away_team_name}",
"home_team": data.home_team_name,
"away_team": data.away_team_name,
"league": data.league_name,
"match_date_ms": data.match_date_ms,
"sport": data.sport,
},
"data_quality": quality,
"risk": {
"level": risk_level,
"score": round(risk_score, 1),
"is_surprise_risk": False,
"surprise_type": "",
"warnings": [],
},
"engine_breakdown": prediction.get("engine_breakdown")
or {
"team": 60.0,
"player": 60.0,
"odds": 80.0,
"referee": 50.0,
},
"main_pick": main_pick,
"bet_advice": {
"playable": bool(main_pick and main_pick.get("playable")),
"suggested_stake_units": float(main_pick.get("stake_units", 0.0))
if (main_pick and main_pick.get("playable"))
else 0.0,
"reason": "playable_pick_found"
if (main_pick and main_pick.get("playable"))
else "no_bet_conditions_met",
},
"bet_summary": bet_summary,
"supporting_picks": supporting,
"aggressive_pick": aggressive_pick,
"scenario_top5": scenarios,
"score_prediction": {
"ft": f"{int(round(home_score))}-{int(round(away_score))}",
"ht": f"{int(round(home_score * 0.52))}-{int(round(away_score * 0.52))}",
"xg_home": round(float(home_score), 2),
"xg_away": round(float(away_score), 2),
"xg_total": round(float(total_score), 2),
},
"market_board": mb_out,
"reasoning_factors": reasons,
}
def _build_basketball_market_rows(
self,
data: MatchData,
pred: Dict[str, Any],
) -> List[Dict[str, Any]]:
odds = data.odds_data
market_board = pred.get("market_board", {})
# 1. Moneyline
ml_row = None
if "ML" in market_board:
ml_data = market_board["ML"]
# To get specific pick (MS 1 or MS 2), look at the probability values
probs = list(ml_data.values())
keys = list(ml_data.keys())
if len(probs) >= 2:
prob_1 = float(str(probs[0]).replace('%', '')) / 100.0
prob_2 = float(str(probs[1]).replace('%', '')) / 100.0
max_prob = max(prob_1, prob_2)
# Derive pick string
ml_pick_val = keys[0] if prob_1 >= prob_2 else keys[1]
ml_pick = "1" if "1" in ml_pick_val else "2"
ml_odd_key = "ml_h" if ml_pick == "1" else "ml_a"
# Find confidence from bet summary
conf = 50.0
for b in pred.get("bet_summary", []):
if b["market"] == "Moneyline": conf = float(b["confidence"])
ml_row = {
"market": "ML",
"pick": ml_pick,
"probability": round(max_prob, 4),
"confidence": round(conf, 1),
"odds": round(float(odds.get(ml_odd_key, 0.0)), 2),
}
# 2. Totals
tot_row = None
if "Totals" in market_board:
tot_data = market_board["Totals"]
probs = list(tot_data.values())
keys = list(tot_data.keys())
if len(probs) >= 2:
prob_u = float(str(probs[0]).replace('%', '')) / 100.0
prob_o = float(str(probs[1]).replace('%', '')) / 100.0
max_prob = max(prob_u, prob_o)
pick_str = keys[1] if prob_o >= prob_u else keys[0]
tot_pick = "Over" if "Over" in pick_str else "Under"
line_val = pick_str.replace("Over", "").replace("Under", "").strip()
conf = 50.0
for b in pred.get("bet_summary", []):
if b["market"] == "Totals": conf = float(b["confidence"])
tot_row = {
"market": "TOTAL",
"pick": f"{tot_pick} {line_val}",
"probability": round(max_prob, 4),
"confidence": round(conf, 1),
"odds": round(float(odds.get("tot_o" if tot_pick == "Over" else "tot_u", 0.0)), 2),
}
# 3. Spread
spr_row = None
if "Spread" in market_board:
spr_data = market_board["Spread"]
probs = list(spr_data.values())
keys = list(spr_data.keys())
if len(probs) >= 2:
prob_a = float(str(probs[0]).replace('%', '')) / 100.0
prob_h = float(str(probs[1]).replace('%', '')) / 100.0
max_prob = max(prob_a, prob_h)
spr_pick = "Home" if prob_h >= prob_a else "Away"
conf = 50.0
line_str = ""
for b in pred.get("bet_summary", []):
if b["market"] == "Spread":
conf = float(b["confidence"])
line_str = b["pick"]
spr_row = {
"market": "SPREAD",
"pick": spr_pick + " " + line_str,
"probability": round(max_prob, 4),
"confidence": round(conf, 1),
"odds": round(float(odds.get("spread_h" if spr_pick == "Home" else "spread_a", 0.0)), 2),
}
# Return valid rows
rows = []
if ml_row: rows.append(ml_row)
if tot_row: rows.append(tot_row)
if spr_row: rows.append(spr_row)
return rows
def _decorate_basketball_market_row(
self,
data: MatchData,
prediction: Dict[str, Any],
quality: Dict[str, Any],
row: Dict[str, Any],
) -> Dict[str, Any]:
market = str(row.get("market") or "")
raw_conf = float(row.get("confidence") or 0.0)
prob = float(row.get("probability") or 0.0)
odd = float(row.get("odds") or 0.0)
calibration = {"ML": 0.90, "TOTAL": 0.88, "SPREAD": 0.86}.get(market, 0.88)
min_conf = {"ML": 55.0, "TOTAL": 56.0, "SPREAD": 55.0}.get(market, 55.0)
calibrated_conf = max(1.0, min(99.0, raw_conf * calibration))
implied_prob = (1.0 / odd) if odd > 1.0 else 0.0
edge = prob - implied_prob if implied_prob > 0 else 0.0
risk_level = str(prediction.get("risk_level", "MEDIUM")).upper()
risk_penalty = {"LOW": 0.0, "MEDIUM": 3.0, "HIGH": 8.0, "EXTREME": 12.0}.get(
risk_level,
4.0,
)
quality_label = str(quality.get("label") or "MEDIUM").upper()
quality_penalty = {"HIGH": 0.0, "MEDIUM": 2.0, "LOW": 6.0}.get(
quality_label,
4.0,
)
base_score = calibrated_conf + (edge * 100.0)
play_score = max(0.0, min(100.0, base_score - risk_penalty - quality_penalty))
reasons: List[str] = []
playable = True
min_play_score = self.market_min_play_score.get(market, 68.0)
min_edge = self.market_min_edge.get(market, 0.02)
if calibrated_conf < min_conf:
playable = False
reasons.append("below_calibrated_conf_threshold")
if market in self.ODDS_REQUIRED_MARKETS and odd <= 1.01:
playable = False
reasons.append("market_odds_missing")
if risk_level in ("HIGH", "EXTREME") and quality_label == "LOW":
playable = False
reasons.append("high_risk_low_data_quality")
if odd > 1.0 and edge < -0.05:
playable = False
reasons.append("negative_model_edge")
if not reasons:
reasons.append("market_passed_all_gates")
if not playable:
grade = "PASS"
stake_units = 0.0
elif play_score >= 72:
grade = "A"
stake_units = 1.0
elif play_score >= 61:
grade = "B"
stake_units = 0.5
else:
grade = "C"
stake_units = 0.25
out = dict(row)
out.update(
{
"raw_confidence": round(raw_conf, 1),
"calibrated_confidence": round(calibrated_conf, 1),
"min_required_confidence": round(min_conf, 1),
"edge": round(edge, 4),
"play_score": round(play_score, 1),
"playable": playable,
"bet_grade": grade,
"stake_units": stake_units,
"decision_reasons": reasons[:3],
},
)
return out
def _build_basketball_scenarios(
self,
prediction: Dict[str, Any],
) -> List[Dict[str, Any]]:
scores = prediction.get("score_prediction", {})
home = float(scores.get("home_expected", 80.0))
away = float(scores.get("away_expected", 80.0))
templates = [
(0.00, 0.23),
(+3.5, 0.20),
(-3.5, 0.19),
(+6.0, 0.16),
(-6.0, 0.14),
]
out: List[Dict[str, Any]] = []
for delta, prob in templates:
h = int(round(home + delta))
a = int(round(away - delta))
out.append({"score": f"{h}-{a}", "prob": prob})
return out
def _build_basketball_reasoning_factors(
self,
data: MatchData,
prediction: Dict[str, Any],
quality: Dict[str, Any],
) -> List[str]:
factors: List[str] = []
# XGBoost models are odds-aware, weight it heavily
factors.append("market_signal_dominant")
if quality.get("label") in ("HIGH", "MEDIUM"):
factors.append("player_form_signal_strong")
else:
factors.append("player_form_signal_limited")
if prediction.get("is_surprise_risk"):
factors.append("upset_risk_detected")
if quality.get("label") == "LOW":
factors.append("limited_data_confidence")
factors.append("basketball_points_model")
return factors
def _compute_basketball_data_quality(self, data: MatchData) -> Dict[str, Any]:
flags: List[str] = []
has_ml = float(data.odds_data.get("ml_h", 0.0)) > 1.0 and float(data.odds_data.get("ml_a", 0.0)) > 1.0
has_total = (
float(data.odds_data.get("tot_line", 0.0)) > 0.0
and float(data.odds_data.get("tot_o", 0.0)) > 1.0
and float(data.odds_data.get("tot_u", 0.0)) > 1.0
)
has_spread = (
"spread_home_line" in data.odds_data
and float(data.odds_data.get("spread_h", 0.0)) > 1.0
and float(data.odds_data.get("spread_a", 0.0)) > 1.0
)
odds_components = [has_ml, has_total, has_spread]
odds_score = sum(1.0 for x in odds_components if x) / 3.0
if not has_ml:
flags.append("missing_moneyline_odds")
if not has_total:
flags.append("missing_total_odds")
if not has_spread:
flags.append("missing_spread_odds")
# Basketball live lineup/referee coverage is structurally lower in this project.
# Keep neutral baseline and rely mostly on odds depth.
lineup_score = 0.7
ref_score = 0.7
total_score = (odds_score * 0.75) + (lineup_score * 0.15) + (ref_score * 0.10)
if total_score >= 0.75:
label = "HIGH"
elif total_score >= 0.52:
label = "MEDIUM"
else:
label = "LOW"
return {
"label": label,
"score": round(total_score, 3),
"home_lineup_count": len(data.home_lineup or []),
"away_lineup_count": len(data.away_lineup or []),
"lineup_source": data.lineup_source,
"flags": flags,
}
+444
View File
@@ -0,0 +1,444 @@
"""Coupon Mixin — multi-match coupon builder + daily bankers.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class CouponMixin:
def build_coupon(
self,
match_ids: List[str],
strategy: str = "BALANCED",
max_matches: Optional[int] = None,
min_confidence: Optional[float] = None,
) -> Dict[str, Any]:
strategy_name = (strategy or "BALANCED").upper()
strategy_config = {
"SAFE": {"max_matches": 4, "min_conf": 66.0},
"BALANCED": {"max_matches": 5, "min_conf": 58.0},
"AGGRESSIVE": {"max_matches": 8, "min_conf": 52.0},
"VALUE": {"max_matches": 8, "min_conf": 48.0},
"MIRACLE": {"max_matches": 10, "min_conf": 44.0},
}
cfg = strategy_config.get(strategy_name, strategy_config["BALANCED"])
max_allowed = max_matches if max_matches is not None else cfg["max_matches"]
min_conf = min_confidence if min_confidence is not None else cfg["min_conf"]
candidates: List[Dict[str, Any]] = []
rejected: List[Dict[str, Any]] = []
for match_id in match_ids:
package = self.analyze_match(match_id)
if not package:
rejected.append({"match_id": match_id, "reason": "match_not_found"})
continue
risk_level = str(package.get("risk", {}).get("level", "MEDIUM")).upper()
data_quality = str(package.get("data_quality", {}).get("label", "MEDIUM")).upper()
match_candidates: List[Dict[str, Any]] = []
seen_keys: Set[Tuple[str, str]] = set()
bet_summary = package.get("bet_summary") or []
raw_picks = []
for candidate in [
package.get("main_pick"),
package.get("value_pick"),
*(package.get("supporting_picks") or []),
]:
if isinstance(candidate, dict):
raw_picks.append(candidate)
for candidate in bet_summary:
if isinstance(candidate, dict):
raw_picks.append(candidate)
for candidate in raw_picks:
market = str(candidate.get("market") or "")
pick = str(candidate.get("pick") or "")
if not market or not pick:
continue
dedupe_key = (market, pick)
if dedupe_key in seen_keys:
continue
seen_keys.add(dedupe_key)
calibrated_conf = float(
candidate.get("calibrated_confidence", candidate.get("confidence", 0.0))
or 0.0
)
odds = float(candidate.get("odds", 0.0) or 0.0)
probability = float(candidate.get("probability", 0.0) or 0.0)
play_score = float(candidate.get("play_score", 0.0) or 0.0)
ev_edge = float(
candidate.get("ev_edge", candidate.get("edge", 0.0)) or 0.0
)
playable = bool(candidate.get("playable"))
bet_grade = str(candidate.get("bet_grade", "PASS")).upper()
if odds <= 1.01:
continue
strict_candidate = (
playable
and calibrated_conf >= min_conf
and bet_grade != "PASS"
)
if strategy_name == "SAFE":
strict_pass = strict_candidate
if odds > 2.35 or play_score < 60.0 or risk_level in {"HIGH", "EXTREME"}:
strict_pass = False
if data_quality == "LOW" or ev_edge < 0.01 or bet_grade == "PASS":
strict_pass = False
strict_score = (
calibrated_conf * 1.10
+ play_score * 0.90
+ (ev_edge * 180.0)
- abs(odds - 1.55) * 12.0
)
soft_pass = (
calibrated_conf >= max(min_conf - 10.0, 56.0)
and odds <= 2.70
and play_score >= 50.0
and risk_level != "EXTREME"
and data_quality != "LOW"
and ev_edge >= -0.01
)
soft_score = (
calibrated_conf
+ play_score * 0.85
+ (ev_edge * 140.0)
- abs(odds - 1.65) * 9.0
)
elif strategy_name == "BALANCED":
strict_pass = strict_candidate
if odds > 3.40 or play_score < 52.0 or risk_level == "EXTREME":
strict_pass = False
if ev_edge < 0.0 or bet_grade == "PASS":
strict_pass = False
strict_score = (
calibrated_conf
+ play_score
+ (ev_edge * 220.0)
+ min(odds, 3.0) * 3.0
)
soft_pass = (
calibrated_conf >= max(min_conf - 10.0, 48.0)
and odds <= 4.20
and play_score >= 44.0
and risk_level != "EXTREME"
and ev_edge >= -0.015
)
soft_score = (
calibrated_conf * 0.95
+ play_score * 0.90
+ (ev_edge * 180.0)
+ min(odds, 3.5) * 3.5
)
elif strategy_name == "AGGRESSIVE":
strict_pass = strict_candidate
if odds < 1.35 or odds > 7.50 or play_score < 46.0:
strict_pass = False
if risk_level == "EXTREME" or bet_grade == "PASS":
strict_pass = False
strict_score = (
calibrated_conf * 0.85
+ play_score * 0.75
+ (ev_edge * 260.0)
+ min(odds, 6.0) * 7.0
)
soft_pass = (
calibrated_conf >= max(min_conf - 10.0, 42.0)
and 1.25 <= odds <= 8.50
and play_score >= 40.0
and risk_level != "EXTREME"
and ev_edge >= -0.02
)
soft_score = (
calibrated_conf * 0.80
+ play_score * 0.70
+ (ev_edge * 210.0)
+ min(odds, 7.0) * 7.5
)
elif strategy_name == "VALUE":
strict_pass = strict_candidate
if odds < 1.55 or play_score < 48.0 or ev_edge < 0.03:
strict_pass = False
if risk_level == "EXTREME" or data_quality == "LOW" or bet_grade == "PASS":
strict_pass = False
strict_score = (
calibrated_conf * 0.75
+ play_score * 0.85
+ (ev_edge * 320.0)
+ min(odds, 6.5) * 8.0
)
soft_pass = (
calibrated_conf >= max(min_conf - 10.0, 40.0)
and odds >= 1.35
and play_score >= 40.0
and risk_level != "EXTREME"
and data_quality != "LOW"
and ev_edge >= 0.0
)
soft_score = (
calibrated_conf * 0.70
+ play_score * 0.80
+ (ev_edge * 260.0)
+ min(odds, 7.0) * 7.0
)
else: # MIRACLE
strict_pass = strict_candidate
if odds < 2.10 or play_score < 40.0 or ev_edge < 0.01:
strict_pass = False
if risk_level == "EXTREME" or bet_grade == "PASS":
strict_pass = False
strict_score = (
calibrated_conf * 0.55
+ play_score * 0.60
+ (ev_edge * 260.0)
+ min(odds, 10.0) * 10.0
)
soft_pass = (
calibrated_conf >= max(min_conf - 10.0, 36.0)
and odds >= 1.60
and play_score >= 34.0
and risk_level != "EXTREME"
and ev_edge >= -0.02
)
soft_score = (
calibrated_conf * 0.50
+ play_score * 0.55
+ (ev_edge * 200.0)
+ min(odds, 10.0) * 9.0
)
fallback_pass = (
calibrated_conf >= max(min_conf - 14.0, 34.0)
and odds >= 1.20
and play_score >= 32.0
and risk_level != "EXTREME"
)
fallback_score = (
calibrated_conf * 0.60
+ play_score * 0.65
+ (ev_edge * 120.0)
+ min(odds, 6.0) * 4.0
)
strategy_score = strict_score
selection_mode = "strict"
if strict_pass:
pass
elif soft_pass:
strategy_score = soft_score
selection_mode = "soft"
elif fallback_pass:
strategy_score = fallback_score
selection_mode = "fallback"
else:
continue
match_candidates.append(
{
"match_id": package["match_info"]["match_id"],
"match_name": package["match_info"]["match_name"],
"market": market,
"pick": pick,
"probability": probability,
"confidence": calibrated_conf,
"odds": odds,
"risk_level": risk_level,
"data_quality": data_quality,
"bet_grade": bet_grade,
"playable": playable,
"play_score": round(play_score, 1),
"ev_edge": round(ev_edge, 4),
"selection_mode": selection_mode,
"strategy_score": round(strategy_score, 3),
}
)
if not match_candidates:
rejected.append(
{
"match_id": match_id,
"reason": "no_strategy_fit",
"threshold": min_conf,
}
)
continue
match_candidates.sort(
key=lambda item: (
float(item.get("strategy_score", 0.0)),
float(item.get("confidence", 0.0)),
float(item.get("ev_edge", 0.0)),
),
reverse=True,
)
candidates.append(match_candidates[0])
candidates.sort(
key=lambda item: (
float(item.get("strategy_score", 0.0)),
float(item.get("confidence", 0.0)),
float(item.get("ev_edge", 0.0)),
),
reverse=True,
)
selected = candidates[: max(1, max_allowed)]
total_odds = 1.0
win_probability = 1.0
for pick in selected:
odd = float(pick.get("odds") or 1.0)
prob = float(pick.get("probability") or 0.0)
total_odds *= odd if odd > 1.0 else 1.0
win_probability *= prob
return {
"strategy": strategy_name,
"generated_at": __import__("datetime").datetime.utcnow().isoformat() + "Z",
"match_count": len(selected),
"bets": selected,
"total_odds": round(total_odds, 2),
"expected_win_rate": round(win_probability, 4),
"rejected_matches": rejected,
}
def get_daily_bankers_live(self, count: int = 3) -> List[Dict[str, Any]]:
with psycopg2.connect(self.dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT id
FROM live_matches
WHERE mst_utc > EXTRACT(EPOCH FROM NOW()) * 1000
AND mst_utc < EXTRACT(EPOCH FROM NOW() + INTERVAL '24 hours') * 1000
ORDER BY mst_utc ASC
LIMIT 60
""",
)
ids = [row["id"] for row in cur.fetchall()]
if not ids:
return []
coupon = self.build_coupon(
match_ids=ids,
strategy="SAFE",
max_matches=max(1, count),
min_confidence=78.0,
)
return coupon.get("bets", [])[: max(1, count)]
def get_daily_bankers(self, count: int = 3) -> List[Dict[str, Any]]:
"""
Identifies the safest, highest value bets for the next 24 hours.
"""
now_ms = int(time.time() * 1000)
horizon_ms = now_ms + (24 * 60 * 60 * 1000)
with psycopg2.connect(self.dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT m.id, m.match_name, m.mst_utc
FROM matches m
WHERE m.mst_utc >= %s AND m.mst_utc <= %s
AND m.status = 'NS'
AND EXISTS (SELECT 1 FROM odd_categories oc WHERE oc.match_id = m.id)
ORDER BY m.mst_utc ASC
LIMIT 50
""", (now_ms, horizon_ms))
matches = cur.fetchall()
potential_bankers = []
print(f"🔍 Scanning {len(matches)} upcoming matches for Bankers...")
for match in matches:
try:
data = self._load_match_data(match['id'])
if data is None: continue
result = self.analyze_match(match['id'])
if result and 'main_pick' in result:
pick = result['main_pick']
conf = pick.get('calibrated_confidence', pick.get('confidence', 0))
odds = pick.get('odds', 0)
market = pick.get('market', '')
pick_name = pick.get('pick', '')
# Banker Criteria: High Confidence (>75%) AND Decent Odds (>1.30)
if conf >= 75.0 and odds >= 1.30:
score = conf * (odds - 1.0)
potential_bankers.append({
"match_id": match['id'],
"match_name": match['match_name'] or f"{data.home_team_name} vs {data.away_team_name}",
"league": data.league_name,
"pick": f"{market} - {pick_name}",
"confidence": conf,
"odds": odds,
"value_score": score
})
except Exception:
pass
potential_bankers.sort(key=lambda x: x['value_score'], reverse=True)
return potential_bankers[:count]
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,498 @@
"""Feature Builder Mixin — V25/V28 feature vector assembly.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from features.upset_engine import get_upset_engine
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class FeatureBuilderMixin:
def _build_v25_features(self, data: MatchData) -> Dict[str, float]:
"""
Build the single authoritative V25 pre-match feature vector.
"""
odds = self._sanitize_v25_odds(data.odds_data or {})
ms_h = float(odds.get('ms_h') or 0)
ms_d = float(odds.get('ms_d') or 0)
ms_a = float(odds.get('ms_a') or 0)
# Implied probabilities (vig-normalised)
implied_home, implied_draw, implied_away = 0.33, 0.33, 0.33
if ms_h > 0 and ms_d > 0 and ms_a > 0:
raw_sum = 1 / ms_h + 1 / ms_d + 1 / ms_a
implied_home = (1 / ms_h) / raw_sum
implied_draw = (1 / ms_d) / raw_sum
implied_away = (1 / ms_a) / raw_sum
upset_potential = max(
0.0,
min(
1.0,
1.0 - abs(implied_home - implied_away) + (implied_draw * 0.35),
),
)
# All enrichment queries in a single DB connection
home_elo, away_elo = 1500.0, 1500.0
home_venue_elo, away_venue_elo = 1500.0, 1500.0
home_form_elo_val, away_form_elo_val = 1500.0, 1500.0
enr = self.enrichment
# Defaults — overridden by successful queries
home_stats = dict(enr._DEFAULT_TEAM_STATS)
away_stats = dict(enr._DEFAULT_TEAM_STATS)
h2h = dict(enr._DEFAULT_H2H)
home_form = dict(enr._DEFAULT_FORM)
away_form = dict(enr._DEFAULT_FORM)
ref = dict(enr._DEFAULT_REFEREE)
league = dict(enr._DEFAULT_LEAGUE)
home_momentum, away_momentum = 0.0, 0.0
home_rolling = dict(enr._DEFAULT_ROLLING)
away_rolling = dict(enr._DEFAULT_ROLLING)
home_venue = dict(enr._DEFAULT_VENUE)
away_venue = dict(enr._DEFAULT_VENUE)
home_rest, away_rest = 7.0, 7.0
odds_band_features = {}
enrichment_failures = []
try:
with psycopg2.connect(self.dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# ELO
try:
cur.execute(
"SELECT home_elo, away_elo, "
" home_home_elo, away_away_elo, "
" home_form_elo, away_form_elo "
"FROM football_ai_features "
"WHERE match_id = %s LIMIT 1",
(data.match_id,),
)
elo_row = cur.fetchone()
if elo_row:
home_elo = float(elo_row.get('home_elo') or 1500.0)
away_elo = float(elo_row.get('away_elo') or 1500.0)
home_venue_elo = float(elo_row.get('home_home_elo') or home_elo)
away_venue_elo = float(elo_row.get('away_away_elo') or away_elo)
home_form_elo_val = float(elo_row.get('home_form_elo') or home_elo)
away_form_elo_val = float(elo_row.get('away_form_elo') or away_elo)
else:
cur.execute(
"SELECT team_id, overall_elo, home_elo, away_elo, form_elo "
"FROM team_elo_ratings WHERE team_id IN (%s, %s)",
(data.home_team_id, data.away_team_id),
)
by_team = {str(r.get("team_id")): r for r in cur.fetchall()}
home_row = by_team.get(str(data.home_team_id))
away_row = by_team.get(str(data.away_team_id))
if home_row:
home_elo = float(home_row.get("overall_elo") or 1500.0)
home_venue_elo = float(home_row.get("home_elo") or home_elo)
home_form_elo_val = float(home_row.get("form_elo") or home_elo)
if away_row:
away_elo = float(away_row.get("overall_elo") or 1500.0)
away_venue_elo = float(away_row.get("away_elo") or away_elo)
away_form_elo_val = float(away_row.get("form_elo") or away_elo)
setattr(data, "feature_source", "football_ai_features" if elo_row else "live_prematch_enrichment")
# Staleness check: both teams at exact 1500 → ELO was never computed
if home_elo == 1500.0 and away_elo == 1500.0:
enrichment_failures.append("elo_stale:both_teams_at_default_1500")
except Exception as e:
enrichment_failures.append(f"elo:{e}")
setattr(data, "feature_source", "fallback_defaults")
# Team stats
try:
home_stats = enr.compute_team_stats(cur, data.home_team_id, data.match_date_ms)
away_stats = enr.compute_team_stats(cur, data.away_team_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"team_stats:{e}")
# H2H
try:
h2h = enr.compute_h2h(cur, data.home_team_id, data.away_team_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"h2h:{e}")
# Form
try:
home_form = enr.compute_form_streaks(cur, data.home_team_id, data.match_date_ms)
away_form = enr.compute_form_streaks(cur, data.away_team_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"form:{e}")
# Referee
try:
ref = enr.compute_referee_stats(cur, data.referee_name, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"referee:{e}")
# League
try:
league = enr.compute_league_averages(cur, data.league_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"league:{e}")
# Momentum
try:
home_momentum = enr.compute_momentum(cur, data.home_team_id, data.match_date_ms)
away_momentum = enr.compute_momentum(cur, data.away_team_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"momentum:{e}")
# V27 Rolling + Venue + Rest
try:
home_rolling = enr.compute_rolling_stats(cur, data.home_team_id, data.match_date_ms)
away_rolling = enr.compute_rolling_stats(cur, data.away_team_id, data.match_date_ms)
home_venue = enr.compute_venue_stats(cur, data.home_team_id, data.match_date_ms, is_home=True)
away_venue = enr.compute_venue_stats(cur, data.away_team_id, data.match_date_ms, is_home=False)
home_rest = enr.compute_days_rest(cur, data.home_team_id, data.match_date_ms)
away_rest = enr.compute_days_rest(cur, data.away_team_id, data.match_date_ms)
except Exception as e:
enrichment_failures.append(f"rolling/venue:{e}")
# V28 Odds-Band
try:
odds_band_features = self.odds_band_analyzer.compute_all(
cur=cur,
home_team_id=data.home_team_id,
away_team_id=data.away_team_id,
league_id=data.league_id,
odds=odds,
before_ts=data.match_date_ms,
referee_name=data.referee_name,
)
except Exception as e:
enrichment_failures.append(f"odds_band:{e}")
except Exception as e:
enrichment_failures.append(f"db_connection:{e}")
setattr(data, "feature_source", "fallback_defaults")
setattr(data, "odds_band_features", odds_band_features)
if enrichment_failures:
print(f"⚠️ Enrichment partial failures for {data.match_id}: {', '.join(enrichment_failures)}")
# Upset engine features
upset_atmosphere, upset_motivation, upset_fatigue = 0.0, 0.0, 0.0
try:
upset_engine = get_upset_engine()
upset_feats = upset_engine.get_features(
home_team_name=getattr(data, 'home_team_name', '') or '',
home_team_id=data.home_team_id,
away_team_name=getattr(data, 'away_team_name', '') or '',
league_name=getattr(data, 'league_name', '') or '',
home_position=10,
away_position=10,
match_date_ms=data.match_date_ms,
home_days_rest=int(home_rest),
away_days_rest=int(away_rest),
)
upset_atmosphere = upset_feats.get('upset_atmosphere', 0.0)
upset_motivation = upset_feats.get('upset_motivation', 0.0)
upset_fatigue = upset_feats.get('upset_fatigue', 0.0)
except Exception as e:
print(f"⚠️ Upset engine failed: {e}")
odds_presence = {
'odds_ms_h_present': 1.0 if ms_h > 1.01 else 0.0,
'odds_ms_d_present': 1.0 if ms_d > 1.01 else 0.0,
'odds_ms_a_present': 1.0 if ms_a > 1.01 else 0.0,
'odds_ht_ms_h_present': 1.0 if float(odds.get('ht_h') or 0) > 1.01 else 0.0,
'odds_ht_ms_d_present': 1.0 if float(odds.get('ht_d') or 0) > 1.01 else 0.0,
'odds_ht_ms_a_present': 1.0 if float(odds.get('ht_a') or 0) > 1.01 else 0.0,
'odds_ou05_o_present': 1.0 if float(odds.get('ou05_o') or 0) > 1.01 else 0.0,
'odds_ou05_u_present': 1.0 if float(odds.get('ou05_u') or 0) > 1.01 else 0.0,
'odds_ou15_o_present': 1.0 if float(odds.get('ou15_o') or 0) > 1.01 else 0.0,
'odds_ou15_u_present': 1.0 if float(odds.get('ou15_u') or 0) > 1.01 else 0.0,
'odds_ou25_o_present': 1.0 if float(odds.get('ou25_o') or 0) > 1.01 else 0.0,
'odds_ou25_u_present': 1.0 if float(odds.get('ou25_u') or 0) > 1.01 else 0.0,
'odds_ou35_o_present': 1.0 if float(odds.get('ou35_o') or 0) > 1.01 else 0.0,
'odds_ou35_u_present': 1.0 if float(odds.get('ou35_u') or 0) > 1.01 else 0.0,
'odds_ht_ou05_o_present': 1.0 if float(odds.get('ht_ou05_o') or 0) > 1.01 else 0.0,
'odds_ht_ou05_u_present': 1.0 if float(odds.get('ht_ou05_u') or 0) > 1.01 else 0.0,
'odds_ht_ou15_o_present': 1.0 if float(odds.get('ht_ou15_o') or 0) > 1.01 else 0.0,
'odds_ht_ou15_u_present': 1.0 if float(odds.get('ht_ou15_u') or 0) > 1.01 else 0.0,
'odds_btts_y_present': 1.0 if float(odds.get('btts_y') or 0) > 1.01 else 0.0,
'odds_btts_n_present': 1.0 if float(odds.get('btts_n') or 0) > 1.01 else 0.0,
}
# ── Calendar features (V27) ──
import datetime
match_dt = datetime.datetime.utcfromtimestamp(data.match_date_ms / 1000)
match_month = match_dt.month
is_season_start = 1.0 if match_month in (7, 8, 9) else 0.0
is_season_end = 1.0 if match_month in (5, 6) else 0.0
# ── Cup game detection: dampen home advantage in feature space ──
_league_name = (getattr(data, 'league_name', '') or '').lower()
_cup_keywords = ("kupa", "cup", "coupe", "copa", "coppa", "pokal",
"trophy", "shield", "ziraat", "süper kupa", "super cup")
_is_cup = any(kw in _league_name for kw in _cup_keywords)
# ── Derived / Interaction features (V27) ──
# Cup games: home ELO advantage is ~30% weaker (rotation, lower motivation)
elo_diff = (home_elo - away_elo) * (0.70 if _is_cup else 1.0)
form_elo_diff = home_form_elo_val - away_form_elo_val
attack_vs_defense_home = data.home_goals_avg - data.away_conceded_avg
attack_vs_defense_away = data.away_goals_avg - data.home_conceded_avg
xga_home = data.home_conceded_avg
xga_away = data.away_conceded_avg
xg_diff = xga_home - xga_away
mom_diff = home_momentum - away_momentum
form_momentum_interaction = mom_diff * form_elo_diff / 1000.0
elo_form_consistency = 1.0 - abs(elo_diff - form_elo_diff) / max(abs(elo_diff), 100.0)
upset_x_elo_gap = upset_potential * abs(elo_diff) / 500.0
return {
# META (1)
'mst_utc': float(data.match_date_ms),
# ELO (8)
'home_overall_elo': home_elo,
'away_overall_elo': away_elo,
'elo_diff': elo_diff,
'home_home_elo': home_venue_elo,
'away_away_elo': away_venue_elo,
'home_form_elo': home_form_elo_val,
'away_form_elo': away_form_elo_val,
'form_elo_diff': form_elo_diff,
# Form (12)
'home_goals_avg': data.home_goals_avg,
'home_conceded_avg': data.home_conceded_avg,
'away_goals_avg': data.away_goals_avg,
'away_conceded_avg': data.away_conceded_avg,
'home_clean_sheet_rate': home_form['clean_sheet_rate'],
'away_clean_sheet_rate': away_form['clean_sheet_rate'],
'home_scoring_rate': home_form['scoring_rate'],
'away_scoring_rate': away_form['scoring_rate'],
'home_winning_streak': home_form['winning_streak'],
'away_winning_streak': away_form['winning_streak'],
'home_unbeaten_streak': home_form['unbeaten_streak'],
'away_unbeaten_streak': away_form['unbeaten_streak'],
# H2H (10 — original 6 + V27 expanded 4)
'h2h_total_matches': h2h['total_matches'],
'h2h_home_win_rate': h2h['home_win_rate'],
'h2h_draw_rate': h2h['draw_rate'],
'h2h_avg_goals': h2h['avg_goals'],
'h2h_btts_rate': h2h['btts_rate'],
'h2h_over25_rate': h2h['over25_rate'],
'h2h_home_goals_avg': h2h['home_goals_avg'],
'h2h_away_goals_avg': h2h['away_goals_avg'],
'h2h_recent_trend': h2h['recent_trend'],
'h2h_venue_advantage': h2h['venue_advantage'],
# Stats (8)
'home_avg_possession': home_stats['avg_possession'],
'away_avg_possession': away_stats['avg_possession'],
'home_avg_shots_on_target': home_stats['avg_shots_on_target'],
'away_avg_shots_on_target': away_stats['avg_shots_on_target'],
'home_shot_conversion': home_stats['shot_conversion'],
'away_shot_conversion': away_stats['shot_conversion'],
'home_avg_corners': home_stats['avg_corners'],
'away_avg_corners': away_stats['avg_corners'],
# Odds (24)
'odds_ms_h': ms_h,
'odds_ms_d': ms_d,
'odds_ms_a': ms_a,
'implied_home': implied_home,
'implied_draw': implied_draw,
'implied_away': implied_away,
'odds_ht_ms_h': float(odds.get('ht_h') or 0),
'odds_ht_ms_d': float(odds.get('ht_d') or 0),
'odds_ht_ms_a': float(odds.get('ht_a') or 0),
'odds_ou05_o': float(odds.get('ou05_o') or 0),
'odds_ou05_u': float(odds.get('ou05_u') or 0),
'odds_ou15_o': float(odds.get('ou15_o') or 0),
'odds_ou15_u': float(odds.get('ou15_u') or 0),
'odds_ou25_o': float(odds.get('ou25_o') or 0),
'odds_ou25_u': float(odds.get('ou25_u') or 0),
'odds_ou35_o': float(odds.get('ou35_o') or 0),
'odds_ou35_u': float(odds.get('ou35_u') or 0),
'odds_ht_ou05_o': float(odds.get('ht_ou05_o') or 0),
'odds_ht_ou05_u': float(odds.get('ht_ou05_u') or 0),
'odds_ht_ou15_o': float(odds.get('ht_ou15_o') or 0),
'odds_ht_ou15_u': float(odds.get('ht_ou15_u') or 0),
'odds_btts_y': float(odds.get('btts_y') or 0),
'odds_btts_n': float(odds.get('btts_n') or 0),
**odds_presence,
# League (9 — original 2 + V27 expanded 5 + xga 2)
'home_xga': xga_home,
'away_xga': xga_away,
'league_avg_goals': league['avg_goals'],
'league_zero_goal_rate': league['zero_goal_rate'],
'league_home_win_rate': league['home_win_rate'],
'league_draw_rate': league['draw_rate'],
'league_btts_rate': league['btts_rate'],
'league_ou25_rate': league['ou25_rate'],
'league_reliability_score': league['reliability_score'],
# Upset (4)
'upset_atmosphere': upset_atmosphere,
'upset_motivation': upset_motivation,
'upset_fatigue': upset_fatigue,
'upset_potential': upset_potential,
# Referee (5)
'referee_home_bias': ref['home_bias'],
'referee_avg_goals': ref['avg_goals'],
'referee_cards_total': ref['cards_total'],
'referee_avg_yellow': ref['avg_yellow'],
'referee_experience': ref['experience'],
# Momentum (3)
'home_momentum_score': home_momentum,
'away_momentum_score': away_momentum,
'momentum_diff': mom_diff,
# ── V27 Rolling Stats (13) ──
'home_rolling5_goals': home_rolling['rolling5_goals'],
'home_rolling5_conceded': home_rolling['rolling5_conceded'],
'home_rolling10_goals': home_rolling['rolling10_goals'],
'home_rolling10_conceded': home_rolling['rolling10_conceded'],
'home_rolling20_goals': home_rolling['rolling20_goals'],
'home_rolling20_conceded': home_rolling['rolling20_conceded'],
'away_rolling5_goals': away_rolling['rolling5_goals'],
'away_rolling5_conceded': away_rolling['rolling5_conceded'],
'away_rolling10_goals': away_rolling['rolling10_goals'],
'away_rolling10_conceded': away_rolling['rolling10_conceded'],
'home_rolling5_cs': home_rolling['rolling5_cs'],
'away_rolling5_cs': away_rolling['rolling5_cs'],
# ── V27 Venue Stats (4) ──
'home_venue_goals': home_venue['venue_goals'],
'home_venue_conceded': home_venue['venue_conceded'],
'away_venue_goals': away_venue['venue_goals'],
'away_venue_conceded': away_venue['venue_conceded'],
# ── V27 Goal Trend (2) ──
'home_goal_trend': home_rolling['rolling5_goals'] - home_rolling['rolling10_goals'],
'away_goal_trend': away_rolling['rolling5_goals'] - away_rolling['rolling10_goals'],
# ── V27 Calendar (4) ──
'home_days_rest': home_rest,
'away_days_rest': away_rest,
'match_month': float(match_month),
'is_season_start': is_season_start,
'is_season_end': is_season_end,
# ── V27 Interaction (6) ──
'attack_vs_defense_home': attack_vs_defense_home,
'attack_vs_defense_away': attack_vs_defense_away,
'xg_diff': xg_diff,
'form_momentum_interaction': form_momentum_interaction,
'elo_form_consistency': elo_form_consistency,
'upset_x_elo_gap': upset_x_elo_gap,
# Squad Features (9) — PlayerPredictorEngine
**self._get_squad_features(data),
# V28 Odds-Band Historical Performance Features
**odds_band_features,
}
def _get_squad_features(self, data: MatchData) -> Dict[str, float]:
"""Non-fatal squad analysis with 12 player-level features."""
defaults = {
'home_squad_quality': 12.0, 'away_squad_quality': 12.0, 'squad_diff': 0.0,
'home_key_players': 3.0, 'away_key_players': 3.0,
'home_missing_impact': 0.0, 'away_missing_impact': 0.0,
'home_goals_form': 1.3, 'away_goals_form': 1.3,
'home_lineup_goals_per90': 0.0, 'away_lineup_goals_per90': 0.0,
'home_lineup_assists_per90': 0.0, 'away_lineup_assists_per90': 0.0,
'home_squad_continuity': 0.5, 'away_squad_continuity': 0.5,
'home_top_scorer_form': 0.0, 'away_top_scorer_form': 0.0,
'home_avg_player_exp': 0.0, 'away_avg_player_exp': 0.0,
'home_goals_diversity': 0.0, 'away_goals_diversity': 0.0,
}
try:
engine = get_player_predictor()
pred = engine.predict(
match_id=data.match_id,
home_team_id=data.home_team_id,
away_team_id=data.away_team_id,
home_lineup=data.home_lineup,
away_lineup=data.away_lineup,
sidelined_data=data.sidelined_data,
)
result = {
'home_squad_quality': float(pred.home_squad_quality or 0.0),
'away_squad_quality': float(pred.away_squad_quality or 0.0),
'squad_diff': float(pred.squad_diff or 0.0),
'home_key_players': float(pred.home_key_players or 0),
'away_key_players': float(pred.away_key_players or 0),
'home_missing_impact': float(pred.home_missing_impact or 0.0),
'away_missing_impact': float(pred.away_missing_impact or 0.0),
'home_goals_form': float(pred.home_goals_form or 0.0),
'away_goals_form': float(pred.away_goals_form or 0.0),
'home_lineup_goals_per90': float(pred.home_lineup_goals_per90 or 0.0),
'away_lineup_goals_per90': float(pred.away_lineup_goals_per90 or 0.0),
'home_lineup_assists_per90': float(pred.home_lineup_assists_per90 or 0.0),
'away_lineup_assists_per90': float(pred.away_lineup_assists_per90 or 0.0),
'home_squad_continuity': float(pred.home_squad_continuity or 0.5),
'away_squad_continuity': float(pred.away_squad_continuity or 0.5),
'home_top_scorer_form': float(pred.home_top_scorer_form or 0),
'away_top_scorer_form': float(pred.away_top_scorer_form or 0),
'home_avg_player_exp': float(pred.home_avg_player_exp or 0.0),
'away_avg_player_exp': float(pred.away_avg_player_exp or 0.0),
'home_goals_diversity': float(pred.home_goals_diversity or 0.0),
'away_goals_diversity': float(pred.away_goals_diversity or 0.0),
}
for side in ('home', 'away'):
sq = result[f'{side}_squad_quality']
if sq > 50 or sq < 0:
print(f"🚨 SCALE MISMATCH: {side}_squad_quality={sq:.1f} "
f"(expected 3-36). Check player_predictor formula!")
return result
except Exception as e:
print(f"⚠️ Squad features failed: {e}")
return defaults
def _sanitize_v25_odds(self, odds_data: Dict[str, Any]) -> Dict[str, float]:
sanitized: Dict[str, float] = {}
for key in self.V25_ODDS_FEATURE_KEYS:
sanitized[key] = self._real_market_odds(odds_data, key)
for key in ("dc_1x", "dc_x2", "dc_12", "oe_odd", "oe_even", "cards_o", "cards_u", "hcap_h", "hcap_d", "hcap_a"):
if key in odds_data:
sanitized[key] = self._real_market_odds(odds_data, key)
return sanitized
+231
View File
@@ -0,0 +1,231 @@
"""HT/MS Mixin — analyze_match_htms endpoint and helpers.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class HtmsMixin:
def analyze_match_htms(self, match_id: str) -> Optional[Dict[str, Any]]:
"""
HT/MS focused response for upset-hunting workflows.
This endpoint is intentionally additive and does not mutate the
standard /v20plus/analyze package contract.
"""
data = self._load_match_data(match_id)
if data is None:
return None
if str(data.sport or "").lower() != "football":
return {
"status": "skip",
"match_id": match_id,
"reason": "unsupported_sport",
"engine_used": "htms_router",
}
is_top_league = self._is_top_league(data.league_id)
engine_used = "v20plus_top_htms"
# Hard gate: HT/MS upset model is trained on top leagues only.
if not is_top_league:
return {
"status": "skip",
"match_id": match_id,
"reason": "out_of_training_scope",
"engine_used": engine_used,
"data_quality": {
"label": "LOW",
"flags": ["league_out_of_scope"],
},
}
missing_requirements = self._missing_htms_requirements(data)
if missing_requirements:
return {
"status": "skip",
"match_id": match_id,
"reason": "missing_critical_data",
"missing": missing_requirements,
"engine_used": engine_used,
"data_quality": {
"label": "LOW",
"flags": [f"missing_{item}" for item in missing_requirements],
},
}
base_package = self.analyze_match(match_id)
if not base_package:
return None
data_quality = base_package.get("data_quality", {})
market_board = base_package.get("market_board", {})
ms_market = market_board.get("MS", {})
ht_market = market_board.get("HT", {})
htft_probs = market_board.get("HTFT", {}).get("probs", {})
reversal_probs = {
"1/2": float(htft_probs.get("1/2", 0.0)),
"2/1": float(htft_probs.get("2/1", 0.0)),
"X/1": float(htft_probs.get("X/1", 0.0)),
"X/2": float(htft_probs.get("X/2", 0.0)),
}
top_reversal = max(reversal_probs.items(), key=lambda item: item[1])
ms_conf = float(ms_market.get("confidence", 0.0))
ht_conf = float(ht_market.get("confidence", 0.0))
base_conf = (ms_conf + ht_conf) / 2.0
confidence_cap = 100.0
penalties: List[str] = []
if data.lineup_source == "probable_xi":
confidence_cap = min(confidence_cap, 72.0)
penalties.append("lineup_probable_xi")
if data.lineup_source == "none":
confidence_cap = min(confidence_cap, 58.0)
penalties.append("lineup_unavailable")
if str(data_quality.get("label", "LOW")).upper() == "LOW":
confidence_cap = min(confidence_cap, 55.0)
penalties.append("low_data_quality")
final_conf = min(base_conf, confidence_cap)
upset_score = self._compute_htms_upset_score(
reversal_probs=reversal_probs,
odds_data=data.odds_data,
is_top_league=is_top_league,
)
upset_threshold = 58.0 if is_top_league else 54.0
upset_playable = (
upset_score >= upset_threshold
and top_reversal[1] >= 0.045
and final_conf >= 45.0
and "low_data_quality" not in penalties
)
return {
"status": "ok",
"engine_used": engine_used,
"match_info": base_package.get("match_info", {}),
"data_quality": data_quality,
"htms_core": {
"ms_pick": ms_market.get("pick"),
"ms_confidence": round(ms_conf, 1),
"ht_pick": ht_market.get("pick"),
"ht_confidence": round(ht_conf, 1),
"combined_confidence": round(final_conf, 1),
"confidence_cap": round(confidence_cap, 1),
"penalties": penalties,
},
"surprise_hunter": {
"upset_score": round(upset_score, 1),
"threshold": upset_threshold,
"playable": upset_playable,
"top_reversal_pick": top_reversal[0],
"top_reversal_prob": round(top_reversal[1], 4),
"reversal_probs": {
key: round(value, 4) for key, value in reversal_probs.items()
},
},
"risk": base_package.get("risk", {}),
"reasoning_factors": base_package.get("reasoning_factors", []),
}
def _is_top_league(self, league_id: Optional[str]) -> bool:
if not league_id:
return False
return str(league_id) in self.top_league_ids
def _missing_htms_requirements(self, data: MatchData) -> List[str]:
missing: List[str] = []
ms_keys = ("ms_h", "ms_d", "ms_a")
ht_keys = ("ht_h", "ht_d", "ht_a")
if not all(float(data.odds_data.get(k, 0.0) or 0.0) > 1.0 for k in ms_keys):
missing.append("ms_odds")
if not all(float(data.odds_data.get(k, 0.0) or 0.0) > 1.0 for k in ht_keys):
missing.append("ht_odds")
return missing
def _compute_htms_upset_score(
self,
reversal_probs: Dict[str, float],
odds_data: Dict[str, float],
is_top_league: bool,
) -> float:
ms_h = self._to_float(odds_data.get("ms_h"), 0.0)
ms_a = self._to_float(odds_data.get("ms_a"), 0.0)
if ms_h <= 1.0 or ms_a <= 1.0:
favorite_gap = 0.0
else:
favorite_gap = abs(ms_h - ms_a)
reversal_max = max(reversal_probs.values()) if reversal_probs else 0.0
reversal_sum = sum(reversal_probs.values())
# Strong favorite + reversal probability is the core upset signal.
gap_factor = min(1.0, favorite_gap / 2.0)
score = (
(reversal_max * 100.0 * 0.60)
+ (reversal_sum * 100.0 * 0.25)
+ (gap_factor * 100.0 * 0.15)
)
if not is_top_league:
# Non-top leagues are noisier; keep it slightly conservative.
score *= 0.92
return max(0.0, min(100.0, score))
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,662 @@
"""Prediction Mixin — V25 signal extraction and prediction building.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default, get_config
from models.calibration import get_calibrator
from models.league_model import get_league_model_loader, FILE_TO_SIGNAL
class PredictionMixin:
def _get_score_model(self) -> Optional[Dict]:
"""Load XGBoost score prediction model (non-fatal)."""
if hasattr(self, "_score_model_cache"):
return self._score_model_cache
score_model_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"models", "xgb_score.pkl",
)
try:
if os.path.exists(score_model_path):
with open(score_model_path, "rb") as f:
model_data = pickle.load(f)
if all(k in model_data for k in ("home_model", "away_model", "ht_home_model", "ht_away_model", "features")):
self._score_model_cache = model_data
print(f"[SCORE] ✅ Score model loaded ({len(model_data['features'])} features)")
return self._score_model_cache
except Exception as e:
print(f"[SCORE] ⚠ Load failed (non-fatal, using heuristic): {e}")
self._score_model_cache = None
return None
def _predict_score_with_model(self, features: Dict[str, float]) -> Optional[Dict[str, float]]:
"""Predict FT/HT scores using XGBoost score model."""
score_model = self._get_score_model()
if score_model is None:
return None
try:
import pandas as _pd
model_features = score_model["features"]
row = {f: float(features.get(f, 0)) for f in model_features}
df = _pd.DataFrame([row])
ft_home = max(0.0, float(score_model["home_model"].predict(df)[0]))
ft_away = max(0.0, float(score_model["away_model"].predict(df)[0]))
ht_home = max(0.0, float(score_model["ht_home_model"].predict(df)[0]))
ht_away = max(0.0, float(score_model["ht_away_model"].predict(df)[0]))
return {
"ft_home": round(ft_home, 2),
"ft_away": round(ft_away, 2),
"ht_home": round(ht_home, 2),
"ht_away": round(ht_away, 2),
}
except Exception as e:
print(f"[SCORE] ⚠ Prediction error (fallback to heuristic): {e}")
return None
_V25_KEY_MAP = {
"ms": "MS",
"ou15": "OU15",
"ou25": "OU25",
"ou35": "OU35",
"btts": "BTTS",
"ht_result": "HT",
"ht_ou05": "HT_OU05",
"ht_ou15": "HT_OU15",
"htft": "HTFT",
"cards_ou45": "CARDS",
"handicap_ms": "HCAP",
"odd_even": "OE",
}
def _get_v25_signal(
self,
data: MatchData,
features: Optional[Dict[str, float]] = None,
) -> Dict[str, Any]:
"""
Get V25 ensemble predictions for all available markets.
Returns a dict keyed by UPPERCASE market name (MS, OU25, BTTS, etc.)
each with a 'probs' sub-dict that _prob_map can consume.
CRITICAL: Keys MUST be uppercase to match _build_v25_prediction lookups.
"""
v25 = self._get_v25_predictor()
feature_row = features or self._build_v25_features(data)
signal: Dict[str, Any] = {}
# ── League-specific model override ─────────────────────────────────
league_id = getattr(data, "league_id", None)
league_model = None
if league_id:
try:
league_model = get_league_model_loader().get(league_id)
except Exception:
league_model = None
if league_model:
# Predict all available markets with league-specific XGBoost
for mkey, sig_key in FILE_TO_SIGNAL.items():
probs = league_model.predict_market(mkey, feature_row)
if probs:
best_label = max(probs, key=probs.__getitem__)
signal[sig_key] = {
"probs": probs,
"raw_probs": probs,
"pick": best_label,
"probability": float(probs[best_label]),
"confidence": round(float(probs[best_label]) * 100.0, 1),
"source": "league_specific",
}
if signal:
print(f" [LEAGUE-MODEL] {league_id}: {len(signal)} markets predicted")
# Fill remaining markets from general V25 (markets not in league model)
# fall through to general prediction below for missing ones
def _temperature_scale(probs_dict: Dict[str, float], temperature: float = 1.5) -> Dict[str, float]:
"""
Apply temperature scaling to soften overconfident model outputs.
LightGBM often produces extreme probabilities (e.g., 0.999 / 0.001).
Temperature scaling converts to log-odds, divides by T, then re-normalizes.
T=1.0 → no change, T>1 → softer probabilities.
Standard approach for post-hoc model calibration (Guo et al., 2017).
V34: Reduced from 2.5 to 1.5 — V25 model is already calibrated via
odds-aware training. Excessive flattening was destroying signal.
"""
import math
eps = 1e-7 # numerical stability
n = len(probs_dict)
# V34: Reduced temperature — odds-aware model is already calibrated
# Binary markets (2-class) tend to be more overconfident in LGB
if n <= 2:
T = max(temperature, 1.5) # was 2.0
elif n == 3:
T = max(temperature * 0.8, 1.2) # was 1.5 — 3-way slightly less aggressive
else:
T = max(temperature * 0.6, 1.0) # was 1.3 — 9-way (HTFT) already spread
# Convert to log-odds and apply temperature
labels = list(probs_dict.keys())
log_odds = []
for label in labels:
p = max(eps, min(1.0 - eps, float(probs_dict[label])))
log_odds.append(math.log(p) / T)
# Softmax re-normalization
max_lo = max(log_odds)
exp_vals = [math.exp(lo - max_lo) for lo in log_odds]
total = sum(exp_vals)
scaled = {}
for i, label in enumerate(labels):
scaled[label] = exp_vals[i] / total
return scaled
calibrator = get_calibrator()
_temperature = float(get_config().get('model_ensemble.temperature', 1.5))
# Map (market_key, label) → calibrator market key
_CAL_KEY_MAP: Dict[str, str] = {
"ms_1": "ms_home", "ms_x": "ms_draw", "ms_2": "ms_away",
"ou15_over": "ou15", "ou15_under": "ou15",
"ou25_over": "ou25", "ou25_under": "ou25",
"ou35_over": "ou35", "ou35_under": "ou35",
"btts_yes": "btts", "btts_no": "btts",
"ht_1": "ht_home", "ht_x": "ht_draw", "ht_2": "ht_away",
}
def _enrich_signal_entry(probs_dict: Dict[str, float], market_key: str = "") -> Dict[str, Any]:
"""Temperature scaling + Isotonic calibration pipeline."""
scaled_probs = _temperature_scale(probs_dict, temperature=_temperature)
# Isotonic calibration per outcome (if trained models exist)
if market_key:
calibrated = {}
for label, prob in scaled_probs.items():
raw_key = f"{market_key}_{label}".lower().replace(" ", "_")
cal_key = _CAL_KEY_MAP.get(raw_key, raw_key)
calibrated[label] = calibrator.calibrate(cal_key, prob)
total = sum(calibrated.values())
if total > 0:
calibrated = {k: v / total for k, v in calibrated.items()}
scaled_probs = calibrated
best_label = max(scaled_probs, key=scaled_probs.__getitem__)
best_prob = float(scaled_probs[best_label])
return {
"probs": scaled_probs,
"raw_probs": probs_dict,
"pick": best_label,
"probability": best_prob,
"confidence": round(best_prob * 100.0, 1),
}
# Core markets using dedicated methods (skip if league model already covered them)
if "MS" not in signal:
h, d, a = v25.predict_ms(feature_row)
signal["MS"] = _enrich_signal_entry({"1": h, "X": d, "2": a}, "ms")
print(f" [V25-SIGNAL] MS → H={h:.4f} D={d:.4f} A={a:.4f}")
else:
print(f" [LEAGUE-MODEL] MS → {signal['MS']['probs']}")
if "OU25" not in signal:
over25, under25 = v25.predict_ou25(feature_row)
signal["OU25"] = _enrich_signal_entry({"Over": over25, "Under": under25}, "ou25")
print(f" [V25-SIGNAL] OU25 → O={over25:.4f} U={under25:.4f}")
if "BTTS" not in signal:
btts_y, btts_n = v25.predict_btts(feature_row)
signal["BTTS"] = _enrich_signal_entry({"Yes": btts_y, "No": btts_n}, "btts")
print(f" [V25-SIGNAL] BTTS → Y={btts_y:.4f} N={btts_n:.4f}")
# Additional markets via generic predict_market (skip if league model covered them)
for model_key, label_map in [
("ou15", {"Over": 0, "Under": None}),
("ou35", {"Over": 0, "Under": None}),
("ht_result", {"1": 0, "X": 1, "2": 2}),
("ht_ou05", {"Over": 0, "Under": None}),
("ht_ou15", {"Over": 0, "Under": None}),
("htft", None),
("cards_ou45", {"Over": 0, "Under": None}),
("handicap_ms", {"1": 0, "X": 1, "2": 2}),
("odd_even", {"Odd": 0, "Even": None}),
]:
out_key = str(self._V25_KEY_MAP.get(model_key, model_key.upper()))
if out_key in signal:
continue # already predicted by league-specific model
if not v25.has_market(model_key):
continue
raw = v25.predict_market(model_key, feature_row)
if raw is None:
continue
if label_map is None:
# HTFT — 9 combinations
htft_labels = ["1/1", "1/X", "1/2", "X/1", "X/X", "X/2", "2/1", "2/X", "2/2"]
probs_dict = {}
for i, label in enumerate(htft_labels):
probs_dict[label] = float(raw[i]) if i < len(raw) else 0.0
signal[out_key] = _enrich_signal_entry(probs_dict, model_key)
elif len(label_map) == 2:
# Binary market
labels = list(label_map.keys())
p = float(raw[0]) if len(raw) >= 1 else None
if p is None:
print(f" [V25-SIGNAL] {out_key} → EMPTY raw output, skipped")
continue
signal[out_key] = _enrich_signal_entry({labels[0]: p, labels[1]: 1.0 - p}, model_key)
elif len(label_map) == 3:
# 3-class market
labels = list(label_map.keys())
probs_dict = {}
for i, label in enumerate(labels):
if i >= len(raw):
print(f" [V25-SIGNAL] {out_key} → insufficient probabilities in raw output")
break
probs_dict[label] = float(raw[i])
else:
signal[out_key] = _enrich_signal_entry(probs_dict, model_key)
if out_key in signal:
print(f" [V25-SIGNAL] {out_key}{signal[out_key]['probs']}")
print(f" [V25-SIGNAL] Total markets with real predictions: {len(signal)}")
if not signal:
raise RuntimeError("V25 model produced ZERO market predictions — cannot continue")
return signal
@staticmethod
def _prob_map(signal: Optional[Dict[str, Any]], market: str, defaults: Dict[str, float]) -> Dict[str, float]:
"""Extract normalised probabilities from signal.
If the signal contains real model output for this market, use it.
If the market is missing from the signal, log a warning and return
the defaults as a LAST RESORT (so the pipeline doesn't crash).
The defaults are ONLY used for non-core / secondary markets that
may not have a trained model yet (e.g. CARDS, HCAP, OE).
"""
market_payload = signal.get(market, {}) if isinstance(signal, dict) else {}
probs = market_payload.get("probs", {}) if isinstance(market_payload, dict) else {}
if not isinstance(probs, dict) or not probs:
print(f" ⚠️ [PROB_MAP] Market '{market}' NOT found in V25 signal — model output missing")
return dict(defaults)
out = {key: float(probs.get(key, value)) for key, value in defaults.items()}
total = sum(out.values())
if total <= 0:
print(f" ⚠️ [PROB_MAP] Market '{market}' has zero total probability")
return dict(defaults)
return {key: value / total for key, value in out.items()}
@staticmethod
def _is_cup_game(league_name: str) -> bool:
"""Detect cup/knockout competitions where home advantage is significantly weaker."""
name = (league_name or "").lower()
cup_keywords = (
"kupa", "cup", "coupe", "copa", "coppa", "pokal",
"trophy", "shield", "challenge",
"ziraat", "süper kupa", "super cup",
)
return any(kw in name for kw in cup_keywords)
@staticmethod
def _best_prob_pick(prob_map: Dict[str, float]) -> Tuple[str, float]:
if not prob_map:
return "", 0.0
pick = max(prob_map, key=prob_map.__getitem__)
return pick, float(prob_map[pick])
@staticmethod
def _poisson_score_top5(home_xg: float, away_xg: float, max_goals: int = 5) -> List[Dict[str, Any]]:
def poisson_p(lmbda: float, k: int) -> float:
return math.exp(-lmbda) * (lmbda ** k) / math.factorial(k)
scores: List[Tuple[str, float]] = []
for home_goals in range(max_goals + 1):
for away_goals in range(max_goals + 1):
prob = poisson_p(home_xg, home_goals) * poisson_p(away_xg, away_goals)
scores.append((f"{home_goals}-{away_goals}", prob))
scores.sort(key=lambda item: item[1], reverse=True)
return [
{"score": score, "prob": round(prob, 4)}
for score, prob in scores[:5]
]
def _build_v25_prediction(
self,
data: MatchData,
features: Dict[str, float],
v25_signal: Dict[str, Any],
) -> FullMatchPrediction:
prediction = FullMatchPrediction(
match_id=data.match_id,
home_team=data.home_team_name,
away_team=data.away_team_name,
)
ms_probs = self._prob_map(v25_signal, "MS", {"1": 0.33, "X": 0.34, "2": 0.33})
ou15_probs = self._prob_map(v25_signal, "OU15", {"Under": 0.5, "Over": 0.5})
ou25_probs = self._prob_map(v25_signal, "OU25", {"Under": 0.5, "Over": 0.5})
ou35_probs = self._prob_map(v25_signal, "OU35", {"Under": 0.5, "Over": 0.5})
btts_probs = self._prob_map(v25_signal, "BTTS", {"No": 0.5, "Yes": 0.5})
ht_probs = self._prob_map(v25_signal, "HT", {"1": 0.33, "X": 0.34, "2": 0.33})
ht_ou05_probs = self._prob_map(v25_signal, "HT_OU05", {"Under": 0.5, "Over": 0.5})
ht_ou15_probs = self._prob_map(v25_signal, "HT_OU15", {"Under": 0.5, "Over": 0.5})
htft_probs = self._prob_map(
v25_signal,
"HTFT",
{"1/1": 1 / 9, "1/X": 1 / 9, "1/2": 1 / 9, "X/1": 1 / 9, "X/X": 1 / 9, "X/2": 1 / 9, "2/1": 1 / 9, "2/X": 1 / 9, "2/2": 1 / 9},
)
oe_probs = self._prob_map(v25_signal, "OE", {"Even": 0.5, "Odd": 0.5})
cards_probs = self._prob_map(v25_signal, "CARDS", {"Under": 0.5, "Over": 0.5})
hcap_probs = self._prob_map(v25_signal, "HCAP", {"1": 0.33, "X": 0.34, "2": 0.33})
# Cup game: dampen home advantage — model trained on league data overestimates home edge
is_cup = self._is_cup_game(getattr(data, "league_name", "") or "")
if is_cup:
# Shift 8% of home probability toward away and draw (rotation, neutral venue effect)
cup_transfer = ms_probs["1"] * 0.08
ms_probs = {
"1": ms_probs["1"] - cup_transfer,
"X": ms_probs["X"] + cup_transfer * 0.4,
"2": ms_probs["2"] + cup_transfer * 0.6,
}
total = sum(ms_probs.values())
ms_probs = {k: v / total for k, v in ms_probs.items()}
prediction.ms_home_prob = ms_probs["1"]
prediction.ms_draw_prob = ms_probs["X"]
prediction.ms_away_prob = ms_probs["2"]
prediction.ms_pick, ms_top = self._best_prob_pick(ms_probs)
prediction.ms_confidence = ms_top * 100.0
prediction.dc_1x_prob = prediction.ms_home_prob + prediction.ms_draw_prob
prediction.dc_x2_prob = prediction.ms_draw_prob + prediction.ms_away_prob
prediction.dc_12_prob = prediction.ms_home_prob + prediction.ms_away_prob
dc_probs = {"1X": prediction.dc_1x_prob, "X2": prediction.dc_x2_prob, "12": prediction.dc_12_prob}
prediction.dc_pick, dc_top = self._best_prob_pick(dc_probs)
prediction.dc_confidence = dc_top * 100.0
prediction.over_15_prob = ou15_probs["Over"]
prediction.under_15_prob = ou15_probs["Under"]
prediction.ou15_pick = "1.5 Üst" if prediction.over_15_prob >= prediction.under_15_prob else "1.5 Alt"
prediction.ou15_confidence = max(prediction.over_15_prob, prediction.under_15_prob) * 100.0
prediction.over_25_prob = ou25_probs["Over"]
prediction.under_25_prob = ou25_probs["Under"]
prediction.ou25_pick = "2.5 Üst" if prediction.over_25_prob >= prediction.under_25_prob else "2.5 Alt"
prediction.ou25_confidence = max(prediction.over_25_prob, prediction.under_25_prob) * 100.0
prediction.over_35_prob = ou35_probs["Over"]
prediction.under_35_prob = ou35_probs["Under"]
prediction.ou35_pick = "3.5 Üst" if prediction.over_35_prob >= prediction.under_35_prob else "3.5 Alt"
prediction.ou35_confidence = max(prediction.over_35_prob, prediction.under_35_prob) * 100.0
prediction.btts_yes_prob = btts_probs["Yes"]
prediction.btts_no_prob = btts_probs["No"]
prediction.btts_pick = "KG Var" if prediction.btts_yes_prob >= prediction.btts_no_prob else "KG Yok"
prediction.btts_confidence = max(prediction.btts_yes_prob, prediction.btts_no_prob) * 100.0
prediction.ht_home_prob = ht_probs["1"]
prediction.ht_draw_prob = ht_probs["X"]
prediction.ht_away_prob = ht_probs["2"]
prediction.ht_pick, ht_top = self._best_prob_pick(ht_probs)
prediction.ht_confidence = ht_top * 100.0
prediction.ht_over_05_prob = ht_ou05_probs["Over"]
prediction.ht_under_05_prob = ht_ou05_probs["Under"]
prediction.ht_ou_pick = "İY 0.5 Üst" if prediction.ht_over_05_prob >= prediction.ht_under_05_prob else "İY 0.5 Alt"
prediction.ht_over_15_prob = ht_ou15_probs["Over"]
prediction.ht_under_15_prob = ht_ou15_probs["Under"]
prediction.ht_ou15_pick = "İY 1.5 Üst" if prediction.ht_over_15_prob >= prediction.ht_under_15_prob else "İY 1.5 Alt"
prediction.ht_ft_probs = htft_probs
prediction.odd_prob = oe_probs["Odd"]
prediction.even_prob = oe_probs["Even"]
prediction.odd_even_pick = "Tek" if prediction.odd_prob >= prediction.even_prob else "Çift"
prediction.cards_over_prob = cards_probs["Over"]
prediction.cards_under_prob = cards_probs["Under"]
prediction.card_pick = "4.5 Üst" if prediction.cards_over_prob >= prediction.cards_under_prob else "4.5 Alt"
prediction.cards_confidence = max(prediction.cards_over_prob, prediction.cards_under_prob) * 100.0
prediction.handicap_home_prob = hcap_probs["1"]
prediction.handicap_draw_prob = hcap_probs["X"]
prediction.handicap_away_prob = hcap_probs["2"]
prediction.handicap_pick, hcap_top = self._best_prob_pick(hcap_probs)
prediction.handicap_confidence = hcap_top * 100.0
# ── Score Prediction: Model-first, heuristic fallback ──────────
ms_edge = prediction.ms_home_prob - prediction.ms_away_prob
score_result = self._predict_score_with_model(features)
if score_result is not None:
# ML model predicted scores
prediction.home_xg = score_result["ft_home"]
prediction.away_xg = score_result["ft_away"]
prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)
ht_home_xg = score_result["ht_home"]
ht_away_xg = score_result["ht_away"]
prediction.predicted_ft_score = f"{int(round(prediction.home_xg))}-{int(round(prediction.away_xg))}"
prediction.predicted_ht_score = f"{int(round(ht_home_xg))}-{int(round(ht_away_xg))}"
else:
# Heuristic fallback (original formula)
base_home_xg = max(0.25, (float(data.home_goals_avg or 1.3) + float(features.get("away_xga", data.away_conceded_avg) or 1.2)) / 2.0)
base_away_xg = max(0.25, (float(data.away_goals_avg or 1.3) + float(features.get("home_xga", data.home_conceded_avg) or 1.2)) / 2.0)
# ms_edge already computed above
total_target = max(
1.4,
min(
4.8,
(float(features.get("league_avg_goals", 2.7)) * 0.55)
+ ((float(data.home_goals_avg or 1.3) + float(data.away_goals_avg or 1.3)) * 0.45)
+ ((prediction.over_25_prob - prediction.under_25_prob) * 1.15),
),
)
home_xg = max(0.2, base_home_xg + (ms_edge * 0.55) + ((prediction.btts_yes_prob - 0.5) * 0.18))
away_xg = max(0.2, base_away_xg - (ms_edge * 0.55) + ((prediction.btts_yes_prob - 0.5) * 0.18))
scale = total_target / max(home_xg + away_xg, 0.1)
prediction.home_xg = round(home_xg * scale, 2)
prediction.away_xg = round(away_xg * scale, 2)
prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)
# Cup game: reduce xG by 20% — rotation + lower motivation + defensive tactics
if is_cup:
prediction.home_xg = round(prediction.home_xg * 0.80, 2)
prediction.away_xg = round(prediction.away_xg * 0.80, 2)
prediction.total_xg = round(prediction.home_xg + prediction.away_xg, 2)
prediction.predicted_ft_score = f"{int(round(prediction.home_xg))}-{int(round(prediction.away_xg))}"
prediction.predicted_ht_score = f"{int(round(prediction.home_xg * 0.45))}-{int(round(prediction.away_xg * 0.45))}"
prediction.ft_scores_top5 = self._poisson_score_top5(prediction.home_xg, prediction.away_xg)
# Score prediction: find the most likely scoreline consistent with the MS pick
# Instead of just rounding xG (misleading), filter Poisson top scores by result direction
ms_pick = prediction.ms_pick # "1", "X", or "2"
top5 = prediction.ft_scores_top5
if top5 and ms_pick in ("1", "X", "2"):
def _result_of(score_str: str) -> str:
try:
h, a = map(int, score_str.split("-"))
if h > a: return "1"
if h < a: return "2"
return "X"
except Exception:
return "?"
# Filter to scorelines matching the predicted result
matching = [s for s in top5 if _result_of(s["score"]) == ms_pick]
if matching:
best = matching[0] # already sorted by probability desc
h_str, a_str = best["score"].split("-")
prediction.predicted_ft_score = best["score"]
# Recalculate HT score proportionally from the FT pick
h_val, a_val = int(h_str), int(a_str)
prediction.predicted_ht_score = f"{int(round(h_val * 0.45))}-{int(round(a_val * 0.45))}"
max_market_conf = max(
prediction.ms_confidence,
prediction.ou15_confidence,
prediction.ou25_confidence,
prediction.ou35_confidence,
prediction.btts_confidence,
prediction.ht_confidence,
prediction.cards_confidence,
prediction.handicap_confidence,
)
lineup_conf = max(0.0, min(1.0, float(getattr(data, "lineup_confidence", 0.0) or 0.0)))
lineup_penalty = 12.0 if data.lineup_source == "none" else max(1.5, (1.0 - lineup_conf) * 8.0) if data.lineup_source == "probable_xi" else 0.0
referee_penalty = 6.0 if not data.referee_name else 0.0
parity_penalty = 8.0 if abs(ms_edge) < 0.08 else 0.0
# Cup game penalty: model trained on league data has lower reliability for cup matches
cup_penalty = 10.0 if is_cup else 0.0
# Bookmaker margin penalty: high margin signals that even the market is uncertain
bm_margin = 0.0
odds_data = getattr(data, "odds_data", {}) or {}
_h, _d, _a = float(odds_data.get("ms_h") or 0), float(odds_data.get("ms_d") or 0), float(odds_data.get("ms_a") or 0)
if _h > 1.01 and _d > 1.01 and _a > 1.01:
bm_margin = (1 / _h + 1 / _d + 1 / _a) - 1
bookmaker_penalty = 12.0 if bm_margin > 0.20 else 6.0 if bm_margin > 0.15 else 0.0
prediction.risk_score = round(min(100.0, max(10.0, 100.0 - max_market_conf + lineup_penalty + referee_penalty + parity_penalty + cup_penalty + bookmaker_penalty)), 1)
if prediction.risk_score >= 78:
prediction.risk_level = "EXTREME"
elif prediction.risk_score >= 62:
prediction.risk_level = "HIGH"
elif prediction.risk_score >= 40:
prediction.risk_level = "MEDIUM"
else:
prediction.risk_level = "LOW"
prediction.is_surprise_risk = prediction.risk_level in {"HIGH", "EXTREME"} or prediction.ms_draw_prob >= 0.30
prediction.surprise_type = "balanced_match_risk" if abs(ms_edge) < 0.08 else "draw_pressure" if prediction.ms_draw_prob >= 0.30 else ""
prediction.risk_warnings = []
if is_cup:
prediction.risk_warnings.append("cup_game_home_advantage_reduced")
if bookmaker_penalty > 0:
prediction.risk_warnings.append(f"bookmaker_margin_high_{bm_margin*100:.0f}pct")
if data.lineup_source == "probable_xi":
prediction.risk_warnings.append("lineup_probable_not_confirmed")
if lineup_conf < 0.65:
prediction.risk_warnings.append("lineup_projection_low_confidence")
if data.lineup_source == "none":
prediction.risk_warnings.append("lineup_unavailable")
if not data.referee_name:
prediction.risk_warnings.append("missing_referee")
if prediction.ms_draw_prob >= 0.30:
prediction.risk_warnings.append("draw_probability_elevated")
prediction.upset_score = int(round(max(0.0, min(100.0, (prediction.ms_draw_prob + min(prediction.ms_home_prob, prediction.ms_away_prob)) * 100.0))))
prediction.upset_level = "HIGH" if prediction.upset_score >= 65 else "MEDIUM" if prediction.upset_score >= 45 else "LOW"
prediction.upset_reasons = [prediction.surprise_type] if prediction.surprise_type else []
surprise = self._build_surprise_profile(data, prediction)
prediction.surprise_score = surprise["score"]
prediction.surprise_comment = surprise["comment"]
prediction.surprise_reasons = surprise["reasons"]
prediction.surprise_breakdown = surprise.get("breakdown", [])
# Auto-flag is_surprise_risk when score crosses 45 even if other paths didn't fire
if surprise["score"] >= 45.0:
prediction.is_surprise_risk = True
prediction.team_confidence = round(max(35.0, min(95.0, 45.0 + (abs(ms_edge) * 85.0) + (abs(float(features.get("form_elo_diff", 0.0))) / 40.0))), 1)
prediction.player_confidence = round(max(20.0, min(95.0, 38.0 + (float(features.get("home_key_players", 0.0)) + float(features.get("away_key_players", 0.0))) * 2.0 - (float(features.get("home_missing_impact", 0.0)) + float(features.get("away_missing_impact", 0.0))) * 22.0)), 1)
prediction.odds_confidence = round(max(30.0, min(95.0, float(np.mean([prediction.ms_confidence, prediction.ou25_confidence, prediction.btts_confidence])))), 1)
prediction.referee_confidence = 62.0 if data.referee_name else 35.0
prediction.total_cards_pred = 4.8 if prediction.cards_over_prob >= prediction.cards_under_prob else 4.1
prediction.total_corners_pred = round(8.8 + (prediction.over_25_prob - 0.5) * 2.5, 1)
prediction.corner_pick = "9.5 Üst" if prediction.total_corners_pred >= 9.5 else "9.5 Alt"
prediction.analysis_details = {
"primary_model": "v25",
"features_source": "v25.pre_match",
"market_count": len([key for key in v25_signal.keys() if key != "value_bets"]),
"lineup_source": data.lineup_source,
}
return prediction
def _build_engine_breakdown(self, prediction: FullMatchPrediction) -> Dict[str, Any]:
"""
Engine breakdown with backward-compatible flat scores + rich detail siblings.
Shape:
{
team: 74.1, player: 55.7, odds: 55.2, referee: 62.0, # legacy flat scores
detail: { team: {score, label, ...}, player: {...}, ... }
}
"""
components = {
"team": ("Takım modeli", float(prediction.team_confidence)),
"player": ("Oyuncu / kadro modeli", float(prediction.player_confidence)),
"odds": ("Oran piyasası", float(prediction.odds_confidence)),
"referee": ("Hakem etkisi", float(prediction.referee_confidence)),
}
flat: Dict[str, Any] = {}
detail: Dict[str, Any] = {}
for key, (display, raw) in components.items():
score = round(raw, 1)
label, interpretation = self._confidence_label(score)
flat[key] = score
detail[key] = {
"score": score,
"label": label,
"display_name": display,
"interpretation": interpretation,
}
flat["detail"] = detail
return flat
+469
View File
@@ -0,0 +1,469 @@
"""Reversal Mixin — HT/FT reversal watchlist and cycle metrics.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class ReversalMixin:
def get_reversal_watchlist(
self,
count: int = 20,
horizon_hours: int = 72,
min_score: float = 45.0,
top_leagues_only: bool = False,
) -> Dict[str, Any]:
safe_count = max(1, min(100, int(count)))
safe_horizon = max(6, min(168, int(horizon_hours)))
safe_min_score = max(0.0, min(100.0, float(min_score)))
now_ms = int(time.time() * 1000)
horizon_ms = now_ms + (safe_horizon * 60 * 60 * 1000)
with psycopg2.connect(self.dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT
lm.id,
lm.home_team_id,
lm.away_team_id,
lm.league_id,
lm.mst_utc
FROM live_matches lm
WHERE lm.sport = 'football'
AND lm.mst_utc >= %s
AND lm.mst_utc <= %s
ORDER BY lm.mst_utc ASC
LIMIT 200
""",
(now_ms, horizon_ms),
)
raw_candidates = cur.fetchall()
candidates = [
row
for row in raw_candidates
if row.get("home_team_id") and row.get("away_team_id")
]
if top_leagues_only:
candidates = [
row for row in candidates if self._is_top_league(row.get("league_id"))
]
team_ids: Set[str] = set()
pair_keys: Set[Tuple[str, str]] = set()
for row in candidates:
home_id = str(row["home_team_id"])
away_id = str(row["away_team_id"])
team_ids.add(home_id)
team_ids.add(away_id)
h, a = sorted((home_id, away_id))
pair_keys.add((h, a))
team_cycle = self._fetch_team_reversal_cycle_metrics(cur, team_ids, now_ms)
h2h_ctx = self._fetch_h2h_reversal_context(cur, pair_keys, now_ms)
watch_items_all: List[Dict[str, Any]] = []
scanned = 0
for row in candidates:
match_id = str(row["id"])
data = self._load_match_data(match_id)
if data is None:
continue
package = self.analyze_match(match_id)
if not package:
continue
scanned += 1
htft_probs = package.get("market_board", {}).get("HTFT", {}).get("probs", {})
prob_12 = float(htft_probs.get("1/2", 0.0))
prob_21 = float(htft_probs.get("2/1", 0.0))
if prob_12 <= 0.0 and prob_21 <= 0.0:
continue
overall_htft_pick = None
overall_htft_prob = 0.0
if htft_probs:
overall_htft_pick, overall_htft_prob = max(
htft_probs.items(),
key=lambda item: float(item[1]),
)
reversal_sum = prob_12 + prob_21
reversal_max = max(prob_12, prob_21)
top_pick = "2/1" if prob_21 >= prob_12 else "1/2"
top_prob = prob_21 if top_pick == "2/1" else prob_12
ms_h = self._to_float(data.odds_data.get("ms_h"), 0.0)
ms_a = self._to_float(data.odds_data.get("ms_a"), 0.0)
gap = abs(ms_h - ms_a) if ms_h > 1.0 and ms_a > 1.0 else 0.0
favorite_odd = min(ms_h, ms_a) if ms_h > 1.0 and ms_a > 1.0 else 0.0
# Reversal events are rare (~5% baseline), so convert raw probs to a more useful
# watchlist scale where p in [0.02, 0.08] becomes meaningfully separable.
base_score = (reversal_max * 100.0 * 8.0) + (reversal_sum * 100.0 * 4.0)
balance_bonus = 0.0
if gap > 0.0:
balance_bonus = max(0.0, (1.0 - min(gap, 1.2) / 1.2) * 7.0)
elif ms_h > 1.0 and ms_a > 1.0:
balance_bonus = 2.0
favorite_bonus = 0.0
if favorite_odd > 0.0 and favorite_odd <= 1.70 and reversal_max >= 0.02:
favorite_bonus = min(8.0, (1.70 - favorite_odd) * 12.0)
home_metrics = team_cycle.get(data.home_team_id, {})
away_metrics = team_cycle.get(data.away_team_id, {})
cycle_pressure = max(
float(home_metrics.get("cycle_pressure", 0.0)),
float(away_metrics.get("cycle_pressure", 0.0)),
)
cycle_bonus = cycle_pressure * 10.0
h, a = sorted((data.home_team_id, data.away_team_id))
pair_key = (h, a)
pair_ctx = h2h_ctx.get(pair_key, {})
blowout_bonus = 0.0
last_diff = int(pair_ctx.get("goal_diff", 0))
if abs(last_diff) >= 3:
blowout_bonus = 6.0
if abs(last_diff) >= 5:
blowout_bonus += 3.0
ou25_o = self._to_float(data.odds_data.get("ou25_o"), 0.0)
tempo_bonus = 0.0
if ou25_o > 1.0 and ou25_o <= 1.72:
tempo_bonus = 2.5
watch_score = max(
0.0,
min(
100.0,
base_score + balance_bonus + favorite_bonus + cycle_bonus + blowout_bonus + tempo_bonus,
),
)
reason_codes: List[str] = []
if top_prob >= 0.045:
reason_codes.append("reversal_prob_hot")
elif top_prob >= 0.030:
reason_codes.append("reversal_prob_warm")
if gap > 0.0 and gap <= 0.80:
reason_codes.append("balanced_matchup")
if favorite_bonus > 0.0:
reason_codes.append("strong_favorite_reversal_window")
if cycle_pressure >= 0.55:
reason_codes.append("team_reversal_cycle_pressure")
if blowout_bonus > 0.0:
reason_codes.append("h2h_blowout_rematch")
if tempo_bonus > 0.0:
reason_codes.append("high_tempo_profile")
if not reason_codes:
reason_codes.append("model_signal_only")
item = (
{
"match_id": data.match_id,
"match_name": f"{data.home_team_name} vs {data.away_team_name}",
"match_date_ms": data.match_date_ms,
"league_id": data.league_id,
"league": data.league_name,
"risk_band": self._watchlist_risk_band(watch_score),
"watch_score": round(watch_score, 2),
"top_pick": top_pick,
"top_pick_prob": round(top_prob, 4),
"top_pick_scope": "reversal_only",
"overall_htft_pick": overall_htft_pick,
"overall_htft_pick_prob": round(float(overall_htft_prob), 4),
"reversal_probs": {
"1/2": round(prob_12, 4),
"2/1": round(prob_21, 4),
},
"odds_snapshot": {
"ms_h": round(ms_h, 2) if ms_h > 0 else None,
"ms_a": round(ms_a, 2) if ms_a > 0 else None,
"ms_gap": round(gap, 3),
"favorite_odd": round(favorite_odd, 2) if favorite_odd > 0 else None,
},
"pattern_signals": {
"home_cycle_pressure": round(float(home_metrics.get("cycle_pressure", 0.0)), 3),
"away_cycle_pressure": round(float(away_metrics.get("cycle_pressure", 0.0)), 3),
"home_matches_since_last_reversal": int(home_metrics.get("matches_since_last_reversal", 99)),
"away_matches_since_last_reversal": int(away_metrics.get("matches_since_last_reversal", 99)),
"h2h_last_goal_diff": last_diff if pair_ctx else None,
"h2h_last_result": pair_ctx.get("result"),
},
"reason_codes": reason_codes,
}
)
watch_items_all.append(item)
watch_items_all.sort(
key=lambda item: (
float(item.get("watch_score", 0.0)),
float(item.get("top_pick_prob", 0.0)),
),
reverse=True,
)
selected = [
item for item in watch_items_all if float(item.get("watch_score", 0.0)) >= safe_min_score
][:safe_count]
preview = watch_items_all[: min(5, len(watch_items_all))]
return {
"engine": "v28.main",
"generated_at": __import__("datetime").datetime.utcnow().isoformat() + "Z",
"horizon_hours": safe_horizon,
"min_score": round(safe_min_score, 2),
"top_leagues_only": bool(top_leagues_only),
"scanned_matches": scanned,
"candidate_matches": len(candidates),
"listed_matches": len(selected),
"watchlist": selected,
"top_candidates_preview": preview,
}
def _fetch_team_reversal_cycle_metrics(
self,
cur: RealDictCursor,
team_ids: Set[str],
now_ms: int,
) -> Dict[str, Dict[str, float]]:
if not team_ids:
return {}
cur.execute(
"""
WITH team_matches AS (
SELECT
m.home_team_id AS team_id,
m.mst_utc,
CASE
WHEN m.ht_score_home > m.ht_score_away THEN 'L'
WHEN m.ht_score_home < m.ht_score_away THEN 'T'
ELSE 'D'
END AS ht_state,
CASE
WHEN m.score_home > m.score_away THEN 'W'
WHEN m.score_home < m.score_away THEN 'L'
ELSE 'D'
END AS ft_state
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.ht_score_home IS NOT NULL
AND m.ht_score_away IS NOT NULL
AND m.home_team_id = ANY(%s)
AND m.mst_utc < %s
UNION ALL
SELECT
m.away_team_id AS team_id,
m.mst_utc,
CASE
WHEN m.ht_score_away > m.ht_score_home THEN 'L'
WHEN m.ht_score_away < m.ht_score_home THEN 'T'
ELSE 'D'
END AS ht_state,
CASE
WHEN m.score_away > m.score_home THEN 'W'
WHEN m.score_away < m.score_home THEN 'L'
ELSE 'D'
END AS ft_state
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.ht_score_home IS NOT NULL
AND m.ht_score_away IS NOT NULL
AND m.away_team_id = ANY(%s)
AND m.mst_utc < %s
),
ranked AS (
SELECT
team_id,
mst_utc,
ht_state,
ft_state,
ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY mst_utc DESC) AS rn
FROM team_matches
)
SELECT team_id, mst_utc, ht_state, ft_state
FROM ranked
WHERE rn <= 80
ORDER BY team_id ASC, mst_utc DESC
""",
(list(team_ids), now_ms, list(team_ids), now_ms),
)
rows = cur.fetchall()
by_team: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for row in rows:
by_team[str(row["team_id"])].append(row)
out: Dict[str, Dict[str, float]] = {}
for team_id in team_ids:
team_rows = by_team.get(str(team_id), [])
if not team_rows:
out[str(team_id)] = {
"recent_reversal_rate": 0.0,
"matches_since_last_reversal": 99.0,
"avg_gap_matches": 12.0,
"cycle_pressure": 0.0,
}
continue
reversal_indexes: List[int] = []
recent_reversal = 0
recent_n = min(15, len(team_rows))
for idx, row in enumerate(team_rows, start=1):
ht_state = str(row.get("ht_state") or "")
ft_state = str(row.get("ft_state") or "")
is_reversal = (ht_state == "L" and ft_state == "L") or (ht_state == "T" and ft_state == "W")
if idx <= recent_n and is_reversal:
recent_reversal += 1
if is_reversal:
reversal_indexes.append(idx)
recent_rate = (recent_reversal / recent_n) if recent_n > 0 else 0.0
since_last = float(reversal_indexes[0]) if reversal_indexes else 99.0
gaps: List[float] = []
if len(reversal_indexes) >= 2:
for i in range(1, len(reversal_indexes)):
gaps.append(float(reversal_indexes[i] - reversal_indexes[i - 1]))
avg_gap = (sum(gaps) / len(gaps)) if gaps else 12.0
if avg_gap <= 0:
avg_gap = 12.0
cycle_pressure = 0.0
if reversal_indexes:
tolerance = max(3.0, avg_gap * 0.7)
diff = abs(since_last - avg_gap)
cycle_pressure = max(0.0, 1.0 - (diff / tolerance))
out[str(team_id)] = {
"recent_reversal_rate": round(recent_rate, 4),
"matches_since_last_reversal": round(since_last, 2),
"avg_gap_matches": round(avg_gap, 2),
"cycle_pressure": round(cycle_pressure, 4),
}
return out
def _fetch_h2h_reversal_context(
self,
cur: RealDictCursor,
pair_keys: Set[Tuple[str, str]],
now_ms: int,
) -> Dict[Tuple[str, str], Dict[str, Any]]:
if not pair_keys:
return {}
team_ids = sorted({team_id for pair in pair_keys for team_id in pair})
cur.execute(
"""
SELECT
m.home_team_id,
m.away_team_id,
m.score_home,
m.score_away,
m.ht_score_home,
m.ht_score_away,
m.mst_utc
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.home_team_id = ANY(%s)
AND m.away_team_id = ANY(%s)
AND m.mst_utc < %s
ORDER BY m.mst_utc DESC
LIMIT 4000
""",
(team_ids, team_ids, now_ms),
)
rows = cur.fetchall()
out: Dict[Tuple[str, str], Dict[str, Any]] = {}
for row in rows:
home_id = str(row["home_team_id"])
away_id = str(row["away_team_id"])
h, a = sorted((home_id, away_id))
key = (h, a)
if key not in pair_keys or key in out:
continue
score_home = int(row["score_home"])
score_away = int(row["score_away"])
goal_diff = score_home - score_away
out[key] = {
"goal_diff": goal_diff,
"result": f"{score_home}-{score_away}",
"match_date_ms": int(row["mst_utc"] or 0),
}
if len(out) >= len(pair_keys):
break
return out
@staticmethod
def _watchlist_risk_band(score: float) -> str:
if score >= 68.0:
return "HIGH"
if score >= 54.0:
return "MEDIUM"
return "LOW"
@@ -0,0 +1,350 @@
"""Upper Brain Mixin — V27 cross-check guards and assessments.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class UpperBrainMixin:
def _apply_upper_brain_guards(self, package: Dict[str, Any]) -> Dict[str, Any]:
return BettingBrain().judge(package)
v27_engine = package.get("v27_engine")
if not isinstance(v27_engine, dict) or not v27_engine.get("triple_value"):
return package
guarded = dict(package)
vetoed_keys = set()
guarded_keys = set()
def mark_guard(item: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(item, dict):
return item
out = dict(item)
assessment = self._upper_brain_assessment(out, guarded)
if not assessment.get("applies"):
return out
key = f"{out.get('market')}:{out.get('pick')}"
guarded_keys.add(key)
out["upper_brain"] = assessment
reason_key = "decision_reasons" if "decision_reasons" in out else "reasons"
reasons = list(out.get(reason_key) or [])
for reason in assessment.get("reason_codes", []):
if reason not in reasons:
reasons.append(reason)
out[reason_key] = reasons[:6]
if assessment.get("veto"):
vetoed_keys.add(key)
out["playable"] = False
out["stake_units"] = 0.0
out["bet_grade"] = "PASS"
out["is_guaranteed"] = False
out["pick_reason"] = "upper_brain_veto"
if "signal_tier" in out:
out["signal_tier"] = "PASS"
elif assessment.get("downgrade"):
out["is_guaranteed"] = False
if out.get("signal_tier") == "CORE":
out["signal_tier"] = "LEAN"
if out.get("pick_reason") == "high_accuracy_market":
out["pick_reason"] = "upper_brain_downgraded"
return out
main_pick = mark_guard(guarded.get("main_pick") or {})
value_pick = mark_guard(guarded.get("value_pick") or {}) if guarded.get("value_pick") else None
supporting = [
mark_guard(row)
for row in list(guarded.get("supporting_picks") or [])
if isinstance(row, dict)
]
bet_summary = [
mark_guard(row)
for row in list(guarded.get("bet_summary") or [])
if isinstance(row, dict)
]
main_safe = bool(main_pick and main_pick.get("playable") and not main_pick.get("upper_brain", {}).get("veto"))
if not main_safe:
candidates = [
row for row in supporting
if row.get("playable")
and not row.get("upper_brain", {}).get("veto")
and float(row.get("odds", 0.0) or 0.0) >= 1.30
]
candidates.sort(key=lambda row: float(row.get("play_score", 0.0) or 0.0), reverse=True)
if candidates:
main_pick = dict(candidates[0])
main_pick["is_guaranteed"] = False
main_pick["pick_reason"] = "upper_brain_reselected"
reasons = list(main_pick.get("decision_reasons") or [])
if "upper_brain_reselected_after_veto" not in reasons:
reasons.append("upper_brain_reselected_after_veto")
main_pick["decision_reasons"] = reasons[:6]
elif main_pick:
main_pick["is_guaranteed"] = False
main_pick["pick_reason"] = "upper_brain_no_safe_pick"
if main_pick:
supporting = [
row for row in supporting
if not (
row.get("market") == main_pick.get("market")
and row.get("pick") == main_pick.get("pick")
)
][:6]
guarded["main_pick"] = main_pick if main_pick else None
guarded["value_pick"] = value_pick
guarded["supporting_picks"] = supporting
guarded["bet_summary"] = bet_summary
playable = bool(main_pick and main_pick.get("playable") and not main_pick.get("upper_brain", {}).get("veto"))
advice = dict(guarded.get("bet_advice") or {})
advice["playable"] = playable
advice["suggested_stake_units"] = float(main_pick.get("stake_units", 0.0)) if playable else 0.0
if playable:
advice["reason"] = "playable_pick_found"
elif vetoed_keys:
advice["reason"] = "upper_brain_no_safe_pick"
else:
advice["reason"] = "no_bet_conditions_met"
guarded["bet_advice"] = advice
guarded["upper_brain"] = {
"applied": True,
"guarded_count": len(guarded_keys),
"vetoed_count": len(vetoed_keys),
"vetoed": sorted(vetoed_keys)[:8],
"rules": {
"min_band_sample": 8,
"max_v25_v27_divergence": 0.18,
"dc_requires_triple_value": True,
},
}
guarded.setdefault("analysis_details", {})
guarded["analysis_details"]["upper_brain_guards_applied"] = True
guarded["analysis_details"]["upper_brain_vetoed_count"] = len(vetoed_keys)
return guarded
def _upper_brain_assessment(
self,
item: Dict[str, Any],
package: Dict[str, Any],
) -> Dict[str, Any]:
market = str(item.get("market") or "")
pick = str(item.get("pick") or "")
if not market or not pick:
return {"applies": False}
v27_engine = package.get("v27_engine") or {}
triple_value = v27_engine.get("triple_value") or {}
model_prob = self._upper_brain_market_probability(item, package)
v27_prob = self._upper_brain_v27_probability(market, pick, v27_engine)
triple_key = self._upper_brain_triple_key(market, pick)
triple = triple_value.get(triple_key) if triple_key else None
veto = False
downgrade = False
reasons: List[str] = []
divergence = None
if model_prob is not None and v27_prob is not None:
divergence = abs(float(model_prob) - float(v27_prob))
if divergence >= 0.18:
veto = True
reasons.append("upper_brain_v25_v27_divergence")
elif divergence >= 0.12:
downgrade = True
reasons.append("upper_brain_v25_v27_warning")
if isinstance(triple, dict):
band_sample = int(float(triple.get("band_sample", 0) or 0))
is_value = bool(triple.get("is_value"))
if market == "DC":
if band_sample < 8:
veto = True
reasons.append("upper_brain_band_sample_too_low")
elif not is_value:
veto = True
reasons.append("upper_brain_triple_value_rejected")
elif market in {"MS", "OU25"} and band_sample > 0 and band_sample < 8:
downgrade = True
reasons.append("upper_brain_band_sample_thin")
elif market in {"OU15", "HT_OU05"} and band_sample < 8:
downgrade = True
reasons.append("upper_brain_band_sample_thin")
consensus = str(v27_engine.get("consensus") or "").upper()
if consensus == "DISAGREE" and market in {"MS", "DC"} and not veto:
downgrade = True
reasons.append("upper_brain_consensus_disagree")
applies = bool(reasons or triple is not None or v27_prob is not None)
return {
"applies": applies,
"veto": veto,
"downgrade": downgrade,
"reason_codes": reasons,
"model_prob": round(float(model_prob), 4) if model_prob is not None else None,
"v27_prob": round(float(v27_prob), 4) if v27_prob is not None else None,
"divergence": round(float(divergence), 4) if divergence is not None else None,
"triple_key": triple_key,
"triple_value": triple,
}
def _upper_brain_market_probability(
self,
item: Dict[str, Any],
package: Dict[str, Any],
) -> Optional[float]:
raw_prob = item.get("probability")
if raw_prob is not None:
try:
return float(raw_prob)
except (TypeError, ValueError):
pass
market = str(item.get("market") or "")
pick = str(item.get("pick") or "")
board = package.get("market_board") or {}
payload = board.get(market) if isinstance(board, dict) else None
probs = payload.get("probs") if isinstance(payload, dict) else None
if not isinstance(probs, dict):
return None
prob_key = self._upper_brain_prob_key(market, pick)
if prob_key is None:
return None
return self._safe_float(probs.get(prob_key))
def _upper_brain_v27_probability(
self,
market: str,
pick: str,
v27_engine: Dict[str, Any],
) -> Optional[float]:
predictions = v27_engine.get("predictions") or {}
ms = predictions.get("ms") or {}
ou25 = predictions.get("ou25") or {}
if market == "MS":
ms_key = {"1": "home", "X": "draw", "2": "away"}.get(pick or "")
return self._safe_float(ms.get(ms_key), 0.0) if ms_key else 0.0
if market == "DC":
if pick == "1X":
return self._safe_float(ms.get("home"), 0.0) + self._safe_float(ms.get("draw"), 0.0)
if pick == "X2":
return self._safe_float(ms.get("draw"), 0.0) + self._safe_float(ms.get("away"), 0.0)
if pick == "12":
return self._safe_float(ms.get("home"), 0.0) + self._safe_float(ms.get("away"), 0.0)
if market == "OU25":
prob_key = self._upper_brain_prob_key(market, pick)
return self._safe_float(ou25.get(prob_key), 0.0) if prob_key else 0.0
return 0.0
@staticmethod
def _upper_brain_prob_key(market: str, pick: str) -> Optional[str]:
pick_norm = str(pick or "").strip().casefold()
if market in {"MS", "HT", "HCAP"}:
return pick if pick in {"1", "X", "2"} else None
if market == "DC":
return pick.upper() if pick.upper() in {"1X", "X2", "12"} else None
if market in {"OU15", "OU25", "OU35", "HT_OU05", "HT_OU15", "CARDS"}:
if "over" in pick_norm or "st" in pick_norm:
return "over"
if "under" in pick_norm or "alt" in pick_norm:
return "under"
if market == "BTTS":
if "yes" in pick_norm or "var" in pick_norm:
return "yes"
if "no" in pick_norm or "yok" in pick_norm:
return "no"
if market == "OE":
if "odd" in pick_norm or "tek" in pick_norm:
return "odd"
if "even" in pick_norm or "ift" in pick_norm:
return "even"
if market == "HTFT" and "/" in pick:
return pick
return None
def _upper_brain_triple_key(self, market: str, pick: str) -> Optional[str]:
prob_key = self._upper_brain_prob_key(market, pick)
if market == "MS":
return {"1": "home", "2": "away"}.get(pick)
if market == "DC":
return f"dc_{pick.lower()}" if pick.upper() in {"1X", "X2", "12"} else None
if market in {"OU15", "OU25", "OU35"} and prob_key == "over":
return f"{market.lower()}_over"
if market == "BTTS" and prob_key == "yes":
return "btts_yes"
if market == "HT":
return {"1": "ht_home", "2": "ht_away"}.get(pick)
if market in {"HT_OU05", "HT_OU15"} and prob_key == "over":
return f"{market.lower()}_over"
if market == "OE" and prob_key == "odd":
return "oe_odd"
if market == "CARDS" and prob_key == "over":
return "cards_over"
if market == "HTFT" and "/" in pick:
return f"htft_{pick.replace('/', '').lower()}"
return None
+174
View File
@@ -0,0 +1,174 @@
"""Utility Mixin — generic helpers (safe_float, label normalisation, JSON parsing).
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class UtilsMixin:
@staticmethod
@overload
def _safe_float(value: Any, default: float) -> float: ...
@staticmethod
@overload
def _safe_float(value: Any, default: None = ...) -> Optional[float]: ...
@staticmethod
def _safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
try:
return float(value)
except (TypeError, ValueError):
return default
@staticmethod
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
@staticmethod
def _calibrator_key(market: str, pick: str) -> Optional[str]:
"""Map (market, pick) → trained-calibrator key in models/calibration."""
m = (market or "").upper()
p = (pick or "").strip().casefold()
if m == "MS":
if p == "1":
return "ms_home"
if p == "x" or p == "0":
return "ms_draw"
if p == "2":
return "ms_away"
return None
if m == "DC":
return "dc"
if m == "OU15" and ("over" in p or "üst" in p or "ust" in p):
return "ou15"
if m == "OU25" and ("over" in p or "üst" in p or "ust" in p):
return "ou25"
if m == "OU35" and ("over" in p or "üst" in p or "ust" in p):
return "ou35"
if m == "BTTS" and ("yes" in p or "var" in p):
return "btts"
if m == "HT":
if p == "1":
return "ht_home"
if p == "x" or p == "0":
return "ht_draw"
if p == "2":
return "ht_away"
return None
if m == "HTFT":
return "ht_ft"
return None
@staticmethod
def _confidence_label(score: float) -> Tuple[str, str]:
"""Turkish UX label + interpretation for a 0-100 confidence score."""
if score >= 75:
return "YUKSEK", "Bu sinyal güçlü ve güvenilir"
if score >= 60:
return "ORTA", "Sinyal makul, çelişen veri yok"
if score >= 45:
return "DUSUK", "Sinyal zayıf, dikkatli yorumla"
return "COK_DUSUK", "Veri yetersiz veya çelişkili — bu motoru bu maç için ihmal et"
@staticmethod
def _to_float(value: Any, default: float) -> float:
try:
if value is None:
return default
return float(value)
except Exception:
return default
@staticmethod
def _normalize_text(value: Any) -> str:
text = str(value or "").casefold().replace("", "i")
return " ".join(text.split())
def _selection_value(
self,
selections: Dict[str, Any],
aliases: Tuple[str, ...],
default: float,
) -> float:
if not isinstance(selections, dict):
return default
normalized_aliases = {self._normalize_text(alias) for alias in aliases}
for key, value in selections.items():
key_norm = self._normalize_text(key)
if key_norm in normalized_aliases:
return self._to_float(value, default)
# Secondary match for entries like "2,5 Üst" or "Toplam Alt"
for key, value in selections.items():
key_norm = self._normalize_text(key)
if any(alias in key_norm for alias in normalized_aliases):
return self._to_float(value, default)
return default
def _parse_json_dict(self, payload: Any) -> Optional[Dict[str, Any]]:
if isinstance(payload, str):
try:
payload = json.loads(payload)
except Exception:
return None
return payload if isinstance(payload, dict) else None