Files
iddaai-be/ai-engine/services/betting_brain.py
T
fahricansecer b6d64b59bf
Deploy Iddaai Backend / build-and-deploy (push) Failing after 2m6s
main
2026-05-12 02:43:02 +03:00

613 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Deterministic betting judge for prediction packages.
The model layer estimates event probabilities. BettingBrain decides whether
those probabilities are trustworthy enough to risk money.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
class BettingBrain:
MIN_ODDS = 1.30
MIN_BET_SCORE = 72.0
MIN_WATCH_SCORE = 62.0
MIN_BAND_SAMPLE = 8
HARD_DIVERGENCE = 0.22
SOFT_DIVERGENCE = 0.14
EXTREME_MODEL_PROB = 0.85
EXTREME_GAP = 0.30
# Vetoes that is_value_sniper bypasses (does NOT bypass odds_below_minimum)
SNIPER_BYPASSABLE_VETOES = {"calibrated_confidence_too_low", "play_score_too_low"}
# Trap market: market implied probability massively exceeds historical band hit rate
TRAP_MARKET_GAP = 0.10
MARKET_PRIORS = {
"DC": 4.0,
"OU15": 3.0,
"OU25": 2.0,
"BTTS": 0.0,
"MS": -2.0,
"OU35": -2.0,
"HT": -6.0,
"HTFT": -12.0,
"CARDS": -5.0,
"OE": -8.0,
}
def judge(self, package: Dict[str, Any]) -> Dict[str, Any]:
v27_engine = package.get("v27_engine")
if not isinstance(v27_engine, dict):
return package
guarded = dict(package)
rows = self._collect_rows(guarded)
if not rows:
return guarded
judged_rows: Dict[str, Dict[str, Any]] = {}
decisions: List[Dict[str, Any]] = []
for row in rows:
key = self._row_key(row)
judged = self._judge_row(dict(row), guarded)
judged_rows[key] = judged
decisions.append(judged["betting_brain"])
approved = [
row for row in judged_rows.values()
if row.get("betting_brain", {}).get("action") == "BET"
]
watchlist = [
row for row in judged_rows.values()
if row.get("betting_brain", {}).get("action") == "WATCH"
]
no_value = [
row for row in judged_rows.values()
if row.get("betting_brain", {}).get("action") == "WATCH_NO_VALUE"
]
approved.sort(key=self._candidate_sort_key, reverse=True)
watchlist.sort(key=self._candidate_sort_key, reverse=True)
no_value.sort(key=self._candidate_sort_key, reverse=True)
original_main = guarded.get("main_pick") or {}
main_pick = None
decision = "NO_BET"
decision_reason = "No candidate passed the betting brain evidence gates."
if approved:
main_pick = dict(approved[0])
main_pick["is_guaranteed"] = bool(main_pick.get("betting_brain", {}).get("score", 0.0) >= 82.0)
main_pick["pick_reason"] = "betting_brain_approved"
decision = "BET"
decision_reason = main_pick.get("betting_brain", {}).get("summary", "Evidence is aligned.")
elif watchlist:
main_pick = dict(watchlist[0])
self._force_no_bet(main_pick, "betting_brain_watchlist")
decision = "WATCHLIST"
decision_reason = main_pick.get("betting_brain", {}).get("summary", "Interesting but not clean enough.")
elif no_value:
# B-1: model agrees with a low-odds market — surface it so the user
# sees the read, but explicitly mark as not-playable.
main_pick = dict(no_value[0])
self._force_no_bet(main_pick, "betting_brain_no_value_odds_below_minimum")
decision = "WATCH_NO_VALUE"
decision_reason = "Model favoriyle hemfikir ama oran bahis için çok düşük — bilgi amaçlı gösteriliyor."
elif original_main:
main_pick = dict(judged_rows.get(self._row_key(original_main), original_main))
self._force_no_bet(main_pick, "betting_brain_no_safe_pick")
main_key = self._row_key(main_pick) if main_pick else ""
supporting = [
dict(row)
for row in judged_rows.values()
if self._row_key(row) != main_key
]
supporting.sort(key=self._candidate_sort_key, reverse=True)
bet_summary = [
self._summary_item(row)
for row in sorted(judged_rows.values(), key=self._candidate_sort_key, reverse=True)
]
guarded["main_pick"] = main_pick
guarded["value_pick"] = self._pick_value_candidate(judged_rows, main_key)
guarded["supporting_picks"] = supporting[:6]
guarded["bet_summary"] = bet_summary
playable = decision == "BET" and bool(main_pick and main_pick.get("playable"))
advice = dict(guarded.get("bet_advice") or {})
advice["playable"] = playable
advice["suggested_stake_units"] = float(main_pick.get("stake_units", 0.0)) if playable and main_pick else 0.0
advice["reason"] = "betting_brain_approved" if playable else "betting_brain_no_bet"
advice["decision"] = decision
advice["confidence_band"] = self._decision_band(main_pick)
guarded["bet_advice"] = advice
rejected = [d for d in decisions if d.get("action") == "REJECT"]
guarded["betting_brain"] = {
"version": "judge-v1",
"decision": decision,
"reason": decision_reason,
"main_pick_key": main_key or None,
"approved_count": len(approved),
"watchlist_count": len(watchlist),
"rejected_count": len(rejected),
"top_candidates": self._top_decisions(decisions),
"rules": {
"min_bet_score": self.MIN_BET_SCORE,
"min_watch_score": self.MIN_WATCH_SCORE,
"min_band_sample": self.MIN_BAND_SAMPLE,
"hard_divergence": self.HARD_DIVERGENCE,
"soft_divergence": self.SOFT_DIVERGENCE,
"extreme_model_probability": self.EXTREME_MODEL_PROB,
"extreme_model_market_gap": self.EXTREME_GAP,
},
}
guarded["upper_brain"] = guarded["betting_brain"]
guarded.setdefault("analysis_details", {})
guarded["analysis_details"]["betting_brain_applied"] = True
guarded["analysis_details"]["betting_brain_decision"] = decision
return guarded
def _judge_row(self, row: Dict[str, Any], package: Dict[str, Any]) -> Dict[str, Any]:
market = str(row.get("market") or "")
pick = str(row.get("pick") or "")
model_prob = self._market_probability(row, package)
odds = self._safe_float(row.get("odds"), 0.0) or 0.0
implied = (1.0 / odds) if odds > 1.0 else 0.0
model_gap = (model_prob - implied) if model_prob is not None and implied > 0 else None
calibrated_conf = self._safe_float(row.get("calibrated_confidence", row.get("confidence")), 0.0) or 0.0
play_score = self._safe_float(row.get("play_score"), 0.0) or 0.0
ev_edge = self._safe_float(row.get("ev_edge", row.get("edge")), 0.0) or 0.0
v27_prob = self._v27_probability(market, pick, package.get("v27_engine") or {})
divergence = abs(model_prob - v27_prob) if model_prob is not None and v27_prob is not None else None
triple_key = self._triple_key(market, pick)
triple = self._triple_value(package, triple_key)
band_sample = int(self._safe_float((triple or {}).get("band_sample"), 0.0) or 0.0)
triple_is_value = bool((triple or {}).get("is_value"))
consensus = str((package.get("v27_engine") or {}).get("consensus") or "").upper()
positives: List[str] = []
issues: List[str] = []
vetoes: List[str] = []
score = 0.0
if row.get("playable"):
score += 18.0
positives.append("base_model_playable")
else:
score -= 18.0
issues.append("base_model_not_playable")
is_value_sniper = bool(row.get("is_value_sniper"))
if is_value_sniper:
score += 35.0
positives.append("value_sniper_override")
score += max(0.0, min(20.0, calibrated_conf * 0.22))
score += max(-8.0, min(16.0, ev_edge * 45.0))
score += max(0.0, min(14.0, play_score * 0.12))
score += self.MARKET_PRIORS.get(market, -3.0)
data_quality = package.get("data_quality") or {}
quality_score = self._safe_float(data_quality.get("score"), 0.6) or 0.6
score += max(-8.0, min(6.0, (quality_score - 0.55) * 16.0))
risk = str((package.get("risk") or {}).get("level") or "MEDIUM").upper()
score += {"LOW": 5.0, "MEDIUM": 0.0, "HIGH": -12.0, "EXTREME": -22.0}.get(risk, -4.0)
if odds < self.MIN_ODDS:
vetoes.append("odds_below_minimum")
if calibrated_conf < 38.0 and not is_value_sniper:
vetoes.append("calibrated_confidence_too_low")
if play_score < 50.0 and not is_value_sniper:
vetoes.append("play_score_too_low")
if divergence is not None:
if divergence >= self.HARD_DIVERGENCE and not is_value_sniper:
score -= 42.0
vetoes.append("v25_v27_hard_disagreement")
elif divergence >= self.SOFT_DIVERGENCE:
score -= 18.0
issues.append("v25_v27_soft_disagreement")
else:
score += 11.0
positives.append("v25_v27_aligned")
# Trap market detection: market overpriced vs historical band hit rate
trap_market_flag = False
trap_market_gap = None
if isinstance(triple, dict):
band_rate_val = self._safe_float(triple.get("band_rate"))
implied_val = self._safe_float(triple.get("implied_prob"))
if (
band_rate_val is not None
and implied_val is not None
and band_sample >= self.MIN_BAND_SAMPLE
and (implied_val - band_rate_val) > self.TRAP_MARKET_GAP
):
trap_market_flag = True
trap_market_gap = round(implied_val - band_rate_val, 4)
score -= 14.0
issues.append("trap_market_market_overpriced")
if isinstance(triple, dict):
if triple_is_value:
score += 18.0
positives.append("triple_value_confirmed")
elif market in {"DC", "MS", "OU25", "BTTS"}:
score -= 18.0
issues.append("triple_value_not_confirmed")
if band_sample >= 25:
score += 8.0
positives.append("strong_historical_sample")
elif band_sample >= self.MIN_BAND_SAMPLE:
score += 3.0
positives.append("usable_historical_sample")
else:
score -= 16.0
issues.append("historical_sample_too_low")
if market == "DC" and not is_value_sniper:
vetoes.append("dc_without_historical_sample")
elif market in {"MS", "DC", "OU25"}:
score -= 10.0
issues.append("missing_triple_value_evidence")
if consensus == "DISAGREE" and market in {"MS", "DC"}:
score -= 12.0
issues.append("engine_consensus_disagree")
if (
model_prob is not None
and model_gap is not None
and model_prob >= self.EXTREME_MODEL_PROB
and model_gap >= self.EXTREME_GAP
and not triple_is_value
and not is_value_sniper
):
score -= 24.0
vetoes.append("extreme_probability_without_evidence")
if market in {"HT", "HTFT", "OE"} and score < 86.0 and not is_value_sniper:
vetoes.append("volatile_market_requires_exceptional_evidence")
# Sniper override: bypass eligible vetoes when value sniper triggered
sniper_bypassed: List[str] = []
if is_value_sniper and vetoes:
remaining = []
for v in vetoes:
if v in self.SNIPER_BYPASSABLE_VETOES:
sniper_bypassed.append(v)
else:
remaining.append(v)
vetoes = remaining
if sniper_bypassed:
positives.append("sniper_bypassed_soft_vetoes")
score = max(0.0, min(100.0, score))
action = "BET"
if vetoes:
# B-1: when only veto is odds_below_minimum, switch to WATCH_NO_VALUE
# so user still sees model commentary instead of blank rejection.
if vetoes == ["odds_below_minimum"]:
action = "WATCH_NO_VALUE"
else:
action = "REJECT"
elif score < self.MIN_WATCH_SCORE and not is_value_sniper:
action = "REJECT"
elif score < self.MIN_BET_SCORE and not is_value_sniper:
action = "WATCH"
row["betting_brain"] = {
"action": action,
"score": round(score, 1),
"summary": self._summary(action, market, pick, positives, issues, vetoes),
"positives": positives[:5],
"issues": issues[:6],
"vetoes": vetoes[:6],
"sniper_bypassed": sniper_bypassed,
"trap_market_flag": trap_market_flag,
"trap_market_gap": trap_market_gap,
"model_prob": round(model_prob, 4) if model_prob is not None else None,
"implied_prob": round(implied, 4),
"model_market_gap": round(model_gap, 4) if model_gap is not None else None,
"v27_prob": round(v27_prob, 4) if v27_prob is not None else None,
"divergence": round(divergence, 4) if divergence is not None else None,
"triple_key": triple_key,
"triple_value": triple,
}
if action != "BET":
self._force_no_bet(row, f"betting_brain_{action.lower()}")
else:
row["is_guaranteed"] = bool(score >= 82.0)
row["pick_reason"] = "betting_brain_approved"
row["stake_units"] = self._brain_stake(row, score)
row["bet_grade"] = "A" if score >= 82.0 else "B"
row["playable"] = True
self._append_reason(row, f"betting_brain_{action.lower()}_{round(score)}")
return row
def _collect_rows(self, package: Dict[str, Any]) -> List[Dict[str, Any]]:
rows: Dict[str, Dict[str, Any]] = {}
for source in ("main_pick", "value_pick"):
item = package.get(source)
if isinstance(item, dict) and item.get("market"):
# print(f"DEBUG: {source} is_value_sniper: {item.get('is_value_sniper')}")
rows[self._row_key(item)] = dict(item)
for source in ("supporting_picks", "bet_summary"):
for item in package.get(source) or []:
if isinstance(item, dict) and item.get("market"):
key = self._row_key(item)
rows[key] = self._merge_row(rows.get(key), item)
# B-2: ensure both MS sides (and DC sides) have an entry — give user the
# model's read on the opposite outcome even when upstream filtered it out.
self._inject_reference_rows(rows, package)
return list(rows.values())
def _inject_reference_rows(
self,
rows: Dict[str, Dict[str, Any]],
package: Dict[str, Any],
) -> None:
market_board = package.get("market_board") or {}
ms_board = market_board.get("MS") if isinstance(market_board, dict) else None
if not isinstance(ms_board, dict):
return
probs = ms_board.get("probs") if isinstance(ms_board.get("probs"), dict) else {}
if not probs:
return
# Pull MS odds from any existing MS row to estimate the missing side's odds
existing_odds_by_pick: Dict[str, float] = {}
for row in rows.values():
if str(row.get("market")) == "MS":
pick = str(row.get("pick"))
odd = self._safe_float(row.get("odds"), 0.0) or 0.0
if pick and odd > 1.0:
existing_odds_by_pick[pick] = odd
for pick in ("1", "X", "2"):
key = f"MS:{pick}"
if key in rows:
continue
prob = self._safe_float(probs.get(pick), 0.0)
if prob is None or prob <= 0.0:
continue
implied_odd = round(1.0 / prob, 2) if prob > 0.01 else 0.0
ref_odd = existing_odds_by_pick.get(pick) or implied_odd
rows[key] = {
"market": "MS",
"pick": pick,
"probability": round(prob, 4),
"confidence": round(prob * 100.0, 1),
"raw_confidence": round(prob * 100.0, 1),
"calibrated_confidence": round(prob * 100.0, 1),
"odds": ref_odd,
"is_underdog_reference": True,
"playable": False,
"stake_units": 0.0,
"bet_grade": "PASS",
"decision_reasons": ["underdog_reference_for_completeness"],
}
@staticmethod
def _merge_row(existing: Optional[Dict[str, Any]], incoming: Dict[str, Any]) -> Dict[str, Any]:
if existing is None:
return dict(incoming)
merged = dict(incoming)
merged.update({k: v for k, v in existing.items() if v is not None})
for key in ("decision_reasons", "reasons"):
reasons = list(existing.get(key) or []) + list(incoming.get(key) or [])
if reasons:
merged[key] = list(dict.fromkeys(reasons))
return merged
def _pick_value_candidate(self, rows: Dict[str, Dict[str, Any]], main_key: str) -> Optional[Dict[str, Any]]:
candidates = [
row for key, row in rows.items()
if key != main_key
and row.get("betting_brain", {}).get("action") in {"BET", "WATCH"}
and (self._safe_float(row.get("odds"), 0.0) or 0.0) >= 1.60
]
candidates.sort(key=self._candidate_sort_key, reverse=True)
return dict(candidates[0]) if candidates else None
def _summary_item(self, row: Dict[str, Any]) -> Dict[str, Any]:
reasons = list(row.get("decision_reasons") or row.get("reasons") or [])
return {
"market": row.get("market"),
"pick": row.get("pick"),
"raw_confidence": row.get("raw_confidence", row.get("confidence")),
"calibrated_confidence": row.get("calibrated_confidence", row.get("confidence")),
"bet_grade": row.get("bet_grade", "PASS"),
"playable": bool(row.get("playable")),
"stake_units": float(row.get("stake_units", 0.0) or 0.0),
"play_score": row.get("play_score", 0.0),
"ev_edge": row.get("ev_edge", row.get("edge", 0.0)),
"implied_prob": row.get("implied_prob", 0.0),
"odds_reliability": row.get("odds_reliability", 0.35),
"odds": row.get("odds", 0.0),
"reasons": reasons[:6],
"is_underdog_reference": bool(row.get("is_underdog_reference")),
"betting_brain": row.get("betting_brain"),
}
@staticmethod
def _candidate_sort_key(row: Dict[str, Any]) -> Tuple[float, float, float]:
brain = row.get("betting_brain") or {}
action_boost = {"BET": 2.0, "WATCH": 1.0, "REJECT": 0.0}.get(str(brain.get("action")), 0.0)
return (
action_boost,
float(brain.get("score", 0.0) or 0.0),
float(row.get("play_score", 0.0) or 0.0),
)
@staticmethod
def _row_key(row: Optional[Dict[str, Any]]) -> str:
if not isinstance(row, dict):
return ""
return f"{row.get('market')}:{row.get('pick')}"
def _force_no_bet(self, row: Dict[str, Any], reason: str) -> None:
row["playable"] = False
row["stake_units"] = 0.0
row["bet_grade"] = "PASS"
row["is_guaranteed"] = False
row["pick_reason"] = reason
if row.get("signal_tier") == "CORE":
row["signal_tier"] = "PASS"
self._append_reason(row, reason)
@staticmethod
def _append_reason(row: Dict[str, Any], reason: str) -> None:
key = "decision_reasons" if "decision_reasons" in row else "reasons"
reasons = list(row.get(key) or [])
if reason not in reasons:
reasons.append(reason)
row[key] = reasons[:6]
def _brain_stake(self, row: Dict[str, Any], score: float) -> float:
existing = self._safe_float(row.get("stake_units"), 0.0) or 0.0
odds = self._safe_float(row.get("odds"), 0.0) or 0.0
if odds <= 1.0:
return 0.0
cap = 2.0 if score >= 82.0 else 1.2
if score < 78.0:
cap = 0.8
return round(max(0.25, min(existing if existing > 0 else cap, cap)), 1)
@staticmethod
def _decision_band(main_pick: Optional[Dict[str, Any]]) -> str:
if not main_pick:
return "LOW"
score = float((main_pick.get("betting_brain") or {}).get("score", 0.0) or 0.0)
if score >= 82.0:
return "HIGH"
if score >= 72.0:
return "MEDIUM"
return "LOW"
@staticmethod
def _top_decisions(decisions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
ordered = sorted(decisions, key=lambda d: float(d.get("score", 0.0) or 0.0), reverse=True)
return [
{
"action": item.get("action"),
"score": item.get("score"),
"summary": item.get("summary"),
"vetoes": item.get("vetoes", []),
"issues": item.get("issues", []),
}
for item in ordered[:5]
]
@staticmethod
def _summary(action: str, market: str, pick: str, positives: List[str], issues: List[str], vetoes: List[str]) -> str:
if action == "BET":
return f"{market} {pick} approved: evidence is aligned enough for a controlled stake."
if action == "WATCH":
return f"{market} {pick} is interesting but not clean enough for stake."
if action == "WATCH_NO_VALUE":
return f"{market} {pick}: model favoriyle hemfikir, fakat oran ({', '.join(vetoes[:1]) or 'düşük'}) bahis için yetersiz."
if vetoes:
return f"{market} {pick} rejected: {', '.join(vetoes[:3])}."
if issues:
return f"{market} {pick} rejected: {', '.join(issues[:3])}."
return f"{market} {pick} rejected by evidence score."
def _market_probability(self, row: Dict[str, Any], package: Dict[str, Any]) -> Optional[float]:
direct = self._safe_float(row.get("probability"))
if direct is not None:
return direct
board = package.get("market_board") or {}
payload = board.get(str(row.get("market") or "")) if isinstance(board, dict) else None
probs = payload.get("probs") if isinstance(payload, dict) else None
if not isinstance(probs, dict):
return None
key = self._prob_key(str(row.get("market") or ""), str(row.get("pick") or ""))
return self._safe_float(probs.get(key)) if key else None
def _v27_probability(self, market: str, pick: str, v27_engine: Dict[str, Any]) -> Optional[float]:
predictions = v27_engine.get("predictions") or {}
ms = predictions.get("ms") or {}
ou25 = predictions.get("ou25") or {}
if market == "MS":
return self._safe_float(ms.get({"1": "home", "X": "draw", "2": "away"}.get(pick, "")))
if market == "DC":
home = self._safe_float(ms.get("home"), 0.0) or 0.0
draw = self._safe_float(ms.get("draw"), 0.0) or 0.0
away = self._safe_float(ms.get("away"), 0.0) or 0.0
return {"1X": home + draw, "X2": draw + away, "12": home + away}.get(pick)
if market == "OU25":
key = self._prob_key(market, pick)
return self._safe_float(ou25.get(key)) if key else None
return None
def _triple_value(self, package: Dict[str, Any], key: Optional[str]) -> Optional[Dict[str, Any]]:
if not key:
return None
value = ((package.get("v27_engine") or {}).get("triple_value") or {}).get(key)
return value if isinstance(value, dict) else None
def _triple_key(self, market: str, pick: str) -> Optional[str]:
prob_key = self._prob_key(market, pick)
if market == "MS":
return {"1": "home", "2": "away"}.get(pick)
if market == "DC" and pick.upper() in {"1X", "X2", "12"}:
return f"dc_{pick.lower()}"
if market in {"OU15", "OU25", "OU35"} and prob_key == "over":
return f"{market.lower()}_over"
if market == "BTTS" and prob_key == "yes":
return "btts_yes"
if market == "HT":
return {"1": "ht_home", "2": "ht_away"}.get(pick)
if market in {"HT_OU05", "HT_OU15"} and prob_key == "over":
return f"{market.lower()}_over"
if market == "OE" and prob_key == "odd":
return "oe_odd"
if market == "CARDS" and prob_key == "over":
return "cards_over"
if market == "HTFT" and "/" in pick:
return f"htft_{pick.replace('/', '').lower()}"
return None
@staticmethod
def _prob_key(market: str, pick: str) -> Optional[str]:
norm = str(pick or "").strip().casefold()
if market in {"MS", "HT", "HCAP"}:
return pick if pick in {"1", "X", "2"} else None
if market == "DC":
return pick.upper() if pick.upper() in {"1X", "X2", "12"} else None
if market in {"OU15", "OU25", "OU35", "HT_OU05", "HT_OU15", "CARDS"}:
if "over" in norm or "ust" in norm or "üst" in norm:
return "over"
if "under" in norm or "alt" in norm:
return "under"
if market == "BTTS":
if "yes" in norm or "var" in norm:
return "yes"
if "no" in norm or "yok" in norm:
return "no"
if market == "OE":
if "odd" in norm or "tek" in norm:
return "odd"
if "even" in norm or "cift" in norm or "çift" in norm:
return "even"
if market == "HTFT" and "/" in pick:
return pick
return None
@staticmethod
def _safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
try:
return float(value)
except (TypeError, ValueError):
return default