From c3e44ee6979ee93e82f68dfc4e8663a21240829c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fahri=20Can=20Se=C3=A7er?= Date: Sun, 7 Jun 2026 22:50:33 +0300 Subject: [PATCH] gg65 --- ai-engine/scripts/monitor_odds_movement.py | 255 +++++++++------------ src/modules/matches/matches.controller.ts | 16 ++ src/modules/matches/matches.service.ts | 45 ++++ 3 files changed, 164 insertions(+), 152 deletions(-) diff --git a/ai-engine/scripts/monitor_odds_movement.py b/ai-engine/scripts/monitor_odds_movement.py index 28eeb21..039a085 100644 --- a/ai-engine/scripts/monitor_odds_movement.py +++ b/ai-engine/scripts/monitor_odds_movement.py @@ -1,28 +1,26 @@ """ -Odds Movement Monitor — forward steam / odds-anomaly ("şike" signal) detector. -============================================================================= -The only viable version of "detect odds manipulation": capture upcoming-match -odds PERIODICALLY and flag abnormal moves (steam = a price shortening fast = -money/information arriving, sometimes a fixed match). Retrospective detection is -impossible here (odds_history empty); this builds the time-series going forward. +Odds Movement Monitor — opening→closing line movement + steam radar. +=================================================================== +Reads live_odds_history (filled by data-fetcher.task.ts every 15 min for +upcoming matches, all markets) and reports, PER MATCH: + * opening odd (first capture) vs closing odd (latest capture) + * total move % = (closing - opening) / opening ← the headline signal + * the steam side (the selection that shortened the most = money/info/şike) -No schema change: snapshots append to data/odds_snapshots.jsonl (reads -live_matches.odds, which the feeder refreshes every 15 min). +Why opening→closing matters: it is the market's TOTAL revision. A side that +shortened a lot from open to close = the market learned something. If you can +bet EARLY (before the shortening), that gap is real value (positive CLV) — the +one realistic edge vs İddaa. As a closing bettor it's a RISK FILTER: heavy +late steam against your pick = skip. -Run --snapshot every ~15-20 min (scheduler). Run --report anytime to see the -current movement watchlist. - -For a CLOSING-time bettor the use is mainly a RISK FILTER: a match with heavy -unexplained late steam against your pick = the market knows something you don't -→ skip it. (Profiting from steam needs betting BEFORE it, i.e. early.) +Capture is done by the NestJS cron now (DB); this is a pure READER. Usage: - python scripts/monitor_odds_movement.py --snapshot # capture now (cron this) - python scripts/monitor_odds_movement.py --report # show movement watchlist - python scripts/monitor_odds_movement.py --report --min-move 0.10 + python scripts/monitor_odds_movement.py # MS movers + python scripts/monitor_odds_movement.py --min-move 0.08 --market "Maç Sonucu" """ from __future__ import annotations -import argparse, json, os, sys, time, datetime +import argparse, os, sys, time from collections import defaultdict if sys.stdout and hasattr(sys.stdout, "reconfigure"): @@ -31,152 +29,105 @@ if sys.stdout and hasattr(sys.stdout, "reconfigure"): AI_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, AI_DIR) -SNAP = os.path.join(AI_DIR, "data", "odds_snapshots.jsonl") - -# markets tracked for steam (Turkish keys as stored in live_matches.odds) -TRACK = {"Maç Sonucu": ["1", "X", "2"], - "2,5 Alt/Üst": ["Üst", "Alt"], - "Karşılıklı Gol": ["Var", "Yok"]} +from data.db import get_clean_dsn # noqa: E402 +import psycopg2 # noqa: E402 +from psycopg2.extras import RealDictCursor # noqa: E402 -def _conn(): - from data.db import get_clean_dsn - import psycopg2 +def connect(): last = None - for _ in range(3): + for _ in range(8): try: return psycopg2.connect(get_clean_dsn()) except Exception as e: - last = e; time.sleep(1.2) + last = e; time.sleep(3) raise last -def _f(x): - try: return float(x) - except (TypeError, ValueError): return None - - -def snapshot(): - from psycopg2.extras import RealDictCursor - now_ms = int(time.time() * 1000) - n = 0 - with _conn() as c: - with c.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute("""SELECT id, mst_utc, odds FROM live_matches - WHERE odds IS NOT NULL AND mst_utc > %s - ORDER BY mst_utc ASC""", (now_ms - 2*3600*1000,)) - rows = cur.fetchall() - os.makedirs(os.path.dirname(SNAP), exist_ok=True) - with open(SNAP, "a", encoding="utf-8") as f: - for r in rows: - odds = r["odds"] - if isinstance(odds, str): - try: odds = json.loads(odds) - except Exception: continue - if not isinstance(odds, dict): continue - compact = {} - for cat, sels in TRACK.items(): - cm = odds.get(cat) - if isinstance(cm, dict): - vals = {s: _f(cm.get(s)) for s in sels if _f(cm.get(s))} - if vals: compact[cat] = vals - if not compact: continue - f.write(json.dumps({"ts": now_ms, "match_id": r["id"], - "mst_utc": r["mst_utc"], "odds": compact}, - ensure_ascii=False) + "\n") - n += 1 - print(f"[snapshot] {datetime.datetime.now():%Y-%m-%d %H:%M} captured {n} upcoming matches -> {SNAP}") - - -def _names(ids): - try: - from psycopg2.extras import RealDictCursor - ids = [str(i) for i in ids] - if not ids: return {} - with _conn() as c: - with c.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute("""SELECT m.id, ht.name h, at.name a - FROM matches m JOIN teams ht ON ht.id=m.home_team_id - JOIN teams at ON at.id=m.away_team_id WHERE m.id = ANY(%s)""", (ids,)) - return {str(r["id"]): f"{r['h']} v {r['a']}" for r in cur.fetchall()} - except Exception: - return {} - - -def report(min_move): - if not os.path.exists(SNAP): - print("No snapshots yet. Schedule '--snapshot' every ~15-20 min first."); return - series = defaultdict(list) # match_id -> [(ts, mst, odds_compact), ...] - with open(SNAP, encoding="utf-8") as f: - for line in f: - try: d = json.loads(line) - except Exception: continue - series[d["match_id"]].append((d["ts"], d.get("mst_utc"), d["odds"])) - - now_ms = int(time.time()*1000) - flagged = [] - for mid, snaps in series.items(): - if len(snaps) < 2: continue - snaps.sort(key=lambda x: x[0]) - mst = snaps[-1][1] - # focus on MS market - def ms(snap): return snap[2].get("Maç Sonucu", {}) - op, la = ms(snaps[0]), ms(snaps[-1]) - best = None # most-SHORTENED side = the steam (money/info) signal - for sel in ("1", "X", "2"): - o0, o1 = op.get(sel), la.get(sel) - if o0 and o1 and o0 > 1.0 and o1 > 1.0: - drift = (o1 - o0) / o0 # negative = shortened = steam - if best is None or drift < best[4]: - best = (abs(drift), sel, o0, o1, drift) - if best and abs(best[4]) >= min_move: - # velocity: biggest single-step move on that selection - sel = best[1]; steps = [s[2].get("Maç Sonucu", {}).get(sel) for s in snaps] - steps = [x for x in steps if x] - vmax = 0.0 - for i in range(1, len(steps)): - if steps[i-1]: - vmax = max(vmax, abs(steps[i]-steps[i-1])/steps[i-1]) - flagged.append((best[0], mid, best[1], best[2], best[3], best[4], vmax, - len(snaps), mst)) - flagged.sort(reverse=True) - names = _names([f[1] for f in flagged[:30]]) - - print("="*84) - print("ODDS MOVEMENT WATCHLIST (MS market; drift = (last-open)/open; ↓ = shortened = steam)") - print("="*84) - if not flagged: - print(f" No matches moved >= {min_move:.0%} yet. (Need more snapshots over time;") - print(" monitor only sees movement once it has captured several snapshots.)") - # still show coverage - multi = sum(1 for s in series.values() if len(s) >= 2) - print(f"\n coverage: {len(series)} matches tracked, {multi} with >=2 snapshots.") - return - print(f" {'match':<34}{'side':>5}{'open':>7}{'last':>7}{'drift':>8}{'maxStep':>8}{'snaps':>6}") - print(" "+"-"*78) - for ab, mid, sel, o0, o1, drift, vmax, ns, mst in flagged[:25]: - nm = (names.get(mid, mid) or mid)[:32] - arrow = "↓steam" if drift < 0 else "↑drift" - ko = "" - if mst: - mins = (mst - now_ms)/60000 - ko = f" KO~{mins/60:.1f}h" if mins > 0 else " (started)" - print(f" {nm:<34}{sel:>5}{o0:>7.2f}{o1:>7.2f}{100*drift:>+7.1f}%{100*vmax:>+7.1f}%{ns:>6} {arrow}{ko}") - print(f"\n {len(flagged)} matches flagged (moved >= {min_move:.0%}).") - print(" ↓steam on a side = market backing it hard (info/possible fix). As a closing") - print(" bettor: treat heavy late steam AGAINST your pick as a reason to SKIP.") - - def main(): ap = argparse.ArgumentParser(description=__doc__) - ap.add_argument("--snapshot", action="store_true") - ap.add_argument("--report", action="store_true") - ap.add_argument("--min-move", type=float, default=0.08, help="flag drift >= this fraction (default 0.08)") + ap.add_argument("--min-move", type=float, default=0.05, + help="flag matches whose focus-market move >= this fraction (default 0.05)") + ap.add_argument("--market", default="Maç Sonucu", help="focus market for the watchlist") + ap.add_argument("--limit", type=int, default=25) args = ap.parse_args() - if args.snapshot: - snapshot() - if args.report or not args.snapshot: - report(args.min_move) + + with connect() as c, c.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute("SELECT to_regclass('public.live_odds_history') AS ex") + if not cur.fetchall()[0]["ex"]: + print("live_odds_history yok — NestJS cron'u henüz yazmamış (deploy/build kontrol)."); return + + # opening (earliest) + closing (latest) per match/market/selection + cur.execute(""" + SELECT match_id, market, selection, + (array_agg(new_value ORDER BY change_time ASC))[1] AS opening, + (array_agg(new_value ORDER BY change_time DESC))[1] AS closing, + count(*) AS ticks + FROM live_odds_history + GROUP BY match_id, market, selection + """) + rows = cur.fetchall() + if not rows: + print("live_odds_history boş (henüz yakalama yok)."); return + + # per match aggregation + by_match = defaultdict(lambda: {"focus": {}, "any_ticks": 0, "max_abs": 0.0}) + for r in rows: + mid = r["match_id"]; o = r["opening"]; cl = r["closing"] + d = by_match[mid] + d["any_ticks"] = max(d["any_ticks"], r["ticks"]) + if o and cl and o > 0: + mv = (cl - o) / o + d["max_abs"] = max(d["max_abs"], abs(mv)) + if r["market"] == args.market: + d["focus"][r["selection"]] = (o, cl, mv) + + # team names + kickoff + ids = list(by_match.keys()) + names = {} + if ids: + cur.execute("""SELECT lm.id, ht.name h, at.name a, lm.mst_utc + FROM live_matches lm + JOIN teams ht ON ht.id=lm.home_team_id + JOIN teams at ON at.id=lm.away_team_id + WHERE lm.id = ANY(%s)""", (ids,)) + for r in cur.fetchall(): + names[r["id"]] = (f"{r['h']} v {r['a']}", r["mst_utc"]) + + moved = [(m, d) for m, d in by_match.items() if d["any_ticks"] > 1] + print("="*78) + print("ODDS MOVEMENT — açılış→kapanış (live_odds_history)") + print("="*78) + print(f"izlenen maç: {len(by_match)} | hareket başlamış (>1 yakalama): {len(moved)}") + if not moved: + print("\nHenüz hareket yok — hepsi tek yakalama (açılış). Oranlar oynadıkça dolacak.") + print("(NestJS 15-dk cron'u her tazelemede değişen oranı ekliyor.)") + return + + flagged = sorted( + [(m, d) for m, d in moved if d["focus"] and d["max_abs"] >= args.min_move], + key=lambda x: -x[1]["max_abs"], + ) + now = int(time.time()*1000) + print(f"\n{args.market} hareketi >= %{args.min_move*100:.0f} olan maçlar:") + print(f" {'maç':<32}{'sel':>5}{'açılış':>8}{'kapanış':>9}{'hareket':>9}") + print(" "+"-"*64) + for mid, d in flagged[:args.limit]: + nm, mst = names.get(mid, (mid[:30], None)) + ko = "" + if mst: + mins = (mst-now)/60000 + ko = f" KO~{mins/60:.1f}h" if mins > 0 else " (başladı)" + # steam side = most shortened (most negative move) + steam = min(d["focus"].items(), key=lambda kv: kv[1][2]) + print(f" {nm[:30]:<32}{'':>5}{'':>8}{'':>9}{'':>9}{ko}") + for sel, (o, cl, mv) in d["focus"].items(): + tag = " ↓STEAM" if sel == steam[0] and mv < 0 else "" + print(f" {'':<32}{sel:>5}{o:>8.2f}{cl:>9.2f}{100*mv:>+8.1f}%{tag}") + if not flagged: + print(" (eşiği geçen yok — hareketler küçük)") + print("\nOKUMA: kapanışta oynuyorsan, pick'ine KARŞI ↓STEAM olan maçı PAS geç.") + print("Erken oynayabiliyorsan, kısalan tarafı açılışta yakalamak = gerçek değer (CLV).") if __name__ == "__main__": diff --git a/src/modules/matches/matches.controller.ts b/src/modules/matches/matches.controller.ts index d031f82..23d8260 100755 --- a/src/modules/matches/matches.controller.ts +++ b/src/modules/matches/matches.controller.ts @@ -111,6 +111,22 @@ export class MatchesController { return this.matchesService.getActiveLeagues(sport || Sport.FOOTBALL); } + /** + * GET /matches/:id/odds-movement + * Opening→closing odds movement per market/selection (from live_odds_history) + */ + @Public() + @Get(":id/odds-movement") + @ApiOperation({ summary: "Opening→closing odds movement for a match" }) + @ApiParam({ name: "id", description: "Match ID" }) + @ApiResponse({ status: 200, description: "{ market: { selection: { open, close } } }" }) + async getOddsMovement(@Param("id") id: string) { + if (!id) { + throw new BadRequestException("Match ID is required"); + } + return this.matchesService.getOddsMovement(id); + } + /** * GET /matches/:id * Get full match details diff --git a/src/modules/matches/matches.service.ts b/src/modules/matches/matches.service.ts index cc27959..39c7d83 100755 --- a/src/modules/matches/matches.service.ts +++ b/src/modules/matches/matches.service.ts @@ -28,6 +28,51 @@ export class MatchesService { this.loadTopLeagues(); } + /** + * Per-match odds movement (opening→closing) from live_odds_history. + * Returns { [market]: { [selection]: { open, close } } } with the same + * Turkish market/selection labels used in match.odds, so the UI can line + * them up directly. Returns {} if there is no data or the table is absent. + */ + async getOddsMovement( + matchId: string, + ): Promise>> { + try { + const rows = await this.prisma.$queryRawUnsafe< + Array<{ + market: string; + selection: string; + open: number | null; + close: number | null; + }> + >( + `SELECT market, selection, + (array_agg(new_value ORDER BY change_time ASC))[1] AS open, + (array_agg(new_value ORDER BY change_time DESC))[1] AS close + FROM live_odds_history + WHERE match_id = $1 + GROUP BY market, selection`, + matchId, + ); + const out: Record< + string, + Record + > = {}; + for (const r of rows) { + if (r.open == null || r.close == null) continue; + (out[r.market] ??= {})[r.selection] = { + open: Number(r.open), + close: Number(r.close), + }; + } + return out; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logger.warn(`getOddsMovement failed for ${matchId}: ${msg}`); + return {}; + } + } + private loadTopLeagues() { try { const filePath = path.join(process.cwd(), "top_leagues.json");