"""
Feature Enrichment Service
===========================
Computes real statistical features from DB for V25 model input.
Replaces hardcoded defaults in `_build_v25_features()` with rolling averages
from football_team_stats, matches, match_officials, and match_player_events
tables.

Each method receives a psycopg2 cursor + params and returns a dict (or a single
float for momentum and days-rest).
All methods are fail-safe: they return sensible defaults when data is missing
or queries fail. Query failures are logged, never raised.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple

if TYPE_CHECKING:
    # Only needed for annotations, which are lazy thanks to the __future__
    # import above, so the module does not require psycopg2 at import time.
    from psycopg2.extras import RealDictCursor

logger = logging.getLogger(__name__)

# Sentinel returned by `_query` when execution raised, so callers can tell a
# failed query apart from a query that legitimately returned nothing.
_QUERY_FAILED = object()


class FeatureEnrichmentService:
    """Stateless service — all state comes from DB via cursor."""

    # ─── Default fallback values ─────────────────────────────────────
    _DEFAULT_TEAM_STATS = {
        'avg_possession': 50.0,
        'avg_shots_on_target': 4.0,
        'shot_conversion': 0.1,
        'avg_corners': 5.0,
    }

    _DEFAULT_H2H = {
        'total_matches': 0,
        'home_win_rate': 0.33,
        'draw_rate': 0.33,
        'avg_goals': 2.5,
        'btts_rate': 0.5,
        'over25_rate': 0.5,
        # V27 expanded
        'home_goals_avg': 1.3,
        'away_goals_avg': 1.1,
        'recent_trend': 0.0,
        'venue_advantage': 0.0,
    }

    _DEFAULT_FORM = {
        'clean_sheet_rate': 0.2,
        'scoring_rate': 0.8,
        'winning_streak': 0,
        'unbeaten_streak': 0,
    }

    _DEFAULT_REFEREE = {
        'home_bias': 0.0,
        'avg_goals': 2.5,
        'cards_total': 4.0,
        'avg_yellow': 3.0,
        'experience': 0,
    }

    _DEFAULT_LEAGUE = {
        'avg_goals': 2.7,
        'zero_goal_rate': 0.07,
        # V27 expanded
        'home_win_rate': 0.46,
        'draw_rate': 0.26,
        'btts_rate': 0.50,
        'ou25_rate': 0.50,
        'reliability_score': 0.0,
    }

    _DEFAULT_ROLLING = {
        'rolling5_goals': 1.3,
        'rolling5_conceded': 1.2,
        'rolling10_goals': 1.3,
        'rolling10_conceded': 1.2,
        'rolling20_goals': 1.3,
        'rolling20_conceded': 1.2,
        'rolling5_cs': 0.2,
    }

    _DEFAULT_VENUE = {
        'venue_goals': 1.4,
        'venue_conceded': 1.1,
    }

    # ─── 1. Team Stats ──────────────────────────────────────────────
    def compute_team_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Rolling averages from football_team_stats for a team's last N matches.

        Args:
            cur: Cursor whose rows support ``.get`` / key access.
            team_id: Team to aggregate; falsy -> defaults.
            before_date_ms: Only matches with ``mst_utc`` strictly below this.
            limit: Maximum number of most-recent matches to average.

        Returns:
            Dict with avg_possession, avg_shots_on_target, shot_conversion
            (shots on target / total shots, averaged per match), avg_corners.
            A copy of ``_DEFAULT_TEAM_STATS`` when no data or the query fails.
        """
        defaults = self._DEFAULT_TEAM_STATS
        if not team_id:
            return dict(defaults)

        rows = _query(
            cur,
            """
            SELECT mts.possession_percentage,
                   mts.shots_on_target,
                   mts.total_shots,
                   mts.corners
            FROM football_team_stats mts
            JOIN matches m ON m.id = mts.match_id
            WHERE mts.team_id = %s
              AND m.status = 'FT'
              AND m.mst_utc < %s
              AND m.sport = 'football'
              AND mts.possession_percentage IS NOT NULL
              AND mts.possession_percentage > 0
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, before_date_ms, limit),
            'team_stats',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(defaults)

        possession_vals = []
        sot_vals = []
        conversion_vals = []
        corner_vals = []

        for row in rows:
            poss = row.get('possession_percentage')
            if poss is not None:
                possession_vals.append(float(poss))

            sot = row.get('shots_on_target')
            if sot is not None:
                sot_vals.append(float(sot))

            # A match with shots but zero on target is a genuine 0.0 sample;
            # testing `sot` for truthiness would drop it and inflate the mean.
            total_shots = row.get('total_shots')
            if (
                sot is not None
                and total_shots is not None
                and float(total_shots) > 0
            ):
                conversion_vals.append(float(sot) / float(total_shots))

            corners = row.get('corners')
            if corners is not None:
                corner_vals.append(float(corners))

        return {
            'avg_possession': _safe_avg(
                possession_vals, defaults['avg_possession']),
            'avg_shots_on_target': _safe_avg(
                sot_vals, defaults['avg_shots_on_target']),
            'shot_conversion': _safe_avg(
                conversion_vals, defaults['shot_conversion']),
            'avg_corners': _safe_avg(corner_vals, defaults['avg_corners']),
        }

    # ─── 2. Head-to-Head ────────────────────────────────────────────
    def compute_h2h(
        self,
        cur: RealDictCursor,
        home_team_id: str,
        away_team_id: str,
        before_date_ms: int,
        limit: int = 20,
    ) -> Dict[str, float]:
        """
        Historical head-to-head between two teams (both directions).

        All "home" figures are from the perspective of ``home_team_id`` (the
        home side of the fixture being predicted), regardless of which side
        hosted the historical match.

        Returns:
            total_matches, home_win_rate, draw_rate, avg_goals, btts_rate,
            over25_rate, plus the V27 fields home_goals_avg, away_goals_avg,
            recent_trend and venue_advantage. A copy of ``_DEFAULT_H2H`` when
            either id is falsy, no matches exist, or the query fails.
        """
        if not home_team_id or not away_team_id:
            return dict(self._DEFAULT_H2H)

        rows = _query(
            cur,
            """
            SELECT m.home_team_id, m.away_team_id,
                   m.score_home, m.score_away
            FROM matches m
            WHERE m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
              AND (
                  (m.home_team_id = %s AND m.away_team_id = %s)
                  OR (m.home_team_id = %s AND m.away_team_id = %s)
              )
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (
                before_date_ms,
                home_team_id, away_team_id,
                away_team_id, home_team_id,
                limit,
            ),
            'h2h',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(self._DEFAULT_H2H)

        # Row ids are compared as strings; stringify ours too so that int/UUID
        # ids passed by a caller still match.
        home_key = str(home_team_id)

        def home_side_won(r: Any) -> bool:
            """True when the prediction-context home team won match ``r``."""
            sh, sa = int(r['score_home']), int(r['score_away'])
            if str(r['home_team_id']) == home_key:
                return sh > sa
            return sa > sh

        total = len(rows)
        home_wins = 0
        draws = 0
        total_goals = 0
        btts_count = 0
        over25_count = 0

        # V27 expanded trackers
        home_team_goals_list = []
        away_team_goals_list = []
        home_venue_wins = 0    # wins while actually hosting
        home_venue_total = 0
        away_venue_wins = 0    # wins in the reversed fixture
        away_venue_total = 0

        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            match_goals = sh + sa
            total_goals += match_goals

            # Normalise: who is "home team" in THIS prediction context
            hosted = str(row['home_team_id']) == home_key
            gf, ga = (sh, sa) if hosted else (sa, sh)
            home_team_goals_list.append(gf)
            away_team_goals_list.append(ga)

            won = gf > ga
            if hosted:
                home_venue_total += 1
                home_venue_wins += int(won)
            else:
                away_venue_total += 1
                away_venue_wins += int(won)

            if won:
                home_wins += 1
            elif gf == ga:
                draws += 1

            if sh > 0 and sa > 0:
                btts_count += 1
            if match_goals > 2:
                over25_count += 1

        # V27: recent_trend = (wins in 5 newest - wins in 5 oldest) / 5.
        # NOTE(review): with 6-9 matches the two windows overlap; kept as-is
        # so the feature stays identical to what the model has seen.
        recent_trend = 0.0
        if total >= 6:
            recent_5_wins = sum(1 for r in rows[:5] if home_side_won(r))
            older_5_wins = sum(1 for r in rows[-5:] if home_side_won(r))
            recent_trend = (recent_5_wins - older_5_wins) / 5.0

        # V27: venue_advantage = home_win_rate_at_home - home_win_rate_away
        venue_advantage = 0.0
        if home_venue_total > 0 and away_venue_total > 0:
            venue_advantage = (
                home_venue_wins / home_venue_total
                - away_venue_wins / away_venue_total
            )

        return {
            'total_matches': total,
            'home_win_rate': home_wins / total,
            'draw_rate': draws / total,
            'avg_goals': total_goals / total,
            'btts_rate': btts_count / total,
            'over25_rate': over25_count / total,
            # V27 expanded
            'home_goals_avg': _safe_avg(home_team_goals_list, 1.3),
            'away_goals_avg': _safe_avg(away_team_goals_list, 1.1),
            'recent_trend': round(recent_trend, 4),
            'venue_advantage': round(venue_advantage, 4),
        }

    # ─── 3. Form & Streaks ──────────────────────────────────────────
    def compute_form_streaks(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Clean sheet rate, scoring rate, and current streaks.

        Streaks are counted from the most recent match backwards and stop at
        the first match that breaks them (a non-win for ``winning_streak``, a
        loss for ``unbeaten_streak``); both are capped by ``limit``.

        Returns:
            clean_sheet_rate, scoring_rate, winning_streak, unbeaten_streak,
            or a copy of ``_DEFAULT_FORM`` when no data / query failure.
        """
        if not team_id:
            return dict(self._DEFAULT_FORM)

        rows = _query(
            cur,
            """
            SELECT m.home_team_id, m.away_team_id,
                   m.score_home, m.score_away
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, team_id, before_date_ms, limit),
            'form_streaks',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(self._DEFAULT_FORM)

        team_key = str(team_id)
        total = len(rows)
        clean_sheets = 0
        scored_count = 0
        winning_streak = 0
        unbeaten_streak = 0
        win_run_alive = True
        unbeaten_run_alive = True

        for row in rows:
            goals_for, goals_against = _team_score(row, team_key)

            if goals_against == 0:
                clean_sheets += 1
            if goals_for > 0:
                scored_count += 1

            # Streak counting (most recent first)
            if win_run_alive:
                if goals_for > goals_against:
                    winning_streak += 1
                else:
                    win_run_alive = False
            if unbeaten_run_alive:
                if goals_for >= goals_against:
                    unbeaten_streak += 1
                else:
                    unbeaten_run_alive = False

        return {
            'clean_sheet_rate': clean_sheets / total,
            'scoring_rate': scored_count / total,
            'winning_streak': winning_streak,
            'unbeaten_streak': unbeaten_streak,
        }

    # ─── 4. Referee Stats ───────────────────────────────────────────
    def compute_referee_stats(
        self,
        cur: RealDictCursor,
        referee_name: Optional[str],
        before_date_ms: int,
        limit: int = 30,
    ) -> Dict[str, float]:
        """
        Referee tendencies: home win bias, avg goals, card rates.

        Matches referee by name in match_officials (role_id=1 = main referee).

        Returns:
            home_bias (home win rate minus 0.46), avg_goals, cards_total and
            avg_yellow (per match), experience (number of matches sampled).
            A copy of ``_DEFAULT_REFEREE`` when the name is falsy, no matches
            are found, or the match query fails. If only the card query fails,
            the two card features fall back to their defaults while the other
            fields are still computed.
        """
        defaults = self._DEFAULT_REFEREE
        if not referee_name:
            return dict(defaults)

        # Matches officiated by this referee
        rows = _query(
            cur,
            """
            SELECT m.home_team_id, m.score_home, m.score_away,
                   m.id AS match_id
            FROM match_officials mo
            JOIN matches m ON m.id = mo.match_id
            WHERE mo.name = %s
              AND mo.role_id = 1
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (referee_name, before_date_ms, limit),
            'referee_matches',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(defaults)

        total = len(rows)
        home_wins = 0
        total_goals = 0
        match_ids = []

        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            total_goals += sh + sa
            if sh > sa:
                home_wins += 1
            match_ids.append(row['match_id'])

        # Card stats from match_player_events
        card_row = _query(
            cur,
            """
            SELECT
                COUNT(*) FILTER (WHERE event_subtype = 'yc') AS yellows,
                COUNT(*) AS total_cards
            FROM match_player_events
            WHERE match_id = ANY(%s)
              AND event_type = 'card'
            """,
            (match_ids,),
            'referee_cards',
            fetch_one=True,
        )
        if card_row is _QUERY_FAILED:
            # Unknown is not the same as "no cards were shown": use defaults
            # instead of reporting a card-free referee.
            cards_total = defaults['cards_total']
            avg_yellow = defaults['avg_yellow']
        else:
            total_yellows = 0.0
            total_cards = 0.0
            if card_row:
                total_yellows = float(card_row.get('yellows') or 0)
                total_cards = float(card_row.get('total_cards') or 0)
            cards_total = total_cards / total
            avg_yellow = total_yellows / total

        # home_bias: (actual home win rate) - 0.46 (league average ~46%)
        home_bias = (home_wins / total) - 0.46

        return {
            'home_bias': round(home_bias, 4),
            'avg_goals': total_goals / total,
            'cards_total': cards_total,
            'avg_yellow': avg_yellow,
            'experience': total,
        }

    # ─── 5. League Averages ─────────────────────────────────────────
    def compute_league_averages(
        self,
        cur: RealDictCursor,
        league_id: Optional[str],
        before_date_ms: int,
        limit: int = 100,
    ) -> Dict[str, float]:
        """
        League-wide scoring tendencies over the last ``limit`` finished matches.

        Returns:
            avg_goals, zero_goal_rate, home_win_rate, draw_rate, btts_rate,
            ou25_rate and reliability_score (sample size / 50, capped at 1.0).
            A copy of ``_DEFAULT_LEAGUE`` when no data / query failure.
        """
        if not league_id:
            return dict(self._DEFAULT_LEAGUE)

        rows = _query(
            cur,
            """
            SELECT m.score_home, m.score_away
            FROM matches m
            WHERE m.league_id = %s
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (league_id, before_date_ms, limit),
            'league_averages',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(self._DEFAULT_LEAGUE)

        total = len(rows)
        total_goals = 0
        zero_goal_matches = 0
        home_wins = 0
        draw_count = 0
        btts_count = 0
        over25_count = 0

        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            match_goals = sh + sa
            total_goals += match_goals
            if match_goals == 0:
                zero_goal_matches += 1
            if sh > sa:
                home_wins += 1
            elif sh == sa:
                draw_count += 1
            if sh > 0 and sa > 0:
                btts_count += 1
            if match_goals > 2:
                over25_count += 1

        return {
            'avg_goals': total_goals / total,
            'zero_goal_rate': zero_goal_matches / total,
            # V27 expanded
            'home_win_rate': home_wins / total,
            'draw_rate': draw_count / total,
            'btts_rate': btts_count / total,
            'ou25_rate': over25_count / total,
            'reliability_score': min(total / 50.0, 1.0),
        }

    # ─── 6. Momentum ───────────────────────────────────────────────
    def compute_momentum(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 5,
    ) -> float:
        """
        Recency-weighted momentum score: W=3, D=1, L=-1.

        Each of the last ``limit`` results is weighted linearly by recency
        (newest = highest weight) and the weighted sum is divided by the score
        of an all-wins run. Because a loss scores -1 while a win scores 3, the
        result lies in [-1/3, 1.0] (all losses -> -0.3333, all wins -> 1.0).

        Returns:
            Normalised score rounded to 4 decimals; 0.0 when no data or the
            query fails.
        """
        if not team_id:
            return 0.0

        rows = _query(
            cur,
            """
            SELECT m.home_team_id, m.score_home, m.score_away
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, team_id, before_date_ms, limit),
            'momentum',
        )
        if rows is _QUERY_FAILED or not rows:
            return 0.0

        team_key = str(team_id)
        total_count = len(rows)
        weighted_score = 0.0
        max_possible = 0.0

        for idx, row in enumerate(rows):
            weight = float(total_count - idx)  # most recent = highest weight
            gf, ga = _team_score(row, team_key)

            if gf > ga:
                result_score = 3.0
            elif gf == ga:
                result_score = 1.0
            else:
                result_score = -1.0

            weighted_score += result_score * weight
            max_possible += 3.0 * weight  # max = all wins

        if max_possible <= 0:
            return 0.0

        return round(weighted_score / max_possible, 4)

    # ─── 7. Rolling Stats (V27) ─────────────────────────────────────
    def compute_rolling_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
    ) -> Dict[str, float]:
        """
        Rolling goal averages and clean-sheet rates over the last 5/10/20 matches.

        Single DB query (at most 20 rows), three windows computed in Python.
        When fewer matches exist than a window asks for, the window averages
        whatever is available.

        Returns:
            rolling{5,10,20}_goals, rolling{5,10,20}_conceded, rolling5_cs,
            or a copy of ``_DEFAULT_ROLLING`` when no data / query failure.
        """
        if not team_id:
            return dict(self._DEFAULT_ROLLING)

        rows = _query(
            cur,
            """
            SELECT m.home_team_id, m.score_home, m.score_away
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT 20
            """,
            (team_id, team_id, before_date_ms),
            'rolling_stats',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(self._DEFAULT_ROLLING)

        team_key = str(team_id)
        goals = []
        conceded = []
        clean_sheets = []

        for row in rows:
            gf, ga = _team_score(row, team_key)
            goals.append(gf)
            conceded.append(ga)
            clean_sheets.append(1 if ga == 0 else 0)

        # Slicing past the end of a list is safe, so no explicit clamping.
        return {
            'rolling5_goals': _safe_avg(goals[:5], 1.3),
            'rolling5_conceded': _safe_avg(conceded[:5], 1.2),
            'rolling10_goals': _safe_avg(goals[:10], 1.3),
            'rolling10_conceded': _safe_avg(conceded[:10], 1.2),
            'rolling20_goals': _safe_avg(goals, 1.3),
            'rolling20_conceded': _safe_avg(conceded, 1.2),
            'rolling5_cs': _safe_avg(clean_sheets[:5], 0.2),
        }

    # ─── 8. Venue Stats (V27) ──────────────────────────────────────
    def compute_venue_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        is_home: bool = True,
    ) -> Dict[str, float]:
        """
        Team goals scored/conceded at specific venue (home or away only).

        Args:
            is_home: True -> only matches the team hosted; False -> only
                matches it played away.

        Returns:
            venue_goals and venue_conceded averaged over up to 20 matches, or
            a copy of ``_DEFAULT_VENUE`` when no data / query failure.
        """
        if not team_id:
            return dict(self._DEFAULT_VENUE)

        # Column name comes from a fixed two-value choice, never from user
        # input, so interpolating it into the SQL text is safe.
        venue_col = 'home_team_id' if is_home else 'away_team_id'

        rows = _query(
            cur,
            f"""
            SELECT m.score_home, m.score_away
            FROM matches m
            WHERE m.{venue_col} = %s
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT 20
            """,
            (team_id, before_date_ms),
            'venue_stats',
        )
        if rows is _QUERY_FAILED or not rows:
            return dict(self._DEFAULT_VENUE)

        goals = []
        conceded_list = []
        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            if is_home:
                goals.append(sh)
                conceded_list.append(sa)
            else:
                goals.append(sa)
                conceded_list.append(sh)

        return {
            'venue_goals': _safe_avg(goals, 1.4),
            'venue_conceded': _safe_avg(conceded_list, 1.1),
        }

    # ─── 9. Days Rest (V27) ────────────────────────────────────────
    def compute_days_rest(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
    ) -> float:
        """
        Returns number of days since the team's last finished match.

        The value is computed from millisecond timestamps, clamped to
        [0.0, 30.0] and rounded to one decimal.
        Default: 7.0 (one-week rest) when the team id is falsy, no earlier
        match exists, or the query fails.
        """
        if not team_id:
            return 7.0

        row = _query(
            cur,
            """
            SELECT m.mst_utc
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT 1
            """,
            (team_id, team_id, before_date_ms),
            'days_rest',
            fetch_one=True,
        )
        if row is _QUERY_FAILED or not row or not row.get('mst_utc'):
            return 7.0

        last_match_ms = int(row['mst_utc'])
        diff_days = (before_date_ms - last_match_ms) / (1000 * 86400)
        return round(max(0.0, min(diff_days, 30.0)), 1)


# ─── Utility ────────────────────────────────────────────────────────

def _query(
    cur: RealDictCursor,
    sql: str,
    params: tuple,
    feature: str,
    fetch_one: bool = False,
) -> Any:
    """
    Execute ``sql`` and fetch its result without ever raising.

    Returns ``cur.fetchall()`` (or ``cur.fetchone()`` when ``fetch_one``), or
    the ``_QUERY_FAILED`` sentinel if execution/fetching raised. The failure
    is logged with its traceback so fallbacks to defaults are visible.
    """
    try:
        cur.execute(sql, params)
        return cur.fetchone() if fetch_one else cur.fetchall()
    except Exception:
        logger.warning(
            "Feature enrichment query '%s' failed; falling back to defaults",
            feature,
            exc_info=True,
        )
        return _QUERY_FAILED


def _team_score(row: Any, team_key: str) -> Tuple[int, int]:
    """Return (goals_for, goals_against) for ``team_key`` in a match row."""
    sh = int(row['score_home'])
    sa = int(row['score_away'])
    if str(row['home_team_id']) == team_key:
        return sh, sa
    return sa, sh


def _safe_avg(values: list, default: float) -> float:
    """Average with fallback for empty lists."""
    if not values:
        return default
    return sum(values) / len(values)