Files
fahricansecer 94c7a4481a
Deploy Iddaai Backend / build-and-deploy (push) Successful in 37s
main
2026-05-17 02:17:22 +03:00

1112 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Data Loader Mixin — DB fetching, lineup/odds parsing.
Auto-extracted mixin module — split from services/single_match_orchestrator.py.
All methods here are composed into SingleMatchOrchestrator via inheritance.
`self` attributes (self.dsn, self.enrichment, self.v25_predictor, etc.) are
initialised in the main __init__.
"""
from __future__ import annotations
import json
import re
import time
import math
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, overload
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extras import RealDictCursor
from data.db import get_clean_dsn
from schemas.prediction import FullMatchPrediction
from schemas.match_data import MatchData
from models.v25_ensemble import V25Predictor, get_v25_predictor
# Optional dependency: if the V27 predictor module cannot be imported, define
# inert stand-ins so the three imported names always exist in this module.
try:
    from models.v27_predictor import V27Predictor, compute_divergence, compute_value_edge
except ImportError:
    class V27Predictor: # type: ignore[no-redef]
        # Fallback stub: no models, load_models() reports failure, and
        # predict_all() yields an empty result.
        def __init__(self): self.models = {}
        def load_models(self): return False
        def predict_all(self, features): return {}
    def compute_divergence(*args, **kwargs):
        # Fallback stub: accepts any arguments and returns an empty dict.
        return {}
    def compute_value_edge(*args, **kwargs):
        # Fallback stub: accepts any arguments and returns an empty dict.
        return {}
from features.odds_band_analyzer import OddsBandAnalyzer
# Optional dependency: the basketball predictor may be absent. In that case
# the prediction type degrades to ``Any`` and the factory raises ImportError
# when it is actually called, rather than at import time of this module.
try:
    from models.basketball_v25 import (
        BasketballMatchPrediction,
        get_basketball_v25_predictor,
    )
except ImportError:
    BasketballMatchPrediction = Any # type: ignore[misc]
    def get_basketball_v25_predictor() -> Any:
        # Fallback: fail loudly on use.
        raise ImportError("Basketball predictor is not available")
from core.engines.player_predictor import PlayerPrediction, get_player_predictor
from services.feature_enrichment import FeatureEnrichmentService
from services.betting_brain import BettingBrain
from services.v26_shadow_engine import V26ShadowEngine, get_v26_shadow_engine
from services.match_commentary import generate_match_commentary
from utils.top_leagues import load_top_league_ids
from utils.league_reliability import load_league_reliability
from config.config_loader import build_threshold_dict, get_threshold_default
from models.calibration import get_calibrator
class DataLoaderMixin:
def _load_match_data(self, match_id: str) -> Optional[MatchData]:
    """Load one match and its derived inputs into a ``MatchData`` object.

    The match row is looked up with ``_fetch_live_match`` first and, when
    that returns nothing, with ``_fetch_hist_match``. Odds, lineups, recent
    scoring form and an estimated league position are then derived on the
    same cursor.

    Args:
        match_id: Identifier of the match to load.

    Returns:
        A populated ``MatchData``, or ``None`` when no row is found or when
        either team id is missing on the row.
    """
    def _score_or_none(value: Any) -> Optional[int]:
        # Scores are converted through ``str`` exactly as before; a missing
        # score stays ``None``.
        return int(str(value)) if value is not None else None

    # psycopg2's ``with connection`` block only commits or rolls back the
    # transaction; it does NOT close the connection. Close it explicitly so
    # every call does not leave one server connection open.
    conn = psycopg2.connect(self.dsn)
    try:
        with conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                row = self._fetch_live_match(cur, match_id)
                if not row:
                    row = self._fetch_hist_match(cur, match_id)
                if not row:
                    return None

                home_team_id = row.get("home_team_id")
                away_team_id = row.get("away_team_id")
                if not home_team_id or not away_team_id:
                    # Hard gate: predictions with unknown teams are noisy and misleading.
                    return None

                status, state, substate = self._normalize_match_status(
                    row.get("status"),
                    row.get("state"),
                    row.get("substate"),
                    row.get("score_home"),
                    row.get("score_away"),
                )
                odds_data = self._extract_odds(cur, row)
                home_lineup, away_lineup, lineup_source, lineup_confidence = self._extract_lineups(cur, row)
                sidelined = self._parse_json_dict(row.get("sidelined"))

                match_date_ms = int(row.get("match_date_ms") or 0)
                league_id = str(row.get("league_id")) if row.get("league_id") else None
                home_id_str = str(home_team_id)
                away_id_str = str(away_team_id)

                # Form and position only consider matches before this kickoff.
                home_goals_avg, home_conceded_avg = self._calculate_team_form(
                    cur=cur,
                    team_id=home_id_str,
                    before_date_ms=match_date_ms,
                )
                away_goals_avg, away_conceded_avg = self._calculate_team_form(
                    cur=cur,
                    team_id=away_id_str,
                    before_date_ms=match_date_ms,
                )
                home_position = self._estimate_league_position(
                    cur=cur,
                    team_id=home_id_str,
                    league_id=league_id,
                    before_date_ms=match_date_ms,
                )
                away_position = self._estimate_league_position(
                    cur=cur,
                    team_id=away_id_str,
                    league_id=league_id,
                    before_date_ms=match_date_ms,
                )

                return MatchData(
                    match_id=str(row["match_id"]),
                    home_team_id=home_id_str,
                    away_team_id=away_id_str,
                    home_team_name=row.get("home_team_name") or "Home",
                    away_team_name=row.get("away_team_name") or "Away",
                    match_date_ms=match_date_ms,
                    sport=str(row.get("sport") or "football").lower(),
                    league_id=league_id,
                    league_name=row.get("league_name") or "",
                    referee_name=row.get("referee_name"),
                    odds_data=odds_data,
                    home_lineup=home_lineup,
                    away_lineup=away_lineup,
                    sidelined_data=sidelined,
                    home_goals_avg=home_goals_avg,
                    home_conceded_avg=home_conceded_avg,
                    away_goals_avg=away_goals_avg,
                    away_conceded_avg=away_conceded_avg,
                    home_position=home_position,
                    away_position=away_position,
                    lineup_source=lineup_source,
                    status=status,
                    state=state,
                    substate=substate,
                    lineup_confidence=lineup_confidence,
                    source_table=str(row.get("source_table") or "matches"),
                    current_score_home=_score_or_none(row.get("score_home")),
                    current_score_away=_score_or_none(row.get("score_away")),
                )
    finally:
        conn.close()
def _fetch_live_match(self, cur: RealDictCursor, match_id: str) -> Optional[Dict[str, Any]]:
cur.execute(
"""
SELECT
lm.id as match_id,
lm.home_team_id,
lm.away_team_id,
lm.league_id,
lm.sport,
lm.mst_utc as match_date_ms,
lm.status,
lm.state,
lm.substate,
lm.score_home,
lm.score_away,
lm.odds,
lm.lineups,
lm.sidelined,
lm.referee_name,
ht.name as home_team_name,
at.name as away_team_name,
l.name as league_name,
'live_matches'::text as source_table
FROM live_matches lm
LEFT JOIN teams ht ON ht.id = lm.home_team_id
LEFT JOIN teams at ON at.id = lm.away_team_id
LEFT JOIN leagues l ON l.id = lm.league_id
WHERE lm.id = %s
LIMIT 1
""",
(match_id,),
)
return cur.fetchone()
@staticmethod
def _normalize_match_status(
status: Any,
state: Any,
substate: Any,
score_home: Any,
score_away: Any,
) -> Tuple[str, Optional[str], Optional[str]]:
state_text = str(state or "").strip()
status_text = str(status or "").strip()
substate_text = str(substate or "").strip()
state_key = state_text.lower().replace("_", "").replace(" ", "")
status_key = status_text.lower().replace("_", "").replace(" ", "")
substate_key = substate_text.lower().replace("_", "").replace(" ", "")
live_tokens = {"live", "livegame", "firsthalf", "secondhalf", "halftime", "1h", "2h", "ht", "1q", "2q", "3q", "4q"}
finished_tokens = {"post", "postgame", "finished", "played", "ft", "ended", "aet", "pen", "penalties", "afterpenalties"}
pre_tokens = {"pre", "pregame", "scheduled", "ns", "notstarted", "timestamp"}
if state_key in live_tokens or status_key in live_tokens or substate_key in live_tokens:
return "LIVE", state_text or "live", substate_text or None
if state_key in finished_tokens or status_key in finished_tokens or substate_key in finished_tokens:
return "FT", state_text or "post", substate_text or None
if score_home is not None and score_away is not None and status_key not in pre_tokens:
return "FT", state_text or "post", substate_text or None
if state_key in pre_tokens or status_key in pre_tokens or substate_key in pre_tokens:
return "NS", state_text or "pre", substate_text or None
return status_text or "NS", state_text or None, substate_text or None
def _fetch_hist_match(self, cur: RealDictCursor, match_id: str) -> Optional[Dict[str, Any]]:
cur.execute(
"""
SELECT
m.id as match_id,
m.home_team_id,
m.away_team_id,
m.league_id,
m.sport,
m.mst_utc as match_date_ms,
m.status,
m.state,
NULL::text as substate,
m.score_home,
m.score_away,
NULL::jsonb as odds,
NULL::jsonb as lineups,
NULL::jsonb as sidelined,
ref.name as referee_name,
ht.name as home_team_name,
at.name as away_team_name,
l.name as league_name,
'matches'::text as source_table
FROM matches m
LEFT JOIN teams ht ON ht.id = m.home_team_id
LEFT JOIN teams at ON at.id = m.away_team_id
LEFT JOIN leagues l ON l.id = m.league_id
LEFT JOIN match_officials ref ON ref.match_id = m.id AND ref.role_id = 1
WHERE m.id = %s
LIMIT 1
""",
(match_id,),
)
return cur.fetchone()
def _extract_odds(self, cur: RealDictCursor, row: Dict[str, Any]) -> Dict[str, float]:
odds_data = self._parse_odds_json(row.get("odds"))
sport_key = str(row.get("sport") or "football").lower()
missing_relational_keys = [k for k in self.RELATIONAL_ODDS_KEYS if k not in odds_data]
if missing_relational_keys:
# fallback to relational odds tables when live odds JSON is incomplete
cur.execute(
"""
SELECT oc.name as category_name, os.name as selection_name, os.odd_value
FROM odd_categories oc
JOIN odd_selections os ON os.odd_category_db_id = oc.db_id
WHERE oc.match_id = %s
ORDER BY oc.db_id ASC, os.db_id ASC
""",
(row["match_id"],),
)
relational_rows = cur.fetchall()
rel_odds = self._parse_relational_odds([dict(r) for r in relational_rows])
if rel_odds:
for key, value in rel_odds.items():
odds_data.setdefault(key, value)
# Odds staleness check: warn if odds haven't been updated within 48h of match
# Uses a savepoint to avoid aborting the transaction if the column doesn't exist
try:
cur.execute("SAVEPOINT odds_staleness_check")
match_ts_ms = int(row.get("match_date_ms") or 0)
if match_ts_ms > 0:
cur.execute(
"""
SELECT EXTRACT(EPOCH FROM (NOW() - MAX(oc.updated_at))) / 3600 AS hours_stale
FROM odd_categories oc
WHERE oc.match_id = %s AND oc.updated_at IS NOT NULL
""",
(row["match_id"],),
)
stale_row = cur.fetchone()
if stale_row and stale_row.get("hours_stale") is not None:
hours_stale = float(stale_row["hours_stale"])
if hours_stale > 48:
print(f"⚠️ [DataLoader] Odds for {row['match_id']} are {hours_stale:.0f}h stale (threshold: 48h)")
odds_data["_odds_stale"] = True
cur.execute("RELEASE SAVEPOINT odds_staleness_check")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT odds_staleness_check") # restore transaction
if sport_key == "basketball":
# Reuse football aliases when source only publishes generic match-result naming.
if "ml_h" not in odds_data and "ms_h" in odds_data:
odds_data["ml_h"] = float(odds_data["ms_h"])
if "ml_a" not in odds_data and "ms_a" in odds_data:
odds_data["ml_a"] = float(odds_data["ms_a"])
if "ml_h" not in odds_data:
odds_data["ml_h"] = 1.90
if "ml_a" not in odds_data:
odds_data["ml_a"] = 1.90
if "tot_line" in odds_data and "tot_o" not in odds_data:
odds_data["tot_o"] = 1.90
if "tot_line" in odds_data and "tot_u" not in odds_data:
odds_data["tot_u"] = 1.90
else:
if "ms_h" not in odds_data:
odds_data["ms_h"] = self.DEFAULT_MS_H
if "ms_d" not in odds_data:
odds_data["ms_d"] = self.DEFAULT_MS_D
if "ms_a" not in odds_data:
odds_data["ms_a"] = self.DEFAULT_MS_A
return odds_data
def _extract_lineups(
self,
cur: RealDictCursor,
row: Dict[str, Any],
) -> Tuple[Optional[List[str]], Optional[List[str]], str, float]:
live_lineups = row.get("lineups")
status_upper = str(row.get("status") or "").upper()
state_upper = str(row.get("state") or "").upper()
substate_upper = str(row.get("substate") or "").upper()
can_trust_feed_lineups = (
status_upper in {"LIVE", "1H", "2H", "HT", "FT", "FINISHED"}
or state_upper in {"LIVE", "FIRSTHALF", "SECONDHALF", "POSTGAME", "POST_GAME"}
or substate_upper in {"LIVE", "FIRSTHALF", "SECONDHALF"}
)
home, away = self._parse_lineups_json(live_lineups) if can_trust_feed_lineups else (None, None)
if (home and len(home) >= 9) and (away and len(away) >= 9):
return home, away, "confirmed_live", 1.0
home_id = str(row["home_team_id"])
away_id = str(row["away_team_id"])
# fallback 1: current match participation table.
# Trust this only for live/finished matches; pre-match rows can be stale feed snapshots.
if can_trust_feed_lineups:
cur.execute(
"""
SELECT team_id, player_id
FROM match_player_participation
WHERE match_id = %s
AND is_starting = true
""",
(row["match_id"],),
)
rows = cur.fetchall()
if rows:
home_players = [str(r["player_id"]) for r in rows if str(r["team_id"]) == home_id]
away_players = [str(r["player_id"]) for r in rows if str(r["team_id"]) == away_id]
if not home and home_players:
home = home_players
if not away and away_players:
away = away_players
if (home and len(home) >= 9) and (away and len(away) >= 9):
return home, away, "confirmed_participation", 0.98
# fallback 2: probable XI from historical starts before match date
before_date_ms = int(row.get("match_date_ms") or 0)
sidelined = self._parse_json_dict(row.get("sidelined")) or {}
home_excluded = self._sidelined_player_ids(sidelined.get("homeTeam"))
away_excluded = self._sidelined_player_ids(sidelined.get("awayTeam"))
used_probable = False
home_conf = 0.0
away_conf = 0.0
if not home or len(home) < 9:
home, home_conf = self._build_probable_xi(
cur,
home_id,
before_date_ms,
excluded_player_ids=home_excluded,
)
used_probable = used_probable or bool(home)
if not away or len(away) < 9:
away, away_conf = self._build_probable_xi(
cur,
away_id,
before_date_ms,
excluded_player_ids=away_excluded,
)
used_probable = used_probable or bool(away)
if used_probable:
inferred_conf = min(
home_conf if home else 0.0,
away_conf if away else 0.0,
)
return home, away, "probable_xi", inferred_conf
return home, away, "none", 0.0
def _calculate_team_form(
self,
cur: RealDictCursor,
team_id: str,
before_date_ms: int,
limit: int = 5,
) -> Tuple[float, float]:
if not team_id:
return 1.5, 1.2
cur.execute(
"""
SELECT
m.home_team_id,
m.away_team_id,
m.score_home,
m.score_away
FROM matches m
WHERE (m.home_team_id = %s OR m.away_team_id = %s)
AND m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.mst_utc < %s
ORDER BY m.mst_utc DESC
LIMIT %s
""",
(team_id, team_id, before_date_ms, limit),
)
rows = cur.fetchall()
if not rows:
return 1.5, 1.2
weighted_for = 0.0
weighted_against = 0.0
total_weight = 0.0
for idx, row in enumerate(rows):
weight = float(limit - idx)
is_home = str(row["home_team_id"]) == team_id
goals_for = float(row["score_home"] if is_home else row["score_away"])
goals_against = float(row["score_away"] if is_home else row["score_home"])
weighted_for += goals_for * weight
weighted_against += goals_against * weight
total_weight += weight
if total_weight <= 0:
return 1.5, 1.2
return weighted_for / total_weight, weighted_against / total_weight
def _estimate_league_position(
self,
cur: RealDictCursor,
team_id: str,
league_id: Optional[str],
before_date_ms: int,
) -> int:
if not team_id or not league_id:
return 10
try:
cur.execute(
"""
SELECT
tm.team_id,
SUM(tm.points)::int AS points
FROM (
SELECT
m.home_team_id AS team_id,
CASE
WHEN m.score_home > m.score_away THEN 3
WHEN m.score_home = m.score_away THEN 1
ELSE 0
END AS points
FROM matches m
WHERE m.league_id = %s
AND m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.mst_utc < %s
UNION ALL
SELECT
m.away_team_id AS team_id,
CASE
WHEN m.score_away > m.score_home THEN 3
WHEN m.score_away = m.score_home THEN 1
ELSE 0
END AS points
FROM matches m
WHERE m.league_id = %s
AND m.status = 'FT'
AND m.score_home IS NOT NULL
AND m.score_away IS NOT NULL
AND m.mst_utc < %s
) tm
GROUP BY tm.team_id
ORDER BY points DESC
""",
(league_id, before_date_ms, league_id, before_date_ms),
)
rows = cur.fetchall()
if not rows:
return 10
for idx, row in enumerate(rows, start=1):
if str(row["team_id"]) == team_id:
return idx
return min(20, len(rows))
except Exception:
return 10
def _build_probable_xi(
self,
cur: RealDictCursor,
team_id: str,
before_date_ms: int,
match_limit: int = 5,
lookback_days: int = 370,
max_staleness_days: int = 120,
excluded_player_ids: Optional[Set[str]] = None,
) -> Tuple[Optional[List[str]], float]:
if not team_id:
return None, 0.0
min_date_ms = max(0, before_date_ms - (lookback_days * 24 * 60 * 60 * 1000))
cur.execute(
"""
SELECT
mpp.player_id,
m.id AS match_id,
m.mst_utc,
m.home_team_id,
m.away_team_id
FROM match_player_participation mpp
JOIN matches m ON m.id = mpp.match_id
WHERE mpp.team_id = %s
AND mpp.is_starting = true
AND NOT EXISTS (
SELECT 1
FROM match_player_participation later_mpp
JOIN matches later_m ON later_m.id = later_mpp.match_id
WHERE later_mpp.player_id = mpp.player_id
AND later_mpp.team_id <> %s
AND later_m.mst_utc > m.mst_utc
AND later_m.mst_utc < %s
AND (
later_m.status = 'FT'
OR later_m.state = 'postGame'
OR (later_m.score_home IS NOT NULL AND later_m.score_away IS NOT NULL)
)
)
AND m.id IN (
SELECT m2.id
FROM matches m2
JOIN match_player_participation recent_mpp
ON recent_mpp.match_id = m2.id
AND recent_mpp.team_id = %s
AND recent_mpp.is_starting = true
WHERE (m2.home_team_id = %s OR m2.away_team_id = %s)
AND (
m2.status = 'FT'
OR m2.state = 'postGame'
OR (m2.score_home IS NOT NULL AND m2.score_away IS NOT NULL)
)
AND m2.mst_utc < %s
AND m2.mst_utc >= %s
GROUP BY m2.id
HAVING COUNT(recent_mpp.*) >= 9
ORDER BY MAX(m2.mst_utc) DESC
LIMIT %s
)
ORDER BY m.mst_utc DESC
""",
(
team_id,
team_id,
before_date_ms,
team_id,
team_id,
team_id,
before_date_ms,
min_date_ms,
match_limit,
),
)
rows = cur.fetchall()
if not rows:
return None, 0.0
latest_mst = max(int(row.get("mst_utc") or 0) for row in rows)
age_days = (before_date_ms - latest_mst) / (24 * 60 * 60 * 1000)
stale_projection = age_days > max_staleness_days
excluded = {str(pid) for pid in (excluded_player_ids or set()) if pid}
match_order: Dict[str, int] = {}
for row in rows:
match_id = str(row["match_id"])
if match_id not in match_order:
match_order[match_id] = len(match_order)
player_scores: Dict[str, Dict[str, float]] = {}
for row in rows:
player_id = str(row["player_id"])
if player_id in excluded:
continue
idx = match_order.get(str(row["match_id"]), match_limit)
recency_weight = max(1.0, float(match_limit - idx))
score = recency_weight
if idx == 0:
score += 3.0
elif idx == 1:
score += 1.5
stats = player_scores.setdefault(
player_id,
{
"score": 0.0,
"starts": 0.0,
"last_seen_rank": float(idx),
},
)
stats["score"] += score
stats["starts"] += 1.0
stats["last_seen_rank"] = min(stats["last_seen_rank"], float(idx))
if not player_scores:
return None, 0.0
ranked = sorted(
player_scores.items(),
key=lambda item: (
item[1]["score"],
item[1]["starts"],
-item[1]["last_seen_rank"],
),
reverse=True,
)
lineup = [player_id for player_id, _ in ranked[:11]]
coverage = min(1.0, len(lineup) / 11.0)
available_matches = max(1, len(match_order))
history_score = min(1.0, available_matches / float(match_limit))
core_stability = 0.0
if ranked:
stable_core = sum(1 for _, stats in ranked[:11] if stats["starts"] >= 2.0)
core_stability = stable_core / 11.0
staleness_factor = max(
0.35,
min(1.0, float(max_staleness_days) / max(age_days, 1.0)),
)
confidence = (
(coverage * 0.45) + (history_score * 0.25) + (core_stability * 0.30)
) * staleness_factor
if excluded:
confidence *= 0.92
confidence_cap = 0.58 if stale_projection else 0.88
return lineup or None, round(max(0.0, min(confidence_cap, confidence)), 3)
@staticmethod
def _sidelined_player_ids(team_data: Any) -> Set[str]:
if not isinstance(team_data, dict):
return set()
players = team_data.get("players")
if not isinstance(players, list):
return set()
ids: Set[str] = set()
for player in players:
if not isinstance(player, dict):
continue
player_id = (
player.get("playerId")
or player.get("player_id")
or player.get("id")
or player.get("personId")
)
if player_id:
ids.add(str(player_id))
return ids
def _parse_odds_json(self, odds_json: Any) -> Dict[str, float]:
odds_json = self._parse_json_dict(odds_json)
if odds_json is None:
return {}
parsed: Dict[str, float] = {}
for category, selections in odds_json.items():
if not isinstance(selections, dict):
continue
category_text = str(category or "")
category_norm = self._normalize_text(category)
if category_norm in ("ms", "maç sonucu", "mac sonucu"):
parsed["ms_h"] = self._selection_value(selections, ("1",), 0.0)
parsed["ms_d"] = self._selection_value(selections, ("x", "0"), 0.0)
parsed["ms_a"] = self._selection_value(selections, ("2",), 0.0)
elif "maç sonucu (uzt. dahil)" in category_norm or "mac sonucu (uzt. dahil)" in category_norm:
parsed["ml_h"] = self._selection_value(selections, ("1",), 0.0)
parsed["ml_a"] = self._selection_value(selections, ("2",), 0.0)
elif category_norm in ("1. yarı sonucu", "1. yari sonucu", "ilk yarı sonucu", "ilk yari sonucu", "iy sonucu"):
parsed["ht_h"] = self._selection_value(selections, ("1",), 0.0)
parsed["ht_d"] = self._selection_value(selections, ("x", "0"), 0.0)
parsed["ht_a"] = self._selection_value(selections, ("2",), 0.0)
elif self._is_first_half_ou05_category(category_norm):
parsed["ht_ou05_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["ht_ou05_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif self._is_first_half_ou15_category(category_norm):
parsed["ht_ou15_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["ht_ou15_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif category_norm in ("2.5 alt/üst", "2,5 alt/üst"):
parsed["ou25_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["ou25_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif category_norm in ("1.5 alt/üst", "1,5 alt/üst"):
parsed["ou15_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["ou15_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif category_norm in ("3.5 alt/üst", "3,5 alt/üst"):
parsed["ou35_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["ou35_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif category_norm in ("karşılıklı gol", "karsilikli gol", "kg"):
parsed["btts_y"] = self._selection_value(selections, ("var", "yes"), 0.0)
parsed["btts_n"] = self._selection_value(selections, ("yok", "no"), 0.0)
elif category_norm in ("çifte şans", "cifte sans"):
parsed["dc_1x"] = self._selection_value(selections, ("1-x", "1x"), 0.0)
parsed["dc_x2"] = self._selection_value(selections, ("x-2", "x2"), 0.0)
parsed["dc_12"] = self._selection_value(selections, ("1-2", "12"), 0.0)
elif category_norm in ("tek/çift", "tek/cift"):
parsed["oe_odd"] = self._selection_value(selections, ("tek", "odd"), 0.0)
parsed["oe_even"] = self._selection_value(selections, ("çift", "cift", "even"), 0.0)
elif self._is_cards_ou_category(category_norm):
parsed["cards_o"] = self._selection_value(selections, ("üst", "ust", "over"), 0.0)
parsed["cards_u"] = self._selection_value(selections, ("alt", "under"), 0.0)
elif category_norm in (
"ilk yarı/maç sonucu",
"ilk yari/mac sonucu",
"iy/ms",
):
for sel_key, sel_val in selections.items():
norm_sel = self._normalize_text(sel_key)
if "/" in norm_sel:
odds_key = f"htft_{norm_sel.replace('/', '').lower()}"
parsed[odds_key] = self._to_float(sel_val, 0.0)
# Basketball full-game total line, e.g. "Alt/Üst (163,5)"
if self._is_basketball_total_category(category_norm):
if "tot_line" not in parsed:
line = self._extract_parenthesized_number(category_text)
if line is not None:
parsed["tot_line"] = line
parsed.setdefault("tot_o", self._selection_value(selections, ("üst", "ust", "over"), 0.0))
parsed.setdefault("tot_u", self._selection_value(selections, ("alt", "under"), 0.0))
# Basketball spread, e.g. "Hnd. MS (0:5,5)"
if (
"hnd. ms" in category_norm
or "hand. ms" in category_norm
or "hnd ms" in category_norm
):
home_line = self._parse_handicap_home_line(category_text)
if home_line is not None and "spread_home_line" not in parsed:
parsed["spread_home_line"] = home_line
if home_line is not None:
self._set_basketball_handicap_odds(parsed, selections, home_line)
elif self._is_football_handicap_category(category_norm):
self._set_football_handicap_odds(parsed, selections)
return parsed
def _parse_relational_odds(self, rows: List[Dict[str, Any]]) -> Dict[str, float]:
parsed: Dict[str, float] = {}
for row in rows:
category_name = str(row.get("category_name") or "")
selection_name = str(row.get("selection_name") or "")
category_norm = self._normalize_text(category_name)
selection_norm = self._normalize_text(selection_name)
odd_val = self._to_float(row.get("odd_value"), 0.0)
if odd_val <= 0:
continue
if category_norm in ("maç sonucu", "mac sonucu", "ms"):
if selection_norm == "1":
parsed["ms_h"] = odd_val
elif selection_norm in ("x", "0"):
parsed["ms_d"] = odd_val
elif selection_norm == "2":
parsed["ms_a"] = odd_val
elif "maç sonucu (uzt. dahil)" in category_norm or "mac sonucu (uzt. dahil)" in category_norm:
if selection_norm == "1":
parsed.setdefault("ml_h", odd_val)
elif selection_norm == "2":
parsed.setdefault("ml_a", odd_val)
elif category_norm in ("1. yarı sonucu", "1. yari sonucu", "ilk yarı sonucu", "ilk yari sonucu", "iy sonucu"):
if selection_norm == "1":
parsed["ht_h"] = odd_val
elif selection_norm in ("x", "0"):
parsed["ht_d"] = odd_val
elif selection_norm == "2":
parsed["ht_a"] = odd_val
elif self._is_first_half_ou05_category(category_norm):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["ht_ou05_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["ht_ou05_u"] = odd_val
elif self._is_first_half_ou15_category(category_norm):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["ht_ou15_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["ht_ou15_u"] = odd_val
elif category_norm in ("2,5 alt/üst", "2.5 alt/üst"):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["ou25_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["ou25_u"] = odd_val
elif category_norm in ("1,5 alt/üst", "1.5 alt/üst"):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["ou15_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["ou15_u"] = odd_val
elif category_norm in ("3,5 alt/üst", "3.5 alt/üst"):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["ou35_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["ou35_u"] = odd_val
elif category_norm in ("karşılıklı gol", "karsilikli gol", "kg"):
if selection_norm == "var" or "yes" in selection_norm:
parsed["btts_y"] = odd_val
elif selection_norm == "yok" or "no" in selection_norm:
parsed["btts_n"] = odd_val
elif category_norm in ("çifte şans", "cifte sans"):
if selection_norm in ("1-x", "1x"):
parsed["dc_1x"] = odd_val
elif selection_norm in ("x-2", "x2"):
parsed["dc_x2"] = odd_val
elif selection_norm in ("1-2", "12"):
parsed["dc_12"] = odd_val
elif category_norm in ("tek/çift", "tek/cift"):
if selection_norm in ("tek", "odd"):
parsed["oe_odd"] = odd_val
elif selection_norm in ("çift", "cift", "even"):
parsed["oe_even"] = odd_val
elif self._is_cards_ou_category(category_norm):
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed["cards_o"] = odd_val
elif "alt" in selection_norm or "under" in selection_norm:
parsed["cards_u"] = odd_val
elif category_norm in (
"ilk yarı/maç sonucu",
"ilk yari/mac sonucu",
"iy/ms",
):
if "/" in selection_norm:
odds_key = f"htft_{selection_norm.replace('/', '').lower()}"
parsed[odds_key] = odd_val
if self._is_basketball_total_category(category_norm):
if "tot_line" not in parsed:
line = self._extract_parenthesized_number(category_name)
if line is not None:
parsed["tot_line"] = line
if "üst" in selection_norm or "ust" in selection_norm or "over" in selection_norm:
parsed.setdefault("tot_o", odd_val)
elif "alt" in selection_norm or "under" in selection_norm:
parsed.setdefault("tot_u", odd_val)
if (
"hnd. ms" in category_norm
or "hand. ms" in category_norm
or "hnd ms" in category_norm
):
home_line = self._parse_handicap_home_line(category_name)
if home_line is not None and "spread_home_line" not in parsed:
parsed["spread_home_line"] = home_line
if home_line is not None:
sel_map = {selection_name: odd_val}
self._set_basketball_handicap_odds(parsed, sel_map, home_line)
elif self._is_football_handicap_category(category_norm):
self._set_football_handicap_odds(parsed, {selection_name: odd_val})
return parsed
def _is_basketball_total_category(self, category_norm: str) -> bool:
if "alt/üst" not in category_norm and "alt/ust" not in category_norm:
return False
banned = (
"1. yarı",
"1. yari",
"periyot",
"ev sahibi",
"deplasman",
)
return not any(token in category_norm for token in banned)
def _is_first_half_ou05_category(self, category_norm: str) -> bool:
if "alt/üst" not in category_norm and "alt/ust" not in category_norm:
return False
if not any(
token in category_norm
for token in ("1. yarı", "1. yari", "ilk yarı", "ilk yari")
):
if not re.search(r"\biy\b", category_norm):
return False
# Exclude team-specific first-half totals (home/away) and non-goal props.
if any(token in category_norm for token in ("ev sahibi", "deplasman", "korner", "kart")):
return False
# Match only exact 0.5 line (avoid false positives like 100,5 / 90,5 in basketball totals).
for token in re.findall(r"\d+(?:[.,]\d+)?", category_norm):
try:
if abs(float(token.replace(",", ".")) - 0.5) < 1e-9:
return True
except Exception:
continue
return False
def _is_first_half_ou15_category(self, category_norm: str) -> bool:
if "alt/üst" not in category_norm and "alt/ust" not in category_norm:
return False
if not any(
token in category_norm
for token in ("1. yarı", "1. yari", "ilk yarı", "ilk yari")
):
if not re.search(r"\biy\b", category_norm):
return False
if any(token in category_norm for token in ("ev sahibi", "deplasman", "korner", "kart")):
return False
for token in re.findall(r"\d+(?:[.,]\d+)?", category_norm):
try:
if abs(float(token.replace(",", ".")) - 1.5) < 1e-9:
return True
except Exception:
continue
return False
def _is_cards_ou_category(self, category_norm: str) -> bool:
if "kart" not in category_norm and "card" not in category_norm:
return False
return "alt/üst" in category_norm or "alt/ust" in category_norm
def _is_football_handicap_category(self, category_norm: str) -> bool:
if any(token in category_norm for token in ("hnd. ms", "hand. ms", "hnd ms")):
return False
return any(
token in category_norm
for token in (
"handikapli maç sonucu",
"handikapli mac sonucu",
"handikaplı maç sonucu",
"hnd. maç sonucu",
"hnd. mac sonucu",
"hnd maç sonucu",
"hnd mac sonucu",
)
)
def _extract_parenthesized_number(self, category_name: str) -> Optional[float]:
if not category_name:
return None
try:
left = category_name.find("(")
right = category_name.find(")", left + 1)
if left < 0 or right < 0:
return None
raw = category_name[left + 1 : right].strip().replace(",", ".")
out = float(raw)
return out if out > 0 else None
except Exception:
return None
def _parse_handicap_home_line(self, category_name: str) -> Optional[float]:
if not category_name:
return None
try:
left = category_name.find("(")
right = category_name.find(")", left + 1)
if left < 0 or right < 0:
return None
payload = category_name[left + 1 : right].strip().replace(",", ".")
if ":" not in payload:
return None
home_raw, away_raw = payload.split(":", 1)
home_hcp = float(home_raw.strip())
away_hcp = float(away_raw.strip())
if abs(home_hcp) < 1e-6 and away_hcp > 0:
return -away_hcp
if home_hcp > 0 and abs(away_hcp) < 1e-6:
return home_hcp
if abs(home_hcp - away_hcp) < 1e-6 and home_hcp > 0:
return 0.0
except Exception:
return None
return None
def _set_basketball_handicap_odds(
self,
out: Dict[str, float],
selections: Dict[str, Any],
home_line: float,
) -> None:
if not isinstance(selections, dict):
return
has_home_plus = False
home_plus_odd = 0.0
one_odd = 0.0
two_odd = 0.0
for key, value in selections.items():
norm_key = self._normalize_text(key)
odd = self._to_float(value, 0.0)
if odd <= 1.0:
continue
if norm_key == "1":
one_odd = odd
elif norm_key == "2":
two_odd = odd
if "+h" in norm_key or norm_key.endswith("h"):
has_home_plus = True
home_plus_odd = odd
if home_line < 0:
# Home gives points. \"1\" normally means home -line covers.
if one_odd > 1.0:
out.setdefault("spread_h", one_odd)
if home_plus_odd > 1.0:
out.setdefault("spread_a", home_plus_odd)
elif two_odd > 1.0:
out.setdefault("spread_a", two_odd)
elif home_line > 0:
# Home receives points. +h entry or \"1\" means home side.
if home_plus_odd > 1.0:
out.setdefault("spread_h", home_plus_odd)
elif one_odd > 1.0:
out.setdefault("spread_h", one_odd)
if two_odd > 1.0:
out.setdefault("spread_a", two_odd)
else:
if one_odd > 1.0:
out.setdefault("spread_h", one_odd)
if two_odd > 1.0:
out.setdefault("spread_a", two_odd)
def _set_football_handicap_odds(
self,
out: Dict[str, float],
selections: Dict[str, Any],
) -> None:
if not isinstance(selections, dict):
return
for key, value in selections.items():
norm_key = self._normalize_text(key)
odd = self._to_float(value, 0.0)
if odd <= 1.0:
continue
if norm_key == "1":
out["hcap_h"] = odd
elif norm_key in ("x", "0"):
out["hcap_d"] = odd
elif norm_key == "2":
out["hcap_a"] = odd
def _parse_lineups_json(
self,
lineups_json: Any,
) -> Tuple[Optional[List[str]], Optional[List[str]]]:
if isinstance(lineups_json, str):
try:
lineups_json = json.loads(lineups_json)
except Exception:
lineups_json = None
if not isinstance(lineups_json, dict):
return None, None
def parse_side(side: str) -> Optional[List[str]]:
# Try direct access first (home/away at root level)
side_obj = lineups_json.get(side)
# Fallback: Check if inside "stats" key (Mackolik format)
if not isinstance(side_obj, (dict, list)):
stats = lineups_json.get("stats")
if isinstance(stats, dict):
side_obj = stats.get(side)
if not isinstance(side_obj, (dict, list)):
return None
# Try standard formats (xi, starting, lineup)
entries = None
if isinstance(side_obj, dict):
entries = side_obj.get("xi") or side_obj.get("starting") or side_obj.get("lineup")
# If the dict itself contains player dicts (no wrapper keys)
if not entries and "position" in side_obj:
# side_obj is likely a single player dict, wrap it
entries = [side_obj]
elif isinstance(side_obj, list):
# side_obj is already a list of players
entries = side_obj
if not isinstance(entries, list):
return None
ids: List[str] = []
for p in entries:
if isinstance(p, dict):
player_id = p.get("id") or p.get("playerId") or p.get("personId")
if player_id:
ids.append(str(player_id))
elif p:
ids.append(str(p))
return ids or None
return parse_side("home"), parse_side("away")