""" ELO Rating System V2 - Venue-Adjusted & League-Weighted V9 Model için geliştirilmiş ELO sistemi. V1'den Farklar: - Lig kalitesi faktörü (Premier League vs küçük lig) - Form decay (son maçlar daha etkili) - Venue-adjusted ELO (ev/deplasman ayrı) - Win probability hesaplama """ import os import json from typing import Dict, Optional, Tuple from dataclasses import dataclass, asdict, field from datetime import datetime try: import psycopg2 except ImportError: psycopg2 = None MODELS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'models') @dataclass class TeamELO: """Takım ELO profili - Geliştirilmiş""" team_id: str team_name: str = "" # Ana ELO'lar overall_elo: float = 1500.0 home_elo: float = 1500.0 away_elo: float = 1500.0 # Form ELO (son 5 maça göre) form_elo: float = 1500.0 # Meta matches_played: int = 0 home_matches: int = 0 away_matches: int = 0 wins: int = 0 draws: int = 0 losses: int = 0 last_updated: Optional[str] = None # Son 5 maç formu (W/D/L sequence) recent_form: str = "" def win_rate(self) -> float: if self.matches_played == 0: return 0.0 return self.wins / self.matches_played def to_features(self) -> Dict[str, float]: return { 'elo_overall': self.overall_elo, 'elo_home': self.home_elo, 'elo_away': self.away_elo, 'elo_form': self.form_elo, 'elo_matches': self.matches_played, 'elo_win_rate': self.win_rate(), } # Lig kalitesi faktörleri (1.0 = ortalama) LEAGUE_QUALITY = { # Top 5 Avrupa Ligleri "premier league": 1.15, "premier lig": 1.15, "la liga": 1.12, "bundesliga": 1.10, "serie a": 1.08, "ligue 1": 1.05, # Güçlü ligler "eredivisie": 1.02, "primeira liga": 1.02, "süper lig": 1.00, # Avrupa kupaları "champions league": 1.20, "şampiyonlar ligi": 1.20, "europa league": 1.10, "avrupa ligi": 1.10, "conference league": 1.00, # Orta ligler "championship": 0.95, "2. bundesliga": 0.92, "serie b": 0.90, "la liga 2": 0.90, # Küçük ligler "default": 0.85, } class ELORatingSystem: """ ELO Rating System V2 - Venue-Adjusted & League-Weighted Yenilikler: - Ev/Deplasman ayrı ELO takibi - Lig kalitesi faktörü - Form ELO (son 5 maç ağırlıklı) - Gol farkına göre K-faktör ayarı """ # ELO parametreleri K_FACTOR_BASE = 32 # Temel K faktörü K_FACTOR_NEW_TEAM = 48 # Yeni takımlar için daha yüksek (ilk 20 maç) HOME_ADVANTAGE = 65 # Ev sahibi avantajı (ELO cinsinden) INITIAL_ELO = 1500 FORM_WEIGHT = 0.7 # Form ELO için son maç ağırlığı def __init__(self): self.ratings: Dict[str, TeamELO] = {} self.league_cache: Dict[str, str] = {} # team_id -> league_name self.conn = None self._load_ratings() def _connect_db(self): if psycopg2 is None: return None try: from data.db import get_clean_dsn self.conn = psycopg2.connect(get_clean_dsn()) return self.conn except Exception as e: print(f"[ELO] DB connection failed: {e}") return None def get_conn(self): if self.conn is None or self.conn.closed: self._connect_db() return self.conn def _load_ratings(self): """Rating'leri yükle — önce DB, sonra JSON fallback""" if self._load_ratings_from_db(): return self._load_ratings_from_json() def _load_ratings_from_db(self) -> bool: """team_elo_ratings tablosundan rating'leri yükle""" conn = self.get_conn() if conn is None: return False try: cur = conn.cursor() cur.execute(""" SELECT ter.team_id, t.name, ter.overall_elo, ter.home_elo, ter.away_elo, ter.form_elo, ter.matches_played, ter.recent_form FROM team_elo_ratings ter LEFT JOIN teams t ON ter.team_id = t.id """) rows = cur.fetchall() cur.close() if not rows: return False for row in rows: tid, name, overall, home, away, form, played, recent = row self.ratings[str(tid)] = TeamELO( team_id=str(tid), team_name=name or "", overall_elo=float(overall), home_elo=float(home), away_elo=float(away), form_elo=float(form), matches_played=int(played), recent_form=recent or [], ) print(f"[OK] ELO V2 ratings DB'den yuklendi ({len(self.ratings)} takim)") return True except Exception as e: print(f"[WARN] ELO DB yuklenemedi, JSON'a dusuyuyor: {e}") return False def _load_ratings_from_json(self): """JSON dosyasından rating'leri yükle (fallback)""" ratings_path = os.path.join(MODELS_DIR, 'elo_ratings_v2.json') if os.path.exists(ratings_path): try: with open(ratings_path, 'r', encoding='utf-8') as f: data = json.load(f) for team_id, rating_data in data.items(): self.ratings[team_id] = TeamELO(**rating_data) print(f"[OK] ELO V2 ratings JSON'dan yuklendi ({len(self.ratings)} takim)") except Exception as e: print(f"[WARN] ELO V2 ratings yuklenemedi: {e}") def save_ratings(self): """Rating'leri kaydet""" ratings_path = os.path.join(MODELS_DIR, 'elo_ratings_v2.json') os.makedirs(MODELS_DIR, exist_ok=True) data = {team_id: asdict(elo) for team_id, elo in self.ratings.items()} with open(ratings_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f"💾 ELO V2 ratings kaydedildi ({len(self.ratings)} takım)") def get_or_create_rating(self, team_id: str, team_name: str = "") -> TeamELO: """Takımın ELO'sunu getir veya oluştur""" if team_id not in self.ratings: self.ratings[team_id] = TeamELO(team_id=team_id, team_name=team_name) return self.ratings[team_id] def get_league_quality(self, league_name: str) -> float: """Lig kalitesi faktörünü döndür""" if not league_name: return LEAGUE_QUALITY["default"] league_lower = league_name.lower() for key, quality in LEAGUE_QUALITY.items(): if key in league_lower: return quality return LEAGUE_QUALITY["default"] def expected_score(self, rating_a: float, rating_b: float) -> float: """ A'nın B'ye karşı beklenen skoru (0-1 arası). 1 = kesin kazanır, 0.5 = eşit, 0 = kesin kaybeder """ return 1 / (1 + 10 ** ((rating_b - rating_a) / 400)) def get_k_factor(self, team_elo: TeamELO, goal_diff: int, league_quality: float = 1.0) -> float: """ Dinamik K-faktörü hesapla. - Yeni takımlar için yüksek (hızlı adaptasyon) - Gol farkı yüksekse yüksek - Kaliteli liglerde yüksek """ # Temel K if team_elo.matches_played < 20: k = self.K_FACTOR_NEW_TEAM else: k = self.K_FACTOR_BASE # Gol farkı çarpanı if goal_diff == 1: goal_mult = 1.0 elif goal_diff == 2: goal_mult = 1.25 elif goal_diff == 3: goal_mult = 1.5 else: goal_mult = 1.75 + (goal_diff - 3) * 0.1 # Lig kalitesi çarpanı return k * goal_mult * league_quality def update_after_match( self, home_id: str, away_id: str, home_goals: int, away_goals: int, home_name: str = "", away_name: str = "", league_name: str = "" ): """Maç sonrası ELO güncelle""" home_elo = self.get_or_create_rating(home_id, home_name) away_elo = self.get_or_create_rating(away_id, away_name) # Gerçek skor if home_goals > away_goals: actual_home, actual_away = 1.0, 0.0 home_elo.wins += 1 away_elo.losses += 1 result_home, result_away = 'W', 'L' elif home_goals < away_goals: actual_home, actual_away = 0.0, 1.0 home_elo.losses += 1 away_elo.wins += 1 result_home, result_away = 'L', 'W' else: actual_home, actual_away = 0.5, 0.5 home_elo.draws += 1 away_elo.draws += 1 result_home, result_away = 'D', 'D' goal_diff = abs(home_goals - away_goals) league_quality = self.get_league_quality(league_name) # K faktörleri k_home = self.get_k_factor(home_elo, goal_diff, league_quality) k_away = self.get_k_factor(away_elo, goal_diff, league_quality) # -- Overall ELO -- expected_home = self.expected_score( home_elo.overall_elo + self.HOME_ADVANTAGE, away_elo.overall_elo ) home_elo.overall_elo += k_home * (actual_home - expected_home) away_elo.overall_elo += k_away * (actual_away - (1 - expected_home)) # -- Venue-Specific ELO -- expected_home_venue = self.expected_score(home_elo.home_elo, away_elo.away_elo) home_elo.home_elo += k_home * (actual_home - expected_home_venue) away_elo.away_elo += k_away * (actual_away - (1 - expected_home_venue)) # -- Form ELO (son maçlar daha ağırlıklı) -- home_elo.form_elo = ( home_elo.form_elo * (1 - self.FORM_WEIGHT) + (1500 + (actual_home - 0.5) * 100) * self.FORM_WEIGHT ) away_elo.form_elo = ( away_elo.form_elo * (1 - self.FORM_WEIGHT) + (1500 + (actual_away - 0.5) * 100) * self.FORM_WEIGHT ) # Meta güncelle home_elo.matches_played += 1 away_elo.matches_played += 1 home_elo.home_matches += 1 away_elo.away_matches += 1 # Son 5 form güncelle home_elo.recent_form = (result_home + home_elo.recent_form)[:5] away_elo.recent_form = (result_away + away_elo.recent_form)[:5] home_elo.last_updated = datetime.now().isoformat() away_elo.last_updated = datetime.now().isoformat() def predict_match(self, home_id: str, away_id: str) -> Dict[str, float]: """ Maç için kazanma olasılıklarını tahmin et. """ home_elo = self.get_or_create_rating(home_id) away_elo = self.get_or_create_rating(away_id) # Overall bazlı exp_home_overall = self.expected_score( home_elo.overall_elo + self.HOME_ADVANTAGE, away_elo.overall_elo ) # Venue bazlı exp_home_venue = self.expected_score( home_elo.home_elo, away_elo.away_elo ) # Kombine (ortama) home_prob = (exp_home_overall + exp_home_venue) / 2 # Draw tahmini (ELO farkı küçükse daha yüksek) elo_diff = abs(home_elo.overall_elo - away_elo.overall_elo) draw_base = 0.25 # Temel beraberlik oranı draw_prob = draw_base * (1 - elo_diff / 800) # Fark arttıkça beraberlik azalır draw_prob = max(0.15, min(draw_prob, 0.35)) # Normalize remaining = 1 - draw_prob home_win = home_prob * remaining away_win = (1 - home_prob) * remaining return { "home_win": round(home_win, 3), "draw": round(draw_prob, 3), "away_win": round(away_win, 3), } def get_match_features(self, home_id: str, away_id: str) -> Dict[str, float]: """Model için ELO feature'larını döndür""" home_elo = self.get_or_create_rating(home_id) away_elo = self.get_or_create_rating(away_id) probs = self.predict_match(home_id, away_id) # Form encode (WWWDL -> sayısal) def form_to_score(form: str) -> float: if not form: return 0.5 score = 0 for char in form: if char == 'W': score += 1 elif char == 'D': score += 0.5 return score / max(len(form), 1) return { # Overall ELO 'elo_home_overall': home_elo.overall_elo, 'elo_away_overall': away_elo.overall_elo, 'elo_diff_overall': home_elo.overall_elo - away_elo.overall_elo, # Venue-Specific ELO 'elo_home_venue': home_elo.home_elo, 'elo_away_venue': away_elo.away_elo, 'elo_diff_venue': home_elo.home_elo - away_elo.away_elo, # Form ELO 'elo_home_form': home_elo.form_elo, 'elo_away_form': away_elo.form_elo, 'elo_diff_form': home_elo.form_elo - away_elo.form_elo, # Win probabilities 'elo_prob_home': probs['home_win'], 'elo_prob_draw': probs['draw'], 'elo_prob_away': probs['away_win'], # Experience 'elo_home_matches': min(home_elo.matches_played, 100), 'elo_away_matches': min(away_elo.matches_played, 100), # Form score 'elo_home_form_score': form_to_score(home_elo.recent_form), 'elo_away_form_score': form_to_score(away_elo.recent_form), # Win rates 'elo_home_win_rate': home_elo.win_rate(), 'elo_away_win_rate': away_elo.win_rate(), } def save_ratings_to_db(self): """Rating'leri team_elo_ratings tablosuna yaz (upsert)""" conn = self.get_conn() if conn is None: print("❌ DB bağlantısı yok, DB'ye yazılamadı!") return cur = conn.cursor() batch_size = 500 teams = list(self.ratings.values()) written = 0 for i in range(0, len(teams), batch_size): batch = teams[i:i + batch_size] values = [] for elo in batch: values.append(cur.mogrify( "(%s, %s, %s, %s, %s, %s, %s, NOW())", ( elo.team_id, round(elo.overall_elo, 2), round(elo.home_elo, 2), round(elo.away_elo, 2), round(elo.form_elo, 2), elo.matches_played, elo.recent_form[:5], ) ).decode('utf-8')) sql = """ INSERT INTO team_elo_ratings (team_id, overall_elo, home_elo, away_elo, form_elo, matches_played, recent_form, updated_at) VALUES {} ON CONFLICT (team_id) DO UPDATE SET overall_elo = EXCLUDED.overall_elo, home_elo = EXCLUDED.home_elo, away_elo = EXCLUDED.away_elo, form_elo = EXCLUDED.form_elo, matches_played = EXCLUDED.matches_played, recent_form = EXCLUDED.recent_form, updated_at = EXCLUDED.updated_at """.format(", ".join(values)) cur.execute(sql) written += len(batch) conn.commit() cur.close() print(f"💾 DB'ye {written} takım ELO yazıldı (team_elo_ratings)") def _load_top_league_ids(self) -> set: """top_leagues.json'dan lig ID'lerini oku""" paths = [ os.path.join(os.path.dirname(__file__), '..', '..', 'top_leagues.json'), os.path.join(os.path.dirname(__file__), '..', 'top_leagues.json'), ] for p in paths: if os.path.exists(p): with open(p) as f: ids = set(json.load(f)) print(f"📋 {len(ids)} top lig yüklendi ({os.path.basename(p)})") return ids print("⚠️ top_leagues.json bulunamadı — tüm maçlar yazılacak") return set() def calculate_all_from_history(self, sport: str = 'football'): """Tüm tarihsel maçlardan ELO hesapla, top ligleri match_ai_features'a yaz""" print(f"\n🔄 {sport.upper()} için ELO V2 hesaplanıyor...") conn = self.get_conn() if conn is None: print("❌ DB bağlantısı yok!") return top_league_ids = self._load_top_league_ids() cur = conn.cursor() # Tüm bitmiş maçları tarih sırasına göre al (m.id ve league_id dahil) cur.execute(""" SELECT m.id, m.home_team_id, m.away_team_id, m.score_home, m.score_away, m.league_id, t1.name as home_name, t2.name as away_name, l.name as league_name FROM matches m LEFT JOIN teams t1 ON m.home_team_id = t1.id LEFT JOIN teams t2 ON m.away_team_id = t2.id LEFT JOIN leagues l ON m.league_id = l.id WHERE m.sport = %s AND m.score_home IS NOT NULL AND m.score_away IS NOT NULL ORDER BY m.mst_utc ASC """, (sport,)) matches = cur.fetchall() print(f"📊 {len(matches):,} maç işlenecek...") BATCH_SIZE = 1000 batch: list = [] processed = 0 written = 0 for match in matches: (match_id, home_id, away_id, score_h, score_a, league_id, home_name, away_name, league) = match if not (home_id and away_id): continue # Sadece top ligler için pre-match ELO kaydet if not top_league_ids or league_id in top_league_ids: home_elo_obj = self.get_or_create_rating(home_id, home_name or "") away_elo_obj = self.get_or_create_rating(away_id, away_name or "") batch.append(( match_id, home_elo_obj.overall_elo, away_elo_obj.overall_elo, home_elo_obj.home_elo, away_elo_obj.away_elo, home_elo_obj.form_elo, away_elo_obj.form_elo, )) # Tüm maçlar için ELO güncelle self.update_after_match( home_id, away_id, score_h, score_a, home_name or "", away_name or "", league or "" ) processed += 1 if len(batch) >= BATCH_SIZE: self._flush_elo_batch(cur, batch, sport) conn.commit() written += len(batch) batch.clear() if processed % 10000 == 0: print(f" İşlenen: {processed:,} / {len(matches):,}") # Kalan batch'i yaz if batch: self._flush_elo_batch(cur, batch, sport) conn.commit() written += len(batch) cur.close() print(f"✅ {processed:,} maç işlendi, {len(self.ratings)} takım") print(f"📝 {written:,} maç match_ai_features'a yazıldı") # JSON'a kaydet self.save_ratings() # DB'ye kaydet self.save_ratings_to_db() # Top 20 takımı göster self._show_top_teams() @staticmethod def _flush_elo_batch(cur, batch: list, sport: str = 'football') -> None: """Batch upsert pre-match ELO values into sport-partitioned ai_features table.""" from psycopg2.extras import execute_values table_name = 'football_ai_features' if sport == 'football' else 'basketball_ai_features' sql = f""" INSERT INTO {table_name} (match_id, home_elo, away_elo, home_home_elo, away_away_elo, home_form_elo, away_form_elo, calculator_ver, updated_at) VALUES %s ON CONFLICT (match_id) DO UPDATE SET home_elo = EXCLUDED.home_elo, away_elo = EXCLUDED.away_elo, home_home_elo = EXCLUDED.home_home_elo, away_away_elo = EXCLUDED.away_away_elo, home_form_elo = EXCLUDED.home_form_elo, away_form_elo = EXCLUDED.away_form_elo, calculator_ver = EXCLUDED.calculator_ver, updated_at = EXCLUDED.updated_at """ now = datetime.now().isoformat() values = [ (mid, h_elo, a_elo, hh_elo, aa_elo, hf_elo, af_elo, 'elo_v2_backfill', now) for mid, h_elo, a_elo, hh_elo, aa_elo, hf_elo, af_elo in batch ] execute_values(cur, sql, values, page_size=500) def _show_top_teams(self, n: int = 20): """En güçlü takımları göster""" sorted_teams = sorted( self.ratings.items(), key=lambda x: x[1].overall_elo, reverse=True )[:n] print(f"\n🏆 Top {n} Takım (ELO V2):") for i, (team_id, elo) in enumerate(sorted_teams, 1): name = elo.team_name[:25] if elo.team_name else team_id[:25] print(f" {i:2}. {name:25} → {elo.overall_elo:.0f} (H:{elo.home_elo:.0f} A:{elo.away_elo:.0f})") # Singleton _system = None def get_elo_system() -> ELORatingSystem: global _system if _system is None: _system = ELORatingSystem() return _system if __name__ == "__main__": import sys from pathlib import Path # Ensure ai-engine root is on sys.path (for `from data.db import ...`) _AI_ENGINE_ROOT = Path(__file__).resolve().parent.parent if str(_AI_ENGINE_ROOT) not in sys.path: sys.path.insert(0, str(_AI_ENGINE_ROOT)) system = get_elo_system() if len(sys.argv) > 1 and sys.argv[1] == 'calculate': system.calculate_all_from_history('football') else: print("\n🧪 ELO V2 Test") print("Kullanım: python elo_system.py calculate") print(f"\n📊 Yüklü takım sayısı: {len(system.ratings)}") if len(system.ratings) > 0: system._show_top_teams(10)