first (part 2: other directories)
Deploy Iddaai Backend / build-and-deploy (push) Failing after 18s

This commit is contained in:
2026-04-16 15:11:25 +03:00
parent 7814e0bc6b
commit 2f0b85a0c7
203 changed files with 59989 additions and 0 deletions
+8
View File
@@ -0,0 +1,8 @@
from .base_calculator import BaseCalculator, CalculationContext
from .match_result_calculator import MatchResultCalculator
from .over_under_calculator import OverUnderCalculator
from .half_time_calculator import HalfTimeCalculator
from .score_calculator import ScoreCalculator
from .other_markets_calculator import OtherMarketsCalculator
from .risk_assessor import RiskAssessor
from .bet_recommender import BetRecommender, MarketPredictionDTO
+53
View File
@@ -0,0 +1,53 @@
"""
Base classes and context dataclass for all calculators.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class CalculationContext:
"""Context object holding all inputs for calculators."""
team_pred: Any
player_pred: Any
odds_pred: Any
referee_pred: Any
upset_factors: Any
weights: dict[str, float]
player_mods: dict[str, float]
referee_mods: dict[str, float]
match_id: str
home_team_name: str
away_team_name: str
odds_data: dict[str, float]
home_xg: float
away_xg: float
total_xg: float
league_id: str | None = None
sport: str = "football"
is_top_league: bool = False
# Risk info (populated later)
risk_level: str = "MEDIUM"
is_surprise: bool = False
# XGBoost Predictions (New)
xgboost_preds: dict[str, dict[str, Any]] = field(default_factory=dict)
class BaseCalculator:
"""Base class for all market calculators."""
def __init__(self, config: dict[str, Any]) -> None:
self.config = config
def calculate(self, ctx: CalculationContext) -> dict[str, Any]:
raise NotImplementedError("Subclasses must implement calculate()")
+210
View File
@@ -0,0 +1,210 @@
from dataclasses import dataclass, field
from typing import List, Optional, Any
from .base_calculator import BaseCalculator, CalculationContext
from .match_result_calculator import MatchResultPrediction
from .over_under_calculator import OverUnderPrediction
from .risk_assessor import RiskAnalysis
@dataclass
class MarketPredictionDTO:
market_type: str
pick: str
probability: float
confidence: float
odds: float = 0.0
is_recommended: bool = False
is_value_bet: bool = False
edge: float = 0.0
is_skip: bool = False # NEW: If model is unsure, mark as skip
@dataclass
class RecommendationResult:
best_bet: Optional[MarketPredictionDTO]
recommended_bets: List[MarketPredictionDTO]
alternative_bet: Optional[MarketPredictionDTO]
value_bets: List[MarketPredictionDTO]
skipped_bets: List[MarketPredictionDTO] # NEW: Track what we decided NOT to predict
class BetRecommender(BaseCalculator):
def calculate(self,
ctx: CalculationContext,
ms_res: MatchResultPrediction,
ou_res: OverUnderPrediction,
risk: RiskAnalysis) -> RecommendationResult:
odds_data = ctx.odds_data
# Market-Specific Minimum Confidence Thresholds (Hard Gates)
# Below these, we say "I don't know" (SKIP)
min_conf_thresholds = {
"MS": 45.0, # 3-way is hard, need at least 45%
"ÇŞ": 40.0, # Double chance is safer, but still need 40%
"1.5 Üst/Alt": 50.0,
"2.5 Üst/Alt": 45.0,
"3.5 Üst/Alt": 45.0,
"BTTS": 45.0,
"HT": 40.0,
}
# Prepare candidates
markets = [
MarketPredictionDTO("MS", ms_res.ms_pick,
ms_res.ms_home_prob if ms_res.ms_pick == "1" else (ms_res.ms_away_prob if ms_res.ms_pick == "2" else ms_res.ms_draw_prob),
ms_res.ms_confidence,
odds_data.get(f"ms_{ms_res.ms_pick.lower()}", 0)),
MarketPredictionDTO("ÇŞ", ms_res.dc_pick,
ms_res.dc_1x_prob if ms_res.dc_pick == "1X" else (ms_res.dc_x2_prob if ms_res.dc_pick == "X2" else ms_res.dc_12_prob),
ms_res.dc_confidence,
odds_data.get(f"dc_{ms_res.dc_pick.lower()}", 0)),
MarketPredictionDTO("1.5 Üst/Alt", ou_res.ou15_pick,
ou_res.over_15_prob if "Üst" in ou_res.ou15_pick else ou_res.under_15_prob,
ou_res.ou15_confidence, 0),
MarketPredictionDTO("2.5 Üst/Alt", ou_res.ou25_pick,
ou_res.over_25_prob if "Üst" in ou_res.ou25_pick else ou_res.under_25_prob,
ou_res.ou25_confidence,
odds_data.get("ou25_o" if "Üst" in ou_res.ou25_pick else "ou25_u", 0)),
MarketPredictionDTO("3.5 Üst/Alt", ou_res.ou35_pick,
ou_res.over_35_prob if "Üst" in ou_res.ou35_pick else ou_res.under_35_prob,
ou_res.ou35_confidence, 0),
MarketPredictionDTO("BTTS", ou_res.btts_pick,
ou_res.btts_yes_prob if "Var" in ou_res.btts_pick else ou_res.btts_no_prob,
ou_res.btts_confidence,
odds_data.get("btts_y" if "Var" in ou_res.btts_pick else "btts_n", 0)),
]
# Market weights from config (historical accuracy weighting)
market_weights = self.config.get("recommendations.market_weights", {})
default_weight = 1.0
safe_markets = set(self.config.get("recommendations.safe_markets", ["ÇŞ", "1.5 Üst/Alt"]))
risk_level = risk.risk_level
# Confidence calibration (backtest-derived accuracy scaling)
market_accuracy = self.config.get("recommendations.market_accuracy", {})
baseline_accuracy = self.config.get("recommendations.baseline_accuracy", 65.0)
def _calibrated_confidence(m):
"""Scale raw confidence by market's historical accuracy ratio."""
accuracy = market_accuracy.get(m.market_type, baseline_accuracy) if isinstance(market_accuracy, dict) else baseline_accuracy
ratio = accuracy / baseline_accuracy
return m.confidence * ratio
def _score(m):
mw = market_weights.get(m.market_type, default_weight) if isinstance(market_weights, dict) else default_weight
# 1. Base Score: calibrated confidence * market weight
cal_conf = _calibrated_confidence(m)
score = cal_conf * mw
# 2. Value/Edge Bonus
odds_val = m.odds if m.odds is not None else 0.0
if odds_val > 0:
implied = 1.0 / odds_val
edge = (m.probability - implied) * 100
if edge > 0:
score += edge * 4.0
# 3. Risk adjustment
if risk_level in ("HIGH", "EXTREME"):
if m.market_type in safe_markets:
score *= self.config.get("recommendations.risk_safe_boost", 1.2)
elif m.market_type == "MS":
score *= self.config.get("recommendations.risk_ms_penalty_high", 0.5)
else:
score *= self.config.get("recommendations.risk_other_penalty", 0.7)
elif risk_level == "MEDIUM":
if m.market_type == "MS":
score *= self.config.get("recommendations.risk_ms_penalty_medium", 0.8)
# 4. Extreme Confidence Bonus
if cal_conf > 80:
score *= 1.15
return score
recommended = []
value_bets = []
skipped_bets = []
conf_thr = self.config.get("recommendations.confidence_threshold", 60)
val_min = self.config.get("recommendations.value_confidence_min", 45) # Increased from 30
val_max = self.config.get("recommendations.value_confidence_max", 60)
val_margin = self.config.get("recommendations.value_edge_margin", 0.03) # Increased from 0.02
val_upgrade = self.config.get("recommendations.value_upgrade_edge", 5.0)
for m in markets:
# --- SKIP LOGIC (Hard Gate) ---
# 1. Confidence is below market threshold
min_conf = min_conf_thresholds.get(m.market_type, 45.0)
if m.confidence < min_conf:
m.is_skip = True
skipped_bets.append(m)
continue
# 2. Negative Value Edge (Odds are too low for our probability)
if m.odds > 0:
implied = 1.0 / m.odds
edge = m.probability - implied
# If our prob is significantly lower than implied (negative edge > 3%), SKIP
if edge < -0.03:
m.is_skip = True
skipped_bets.append(m)
continue
# --- PROCESS BET ---
# 1. Regular recommended
if m.confidence >= conf_thr:
m.is_recommended = True
recommended.append(m)
# 2. Value bet logic
if m.confidence is not None and val_min <= m.confidence <= val_max and m.odds > 0:
implied = 1.0 / m.odds
if m.probability > (implied + val_margin):
m.is_value_bet = True
m.edge = (m.probability - implied) * 100
if m.edge > val_upgrade:
m.is_recommended = True
recommended.append(m)
else:
value_bets.append(m)
# Best bet (from recommended only)
best_bet = None
if recommended:
# Re-sort only recommended markets to find the best one
valid_markets = [m for m in markets if not m.is_skip and m.is_recommended]
if valid_markets:
valid_markets.sort(key=_score, reverse=True)
best_bet = valid_markets[0]
best_bet.is_recommended = True
# Alternative bet
alternative = None
if risk.is_surprise_risk and ms_res.ms_pick in ["1", "2"]:
# Check if alternative is not skipped
alt_candidate = MarketPredictionDTO(
"2.5 Üst/Alt", ou_res.ou25_pick,
ou_res.over_25_prob if "Üst" in ou_res.ou25_pick else ou_res.under_25_prob,
ou_res.ou25_confidence,
odds_data.get("ou25_o" if "Üst" in ou_res.ou25_pick else "ou25_u", 0)
)
if alt_candidate.confidence >= min_conf_thresholds.get("2.5 Üst/Alt", 45.0):
alternative = alt_candidate
return RecommendationResult(
best_bet=best_bet,
recommended_bets=recommended,
alternative_bet=alternative,
value_bets=value_bets,
skipped_bets=skipped_bets
)
+32
View File
@@ -0,0 +1,32 @@
def calc_confidence_3way(top_prob: float) -> float:
"""Returns the true win probability percentage (e.g. 0.45 -> 45.0)."""
return max(0, min(99.0, top_prob * 100))
def calc_confidence_2way(prob: float) -> float:
"""Returns the true win probability percentage for the favored side."""
# Find the probability of the >0.5 side
win_prob = prob if prob >= 0.5 else (1.0 - prob)
return max(0, min(99.0, win_prob * 100))
def calc_confidence_dc(top_prob: float) -> float:
"""Returns the true win probability percentage for double chance."""
return max(0, min(99.0, top_prob * 100))
def calc_confidence_3way_with_agreement(top_prob: float, agreement_ratio: float,
boost: float = 1.05, penalty: float = 0.95) -> float:
"""
Returns the true win probability percentage, slightly adjusted by engine consensus.
Args:
top_prob: highest probability among options
agreement_ratio: 0.0 to 1.0 — how many engines agree on the pick
"""
base = calc_confidence_3way(top_prob)
# Slight nudge rather than massive swing, to keep it feeling like a true probability
if agreement_ratio >= 0.75:
return min(99.0, base * boost)
elif agreement_ratio <= 0.25:
return max(0.0, base * penalty)
return base
@@ -0,0 +1,131 @@
"""
Expert Recommendation Engine (Senior Level)
============================================
Evaluates ALL markets, classifies by risk, and ensures NO "empty" recommendations.
Prioritizes user safety by clearly labeling risk levels.
"""
from dataclasses import dataclass, field
from typing import List, Optional, Any, Dict
from .base_calculator import BaseCalculator, CalculationContext
from .match_result_calculator import MatchResultPrediction
from .over_under_calculator import OverUnderPrediction
from .risk_assessor import RiskAnalysis
@dataclass
class ExpertPick:
market_type: str
pick: str
probability: float
confidence: float
odds: float
edge: float # Expected value percentage
# Risk Classification
risk_level: str # SAFE, MEDIUM, RISKY, SURPRISE
reasoning: str # Why this pick? (e.g., "High xG support", "Value detected")
@dataclass
class ExpertResult:
main_pick: ExpertPick
safe_alternative: Optional[ExpertPick]
value_picks: List[ExpertPick]
surprise_picks: List[ExpertPick]
market_summary: Dict[str, float] # {market: probability}
class ExpertRecommender(BaseCalculator):
def calculate(self,
ctx: CalculationContext,
ms_res: MatchResultPrediction,
ou_res: OverUnderPrediction,
risk: RiskAnalysis) -> ExpertResult:
odds_data = ctx.odds_data
all_picks: List[ExpertPick] = []
# ─── 1. Helper to Evaluate Pick ───
def evaluate(market: str, pick: str, prob: float, odd_key: str):
odd_val = float(odds_data.get(odd_key, 0))
# If odd is missing/low, estimate it via probability (Kelly-ish estimation)
if odd_val <= 1.01:
odd_val = round(1.0 / (prob + 0.05), 2) # Conservative estimation
reasoning = "Derived (No market odd)"
else:
reasoning = "Market Confirmed"
implied = 1.0 / odd_val
edge = (prob - implied) * 100
# ─── Risk Classification ───
if prob >= 0.75 and odd_val <= 1.45:
level = "SAFE"
elif edge > 5.0:
level = "VALUE"
elif odd_val >= 2.50 and prob >= 0.35:
level = "SURPRISE"
else:
level = "MEDIUM"
all_picks.append(ExpertPick(
market_type=market, pick=pick, probability=prob,
confidence=prob * 100, odds=odd_val, edge=edge,
risk_level=level, reasoning=reasoning
))
# ─── 2. Evaluate All Major Markets ───
# MS
evaluate("MS", ms_res.ms_pick,
ms_res.ms_home_prob if ms_res.ms_pick == "1" else (ms_res.ms_away_prob if ms_res.ms_pick == "2" else ms_res.ms_draw_prob),
f"ms_{ms_res.ms_pick.lower()}")
# Double Chance
evaluate("DC", ms_res.dc_pick,
ms_res.dc_1x_prob if ms_res.dc_pick == "1X" else (ms_res.dc_x2_prob if ms_res.dc_pick == "X2" else ms_res.dc_12_prob),
f"dc_{ms_res.dc_pick.lower()}")
# OU25
evaluate("OU25", ou_res.ou25_pick,
ou_res.over_25_prob if "Üst" in ou_res.ou25_pick else ou_res.under_25_prob,
"ou25_o" if "Üst" in ou_res.ou25_pick else "ou25_u")
# BTTS
evaluate("BTTS", ou_res.btts_pick,
ou_res.btts_yes_prob if "Var" in ou_res.btts_pick else ou_res.btts_no_prob,
"btts_y" if "Var" in ou_res.btts_pick else "btts_n")
# OU15
evaluate("OU15", ou_res.ou15_pick,
ou_res.over_15_prob if "Üst" in ou_res.ou15_pick else ou_res.under_15_prob,
"ou15_o" if "Üst" in ou_res.ou15_pick else "ou15_u")
# ─── 3. Sort and Select ───
# Sort by a mix of Confidence and Edge
all_picks.sort(key=lambda p: (p.probability * 0.6) + (max(0, p.edge/100) * 0.4), reverse=True)
main = all_picks[0]
# Find Safe Alternative (if main isn't Safe)
safe_alt = next((p for p in all_picks if p.risk_level == "SAFE"), None)
if safe_alt == main: safe_alt = None
value_picks = [p for p in all_picks if p.risk_level == "VALUE" and p != main]
surprise_picks = [p for p in all_picks if p.risk_level == "SURPRISE"]
# Market Summary for UI
market_summary = {
"MS_Home": ms_res.ms_home_prob,
"MS_Draw": ms_res.ms_draw_prob,
"MS_Away": ms_res.ms_away_prob,
"OU25_Over": ou_res.over_25_prob,
"BTTS_Yes": ou_res.btts_yes_prob
}
return ExpertResult(
main_pick=main,
safe_alternative=safe_alt,
value_picks=value_picks,
surprise_picks=surprise_picks,
market_summary=market_summary
)
+179
View File
@@ -0,0 +1,179 @@
import math
from dataclasses import dataclass
from .base_calculator import BaseCalculator, CalculationContext
from .confidence import calc_confidence_3way, calc_confidence_2way
@dataclass
class HalfTimePrediction:
ht_home_prob: float
ht_draw_prob: float
ht_away_prob: float
ht_pick: str
ht_confidence: float
ht_over_05_prob: float
ht_under_05_prob: float
ht_over_15_prob: float
ht_under_15_prob: float
ht_ou_pick: str
ht_ou15_pick: str
ht_home_xg: float
ht_away_xg: float
class HalfTimeCalculator(BaseCalculator):
def _poisson_pmf(self, k, lam):
"""Poisson probability mass function."""
if lam <= 0:
return 1.0 if k == 0 else 0.0
return (lam ** k) * math.exp(-lam) / math.factorial(k)
def calculate(self, ctx: CalculationContext) -> HalfTimePrediction:
team_pred = ctx.team_pred
odds_pred = ctx.odds_pred
# Config
ft_to_ht_ratio = self.config.get("half_time.ft_to_ht_ratio", 0.42)
grid_max = self.config.get("half_time.poisson_grid_max", 5)
draw_floor = self.config.get("half_time.ht_draw_floor", 0.35)
low_xg_thr = self.config.get("half_time.low_xg_threshold", 2.0)
low_xg_adj = self.config.get("half_time.low_xg_ratio_adjust", 0.85)
# FT xG (blended team + odds)
ft_home_xg = (team_pred.home_xg + odds_pred.poisson_home_xg) / 2
ft_away_xg = (team_pred.away_xg + odds_pred.poisson_away_xg) / 2
total_ft_xg = ft_home_xg + ft_away_xg
# Dynamic HT ratio: düşük xG maçlarda ratio'yu küçült
# Çünkü düşük gollü maçlarda ilk yarıda gol olma ihtimali daha da düşük
effective_ratio = ft_to_ht_ratio
if total_ft_xg < low_xg_thr:
effective_ratio *= low_xg_adj
# HT xG
ht_home_xg = ft_home_xg * effective_ratio
ht_away_xg = ft_away_xg * effective_ratio
ht_total_xg = ht_home_xg + ht_away_xg
# Compute HT 1X2 via bivariate Poisson grid
ht_home = 0.0
ht_away = 0.0
ht_draw = 0.0
# Also compute O/U while iterating
total_goals_prob = {}
for i in range(grid_max):
for j in range(grid_max):
p = self._poisson_pmf(i, ht_home_xg) * self._poisson_pmf(j, ht_away_xg)
if i > j:
ht_home += p
elif i < j:
ht_away += p
else:
ht_draw += p
total = i + j
total_goals_prob[total] = total_goals_prob.get(total, 0.0) + p
# Draw floor: düşük xG maçlarda beraberlik olasılığını minimum seviyeye çek
if ht_draw < draw_floor:
deficit = draw_floor - ht_draw
ht_draw = draw_floor
# Deficit'i home ve away'den orantılı düş
total_ha = ht_home + ht_away
if total_ha > 0:
ht_home -= deficit * (ht_home / total_ha)
ht_away -= deficit * (ht_away / total_ha)
# Normalize
total_prob = ht_home + ht_draw + ht_away
if total_prob > 0:
ht_home /= total_prob
ht_draw /= total_prob
ht_away /= total_prob
# XGBoost Integration (HT 1X2 and HT/FT Models)
w_xgb = self.config.get("xgboost.weight_ht", 0.60)
xgb_ht_home, xgb_ht_draw, xgb_ht_away = None, None, None
if "ht_result" in ctx.xgboost_preds:
probs = ctx.xgboost_preds["ht_result"]
xgb_ht_home, xgb_ht_draw, xgb_ht_away = probs["home"], probs["draw"], probs["away"]
elif "ht_ft" in ctx.xgboost_preds:
# Fallback to HT/FT marginals
htft_payload = ctx.xgboost_preds.get("ht_ft", {})
probs = None
if isinstance(htft_payload, dict):
labels = ("1/1", "1/X", "1/2", "X/1", "X/X", "X/2", "2/1", "2/X", "2/2")
if all(label in htft_payload for label in labels):
probs = [float(htft_payload[label]) for label in labels]
if probs is None:
probs = ctx.xgboost_preds.get("ht_ft_raw")
if probs is not None and len(probs) == 9:
xgb_ht_home = sum(probs[0:3])
xgb_ht_draw = sum(probs[3:6])
xgb_ht_away = sum(probs[6:9])
if xgb_ht_home is not None:
ht_home = ht_home * (1 - w_xgb) + xgb_ht_home * w_xgb
ht_draw = ht_draw * (1 - w_xgb) + xgb_ht_draw * w_xgb
ht_away = ht_away * (1 - w_xgb) + xgb_ht_away * w_xgb
# Re-normalize
total = ht_home + ht_draw + ht_away
ht_home /= total
ht_draw /= total
ht_away /= total
# HT O/U 0.5
ht_over_05 = 1.0 - math.exp(-ht_total_xg)
if "ht_ou05" in ctx.xgboost_preds:
w_xgb = self.config.get("xgboost.weight_ou", 0.60)
xgb_ht_over_05 = float(ctx.xgboost_preds["ht_ou05"])
ht_over_05 = ht_over_05 * (1 - w_xgb) + xgb_ht_over_05 * w_xgb
ht_over_05_min = self.config.get("half_time.ht_over_05_min", 0.20)
ht_over_05_max = self.config.get("half_time.ht_over_05_max", 0.95)
ht_over_05 = max(ht_over_05_min, min(ht_over_05_max, ht_over_05))
# HT O/U 1.5
# P(total >= 2) = 1 - P(0) - P(1)
ht_over_15 = sum(p for g, p in total_goals_prob.items() if g >= 2)
if "ht_ou15" in ctx.xgboost_preds:
w_xgb = self.config.get("xgboost.weight_ou", 0.60)
xgb_ht_over_15 = float(ctx.xgboost_preds["ht_ou15"])
ht_over_15 = ht_over_15 * (1 - w_xgb) + xgb_ht_over_15 * w_xgb
ht_over_15 = max(0.02, min(0.95, ht_over_15))
# Picks
ht_probs = [(ht_home, "İY 1"), (ht_draw, "İY X"), (ht_away, "İY 2")]
ht_sorted = sorted(ht_probs, key=lambda x: x[0], reverse=True)
ht_pick = ht_sorted[0][1]
ht_confidence = calc_confidence_3way(ht_sorted[0][0])
# HT O/U picks
ht_ou_thr = self.config.get("half_time.ht_ou_threshold", 0.55)
ht_ou_pick = "İY 0.5 Üst" if ht_over_05 > ht_ou_thr else "İY 0.5 Alt"
ht_ou15_pick = "İY 1.5 Üst" if ht_over_15 > 0.45 else "İY 1.5 Alt"
return HalfTimePrediction(
ht_home_prob=ht_home,
ht_draw_prob=ht_draw,
ht_away_prob=ht_away,
ht_pick=ht_pick,
ht_confidence=ht_confidence,
ht_over_05_prob=ht_over_05,
ht_under_05_prob=1.0 - ht_over_05,
ht_over_15_prob=ht_over_15,
ht_under_15_prob=1.0 - ht_over_15,
ht_ou_pick=ht_ou_pick,
ht_ou15_pick=ht_ou15_pick,
ht_home_xg=ht_home_xg,
ht_away_xg=ht_away_xg
)
+142
View File
@@ -0,0 +1,142 @@
from dataclasses import dataclass
from typing import Dict, Any, List
from .base_calculator import BaseCalculator, CalculationContext
from .confidence import calc_confidence_3way_with_agreement, calc_confidence_dc
@dataclass
class MatchResultPrediction:
ms_home_prob: float
ms_draw_prob: float
ms_away_prob: float
ms_pick: str
ms_confidence: float
dc_1x_prob: float
dc_x2_prob: float
dc_12_prob: float
dc_pick: str
dc_confidence: float
class MatchResultCalculator(BaseCalculator):
def _get_engine_winner(self, home_prob: float, draw_prob: float, away_prob: float) -> str:
"""Determine which outcome an engine favors."""
probs = {"1": home_prob, "X": draw_prob, "2": away_prob}
return max(probs, key=probs.get)
def calculate(self, ctx: CalculationContext) -> MatchResultPrediction:
# Weights
w_team = ctx.weights["team"]
w_player = ctx.weights["player"]
w_odds = ctx.weights["odds"]
w_referee = ctx.weights["referee"]
# Engine predictions
team_pred = ctx.team_pred
odds_pred = ctx.odds_pred
player_mods = ctx.player_mods
referee_mods = ctx.referee_mods
# Weighted ensemble for 1X2
ms_home = (
team_pred.home_win_prob * w_team +
odds_pred.market_home_prob * w_odds +
team_pred.home_win_prob * player_mods["home_modifier"] * w_player +
odds_pred.market_home_prob * referee_mods["home_modifier"] * w_referee
)
ms_away = (
team_pred.away_win_prob * w_team +
odds_pred.market_away_prob * w_odds +
team_pred.away_win_prob * player_mods["away_modifier"] * w_player +
odds_pred.market_away_prob / referee_mods["home_modifier"] * w_referee
)
ms_draw = 1.0 - ms_home - ms_away
# XGBoost Integration
if "ms" in ctx.xgboost_preds:
xgb_probs = ctx.xgboost_preds["ms"]
w_xgb = self.config.get("xgboost.weight_ms", 0.70)
w_heuristic = 1.0 - w_xgb
ms_home = ms_home * w_heuristic + xgb_probs["home"] * w_xgb
ms_draw = ms_draw * w_heuristic + xgb_probs["draw"] * w_xgb
ms_away = ms_away * w_heuristic + xgb_probs["away"] * w_xgb
# Re-normalize
total = ms_home + ms_draw + ms_away
ms_home /= total
ms_draw /= total
ms_away /= total
# Min draw probability clamping
min_draw = self.config.get("match_result.min_draw_prob", 0.15)
if ms_draw < min_draw:
ms_draw = min_draw
total = ms_home + ms_away + ms_draw
ms_home /= total
ms_away /= total
ms_draw /= total
# Double Chance
dc_1x = ms_home + ms_draw
dc_x2 = ms_draw + ms_away
dc_12 = ms_home + ms_away
# MS pick
ms_probs = [(ms_home, "1"), (ms_draw, "X"), (ms_away, "2")]
ms_sorted = sorted(ms_probs, key=lambda x: x[0], reverse=True)
ms_pick = ms_sorted[0][1]
# === ENGINE AGREEMENT ===
# Determine each engine's winner and calculate agreement ratio
team_winner = self._get_engine_winner(
team_pred.home_win_prob, team_pred.draw_prob, team_pred.away_win_prob
)
odds_winner = self._get_engine_winner(
odds_pred.market_home_prob, odds_pred.market_draw_prob, odds_pred.market_away_prob
)
# Player-modified: team probs * player modifiers
player_adj_home = team_pred.home_win_prob * player_mods["home_modifier"]
player_adj_away = team_pred.away_win_prob * player_mods["away_modifier"]
player_adj_draw = max(0.01, 1.0 - player_adj_home - player_adj_away)
player_winner = self._get_engine_winner(player_adj_home, player_adj_draw, player_adj_away)
# Referee-modified: odds probs * referee modifiers
ref_adj_home = odds_pred.market_home_prob * referee_mods["home_modifier"]
ref_adj_away = odds_pred.market_away_prob / referee_mods["home_modifier"]
ref_adj_draw = max(0.01, 1.0 - ref_adj_home - ref_adj_away)
referee_winner = self._get_engine_winner(ref_adj_home, ref_adj_draw, ref_adj_away)
# Count how many engines agree with final pick
engines = [team_winner, odds_winner, player_winner, referee_winner]
agreement_count = sum(1 for e in engines if e == ms_pick)
agreement_ratio = agreement_count / len(engines)
# Confidence with agreement
boost = self.config.get("confidence.agreement_boost", 1.3)
penalty = self.config.get("confidence.disagreement_penalty", 0.7)
ms_confidence = calc_confidence_3way_with_agreement(
ms_sorted[0][0], agreement_ratio, boost, penalty
)
# DC pick
dc_probs = [(dc_1x, "1X"), (dc_x2, "X2"), (dc_12, "12")]
dc_sorted = sorted(dc_probs, key=lambda x: x[0], reverse=True)
dc_pick = dc_sorted[0][1]
dc_confidence = calc_confidence_dc(dc_sorted[0][0])
return MatchResultPrediction(
ms_home_prob=ms_home,
ms_draw_prob=ms_draw,
ms_away_prob=ms_away,
ms_pick=ms_pick,
ms_confidence=ms_confidence,
dc_1x_prob=dc_1x,
dc_x2_prob=dc_x2,
dc_12_prob=dc_12,
dc_pick=dc_pick,
dc_confidence=dc_confidence
)
@@ -0,0 +1,56 @@
from dataclasses import dataclass
from typing import Dict, Tuple
@dataclass
class AnomalyResult:
is_anomaly: bool
side: str = ""
severity: float = 0.0
reason: str = ""
class OddsAnomalyDetector:
"""
Detects mismatches between bookmaker odds and underlying team metrics.
A 'Bookmaker Trap' is when a team has very low odds (heavy favorite)
but their xG/defense metrics are surprisingly poor.
"""
def __init__(self, config: Dict):
self.config = config
# Thresholds
self.fav_odds_threshold = self.config.get("anomaly.fav_odds_threshold", 1.75)
self.min_xg_for_fav = self.config.get("anomaly.min_xg_for_fav", 1.25)
self.max_conceded_for_fav = self.config.get("anomaly.max_conceded_for_fav", 1.30)
self.opp_min_xg_threat = self.config.get("anomaly.opp_min_xg_threat", 1.10)
def detect_trap(self,
odds_data: Dict[str, float],
home_xg: float,
away_xg: float,
home_conceded_avg: float,
away_conceded_avg: float) -> tuple[bool, AnomalyResult]:
"""
Check if the match is a potential odds trap.
Returns: (has_trap, AnomalyResult)
"""
ms_h = odds_data.get("ms_h", 0.0)
ms_a = odds_data.get("ms_a", 0.0)
# Check Home Favorite Trap
if 1.0 < ms_h <= self.fav_odds_threshold:
# Home is favored. Check metrics.
if home_xg < self.min_xg_for_fav and (away_xg > self.opp_min_xg_threat or home_conceded_avg > self.max_conceded_for_fav):
severity = (self.fav_odds_threshold - ms_h) + (self.min_xg_for_fav - home_xg)
reason = f"🚨 ODDS ANOMALY (TRAP): Home odds ({ms_h}) suspiciously low despite poor metrics (xG: {round(home_xg, 2)}, Conceded: {round(home_conceded_avg, 2)})"
return True, AnomalyResult(True, "H", min(10.0, severity * 2), reason)
# Check Away Favorite Trap
if 1.0 < ms_a <= self.fav_odds_threshold:
# Away is favored. Check metrics
if away_xg < self.min_xg_for_fav and (home_xg > self.opp_min_xg_threat or away_conceded_avg > self.max_conceded_for_fav):
severity = (self.fav_odds_threshold - ms_a) + (self.min_xg_for_fav - away_xg)
reason = f"🚨 ODDS ANOMALY (TRAP): Away odds ({ms_a}) suspiciously low despite poor metrics (xG: {round(away_xg, 2)}, Conceded: {round(away_conceded_avg, 2)})"
return True, AnomalyResult(True, "A", min(10.0, severity * 2), reason)
return False, AnomalyResult(False)
+115
View File
@@ -0,0 +1,115 @@
from dataclasses import dataclass
import math
from .base_calculator import BaseCalculator, CalculationContext
from .match_result_calculator import MatchResultPrediction
@dataclass
class OtherMarketsPrediction:
total_corners_pred: float
corner_pick: str | None
total_cards_pred: float
card_pick: str
cards_over_prob: float
cards_under_prob: float
cards_confidence: float
handicap_pick: str
handicap_home_prob: float
handicap_draw_prob: float
handicap_away_prob: float
handicap_confidence: float
odd_even_pick: str
odd_prob: float
even_prob: float
class OtherMarketsCalculator(BaseCalculator):
def calculate(
self,
ctx: CalculationContext,
ms_result: MatchResultPrediction,
) -> OtherMarketsPrediction:
if "handicap_ms" in ctx.xgboost_preds:
handicap_payload = ctx.xgboost_preds["handicap_ms"]
handicap_home_prob = float(handicap_payload.get("h1", 0.33))
handicap_draw_prob = float(handicap_payload.get("hx", 0.34))
handicap_away_prob = float(handicap_payload.get("h2", 0.33))
else:
xg_diff = ctx.home_xg - ctx.away_xg
threshold = float(self.config.get("handicap.xg_diff_threshold", 1.2))
if xg_diff > threshold:
handicap_home_prob, handicap_draw_prob, handicap_away_prob = 0.58, 0.24, 0.18
elif xg_diff < -threshold:
handicap_home_prob, handicap_draw_prob, handicap_away_prob = 0.18, 0.24, 0.58
else:
handicap_home_prob, handicap_draw_prob, handicap_away_prob = 0.28, 0.44, 0.28
handicap_confidence = max(
handicap_home_prob,
handicap_draw_prob,
handicap_away_prob,
) * 100.0
if handicap_home_prob >= handicap_draw_prob and handicap_home_prob >= handicap_away_prob:
handicap_pick = "H 1 (Ev -1)"
elif handicap_away_prob >= handicap_home_prob and handicap_away_prob >= handicap_draw_prob:
handicap_pick = "H 2 (Dep -1)"
else:
handicap_pick = "H 0 (Beraberlik)"
total_corners = 0.0
corner_pick = None
card_line = float(self.config.get("cards.line", 4.5))
if "cards_ou45" in ctx.xgboost_preds:
cards_over_prob = float(ctx.xgboost_preds["cards_ou45"])
total_cards = 5.0 if cards_over_prob > 0.50 else 3.5
else:
referee_average = float(ctx.referee_pred.avg_yellow_cards)
match_heat = 1.0
is_derby = bool(
ctx.upset_factors.reasoning
and "DERBY" in str(ctx.upset_factors.reasoning[0]),
)
if is_derby:
match_heat = float(self.config.get("cards.derby_heat_factor", 1.3))
total_cards = referee_average * match_heat
delta = total_cards - card_line
cards_over_prob = 1.0 / (1.0 + math.exp(-delta * 0.9))
cards_over_prob = max(0.02, min(0.98, cards_over_prob))
cards_under_prob = 1.0 - cards_over_prob
cards_confidence = max(cards_over_prob, cards_under_prob) * 100.0
card_pick = f"{card_line} Ust" if cards_over_prob > 0.50 else f"{card_line} Alt"
lambda_total = ctx.total_xg
even_prob = math.exp(-lambda_total) * math.cosh(lambda_total)
if "odd_even" in ctx.xgboost_preds:
xgb_weight = float(self.config.get("xgboost.weight_ou", 0.60))
xgb_even_prob = float(ctx.xgboost_preds["odd_even"])
even_prob = even_prob * (1 - xgb_weight) + xgb_even_prob * xgb_weight
even_prob = max(0.02, min(0.98, even_prob))
odd_prob = 1.0 - even_prob
odd_even_pick = "Cift" if even_prob > 0.5 else "Tek"
return OtherMarketsPrediction(
total_corners_pred=total_corners,
corner_pick=corner_pick,
total_cards_pred=total_cards,
card_pick=card_pick,
cards_over_prob=cards_over_prob,
cards_under_prob=cards_under_prob,
cards_confidence=cards_confidence,
handicap_pick=handicap_pick,
handicap_home_prob=handicap_home_prob,
handicap_draw_prob=handicap_draw_prob,
handicap_away_prob=handicap_away_prob,
handicap_confidence=handicap_confidence,
odd_even_pick=odd_even_pick,
odd_prob=odd_prob,
even_prob=even_prob,
)
+174
View File
@@ -0,0 +1,174 @@
import math
from dataclasses import dataclass
from .base_calculator import BaseCalculator, CalculationContext
from .confidence import calc_confidence_2way
@dataclass
class OverUnderPrediction:
over_15_prob: float
under_15_prob: float
ou15_pick: str
ou15_confidence: float
over_25_prob: float
under_25_prob: float
ou25_pick: str
ou25_confidence: float
over_35_prob: float
under_35_prob: float
ou35_pick: str
ou35_confidence: float
btts_yes_prob: float
btts_no_prob: float
btts_pick: str
btts_confidence: float
class OverUnderCalculator(BaseCalculator):
def _poisson_pmf(self, k: int, lam: float) -> float:
if lam <= 0:
return 1.0 if k == 0 else 0.0
return (lam ** k) * math.exp(-lam) / math.factorial(k)
def _poisson_ou_probs(self, home_xg: float, away_xg: float, grid_max: int = 6):
"""Bivariate Poisson grid → O/U probabilities."""
total_goals_prob = {} # total_goals → cumulative probability
for i in range(grid_max):
for j in range(grid_max):
p = self._poisson_pmf(i, home_xg) * self._poisson_pmf(j, away_xg)
total = i + j
total_goals_prob[total] = total_goals_prob.get(total, 0.0) + p
# Cumulative
over_15 = sum(p for g, p in total_goals_prob.items() if g >= 2)
over_25 = sum(p for g, p in total_goals_prob.items() if g >= 3)
over_35 = sum(p for g, p in total_goals_prob.items() if g >= 4)
# BTTS: P(home >= 1) * P(away >= 1)
p_home_0 = self._poisson_pmf(0, home_xg)
p_away_0 = self._poisson_pmf(0, away_xg)
btts_yes = (1 - p_home_0) * (1 - p_away_0)
return over_15, over_25, over_35, btts_yes
def calculate(self, ctx: CalculationContext) -> OverUnderPrediction:
odds_pred = ctx.odds_pred
referee_mods = ctx.referee_mods
# Config
prob_min = self.config.get("over_under.prob_min", 0.02)
prob_max = self.config.get("over_under.prob_max", 0.98)
blend_w = self.config.get("over_under.poisson_blend_weight", 0.4)
grid_max = self.config.get("over_under.poisson_grid_max", 6)
ou15_thr = self.config.get("over_under.ou15_threshold", 0.55)
ou25_thr = self.config.get("over_under.ou25_threshold", 0.52)
ou35_thr = self.config.get("over_under.ou35_threshold", 0.48)
btts_thr = self.config.get("over_under.btts_threshold", 0.58)
# 1. Poisson-based O/U from context xG (team + odds average)
p_over_15, p_over_25, p_over_35, p_btts = self._poisson_ou_probs(
ctx.home_xg, ctx.away_xg, int(grid_max)
)
# 2. Odds-based O/U (from odds engine Poisson)
o_over_15 = odds_pred.over_15_prob
o_over_25 = odds_pred.over_25_prob
o_over_35 = odds_pred.over_35_prob
o_btts = odds_pred.btts_yes_prob
# 3. Blend: poisson xG + odds Poisson
# Odds engine already uses Poisson internally, so keep blend weight low
# to avoid double-counting. Use majority odds weight for established markets.
over_15 = p_over_15 * blend_w + o_over_15 * (1 - blend_w)
over_25 = p_over_25 * blend_w + o_over_25 * (1 - blend_w)
over_35 = p_over_35 * blend_w + o_over_35 * (1 - blend_w)
# BTTS: keep primarily from odds engine (it was 63.6% accurate before)
# Only a small Poisson contribution to cross-validate
btts_blend = min(blend_w, 0.2)
btts_yes = p_btts * btts_blend + o_btts * (1 - btts_blend)
# XGBoost Integration (High Weight)
w_xgb = self.config.get("xgboost.weight_ou", 0.70)
if "ou25" in ctx.xgboost_preds:
over_25 = over_25 * (1 - w_xgb) + ctx.xgboost_preds["ou25"] * w_xgb
if "ou15" in ctx.xgboost_preds:
over_15 = over_15 * (1 - w_xgb) + ctx.xgboost_preds["ou15"] * w_xgb
if "ou35" in ctx.xgboost_preds:
over_35 = over_35 * (1 - w_xgb) + ctx.xgboost_preds["ou35"] * w_xgb
# BTTS: lower XGBoost weight (was 0.70) — Poisson/odds fundamentals matter more
w_xgb_btts = self.config.get("xgboost.weight_btts", 0.45)
if "btts" in ctx.xgboost_preds:
btts_yes = btts_yes * (1 - w_xgb_btts) + ctx.xgboost_preds["btts"] * w_xgb_btts
# 4. Referee modifier (only applied to goal totals, not BTTS)
ou_mod = referee_mods.get("over_25_modifier", 1.0)
over_15 *= ou_mod
over_25 *= ou_mod
over_35 *= ou_mod
# 5. Clamp
over_15 = max(prob_min, min(prob_max, over_15))
over_25 = max(prob_min, min(prob_max, over_25))
over_35 = max(prob_min, min(prob_max, over_35))
btts_yes = max(prob_min, min(prob_max, btts_yes))
# Picks & Confidence
ou15_pick = "Üst 1.5" if over_15 > ou15_thr else "Alt 1.5"
ou15_conf = calc_confidence_2way(over_15)
ou25_pick = "Üst 2.5" if over_25 > ou25_thr else "Alt 2.5"
ou25_conf = calc_confidence_2way(over_25)
ou35_pick = "Üst 3.5" if over_35 > ou35_thr else "Alt 3.5"
ou35_conf = calc_confidence_2way(over_35)
btts_pick = "KG Var" if btts_yes > btts_thr else "KG Yok"
btts_conf = calc_confidence_2way(btts_yes)
# --- SAFE BTTS PENALTY (v2 — tighter thresholds) ---
# Penalize BTTS confidence when fundamentals don't strongly support the pick.
try:
home_conceded = ctx.team_pred.raw_features.get("home_conceded_avg", 1.0)
away_conceded = ctx.team_pred.raw_features.get("away_conceded_avg", 1.0)
if btts_pick == "KG Var":
# "Var" needs BOTH teams to score → requires strong attack OR leaky defense
# Penalty if either xG is low AND defenses are solid
weak_attack = ctx.home_xg < 1.30 or ctx.away_xg < 1.15
solid_defense = home_conceded < 1.15 or away_conceded < 1.15
if weak_attack and solid_defense:
btts_conf *= 0.3
else: # KG Yok
# "Yok" needs at least one team to fail scoring
# Penalty if both have good xG AND both defenses are leaky
if ctx.home_xg >= 1.30 and ctx.away_xg >= 1.15 and home_conceded >= 1.20 and away_conceded >= 1.20:
btts_conf *= 0.3
except Exception as e:
print(f"⚠️ Safe BTTS Check Error: {e}")
pass
return OverUnderPrediction(
over_15_prob=over_15, under_15_prob=1-over_15,
ou15_pick=ou15_pick, ou15_confidence=ou15_conf,
over_25_prob=over_25, under_25_prob=1-over_25,
ou25_pick=ou25_pick, ou25_confidence=ou25_conf,
over_35_prob=over_35, under_35_prob=1-over_35,
ou35_pick=ou35_pick, ou35_confidence=ou35_conf,
btts_yes_prob=btts_yes, btts_no_prob=1-btts_yes,
btts_pick=btts_pick, btts_confidence=btts_conf
)
+278
View File
@@ -0,0 +1,278 @@
from dataclasses import dataclass, field
from typing import Dict, Any, List, Tuple
from .base_calculator import BaseCalculator, CalculationContext
from .odds_anomaly_detector import OddsAnomalyDetector
@dataclass
class RiskAnalysis:
risk_score: float
risk_level: str
is_surprise_risk: bool
reasons: List[str] = field(default_factory=list)
surprise_type: str = ""
risk_warnings: List[str] = field(default_factory=list)
class RiskAssessor(BaseCalculator):
"""
Assesses risk level of the match based on context and predictions.
"""
def __init__(self, config: Dict):
super().__init__(config)
self.anomaly_detector = OddsAnomalyDetector(config)
@staticmethod
def _safe_odd(value: Any) -> float:
try:
odd = float(value)
return odd if odd > 1.01 else 0.0
except (TypeError, ValueError):
return 0.0
def _favorite_profile_from_odds(self, odds_data: Dict[str, float]) -> Tuple[str, float]:
"""
Returns (favorite_side, gap_to_second_favorite).
favorite_side: H, A, D, or U (unknown)
"""
ms_h = self._safe_odd((odds_data or {}).get("ms_h"))
ms_d = self._safe_odd((odds_data or {}).get("ms_d"))
ms_a = self._safe_odd((odds_data or {}).get("ms_a"))
candidates = [(side, odd) for side, odd in (("H", ms_h), ("D", ms_d), ("A", ms_a)) if odd > 0.0]
if len(candidates) < 2:
return "U", 0.0
candidates.sort(key=lambda item: item[1])
favorite_side, favorite_odd = candidates[0]
second_odd = candidates[1][1]
return favorite_side, max(0.0, second_odd - favorite_odd)
def _dynamic_reversal_threshold(
self,
ctx: CalculationContext,
top_label: str,
) -> float:
"""
Dynamic threshold for reversal surprise flags.
Lower threshold => easier to trigger surprise.
"""
base_threshold = float(self.config.get("risk.surprise_threshold", 0.20))
sport_key = (ctx.sport or "football").lower().strip()
is_top_league = bool(getattr(ctx, "is_top_league", False))
if not is_top_league:
base_threshold = float(
self.config.get("risk.surprise_threshold_non_top", base_threshold + 0.04),
)
if sport_key == "basketball":
if is_top_league:
return float(
self.config.get("risk.surprise_threshold_basketball_top", self.config.get("risk.surprise_threshold_basketball", 0.30)),
)
return float(
self.config.get("risk.surprise_threshold_basketball_non_top", 0.34),
)
if top_label not in ("1/2", "2/1"):
return base_threshold
winner_side = "A" if top_label == "1/2" else "H"
favorite_side, gap = self._favorite_profile_from_odds(ctx.odds_data)
if is_top_league:
favorite_winner_threshold = float(
self.config.get(
"risk.surprise_threshold_favorite_reversal_top",
self.config.get("risk.surprise_threshold_favorite_reversal", 0.26),
),
)
underdog_winner_threshold = float(
self.config.get(
"risk.surprise_threshold_underdog_reversal_top",
self.config.get("risk.surprise_threshold_underdog_reversal", 0.20),
),
)
else:
favorite_winner_threshold = float(
self.config.get("risk.surprise_threshold_favorite_reversal_non_top", 0.30),
)
underdog_winner_threshold = float(
self.config.get("risk.surprise_threshold_underdog_reversal_non_top", 0.24),
)
gap_medium = float(self.config.get("risk.htft_reversal_gap_medium", 0.50))
gap_strong = float(self.config.get("risk.htft_reversal_gap_strong", 1.00))
if favorite_side in ("H", "A"):
threshold = (
favorite_winner_threshold
if winner_side == favorite_side
else underdog_winner_threshold
)
if winner_side != favorite_side and gap >= gap_strong:
threshold += 0.03
elif winner_side != favorite_side and gap >= gap_medium:
threshold += 0.015
return threshold
return base_threshold
def calculate(self, ctx: CalculationContext, ms_result=None) -> RiskAnalysis:
"""
Wrapper for assess_risk to match BaseCalculator interface but with extra arg.
"""
return self.assess_risk(ctx)
def assess_risk(self, ctx: CalculationContext) -> RiskAnalysis:
"""
Calculate risk score and level.
Returns RiskAnalysis object.
"""
score = 5.0
reasons = []
is_surprise = ctx.is_surprise
surprise_type = ""
# 1. League deviation (from UpsetEngine)
if ctx.is_surprise:
score += 2.0
reasons.append("High Upset Potential detected by UpsetEngine")
# 1.5 Odds Anomaly Detection
try:
home_conceded = ctx.team_pred.raw_features.get("home_conceded_avg", 1.0)
away_conceded = ctx.team_pred.raw_features.get("away_conceded_avg", 1.0)
has_anomaly, anomaly_res = self.anomaly_detector.detect_trap(
ctx.odds_data,
ctx.home_xg,
ctx.away_xg,
home_conceded,
away_conceded
)
if has_anomaly:
is_surprise = True
score += anomaly_res.severity + 2.0
surprise_type = "Bookmaker Trap"
reasons.append(anomaly_res.reason)
except Exception as e:
print(f"⚠️ Odds Anomaly Detection Error: {e}")
pass
# 2. HT/FT Surprise Hunter (XGBoost)
# We look for 1/2 (idx 2) and 2/1 (idx 6) from the V20 HT/FT model
if "ht_ft" in ctx.xgboost_preds:
ht_ft = ctx.xgboost_preds["ht_ft"]
valid_items = [(k, float(v)) for k, v in ht_ft.items() if isinstance(v, (int, float))]
if valid_items:
ranked = sorted(valid_items, key=lambda item: item[1], reverse=True)
top_label, top_prob = ranked[0]
second_prob = ranked[1][1] if len(ranked) > 1 else 0.0
top_gap = top_prob - second_prob
threshold = self._dynamic_reversal_threshold(ctx, top_label)
if getattr(ctx, "is_top_league", False):
min_gap = float(self.config.get("risk.surprise_min_top_gap_top", self.config.get("risk.surprise_min_top_gap", 0.02)))
else:
min_gap = float(self.config.get("risk.surprise_min_top_gap_non_top", 0.03))
# Trigger surprise only when reversal class is:
# - top HT/FT outcome
# - above dynamic threshold
# - separated from second class with a minimum gap
if top_label in ("1/2", "2/1") and top_prob > threshold and top_gap > min_gap:
is_surprise = True
score += 3.0
surprise_type = f"{top_label} Reversal"
reasons.append(
f"🔥 Surprise Hunter: {top_label} potential ({round(top_prob*100, 1)}%, gap {round(top_gap*100, 1)}pp)"
)
# NEW: Potential Upset Alert - even if reversal is not the top prediction
# This catches cases like Bayern vs Augsburg where 1/2 was only 2% but it happened
favorite_side, gap = self._favorite_profile_from_odds(ctx.odds_data)
# Get reversal probabilities
prob_12 = float(ht_ft.get("1/2", 0))
prob_21 = float(ht_ft.get("2/1", 0))
# DYNAMIC threshold based on odds - stronger favorite = lower threshold
# When home odds are 1.30, even 1% reversal probability is significant
base_threshold = float(self.config.get("risk.upset_alert_threshold", 0.05))
# Calculate dynamic threshold based on favorite strength
if favorite_side == "H":
home_odds = float(ctx.odds_data.get("ms_h", 2.0))
# Stronger favorite (lower odds) = lower threshold
# 1.20 odds -> 0.01 threshold, 1.50 odds -> 0.03 threshold, 2.0+ odds -> base threshold
if home_odds <= 1.25:
dynamic_threshold = 0.01 # 1% - extremely strong favorite
elif home_odds <= 1.40:
dynamic_threshold = 0.015 # 1.5% - very strong favorite
elif home_odds <= 1.60:
dynamic_threshold = 0.02 # 2% - strong favorite
elif home_odds < 2.00:
dynamic_threshold = 0.03 # 3% - moderate favorite
else:
dynamic_threshold = base_threshold
elif favorite_side == "A":
away_odds = float(ctx.odds_data.get("ms_a", 2.0))
if away_odds <= 1.25:
dynamic_threshold = 0.01
elif away_odds <= 1.40:
dynamic_threshold = 0.015
elif away_odds <= 1.60:
dynamic_threshold = 0.02
elif away_odds < 2.00:
dynamic_threshold = 0.03
else:
dynamic_threshold = base_threshold
else:
dynamic_threshold = base_threshold
# Check for potential upset based on favorite
if favorite_side == "H" and prob_12 > dynamic_threshold:
# Home favorite, but 1/2 (home leads HT, away wins FT) has potential
is_surprise = True
score += 2.0
surprise_type = "1/2 Potential Upset"
reasons.append(
f"⚠️ UPSET ALERT: Home favorite ({ctx.odds_data.get('ms_h', 'N/A')}) but 1/2 reversal risk ({round(prob_12*100, 1)}% > {round(dynamic_threshold*100, 1)}% threshold)"
)
elif favorite_side == "A" and prob_21 > dynamic_threshold:
# Away favorite, but 2/1 (away leads HT, home wins FT) has potential
is_surprise = True
score += 2.0
surprise_type = "2/1 Potential Upset"
reasons.append(
f"⚠️ UPSET ALERT: Away favorite ({ctx.odds_data.get('ms_a', 'N/A')}) but 2/1 reversal risk ({round(prob_21*100, 1)}% > {round(dynamic_threshold*100, 1)}% threshold)"
)
elif gap > 0.5 and (prob_12 > dynamic_threshold or prob_21 > dynamic_threshold):
# Strong favorite (big odds gap) with any reversal potential
reversal_type = "1/2" if prob_12 > prob_21 else "2/1"
reversal_prob = max(prob_12, prob_21)
is_surprise = True
score += 1.5
surprise_type = f"{reversal_type} Potential Upset"
reasons.append(
f"⚠️ UPSET ALERT: Strong favorite (gap {round(gap, 2)}) with {reversal_type} risk ({round(reversal_prob*100, 1)}%)"
)
# Determine level
if score < 4.0:
level = "LOW"
elif score < 7.0:
level = "MEDIUM"
elif score < 9.0:
level = "HIGH"
else:
level = "EXTREME"
return RiskAnalysis(
risk_score=score,
risk_level=level,
is_surprise_risk=is_surprise,
surprise_type=surprise_type,
reasons=reasons
)
+229
View File
@@ -0,0 +1,229 @@
import os
import pickle
import pandas as pd
import xgboost as xgb
from dataclasses import dataclass
from typing import List, Dict, Tuple
import math
from .base_calculator import BaseCalculator, CalculationContext
from .confidence import calc_confidence_3way, calc_confidence_dc
from .match_result_calculator import MatchResultPrediction
@dataclass
class ScorePrediction:
predicted_ft_score: str
predicted_ht_score: str
ft_scores_top5: List[Dict]
# Reconciled MS/DC predictions (can be updated here)
reconciled_ms: MatchResultPrediction = None
class ScoreCalculator(BaseCalculator):
def __init__(self, config: Dict):
super().__init__(config)
self.xgb_home = None
self.xgb_away = None
self.xgb_ht_home = None
self.xgb_ht_away = None
self.scaler = None # If used
self.features = []
self._load_model()
def _load_model(self):
try:
model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "models", "xgb_score.pkl")
if os.path.exists(model_path):
with open(model_path, "rb") as f:
data = pickle.load(f)
# Handle both dictionary and direct model formats (just in case)
if isinstance(data, dict):
self.xgb_home = data.get("home_model")
self.xgb_away = data.get("away_model")
self.xgb_ht_home = data.get("ht_home_model")
self.xgb_ht_away = data.get("ht_away_model")
self.features = data.get("features", [])
else:
print("⚠️ Unexpected XGB score model format.")
print("✅ XGBoost Score Model loaded.")
else:
print(f"⚠️ XGBoost Score Model not found at {model_path}")
except Exception as e:
print(f"❌ Error loading XGBoost Score Model: {e}")
def _poisson_pmf(self, k, lam):
"""Poisson probability mass function."""
if lam <= 0:
return 1.0 if k == 0 else 0.0
return (lam ** k) * math.exp(-lam) / math.factorial(k)
def calculate(self, ctx: CalculationContext, ms_result: MatchResultPrediction) -> ScorePrediction:
# Default Lambdas (fallback)
lambda_home = max(0.5, ctx.home_xg)
lambda_away = max(0.5, ctx.away_xg)
# --- XGBOOST PREDICTION ---
if self.xgb_home and self.xgb_away and hasattr(ctx.team_pred, "raw_features"):
try:
# 1. Prepare Features
# We need to map ctx data to self.features list columns
raw = ctx.team_pred.raw_features
odds = ctx.odds_data or {}
# Use unified feature adapter for exact 56-feature sync
from features.feature_adapter import get_feature_adapter
df_input = get_feature_adapter().get_features(ctx)
# Predict FT
pred_h = self.xgb_home.predict(df_input)[0]
pred_a = self.xgb_away.predict(df_input)[0]
# Predict HT (if available)
if self.xgb_ht_home and self.xgb_ht_away:
pred_ht_h = self.xgb_ht_home.predict(df_input)[0]
pred_ht_a = self.xgb_ht_away.predict(df_input)[0]
# Clamp HT predictions (min 0, and shouldn't exceed FT in logic, but models are independent)
# We trust the model but ensure sanity (HT <= FT is hard to enforce without joint training, but usually holds)
ht_h_val = max(0.0, float(pred_ht_h))
ht_a_val = max(0.0, float(pred_ht_a))
predicted_ht = f"{round(ht_h_val)}-{round(ht_a_val)}"
else:
# Fallback if HT models missing
ht_h_val = max(0.0, float(pred_h) * 0.42)
ht_a_val = max(0.0, float(pred_a) * 0.42)
predicted_ht = f"{round(ht_h_val)}-{round(ht_a_val)}"
# Update lambdas with ML predictions
lambda_home = max(0.1, min(6.0, float(pred_h)))
lambda_away = max(0.1, min(6.0, float(pred_a)))
# Store raw XGB preds in context
ctx.xgboost_preds["score"] = {
"home": lambda_home,
"away": lambda_away,
"ht_home": ht_h_val,
"ht_away": ht_a_val
}
except Exception as e:
print(f"⚠️ XGBoost Score Prediction failed: {e}. Falling back to Poisson xG.")
# Fallback to current simple logic if ML fails
predicted_ht = f"{round(lambda_home * 0.42)}-{round(lambda_away * 0.42)}"
# --- POISSON GRID GENERATION ---
# Now use lambda_home/away (either ML or fallback) to generate grid
score_probs = {}
grid_max = self.config.get("score.poisson_grid_max", 7)
for i in range(grid_max):
for j in range(grid_max):
p = self._poisson_pmf(i, lambda_home) * self._poisson_pmf(j, lambda_away)
score_probs[f"{i}-{j}"] = round(p * 100, 2)
sorted_scores = sorted(score_probs.items(), key=lambda x: x[1], reverse=True)
# --- DERIVE MS PROBS FROM SCORES (CONSISTENCY CHECK) ---
poisson_ms_home = sum(p for s, p in score_probs.items()
for h, a in [s.split("-")] if int(h) > int(a))
poisson_ms_away = sum(p for s, p in score_probs.items()
for h, a in [s.split("-")] if int(h) < int(a))
poisson_ms_draw = sum(p for s, p in score_probs.items()
for h, a in [s.split("-")] if int(h) == int(a))
# Normalize
poisson_total = poisson_ms_home + poisson_ms_away + poisson_ms_draw
if poisson_total > 0:
poisson_ms_home /= poisson_total
poisson_ms_away /= poisson_total
poisson_ms_draw /= poisson_total
# --- HYBRID RECONCILIATION ---
threshold = self.config.get("score.ms_confidence_threshold", 15.0)
reconciled_result = ms_result
# If original confidence is low, trust new Score Model more
if ms_result.ms_confidence < threshold:
poisson_probs = [(poisson_ms_home, "1"), (poisson_ms_draw, "X"), (poisson_ms_away, "2")]
poisson_sorted = sorted(poisson_probs, key=lambda x: x[0], reverse=True)
new_ms_pick = poisson_sorted[0][1]
new_ms_conf = calc_confidence_3way(poisson_sorted[0][0])
# Recalculate DC
dc_1x = poisson_ms_home + poisson_ms_draw
dc_x2 = poisson_ms_draw + poisson_ms_away
dc_12 = poisson_ms_home + poisson_ms_away
dc_probs = [(dc_1x, "1X"), (dc_x2, "X2"), (dc_12, "12")]
dc_sorted = sorted(dc_probs, key=lambda x: x[0], reverse=True)
new_dc_pick = dc_sorted[0][1]
new_dc_conf = calc_confidence_dc(dc_sorted[0][0])
reconciled_result = MatchResultPrediction(
ms_home_prob=poisson_ms_home,
ms_draw_prob=poisson_ms_draw,
ms_away_prob=poisson_ms_away,
ms_pick=new_ms_pick,
ms_confidence=new_ms_conf,
dc_1x_prob=dc_1x,
dc_x2_prob=dc_x2,
dc_12_prob=dc_12,
dc_pick=new_dc_pick,
dc_confidence=new_dc_conf
)
# Select best score that matches MS Pick
# NEW LOGIC: We trust XGBoost/Poisson top score over generic MS Pick if MS Confidence is low.
# Otherwise, we filter the grid to match the MS pick.
ms_pick = reconciled_result.ms_pick
def _score_matches_ms(score_str, pick):
h, a = map(int, score_str.split("-"))
if pick == "1": return h > a
if pick == "2": return h < a
return h == a
matching_scores = [(s, p) for s, p in sorted_scores if _score_matches_ms(s, ms_pick)]
# Primary Prediction Strategy:
# If MS pick is highly confident, enforce it.
# But if the absolute best score in the grid contradicts it and has a high probability (e.g. >10%), trust the score model directly.
top_overall_score, top_overall_prob = sorted_scores[0]
if matching_scores and not (top_overall_prob > 12.0 and not _score_matches_ms(top_overall_score, ms_pick)):
predicted_ft = matching_scores[0][0]
else:
predicted_ft = top_overall_score
# If we didn't calculate HT via ML (exception case), do it now
if 'predicted_ht' not in locals():
ft_to_ht = self.config.get("half_time.ft_to_ht_ratio", 0.42)
ht_h = round(lambda_home * ft_to_ht)
ht_a = round(lambda_away * ft_to_ht)
predicted_ht = f"{ht_h}-{ht_a}"
# --- CONSISTENCY CHECK ---
# Ensure HT score <= FT score
try:
ft_h, ft_a = map(int, predicted_ft.split("-"))
ht_h, ht_a = map(int, predicted_ht.split("-"))
# Clamp HT values
ht_h = min(ht_h, ft_h)
ht_a = min(ht_a, ft_a)
predicted_ht = f"{ht_h}-{ht_a}"
except ValueError:
pass # Malformed score string, ignore correction
ft_scores = [{"score": s, "prob": p} for s, p in sorted_scores[:5]]
return ScorePrediction(
predicted_ft_score=predicted_ft,
predicted_ht_score=predicted_ht,
ft_scores_top5=ft_scores,
reconciled_ms=reconciled_result
)
+16
View File
@@ -0,0 +1,16 @@
# ai-engine/core/engines/__init__.py
"""
V20 Ensemble Prediction Engines
"""
from .team_predictor import TeamPredictorEngine, get_team_predictor
from .player_predictor import PlayerPredictorEngine, get_player_predictor
from .odds_predictor import OddsPredictorEngine, get_odds_predictor
from .referee_predictor import RefereePredictorEngine, get_referee_predictor
__all__ = [
"TeamPredictorEngine", "get_team_predictor",
"PlayerPredictorEngine", "get_player_predictor",
"OddsPredictorEngine", "get_odds_predictor",
"RefereePredictorEngine", "get_referee_predictor"
]
+237
View File
@@ -0,0 +1,237 @@
"""
Odds Predictor Engine - V20 Ensemble Component
Uses market odds and Poisson mathematics for predictions.
Weight: 30% in ensemble
"""
import os
import sys
from typing import Dict, Optional
from dataclasses import dataclass
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from features.poisson_engine import get_poisson_engine
from features.value_calculator import get_value_calculator
@dataclass
class OddsPrediction:
"""Odds engine prediction output."""
# Market-implied probabilities
market_home_prob: float = 0.33
market_draw_prob: float = 0.33
market_away_prob: float = 0.33
# Poisson xG
poisson_home_xg: float = 1.3
poisson_away_xg: float = 1.1
# Over/Under probabilities
over_15_prob: float = 0.75
over_25_prob: float = 0.55
over_35_prob: float = 0.30
# BTTS
btts_yes_prob: float = 0.50
# Most likely scores
most_likely_score: str = "1-1"
second_likely_score: str = "1-0"
third_likely_score: str = "2-1"
# Value bet opportunities
value_bets: list = None
confidence: float = 0.0
def __post_init__(self):
if self.value_bets is None:
self.value_bets = []
def to_dict(self) -> dict:
return {
"market_home_prob": round(self.market_home_prob * 100, 1),
"market_draw_prob": round(self.market_draw_prob * 100, 1),
"market_away_prob": round(self.market_away_prob * 100, 1),
"poisson_home_xg": round(self.poisson_home_xg, 2),
"poisson_away_xg": round(self.poisson_away_xg, 2),
"over_15_prob": round(self.over_15_prob * 100, 1),
"over_25_prob": round(self.over_25_prob * 100, 1),
"over_35_prob": round(self.over_35_prob * 100, 1),
"btts_yes_prob": round(self.btts_yes_prob * 100, 1),
"most_likely_score": self.most_likely_score,
"second_likely_score": self.second_likely_score,
"third_likely_score": self.third_likely_score,
"value_bets": self.value_bets,
"confidence": round(self.confidence, 1)
}
class OddsPredictorEngine:
"""
Odds-based prediction engine.
Uses:
- Market odds to extract implied probabilities
- Poisson distribution for mathematical xG
- Value calculator for EV+ opportunities
"""
def __init__(self):
self.poisson_engine = get_poisson_engine()
try:
self.value_calc = get_value_calculator()
except Exception:
self.value_calc = None
self.default_ms_h = 2.65
self.default_ms_d = 3.20
self.default_ms_a = 2.65
print("✅ OddsPredictorEngine initialized")
def _odds_to_prob(self, odds: float) -> float:
"""Convert decimal odds to probability."""
try:
odds = float(odds)
except (TypeError, ValueError):
return 0.0
if odds <= 1.0:
return 0.0
return 1.0 / odds
def predict(self,
odds_data: Dict[str, float],
home_goals_avg: float = 1.5,
home_conceded_avg: float = 1.2,
away_goals_avg: float = 1.2,
away_conceded_avg: float = 1.4) -> OddsPrediction:
"""
Generate odds-based prediction.
Args:
odds_data: Dict with keys like 'ms_h', 'ms_d', 'ms_a', 'ou25_o', 'btts_y'
home_goals_avg: Home team's average goals scored
home_conceded_avg: Home team's average goals conceded
away_goals_avg: Away team's average goals scored
away_conceded_avg: Away team's average goals conceded
Returns:
OddsPrediction with market and Poisson analysis
"""
# 1. Extract market probabilities from odds
ms_h = odds_data.get("ms_h", self.default_ms_h)
ms_d = odds_data.get("ms_d", self.default_ms_d)
ms_a = odds_data.get("ms_a", self.default_ms_a)
# Remove vig to get fair probabilities
raw_probs = [
self._odds_to_prob(ms_h),
self._odds_to_prob(ms_d),
self._odds_to_prob(ms_a)
]
total = sum(raw_probs) or 1
market_home = raw_probs[0] / total
market_draw = raw_probs[1] / total
market_away = raw_probs[2] / total
# 2. Poisson prediction
poisson_pred = self.poisson_engine.predict(
home_goals_avg, home_conceded_avg,
away_goals_avg, away_conceded_avg
)
# 3. Get most likely scores
likely_scores = poisson_pred.most_likely_scores[:3] if poisson_pred.most_likely_scores else []
score_1 = likely_scores[0]["score"] if len(likely_scores) > 0 else "1-1"
score_2 = likely_scores[1]["score"] if len(likely_scores) > 1 else "1-0"
score_3 = likely_scores[2]["score"] if len(likely_scores) > 2 else "2-1"
# 4. Value bet detection
value_bets = []
# Check if our Poisson model disagrees with market significantly
if abs(poisson_pred.home_win_prob - market_home) > 0.10:
if poisson_pred.home_win_prob > market_home:
value_bets.append({
"market": "MS 1",
"edge": round((poisson_pred.home_win_prob - market_home) * 100, 1),
"confidence": "medium"
})
else:
value_bets.append({
"market": "MS 2",
"edge": round((poisson_pred.away_win_prob - market_away) * 100, 1),
"confidence": "medium"
})
# O/U value check
ou25_o = odds_data.get("ou25_o", 1.9)
market_over25 = self._odds_to_prob(ou25_o)
if abs(poisson_pred.over_25_prob - market_over25) > 0.08:
pick = "2.5 Üst" if poisson_pred.over_25_prob > market_over25 else "2.5 Alt"
edge = abs(poisson_pred.over_25_prob - market_over25) * 100
value_bets.append({
"market": pick,
"edge": round(edge, 1),
"confidence": "high" if edge > 10 else "medium"
})
# Calculate confidence
# Higher when market and Poisson agree
agreement = 1.0 - abs(poisson_pred.home_win_prob - market_home)
confidence = 50.0 + (agreement * 40) + (len(value_bets) * 5)
return OddsPrediction(
market_home_prob=market_home,
market_draw_prob=market_draw,
market_away_prob=market_away,
poisson_home_xg=poisson_pred.home_xg,
poisson_away_xg=poisson_pred.away_xg,
over_15_prob=poisson_pred.over_15_prob,
over_25_prob=poisson_pred.over_25_prob,
over_35_prob=poisson_pred.over_35_prob,
btts_yes_prob=poisson_pred.btts_yes_prob,
most_likely_score=score_1,
second_likely_score=score_2,
third_likely_score=score_3,
value_bets=value_bets,
confidence=min(99.9, confidence)
)
# Singleton
_engine: Optional[OddsPredictorEngine] = None
def get_odds_predictor() -> OddsPredictorEngine:
global _engine
if _engine is None:
_engine = OddsPredictorEngine()
return _engine
if __name__ == "__main__":
engine = get_odds_predictor()
print("\n🧪 Odds Predictor Engine Test")
print("=" * 50)
pred = engine.predict(
odds_data={
"ms_h": 1.85,
"ms_d": 3.40,
"ms_a": 4.20,
"ou25_o": 1.90
},
home_goals_avg=1.8,
home_conceded_avg=1.0,
away_goals_avg=1.2,
away_conceded_avg=1.5
)
print(f"\n📊 Prediction:")
for k, v in pred.to_dict().items():
print(f" {k}: {v}")
+224
View File
@@ -0,0 +1,224 @@
"""
Player Predictor Engine - V20 Ensemble Component
Analyzes squad quality, key players, and missing player impact.
Weight: 25% in ensemble
"""
import os
import sys
from typing import Dict, Optional, List
from dataclasses import dataclass
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from features.squad_analysis_engine import get_squad_analysis_engine
from features.sidelined_analyzer import get_sidelined_analyzer
@dataclass
class PlayerPrediction:
"""Player engine prediction output."""
home_squad_quality: float = 50.0 # 0-100
away_squad_quality: float = 50.0
squad_diff: float = 0.0 # -100 to +100
home_key_players: int = 0
away_key_players: int = 0
home_missing_impact: float = 0.0 # 0-1, how much weaker due to missing players
away_missing_impact: float = 0.0
home_goals_form: int = 0 # Goals in last 5 matches
away_goals_form: int = 0
lineup_available: bool = False
confidence: float = 0.0
def to_dict(self) -> dict:
return {
"home_squad_quality": round(self.home_squad_quality, 1),
"away_squad_quality": round(self.away_squad_quality, 1),
"squad_diff": round(self.squad_diff, 1),
"home_key_players": self.home_key_players,
"away_key_players": self.away_key_players,
"home_missing_impact": round(self.home_missing_impact, 2),
"away_missing_impact": round(self.away_missing_impact, 2),
"home_goals_form": self.home_goals_form,
"away_goals_form": self.away_goals_form,
"lineup_available": self.lineup_available,
"confidence": round(self.confidence, 1)
}
class PlayerPredictorEngine:
"""
Player/Squad-based prediction engine.
Analyzes:
- Starting 11 quality
- Key player availability (top scorers)
- Missing player impact
- Recent goalscoring form per player
"""
def __init__(self):
self.squad_engine = get_squad_analysis_engine()
self.sidelined_analyzer = get_sidelined_analyzer()
print("✅ PlayerPredictorEngine initialized")
def predict(self,
match_id: str,
home_team_id: str,
away_team_id: str,
home_lineup: List[str] = None,
away_lineup: List[str] = None,
sidelined_data: Dict = None) -> PlayerPrediction:
"""
Generate player-based prediction.
Args:
match_id: Match ID for lineup lookup
home_team_id: Home team ID
away_team_id: Away team ID
home_lineup: Optional list of home player IDs
away_lineup: Optional list of away player IDs
Returns:
PlayerPrediction with squad analysis
"""
# Get squad features
if home_lineup and away_lineup:
# Use provided lineups (for live matches)
home_analysis = self.squad_engine.analyze_squad_from_list(
home_lineup, home_team_id
)
away_analysis = self.squad_engine.analyze_squad_from_list(
away_lineup, away_team_id
)
lineup_available = True
# Build features dict from analysis objects
features = {
"home_starting_11": home_analysis.starting_count or 11,
"home_goals_last_5": home_analysis.total_goals_last_5,
"home_assists_last_5": home_analysis.total_assists_last_5,
"home_key_players": home_analysis.key_players_count,
"away_starting_11": away_analysis.starting_count or 11,
"away_goals_last_5": away_analysis.total_goals_last_5,
"away_assists_last_5": away_analysis.total_assists_last_5,
"away_key_players": away_analysis.key_players_count,
}
elif match_id:
# Try to get from database
try:
features = self.squad_engine.get_features(
match_id, home_team_id, away_team_id
)
lineup_available = (
features.get("home_starting_11", 0) >= 11 and
features.get("away_starting_11", 0) >= 11
)
except Exception:
features = self.squad_engine.get_features_without_match(
home_team_id, away_team_id
)
lineup_available = False
else:
features = self.squad_engine.get_features_without_match(
home_team_id, away_team_id
)
lineup_available = False
# Extract features
home_goals = features.get("home_goals_last_5", 0)
away_goals = features.get("away_goals_last_5", 0)
home_key = features.get("home_key_players", 0)
away_key = features.get("away_key_players", 0)
# Calculate squad quality (0-100)
# Based on: goals scored, key players, assists
home_quality = min(100, 50 + (home_goals * 3) + (home_key * 5) +
features.get("home_assists_last_5", 0) * 2)
away_quality = min(100, 50 + (away_goals * 3) + (away_key * 5) +
features.get("away_assists_last_5", 0) * 2)
# Squad difference
squad_diff = home_quality - away_quality
# Missing player impact
# Priority: sidelined data (position-weighted) > lineup count (basic)
if sidelined_data:
home_impact, away_impact = self.sidelined_analyzer.analyze_match(sidelined_data)
home_missing = home_impact.impact_score
away_missing = away_impact.impact_score
sidelined_available = True
else:
# Fallback: basic lineup count method
expected_xi = 11
actual_home_xi = features.get("home_starting_11", 11)
actual_away_xi = features.get("away_starting_11", 11)
home_missing = (expected_xi - actual_home_xi) / expected_xi if actual_home_xi < expected_xi else 0
away_missing = (expected_xi - actual_away_xi) / expected_xi if actual_away_xi < expected_xi else 0
sidelined_available = False
# Confidence: more data sources = higher confidence
confidence = 70.0 if lineup_available else 35.0
if home_goals + away_goals > 10:
confidence += 15
if sidelined_available:
confidence += self.sidelined_analyzer.config.get("sidelined.confidence_boost", 10)
if not lineup_available:
confidence -= 5.0
return PlayerPrediction(
home_squad_quality=home_quality,
away_squad_quality=away_quality,
squad_diff=squad_diff,
home_key_players=home_key,
away_key_players=away_key,
home_missing_impact=home_missing,
away_missing_impact=away_missing,
home_goals_form=home_goals,
away_goals_form=away_goals,
lineup_available=lineup_available,
confidence=max(5.0, confidence)
)
def get_1x2_modifier(self, prediction: PlayerPrediction) -> Dict[str, float]:
"""
Calculate 1X2 probability modifiers based on squad analysis.
Returns modifiers to apply to base probabilities.
"""
diff = prediction.squad_diff / 100 # -1 to +1
return {
"home_modifier": 1.0 + (diff * 0.3), # Up to +/-30%
"away_modifier": 1.0 - (diff * 0.3),
"draw_modifier": 1.0 - abs(diff) * 0.2 # Less draw if big diff
}
# Singleton
_engine: Optional[PlayerPredictorEngine] = None
def get_player_predictor() -> PlayerPredictorEngine:
global _engine
if _engine is None:
_engine = PlayerPredictorEngine()
return _engine
if __name__ == "__main__":
engine = get_player_predictor()
print("\n🧪 Player Predictor Engine Test")
print("=" * 50)
pred = engine.predict(
match_id=None,
home_team_id="test_home",
away_team_id="test_away"
)
print(f"\n📊 Prediction:")
for k, v in pred.to_dict().items():
print(f" {k}: {v}")
+188
View File
@@ -0,0 +1,188 @@
"""
Referee Predictor Engine - V20 Ensemble Component
Analyzes referee patterns for cards, goals, and home bias.
Weight: 15% in ensemble
"""
import os
import sys
from typing import Dict, Optional
from dataclasses import dataclass
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from features.referee_engine import get_referee_engine
@dataclass
class RefereePrediction:
"""Referee engine prediction output."""
referee_name: str = ""
matches_officiated: int = 0
# Card tendencies
avg_yellow_cards: float = 4.0
avg_red_cards: float = 0.2
is_card_heavy: bool = False # Above average cards
# Goal tendencies
avg_goals_per_match: float = 2.5
over_25_rate: float = 0.50
is_high_scoring: bool = False # Above average goals
# Home bias
home_win_rate: float = 0.45
home_bias: float = 0.0 # -1 to +1, positive = favors home
# Penalty tendency
penalty_rate: float = 0.15
confidence: float = 0.0
def to_dict(self) -> dict:
return {
"referee_name": self.referee_name,
"matches_officiated": self.matches_officiated,
"avg_yellow_cards": round(self.avg_yellow_cards, 1),
"avg_red_cards": round(self.avg_red_cards, 2),
"is_card_heavy": self.is_card_heavy,
"avg_goals_per_match": round(self.avg_goals_per_match, 2),
"over_25_rate": round(self.over_25_rate * 100, 1),
"is_high_scoring": self.is_high_scoring,
"home_win_rate": round(self.home_win_rate * 100, 1),
"home_bias": round(self.home_bias, 2),
"penalty_rate": round(self.penalty_rate * 100, 1),
"confidence": round(self.confidence, 1)
}
class RefereePredictorEngine:
"""
Referee-based prediction engine.
Analyzes:
- Card tendency (sarı/kırmızı kart ortalaması)
- Goal tendency (maç başına gol, 2.5 üst oranı)
- Home bias (ev sahibi lehine karar oranı)
- Penalty tendency (penaltı verme oranı)
"""
# League average benchmarks
LEAGUE_AVG_GOALS = 2.65
LEAGUE_AVG_YELLOW = 4.0
LEAGUE_HOME_WIN_RATE = 0.45
def __init__(self):
self.referee_engine = get_referee_engine()
print("✅ RefereePredictorEngine initialized")
def predict(self,
match_id: str = None,
referee_name: str = None,
league_id: str = None) -> RefereePrediction:
"""
Generate referee-based prediction.
Args:
match_id: Match ID to find referee
referee_name: Or provide referee name directly
league_id: League ID to scope stats (prevents name collisions)
Returns:
RefereePrediction with referee analysis
"""
# Get referee features
if match_id:
features = self.referee_engine.get_features(match_id, league_id=league_id)
# Live flows may already have referee_name while match_officials table is sparse.
# Prefer the richer profile if direct-name lookup has more history.
if referee_name:
name_features = self.referee_engine.get_features_by_name(referee_name, league_id=league_id)
if (name_features.get("referee_matches", 0) or 0) > (features.get("referee_matches", 0) or 0):
features = name_features
elif referee_name:
features = self.referee_engine.get_features_by_name(referee_name, league_id=league_id)
else:
# Return default
return RefereePrediction(confidence=10.0)
ref_name = features.get("referee_name", "Unknown")
matches = features.get("referee_matches", 0)
if matches < 5:
# Not enough data
return RefereePrediction(
referee_name=ref_name,
matches_officiated=matches,
confidence=20.0
)
# Extract features
avg_yellow = features.get("referee_avg_yellow", 4.0)
avg_red = features.get("referee_avg_red", 0.2)
avg_goals = features.get("referee_avg_goals", 2.5)
over25_rate = features.get("referee_over25_rate", 0.5)
home_win_rate = features.get("referee_home_win_rate", 0.45) if "referee_home_win_rate" in features else 0.45
home_bias = features.get("referee_home_bias", 0.0)
penalty_rate = features.get("referee_penalty_rate", 0.15)
# Determine tendencies
is_card_heavy = (avg_yellow + avg_red * 4) > (self.LEAGUE_AVG_YELLOW + 1)
is_high_scoring = avg_goals > self.LEAGUE_AVG_GOALS
# Confidence based on matches officiated
confidence = min(90.0, 30.0 + matches * 2)
return RefereePrediction(
referee_name=ref_name,
matches_officiated=matches,
avg_yellow_cards=avg_yellow,
avg_red_cards=avg_red,
is_card_heavy=is_card_heavy,
avg_goals_per_match=avg_goals,
over_25_rate=over25_rate,
is_high_scoring=is_high_scoring,
home_win_rate=home_win_rate,
home_bias=home_bias,
penalty_rate=penalty_rate,
confidence=confidence
)
def get_modifiers(self, prediction: RefereePrediction) -> Dict[str, float]:
"""
Get modifiers to apply to other predictions based on referee profile.
"""
return {
# Home team gets slight boost if referee has home bias
"home_modifier": 1.0 + (prediction.home_bias * 0.05),
# O/U modifier
"over_25_modifier": 1.0 + (prediction.avg_goals_per_match - self.LEAGUE_AVG_GOALS) * 0.1,
# Card modifier for card markets
"cards_modifier": 1.0 + (prediction.avg_yellow_cards - self.LEAGUE_AVG_YELLOW) * 0.05
}
# Singleton
_engine: Optional[RefereePredictorEngine] = None
def get_referee_predictor() -> RefereePredictorEngine:
global _engine
if _engine is None:
_engine = RefereePredictorEngine()
return _engine
if __name__ == "__main__":
engine = get_referee_predictor()
print("\n🧪 Referee Predictor Engine Test")
print("=" * 50)
pred = engine.predict(referee_name="Cüneyt Çakır")
print(f"\n📊 Prediction:")
for k, v in pred.to_dict().items():
print(f" {k}: {v}")
+286
View File
@@ -0,0 +1,286 @@
"""
Team Predictor Engine - V20 Ensemble Component
Combines ELO ratings, form stats, H2H records and team statistics.
Weight: 30% in ensemble
"""
import os
import sys
from typing import Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
# Add parent to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from features.elo_system import get_elo_system
from features.h2h_engine import get_h2h_engine
from features.momentum_engine import get_momentum_engine, MomentumData
from features.team_stats_engine import get_team_stats_engine
@dataclass
class TeamPrediction:
"""Team engine prediction output."""
home_win_prob: float = 0.33
draw_prob: float = 0.33
away_win_prob: float = 0.33
home_xg: float = 1.3
away_xg: float = 1.1
form_advantage: float = 0.0 # -1 to +1, positive = home advantage
h2h_advantage: float = 0.0 # -1 to +1
elo_diff: float = 0.0
confidence: float = 0.0
def to_dict(self) -> dict:
return {
"home_win_prob": round(self.home_win_prob * 100, 1),
"draw_prob": round(self.draw_prob * 100, 1),
"away_win_prob": round(self.away_win_prob * 100, 1),
"home_xg": round(self.home_xg, 2),
"away_xg": round(self.away_xg, 2),
"form_advantage": round(self.form_advantage, 2),
"h2h_advantage": round(self.h2h_advantage, 2),
"elo_diff": round(self.elo_diff, 0),
"confidence": round(self.confidence, 1)
}
raw_features: Dict[str, Any] = field(default_factory=dict)
class TeamPredictorEngine:
"""
Team-based prediction engine.
Uses:
- ELO Rating System (venue-adjusted, league-weighted)
- H2H Engine (head-to-head history)
- Momentum Engine (recent form)
- Team Stats Engine (possession, shots, corners)
"""
def __init__(self):
self.elo_system = get_elo_system()
self.h2h_engine = get_h2h_engine()
self.momentum_engine = get_momentum_engine()
self.team_stats_engine = get_team_stats_engine()
print("✅ TeamPredictorEngine initialized")
def predict(self,
home_team_id: str,
away_team_id: str,
match_date_ms: int,
home_team_name: str = "",
away_team_name: str = "") -> TeamPrediction:
"""
Generate team-based prediction.
Args:
home_team_id: Home team ID
away_team_id: Away team ID
match_date_ms: Match date in milliseconds
home_team_name: Home team name (for ELO)
away_team_name: Away team name (for ELO)
Returns:
TeamPrediction with 1X2 probabilities and xG
"""
# 1. Get ELO predictions
elo_pred = self.elo_system.predict_match(home_team_id, away_team_id)
elo_features = self.elo_system.get_match_features(home_team_id, away_team_id)
# 2. Get H2H features
try:
h2h_features = self.h2h_engine.get_features(
home_team_id, away_team_id, match_date_ms
)
except Exception:
h2h_features = {
"h2h_home_win_rate": 0.5,
"h2h_away_win_rate": 0.5,
"h2h_avg_goals": 2.5,
"h2h_btts_rate": 0.5
}
# 3. Get Momentum/Form features
try:
# key: form_score should be 0-1 derived from momentum_score (-1 to 1)
home_mom_data = self.momentum_engine.calculate_momentum(home_team_id, match_date_ms)
away_mom_data = self.momentum_engine.calculate_momentum(away_team_id, match_date_ms)
home_form_score = (home_mom_data.momentum_score + 1) / 2
away_form_score = (away_mom_data.momentum_score + 1) / 2
except Exception as e:
print(f"⚠️ MomentumEngine error: {e}")
home_mom_data = MomentumData()
away_mom_data = MomentumData()
home_form_score = 0.5
away_form_score = 0.5
# 4. Get Team Stats
home_stats = self.team_stats_engine.get_features(home_team_id, match_date_ms)
away_stats = self.team_stats_engine.get_features(away_team_id, match_date_ms)
# 5. Combine predictions
# ELO-based 1X2 (60% weight)
elo_home = elo_pred.get("home_win_prob", 0.33)
elo_draw = elo_pred.get("draw_prob", 0.33)
elo_away = elo_pred.get("away_win_prob", 0.33)
# Adjust based on H2H (20% weight)
h2h_home_rate = h2h_features.get("h2h_home_win_rate", 0.5)
h2h_away_rate = h2h_features.get("h2h_away_win_rate", 0.5)
# Adjust based on form (20% weight)
home_form = home_form_score
away_form = away_form_score
form_diff = (home_form - away_form) # -1 to +1
# Weighted combination
final_home = elo_home * 0.6 + h2h_home_rate * 0.2 + (0.5 + form_diff * 0.3) * 0.2
final_away = elo_away * 0.6 + h2h_away_rate * 0.2 + (0.5 - form_diff * 0.3) * 0.2
final_draw = 1.0 - final_home - final_away
# Normalize
total = final_home + final_draw + final_away
if total > 0:
final_home /= total
final_draw /= total
final_away /= total
# Calculate xG based on stats and form (conservative base)
home_conversion = home_stats.get("shot_conversion_rate", 0.1)
away_conversion = away_stats.get("shot_conversion_rate", 0.1)
base_home_xg = 1.35 + (home_conversion * 3.0)
base_away_xg = 1.10 + (away_conversion * 2.5)
# Defense weakness factor: opponent's defensive quality affects xG
# Higher shots on target against = weaker defense
away_def_weakness = away_stats.get("shot_accuracy", 0.35) # opponent's shot accuracy as proxy
home_def_weakness = home_stats.get("shot_accuracy", 0.35)
# Adjust xG: stronger opponent defense → lower xG
home_xg = base_home_xg * (1 + form_diff * 0.15) * (0.8 + away_def_weakness * 0.6)
away_xg = base_away_xg * (1 - form_diff * 0.15) * (0.8 + home_def_weakness * 0.6)
# Apply xG Underperformance Penalty directly to calculated xG
# If a team chronically underperforms its xG, we subtract that historical difference here
if hasattr(home_mom_data, 'xg_underperformance') and home_mom_data.xg_underperformance > 0.2:
home_xg -= min(0.5, home_mom_data.xg_underperformance * 0.5)
if hasattr(away_mom_data, 'xg_underperformance') and away_mom_data.xg_underperformance > 0.2:
away_xg -= min(0.5, away_mom_data.xg_underperformance * 0.5)
# H2H adjustment (more conservative)
h2h_avg_goals = h2h_features.get("h2h_avg_goals", 2.5)
if h2h_avg_goals > 3.0:
home_xg *= 1.05
away_xg *= 1.05
elif h2h_avg_goals < 2.0:
home_xg *= 0.95
away_xg *= 0.95
# Clamp xG to reasonable range
home_xg = max(0.5, min(3.5, home_xg))
away_xg = max(0.3, min(3.0, away_xg))
# Calculate confidence
# Higher when ELO, H2H, and Form all agree
elo_winner = "H" if elo_home > max(elo_draw, elo_away) else ("A" if elo_away > elo_draw else "D")
h2h_winner = "H" if h2h_home_rate > h2h_away_rate else "A"
form_winner = "H" if form_diff > 0.1 else ("A" if form_diff < -0.1 else "D")
agreement = sum([
elo_winner == h2h_winner,
elo_winner == form_winner,
h2h_winner == form_winner
])
max_prob = max(final_home, final_draw, final_away)
confidence = max_prob * 100 * (0.7 + agreement * 0.1)
# Collect Raw Features for XGBoost
# Note: home_mom_data is an object now
def get_rate(val): return val if val is not None else 0.5
raw_features = {
**elo_features, # 8 features
# Form Features (need key mapping to match extract_training_data.py)
"home_goals_avg": 1.5 + home_mom_data.goals_trend, # Proxy
"home_conceded_avg": 1.5 - home_mom_data.conceded_trend, # Proxy
"away_goals_avg": 1.5 + away_mom_data.goals_trend,
"away_conceded_avg": 1.5 - away_mom_data.conceded_trend,
"home_clean_sheet_rate": 0.2, # Not in new MomentumData
"away_clean_sheet_rate": 0.2,
"home_scoring_rate": 0.8,
"away_scoring_rate": 0.8,
"home_winning_streak": home_mom_data.winning_streak,
"away_winning_streak": away_mom_data.winning_streak,
"home_unbeaten_streak": home_mom_data.unbeaten_streak,
"away_unbeaten_streak": away_mom_data.unbeaten_streak,
# H2H Features
**h2h_features,
# Team Stats
"home_avg_possession": home_stats.get("avg_possession", 0.5),
"away_avg_possession": away_stats.get("avg_possession", 0.5),
"home_avg_shots_on_target": home_stats.get("avg_shots_on_target", 3.5),
"away_avg_shots_on_target": away_stats.get("avg_shots_on_target", 3.5),
"home_shot_conversion": home_stats.get("shot_conversion_rate", 0.1),
"away_shot_conversion": away_stats.get("shot_conversion_rate", 0.1),
"home_avg_corners": home_stats.get("avg_corners", 4.5),
"away_avg_corners": away_stats.get("avg_corners", 4.5),
# Derived
"home_xga": 1.5 - home_mom_data.conceded_trend, # reusing as proxy
"away_xga": 1.5 - away_mom_data.conceded_trend
}
return TeamPrediction(
home_win_prob=final_home,
draw_prob=final_draw,
away_win_prob=final_away,
home_xg=home_xg,
away_xg=away_xg,
form_advantage=form_diff,
h2h_advantage=h2h_home_rate - h2h_away_rate,
elo_diff=elo_features.get("elo_diff", 0),
confidence=confidence,
raw_features=raw_features
)
# Singleton
_engine: Optional[TeamPredictorEngine] = None
def get_team_predictor() -> TeamPredictorEngine:
global _engine
if _engine is None:
_engine = TeamPredictorEngine()
return _engine
if __name__ == "__main__":
engine = get_team_predictor()
print("\n🧪 Team Predictor Engine Test")
print("=" * 50)
# Test with sample IDs
pred = engine.predict(
home_team_id="test_home",
away_team_id="test_away",
match_date_ms=1707393600000
)
print(f"\n📊 Prediction:")
for k, v in pred.to_dict().items():
print(f" {k}: {v}")
+302
View File
@@ -0,0 +1,302 @@
"""
Quantitative Finance Module — V2 Betting Engine
Edge calculation, Fractional Kelly Criterion staking, bet grading, and risk assessment.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Any
# ═══════════════════════════════════════════════════════════════════════════
# Constants
# ═══════════════════════════════════════════════════════════════════════════
BANKROLL_UNITS: float = 10.0 # Total bankroll in abstract units
KELLY_FRACTION: float = 0.25 # Quarter-Kelly (conservative, anti-ruin)
MIN_EDGE_PLAYABLE: float = 0.05 # 5% edge minimum to mark as playable
MIN_ODDS_PLAYABLE: float = 1.30 # Skip extreme chalk below 1.30
# ═══════════════════════════════════════════════════════════════════════════
# Edge Calculation
# ═══════════════════════════════════════════════════════════════════════════
def calculate_edge(true_prob: float, decimal_odds: float) -> float:
"""
Edge = (True_Probability × Decimal_Odds) - 1.0
Positive edge → the model says we have an advantage over the bookmaker.
"""
if decimal_odds <= 1.0 or true_prob <= 0.0:
return -1.0
return round((true_prob * decimal_odds) - 1.0, 4)
# ═══════════════════════════════════════════════════════════════════════════
# Kelly Criterion Staking
# ═══════════════════════════════════════════════════════════════════════════
def kelly_stake(true_prob: float, decimal_odds: float) -> float:
"""
Fractional Kelly Criterion for a bankroll of BANKROLL_UNITS.
Full Kelly: f* = ((b × p) - q) / b
where b = decimal_odds - 1, p = true_prob, q = 1 - true_prob
We use KELLY_FRACTION (25%) to reduce variance and avoid ruin.
Returns stake in units, rounded to 0.1.
"""
if decimal_odds <= 1.0 or true_prob <= 0.0 or true_prob >= 1.0:
return 0.0
b = decimal_odds - 1.0
p = true_prob
q = 1.0 - p
f_star = ((b * p) - q) / b
if f_star <= 0.0:
return 0.0
# Scale by fraction and bankroll
stake = f_star * KELLY_FRACTION * BANKROLL_UNITS
# Cap at a sensible maximum (3 units on a 10-unit bankroll)
stake = min(stake, 3.0)
return round(max(0.0, stake), 1)
# ═══════════════════════════════════════════════════════════════════════════
# Bet Grading
# ═══════════════════════════════════════════════════════════════════════════
def grade_bet(edge: float, playable: bool) -> str:
"""
Assign a letter grade based on edge magnitude.
A: Edge > 10% — Elite value, rare
B: Edge > 5% — Strong value, core bets
C: Edge > 2% — Marginal value, supporting picks only
PASS: Below threshold — Do not bet
"""
if not playable or edge < 0.02:
return "PASS"
if edge > 0.10:
return "A"
if edge > 0.05:
return "B"
return "C"
def is_playable(edge: float, decimal_odds: float) -> bool:
"""A pick is playable if it has sufficient edge AND reasonable odds."""
return edge >= MIN_EDGE_PLAYABLE and decimal_odds >= MIN_ODDS_PLAYABLE
# ═══════════════════════════════════════════════════════════════════════════
# Play Score (0-100 composite)
# ═══════════════════════════════════════════════════════════════════════════
def calculate_play_score(
edge: float,
true_prob: float,
data_quality: float,
) -> float:
"""
Composite score combining edge strength, probability confidence,
and data quality. Used for ranking picks and filtering.
Components:
- Edge contribution (0-50): edge * 250, capped at 50
- Prob contribution (0-30): probability * 30
- DQ contribution (0-20): data_quality * 20
"""
edge_score = min(50.0, max(0.0, edge * 250.0))
prob_score = min(30.0, max(0.0, true_prob * 30.0))
dq_score = min(20.0, max(0.0, data_quality * 20.0))
return round(edge_score + prob_score + dq_score, 1)
# ═══════════════════════════════════════════════════════════════════════════
# Risk Assessment
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class RiskResult:
level: str # LOW, MEDIUM, HIGH, EXTREME
score: float # 0.0 - 1.0
is_surprise_risk: bool
surprise_type: str | None
warnings: list[str]
def assess_risk(
missing_players_impact: float,
data_quality_score: float,
elo_diff: float,
implied_prob_fav: float,
) -> RiskResult:
"""
Multi-factor risk assessment.
Factors:
1. Missing key players (injuries/suspensions)
2. Data quality (missing stats, odds)
3. ELO closeness (tight matches are riskier)
4. Surprise potential (heavy favorite vulnerable)
"""
warnings: list[str] = []
risk_score = 0.0
# ─── Factor 1: Missing players ────────────────────────────────────
if missing_players_impact > 0.3:
risk_score += 0.35
warnings.append(
f"High missing-player impact: {missing_players_impact:.2f}"
)
elif missing_players_impact > 0.15:
risk_score += 0.15
warnings.append(
f"Moderate missing-player impact: {missing_players_impact:.2f}"
)
# ─── Factor 2: Data quality ───────────────────────────────────────
if data_quality_score < 0.5:
risk_score += 0.25
warnings.append(
f"Low data quality: {data_quality_score:.2f}"
)
elif data_quality_score < 0.75:
risk_score += 0.10
# ─── Factor 3: ELO closeness ──────────────────────────────────────
abs_elo_diff = abs(elo_diff)
if abs_elo_diff < 50:
risk_score += 0.15
warnings.append("Very tight ELO difference — coin-flip territory")
elif abs_elo_diff < 100:
risk_score += 0.05
# ─── Factor 4: Surprise detection ─────────────────────────────────
is_surprise = False
surprise_type: str | None = None
if implied_prob_fav > 0.65 and abs_elo_diff < 80:
# Heavy favorite by odds but ELO says match is closer
is_surprise = True
surprise_type = "odds_elo_divergence"
risk_score += 0.15
warnings.append(
"Upset potential: bookmaker odds suggest heavy favorite "
"but ELO says the match is closer than the market thinks"
)
# ─── Classify ─────────────────────────────────────────────────────
risk_score = min(1.0, risk_score)
if risk_score >= 0.7:
level = "EXTREME"
elif risk_score >= 0.45:
level = "HIGH"
elif risk_score >= 0.2:
level = "MEDIUM"
else:
level = "LOW"
return RiskResult(
level=level,
score=round(risk_score, 3),
is_surprise_risk=is_surprise,
surprise_type=surprise_type,
warnings=warnings,
)
# ═══════════════════════════════════════════════════════════════════════════
# Market Analysis (orchestrates edge/kelly/grade per market)
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class MarketPick:
market: str
pick: str
probability: float
odds: float
edge: float
playable: bool
bet_grade: str
stake_units: float
play_score: float
decision_reasons: list[str]
def analyze_market(
market: str,
probs: dict[str, float],
odds_map: dict[str, float],
data_quality_score: float,
) -> MarketPick:
"""
For a given market (MS, OU25, BTTS), find the best pick,
calculate edge, kelly stake, and grade it.
Parameters:
market: "MS", "OU25", "BTTS"
probs: {"1": 0.55, "X": 0.25, "2": 0.20} — calibrated model probs
odds_map: {"1": 2.10, "X": 3.40, "2": 3.50} — decimal odds
data_quality_score: 0.0-1.0
"""
best_pick: str = ""
best_edge: float = -99.0
best_prob: float = 0.0
best_odds: float = 0.0
reasons: list[str] = []
for pick_name, prob in probs.items():
odd = odds_map.get(pick_name, 0.0)
if odd <= 1.0:
continue
edge = calculate_edge(prob, odd)
if edge > best_edge:
best_edge = edge
best_pick = pick_name
best_prob = prob
best_odds = odd
if not best_pick:
return MarketPick(
market=market, pick="", probability=0.0, odds=0.0,
edge=0.0, playable=False, bet_grade="PASS",
stake_units=0.0, play_score=0.0,
decision_reasons=["no_valid_odds_found"],
)
playable = is_playable(best_edge, best_odds)
grade = grade_bet(best_edge, playable)
stake = kelly_stake(best_prob, best_odds) if playable else 0.0
play_score = calculate_play_score(best_edge, best_prob, data_quality_score)
# Build decision reasons
if playable:
reasons.append(f"edge_{best_edge:.1%}_above_threshold")
reasons.append(f"kelly_stake_{stake:.1f}_units")
else:
if best_edge < MIN_EDGE_PLAYABLE:
reasons.append(f"edge_{best_edge:.1%}_below_{MIN_EDGE_PLAYABLE:.0%}_threshold")
if best_odds < MIN_ODDS_PLAYABLE:
reasons.append(f"odds_{best_odds:.2f}_below_{MIN_ODDS_PLAYABLE:.2f}_minimum")
return MarketPick(
market=market,
pick=best_pick,
probability=round(best_prob, 4),
odds=round(best_odds, 2),
edge=round(best_edge, 4),
playable=playable,
bet_grade=grade,
stake_units=stake,
play_score=play_score,
decision_reasons=reasons,
)