diff --git a/ai-engine/scripts/extract_upcoming_features.py b/ai-engine/scripts/extract_upcoming_features.py new file mode 100644 index 0000000..690beb3 --- /dev/null +++ b/ai-engine/scripts/extract_upcoming_features.py @@ -0,0 +1,198 @@ +""" +Extract Upcoming Features — leak-free feature rows for UPCOMING (NS) matches, +produced by the EXACT same pipeline that built training_data_v27.csv. +============================================================================= +Why this exists: the picker (generate_daily_picks.py) needs the 133 leak-free +features for tomorrow's matches, computed IDENTICALLY to training (any drift = +train/serve skew = broken model). So we reuse V27Loader + V27Extractor verbatim: + + 1. load_all() builds ELO / team history / league / squad caches from FT + matches ONLY (untouched — guarantees identical feature computation). + 2. We then APPEND upcoming NS matches as targets and inject their odds from + live_matches.odds (all markets, same mapping as the trainer's _load_odds). + 3. extract_all() replays FT chronologically (ELO fully built), then computes + features for the NS targets at the end. ELO update + labels are guarded + for null scores (NS has no result yet); the 133 model features never use + the current score, so they come out identical to training. + 4. Write ONLY the upcoming rows -> data/upcoming_features.csv + +Then: generate_daily_picks.py --features data/upcoming_features.csv --log + +Run nightly (heavy: full ELO replay, like training). Read-only on the DB. +""" +from __future__ import annotations +import csv +import json +import os +import sys +import time + +if sys.stdout and hasattr(sys.stdout, "reconfigure"): + try: + sys.stdout.reconfigure(encoding="utf-8") + except Exception: + pass + +AI_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, AI_DIR) + +from scripts.extract_training_data_v27 import ( # noqa: E402 + V27Loader, V27Extractor, ALL_COLS, get_conn, +) + +OUTPUT = os.path.join(AI_DIR, "data", "upcoming_features.csv") +DAYS_AHEAD = 4 + + +def map_live_odds(odds_json) -> dict: + """Map live_matches.odds JSON → odds_cache keys, IDENTICAL to the trainer's + _load_odds category/selection logic (so odds features match training).""" + out: dict = {} + if isinstance(odds_json, str): + try: + odds_json = json.loads(odds_json) + except Exception: + return out + if not isinstance(odds_json, dict): + return out + for cat, sels in odds_json.items(): + if not isinstance(sels, dict): + continue + c = str(cat).lower().strip() + for sel, val in sels.items(): + try: + v = float(val) + except (TypeError, ValueError): + continue + if v <= 0: + continue + sn = str(sel) + s = sn.lower().strip() + if c == "maç sonucu": + if sn == "1": out["ms_h"] = v + elif sn in ("0", "X"): out["ms_d"] = v + elif sn == "2": out["ms_a"] = v + elif c == "1. yarı sonucu": + if sn == "1": out["ht_ms_h"] = v + elif sn in ("0", "X"): out["ht_ms_d"] = v + elif sn == "2": out["ht_ms_a"] = v + elif c == "karşılıklı gol": + if "var" in s: out["btts_y"] = v + elif "yok" in s: out["btts_n"] = v + elif c == "0,5 alt/üst": + if "alt" in s: out["ou05_u"] = v + elif "üst" in s: out["ou05_o"] = v + elif c == "1,5 alt/üst": + if "alt" in s: out["ou15_u"] = v + elif "üst" in s: out["ou15_o"] = v + elif c == "2,5 alt/üst": + if "alt" in s: out["ou25_u"] = v + elif "üst" in s: out["ou25_o"] = v + elif c == "3,5 alt/üst": + if "alt" in s: out["ou35_u"] = v + elif "üst" in s: out["ou35_o"] = v + elif c == "1. yarı 0,5 alt/üst": + if "alt" in s: out["ht_ou05_u"] = v + elif "üst" in s: out["ht_ou05_o"] = v + elif c == "1. yarı 1,5 alt/üst": + if "alt" in s: out["ht_ou15_u"] = v + elif "üst" in s: out["ht_ou15_o"] = v + return out + + +class UpcomingExtractor(V27Extractor): + """Same feature computation as training; only guards null (NS) scores.""" + + def _update_elo(self, home_id, away_id, score_home, score_away): + if score_home is None or score_away is None: + return # upcoming match — no result, don't move ELO + return super()._update_elo(home_id, away_id, score_home, score_away) + + def _extract_one(self, mid, hid, aid, sh, sa, hth, hta, mst, lid, hn, an, ln): + if sh is None or sa is None: + # Upcoming TARGET. Dummy scores so label/total_goals don't crash; + # those columns are labels/LEAKY and are NOT among the 133 model + # features, so the served feature vector is identical to training. + row = super()._extract_one(mid, hid, aid, 0, 0, 0, 0, mst, lid, hn, an, ln) + if row: + row["_upcoming"] = 1 + return row + # FT match: needed ONLY to advance ELO (extract_all calls _update_elo + # afterwards regardless). Skip the expensive per-match feature + # computation — that turns a ~6h full extraction into seconds while + # producing the IDENTICAL final ELO the upcoming targets read. + return None + + +def main(): + t0 = time.time() + conn = get_conn() + + # ── Cheap check FIRST: are there upcoming matches with odds? ── + now_ms = int(time.time() * 1000) + hi_ms = now_ms + DAYS_AHEAD * 24 * 3600 * 1000 + cur = conn.cursor() + cur.execute( + """ + SELECT lm.id, lm.home_team_id, lm.away_team_id, lm.mst_utc, lm.league_id, + ht.name, at.name, l.name, lm.odds + FROM live_matches lm + JOIN teams ht ON ht.id = lm.home_team_id + JOIN teams at ON at.id = lm.away_team_id + JOIN leagues l ON l.id = lm.league_id + WHERE lm.sport = 'football' + AND lm.odds IS NOT NULL + AND lm.mst_utc > %s AND lm.mst_utc <= %s + ORDER BY lm.mst_utc ASC + """, + (now_ms, hi_ms), + ) + upcoming = cur.fetchall() + targets = [] + for mid, hid, aid, mst, lid, hn, an, ln, odds_json in upcoming: + oc = map_live_odds(odds_json) + if "ms_h" not in oc or "ms_a" not in oc: + continue # need MS odds for the policy + targets.append((mid, hid, aid, mst, lid, hn, an, ln, oc)) + print(f"Upcoming NS matches with MS odds (next {DAYS_AHEAD}d): {len(targets)}", flush=True) + if not targets: + print("⚠️ Nothing to extract. Deploy the 4-day window + let the odds cron\n" + " populate live_matches, then re-run.") + conn.close() + return + + print("📦 Loading FT history (ELO/form/league/squad caches; heavy) ...", flush=True) + loader = V27Loader(conn) + loader.load_all() + loader.load_league_matches() + print(f" FT matches: {len(loader.matches)}", flush=True) + + for mid, hid, aid, mst, lid, hn, an, ln, oc in targets: + loader.odds_cache[mid] = oc + loader.matches.append( + (mid, hid, aid, None, None, None, None, mst, lid, hn, an, ln) + ) + # NS targets must be processed AFTER all FT (ELO fully built) + loader.matches.sort(key=lambda m: m[7] if m[7] is not None else 0) + added = len(targets) + + print("🔄 Extracting features (FT replay + upcoming targets) ...", flush=True) + ext = UpcomingExtractor(conn, loader) + rows = ext.extract_all() + up_rows = [r for r in rows if r.get("_upcoming")] + + os.makedirs(os.path.dirname(OUTPUT), exist_ok=True) + with open(OUTPUT, "w", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=ALL_COLS, extrasaction="ignore") + w.writeheader() + w.writerows(up_rows) + + with_odds = sum(1 for r in up_rows if r.get("odds_ms_h", 0) and r["odds_ms_h"] > 0) + print(f"\n✅ Wrote {len(up_rows)} upcoming feature rows ({with_odds} with MS odds) → {OUTPUT}") + print(f" Time: {(time.time()-t0)/60:.1f} min") + print(" Next: python scripts/generate_daily_picks.py --features data/upcoming_features.csv --log") + conn.close() + + +if __name__ == "__main__": + main() diff --git a/src/tasks/data-fetcher.task.ts b/src/tasks/data-fetcher.task.ts index c662883..27c8a80 100755 --- a/src/tasks/data-fetcher.task.ts +++ b/src/tasks/data-fetcher.task.ts @@ -23,9 +23,8 @@ import { import { TaskLockService } from "./task-lock.service"; import { FeederService } from "../modules/feeder/feeder.service"; -// ──────────────────────────────────────────────────────────────── + // Types -// ──────────────────────────────────────────────────────────────── interface LiveScoreTeamPayload { id: string; @@ -93,9 +92,9 @@ interface PendingPredictionRunForSettlement { type SportType = "football" | "basketball"; -// ──────────────────────────────────────────────────────────────── + // Service -// ──────────────────────────────────────────────────────────────── + @Injectable() export class DataFetcherTask { @@ -108,7 +107,7 @@ export class DataFetcherTask { private readonly scraper: FeederScraperService, private readonly taskLock: TaskLockService, private readonly feeder: FeederService, - ) {} + ) { } // ──────────────────────────────────────────────────────────── // CRON 1: Main sync — every 15 minutes @@ -200,10 +199,20 @@ export class DataFetcherTask { this.logger.log("syncLiveMatches START"); + // 4-day forward window: pull today .. +3 days so upcoming matches enter + // live_matches early and their odds are refreshed every cycle. That rolling + // refresh is what lets the odds-movement / steam monitor (and forward CLV) + // see a real opening→closing range. Finished/live matches are already + // excluded from odds re-fetch in fetchOddsForMatches(), so closed-match + // odds and their ranges are never re-pulled. + const SYNC_DAYS_AHEAD = 4; const today = getDateStringInTimeZone(new Date(), this.timeZone); - const tomorrow = getShiftedDateStringInTimeZone(1, this.timeZone); await this.syncMatchList(today); - await this.syncMatchList(tomorrow); + for (let dayOffset = 1; dayOffset < SYNC_DAYS_AHEAD; dayOffset++) { + await this.syncMatchList( + getShiftedDateStringInTimeZone(dayOffset, this.timeZone), + ); + } await this.updateLiveScores(); await this.archiveNewlyFinishedMatches(today); await this.settlePredictionRuns(); @@ -324,13 +333,13 @@ export class DataFetcherTask { const scoreAway = matchData.awayScore ?? null; const htScoreHome = this.asInt( matchData.score?.ht?.home ?? - matchData.htHomeScore ?? - matchData.homeHtScore, + matchData.htHomeScore ?? + matchData.homeHtScore, ); const htScoreAway = this.asInt( matchData.score?.ht?.away ?? - matchData.htAwayScore ?? - matchData.awayHtScore, + matchData.htAwayScore ?? + matchData.awayHtScore, ); const storedStatus = deriveStoredMatchStatus({ state: matchData.state, @@ -893,9 +902,9 @@ export class DataFetcherTask { ]); const sidelined = match.matchSlug ? await this.scraper.fetchSidelinedPlayers( - match.id, - match.matchSlug, - ) + match.id, + match.matchSlug, + ) : null; // Normalize to same home.xi/away.xi format used by processMatchOdds @@ -969,9 +978,9 @@ export class DataFetcherTask { const targetMatches = topLeagueIds.size > 0 ? allMatches.filter( - (m) => - !!m.competitionId && topLeagueIds.has(String(m.competitionId)), - ) + (m) => + !!m.competitionId && topLeagueIds.has(String(m.competitionId)), + ) : allMatches; if (targetMatches.length === 0) { @@ -1167,7 +1176,7 @@ export class DataFetcherTask { updatedAt: new Date(), }, }) - .catch(() => {}); + .catch(() => { }); this.logger.debug( `[${sport}] Marked as POSTPONED: ${match.matchName}`, ); @@ -1240,6 +1249,77 @@ export class DataFetcherTask { // (Preserved from original — no logic changes) // ──────────────────────────────────────────────────────────── + /** + * Persist live odds movement into the structured odd_selections / odds_history + * tables (the live odds elsewhere are only a JSON blob on live_matches). This + * is what makes opening→closing ranges and steam queryable in SQL. Mirrors the + * historical feeder's change-detection (feeder-persistence.service.ts) but for + * the live HTML odds format. Change-only writes; ALL markets in the payload + * are tracked (so anomalies/steam on any bet type are captured). + */ + private async persistOddsHistory( + matchId: string, + odds: Record>, + ): Promise { + try { + const cats = await this.prisma.oddCategory.findMany({ + where: { matchId }, + include: { selections: true }, + }); + for (const [catName, sels] of Object.entries(odds)) { + let category = cats.find((c) => c.name === catName); + if (!category) { + category = await this.prisma.oddCategory.create({ + data: { matchId, name: catName }, + include: { selections: true }, + }); + cats.push(category); + } + for (const [selName, value] of Object.entries(sels)) { + const sVal = String(value); + if (!selName || !sVal || !(value > 0)) continue; + const existing = category.selections.find((s) => s.name === selName); + if (existing) { + if (existing.oddValue !== sVal) { + const oldVal = parseFloat(existing.oddValue || "0"); + const newVal = parseFloat(sVal); + if (!isNaN(oldVal) && !isNaN(newVal) && oldVal > 0) { + await this.prisma.oddsHistory.create({ + data: { + selectionId: existing.dbId, + matchId, + previousValue: oldVal, + newValue: newVal, + }, + }); + } + await this.prisma.oddSelection.update({ + where: { dbId: existing.dbId }, + data: { oddValue: sVal }, + }); + existing.oddValue = sVal; + } + } else { + const created = await this.prisma.oddSelection.create({ + data: { + categoryId: category.dbId, + name: selName, + oddValue: sVal, + openingValue: sVal, + }, + }); + category.selections.push(created); + } + } + } + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + this.logger.debug( + `odds_history persist skipped for ${matchId}: ${message}`, + ); + } + } + private async processMatchOdds(match: LiveMatchOddsTarget): Promise { const matchSlug = match.matchSlug || "match"; const sport = String(match.sport || "football").toLowerCase(); @@ -1408,6 +1488,13 @@ export class DataFetcherTask { }, }); + // Fill the structured odds_history table from this pre-match odds refresh, + // so opening→closing ranges & steam are captured in the DB (not just the + // live_matches JSON blob). Runs only for pre-match matches reached here. + if (Object.keys(odds).length > 0) { + await this.persistOddsHistory(match.id, odds); + } + if ( Object.keys(odds).length > 0 || refereeName || @@ -1651,9 +1738,9 @@ export class DataFetcherTask { const score = this.isRecord(value.score) ? { - home: this.asInt(value.score.home), - away: this.asInt(value.score.away), - } + home: this.asInt(value.score.home), + away: this.asInt(value.score.away), + } : null; return {