Files
iddaai-be/ai-engine/scripts/enrich_ai_features.py
T

460 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
AI Features Full Enrichment Script
====================================
Fills empty/default columns in football_ai_features that were not populated
by the original elo_backfill_v1 script.
Enriches: H2H, referee, team_stats, league_averages, form_streaks,
rolling_goals, implied_odds, and clean_sheet/scoring rates.
Usage:
python scripts/enrich_ai_features.py # enrich all
python scripts/enrich_ai_features.py --batch-size 500 # smaller batches
python scripts/enrich_ai_features.py --dry-run # preview only
python scripts/enrich_ai_features.py --force # re-enrich all rows
python scripts/enrich_ai_features.py --limit 1000 # process N rows max
Designed to be idempotent: uses ON CONFLICT upserts, skips already-enriched rows.
"""
from __future__ import annotations
import os
import sys
import time
import argparse
from typing import Any, Dict, List, Optional, Tuple
# Add ai-engine root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
from data.db import get_clean_dsn
from services.feature_enrichment import FeatureEnrichmentService
# ────────────────────────── constants ──────────────────────────
CALCULATOR_VER = 'enrichment_v2.0'
DEFAULT_BATCH_SIZE = 200
# ────────────────────────── helpers ────────────────────────────
def fetch_unenriched_matches(
    conn: psycopg2.extensions.connection,
    force: bool = False,
    limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """
    Fetch finished football matches whose feature rows still need enrichment.

    Only rows of football_ai_features joined to a match with status 'FT',
    a non-NULL home score and sport 'football' are considered. Unless
    ``force`` is set, a row is selected only while both ``h2h_total`` and
    ``referee_avg_cards`` are still 0.

    Args:
        conn: Open psycopg2 connection; a RealDictCursor is created on it.
        force: When True, select every eligible row regardless of the
            current values of the enrichment columns.
        limit: Maximum number of rows to return. A falsy value (None or 0)
            means "no limit".

    Returns:
        Dict-like rows with the keys match_id, home_team_id, away_team_id,
        mst_utc, league_id, score_home and score_away, ordered by
        ``mst_utc`` ascending.
    """
    # The statement is static: both knobs are bound as parameters instead of
    # being spliced into the SQL text. `force` short-circuits the default
    # filter, and a NULL limit is treated by PostgreSQL as "no limit".
    query = """
        SELECT
            faf.match_id,
            m.home_team_id,
            m.away_team_id,
            m.mst_utc,
            m.league_id,
            m.score_home,
            m.score_away
        FROM football_ai_features faf
        JOIN matches m ON m.id = faf.match_id
        WHERE m.status = 'FT'
          AND m.score_home IS NOT NULL
          AND m.sport = 'football'
          AND (%s OR (faf.h2h_total = 0 AND faf.referee_avg_cards = 0))
        ORDER BY m.mst_utc ASC
        LIMIT %s
    """
    params = (bool(force), limit or None)
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query, params)
        return cur.fetchall()
def fetch_referee_for_match(
    cur: RealDictCursor,
    match_id: str,
) -> Optional[str]:
    """
    Look up the name of the official with ``role_id = 1`` for a match.

    The lookup is best-effort: any failure yields None instead of raising.

    Args:
        cur: Dict-style cursor (rows are accessed by column name).
        match_id: Identifier matched against match_officials.match_id.

    Returns:
        The ``name`` of the first matching official, or None when there is
        no such row or the lookup failed.
    """
    try:
        cur.execute("""
            SELECT mo.name
            FROM match_officials mo
            WHERE mo.match_id = %s
              AND mo.role_id = 1
            LIMIT 1
        """, (match_id,))
        row = cur.fetchone()
        return row['name'] if row else None
    except Exception:
        # A failed statement leaves the surrounding transaction aborted, and
        # every later query on this connection would be rejected until it is
        # rolled back. Without this the "return None" fallback would only
        # postpone the failure to the caller's next query.
        cur.connection.rollback()
        return None
def fetch_implied_odds(
    cur: RealDictCursor,
    match_id: str,
) -> Dict[str, float]:
    """
    Derive margin-free implied probabilities from a match's stored odds.

    Reads every selection of the match from odd_selections joined to
    odd_categories and recognises three categories by their (lower-cased)
    name: 'maç sonucu' (selections '1', '0'/'X', '2'), '2,5 alt/üst'
    (selection contains 'üst' or 'alt') and 'karşılıklı gol' (selection
    contains 'var' or 'yok'). When a category is seen more than once the
    last row wins.

    A market is converted only when all of its prices are greater than 1.0;
    each probability is the inverse price divided by the sum of the
    market's inverse prices, rounded to 4 decimals. Markets that are
    missing or incomplete keep their neutral defaults.

    Args:
        cur: Dict-style cursor (rows must support ``.get``).
        match_id: Identifier matched against odd_categories.match_id.

    Returns:
        Dict with implied_home, implied_draw, implied_away (default 0.33
        each), implied_over25, implied_btts_yes (default 0.50 each) and
        odds_overround (default 0.0; sum of inverse 1X2 prices minus 1).
        The defaults are also returned when the query itself fails.
    """
    implied: Dict[str, float] = {
        'implied_home': 0.33,
        'implied_draw': 0.33,
        'implied_away': 0.33,
        'implied_over25': 0.50,
        'implied_btts_yes': 0.50,
        'odds_overround': 0.0,
    }

    try:
        cur.execute("""
            SELECT oc.name AS cat_name, os.name AS sel_name, os.odd_value
            FROM odd_selections os
            JOIN odd_categories oc ON os.odd_category_db_id = oc.db_id
            WHERE oc.match_id = %s
        """, (match_id,))
        rows = cur.fetchall()
    except Exception:
        # A failed statement leaves the transaction aborted; roll back so the
        # caller's next query on this connection is not rejected as well.
        cur.connection.rollback()
        return implied

    odds: Dict[str, float] = {}
    for row in rows:
        try:
            cat = (row.get('cat_name') or '').lower().strip()
            sel = (row.get('sel_name') or '').strip()
            val = float(row.get('odd_value', 0))
            if val <= 0:
                continue

            if cat == 'maç sonucu':
                if sel == '1':
                    odds['ms_h'] = val
                elif sel in ('0', 'X'):
                    odds['ms_d'] = val
                elif sel == '2':
                    odds['ms_a'] = val
            elif cat == '2,5 alt/üst':
                sel_lower = sel.lower()
                if 'üst' in sel_lower:
                    odds['ou25_o'] = val
                elif 'alt' in sel_lower:
                    odds['ou25_u'] = val
            elif cat == 'karşılıklı gol':
                sel_lower = sel.lower()
                if 'var' in sel_lower:
                    odds['btts_y'] = val
                elif 'yok' in sel_lower:
                    odds['btts_n'] = val
        except (ValueError, TypeError):
            # Unparseable price (None, non-numeric text, ...): skip the row.
            continue

    # 1X2 market: normalise the three inverse prices so they sum to 1.
    ms_h = odds.get('ms_h', 0)
    ms_d = odds.get('ms_d', 0)
    ms_a = odds.get('ms_a', 0)
    if ms_h > 1.0 and ms_d > 1.0 and ms_a > 1.0:
        raw_sum = 1 / ms_h + 1 / ms_d + 1 / ms_a
        implied['implied_home'] = round((1 / ms_h) / raw_sum, 4)
        implied['implied_draw'] = round((1 / ms_d) / raw_sum, 4)
        implied['implied_away'] = round((1 / ms_a) / raw_sum, 4)
        implied['odds_overround'] = round(raw_sum - 1.0, 4)

    # Over/under 2.5 market: only the "over" share is reported.
    ou25_o = odds.get('ou25_o', 0)
    ou25_u = odds.get('ou25_u', 0)
    if ou25_o > 1.0 and ou25_u > 1.0:
        raw_sum = 1 / ou25_o + 1 / ou25_u
        implied['implied_over25'] = round((1 / ou25_o) / raw_sum, 4)

    # Both-teams-to-score market: only the "yes" share is reported.
    btts_y = odds.get('btts_y', 0)
    btts_n = odds.get('btts_n', 0)
    if btts_y > 1.0 and btts_n > 1.0:
        raw_sum = 1 / btts_y + 1 / btts_n
        implied['implied_btts_yes'] = round((1 / btts_y) / raw_sum, 4)

    return implied
def enrich_single_match(
    enrichment: FeatureEnrichmentService,
    cur: RealDictCursor,
    match: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Build the full set of enrichment columns for one match.

    All inputs are gathered first (team stats, H2H, form, referee, league
    averages, rolling goals via ``enrichment``; referee name and implied
    odds via the module helpers), then flattened into a single dict whose
    keys are the football_ai_features column names used by the upsert.

    Args:
        enrichment: Service providing the ``compute_*`` feature calculators.
        cur: Cursor handed to every calculator and helper.
        match: Row with match_id, home_team_id, away_team_id, mst_utc and
            league_id. A falsy mst_utc becomes 0; a falsy league_id becomes
            None.

    Returns:
        Dict with 'match_id', every enrichment column, and 'calculator_ver'.
    """
    match_id = match['match_id']
    home_id = str(match['home_team_id'])
    away_id = str(match['away_team_id'])

    kickoff_raw = match['mst_utc']
    mst_utc = int(kickoff_raw) if kickoff_raw else 0

    league_raw = match['league_id']
    league_id = str(league_raw) if league_raw else None

    # --- gather inputs (query order kept: stats, h2h, form, referee,
    # --- league, rolling, odds)
    stats_h = enrichment.compute_team_stats(cur, home_id, mst_utc)
    stats_a = enrichment.compute_team_stats(cur, away_id, mst_utc)

    head_to_head = enrichment.compute_h2h(cur, home_id, away_id, mst_utc)

    form_h = enrichment.compute_form_streaks(cur, home_id, mst_utc)
    form_a = enrichment.compute_form_streaks(cur, away_id, mst_utc)

    referee_name = fetch_referee_for_match(cur, match_id)
    ref_stats = enrichment.compute_referee_stats(cur, referee_name, mst_utc)

    league_stats = enrichment.compute_league_averages(cur, league_id, mst_utc)

    rolling_h = enrichment.compute_rolling_stats(cur, home_id, mst_utc)
    rolling_a = enrichment.compute_rolling_stats(cur, away_id, mst_utc)

    odds = fetch_implied_odds(cur, match_id)

    # --- flatten into column -> value, one section at a time
    features: Dict[str, Any] = {'match_id': match_id}

    # Team stats
    features.update({
        'home_avg_possession': round(stats_h['avg_possession'], 2),
        'away_avg_possession': round(stats_a['avg_possession'], 2),
        'home_avg_shots_on_target': round(stats_h['avg_shots_on_target'], 2),
        'away_avg_shots_on_target': round(stats_a['avg_shots_on_target'], 2),
        'home_shot_conversion': round(stats_h['shot_conversion'], 4),
        'away_shot_conversion': round(stats_a['shot_conversion'], 4),
        'home_avg_corners': round(stats_h['avg_corners'], 2),
        'away_avg_corners': round(stats_a['avg_corners'], 2),
    })

    # H2H
    features.update({
        'h2h_total': head_to_head['total_matches'],
        'h2h_home_win_rate': round(head_to_head['home_win_rate'], 4),
        'h2h_avg_goals': round(head_to_head['avg_goals'], 2),
        'h2h_over25_rate': round(head_to_head['over25_rate'], 4),
        'h2h_btts_rate': round(head_to_head['btts_rate'], 4),
    })

    # Form
    features.update({
        'home_clean_sheet_rate': round(form_h['clean_sheet_rate'], 4),
        'away_clean_sheet_rate': round(form_a['clean_sheet_rate'], 4),
        'home_scoring_rate': round(form_h['scoring_rate'], 4),
        'away_scoring_rate': round(form_a['scoring_rate'], 4),
        'home_win_streak': form_h['winning_streak'],
        'away_win_streak': form_a['winning_streak'],
    })

    # Rolling goals
    features.update({
        'home_goals_avg_5': round(rolling_h['rolling5_goals'], 2),
        'away_goals_avg_5': round(rolling_a['rolling5_goals'], 2),
        'home_conceded_avg_5': round(rolling_h['rolling5_conceded'], 2),
        'away_conceded_avg_5': round(rolling_a['rolling5_conceded'], 2),
    })

    # Referee
    features.update({
        'referee_avg_cards': round(ref_stats['cards_total'], 2),
        'referee_home_bias': round(ref_stats['home_bias'], 4),
        'referee_avg_goals': round(ref_stats['avg_goals'], 2),
    })

    # League
    features.update({
        'league_avg_goals': round(league_stats['avg_goals'], 2),
        'league_home_win_pct': round(league_stats['home_win_rate'], 4),
        'league_over25_pct': round(league_stats['ou25_rate'], 4),
    })

    # Implied odds (already rounded by the helper)
    features.update({
        'implied_home': odds['implied_home'],
        'implied_draw': odds['implied_draw'],
        'implied_away': odds['implied_away'],
        'implied_over25': odds['implied_over25'],
        'implied_btts_yes': odds['implied_btts_yes'],
        'odds_overround': odds['odds_overround'],
    })

    # Missing players impact — default (no lineup data for historical)
    features['missing_players_impact'] = 0.0

    # Version
    features['calculator_ver'] = CALCULATOR_VER

    return features
def flush_enrichment_batch(
    conn: psycopg2.extensions.connection,
    rows: List[Dict[str, Any]],
    dry_run: bool,
) -> int:
    """
    Upsert a batch of enriched rows into football_ai_features and commit.

    Each row is inserted keyed on ``match_id``; on conflict every other
    listed column is overwritten with the new value and ``updated_at`` is
    set to NOW().

    Args:
        conn: Open psycopg2 connection; committed after the upsert.
        rows: Dicts containing at least every column listed below.
        dry_run: When True nothing is written.

    Returns:
        Number of rows sent to the database, or 0 for an empty batch or a
        dry run.
    """
    if dry_run or not rows:
        return 0

    key_column = 'match_id'
    feature_columns = [
        'home_avg_possession', 'away_avg_possession',
        'home_avg_shots_on_target', 'away_avg_shots_on_target',
        'home_shot_conversion', 'away_shot_conversion',
        'home_avg_corners', 'away_avg_corners',
        'h2h_total', 'h2h_home_win_rate', 'h2h_avg_goals',
        'h2h_over25_rate', 'h2h_btts_rate',
        'home_clean_sheet_rate', 'away_clean_sheet_rate',
        'home_scoring_rate', 'away_scoring_rate',
        'home_win_streak', 'away_win_streak',
        'home_goals_avg_5', 'away_goals_avg_5',
        'home_conceded_avg_5', 'away_conceded_avg_5',
        'referee_avg_cards', 'referee_home_bias', 'referee_avg_goals',
        'league_avg_goals', 'league_home_win_pct', 'league_over25_pct',
        'implied_home', 'implied_draw', 'implied_away',
        'implied_over25', 'implied_btts_yes', 'odds_overround',
        'missing_players_impact', 'calculator_ver',
    ]
    all_columns = [key_column] + feature_columns

    # Pieces of the statement: column list, per-row template, and the
    # "col = EXCLUDED.col" assignments for everything except the key.
    column_list = ', '.join(all_columns)
    row_template = '(' + ', '.join('%s' for _ in all_columns) + ')'
    assignments = ', '.join(
        f'{name} = EXCLUDED.{name}' for name in feature_columns
    )

    payload = [
        tuple(record[name] for name in all_columns)
        for record in rows
    ]

    upsert_sql = f"""
        INSERT INTO football_ai_features ({column_list})
        VALUES %s
        ON CONFLICT (match_id) DO UPDATE SET
        {assignments},
        updated_at = NOW()
    """

    with conn.cursor() as cur:
        execute_values(
            cur,
            upsert_sql,
            payload,
            template=row_template,
            page_size=200,
        )
    conn.commit()

    return len(rows)
# ────────────────────────── main ───────────────────────────────
def run_enrichment(
    batch_size: int,
    dry_run: bool,
    force: bool,
    limit: Optional[int],
) -> None:
    """
    Core enrichment loop: fetch pending matches, enrich each, upsert in batches.

    Progress is printed every 500 processed matches and a summary at the
    end. A match whose enrichment raises is counted as an error (the first
    10 are printed) and skipped; the run continues with the next match.

    Args:
        batch_size: Number of enriched rows buffered before each upsert.
        dry_run: Compute features but write nothing ("Written" stays 0).
        force: Re-enrich every eligible row, not only un-enriched ones.
        limit: Maximum number of matches to fetch (None for all).
    """
    conn = psycopg2.connect(get_clean_dsn())
    # try/finally so the connection is released even when fetching or
    # flushing raises part-way through the run.
    try:
        print(f"\n{'=' * 60}")
        print(f"🧠 AI Features Full Enrichment — {CALCULATOR_VER}")
        print(f" batch_size={batch_size} dry_run={dry_run} force={force}")
        print(f"{'=' * 60}")

        # 1. Fetch unenriched matches
        t0 = time.time()
        matches = fetch_unenriched_matches(conn, force=force, limit=limit)
        print(f"\n📊 {len(matches):,} matches to enrich ({time.time() - t0:.1f}s)")
        if not matches:
            print("✅ Nothing to enrich — all rows already populated.")
            return

        # 2. Initialize enrichment service
        enrichment = FeatureEnrichmentService()

        # 3. Process in batches
        total = len(matches)
        processed = 0
        written = 0
        errors = 0
        batch_buf: List[Dict[str, Any]] = []
        t_start = time.time()

        # One dict-style cursor is shared by all enrichment queries.
        with conn.cursor(cursor_factory=RealDictCursor) as enrich_cur:
            for match in matches:
                try:
                    batch_buf.append(
                        enrich_single_match(enrichment, enrich_cur, match)
                    )
                except Exception as e:
                    errors += 1
                    if errors <= 10:
                        print(f" ⚠️ Error enriching {match.get('match_id', '?')}: {e}")
                    # A failed statement leaves the transaction aborted, and
                    # every later query (and the next flush) would be
                    # rejected until it is rolled back. Nothing is pending
                    # here: each flush commits its own writes.
                    conn.rollback()
                processed += 1

                # Flush batch
                if len(batch_buf) >= batch_size:
                    written += flush_enrichment_batch(conn, batch_buf, dry_run)
                    batch_buf.clear()

                # Progress reporting
                if processed % 500 == 0:
                    elapsed = time.time() - t_start
                    rate = processed / elapsed if elapsed > 0 else 0
                    remaining = (total - processed) / rate if rate > 0 else 0
                    pct = processed / total * 100
                    print(
                        f" [{processed:>8,} / {total:,}] "
                        f"({pct:.1f}%) | {rate:.0f} matches/s | "
                        f"ETA: {remaining / 60:.1f} min | "
                        f"errors: {errors}"
                    )

            # Flush remaining
            if batch_buf:
                written += flush_enrichment_batch(conn, batch_buf, dry_run)

        elapsed = time.time() - t_start
        # Guard against a zero-length interval on coarse clocks / tiny runs.
        final_rate = processed / elapsed if elapsed > 0 else 0
        print(f"\n{'=' * 60}")
        print("✅ Enrichment complete:")
        print(f" Processed: {processed:,} matches in {elapsed:.1f}s")
        print(f" Written: {written:,} rows")
        print(f" Errors: {errors:,}")
        print(f" Rate: {final_rate:.0f} matches/s")
        print(f"{'=' * 60}")
    finally:
        conn.close()
def main() -> None:
    """Parse the command-line options and start the enrichment run."""
    parser = argparse.ArgumentParser(
        description="Enrich football_ai_features with H2H, referee, stats, and odds data"
    )

    # (flag, add_argument keyword arguments), registered in display order.
    option_specs = (
        ('--batch-size', {
            'type': int,
            'default': DEFAULT_BATCH_SIZE,
            'help': f'DB insert batch size (default: {DEFAULT_BATCH_SIZE})',
        }),
        ('--dry-run', {
            'action': 'store_true',
            'help': 'Compute features but do not write to DB',
        }),
        ('--force', {
            'action': 'store_true',
            'help': 'Re-enrich ALL rows, not just empty ones',
        }),
        ('--limit', {
            'type': int,
            'default': None,
            'help': 'Max number of matches to process',
        }),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    opts = parser.parse_args()
    run_enrichment(
        batch_size=opts.batch_size,
        dry_run=opts.dry_run,
        force=opts.force,
        limit=opts.limit,
    )


if __name__ == '__main__':
    main()