"""
Feature Enrichment Service
===========================
Computes real statistical features from DB for V25 model input.

Replaces hardcoded defaults in `_build_v25_features()` with rolling averages
from football_team_stats, matches, match_officials, and match_player_events
tables.

Each method receives a psycopg2 cursor + params and returns a dict
(`compute_momentum` returns a single float).
All methods are fail-safe: they return sensible defaults when data is
missing or queries fail. Query failures are logged at WARNING level.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple

if TYPE_CHECKING:
    # Only needed for annotations, which are lazy thanks to the __future__
    # import above, so the module does not need psycopg2 at import time.
    from psycopg2.extras import RealDictCursor

_logger = logging.getLogger(__name__)


class FeatureEnrichmentService:
    """Stateless service — all state comes from DB via cursor."""

    # ─── Default fallback values ─────────────────────────────────────

    _DEFAULT_TEAM_STATS = {
        'avg_possession': 50.0,
        'avg_shots_on_target': 4.0,
        'shot_conversion': 0.1,
        'avg_corners': 5.0,
    }

    _DEFAULT_H2H = {
        'total_matches': 0,
        'home_win_rate': 0.33,
        'draw_rate': 0.33,
        'avg_goals': 2.5,
        'btts_rate': 0.5,
        'over25_rate': 0.5,
    }

    _DEFAULT_FORM = {
        'clean_sheet_rate': 0.2,
        'scoring_rate': 0.8,
        'winning_streak': 0,
        'unbeaten_streak': 0,
    }

    _DEFAULT_REFEREE = {
        'home_bias': 0.0,
        'avg_goals': 2.5,
        'cards_total': 4.0,
        'avg_yellow': 3.0,
        'experience': 0,
    }

    _DEFAULT_LEAGUE = {
        'avg_goals': 2.7,
        'zero_goal_rate': 0.07,
    }

    # Baseline home-win rate (league average ~46%) that a referee's observed
    # home-win rate is compared against to obtain `home_bias`.
    _BASELINE_HOME_WIN_RATE = 0.46

    # ─── 1. Team Stats ──────────────────────────────────────────────

    def compute_team_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Rolling averages from football_team_stats for a team's last N matches.

        Args:
            cur: Cursor yielding dict-like rows.
            team_id: Team to aggregate; a falsy value yields the defaults.
            before_date_ms: Only matches with ``mst_utc`` below this are used.
            limit: Maximum number of most recent matches to consider.

        Returns:
            Dict with avg_possession, avg_shots_on_target, shot_conversion
            (shots on target / total shots, averaged per match) and
            avg_corners. Any metric without usable values falls back to its
            entry in ``_DEFAULT_TEAM_STATS``.
        """
        defaults = self._DEFAULT_TEAM_STATS
        if not team_id:
            return dict(defaults)

        rows = _fetch_rows(
            cur,
            """
            SELECT mts.possession_percentage, mts.shots_on_target,
                   mts.total_shots, mts.corners
            FROM football_team_stats mts
            JOIN matches m ON m.id = mts.match_id
            WHERE mts.team_id = %s
              AND m.status = 'FT'
              AND m.mst_utc < %s
              AND m.sport = 'football'
              AND mts.possession_percentage IS NOT NULL
              AND mts.possession_percentage > 0
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, before_date_ms, limit),
            'team stats',
        )
        if not rows:
            return dict(defaults)

        possession_vals: List[float] = []
        sot_vals: List[float] = []
        conversion_vals: List[float] = []
        corner_vals: List[float] = []

        for row in rows:
            poss = row.get('possession_percentage')
            if poss is not None:
                possession_vals.append(float(poss))

            sot = row.get('shots_on_target')
            if sot is not None:
                sot_vals.append(float(sot))

            # A match with zero shots on target is a valid 0.0 ratio and must
            # be counted; only a missing value or zero total shots (division
            # by zero) is skipped.
            total_shots = row.get('total_shots')
            if (
                sot is not None
                and total_shots is not None
                and float(total_shots) > 0
            ):
                conversion_vals.append(float(sot) / float(total_shots))

            corners = row.get('corners')
            if corners is not None:
                corner_vals.append(float(corners))

        return {
            'avg_possession': _safe_avg(
                possession_vals, defaults['avg_possession']
            ),
            'avg_shots_on_target': _safe_avg(
                sot_vals, defaults['avg_shots_on_target']
            ),
            'shot_conversion': _safe_avg(
                conversion_vals, defaults['shot_conversion']
            ),
            'avg_corners': _safe_avg(corner_vals, defaults['avg_corners']),
        }

    # ─── 2. Head-to-Head ────────────────────────────────────────────

    def compute_h2h(
        self,
        cur: RealDictCursor,
        home_team_id: str,
        away_team_id: str,
        before_date_ms: int,
        limit: int = 20,
    ) -> Dict[str, float]:
        """
        Historical head-to-head between two teams (both directions).

        Results are normalised to the current prediction context: a win for
        ``home_team_id`` counts as a "home win" even if that team played the
        historical fixture away.

        Returns:
            Dict with total_matches, home_win_rate, draw_rate, avg_goals,
            btts_rate, over25_rate; ``_DEFAULT_H2H`` when an id is missing,
            the query fails, or no matches are found.
        """
        if not home_team_id or not away_team_id:
            return dict(self._DEFAULT_H2H)

        rows = _fetch_rows(
            cur,
            """
            SELECT m.home_team_id, m.away_team_id, m.score_home, m.score_away
            FROM matches m
            WHERE m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
              AND (
                  (m.home_team_id = %s AND m.away_team_id = %s)
                  OR (m.home_team_id = %s AND m.away_team_id = %s)
              )
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (
                before_date_ms,
                home_team_id,
                away_team_id,
                away_team_id,
                home_team_id,
                limit,
            ),
            'head-to-head',
        )
        if not rows:
            return dict(self._DEFAULT_H2H)

        # Compare as strings on both sides so int/UUID ids from the caller
        # still match the DB value.
        home_key = str(home_team_id)
        total = len(rows)
        home_wins = 0
        draws = 0
        total_goals = 0
        btts_count = 0
        over25_count = 0

        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            match_goals = sh + sa
            total_goals += match_goals

            # Normalise: goals for/against the team that is "home" in THIS
            # prediction context, regardless of the historical venue.
            if str(row['home_team_id']) == home_key:
                ours, theirs = sh, sa
            else:
                ours, theirs = sa, sh

            if ours > theirs:
                home_wins += 1
            elif ours == theirs:
                draws += 1

            if sh > 0 and sa > 0:
                btts_count += 1
            if match_goals > 2:
                over25_count += 1

        return {
            'total_matches': total,
            'home_win_rate': home_wins / total,
            'draw_rate': draws / total,
            'avg_goals': total_goals / total,
            'btts_rate': btts_count / total,
            'over25_rate': over25_count / total,
        }

    # ─── 3. Form & Streaks ──────────────────────────────────────────

    def compute_form_streaks(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Clean sheet rate, scoring rate, and current streaks.

        Streaks are counted from the most recent match backwards and stop at
        the first match that breaks them (non-win for winning_streak, loss
        for unbeaten_streak), within the last ``limit`` matches.

        Returns:
            Dict with clean_sheet_rate, scoring_rate, winning_streak,
            unbeaten_streak; ``_DEFAULT_FORM`` when no data is available.
        """
        if not team_id:
            return dict(self._DEFAULT_FORM)

        rows = _fetch_rows(
            cur,
            """
            SELECT m.home_team_id, m.away_team_id, m.score_home, m.score_away
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, team_id, before_date_ms, limit),
            'form streaks',
        )
        if not rows:
            return dict(self._DEFAULT_FORM)

        # (goals_for, goals_against) per match, most recent first.
        results = _team_results(rows, team_id)
        total = len(results)

        return {
            'clean_sheet_rate': sum(1 for _, ga in results if ga == 0) / total,
            'scoring_rate': sum(1 for gf, _ in results if gf > 0) / total,
            'winning_streak': _leading_run(gf > ga for gf, ga in results),
            'unbeaten_streak': _leading_run(gf >= ga for gf, ga in results),
        }

    # ─── 4. Referee Stats ───────────────────────────────────────────

    def compute_referee_stats(
        self,
        cur: RealDictCursor,
        referee_name: Optional[str],
        before_date_ms: int,
        limit: int = 30,
    ) -> Dict[str, float]:
        """
        Referee tendencies: home win bias, avg goals, card rates.

        Matches referee by name in match_officials (role_id=1, the centre
        referee).

        Returns:
            Dict with home_bias (observed home-win rate minus the 0.46
            baseline), avg_goals, cards_total and avg_yellow (per match) and
            experience (number of matches found). ``_DEFAULT_REFEREE`` is
            returned when the referee is unknown or the match query fails;
            if only the card query fails, the two card fields fall back to
            their defaults while the other fields stay computed.
        """
        defaults = self._DEFAULT_REFEREE
        if not referee_name:
            return dict(defaults)

        rows = _fetch_rows(
            cur,
            """
            SELECT m.home_team_id, m.score_home, m.score_away,
                   m.id AS match_id
            FROM match_officials mo
            JOIN matches m ON m.id = mo.match_id
            WHERE mo.name = %s
              AND mo.role_id = 1
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (referee_name, before_date_ms, limit),
            'referee matches',
        )
        if not rows:
            return dict(defaults)

        total = len(rows)
        home_wins = 0
        total_goals = 0
        match_ids = []

        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            total_goals += sh + sa
            if sh > sa:
                home_wins += 1
            match_ids.append(row['match_id'])

        # Card stats from match_player_events. A failed lookup must not be
        # reported as "zero cards per match", so start from the defaults and
        # only overwrite them when the query succeeded.
        cards_total = defaults['cards_total']
        avg_yellow = defaults['avg_yellow']
        try:
            cur.execute(
                """
                SELECT
                    COUNT(*) FILTER (WHERE event_subtype = 'yc') AS yellows,
                    COUNT(*) AS total_cards
                FROM match_player_events
                WHERE match_id = ANY(%s)
                  AND event_type = 'card'
                """,
                (match_ids,),
            )
            card_row = cur.fetchone()
            yellows = float(card_row.get('yellows') or 0) if card_row else 0.0
            cards = (
                float(card_row.get('total_cards') or 0) if card_row else 0.0
            )
        except Exception:
            # Deliberately broad: enrichment is best-effort by contract.
            _logger.warning(
                "Feature enrichment query failed (%s); using defaults",
                'referee cards',
                exc_info=True,
            )
        else:
            cards_total = cards / total
            avg_yellow = yellows / total

        home_bias = (home_wins / total) - self._BASELINE_HOME_WIN_RATE

        return {
            'home_bias': round(home_bias, 4),
            'avg_goals': total_goals / total,
            'cards_total': cards_total,
            'avg_yellow': avg_yellow,
            'experience': total,
        }

    # ─── 5. League Averages ─────────────────────────────────────────

    def compute_league_averages(
        self,
        cur: RealDictCursor,
        league_id: Optional[str],
        before_date_ms: int,
        limit: int = 100,
    ) -> Dict[str, float]:
        """
        League-wide scoring tendencies.

        Returns:
            Dict with avg_goals (goals per match) and zero_goal_rate (share
            of 0-0 matches) over the league's last ``limit`` finished
            matches; ``_DEFAULT_LEAGUE`` when no data is available.
        """
        if not league_id:
            return dict(self._DEFAULT_LEAGUE)

        rows = _fetch_rows(
            cur,
            """
            SELECT m.score_home, m.score_away
            FROM matches m
            WHERE m.league_id = %s
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (league_id, before_date_ms, limit),
            'league averages',
        )
        if not rows:
            return dict(self._DEFAULT_LEAGUE)

        goals_per_match = [
            int(row['score_home']) + int(row['score_away']) for row in rows
        ]
        total = len(goals_per_match)

        return {
            'avg_goals': sum(goals_per_match) / total,
            'zero_goal_rate': goals_per_match.count(0) / total,
        }

    # ─── 6. Momentum ───────────────────────────────────────────────

    def compute_momentum(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 5,
    ) -> float:
        """
        Recency-weighted momentum score: W=3, D=1, L=-1.

        Each result is weighted linearly by recency (most recent match gets
        the highest weight) and the weighted sum is divided by the score of
        an all-wins run.

        Returns:
            Score rounded to 4 decimals. Because a loss scores -1 while the
            normaliser assumes 3 per match, the value lies in [-1/3, 1.0]
            (all losses -> -0.3333, all wins -> 1.0). 0.0 when no data is
            available.
        """
        if not team_id:
            return 0.0

        rows = _fetch_rows(
            cur,
            """
            SELECT m.home_team_id, m.score_home, m.score_away
            FROM matches m
            WHERE (m.home_team_id = %s OR m.away_team_id = %s)
              AND m.status = 'FT'
              AND m.score_home IS NOT NULL
              AND m.score_away IS NOT NULL
              AND m.mst_utc < %s
            ORDER BY m.mst_utc DESC
            LIMIT %s
            """,
            (team_id, team_id, before_date_ms, limit),
            'momentum',
        )
        if not rows:
            return 0.0

        results = _team_results(rows, team_id)
        total_count = len(results)
        weighted_score = 0.0
        max_possible = 0.0

        for idx, (gf, ga) in enumerate(results):
            weight = float(total_count - idx)  # most recent = highest weight

            if gf > ga:
                result_score = 3.0
            elif gf == ga:
                result_score = 1.0
            else:
                result_score = -1.0

            weighted_score += result_score * weight
            max_possible += 3.0 * weight  # max = all wins

        # Every weight is >= 1, so max_possible is strictly positive here.
        return round(weighted_score / max_possible, 4)


# ─── Utility ────────────────────────────────────────────────────────


def _fetch_rows(
    cur: RealDictCursor,
    query: str,
    params: Tuple[Any, ...],
    label: str,
) -> Optional[List[Dict[str, Any]]]:
    """Run *query* and return all rows, or None if the query raised.

    The broad ``except`` is deliberate: feature enrichment is best-effort and
    must never break the caller. The failure is logged with its traceback
    instead of being swallowed silently.
    """
    try:
        cur.execute(query, params)
        return cur.fetchall()
    except Exception:
        # NOTE(review): a failed statement presumably leaves the surrounding
        # transaction aborted, so later queries on this cursor would also
        # fail until the connection owner rolls back — confirm with caller.
        _logger.warning(
            "Feature enrichment query failed (%s); using defaults",
            label,
            exc_info=True,
        )
        return None


def _team_results(rows: Iterable[Any], team_id: Any) -> List[Tuple[int, int]]:
    """Return (goals_for, goals_against) per row from *team_id*'s view.

    Ids are compared as strings on both sides so a non-str ``team_id`` still
    matches the value read from the DB.
    """
    team_key = str(team_id)
    results = []
    for row in rows:
        sh = int(row['score_home'])
        sa = int(row['score_away'])
        if str(row['home_team_id']) == team_key:
            results.append((sh, sa))
        else:
            results.append((sa, sh))
    return results


def _leading_run(flags: Iterable[bool]) -> int:
    """Count how many leading items of *flags* are truthy."""
    count = 0
    for flag in flags:
        if not flag:
            break
        count += 1
    return count


def _safe_avg(values: list, default: float) -> float:
    """Average with fallback for empty lists."""
    if not values:
        return default
    return sum(values) / len(values)