Files
iddaai-be/ai-engine/features/elo_system.py
T
fahricansecer 2f0b85a0c7
Deploy Iddaai Backend / build-and-deploy (push) Failing after 18s
first (part 2: other directories)
2026-04-16 15:11:25 +03:00

656 lines
22 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ELO Rating System V2 - Venue-Adjusted & League-Weighted
V9 Model için geliştirilmiş ELO sistemi.
V1'den Farklar:
- Lig kalitesi faktörü (Premier League vs küçük lig)
- Form decay (son maçlar daha etkili)
- Venue-adjusted ELO (ev/deplasman ayrı)
- Win probability hesaplama
"""
import os
import json
from typing import Dict, Optional, Tuple
from dataclasses import dataclass, asdict, field
from datetime import datetime
try:
import psycopg2
except ImportError:
psycopg2 = None
# Directory for persisted model artifacts: the "models" folder that sits one
# level above the directory containing this file.
MODELS_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    'models',
)
@dataclass
class TeamELO:
    """Per-team ELO profile with overall, venue-specific and form ratings."""

    team_id: str
    team_name: str = ""

    # Core ratings
    overall_elo: float = 1500.0
    home_elo: float = 1500.0
    away_elo: float = 1500.0

    # Form rating (tracks recent results)
    form_elo: float = 1500.0

    # Bookkeeping
    matches_played: int = 0
    home_matches: int = 0
    away_matches: int = 0
    wins: int = 0
    draws: int = 0
    losses: int = 0
    last_updated: Optional[str] = None

    # Results of the last five matches as a W/D/L sequence
    recent_form: str = ""

    def win_rate(self) -> float:
        """Return wins divided by matches played, or 0.0 before any match."""
        played = self.matches_played
        return self.wins / played if played != 0 else 0.0

    def to_features(self) -> Dict[str, float]:
        """Return the ratings, match count and win rate as a flat feature dict."""
        feature_pairs = (
            ('elo_overall', self.overall_elo),
            ('elo_home', self.home_elo),
            ('elo_away', self.away_elo),
            ('elo_form', self.form_elo),
            ('elo_matches', self.matches_played),
            ('elo_win_rate', self.win_rate()),
        )
        return dict(feature_pairs)
# League quality factors (1.0 = average).
# Keys are lower-case name fragments: get_league_quality() matches them as
# substrings of the lower-cased league name, so several keys can match one
# league (e.g. "bundesliga" is contained in "2. bundesliga").
LEAGUE_QUALITY = {
    # Top 5 European leagues
    "premier league": 1.15,
    "premier lig": 1.15,
    "la liga": 1.12,
    "bundesliga": 1.10,
    "serie a": 1.08,
    "ligue 1": 1.05,
    # Strong leagues
    "eredivisie": 1.02,
    "primeira liga": 1.02,
    "süper lig": 1.00,
    # European cups
    "champions league": 1.20,
    "şampiyonlar ligi": 1.20,
    "europa league": 1.10,
    "avrupa ligi": 1.10,
    "conference league": 1.00,
    # Mid-tier leagues
    "championship": 0.95,
    "2. bundesliga": 0.92,
    "serie b": 0.90,
    "la liga 2": 0.90,
    # Minor leagues / fallback used when no key matches
    "default": 0.85,
}
class ELORatingSystem:
    """
    ELO Rating System V2 - venue-adjusted and league-weighted.

    Features:
    - separate home / away ELO tracking per team
    - league quality factor applied to the K-factor
    - form ELO that blends each new result in with weight FORM_WEIGHT
    - K-factor scaled by goal difference
    """

    # ELO parameters
    K_FACTOR_BASE = 32       # base K-factor
    K_FACTOR_NEW_TEAM = 48   # higher K for new teams (fewer than 20 matches played)
    HOME_ADVANTAGE = 65      # home-side advantage, expressed in ELO points
    INITIAL_ELO = 1500
    FORM_WEIGHT = 0.7        # weight of the latest match in the form ELO
def __init__(self):
    """Start with empty state, then populate ratings from persisted storage."""
    # No DB connection yet; get_conn() opens one on demand.
    self.conn = None
    self.league_cache: Dict[str, str] = {}  # team_id -> league_name
    self.ratings: Dict[str, TeamELO] = {}
    self._load_ratings()
def _connect_db(self):
    """Open a psycopg2 connection and cache it on ``self.conn``.

    Returns:
        The new connection, or None when psycopg2 is unavailable or the
        connection attempt fails (the failure is printed, not raised).
    """
    if psycopg2 is not None:
        try:
            # Imported at call time rather than at module import time.
            from data.db import get_clean_dsn
            self.conn = psycopg2.connect(get_clean_dsn())
        except Exception as e:
            print(f"[ELO] DB connection failed: {e}")
        else:
            return self.conn
    return None
def get_conn(self):
    """Return a usable DB connection, (re)connecting when needed.

    Returns:
        The cached connection when it is open; otherwise the result of a
        fresh ``_connect_db()`` attempt, which is None on failure.
    """
    if self.conn is None or self.conn.closed:
        # Store the outcome of the attempt: if reconnecting fails, the stale
        # closed handle must not be handed back, because callers only check
        # the result against None.
        self.conn = self._connect_db()
    return self.conn
def _load_ratings(self):
    """Load ratings: try the database first, fall back to the JSON file."""
    loaded_from_db = self._load_ratings_from_db()
    if not loaded_from_db:
        self._load_ratings_from_json()
def _load_ratings_from_db(self) -> bool:
    """Load ratings from the ``team_elo_ratings`` table.

    Rows are converted into a temporary dict first and merged into
    ``self.ratings`` only when every row converted cleanly, so a failure
    never leaves a half-loaded state behind.

    Returns:
        True when at least one row was loaded; False when there is no
        connection, the table is empty, or the query / row conversion
        fails (the error is printed, not raised).
    """
    conn = self.get_conn()
    if conn is None:
        return False
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT ter.team_id, t.name,
                ter.overall_elo, ter.home_elo, ter.away_elo,
                ter.form_elo, ter.matches_played, ter.recent_form
                FROM team_elo_ratings ter
                LEFT JOIN teams t ON ter.team_id = t.id
            """)
            rows = cur.fetchall()
        finally:
            cur.close()
        if not rows:
            return False

        loaded: Dict[str, TeamELO] = {}
        for tid, name, overall, home, away, form, played, recent in rows:
            # TeamELO.recent_form is a str and update_after_match()
            # concatenates to it, so NULL becomes "" (never a list) and a
            # sequence value is flattened into a string.
            if recent is None:
                recent_form = ""
            elif isinstance(recent, str):
                recent_form = recent
            else:
                recent_form = "".join(recent)
            loaded[str(tid)] = TeamELO(
                team_id=str(tid),
                team_name=name or "",
                overall_elo=float(overall),
                home_elo=float(home),
                away_elo=float(away),
                form_elo=float(form),
                matches_played=int(played),
                recent_form=recent_form,
            )
        self.ratings.update(loaded)
        print(f"[OK] ELO V2 ratings DB'den yuklendi ({len(self.ratings)} takim)")
        return True
    except Exception as e:
        print(f"[WARN] ELO DB yuklenemedi, JSON'a dusuyuyor: {e}")
        return False
    finally:
        # The SELECT opened an implicit transaction. End it so the shared
        # connection is neither left idle-in-transaction nor stuck in an
        # aborted state after a failed query.
        try:
            conn.rollback()
        except Exception:
            # The connection itself is unusable; nothing more can be done.
            pass
def _load_ratings_from_json(self):
    """Fallback loader: read ratings from ``elo_ratings_v2.json`` in MODELS_DIR.

    Does nothing when the file is absent; a read or parse failure is
    printed as a warning instead of being raised.
    """
    ratings_path = os.path.join(MODELS_DIR, 'elo_ratings_v2.json')
    if not os.path.exists(ratings_path):
        return
    try:
        with open(ratings_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Each JSON value holds the keyword arguments of one TeamELO.
        for team_id, rating_data in data.items():
            self.ratings[team_id] = TeamELO(**rating_data)
        print(f"[OK] ELO V2 ratings JSON'dan yuklendi ({len(self.ratings)} takim)")
    except Exception as e:
        print(f"[WARN] ELO V2 ratings yuklenemedi: {e}")
def save_ratings(self):
    """Persist all ratings to ``elo_ratings_v2.json`` in MODELS_DIR.

    The JSON is written to a temporary sibling file and then moved over
    the target with ``os.replace``, so a failure while serializing can no
    longer truncate the existing ratings file (which is the fallback
    source for ``_load_ratings_from_json``).
    """
    ratings_path = os.path.join(MODELS_DIR, 'elo_ratings_v2.json')
    os.makedirs(MODELS_DIR, exist_ok=True)
    data = {team_id: asdict(elo) for team_id, elo in self.ratings.items()}
    tmp_path = ratings_path + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, ratings_path)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure this removes the partial file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"💾 ELO V2 ratings kaydedildi ({len(self.ratings)} takım)")
def get_or_create_rating(self, team_id: str, team_name: str = "") -> TeamELO:
    """Return the rating stored for ``team_id``, registering a default one if unknown."""
    try:
        return self.ratings[team_id]
    except KeyError:
        created = TeamELO(team_id=team_id, team_name=team_name)
        self.ratings[team_id] = created
        return created
def get_league_quality(self, league_name: str) -> float:
    """Return the quality factor for a league name.

    LEAGUE_QUALITY keys are matched as substrings of the lower-cased
    ``league_name``. When several keys match, the longest (most specific)
    one wins, so "2. Bundesliga" resolves to the "2. bundesliga" entry
    rather than the shorter "bundesliga" one. Ties keep the dict's
    insertion order.

    Args:
        league_name: League name in any casing; may be empty or None.

    Returns:
        The matching factor, or ``LEAGUE_QUALITY["default"]`` when the name
        is empty or no key matches.
    """
    default_quality = LEAGUE_QUALITY["default"]
    if not league_name:
        return default_quality
    league_lower = league_name.lower()
    # "default" is the fallback sentinel, not a league-name fragment.
    candidates = [
        key for key in LEAGUE_QUALITY
        if key != "default" and key in league_lower
    ]
    if not candidates:
        return default_quality
    # max() returns the first of equally long keys.
    return LEAGUE_QUALITY[max(candidates, key=len)]
def expected_score(self, rating_a: float, rating_b: float) -> float:
    """
    Expected score of A against B, between 0 and 1.

    0.5 means evenly matched; values approach 1 as A's rating advantage
    grows and approach 0 as B's does.
    """
    scaled_gap = (rating_b - rating_a) / 400
    return 1 / (1 + 10 ** scaled_gap)
def get_k_factor(self, team_elo: TeamELO, goal_diff: int,
                 league_quality: float = 1.0) -> float:
    """
    Compute the dynamic K-factor for one team in one match.

    - Teams with fewer than 20 matches use K_FACTOR_NEW_TEAM (faster
      adaptation); all others use K_FACTOR_BASE.
    - The K-factor grows with the goal margin: 1.0 for a draw or a
      one-goal game, 1.25 for two goals, 1.5 for three, and
      1.75 + 0.1 per goal beyond three after that.
    - The result is scaled by ``league_quality``.

    Args:
        team_elo: Rating profile of the team being updated.
        goal_diff: Goal difference of the match; only its magnitude is used.
        league_quality: Multiplier for the league (1.0 = average).

    Returns:
        base K * goal-margin multiplier * league_quality.
    """
    # Base K
    if team_elo.matches_played < 20:
        k = self.K_FACTOR_NEW_TEAM
    else:
        k = self.K_FACTOR_BASE

    # Goal-margin multiplier. A draw (margin 0) must not fall through to the
    # "big win" formula, which would give it 1.45 - more than a 2-goal win.
    margin = abs(goal_diff)
    if margin <= 1:
        goal_mult = 1.0
    elif margin == 2:
        goal_mult = 1.25
    elif margin == 3:
        goal_mult = 1.5
    else:
        goal_mult = 1.75 + (margin - 3) * 0.1

    # League quality multiplier
    return k * goal_mult * league_quality
def update_after_match(
    self,
    home_id: str,
    away_id: str,
    home_goals: int,
    away_goals: int,
    home_name: str = "",
    away_name: str = "",
    league_name: str = ""
):
    """Update both teams' ratings and counters after a finished match.

    Adjusts overall, venue-specific (home team's home ELO, away team's away
    ELO) and form ELO, then the W/D/L counters, match counts, the recent
    form string and the ``last_updated`` timestamp. Unknown teams are
    created via ``get_or_create_rating``.
    """
    host = self.get_or_create_rating(home_id, home_name)
    guest = self.get_or_create_rating(away_id, away_name)

    # Actual result: score (1 / 0.5 / 0) and result letter for each side
    if home_goals > away_goals:
        host_score, guest_score = 1.0, 0.0
        host_letter, guest_letter = 'W', 'L'
    elif home_goals < away_goals:
        host_score, guest_score = 0.0, 1.0
        host_letter, guest_letter = 'L', 'W'
    else:
        host_score, guest_score = 0.5, 0.5
        host_letter, guest_letter = 'D', 'D'

    counter_for = {'W': 'wins', 'D': 'draws', 'L': 'losses'}
    for side, letter in ((host, host_letter), (guest, guest_letter)):
        counter = counter_for[letter]
        setattr(side, counter, getattr(side, counter) + 1)

    # K-factors are taken before matches_played is incremented below
    margin = abs(home_goals - away_goals)
    quality = self.get_league_quality(league_name)
    host_k = self.get_k_factor(host, margin, quality)
    guest_k = self.get_k_factor(guest, margin, quality)

    # -- Overall ELO (home side gets the HOME_ADVANTAGE bonus) --
    host_expected = self.expected_score(
        host.overall_elo + self.HOME_ADVANTAGE,
        guest.overall_elo
    )
    host.overall_elo += host_k * (host_score - host_expected)
    guest.overall_elo += guest_k * (guest_score - (1 - host_expected))

    # -- Venue-specific ELO (no explicit home bonus is added here) --
    host_expected_venue = self.expected_score(host.home_elo, guest.away_elo)
    host.home_elo += host_k * (host_score - host_expected_venue)
    guest.away_elo += guest_k * (guest_score - (1 - host_expected_venue))

    # -- Form ELO: blend the previous value with this match's result --
    def blended_form(previous: float, score: float) -> float:
        return (
            previous * (1 - self.FORM_WEIGHT) +
            (1500 + (score - 0.5) * 100) * self.FORM_WEIGHT
        )

    host.form_elo = blended_form(host.form_elo, host_score)
    guest.form_elo = blended_form(guest.form_elo, guest_score)

    # Match counters
    host.matches_played += 1
    guest.matches_played += 1
    host.home_matches += 1
    guest.away_matches += 1

    # Recent form: newest result first, capped at five entries
    host.recent_form = (host_letter + host.recent_form)[:5]
    guest.recent_form = (guest_letter + guest.recent_form)[:5]

    host.last_updated = datetime.now().isoformat()
    guest.last_updated = datetime.now().isoformat()
def predict_match(self, home_id: str, away_id: str) -> Dict[str, float]:
    """
    Estimate home-win / draw / away-win probabilities for a fixture.

    The home-win share is the mean of the overall-ELO expectation (with
    HOME_ADVANTAGE) and the venue-ELO expectation; the draw probability
    shrinks as the overall ELO gap grows. Values are rounded to 3 decimals.
    """
    host = self.get_or_create_rating(home_id)
    guest = self.get_or_create_rating(away_id)

    # Expectation from overall ratings
    overall_view = self.expected_score(
        host.overall_elo + self.HOME_ADVANTAGE,
        guest.overall_elo
    )
    # Expectation from venue-specific ratings
    venue_view = self.expected_score(
        host.home_elo,
        guest.away_elo
    )
    # Combined: plain average of the two
    home_prob = (overall_view + venue_view) / 2

    # Draw estimate: base rate reduced linearly with the ELO gap, then clamped
    rating_gap = abs(host.overall_elo - guest.overall_elo)
    draw_base = 0.25
    draw_prob = max(0.15, min(draw_base * (1 - rating_gap / 800), 0.35))

    # Split the non-draw mass between the two sides
    decisive_mass = 1 - draw_prob
    prediction = {}
    prediction["home_win"] = round(home_prob * decisive_mass, 3)
    prediction["draw"] = round(draw_prob, 3)
    prediction["away_win"] = round((1 - home_prob) * decisive_mass, 3)
    return prediction
def get_match_features(self, home_id: str, away_id: str) -> Dict[str, float]:
    """Return the ELO-derived feature dict for a fixture (model input)."""
    host = self.get_or_create_rating(home_id)
    guest = self.get_or_create_rating(away_id)
    outcome_probs = self.predict_match(home_id, away_id)

    # Points per result letter; any other letter (e.g. 'L') counts as 0
    result_points = {'W': 1, 'D': 0.5}

    def form_to_score(form: str) -> float:
        """Encode a W/D/L sequence as average points per match (0.5 if empty)."""
        if not form:
            return 0.5
        earned = sum(result_points.get(letter, 0) for letter in form)
        return earned / max(len(form), 1)

    features = {}
    # Overall ELO
    features['elo_home_overall'] = host.overall_elo
    features['elo_away_overall'] = guest.overall_elo
    features['elo_diff_overall'] = host.overall_elo - guest.overall_elo
    # Venue-specific ELO
    features['elo_home_venue'] = host.home_elo
    features['elo_away_venue'] = guest.away_elo
    features['elo_diff_venue'] = host.home_elo - guest.away_elo
    # Form ELO
    features['elo_home_form'] = host.form_elo
    features['elo_away_form'] = guest.form_elo
    features['elo_diff_form'] = host.form_elo - guest.form_elo
    # Win probabilities
    features['elo_prob_home'] = outcome_probs['home_win']
    features['elo_prob_draw'] = outcome_probs['draw']
    features['elo_prob_away'] = outcome_probs['away_win']
    # Experience, capped at 100 matches
    features['elo_home_matches'] = min(host.matches_played, 100)
    features['elo_away_matches'] = min(guest.matches_played, 100)
    # Form score
    features['elo_home_form_score'] = form_to_score(host.recent_form)
    features['elo_away_form_score'] = form_to_score(guest.recent_form)
    # Win rates
    features['elo_home_win_rate'] = host.win_rate()
    features['elo_away_win_rate'] = guest.win_rate()
    return features
def save_ratings_to_db(self):
    """Upsert every in-memory rating into the ``team_elo_ratings`` table.

    Rows are sent with ``psycopg2.extras.execute_values`` in pages of 500
    and committed once at the end. On any failure the transaction is rolled
    back and the exception re-raised; the cursor is closed either way.
    Prints a message and returns without writing when no connection is
    available.
    """
    conn = self.get_conn()
    if conn is None:
        print("❌ DB bağlantısı yok, DB'ye yazılamadı!")
        return

    # Same helper that _flush_elo_batch uses for its batched upserts.
    from psycopg2.extras import execute_values

    rows = [
        (
            elo.team_id,
            round(elo.overall_elo, 2),
            round(elo.home_elo, 2),
            round(elo.away_elo, 2),
            round(elo.form_elo, 2),
            elo.matches_played,
            # Always send a string of at most five letters, even when
            # recent_form was populated as a list of letters.
            "".join(elo.recent_form or "")[:5],
        )
        for elo in self.ratings.values()
    ]
    sql = """
        INSERT INTO team_elo_ratings
        (team_id, overall_elo, home_elo, away_elo, form_elo, matches_played, recent_form, updated_at)
        VALUES %s
        ON CONFLICT (team_id) DO UPDATE SET
        overall_elo = EXCLUDED.overall_elo,
        home_elo = EXCLUDED.home_elo,
        away_elo = EXCLUDED.away_elo,
        form_elo = EXCLUDED.form_elo,
        matches_played = EXCLUDED.matches_played,
        recent_form = EXCLUDED.recent_form,
        updated_at = EXCLUDED.updated_at
    """
    cur = conn.cursor()
    try:
        if rows:
            execute_values(
                cur,
                sql,
                rows,
                template="(%s, %s, %s, %s, %s, %s, %s, NOW())",
                page_size=500,
            )
        conn.commit()
    except Exception:
        # Do not leave the shared connection in an aborted transaction.
        conn.rollback()
        raise
    finally:
        cur.close()
    written = len(rows)
    print(f"💾 DB'ye {written} takım ELO yazıldı (team_elo_ratings)")
def _load_top_league_ids(self) -> set:
    """Read the league ids listed in ``top_leagues.json``.

    The file is looked up two directories above this module first, then one
    directory above. It is read as UTF-8, like the other JSON files this
    module opens.

    Returns:
        The set built from the first file found, or an empty set when
        neither location exists.
    """
    here = os.path.dirname(__file__)
    candidates = [
        os.path.join(here, '..', '..', 'top_leagues.json'),
        os.path.join(here, '..', 'top_leagues.json'),
    ]
    for p in candidates:
        if os.path.exists(p):
            with open(p, encoding='utf-8') as f:
                ids = set(json.load(f))
            print(f"📋 {len(ids)} top lig yüklendi ({os.path.basename(p)})")
            return ids
    print("⚠️ top_leagues.json bulunamadı — tüm maçlar yazılacak")
    return set()
def calculate_all_from_history(self, sport: str = 'football'):
    """Rebuild ELO ratings by replaying every finished match of ``sport``.

    Matches are processed in ``mst_utc`` order. For matches in a top league
    (or for all matches when no top-league list is available) the pre-match
    ELO values are upserted into the per-sport ai_features table in batches
    of 1000. Afterwards the ratings are saved to JSON and to the database
    and the top teams are printed.

    Because the whole history is replayed, every team that takes part in
    the replay is reset to a fresh rating first. Otherwise ratings loaded
    at construction time would be counted again on each run. Team ids are
    normalized to ``str`` so the keys agree with the ones produced by the
    DB and JSON loaders.

    Args:
        sport: Value matched against ``matches.sport``.

    Raises:
        Any database error, after rolling back the open transaction.
        Batches committed before the error stay committed.
    """
    print(f"\n🔄 {sport.upper()} için ELO V2 hesaplanıyor...")
    conn = self.get_conn()
    if conn is None:
        print("❌ DB bağlantısı yok!")
        return
    top_league_ids = self._load_top_league_ids()

    BATCH_SIZE = 1000
    batch: list = []
    processed = 0
    written = 0

    cur = conn.cursor()
    try:
        # All finished matches in chronological order
        cur.execute("""
            SELECT m.id, m.home_team_id, m.away_team_id,
            m.score_home, m.score_away, m.league_id,
            t1.name as home_name, t2.name as away_name,
            l.name as league_name
            FROM matches m
            LEFT JOIN teams t1 ON m.home_team_id = t1.id
            LEFT JOIN teams t2 ON m.away_team_id = t2.id
            LEFT JOIN leagues l ON m.league_id = l.id
            WHERE m.sport = %s
            AND m.score_home IS NOT NULL
            AND m.score_away IS NOT NULL
            ORDER BY m.mst_utc ASC
        """, (sport,))
        matches = cur.fetchall()
        print(f"📊 {len(matches):,} maç işlenecek...")

        # Reset every team that will be replayed (same filter as the loop
        # below) so previously stored ratings are not double-counted.
        for row in matches:
            if row[1] and row[2]:
                self.ratings.pop(str(row[1]), None)
                self.ratings.pop(str(row[2]), None)

        for match in matches:
            (match_id, home_id, away_id, score_h, score_a,
             league_id, home_name, away_name, league) = match
            if not (home_id and away_id):
                continue
            home_id, away_id = str(home_id), str(away_id)

            # Record the pre-match ELO snapshot for top leagues only
            # (an empty top-league set means "record every match").
            if not top_league_ids or league_id in top_league_ids:
                home_elo_obj = self.get_or_create_rating(home_id, home_name or "")
                away_elo_obj = self.get_or_create_rating(away_id, away_name or "")
                batch.append((
                    match_id,
                    home_elo_obj.overall_elo,
                    away_elo_obj.overall_elo,
                    home_elo_obj.home_elo,
                    away_elo_obj.away_elo,
                    home_elo_obj.form_elo,
                    away_elo_obj.form_elo,
                ))

            # Ratings are updated for every match, top league or not
            self.update_after_match(
                home_id, away_id, score_h, score_a,
                home_name or "", away_name or "", league or ""
            )
            processed += 1

            if len(batch) >= BATCH_SIZE:
                self._flush_elo_batch(cur, batch, sport)
                conn.commit()
                written += len(batch)
                batch.clear()
            if processed % 10000 == 0:
                print(f" İşlenen: {processed:,} / {len(matches):,}")

        # Write the remaining partial batch
        if batch:
            self._flush_elo_batch(cur, batch, sport)
            conn.commit()
            written += len(batch)
    except Exception:
        # Discard the uncommitted batch and leave the connection usable.
        conn.rollback()
        raise
    finally:
        cur.close()

    print(f"{processed:,} maç işlendi, {len(self.ratings)} takım")
    print(f"📝 {written:,} maç match_ai_features'a yazıldı")
    # Save to JSON
    self.save_ratings()
    # Save to the database
    self.save_ratings_to_db()
    # Show the top 20 teams
    self._show_top_teams()
@staticmethod
def _flush_elo_batch(cur, batch: list, sport: str = 'football') -> None:
    """Upsert a batch of pre-match ELO snapshots into the per-sport ai_features table.

    Each batch item is a 7-tuple: match id, home/away overall ELO,
    home team's home ELO, away team's away ELO, home/away form ELO.
    Every row is tagged with calculator version 'elo_v2_backfill' and the
    current timestamp. The caller is responsible for committing.
    """
    from psycopg2.extras import execute_values

    # Anything other than 'football' is routed to the basketball table.
    if sport == 'football':
        table_name = 'football_ai_features'
    else:
        table_name = 'basketball_ai_features'

    sql = f"""
        INSERT INTO {table_name}
        (match_id, home_elo, away_elo,
        home_home_elo, away_away_elo,
        home_form_elo, away_form_elo,
        calculator_ver, updated_at)
        VALUES %s
        ON CONFLICT (match_id) DO UPDATE SET
        home_elo = EXCLUDED.home_elo,
        away_elo = EXCLUDED.away_elo,
        home_home_elo = EXCLUDED.home_home_elo,
        away_away_elo = EXCLUDED.away_away_elo,
        home_form_elo = EXCLUDED.home_form_elo,
        away_form_elo = EXCLUDED.away_form_elo,
        calculator_ver = EXCLUDED.calculator_ver,
        updated_at = EXCLUDED.updated_at
    """
    now = datetime.now().isoformat()
    values = []
    for mid, h_elo, a_elo, hh_elo, aa_elo, hf_elo, af_elo in batch:
        values.append(
            (mid, h_elo, a_elo, hh_elo, aa_elo, hf_elo, af_elo,
             'elo_v2_backfill', now)
        )
    execute_values(cur, sql, values, page_size=500)
def _show_top_teams(self, n: int = 20):
    """Print the ``n`` teams with the highest overall ELO."""
    ranking = sorted(
        self.ratings.items(),
        key=lambda entry: entry[1].overall_elo,
        reverse=True
    )
    print(f"\n🏆 Top {n} Takım (ELO V2):")
    for position, (team_id, elo) in enumerate(ranking[:n], start=1):
        # Fall back to the team id when no name is stored; cap at 25 chars.
        label = (elo.team_name or team_id)[:25]
        print(f" {position:2}. {label:25}{elo.overall_elo:.0f} (H:{elo.home_elo:.0f} A:{elo.away_elo:.0f})")
# Module-level singleton, created lazily on first access
_system = None


def get_elo_system() -> ELORatingSystem:
    """Return the shared ELORatingSystem, constructing it on the first call."""
    global _system
    if _system is not None:
        return _system
    _system = ELORatingSystem()
    return _system
if __name__ == "__main__":
    import sys
    from pathlib import Path

    # Put the ai-engine root on sys.path so `from data.db import ...` resolves
    _AI_ENGINE_ROOT = Path(__file__).resolve().parent.parent
    if str(_AI_ENGINE_ROOT) not in sys.path:
        sys.path.insert(0, str(_AI_ENGINE_ROOT))

    system = get_elo_system()

    # `calculate` as the first argument triggers a full recalculation;
    # anything else prints usage and a short summary of the loaded ratings.
    if sys.argv[1:2] != ['calculate']:
        print("\n🧪 ELO V2 Test")
        print("Kullanım: python elo_system.py calculate")
        print(f"\n📊 Yüklü takım sayısı: {len(system.ratings)}")
        if system.ratings:
            system._show_top_teams(10)
    else:
        system.calculate_all_from_history('football')