470 lines
18 KiB
Python
470 lines
18 KiB
Python
"""Reversal Mixin — HT/FT reversal watchlist and cycle metrics.
|
|
|
|
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
|
|
All methods here are composed into SingleMatchOrchestrator via inheritance.
|
|
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
|
|
initialised in the main __init__.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
import math
|
|
import os
|
|
import pickle
|
|
from collections import defaultdict
|
|
from typing import Any, Dict, List, Optional, Set, Tuple, overload
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
from data.db import get_clean_dsn
|
|
from schemas.prediction import FullMatchPrediction
|
|
from schemas.match_data import MatchData
|
|
from models.v25_ensemble import V25Predictor, get_v25_predictor
|
|
try:
    from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
    # The v27 predictor module is optional: when it cannot be imported, expose
    # inert stand-ins with the same names so the rest of the module still loads.

    class V27Predictor:  # type: ignore[no-redef]
        """No-op replacement used when models.v27_predictor is unavailable."""

        def __init__(self):
            # No models are ever registered on the stand-in.
            self.models = {}

        def load_models(self):
            """Report that nothing was loaded."""
            return False

        def predict_all(self, features):
            """Return an empty prediction mapping for any input."""
            return {}

    def compute_divergence(*args, **kwargs):
        """Fallback: accept any arguments and return an empty dict."""
        return {}

    def compute_value_edge(*args, **kwargs):
        """Fallback: accept any arguments and return an empty dict."""
        return {}
|
|
from features.odds_band_analyzer import OddsBandAnalyzer
|
|
try:
    from models.basketball_v25 import (
        BasketballMatchPrediction,
        get_basketball_v25_predictor,
    )
except ImportError:
    # Basketball support is optional. Keep the names importable: the type
    # alias degrades to Any and the factory fails loudly only when called.
    BasketballMatchPrediction = Any  # type: ignore[misc]

    def get_basketball_v25_predictor() -> Any:
        """Fallback factory: always raises ImportError."""
        raise ImportError("Basketball predictor is not available")
|
|
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
|
|
from services.feature_enrichment import FeatureEnrichmentService
|
|
from services.betting_brain import BettingBrain
|
|
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
|
|
from services.match_commentary import generate_match_commentary
|
|
from utils.top_leagues import load_top_league_ids
|
|
from utils.league_reliability import load_league_reliability
|
|
from config.config_loader import build_threshold_dict, get_threshold_default
|
|
from models.calibration import get_calibrator
|
|
|
|
|
|
class ReversalMixin:
    """HT/FT reversal watchlist builder plus its team-cycle and H2H helpers.

    The methods rely on the host class providing ``self.dsn``,
    ``self._is_top_league``, ``self._load_match_data``, ``self.analyze_match``
    and ``self._to_float``.
    """

    def get_reversal_watchlist(
        self,
        count: int = 20,
        horizon_hours: int = 72,
        min_score: float = 45.0,
        top_leagues_only: bool = False,
    ) -> Dict[str, Any]:
        """Rank upcoming football matches by HT/FT reversal ("1/2", "2/1") potential.

        Args:
            count: Maximum number of watchlist entries; clamped to 1..100.
            horizon_hours: Look-ahead window in hours; clamped to 6..168.
            min_score: Minimum watch score for an entry to be listed;
                clamped to 0..100.
            top_leagues_only: When true, keep only matches whose league is
                accepted by ``self._is_top_league``.

        Returns:
            A dict with run metadata, ``"watchlist"`` (entries scoring at least
            ``min_score``, best first, at most ``count``) and
            ``"top_candidates_preview"`` (the five best-scoring entries
            regardless of ``min_score``).
        """
        from contextlib import closing
        from datetime import datetime, timezone

        safe_count = max(1, min(100, int(count)))
        safe_horizon = max(6, min(168, int(horizon_hours)))
        safe_min_score = max(0.0, min(100.0, float(min_score)))
        now_ms = int(time.time() * 1000)
        horizon_ms = now_ms + (safe_horizon * 60 * 60 * 1000)

        # psycopg2's connection context manager only commits/rolls back the
        # transaction; it does NOT close the connection. closing() guarantees
        # the connection is released even when a query raises.
        with closing(psycopg2.connect(self.dsn)) as conn:
            with conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        lm.id,
                        lm.home_team_id,
                        lm.away_team_id,
                        lm.league_id,
                        lm.mst_utc
                    FROM live_matches lm
                    WHERE lm.sport = 'football'
                      AND lm.mst_utc >= %s
                      AND lm.mst_utc <= %s
                    ORDER BY lm.mst_utc ASC
                    LIMIT 200
                    """,
                    (now_ms, horizon_ms),
                )
                raw_candidates = cur.fetchall()

                # Matches without both team ids cannot be scored.
                candidates = [
                    row
                    for row in raw_candidates
                    if row.get("home_team_id") and row.get("away_team_id")
                ]
                if top_leagues_only:
                    candidates = [
                        row for row in candidates if self._is_top_league(row.get("league_id"))
                    ]

                team_ids: Set[str] = set()
                pair_keys: Set[Tuple[str, str]] = set()
                for row in candidates:
                    home_id = str(row["home_team_id"])
                    away_id = str(row["away_team_id"])
                    team_ids.update((home_id, away_id))
                    # Pairs are keyed order-independently (string-sorted).
                    first, second = sorted((home_id, away_id))
                    pair_keys.add((first, second))

                team_cycle = self._fetch_team_reversal_cycle_metrics(cur, team_ids, now_ms)
                h2h_ctx = self._fetch_h2h_reversal_context(cur, pair_keys, now_ms)

        # Per-match analysis runs after the connection has been released so it
        # is not held open while the models are evaluated.
        watch_items_all: List[Dict[str, Any]] = []
        scanned = 0
        for row in candidates:
            match_id = str(row["id"])
            data = self._load_match_data(match_id)
            if data is None:
                continue
            package = self.analyze_match(match_id)
            if not package:
                continue

            scanned += 1
            item = self._build_reversal_watch_item(data, package, team_cycle, h2h_ctx)
            if item is not None:
                watch_items_all.append(item)

        watch_items_all.sort(
            key=lambda item: (
                float(item.get("watch_score", 0.0)),
                float(item.get("top_pick_prob", 0.0)),
            ),
            reverse=True,
        )

        selected = [
            item
            for item in watch_items_all
            if float(item.get("watch_score", 0.0)) >= safe_min_score
        ][:safe_count]
        preview = watch_items_all[:5]

        # Same textual format as the deprecated utcnow().isoformat() + "Z".
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        return {
            "engine": "v28.main",
            "generated_at": generated_at,
            "horizon_hours": safe_horizon,
            "min_score": round(safe_min_score, 2),
            "top_leagues_only": bool(top_leagues_only),
            "scanned_matches": scanned,
            "candidate_matches": len(candidates),
            "listed_matches": len(selected),
            "watchlist": selected,
            "top_candidates_preview": preview,
        }

    def _build_reversal_watch_item(
        self,
        data: Any,
        package: Dict[str, Any],
        team_cycle: Dict[str, Dict[str, float]],
        h2h_ctx: Dict[Tuple[str, str], Dict[str, Any]],
    ) -> Optional[Dict[str, Any]]:
        """Score one analysed match and build its watchlist entry.

        Args:
            data: Loaded match data; the attributes read are ``match_id``,
                ``home_team_id``, ``away_team_id``, ``home_team_name``,
                ``away_team_name``, ``match_date_ms``, ``league_id``,
                ``league_name`` and ``odds_data``.
            package: Analysis result; HT/FT probabilities are read from
                ``package["market_board"]["HTFT"]["probs"]``.
            team_cycle: Per-team cycle metrics keyed by ``str(team_id)``.
            h2h_ctx: Latest head-to-head context keyed by the string-sorted
                ``(team_id, team_id)`` pair.

        Returns:
            The watchlist entry, or ``None`` when neither reversal outcome has
            a positive probability.
        """
        market_board = package.get("market_board") or {}
        htft_probs = (market_board.get("HTFT") or {}).get("probs") or {}
        prob_12 = float(htft_probs.get("1/2") or 0.0)
        prob_21 = float(htft_probs.get("2/1") or 0.0)
        if prob_12 <= 0.0 and prob_21 <= 0.0:
            return None

        # htft_probs is non-empty here: at least one reversal key is positive.
        overall_htft_pick, overall_htft_prob = max(
            htft_probs.items(),
            key=lambda entry: float(entry[1] or 0.0),
        )

        reversal_sum = prob_12 + prob_21
        reversal_max = max(prob_12, prob_21)
        top_pick = "2/1" if prob_21 >= prob_12 else "1/2"
        top_prob = prob_21 if top_pick == "2/1" else prob_12

        odds = data.odds_data or {}
        ms_h = self._to_float(odds.get("ms_h"), 0.0)
        ms_a = self._to_float(odds.get("ms_a"), 0.0)
        has_both_odds = ms_h > 1.0 and ms_a > 1.0
        gap = abs(ms_h - ms_a) if has_both_odds else 0.0
        favorite_odd = min(ms_h, ms_a) if has_both_odds else 0.0

        # Reversal events are rare (~5% baseline), so convert raw probs to a more useful
        # watchlist scale where p in [0.02, 0.08] becomes meaningfully separable.
        base_score = (reversal_max * 100.0 * 8.0) + (reversal_sum * 100.0 * 4.0)

        balance_bonus = 0.0
        if gap > 0.0:
            # Up to 7 points, fading linearly to 0 as the odds gap reaches 1.2.
            balance_bonus = max(0.0, (1.0 - min(gap, 1.2) / 1.2) * 7.0)
        elif has_both_odds:
            # NOTE(review): exactly equal odds score 2.0 while a tiny gap scores
            # close to 7.0 — kept as-is; confirm whether this is intended.
            balance_bonus = 2.0

        favorite_bonus = 0.0
        if 0.0 < favorite_odd <= 1.70 and reversal_max >= 0.02:
            favorite_bonus = min(8.0, (1.70 - favorite_odd) * 12.0)

        # Both context maps are keyed by *string* ids, so normalise before the
        # lookup; raw ids of another type would silently miss.
        home_key = str(data.home_team_id)
        away_key = str(data.away_team_id)
        home_metrics = team_cycle.get(home_key, {})
        away_metrics = team_cycle.get(away_key, {})
        home_pressure = float(home_metrics.get("cycle_pressure", 0.0))
        away_pressure = float(away_metrics.get("cycle_pressure", 0.0))
        cycle_pressure = max(home_pressure, away_pressure)
        cycle_bonus = cycle_pressure * 10.0

        first, second = sorted((home_key, away_key))
        pair_ctx = h2h_ctx.get((first, second), {})
        last_diff = int(pair_ctx.get("goal_diff", 0))
        blowout_bonus = 0.0
        if abs(last_diff) >= 3:
            blowout_bonus = 6.0
        if abs(last_diff) >= 5:
            blowout_bonus += 3.0

        ou25_o = self._to_float(odds.get("ou25_o"), 0.0)
        tempo_bonus = 2.5 if 1.0 < ou25_o <= 1.72 else 0.0

        raw_score = (
            base_score + balance_bonus + favorite_bonus + cycle_bonus + blowout_bonus + tempo_bonus
        )
        watch_score = max(0.0, min(100.0, raw_score))

        reason_codes: List[str] = []
        if top_prob >= 0.045:
            reason_codes.append("reversal_prob_hot")
        elif top_prob >= 0.030:
            reason_codes.append("reversal_prob_warm")
        if 0.0 < gap <= 0.80:
            reason_codes.append("balanced_matchup")
        if favorite_bonus > 0.0:
            reason_codes.append("strong_favorite_reversal_window")
        if cycle_pressure >= 0.55:
            reason_codes.append("team_reversal_cycle_pressure")
        if blowout_bonus > 0.0:
            reason_codes.append("h2h_blowout_rematch")
        if tempo_bonus > 0.0:
            reason_codes.append("high_tempo_profile")
        if not reason_codes:
            reason_codes.append("model_signal_only")

        return {
            "match_id": data.match_id,
            "match_name": f"{data.home_team_name} vs {data.away_team_name}",
            "match_date_ms": data.match_date_ms,
            "league_id": data.league_id,
            "league": data.league_name,
            "risk_band": self._watchlist_risk_band(watch_score),
            "watch_score": round(watch_score, 2),
            "top_pick": top_pick,
            "top_pick_prob": round(top_prob, 4),
            "top_pick_scope": "reversal_only",
            "overall_htft_pick": overall_htft_pick,
            "overall_htft_pick_prob": round(float(overall_htft_prob or 0.0), 4),
            "reversal_probs": {
                "1/2": round(prob_12, 4),
                "2/1": round(prob_21, 4),
            },
            "odds_snapshot": {
                "ms_h": round(ms_h, 2) if ms_h > 0 else None,
                "ms_a": round(ms_a, 2) if ms_a > 0 else None,
                "ms_gap": round(gap, 3),
                "favorite_odd": round(favorite_odd, 2) if favorite_odd > 0 else None,
            },
            "pattern_signals": {
                "home_cycle_pressure": round(home_pressure, 3),
                "away_cycle_pressure": round(away_pressure, 3),
                "home_matches_since_last_reversal": int(
                    home_metrics.get("matches_since_last_reversal", 99)
                ),
                "away_matches_since_last_reversal": int(
                    away_metrics.get("matches_since_last_reversal", 99)
                ),
                "h2h_last_goal_diff": last_diff if pair_ctx else None,
                "h2h_last_result": pair_ctx.get("result"),
            },
            "reason_codes": reason_codes,
        }

    def _fetch_team_reversal_cycle_metrics(
        self,
        cur: RealDictCursor,
        team_ids: Set[str],
        now_ms: int,
    ) -> Dict[str, Dict[str, float]]:
        """Load each team's recent finished matches and derive reversal-cycle metrics.

        Args:
            cur: Open cursor whose rows support ``row["col"]`` and ``row.get``.
            team_ids: Team ids to evaluate.
            now_ms: Only matches with ``mst_utc`` strictly below this are used.

        Returns:
            Mapping of ``str(team_id)`` to ``recent_reversal_rate``,
            ``matches_since_last_reversal``, ``avg_gap_matches`` and
            ``cycle_pressure``. Every requested team gets an entry; teams with
            no history receive neutral defaults. Empty input returns ``{}``
            without touching the cursor.
        """
        if not team_ids:
            return {}

        id_list = list(team_ids)
        # One row per (team, match) from the team's own perspective.
        # ht_state: 'L' leading / 'T' trailing / 'D' level at half time.
        # ft_state: 'W' / 'L' / 'D' at full time. Newest 80 matches per team.
        cur.execute(
            """
            WITH team_matches AS (
                SELECT
                    m.home_team_id AS team_id,
                    m.mst_utc,
                    CASE
                        WHEN m.ht_score_home > m.ht_score_away THEN 'L'
                        WHEN m.ht_score_home < m.ht_score_away THEN 'T'
                        ELSE 'D'
                    END AS ht_state,
                    CASE
                        WHEN m.score_home > m.score_away THEN 'W'
                        WHEN m.score_home < m.score_away THEN 'L'
                        ELSE 'D'
                    END AS ft_state
                FROM matches m
                WHERE m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.ht_score_home IS NOT NULL
                  AND m.ht_score_away IS NOT NULL
                  AND m.home_team_id = ANY(%s)
                  AND m.mst_utc < %s
                UNION ALL
                SELECT
                    m.away_team_id AS team_id,
                    m.mst_utc,
                    CASE
                        WHEN m.ht_score_away > m.ht_score_home THEN 'L'
                        WHEN m.ht_score_away < m.ht_score_home THEN 'T'
                        ELSE 'D'
                    END AS ht_state,
                    CASE
                        WHEN m.score_away > m.score_home THEN 'W'
                        WHEN m.score_away < m.score_home THEN 'L'
                        ELSE 'D'
                    END AS ft_state
                FROM matches m
                WHERE m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.ht_score_home IS NOT NULL
                  AND m.ht_score_away IS NOT NULL
                  AND m.away_team_id = ANY(%s)
                  AND m.mst_utc < %s
            ),
            ranked AS (
                SELECT
                    team_id,
                    mst_utc,
                    ht_state,
                    ft_state,
                    ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY mst_utc DESC) AS rn
                FROM team_matches
            )
            SELECT team_id, mst_utc, ht_state, ft_state
            FROM ranked
            WHERE rn <= 80
            ORDER BY team_id ASC, mst_utc DESC
            """,
            (id_list, now_ms, id_list, now_ms),
        )
        rows = cur.fetchall()

        # Rows arrive newest-first within each team, so grouping keeps that order.
        by_team: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for row in rows:
            by_team[str(row["team_id"])].append(row)

        return {
            str(team_id): self._summarize_reversal_cycle(by_team.get(str(team_id), []))
            for team_id in team_ids
        }

    @staticmethod
    def _summarize_reversal_cycle(team_rows: List[Dict[str, Any]]) -> Dict[str, float]:
        """Compute reversal-cycle metrics for one team from newest-first match rows.

        A reversal is leading at half time and losing ('L', 'L'), or trailing
        at half time and winning ('T', 'W'). Positions are 1-based, so a
        reversal in the most recent match gives ``matches_since_last_reversal``
        of 1. ``cycle_pressure`` is 1.0 when the distance since the last
        reversal equals the team's average gap between reversals and falls
        linearly to 0.0 as it moves away by ``max(3, 0.7 * avg_gap)`` matches.
        """
        if not team_rows:
            return {
                "recent_reversal_rate": 0.0,
                "matches_since_last_reversal": 99.0,
                "avg_gap_matches": 12.0,
                "cycle_pressure": 0.0,
            }

        reversal_states = (("L", "L"), ("T", "W"))
        reversal_indexes: List[int] = [
            position
            for position, row in enumerate(team_rows, start=1)
            if (str(row.get("ht_state") or ""), str(row.get("ft_state") or "")) in reversal_states
        ]

        # The "recent" rate only looks at the newest 15 matches (or fewer).
        recent_n = min(15, len(team_rows))
        recent_reversal = sum(1 for position in reversal_indexes if position <= recent_n)
        recent_rate = recent_reversal / recent_n

        since_last = float(reversal_indexes[0]) if reversal_indexes else 99.0

        # Positions are strictly increasing, so every gap is positive.
        gaps = [
            float(later - earlier)
            for earlier, later in zip(reversal_indexes, reversal_indexes[1:])
        ]
        avg_gap = (sum(gaps) / len(gaps)) if gaps else 12.0

        cycle_pressure = 0.0
        if reversal_indexes:
            tolerance = max(3.0, avg_gap * 0.7)
            cycle_pressure = max(0.0, 1.0 - (abs(since_last - avg_gap) / tolerance))

        return {
            "recent_reversal_rate": round(recent_rate, 4),
            "matches_since_last_reversal": round(since_last, 2),
            "avg_gap_matches": round(avg_gap, 2),
            "cycle_pressure": round(cycle_pressure, 4),
        }

    def _fetch_h2h_reversal_context(
        self,
        cur: RealDictCursor,
        pair_keys: Set[Tuple[str, str]],
        now_ms: int,
    ) -> Dict[Tuple[str, str], Dict[str, Any]]:
        """Find the most recent finished head-to-head match for each team pair.

        Args:
            cur: Open cursor whose rows support ``row["col"]`` access.
            pair_keys: String-sorted ``(team_id, team_id)`` pairs of interest.
            now_ms: Only matches with ``mst_utc`` strictly below this are used.

        Returns:
            Mapping of pair key to ``goal_diff`` (home minus away goals of that
            match, from the historical home side's perspective), ``result``
            (``"home-away"`` score string) and ``match_date_ms``. Pairs with no
            match among the fetched rows are omitted.
        """
        if not pair_keys:
            return {}

        team_ids = sorted({team_id for pair in pair_keys for team_id in pair})
        # The query over-fetches (any home/away combination of the involved
        # teams); rows for pairs that were not requested are dropped below.
        cur.execute(
            """
            SELECT
                m.home_team_id,
                m.away_team_id,
                m.score_home,
                m.score_away,
                m.ht_score_home,
                m.ht_score_away,
                m.mst_utc
            FROM matches m
            WHERE m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.home_team_id = ANY(%s)
              AND m.away_team_id = ANY(%s)
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT 4000
            """,
            (team_ids, team_ids, now_ms),
        )
        rows = cur.fetchall()

        out: Dict[Tuple[str, str], Dict[str, Any]] = {}
        for row in rows:
            first, second = sorted((str(row["home_team_id"]), str(row["away_team_id"])))
            key = (first, second)
            # Rows are newest-first, so the first hit per pair is the latest match.
            if key not in pair_keys or key in out:
                continue

            score_home = int(row["score_home"])
            score_away = int(row["score_away"])
            out[key] = {
                "goal_diff": score_home - score_away,
                "result": f"{score_home}-{score_away}",
                "match_date_ms": int(row["mst_utc"] or 0),
            }
            if len(out) >= len(pair_keys):
                # Every requested pair is resolved; no need to scan further.
                break

        return out

    @staticmethod
    def _watchlist_risk_band(score: float) -> str:
        """Map a watch score to "HIGH" (>= 68), "MEDIUM" (>= 54) or "LOW"."""
        if score >= 68.0:
            return "HIGH"
        if score >= 54.0:
            return "MEDIUM"
        return "LOW"
|