"""Basketball Mixin — basketball-specific market construction. Auto-extracted mixin module — split from services/single_match_orchestrator.py. All methods here are composed into SingleMatchOrchestrator via inheritance. `self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are initialised in the main __init__. """ from __future__ import annotations import json import re import time import math import os import pickle from collections import defaultdict from typing import Any, Dict, List, Optional, Set, Tuple, overload import pandas as pd import numpy as np import psycopg2 from psycopg2.extras import RealDictCursor from data.db import get_clean_dsn from schemas.prediction import FullMatchPrediction from schemas.match_data import MatchData from models.v25_ensemble import V25Predictor, get_v25_predictor try: from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge except ImportError: class V27Predictor: # type: ignore[no-redef] def __init__(self): self.models = {} def load_models(self): return False def predict_all(self, features): return {} def compute_divergence(*args, **kwargs): return {} def compute_value_edge(*args, **kwargs): return {} from features.odds_band_analyzer import OddsBandAnalyzer try: from models.basketball_v25 import ( BasketballMatchPrediction, get_basketball_v25_predictor, ) except ImportError: BasketballMatchPrediction = Any # type: ignore[misc] def get_basketball_v25_predictor() -> Any: raise ImportError("Basketball predictor is not available") from core.engines.player_predictor import PlayerPrediction, get_player_predictor from services.feature_enrichment import FeatureEnrichmentService from services.betting_brain import BettingBrain from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine from services.match_commentary import generate_match_commentary from utils.top_leagues import load_top_league_ids from utils.league_reliability import load_league_reliability from config.config_loader import build_threshold_dict, get_threshold_default from models.calibration import get_calibrator class BasketballMixin: def _build_basketball_prediction_package( self, data: MatchData, prediction: Dict[str, Any], ) -> Dict[str, Any]: quality = self._compute_data_quality(data) raw_market_rows = self._build_basketball_market_rows(data, prediction) market_rows = [ self._decorate_basketball_market_row(data, prediction, quality, row) for row in raw_market_rows ] market_rows.sort( key=lambda row: ( 1 if row.get("playable") else 0, float(row.get("play_score", 0.0)), ), reverse=True, ) playable_rows = [row for row in market_rows if row.get("playable")] MIN_ODDS = 1.30 playable_with_odds = [ row for row in playable_rows if float(row.get("odds", 0.0)) >= MIN_ODDS ] if playable_with_odds: playable_with_odds.sort( key=lambda r: ( float(r.get("ev_edge", 0.0)), float(r.get("play_score", 0.0)), ), reverse=True, ) main_pick = playable_with_odds[0] main_pick["is_guaranteed"] = False main_pick["pick_reason"] = "positive_ev_pick" else: fallback_with_odds = [r for r in market_rows if float(r.get("odds", 0.0)) > 1.0] fallback_with_odds.sort(key=lambda r: float(r.get("play_score", 0.0)), reverse=True) main_pick = fallback_with_odds[0] if fallback_with_odds else (market_rows[0] if market_rows else None) if main_pick: main_pick["is_guaranteed"] = False main_pick["playable"] = False main_pick["stake_units"] = 0.0 main_pick["bet_grade"] = "PASS" main_pick["pick_reason"] = "no_playable_value_found" supporting: List[Dict[str, Any]] = [] for row in market_rows: if main_pick and row["market"] == main_pick["market"] and row["pick"] == main_pick["pick"]: continue supporting.append(row) supporting = supporting[:5] bet_summary = [self._to_bet_summary_item(row) for row in market_rows] scenarios = self._build_basketball_scenarios(prediction) reasons = self._build_basketball_reasoning_factors(data, prediction, quality) aggressive_pick: Optional[Dict[str, Any]] = None risk_level = prediction.get("risk_level", "MEDIUM") risk_score = float(prediction.get("risk_score", 50.0) or 50.0) # Build aggressive pick if available from Spreak in market_board board = prediction.get("market_board", {}) if risk_level in ("LOW", "MEDIUM") and "Spread" in board: spr_data = board["Spread"] probs = list(spr_data.values()) keys = list(spr_data.keys()) if len(probs) >= 2: prob_a = float(str(probs[0]).replace('%', '')) / 100.0 prob_h = float(str(probs[1]).replace('%', '')) / 100.0 max_prob = max(prob_a, prob_h) spr_pick = "Home" if prob_h >= prob_a else "Away" conf = 50.0 line_str = "Spread" for b in prediction.get("bet_summary", []): if b["market"] == "Spread": conf = float(b["confidence"]) line_str = b["pick"] aggressive_pick = { "market": "SPREAD", "pick": line_str, "probability": round(max_prob, 4), "confidence": round(conf, 1), "odds": round( float( data.odds_data.get( "spread_h" if spr_pick == "Home" else "spread_a", 0.0 ) ), 2, ), } scores = prediction.get("score_prediction", {}) home_score = scores.get("home_expected", 80.0) away_score = scores.get("away_expected", 80.0) total_score = scores.get("total_expected", 160.0) mb_out = { "PLAYER_TOP": board.get("PLAYER_TOP", []), } if "ML" in board: ml_data = board["ML"] keys = list(ml_data.keys()) if len(keys) >= 2: mb_out["ML"] = { "pick": prediction.get("main_pick", ""), "confidence": 60.0, "probs": { "1": round(float(str(ml_data[keys[0]]).replace('%', '')) / 100.0, 4), "2": round(float(str(ml_data[keys[1]]).replace('%', '')) / 100.0, 4), }, } if "Totals" in board: tot_data = board["Totals"] keys = list(tot_data.keys()) if len(keys) >= 2: mb_out["TOTAL"] = { "line": 160.5, "pick": prediction.get("main_pick", ""), "confidence": 60.0, "probs": { "under": round(float(str(tot_data[keys[0]]).replace('%', '')) / 100.0, 4), "over": round(float(str(tot_data[keys[1]]).replace('%', '')) / 100.0, 4), }, } if "Spread" in board: spr_data = board["Spread"] keys = list(spr_data.keys()) if len(keys) >= 2: mb_out["SPREAD"] = { "line_home": 0.0, "pick": prediction.get("main_pick", ""), "confidence": 60.0, "probs": { "away_cover": round(float(str(spr_data[keys[0]]).replace('%', '')) / 100.0, 4), "home_cover": round(float(str(spr_data[keys[1]]).replace('%', '')) / 100.0, 4), }, } return { "model_version": str(prediction.get("engine_version") or "v28.main.basketball"), "match_info": { "match_id": data.match_id, "match_name": f"{data.home_team_name} vs {data.away_team_name}", "home_team": data.home_team_name, "away_team": data.away_team_name, "league": data.league_name, "match_date_ms": data.match_date_ms, "sport": data.sport, }, "data_quality": quality, "risk": { "level": risk_level, "score": round(risk_score, 1), "is_surprise_risk": False, "surprise_type": "", "warnings": [], }, "engine_breakdown": prediction.get("engine_breakdown") or { "team": 60.0, "player": 60.0, "odds": 80.0, "referee": 50.0, }, "main_pick": main_pick, "bet_advice": { "playable": bool(main_pick and main_pick.get("playable")), "suggested_stake_units": float(main_pick.get("stake_units", 0.0)) if (main_pick and main_pick.get("playable")) else 0.0, "reason": "playable_pick_found" if (main_pick and main_pick.get("playable")) else "no_bet_conditions_met", }, "bet_summary": bet_summary, "supporting_picks": supporting, "aggressive_pick": aggressive_pick, "scenario_top5": scenarios, "score_prediction": { "ft": f"{int(round(home_score))}-{int(round(away_score))}", "ht": f"{int(round(home_score * 0.52))}-{int(round(away_score * 0.52))}", "xg_home": round(float(home_score), 2), "xg_away": round(float(away_score), 2), "xg_total": round(float(total_score), 2), }, "market_board": mb_out, "reasoning_factors": reasons, } def _build_basketball_market_rows( self, data: MatchData, pred: Dict[str, Any], ) -> List[Dict[str, Any]]: odds = data.odds_data market_board = pred.get("market_board", {}) # 1. Moneyline ml_row = None if "ML" in market_board: ml_data = market_board["ML"] # To get specific pick (MS 1 or MS 2), look at the probability values probs = list(ml_data.values()) keys = list(ml_data.keys()) if len(probs) >= 2: prob_1 = float(str(probs[0]).replace('%', '')) / 100.0 prob_2 = float(str(probs[1]).replace('%', '')) / 100.0 max_prob = max(prob_1, prob_2) # Derive pick string ml_pick_val = keys[0] if prob_1 >= prob_2 else keys[1] ml_pick = "1" if "1" in ml_pick_val else "2" ml_odd_key = "ml_h" if ml_pick == "1" else "ml_a" # Find confidence from bet summary conf = 50.0 for b in pred.get("bet_summary", []): if b["market"] == "Moneyline": conf = float(b["confidence"]) ml_row = { "market": "ML", "pick": ml_pick, "probability": round(max_prob, 4), "confidence": round(conf, 1), "odds": round(float(odds.get(ml_odd_key, 0.0)), 2), } # 2. Totals tot_row = None if "Totals" in market_board: tot_data = market_board["Totals"] probs = list(tot_data.values()) keys = list(tot_data.keys()) if len(probs) >= 2: prob_u = float(str(probs[0]).replace('%', '')) / 100.0 prob_o = float(str(probs[1]).replace('%', '')) / 100.0 max_prob = max(prob_u, prob_o) pick_str = keys[1] if prob_o >= prob_u else keys[0] tot_pick = "Over" if "Over" in pick_str else "Under" line_val = pick_str.replace("Over", "").replace("Under", "").strip() conf = 50.0 for b in pred.get("bet_summary", []): if b["market"] == "Totals": conf = float(b["confidence"]) tot_row = { "market": "TOTAL", "pick": f"{tot_pick} {line_val}", "probability": round(max_prob, 4), "confidence": round(conf, 1), "odds": round(float(odds.get("tot_o" if tot_pick == "Over" else "tot_u", 0.0)), 2), } # 3. Spread spr_row = None if "Spread" in market_board: spr_data = market_board["Spread"] probs = list(spr_data.values()) keys = list(spr_data.keys()) if len(probs) >= 2: prob_a = float(str(probs[0]).replace('%', '')) / 100.0 prob_h = float(str(probs[1]).replace('%', '')) / 100.0 max_prob = max(prob_a, prob_h) spr_pick = "Home" if prob_h >= prob_a else "Away" conf = 50.0 line_str = "" for b in pred.get("bet_summary", []): if b["market"] == "Spread": conf = float(b["confidence"]) line_str = b["pick"] spr_row = { "market": "SPREAD", "pick": spr_pick + " " + line_str, "probability": round(max_prob, 4), "confidence": round(conf, 1), "odds": round(float(odds.get("spread_h" if spr_pick == "Home" else "spread_a", 0.0)), 2), } # Return valid rows rows = [] if ml_row: rows.append(ml_row) if tot_row: rows.append(tot_row) if spr_row: rows.append(spr_row) return rows def _decorate_basketball_market_row( self, data: MatchData, prediction: Dict[str, Any], quality: Dict[str, Any], row: Dict[str, Any], ) -> Dict[str, Any]: market = str(row.get("market") or "") raw_conf = float(row.get("confidence") or 0.0) prob = float(row.get("probability") or 0.0) odd = float(row.get("odds") or 0.0) calibration = {"ML": 0.90, "TOTAL": 0.88, "SPREAD": 0.86}.get(market, 0.88) min_conf = {"ML": 55.0, "TOTAL": 56.0, "SPREAD": 55.0}.get(market, 55.0) calibrated_conf = max(1.0, min(99.0, raw_conf * calibration)) implied_prob = (1.0 / odd) if odd > 1.0 else 0.0 edge = prob - implied_prob if implied_prob > 0 else 0.0 risk_level = str(prediction.get("risk_level", "MEDIUM")).upper() risk_penalty = {"LOW": 0.0, "MEDIUM": 3.0, "HIGH": 8.0, "EXTREME": 12.0}.get( risk_level, 4.0, ) quality_label = str(quality.get("label") or "MEDIUM").upper() quality_penalty = {"HIGH": 0.0, "MEDIUM": 2.0, "LOW": 6.0}.get( quality_label, 4.0, ) base_score = calibrated_conf + (edge * 100.0) play_score = max(0.0, min(100.0, base_score - risk_penalty - quality_penalty)) reasons: List[str] = [] playable = True min_play_score = self.market_min_play_score.get(market, 68.0) min_edge = self.market_min_edge.get(market, 0.02) if calibrated_conf < min_conf: playable = False reasons.append("below_calibrated_conf_threshold") if market in self.ODDS_REQUIRED_MARKETS and odd <= 1.01: playable = False reasons.append("market_odds_missing") if risk_level in ("HIGH", "EXTREME") and quality_label == "LOW": playable = False reasons.append("high_risk_low_data_quality") if odd > 1.0 and edge < -0.05: playable = False reasons.append("negative_model_edge") if not reasons: reasons.append("market_passed_all_gates") if not playable: grade = "PASS" stake_units = 0.0 elif play_score >= 72: grade = "A" stake_units = 1.0 elif play_score >= 61: grade = "B" stake_units = 0.5 else: grade = "C" stake_units = 0.25 out = dict(row) out.update( { "raw_confidence": round(raw_conf, 1), "calibrated_confidence": round(calibrated_conf, 1), "min_required_confidence": round(min_conf, 1), "edge": round(edge, 4), "play_score": round(play_score, 1), "playable": playable, "bet_grade": grade, "stake_units": stake_units, "decision_reasons": reasons[:3], }, ) return out def _build_basketball_scenarios( self, prediction: Dict[str, Any], ) -> List[Dict[str, Any]]: scores = prediction.get("score_prediction", {}) home = float(scores.get("home_expected", 80.0)) away = float(scores.get("away_expected", 80.0)) templates = [ (0.00, 0.23), (+3.5, 0.20), (-3.5, 0.19), (+6.0, 0.16), (-6.0, 0.14), ] out: List[Dict[str, Any]] = [] for delta, prob in templates: h = int(round(home + delta)) a = int(round(away - delta)) out.append({"score": f"{h}-{a}", "prob": prob}) return out def _build_basketball_reasoning_factors( self, data: MatchData, prediction: Dict[str, Any], quality: Dict[str, Any], ) -> List[str]: factors: List[str] = [] # XGBoost models are odds-aware, weight it heavily factors.append("market_signal_dominant") if quality.get("label") in ("HIGH", "MEDIUM"): factors.append("player_form_signal_strong") else: factors.append("player_form_signal_limited") if prediction.get("is_surprise_risk"): factors.append("upset_risk_detected") if quality.get("label") == "LOW": factors.append("limited_data_confidence") factors.append("basketball_points_model") return factors def _compute_basketball_data_quality(self, data: MatchData) -> Dict[str, Any]: flags: List[str] = [] has_ml = float(data.odds_data.get("ml_h", 0.0)) > 1.0 and float(data.odds_data.get("ml_a", 0.0)) > 1.0 has_total = ( float(data.odds_data.get("tot_line", 0.0)) > 0.0 and float(data.odds_data.get("tot_o", 0.0)) > 1.0 and float(data.odds_data.get("tot_u", 0.0)) > 1.0 ) has_spread = ( "spread_home_line" in data.odds_data and float(data.odds_data.get("spread_h", 0.0)) > 1.0 and float(data.odds_data.get("spread_a", 0.0)) > 1.0 ) odds_components = [has_ml, has_total, has_spread] odds_score = sum(1.0 for x in odds_components if x) / 3.0 if not has_ml: flags.append("missing_moneyline_odds") if not has_total: flags.append("missing_total_odds") if not has_spread: flags.append("missing_spread_odds") # Basketball live lineup/referee coverage is structurally lower in this project. # Keep neutral baseline and rely mostly on odds depth. lineup_score = 0.7 ref_score = 0.7 total_score = (odds_score * 0.75) + (lineup_score * 0.15) + (ref_score * 0.10) if total_score >= 0.75: label = "HIGH" elif total_score >= 0.52: label = "MEDIUM" else: label = "LOW" return { "label": label, "score": round(total_score, 3), "home_lineup_count": len(data.home_lineup or []), "away_lineup_count": len(data.away_lineup or []), "lineup_source": data.lineup_source, "flags": flags, }