Files
fahricansecer 94c7a4481a
Deploy Iddaai Backend / build-and-deploy (push) Successful in 37s
main
2026-05-17 02:17:22 +03:00

470 lines
18 KiB
Python

"""Reversal Mixin — HT/FT reversal watchlist and cycle metrics.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
try:
from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
class V27Predictor: # type: ignore[no-redef]
def __init__(self): self.models = {}
def load_models(self): return False
def predict_all(self, features): return {}
def compute_divergence(*args, **kwargs):
return {}
def compute_value_edge(*args, **kwargs):
return {}
from features.odds_band_analyzer import OddsBandAnalyzer
try:
from models.basketball_v25 import (
BasketballMatchPrediction,
get_basketball_v25_predictor,
)
except ImportError:
BasketballMatchPrediction = Any # type: ignore[misc]
def get_basketball_v25_predictor() -> Any:
raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class ReversalMixin:
def get_reversal_watchlist(
self,
count: int = 20,
horizon_hours: int = 72,
min_score: float = 45.0,
top_leagues_only: bool = False,
) -> Dict[str, Any]:
safe_count = max(1, min(100, int(count)))
safe_horizon = max(6, min(168, int(horizon_hours)))
safe_min_score = max(0.0, min(100.0, float(min_score)))
now_ms = int(time.time() * 1000)
horizon_ms = now_ms + (safe_horizon * 60 * 60 * 1000)
with psycopg2.connect(self.dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT
lm.id,
lm.home_team_id,
lm.away_team_id,
lm.league_id,
lm.mst_utc
FROM live_matches lm
WHERE lm.sport = 'football'
AND lm.mst_utc >= %s
AND lm.mst_utc <= %s
ORDER BY lm.mst_utc ASC
LIMIT 200
""",
(now_ms, horizon_ms),
)
raw_candidates = cur.fetchall()
candidates = [
row
for row in raw_candidates
if row.get("home_team_id") and row.get("away_team_id")
]
if top_leagues_only:
candidates = [
row for row in candidates if self._is_top_league(row.get("league_id"))
]
team_ids: Set[str] = set()
pair_keys: Set[Tuple[str, str]] = set()
for row in candidates:
home_id = str(row["home_team_id"])
away_id = str(row["away_team_id"])
team_ids.add(home_id)
team_ids.add(away_id)
h, a = sorted((home_id, away_id))
pair_keys.add((h, a))
team_cycle = self._fetch_team_reversal_cycle_metrics(cur, team_ids, now_ms)
h2h_ctx = self._fetch_h2h_reversal_context(cur, pair_keys, now_ms)
watch_items_all: List[Dict[str, Any]] = []
scanned = 0
for row in candidates:
match_id = str(row["id"])
data = self._load_match_data(match_id)
if data is None:
continue
package = self.analyze_match(match_id)
if not package:
continue
scanned += 1
htft_probs = package.get("market_board", {}).get("HTFT", {}).get("probs", {})
prob_12 = float(htft_probs.get("1/2", 0.0))
prob_21 = float(htft_probs.get("2/1", 0.0))
if prob_12 <= 0.0 and prob_21 <= 0.0:
continue
overall_htft_pick = None
overall_htft_prob = 0.0
if htft_probs:
overall_htft_pick, overall_htft_prob = max(
htft_probs.items(),
key=lambda item: float(item[1]),
)
reversal_sum = prob_12 + prob_21
reversal_max = max(prob_12, prob_21)
top_pick = "2/1" if prob_21 >= prob_12 else "1/2"
top_prob = prob_21 if top_pick == "2/1" else prob_12
ms_h = self._to_float(data.odds_data.get("ms_h"), 0.0)
ms_a = self._to_float(data.odds_data.get("ms_a"), 0.0)
gap = abs(ms_h - ms_a) if ms_h > 1.0 and ms_a > 1.0 else 0.0
favorite_odd = min(ms_h, ms_a) if ms_h > 1.0 and ms_a > 1.0 else 0.0
# Reversal events are rare (~5% baseline), so convert raw probs to a more useful
# watchlist scale where p in [0.02, 0.08] becomes meaningfully separable.
base_score = (reversal_max * 100.0 * 8.0) + (reversal_sum * 100.0 * 4.0)
balance_bonus = 0.0
if gap > 0.0:
balance_bonus = max(0.0, (1.0 - min(gap, 1.2) / 1.2) * 7.0)
elif ms_h > 1.0 and ms_a > 1.0:
balance_bonus = 2.0
favorite_bonus = 0.0
if favorite_odd > 0.0 and favorite_odd <= 1.70 and reversal_max >= 0.02:
favorite_bonus = min(8.0, (1.70 - favorite_odd) * 12.0)
home_metrics = team_cycle.get(data.home_team_id, {})
away_metrics = team_cycle.get(data.away_team_id, {})
cycle_pressure = max(
float(home_metrics.get("cycle_pressure", 0.0)),
float(away_metrics.get("cycle_pressure", 0.0)),
)
cycle_bonus = cycle_pressure * 10.0
h, a = sorted((data.home_team_id, data.away_team_id))
pair_key = (h, a)
pair_ctx = h2h_ctx.get(pair_key, {})
blowout_bonus = 0.0
last_diff = int(pair_ctx.get("goal_diff", 0))
if abs(last_diff) >= 3:
blowout_bonus = 6.0
if abs(last_diff) >= 5:
blowout_bonus += 3.0
ou25_o = self._to_float(data.odds_data.get("ou25_o"), 0.0)
tempo_bonus = 0.0
if ou25_o > 1.0 and ou25_o <= 1.72:
tempo_bonus = 2.5
watch_score = max(
0.0,
min(
100.0,
base_score + balance_bonus + favorite_bonus + cycle_bonus + blowout_bonus + tempo_bonus,
),
)
reason_codes: List[str] = []
if top_prob >= 0.045:
reason_codes.append("reversal_prob_hot")
elif top_prob >= 0.030:
reason_codes.append("reversal_prob_warm")
if gap > 0.0 and gap <= 0.80:
reason_codes.append("balanced_matchup")
if favorite_bonus > 0.0:
reason_codes.append("strong_favorite_reversal_window")
if cycle_pressure >= 0.55:
reason_codes.append("team_reversal_cycle_pressure")
if blowout_bonus > 0.0:
reason_codes.append("h2h_blowout_rematch")
if tempo_bonus > 0.0:
reason_codes.append("high_tempo_profile")
if not reason_codes:
reason_codes.append("model_signal_only")
item = (
{
"match_id": data.match_id,
"match_name": f"{data.home_team_name} vs {data.away_team_name}",
"match_date_ms": data.match_date_ms,
"league_id": data.league_id,
"league": data.league_name,
"risk_band": self._watchlist_risk_band(watch_score),
"watch_score": round(watch_score, 2),
"top_pick": top_pick,
"top_pick_prob": round(top_prob, 4),
"top_pick_scope": "reversal_only",
"overall_htft_pick": overall_htft_pick,
"overall_htft_pick_prob": round(float(overall_htft_prob), 4),
"reversal_probs": {
"1/2": round(prob_12, 4),
"2/1": round(prob_21, 4),
},
"odds_snapshot": {
"ms_h": round(ms_h, 2) if ms_h > 0 else None,
"ms_a": round(ms_a, 2) if ms_a > 0 else None,
"ms_gap": round(gap, 3),
"favorite_odd": round(favorite_odd, 2) if favorite_odd > 0 else None,
},
"pattern_signals": {
"home_cycle_pressure": round(float(home_metrics.get("cycle_pressure", 0.0)), 3),
"away_cycle_pressure": round(float(away_metrics.get("cycle_pressure", 0.0)), 3),
"home_matches_since_last_reversal": int(home_metrics.get("matches_since_last_reversal", 99)),
"away_matches_since_last_reversal": int(away_metrics.get("matches_since_last_reversal", 99)),
"h2h_last_goal_diff": last_diff if pair_ctx else None,
"h2h_last_result": pair_ctx.get("result"),
},
"reason_codes": reason_codes,
}
)
watch_items_all.append(item)
watch_items_all.sort(
key=lambda item: (
float(item.get("watch_score", 0.0)),
float(item.get("top_pick_prob", 0.0)),
),
reverse=True,
)
selected = [
item for item in watch_items_all if float(item.get("watch_score", 0.0)) >= safe_min_score
][:safe_count]
preview = watch_items_all[: min(5, len(watch_items_all))]
return {
"engine": "v28.main",
"generated_at": __import__("datetime").datetime.utcnow().isoformat() + "Z",
"horizon_hours": safe_horizon,
"min_score": round(safe_min_score, 2),
"top_leagues_only": bool(top_leagues_only),
"scanned_matches": scanned,
"candidate_matches": len(candidates),
"listed_matches": len(selected),
"watchlist": selected,
"top_candidates_preview": preview,
}
def _fetch_team_reversal_cycle_metrics(
self,
cur: RealDictCursor,
team_ids: Set[str],
now_ms: int,
) -> Dict[str, Dict[str, float]]:
if not team_ids:
return {}
cur.execute(
"""
WITH team_matches AS (
SELECT
m.home_team_id AS team_id,
m.mst_utc,
CASE
WHEN m.ht_score_home > m.ht_score_away THEN 'L'
WHEN m.ht_score_home < m.ht_score_away THEN 'T'
ELSE 'D'
END AS ht_state,
CASE
WHEN m.score_home > m.score_away THEN 'W'
WHEN m.score_home < m.score_away THEN 'L'
ELSE 'D'
END AS ft_state
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.ht_score_home IS NOT NULL
AND m.ht_score_away IS NOT NULL
AND m.home_team_id = ANY(%s)
AND m.mst_utc < %s
UNION ALL
SELECT
m.away_team_id AS team_id,
m.mst_utc,
CASE
WHEN m.ht_score_away > m.ht_score_home THEN 'L'
WHEN m.ht_score_away < m.ht_score_home THEN 'T'
ELSE 'D'
END AS ht_state,
CASE
WHEN m.score_away > m.score_home THEN 'W'
WHEN m.score_away < m.score_home THEN 'L'
ELSE 'D'
END AS ft_state
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.ht_score_home IS NOT NULL
AND m.ht_score_away IS NOT NULL
AND m.away_team_id = ANY(%s)
AND m.mst_utc < %s
),
ranked AS (
SELECT
team_id,
mst_utc,
ht_state,
ft_state,
ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY mst_utc DESC) AS rn
FROM team_matches
)
SELECT team_id, mst_utc, ht_state, ft_state
FROM ranked
WHERE rn <= 80
ORDER BY team_id ASC, mst_utc DESC
""",
(list(team_ids), now_ms, list(team_ids), now_ms),
)
rows = cur.fetchall()
by_team: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for row in rows:
by_team[str(row["team_id"])].append(row)
out: Dict[str, Dict[str, float]] = {}
for team_id in team_ids:
team_rows = by_team.get(str(team_id), [])
if not team_rows:
out[str(team_id)] = {
"recent_reversal_rate": 0.0,
"matches_since_last_reversal": 99.0,
"avg_gap_matches": 12.0,
"cycle_pressure": 0.0,
}
continue
reversal_indexes: List[int] = []
recent_reversal = 0
recent_n = min(15, len(team_rows))
for idx, row in enumerate(team_rows, start=1):
ht_state = str(row.get("ht_state") or "")
ft_state = str(row.get("ft_state") or "")
is_reversal = (ht_state == "L" and ft_state == "L") or (ht_state == "T" and ft_state == "W")
if idx <= recent_n and is_reversal:
recent_reversal += 1
if is_reversal:
reversal_indexes.append(idx)
recent_rate = (recent_reversal / recent_n) if recent_n > 0 else 0.0
since_last = float(reversal_indexes[0]) if reversal_indexes else 99.0
gaps: List[float] = []
if len(reversal_indexes) >= 2:
for i in range(1, len(reversal_indexes)):
gaps.append(float(reversal_indexes[i] - reversal_indexes[i - 1]))
avg_gap = (sum(gaps) / len(gaps)) if gaps else 12.0
if avg_gap <= 0:
avg_gap = 12.0
cycle_pressure = 0.0
if reversal_indexes:
tolerance = max(3.0, avg_gap * 0.7)
diff = abs(since_last - avg_gap)
cycle_pressure = max(0.0, 1.0 - (diff / tolerance))
out[str(team_id)] = {
"recent_reversal_rate": round(recent_rate, 4),
"matches_since_last_reversal": round(since_last, 2),
"avg_gap_matches": round(avg_gap, 2),
"cycle_pressure": round(cycle_pressure, 4),
}
return out
def _fetch_h2h_reversal_context(
self,
cur: RealDictCursor,
pair_keys: Set[Tuple[str, str]],
now_ms: int,
) -> Dict[Tuple[str, str], Dict[str, Any]]:
if not pair_keys:
return {}
team_ids = sorted({team_id for pair in pair_keys for team_id in pair})
cur.execute(
"""
SELECT
m.home_team_id,
m.away_team_id,
m.score_home,
m.score_away,
m.ht_score_home,
m.ht_score_away,
m.mst_utc
FROM matches m
WHERE m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.home_team_id = ANY(%s)
AND m.away_team_id = ANY(%s)
AND m.mst_utc < %s
ORDER BY m.mst_utc DESC
LIMIT 4000
""",
(team_ids, team_ids, now_ms),
)
rows = cur.fetchall()
out: Dict[Tuple[str, str], Dict[str, Any]] = {}
for row in rows:
home_id = str(row["home_team_id"])
away_id = str(row["away_team_id"])
h, a = sorted((home_id, away_id))
key = (h, a)
if key not in pair_keys or key in out:
continue
score_home = int(row["score_home"])
score_away = int(row["score_away"])
goal_diff = score_home - score_away
out[key] = {
"goal_diff": goal_diff,
"result": f"{score_home}-{score_away}",
"match_date_ms": int(row["mst_utc"] or 0),
}
if len(out) >= len(pair_keys):
break
return out
@staticmethod
def _watchlist_risk_band(score: float) -> str:
if score >= 68.0:
return "HIGH"
if score >= 54.0:
return "MEDIUM"
return "LOW"