Files
iddaai-be/ai-engine/services/feature_enrichment.py
T

891 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Feature Enrichment Service
===========================
Computes real statistical features from DB for V25 model input.
Replaces hardcoded defaults in `_build_v25_features()` with rolling
averages from football_team_stats, matches, match_officials, and
match_player_events tables.
Each method receives a psycopg2 cursor + params and returns a dict.
All methods are fail-safe: they return sensible defaults when data
is missing or queries fail.
"""
from __future__ import annotations

import logging
import unicodedata
from typing import Any, Dict, Optional, Tuple

from psycopg2.extras import RealDictCursor
# ─── Turkish Name Normalization ──────────────────────────────────
_TR_CHAR_MAP = str.maketrans(
'çÇğĞıİöÖşŞüÜâÂîÎûÛ',
'cCgGiIoOsSuUaAiIuU',
)
def _normalize_name(name: str) -> str:
"""
Normalize a Turkish referee name for fuzzy matching.
Strips accents, lowercases, removes extra whitespace, and maps
Turkish-specific characters to their ASCII equivalents.
"""
if not name:
return ''
# 1. Turkish-specific character mapping
normalized = name.translate(_TR_CHAR_MAP)
# 2. Unicode NFKD decomposition → strip combining marks
normalized = unicodedata.normalize('NFKD', normalized)
normalized = ''.join(
c for c in normalized if not unicodedata.combining(c)
)
# 3. Lowercase + collapse whitespace
return ' '.join(normalized.lower().split())
class FeatureEnrichmentService:
    """
    Stateless service — all state comes from DB via cursor.

    Every public ``compute_*`` method takes a cursor whose rows behave like
    dicts (``RealDictCursor``) plus query parameters. Each one is best-effort:
    when an id is missing, the query raises, or no rows match, the matching
    ``_DEFAULT_*`` values are returned instead. Query failures are logged at
    WARNING level so that a silent fallback can still be diagnosed.
    """

    _log = logging.getLogger(__name__)

    # --- Default fallback values -------------------------------------
    _DEFAULT_TEAM_STATS = {
        'avg_possession': 50.0,
        'avg_shots_on_target': 4.0,
        'shot_conversion': 0.1,
        'avg_corners': 5.0,
    }
    _DEFAULT_H2H = {
        'total_matches': 0,
        'home_win_rate': 0.33,
        'draw_rate': 0.33,
        'avg_goals': 2.5,
        'btts_rate': 0.5,
        'over25_rate': 0.5,
        # V27 expanded
        'home_goals_avg': 1.3,
        'away_goals_avg': 1.1,
        'recent_trend': 0.0,
        'venue_advantage': 0.0,
    }
    _DEFAULT_FORM = {
        'clean_sheet_rate': 0.2,
        'scoring_rate': 0.8,
        'winning_streak': 0,
        'unbeaten_streak': 0,
    }
    _DEFAULT_REFEREE = {
        'home_bias': 0.0,
        'avg_goals': 2.5,
        'cards_total': 4.0,
        'avg_yellow': 3.0,
        'experience': 0,
    }
    _DEFAULT_LEAGUE = {
        'avg_goals': 2.7,
        'zero_goal_rate': 0.07,
        # V27 expanded
        'home_win_rate': 0.46,
        'draw_rate': 0.26,
        'btts_rate': 0.50,
        'ou25_rate': 0.50,
        'reliability_score': 0.0,
    }
    _DEFAULT_ROLLING = {
        'rolling5_goals': 1.3,
        'rolling5_conceded': 1.2,
        'rolling10_goals': 1.3,
        'rolling10_conceded': 1.2,
        'rolling20_goals': 1.3,
        'rolling20_conceded': 1.2,
        'rolling5_cs': 0.2,
    }
    _DEFAULT_VENUE = {
        'venue_goals': 1.4,
        'venue_conceded': 1.1,
    }

    # Finished matches of one team (home or away), newest first. Shared by
    # the form, momentum and rolling-window features.
    _TEAM_RESULTS_SQL = """
        SELECT
            m.home_team_id,
            m.score_home,
            m.score_away
        FROM matches m
        WHERE (m.home_team_id = %s OR m.away_team_id = %s)
          AND m.status = 'FT'
          AND m.score_home IS NOT NULL
          AND m.score_away IS NOT NULL
          AND m.mst_utc < %s
        ORDER BY m.mst_utc DESC
        LIMIT %s
    """

    # --- Shared helpers ----------------------------------------------
    def _log_query_failure(self, context: str) -> None:
        """Log the exception currently being handled; never raises itself."""
        self._log.warning(
            'feature enrichment query failed (%s); falling back',
            context,
            exc_info=True,
        )

    def _fetch_team_results(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int,
        context: str,
    ) -> list:
        """
        Return ``(goals_for, goals_against)`` tuples for the team's most
        recent finished matches before ``before_date_ms``, newest first.

        Returns an empty list when the query fails (the failure is logged
        under ``context``) or when nothing matches.
        """
        try:
            cur.execute(
                self._TEAM_RESULTS_SQL,
                (team_id, team_id, before_date_ms, limit),
            )
            rows = cur.fetchall()
        except Exception:
            self._log_query_failure(context)
            return []

        # DB ids may not be plain strings; compare on the string form.
        team_key = str(team_id)
        results = []
        for row in rows:
            home_goals = int(row['score_home'])
            away_goals = int(row['score_away'])
            if str(row['home_team_id']) == team_key:
                results.append((home_goals, away_goals))
            else:
                results.append((away_goals, home_goals))
        return results

    @staticmethod
    def _leading_run(results: list, keeps_streak) -> int:
        """Count how many results, from the newest on, satisfy ``keeps_streak``."""
        run = 0
        for goals_for, goals_against in results:
            if not keeps_streak(goals_for, goals_against):
                break
            run += 1
        return run

    # --- 1. Team Stats -----------------------------------------------
    def compute_team_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Rolling averages from football_team_stats for a team's last N matches.

        Returns avg_possession, avg_shots_on_target, shot_conversion and
        avg_corners. ``shot_conversion`` is the per-match mean of
        shots_on_target / total_shots. Any metric with no usable values
        falls back to its entry in ``_DEFAULT_TEAM_STATS``.
        """
        defaults = self._DEFAULT_TEAM_STATS
        if not team_id:
            return dict(defaults)
        try:
            cur.execute(
                """
                SELECT
                    mts.possession_percentage,
                    mts.shots_on_target,
                    mts.total_shots,
                    mts.corners
                FROM football_team_stats mts
                JOIN matches m ON m.id = mts.match_id
                WHERE mts.team_id = %s
                  AND m.status = 'FT'
                  AND m.mst_utc < %s
                  AND m.sport = 'football'
                  AND mts.possession_percentage IS NOT NULL
                  AND mts.possession_percentage > 0
                ORDER BY m.mst_utc DESC
                LIMIT %s
                """,
                (team_id, before_date_ms, limit),
            )
            rows = cur.fetchall()
        except Exception:
            self._log_query_failure('team stats')
            return dict(defaults)
        if not rows:
            return dict(defaults)

        possession_vals = []
        sot_vals = []
        conversion_vals = []
        corner_vals = []
        for row in rows:
            poss = row.get('possession_percentage')
            if poss is not None:
                possession_vals.append(float(poss))

            sot = row.get('shots_on_target')
            if sot is not None:
                sot_vals.append(float(sot))

            total_shots = row.get('total_shots')
            # A match with shots but none on target is a genuine 0.0 ratio
            # and must count; only a missing value or zero total shots
            # (division by zero) is skipped.
            if (
                sot is not None
                and total_shots is not None
                and float(total_shots) > 0
            ):
                conversion_vals.append(float(sot) / float(total_shots))

            corners = row.get('corners')
            if corners is not None:
                corner_vals.append(float(corners))

        return {
            'avg_possession': _safe_avg(
                possession_vals, defaults['avg_possession']),
            'avg_shots_on_target': _safe_avg(
                sot_vals, defaults['avg_shots_on_target']),
            'shot_conversion': _safe_avg(
                conversion_vals, defaults['shot_conversion']),
            'avg_corners': _safe_avg(corner_vals, defaults['avg_corners']),
        }

    # --- 2. Head-to-Head ---------------------------------------------
    def compute_h2h(
        self,
        cur: RealDictCursor,
        home_team_id: str,
        away_team_id: str,
        before_date_ms: int,
        limit: int = 20,
    ) -> Dict[str, float]:
        """
        Historical head-to-head between two teams (both directions).

        All rates are expressed from the point of view of ``home_team_id``
        (the home side of the fixture being predicted), wherever the
        historical match was played. Returns total_matches, home_win_rate,
        draw_rate, avg_goals, btts_rate, over25_rate, home_goals_avg,
        away_goals_avg, recent_trend and venue_advantage.
        """
        defaults = self._DEFAULT_H2H
        if not home_team_id or not away_team_id:
            return dict(defaults)
        try:
            cur.execute(
                """
                SELECT
                    m.home_team_id,
                    m.away_team_id,
                    m.score_home,
                    m.score_away
                FROM matches m
                WHERE m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.mst_utc < %s
                  AND (
                    (m.home_team_id = %s AND m.away_team_id = %s) OR
                    (m.home_team_id = %s AND m.away_team_id = %s)
                  )
                ORDER BY m.mst_utc DESC
                LIMIT %s
                """,
                (
                    before_date_ms,
                    home_team_id, away_team_id,
                    away_team_id, home_team_id,
                    limit,
                ),
            )
            rows = cur.fetchall()
        except Exception:
            self._log_query_failure('head-to-head')
            return dict(defaults)
        if not rows:
            return dict(defaults)

        # Re-express every fixture as (played_at_home, goals_for,
        # goals_against) for the team that is at home in THIS prediction.
        home_key = str(home_team_id)
        fixtures = []
        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            if str(row['home_team_id']) == home_key:
                fixtures.append((True, sh, sa))
            else:
                # Reversed fixture: the predicted home team played away.
                fixtures.append((False, sa, sh))

        total = len(fixtures)
        won = [gf > ga for _, gf, ga in fixtures]
        home_wins = sum(won)
        draws = sum(1 for _, gf, ga in fixtures if gf == ga)
        total_goals = sum(gf + ga for _, gf, ga in fixtures)
        btts_count = sum(1 for _, gf, ga in fixtures if gf > 0 and ga > 0)
        over25_count = sum(1 for _, gf, ga in fixtures if gf + ga > 2)

        # recent_trend: wins in the 5 newest matches minus wins in the 5
        # oldest fetched matches, per match. With fewer than 10 matches the
        # two windows overlap, which dampens the value.
        recent_trend = 0.0
        if total >= 6:
            recent_trend = (sum(won[:5]) - sum(won[-5:])) / 5.0

        # venue_advantage: win rate when hosting minus win rate when
        # visiting; needs at least one match at each venue.
        at_home_wins = [w for (at_home, _, _), w in zip(fixtures, won) if at_home]
        on_road_wins = [w for (at_home, _, _), w in zip(fixtures, won) if not at_home]
        venue_advantage = 0.0
        if at_home_wins and on_road_wins:
            venue_advantage = (
                sum(at_home_wins) / len(at_home_wins)
                - sum(on_road_wins) / len(on_road_wins)
            )

        return {
            'total_matches': total,
            'home_win_rate': home_wins / total,
            'draw_rate': draws / total,
            'avg_goals': total_goals / total,
            'btts_rate': btts_count / total,
            'over25_rate': over25_count / total,
            # V27 expanded
            'home_goals_avg': _safe_avg(
                [gf for _, gf, _ in fixtures], defaults['home_goals_avg']),
            'away_goals_avg': _safe_avg(
                [ga for _, _, ga in fixtures], defaults['away_goals_avg']),
            'recent_trend': round(recent_trend, 4),
            'venue_advantage': round(venue_advantage, 4),
        }

    # --- 3. Form & Streaks -------------------------------------------
    def compute_form_streaks(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 10,
    ) -> Dict[str, float]:
        """
        Clean sheet rate, scoring rate, and current streaks.

        Streaks are counted from the most recent match backwards and stop
        at the first match that breaks them (a non-win for
        ``winning_streak``, a loss for ``unbeaten_streak``).
        """
        if not team_id:
            return dict(self._DEFAULT_FORM)
        results = self._fetch_team_results(
            cur, team_id, before_date_ms, limit, 'form streaks',
        )
        if not results:
            return dict(self._DEFAULT_FORM)

        total = len(results)
        clean_sheets = sum(1 for _, ga in results if ga == 0)
        scored_count = sum(1 for gf, _ in results if gf > 0)
        return {
            'clean_sheet_rate': clean_sheets / total,
            'scoring_rate': scored_count / total,
            'winning_streak': self._leading_run(
                results, lambda gf, ga: gf > ga),
            'unbeaten_streak': self._leading_run(
                results, lambda gf, ga: gf >= ga),
        }

    # --- 4. Referee Stats --------------------------------------------
    def compute_referee_stats(
        self,
        cur: RealDictCursor,
        referee_name: Optional[str],
        before_date_ms: int,
        limit: int = 30,
    ) -> Dict[str, float]:
        """
        Referee tendencies: home win bias, avg goals, card rates.

        Matches referee by name in match_officials (role_id=1 = Orta Hakem).
        Uses Turkish-aware fuzzy matching as a fallback when exact name
        lookup returns zero results. When the card query cannot be
        answered, ``cards_total`` and ``avg_yellow`` fall back to their
        defaults rather than being reported as zero.
        """
        defaults = self._DEFAULT_REFEREE
        if not referee_name:
            return dict(defaults)

        rows = self._query_referee_matches(
            cur, referee_name, before_date_ms, limit,
        )
        # Fuzzy fallback: if exact match fails, try normalized name search
        if not rows:
            rows = self._fuzzy_referee_lookup(
                cur, referee_name, before_date_ms, limit,
            )
        if not rows:
            return dict(defaults)

        total = len(rows)
        home_wins = 0
        total_goals = 0
        match_ids = []
        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            total_goals += sh + sa
            if sh > sa:
                home_wins += 1
            match_ids.append(row['match_id'])

        card_totals = self._fetch_card_totals(cur, match_ids)
        if card_totals is None:
            cards_total = defaults['cards_total']
            avg_yellow = defaults['avg_yellow']
        else:
            total_yellows, total_cards = card_totals
            cards_total = total_cards / total
            avg_yellow = total_yellows / total

        # home_bias: actual home win rate minus the baseline league home
        # win rate (~46%).
        home_bias = (home_wins / total) - self._DEFAULT_LEAGUE['home_win_rate']
        return {
            'home_bias': round(home_bias, 4),
            'avg_goals': total_goals / total,
            'cards_total': cards_total,
            'avg_yellow': avg_yellow,
            'experience': total,
        }

    def _fetch_card_totals(
        self,
        cur: RealDictCursor,
        match_ids: list,
    ) -> Optional[Tuple[float, float]]:
        """
        Sum card events from match_player_events over ``match_ids``.

        Returns ``(yellow_cards, all_cards)``, or ``None`` when there is
        nothing to query, the query fails, or no row comes back.
        """
        if not match_ids:
            return None
        try:
            cur.execute(
                """
                SELECT
                    COUNT(*) FILTER (WHERE event_subtype = 'yc') AS yellows,
                    COUNT(*) AS total_cards
                FROM match_player_events
                WHERE match_id = ANY(%s)
                  AND event_type = 'card'
                """,
                (match_ids,),
            )
            card_row = cur.fetchone()
            if not card_row:
                return None
            return (
                float(card_row.get('yellows') or 0),
                float(card_row.get('total_cards') or 0),
            )
        except Exception:
            self._log_query_failure('referee cards')
            return None

    def _query_referee_matches(
        self,
        cur: RealDictCursor,
        referee_name: str,
        before_date_ms: int,
        limit: int,
    ) -> list:
        """Exact-match referee lookup in match_officials; [] on failure."""
        try:
            cur.execute(
                """
                SELECT
                    m.home_team_id,
                    m.score_home,
                    m.score_away,
                    m.id AS match_id
                FROM match_officials mo
                JOIN matches m ON m.id = mo.match_id
                WHERE mo.name = %s
                  AND mo.role_id = 1
                  AND m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.mst_utc < %s
                ORDER BY m.mst_utc DESC
                LIMIT %s
                """,
                (referee_name, before_date_ms, limit),
            )
            return cur.fetchall()
        except Exception:
            self._log_query_failure('referee matches')
            return []

    @staticmethod
    def _name_match_score(query: str, candidate: str) -> float:
        """
        Score how well two already-normalized names match, in [0.0, 1.0].

        Identical names score 1.0. Otherwise the names must be related:
        one is a substring of the other, or one name's words are all
        present in the other and every extra word is a single letter
        (an initial such as "m."). Related names score shorter length
        divided by longer length; unrelated names score 0.0.
        """
        if query == candidate:
            return 1.0

        related = query in candidate or candidate in query
        if not related:
            query_words = {w.strip('.') for w in query.split()} - {''}
            cand_words = {w.strip('.') for w in candidate.split()} - {''}
            fewer, more = sorted((query_words, cand_words), key=len)
            related = (
                bool(fewer)
                and fewer <= more
                and all(len(w) == 1 for w in more - fewer)
            )
        if not related:
            return 0.0
        return min(len(query), len(candidate)) / max(len(query), len(candidate))

    def _fuzzy_referee_lookup(
        self,
        cur: RealDictCursor,
        referee_name: str,
        before_date_ms: int,
        limit: int,
    ) -> list:
        """
        Fuzzy referee lookup using Turkish name normalization.

        Strategy: fetch the 2000 most recently active distinct referee
        names from match_officials, normalize both the query name and each
        candidate, and pick the best match. This handles common mismatches
        like:
        - 'Hüseyin Göçek' vs 'Huseyin Gocek'
        - 'Ali Palabıyık' vs 'Ali Palabiyik'
        - Extra/missing middle initials

        A candidate is accepted only if its score is above 0.6. The
        resolved name is then run through the exact lookup.
        """
        normalized_query = _normalize_name(referee_name)
        if not normalized_query:
            return []
        try:
            # Order by latest appearance so the LIMIT keeps active referees
            # instead of cutting the list off alphabetically.
            cur.execute(
                """
                SELECT mo.name
                FROM match_officials mo
                JOIN matches m ON m.id = mo.match_id
                WHERE mo.role_id = 1
                  AND m.status = 'FT'
                  AND m.mst_utc < %s
                GROUP BY mo.name
                ORDER BY MAX(m.mst_utc) DESC
                LIMIT 2000
                """,
                (before_date_ms,),
            )
            candidates = cur.fetchall()
        except Exception:
            self._log_query_failure('referee candidates')
            return []
        if not candidates:
            return []

        best_match: Optional[str] = None
        best_score = 0.6  # minimum similarity a candidate must exceed
        for cand_row in candidates:
            cand_name = cand_row.get('name', '')
            if not cand_name:
                continue
            score = self._name_match_score(
                normalized_query, _normalize_name(cand_name),
            )
            if score > best_score:
                best_match = cand_name
                best_score = score
                if score == 1.0:
                    break  # cannot be beaten
        if not best_match:
            return []

        # Re-query with the resolved name
        return self._query_referee_matches(
            cur, best_match, before_date_ms, limit,
        )

    # --- 5. League Averages ------------------------------------------
    def compute_league_averages(
        self,
        cur: RealDictCursor,
        league_id: Optional[str],
        before_date_ms: int,
        limit: int = 100,
    ) -> Dict[str, float]:
        """
        League-wide scoring tendencies over the league's latest matches.

        ``reliability_score`` grows linearly with the sample size and
        saturates at 1.0 once 50 matches are available.
        """
        if not league_id:
            return dict(self._DEFAULT_LEAGUE)
        try:
            cur.execute(
                """
                SELECT
                    m.score_home,
                    m.score_away
                FROM matches m
                WHERE m.league_id = %s
                  AND m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.mst_utc < %s
                ORDER BY m.mst_utc DESC
                LIMIT %s
                """,
                (league_id, before_date_ms, limit),
            )
            rows = cur.fetchall()
        except Exception:
            self._log_query_failure('league averages')
            return dict(self._DEFAULT_LEAGUE)
        if not rows:
            return dict(self._DEFAULT_LEAGUE)

        scores = [(int(r['score_home']), int(r['score_away'])) for r in rows]
        total = len(scores)
        return {
            'avg_goals': sum(sh + sa for sh, sa in scores) / total,
            'zero_goal_rate': sum(
                1 for sh, sa in scores if sh + sa == 0) / total,
            # V27 expanded
            'home_win_rate': sum(1 for sh, sa in scores if sh > sa) / total,
            'draw_rate': sum(1 for sh, sa in scores if sh == sa) / total,
            'btts_rate': sum(
                1 for sh, sa in scores if sh > 0 and sa > 0) / total,
            'ou25_rate': sum(1 for sh, sa in scores if sh + sa > 2) / total,
            'reliability_score': min(total / 50.0, 1.0),
        }

    # --- 6. Momentum -------------------------------------------------
    def compute_momentum(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        limit: int = 5,
    ) -> float:
        """
        Recency-weighted momentum score: W=3, D=1, L=-1.

        The weighted sum is divided by the all-wins maximum, so the result
        lies between -0.3333 (every match lost) and 1.0 (every match won).
        Returns 0.0 when no results are available.
        """
        if not team_id:
            return 0.0
        results = self._fetch_team_results(
            cur, team_id, before_date_ms, limit, 'momentum',
        )
        if not results:
            return 0.0

        weighted_score = 0.0
        max_possible = 0.0
        # The newest match gets weight len(results); the oldest gets 1.
        for weight, (gf, ga) in zip(range(len(results), 0, -1), results):
            if gf > ga:
                result_score = 3.0
            elif gf == ga:
                result_score = 1.0
            else:
                result_score = -1.0
            weighted_score += result_score * weight
            max_possible += 3.0 * weight  # max = all wins
        if max_possible <= 0:
            return 0.0
        return round(weighted_score / max_possible, 4)

    # --- 7. Rolling Stats (V27) --------------------------------------
    def compute_rolling_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
    ) -> Dict[str, float]:
        """
        Rolling goal averages and clean-sheet rates over the last 5/10/20 matches.

        Single DB query, three windows computed programmatically. A window
        longer than the available history averages whatever exists.
        """
        defaults = self._DEFAULT_ROLLING
        if not team_id:
            return dict(defaults)
        results = self._fetch_team_results(
            cur, team_id, before_date_ms, 20, 'rolling stats',
        )
        if not results:
            return dict(defaults)

        goals = [gf for gf, _ in results]
        conceded = [ga for _, ga in results]
        clean_sheets = [1 if ga == 0 else 0 for ga in conceded]
        return {
            'rolling5_goals': _safe_avg(
                goals[:5], defaults['rolling5_goals']),
            'rolling5_conceded': _safe_avg(
                conceded[:5], defaults['rolling5_conceded']),
            'rolling10_goals': _safe_avg(
                goals[:10], defaults['rolling10_goals']),
            'rolling10_conceded': _safe_avg(
                conceded[:10], defaults['rolling10_conceded']),
            'rolling20_goals': _safe_avg(
                goals, defaults['rolling20_goals']),
            'rolling20_conceded': _safe_avg(
                conceded, defaults['rolling20_conceded']),
            'rolling5_cs': _safe_avg(
                clean_sheets[:5], defaults['rolling5_cs']),
        }

    # --- 8. Venue Stats (V27) ----------------------------------------
    def compute_venue_stats(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
        is_home: bool = True,
    ) -> Dict[str, float]:
        """
        Team goals scored/conceded at specific venue (home or away only),
        averaged over the team's last 20 finished matches at that venue.
        """
        defaults = self._DEFAULT_VENUE
        if not team_id:
            return dict(defaults)

        # The column name is one of two literals chosen here, never caller
        # text, so interpolating it into the SQL is safe.
        venue_col = 'home_team_id' if is_home else 'away_team_id'
        try:
            cur.execute(
                f"""
                SELECT m.score_home, m.score_away
                FROM matches m
                WHERE m.{venue_col} = %s
                  AND m.status = 'FT'
                  AND m.score_home IS NOT NULL
                  AND m.score_away IS NOT NULL
                  AND m.mst_utc < %s
                ORDER BY m.mst_utc DESC
                LIMIT 20
                """,
                (team_id, before_date_ms),
            )
            rows = cur.fetchall()
        except Exception:
            self._log_query_failure('venue stats')
            return dict(defaults)
        if not rows:
            return dict(defaults)

        goals = []
        conceded_list = []
        for row in rows:
            sh = int(row['score_home'])
            sa = int(row['score_away'])
            goals.append(sh if is_home else sa)
            conceded_list.append(sa if is_home else sh)
        return {
            'venue_goals': _safe_avg(goals, defaults['venue_goals']),
            'venue_conceded': _safe_avg(
                conceded_list, defaults['venue_conceded']),
        }

    # --- 9. Days Rest (V27) ------------------------------------------
    def compute_days_rest(
        self,
        cur: RealDictCursor,
        team_id: str,
        before_date_ms: int,
    ) -> float:
        """
        Returns number of days since the team's last match.

        Timestamps are treated as milliseconds. The result is clamped to
        [0.0, 30.0] and rounded to one decimal.
        Default: 7.0 (one-week rest).
        """
        if not team_id:
            return 7.0
        try:
            cur.execute(
                """
                SELECT m.mst_utc
                FROM matches m
                WHERE (m.home_team_id = %s OR m.away_team_id = %s)
                  AND m.status = 'FT'
                  AND m.mst_utc < %s
                ORDER BY m.mst_utc DESC
                LIMIT 1
                """,
                (team_id, team_id, before_date_ms),
            )
            row = cur.fetchone()
        except Exception:
            self._log_query_failure('days rest')
            return 7.0
        if not row or not row.get('mst_utc'):
            return 7.0

        last_match_ms = int(row['mst_utc'])
        diff_days = (before_date_ms - last_match_ms) / (1000 * 86400)
        return round(max(0.0, min(diff_days, 30.0)), 1)
# ─── Utility ────────────────────────────────────────────────────────
def _safe_avg(values: list, default: float) -> float:
"""Average with fallback for empty lists."""
if not values:
return default
return sum(values) / len(values)