@@ -0,0 +1,183 @@
|
||||
"""
|
||||
Odds Movement Monitor — forward steam / odds-anomaly ("şike" signal) detector.
|
||||
=============================================================================
|
||||
The only viable version of "detect odds manipulation": capture upcoming-match
|
||||
odds PERIODICALLY and flag abnormal moves (steam = a price shortening fast =
|
||||
money/information arriving, sometimes a fixed match). Retrospective detection is
|
||||
impossible here (odds_history empty); this builds the time-series going forward.
|
||||
|
||||
No schema change: snapshots append to data/odds_snapshots.jsonl (reads
|
||||
live_matches.odds, which the feeder refreshes every 15 min).
|
||||
|
||||
Run --snapshot every ~15-20 min (scheduler). Run --report anytime to see the
|
||||
current movement watchlist.
|
||||
|
||||
For a CLOSING-time bettor the use is mainly a RISK FILTER: a match with heavy
|
||||
unexplained late steam against your pick = the market knows something you don't
|
||||
→ skip it. (Profiting from steam needs betting BEFORE it, i.e. early.)
|
||||
|
||||
Usage:
|
||||
python scripts/monitor_odds_movement.py --snapshot # capture now (cron this)
|
||||
python scripts/monitor_odds_movement.py --report # show movement watchlist
|
||||
python scripts/monitor_odds_movement.py --report --min-move 0.10
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import argparse, json, os, sys, time, datetime
|
||||
from collections import defaultdict
|
||||
|
||||
if sys.stdout and hasattr(sys.stdout, "reconfigure"):
|
||||
try: sys.stdout.reconfigure(encoding="utf-8")
|
||||
except Exception: pass
|
||||
|
||||
AI_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, AI_DIR)
|
||||
SNAP = os.path.join(AI_DIR, "data", "odds_snapshots.jsonl")
|
||||
|
||||
# markets tracked for steam (Turkish keys as stored in live_matches.odds)
|
||||
TRACK = {"Maç Sonucu": ["1", "X", "2"],
|
||||
"2,5 Alt/Üst": ["Üst", "Alt"],
|
||||
"Karşılıklı Gol": ["Var", "Yok"]}
|
||||
|
||||
|
||||
def _conn():
|
||||
from data.db import get_clean_dsn
|
||||
import psycopg2
|
||||
last = None
|
||||
for _ in range(3):
|
||||
try:
|
||||
return psycopg2.connect(get_clean_dsn())
|
||||
except Exception as e:
|
||||
last = e; time.sleep(1.2)
|
||||
raise last
|
||||
|
||||
|
||||
def _f(x):
|
||||
try: return float(x)
|
||||
except (TypeError, ValueError): return None
|
||||
|
||||
|
||||
def snapshot():
|
||||
from psycopg2.extras import RealDictCursor
|
||||
now_ms = int(time.time() * 1000)
|
||||
n = 0
|
||||
with _conn() as c:
|
||||
with c.cursor(cursor_factory=RealDictCursor) as cur:
|
||||
cur.execute("""SELECT id, mst_utc, odds FROM live_matches
|
||||
WHERE odds IS NOT NULL AND mst_utc > %s
|
||||
ORDER BY mst_utc ASC""", (now_ms - 2*3600*1000,))
|
||||
rows = cur.fetchall()
|
||||
os.makedirs(os.path.dirname(SNAP), exist_ok=True)
|
||||
with open(SNAP, "a", encoding="utf-8") as f:
|
||||
for r in rows:
|
||||
odds = r["odds"]
|
||||
if isinstance(odds, str):
|
||||
try: odds = json.loads(odds)
|
||||
except Exception: continue
|
||||
if not isinstance(odds, dict): continue
|
||||
compact = {}
|
||||
for cat, sels in TRACK.items():
|
||||
cm = odds.get(cat)
|
||||
if isinstance(cm, dict):
|
||||
vals = {s: _f(cm.get(s)) for s in sels if _f(cm.get(s))}
|
||||
if vals: compact[cat] = vals
|
||||
if not compact: continue
|
||||
f.write(json.dumps({"ts": now_ms, "match_id": r["id"],
|
||||
"mst_utc": r["mst_utc"], "odds": compact},
|
||||
ensure_ascii=False) + "\n")
|
||||
n += 1
|
||||
print(f"[snapshot] {datetime.datetime.now():%Y-%m-%d %H:%M} captured {n} upcoming matches -> {SNAP}")
|
||||
|
||||
|
||||
def _names(ids):
|
||||
try:
|
||||
from psycopg2.extras import RealDictCursor
|
||||
ids = [str(i) for i in ids]
|
||||
if not ids: return {}
|
||||
with _conn() as c:
|
||||
with c.cursor(cursor_factory=RealDictCursor) as cur:
|
||||
cur.execute("""SELECT m.id, ht.name h, at.name a
|
||||
FROM matches m JOIN teams ht ON ht.id=m.home_team_id
|
||||
JOIN teams at ON at.id=m.away_team_id WHERE m.id = ANY(%s)""", (ids,))
|
||||
return {str(r["id"]): f"{r['h']} v {r['a']}" for r in cur.fetchall()}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def report(min_move):
|
||||
if not os.path.exists(SNAP):
|
||||
print("No snapshots yet. Schedule '--snapshot' every ~15-20 min first."); return
|
||||
series = defaultdict(list) # match_id -> [(ts, mst, odds_compact), ...]
|
||||
with open(SNAP, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
try: d = json.loads(line)
|
||||
except Exception: continue
|
||||
series[d["match_id"]].append((d["ts"], d.get("mst_utc"), d["odds"]))
|
||||
|
||||
now_ms = int(time.time()*1000)
|
||||
flagged = []
|
||||
for mid, snaps in series.items():
|
||||
if len(snaps) < 2: continue
|
||||
snaps.sort(key=lambda x: x[0])
|
||||
mst = snaps[-1][1]
|
||||
# focus on MS market
|
||||
def ms(snap): return snap[2].get("Maç Sonucu", {})
|
||||
op, la = ms(snaps[0]), ms(snaps[-1])
|
||||
best = None # most-SHORTENED side = the steam (money/info) signal
|
||||
for sel in ("1", "X", "2"):
|
||||
o0, o1 = op.get(sel), la.get(sel)
|
||||
if o0 and o1 and o0 > 1.0 and o1 > 1.0:
|
||||
drift = (o1 - o0) / o0 # negative = shortened = steam
|
||||
if best is None or drift < best[4]:
|
||||
best = (abs(drift), sel, o0, o1, drift)
|
||||
if best and abs(best[4]) >= min_move:
|
||||
# velocity: biggest single-step move on that selection
|
||||
sel = best[1]; steps = [s[2].get("Maç Sonucu", {}).get(sel) for s in snaps]
|
||||
steps = [x for x in steps if x]
|
||||
vmax = 0.0
|
||||
for i in range(1, len(steps)):
|
||||
if steps[i-1]:
|
||||
vmax = max(vmax, abs(steps[i]-steps[i-1])/steps[i-1])
|
||||
flagged.append((best[0], mid, best[1], best[2], best[3], best[4], vmax,
|
||||
len(snaps), mst))
|
||||
flagged.sort(reverse=True)
|
||||
names = _names([f[1] for f in flagged[:30]])
|
||||
|
||||
print("="*84)
|
||||
print("ODDS MOVEMENT WATCHLIST (MS market; drift = (last-open)/open; ↓ = shortened = steam)")
|
||||
print("="*84)
|
||||
if not flagged:
|
||||
print(f" No matches moved >= {min_move:.0%} yet. (Need more snapshots over time;")
|
||||
print(" monitor only sees movement once it has captured several snapshots.)")
|
||||
# still show coverage
|
||||
multi = sum(1 for s in series.values() if len(s) >= 2)
|
||||
print(f"\n coverage: {len(series)} matches tracked, {multi} with >=2 snapshots.")
|
||||
return
|
||||
print(f" {'match':<34}{'side':>5}{'open':>7}{'last':>7}{'drift':>8}{'maxStep':>8}{'snaps':>6}")
|
||||
print(" "+"-"*78)
|
||||
for ab, mid, sel, o0, o1, drift, vmax, ns, mst in flagged[:25]:
|
||||
nm = (names.get(mid, mid) or mid)[:32]
|
||||
arrow = "↓steam" if drift < 0 else "↑drift"
|
||||
ko = ""
|
||||
if mst:
|
||||
mins = (mst - now_ms)/60000
|
||||
ko = f" KO~{mins/60:.1f}h" if mins > 0 else " (started)"
|
||||
print(f" {nm:<34}{sel:>5}{o0:>7.2f}{o1:>7.2f}{100*drift:>+7.1f}%{100*vmax:>+7.1f}%{ns:>6} {arrow}{ko}")
|
||||
print(f"\n {len(flagged)} matches flagged (moved >= {min_move:.0%}).")
|
||||
print(" ↓steam on a side = market backing it hard (info/possible fix). As a closing")
|
||||
print(" bettor: treat heavy late steam AGAINST your pick as a reason to SKIP.")
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description=__doc__)
|
||||
ap.add_argument("--snapshot", action="store_true")
|
||||
ap.add_argument("--report", action="store_true")
|
||||
ap.add_argument("--min-move", type=float, default=0.08, help="flag drift >= this fraction (default 0.08)")
|
||||
args = ap.parse_args()
|
||||
if args.snapshot:
|
||||
snapshot()
|
||||
if args.report or not args.snapshot:
|
||||
report(args.min_move)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user