Files
iddaai-be/ai-engine/services/betting_brain.py
T
fahricansecer 988ee2f50d Add backtest pipeline, betting_brain filters, score coherence + social v3
betting_brain.py:
- HARD_MIN_SAMPLES=50 floor for calibrator bypass
- ev_edge < 0 + >= 0.20 hard vetoes
- BTTS muted (grid search found no profitable config)
- Per-market optimal envelopes (MS, OU25)
- Score coherence filter: main_pick must agree with score prediction
- HTFT reversal cross-check for MS picks

feature_builder.py / data_loader.py:
- Real home/away_position from data (was hardcoded 10)
- Cup detection wired into UpsetEngine
- _estimate_league_position with 300-day season filter

New scripts:
- diagnostic_backtest.py: per-bet diagnostic backtest with loss patterns
- optimize_filters.py: grid search per-market optimal thresholds
- analyze_backtest_csv.py: root-cause hypothesis testing on CSV
- compare_backtests.py: side-by-side validation with verdict
- test_score_coherence.py: smoke test for coherence filter (20/20 pass)

Reports:
- diagnostic_backtest_20260525_024437 (50-match smoke)
- diagnostic_backtest_20260525_035649 (1000-match in-sample)
- filter_optimization_patch.json (grid search winners per market)

Social poster v3:
- satori + resvg HTML/CSS rendering pipeline
- Twemoji football/basketball + flag SVGs
- caption SEO: 12 curated hashtags per post
- image SEO: descriptive filenames + .json metadata sidecar
- /health, /preview-png, /run-now endpoints

Docs:
- mds/SESSION_HANDOFF.md: full session state for cross-machine continuity
- mds/SOCIAL_POSTER_SETUP.md: API keys + test commands

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 20:43:28 +03:00

898 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Deterministic betting judge for prediction packages.
The model layer estimates event probabilities. BettingBrain decides whether
those probabilities are trustworthy enough to risk money.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
class BettingBrain:
    """Rule-based judge that grades every candidate pick of a prediction package.

    Each candidate row (market + pick) receives an evidence score in
    [0, 100] plus a list of hard vetoes, and ends up with one of the
    actions ``BET``, ``WATCH``, ``WATCH_NO_VALUE`` or ``REJECT``.
    ``judge`` then selects the package-level ``main_pick`` / ``value_pick``
    and rewrites the staking advice accordingly.
    """

    # Odds below this value add the "odds_below_minimum" veto.
    MIN_ODDS = 1.30
    # Score thresholds separating BET / WATCH / REJECT when no veto fired.
    MIN_BET_SCORE = 72.0
    MIN_WATCH_SCORE = 62.0
    # Minimum historical band sample for the triple-value evidence to count.
    MIN_BAND_SAMPLE = 8
    # |model_prob - v27_prob| thresholds for hard veto / soft penalty.
    HARD_DIVERGENCE = 0.22
    SOFT_DIVERGENCE = 0.14
    # Model probability / model-vs-market gap considered "too good to be true".
    EXTREME_MODEL_PROB = 0.85
    EXTREME_GAP = 0.30
    # Vetoes that a value-sniper pick is allowed to ignore.
    SNIPER_BYPASSABLE_VETOES = {"play_score_too_low"}
    # implied_prob - band_rate above this flags the market as a trap.
    TRAP_MARKET_GAP = 0.10
    # Per-market minimum calibrated confidence (percent); default is 45.0.
    MARKET_MIN_CONFIDENCE = {
        "MS": 45.0,
        "DC": 55.0,
        "OU25": 48.0,
        "OU15": 55.0,
        "OU35": 42.0,
        "BTTS": 48.0,
        "HT": 55.0,
        "HTFT": 65.0,
        "OE": 55.0,
        "CARDS": 50.0,
        "HT_OU05": 55.0,
        "HT_OU15": 50.0,
    }
    # Markets where the value-sniper flag on a row is ignored.
    SNIPER_BLOCKED_MARKETS = {"HT", "HTFT", "OE", "CARDS", "HT_OU05", "HT_OU15"}
    # Markets that lose money under every filter combination per the
    # diagnostic backtest (1000 matches). Until calibration is rebuilt for
    # these specifically, force NO_BET. Re-evaluate after each backtest run.
    MUTED_MARKETS = {"BTTS"}
    # Per-market optimal filter envelopes derived from the diagnostic
    # backtest grid search (reports/filter_optimization_patch.json). Any
    # pick falling OUTSIDE this envelope is vetoed. Tightens the playable
    # band to the ROI-positive zone identified empirically.
    #
    # Each entry: {min_edge, max_edge, min_odds, max_odds,
    #              min_reliability, require_v27_agree}
    MARKET_OPTIMAL_FILTERS = {
        "MS": {
            "min_edge": -0.05, "max_edge": 0.15,
            "min_odds": 1.20, "max_odds": 10.0,
            "min_reliability": 0.0, "require_v27_agree": True,
        },
        "OU25": {
            "min_edge": -1.0, "max_edge": 0.15,
            "min_odds": 1.80, "max_odds": 10.0,
            "min_reliability": 0.0, "require_v27_agree": False,
        },
    }
    # Additive score adjustment per market; unknown markets get -3.0.
    MARKET_PRIORS = {
        "DC": 4.0,
        "OU15": 3.0,
        "OU25": 2.0,
        "BTTS": 0.0,
        "MS": -2.0,
        "OU35": -2.0,
        "HT": -10.0,
        "HTFT": -18.0,
        "CARDS": -8.0,
        "OE": -12.0,
    }
    # Pick spellings accepted by the score-coherence set. The trailing
    # lowercase token in each tuple is the canonical key produced by
    # ``_prob_key`` so that free-form picks ("Over 2.5") can be matched too.
    _OVER_PICKS = ("Üst", "Ust", "Over", "OVER", "over")
    _UNDER_PICKS = ("Alt", "Under", "UNDER", "under")
    _BTTS_YES_PICKS = ("Var", "KG Var", "Yes", "YES", "yes")
    _BTTS_NO_PICKS = ("Yok", "KG Yok", "No", "NO", "no")
    _ODD_PICKS = ("Tek", "Odd", "ODD", "odd")
    _EVEN_PICKS = ("Çift", "Cift", "Even", "EVEN", "even")

    def judge(self, package: Dict[str, Any]) -> Dict[str, Any]:
        """Grade every candidate in ``package`` and pick the final decision.

        Returns ``package`` itself (untouched) when it has no dict-valued
        ``v27_engine``; otherwise returns a shallow copy with ``main_pick``,
        ``value_pick``, ``supporting_picks``, ``bet_summary``, ``bet_advice``,
        ``betting_brain`` / ``upper_brain`` and ``analysis_details`` rewritten.
        """
        v27_engine = package.get("v27_engine")
        if not isinstance(v27_engine, dict):
            return package
        guarded = dict(package)
        rows = self._collect_rows(guarded)
        if not rows:
            return guarded

        judged_rows: Dict[str, Dict[str, Any]] = {}
        decisions: List[Dict[str, Any]] = []
        for row in rows:
            key = self._row_key(row)
            judged = self._judge_row(dict(row), guarded)
            judged_rows[key] = judged
            decisions.append(judged["betting_brain"])

        def with_action(action: str) -> List[Dict[str, Any]]:
            # Rows whose verdict equals ``action``, best candidate first.
            selected = [
                r for r in judged_rows.values()
                if r.get("betting_brain", {}).get("action") == action
            ]
            selected.sort(key=self._candidate_sort_key, reverse=True)
            return selected

        approved = with_action("BET")
        watchlist = with_action("WATCH")
        no_value = with_action("WATCH_NO_VALUE")

        # ── SCORE COHERENCE FILTER ──────────────────────────────────────
        # If the model also produced a score prediction (e.g. 1-0), pick
        # main_pick from the subset of candidates that would WIN at that
        # score. Stops the system from recommending OU25 over while also
        # predicting 1-0 (only 1 goal). Falls back to the original list if
        # no coherent candidate exists.
        coherent_set = self._score_consistent_markets(guarded)
        coherent_flag = False
        if coherent_set:
            def is_coherent(row: Dict[str, Any]) -> bool:
                return self._is_score_coherent(row, coherent_set)

            approved_coh = [r for r in approved if is_coherent(r)]
            watchlist_coh = [r for r in watchlist if is_coherent(r)]
            if approved_coh:
                approved = approved_coh
                coherent_flag = True
            elif watchlist_coh:
                # No coherent BET candidate: coherent watch rows are moved to
                # the front of the watchlist. This only affects the outcome
                # when ``approved`` is empty; a non-empty ``approved`` still
                # wins below.
                # NOTE(review): the previous comment said a coherent watch
                # should outrank an incoherent BET, which this code does not
                # do — confirm the intended behaviour before changing it.
                watchlist = watchlist_coh + [r for r in watchlist if not is_coherent(r)]
                coherent_flag = True
            # Tag every row so the UI/diagnostics can see what happened.
            for row in judged_rows.values():
                row.setdefault("betting_brain", {})
                row["betting_brain"]["score_coherent"] = is_coherent(row)

        original_main = guarded.get("main_pick") or {}
        main_pick = None
        decision = "NO_BET"
        decision_reason = "No candidate passed the betting brain evidence gates."
        if approved:
            main_pick = dict(approved[0])
            main_pick["is_guaranteed"] = bool(main_pick.get("betting_brain", {}).get("score", 0.0) >= 82.0)
            main_pick["pick_reason"] = "betting_brain_approved"
            decision = "BET"
            decision_reason = main_pick.get("betting_brain", {}).get("summary", "Evidence is aligned.")
        elif watchlist:
            main_pick = dict(watchlist[0])
            self._force_no_bet(main_pick, "betting_brain_watchlist")
            decision = "WATCHLIST"
            decision_reason = main_pick.get("betting_brain", {}).get("summary", "Interesting but not clean enough.")
        elif no_value:
            # B-1: model agrees with a low-odds market — surface it so the user
            # sees the read, but explicitly mark as not-playable.
            main_pick = dict(no_value[0])
            self._force_no_bet(main_pick, "betting_brain_no_value_odds_below_minimum")
            decision = "WATCH_NO_VALUE"
            decision_reason = "Model favoriyle hemfikir ama oran bahis için çok düşük — bilgi amaçlı gösteriliyor."
        elif original_main:
            # Nothing survived: keep the upstream main pick visible, unplayable.
            main_pick = dict(judged_rows.get(self._row_key(original_main), original_main))
            self._force_no_bet(main_pick, "betting_brain_no_safe_pick")

        main_key = self._row_key(main_pick) if main_pick else ""
        supporting = [
            dict(row)
            for row in judged_rows.values()
            if self._row_key(row) != main_key
        ]
        supporting.sort(key=self._candidate_sort_key, reverse=True)
        bet_summary = [
            self._summary_item(row)
            for row in sorted(judged_rows.values(), key=self._candidate_sort_key, reverse=True)
        ]
        guarded["main_pick"] = main_pick
        guarded["value_pick"] = self._pick_value_candidate(judged_rows, main_key)
        guarded["supporting_picks"] = supporting[:6]
        guarded["bet_summary"] = bet_summary

        playable = decision == "BET" and bool(main_pick and main_pick.get("playable"))
        advice = dict(guarded.get("bet_advice") or {})
        advice["playable"] = playable
        advice["suggested_stake_units"] = float(main_pick.get("stake_units", 0.0)) if playable and main_pick else 0.0
        advice["reason"] = "betting_brain_approved" if playable else "betting_brain_no_bet"
        advice["decision"] = decision
        advice["confidence_band"] = self._decision_band(main_pick)
        guarded["bet_advice"] = advice

        rejected = [d for d in decisions if d.get("action") == "REJECT"]
        guarded["betting_brain"] = {
            "version": "judge-v2-score-coherent",
            "decision": decision,
            "reason": decision_reason,
            "main_pick_key": main_key or None,
            "score_coherent_filter_applied": coherent_flag,
            "approved_count": len(approved),
            "watchlist_count": len(watchlist),
            "rejected_count": len(rejected),
            "top_candidates": self._top_decisions(decisions),
            "rules": {
                "min_bet_score": self.MIN_BET_SCORE,
                "min_watch_score": self.MIN_WATCH_SCORE,
                "min_band_sample": self.MIN_BAND_SAMPLE,
                "hard_divergence": self.HARD_DIVERGENCE,
                "soft_divergence": self.SOFT_DIVERGENCE,
                "extreme_model_probability": self.EXTREME_MODEL_PROB,
                "extreme_model_market_gap": self.EXTREME_GAP,
            },
        }
        guarded["upper_brain"] = guarded["betting_brain"]

        # Copy before writing: ``guarded`` is only a shallow copy, so writing
        # into the existing dict would mutate the caller's package. A missing
        # or non-dict value is replaced by a fresh dict.
        existing_details = guarded.get("analysis_details")
        details = dict(existing_details) if isinstance(existing_details, dict) else {}
        details["betting_brain_applied"] = True
        details["betting_brain_decision"] = decision
        guarded["analysis_details"] = details
        return guarded

    def _judge_row(self, row: Dict[str, Any], package: Dict[str, Any]) -> Dict[str, Any]:
        """Score one candidate row, attach ``row["betting_brain"]`` and return it.

        ``row`` is mutated in place (callers pass a copy). The verdict is
        ``REJECT`` / ``WATCH_NO_VALUE`` when any veto remains, otherwise it
        depends on the clamped score versus MIN_WATCH_SCORE / MIN_BET_SCORE.
        """
        market = str(row.get("market") or "")
        pick = str(row.get("pick") or "")
        model_prob = self._market_probability(row, package)
        odds = self._safe_float(row.get("odds"), 0.0) or 0.0
        implied = (1.0 / odds) if odds > 1.0 else 0.0
        model_gap = (model_prob - implied) if model_prob is not None and implied > 0 else None
        calibrated_conf = self._safe_float(row.get("calibrated_confidence", row.get("confidence")), 0.0) or 0.0
        play_score = self._safe_float(row.get("play_score"), 0.0) or 0.0
        ev_edge = self._safe_float(row.get("ev_edge", row.get("edge")), 0.0) or 0.0
        v27_prob = self._v27_probability(market, pick, package.get("v27_engine") or {})
        divergence = abs(model_prob - v27_prob) if model_prob is not None and v27_prob is not None else None
        triple_key = self._triple_key(market, pick)
        triple = self._triple_value(package, triple_key)
        band_sample = int(self._safe_float((triple or {}).get("band_sample"), 0.0) or 0.0)
        triple_is_value = bool((triple or {}).get("is_value"))
        consensus = str((package.get("v27_engine") or {}).get("consensus") or "").upper()

        positives: List[str] = []
        issues: List[str] = []
        vetoes: List[str] = []
        score = 0.0

        if row.get("playable"):
            score += 18.0
            positives.append("base_model_playable")
        else:
            score -= 18.0
            issues.append("base_model_not_playable")

        is_value_sniper = bool(row.get("is_value_sniper"))
        if market in self.SNIPER_BLOCKED_MARKETS:
            is_value_sniper = False
        if is_value_sniper:
            score += 20.0
            positives.append("value_sniper_override")

        # Bounded contributions from confidence, edge, play score and market prior.
        score += max(0.0, min(20.0, calibrated_conf * 0.22))
        score += max(-8.0, min(16.0, ev_edge * 45.0))
        score += max(0.0, min(14.0, play_score * 0.12))
        score += self.MARKET_PRIORS.get(market, -3.0)

        data_quality = package.get("data_quality") or {}
        quality_score = self._safe_float(data_quality.get("score"), 0.6) or 0.6
        score += max(-8.0, min(6.0, (quality_score - 0.55) * 16.0))
        risk = str((package.get("risk") or {}).get("level") or "MEDIUM").upper()
        score += {"LOW": 5.0, "MEDIUM": 0.0, "HIGH": -12.0, "EXTREME": -22.0}.get(risk, -4.0)

        # League reliability penalty: weak leagues produce unreliable raw probabilities.
        # odds_reliability is pre-computed per-league from historical Brier score analysis.
        odds_rel = self._safe_float(row.get("odds_reliability"), 0.35) or 0.35
        if odds_rel < 0.30:
            score -= 22.0
            issues.append("very_low_reliability_league")
            if market in {"MS", "DC", "OU25", "BTTS"} and not is_value_sniper:
                vetoes.append("low_reliability_league_hard_block")
        elif odds_rel < 0.45:
            score -= 12.0
            issues.append("low_reliability_league")
        elif odds_rel < 0.55:
            score -= 5.0

        # Inferred features penalty: when ELO/form/H2H come from live enrichment
        # (not pre-computed table), statistical quality is unknown — penalise hard.
        dq_flags = list(data_quality.get("flags") or [])
        if "ai_features_inferred_from_history" in dq_flags:
            score -= 18.0
            issues.append("inferred_statistical_features")

        if odds < self.MIN_ODDS:
            vetoes.append("odds_below_minimum")
        min_conf = self.MARKET_MIN_CONFIDENCE.get(market, 45.0)
        if calibrated_conf < min_conf:
            vetoes.append("calibrated_confidence_too_low")
        if play_score < 50.0 and not is_value_sniper:
            vetoes.append("play_score_too_low")

        # ── HARD EV-EDGE VETO ───────────────────────────────────────────
        # The diagnostic backtest (1000 matches, 524 settled bets) showed
        # that bets with ev_edge < 0 made up 76% of all picks with an ROI of
        # roughly -16%. ev_edge < 0 means the model assigns a lower
        # probability than the market, i.e. a negative-EV bet that cannot
        # beat the vig. Hard veto: do not play. The sniper override still
        # passes (high-conviction alternative picks).
        if ev_edge < 0.0 and not is_value_sniper:
            vetoes.append("negative_ev_edge")
            issues.append(f"ev_edge={ev_edge:.3f}_below_zero")
        # Trap edge: in the diagnostic backtest every bet with
        # ev_edge >= 0.20 lost (n=10, all at -25% ROI or worse). When the
        # model finds the market this wrong, the model itself is probably
        # wrong (missing info, unusual match, etc.) — do not play.
        if ev_edge >= 0.20 and not is_value_sniper:
            vetoes.append("ev_edge_too_high_trap")
            issues.append(f"ev_edge={ev_edge:.3f}_trap_range")

        # ── MUTED MARKETS (grid search showed no profitable config) ──
        if market in self.MUTED_MARKETS and not is_value_sniper:
            vetoes.append("market_muted_by_backtest")
            issues.append(f"market_{market}_muted")

        # ── PER-MARKET OPTIMAL ENVELOPE (from grid search) ──
        envelope = self.MARKET_OPTIMAL_FILTERS.get(market)
        if envelope and not is_value_sniper:
            if ev_edge < envelope["min_edge"]:
                vetoes.append("outside_envelope_edge_low")
            if ev_edge > envelope["max_edge"]:
                vetoes.append("outside_envelope_edge_high")
            if odds and odds < envelope["min_odds"]:
                vetoes.append("outside_envelope_odds_low")
            if odds and odds > envelope["max_odds"]:
                vetoes.append("outside_envelope_odds_high")
            if odds_rel < envelope["min_reliability"]:
                vetoes.append("outside_envelope_reliability_low")
            if envelope["require_v27_agree"] and consensus != "AGREE":
                vetoes.append("outside_envelope_v27_must_agree")

        # Agreement between the row's model probability and the v27 engine.
        if divergence is not None:
            if divergence >= self.HARD_DIVERGENCE and not is_value_sniper:
                score -= 42.0
                vetoes.append("v25_v27_hard_disagreement")
            elif divergence >= self.SOFT_DIVERGENCE:
                score -= 18.0
                issues.append("v25_v27_soft_disagreement")
            else:
                score += 11.0
                positives.append("v25_v27_aligned")

        # Trap market detection: market overpriced vs historical band hit rate
        trap_market_flag = False
        trap_market_gap = None
        if isinstance(triple, dict):
            band_rate_val = self._safe_float(triple.get("band_rate"))
            implied_val = self._safe_float(triple.get("implied_prob"))
            if (
                band_rate_val is not None
                and implied_val is not None
                and band_sample >= self.MIN_BAND_SAMPLE
                and (implied_val - band_rate_val) > self.TRAP_MARKET_GAP
            ):
                trap_market_flag = True
                trap_market_gap = round(implied_val - band_rate_val, 4)
                score -= 14.0
                issues.append("trap_market_market_overpriced")

        # Triple-value evidence and the size of its historical sample.
        if isinstance(triple, dict):
            if triple_is_value:
                score += 18.0
                positives.append("triple_value_confirmed")
            elif market in {"DC", "MS", "OU25", "BTTS"}:
                score -= 18.0
                issues.append("triple_value_not_confirmed")
            if band_sample >= 25:
                score += 8.0
                positives.append("strong_historical_sample")
            elif band_sample >= self.MIN_BAND_SAMPLE:
                score += 3.0
                positives.append("usable_historical_sample")
            else:
                score -= 16.0
                issues.append("historical_sample_too_low")
                if market == "DC" and not is_value_sniper:
                    vetoes.append("dc_without_historical_sample")
        elif market in {"MS", "DC", "OU25"}:
            score -= 10.0
            issues.append("missing_triple_value_evidence")

        if consensus == "DISAGREE" and market in {"MS", "DC"}:
            score -= 12.0
            issues.append("engine_consensus_disagree")

        if (
            model_prob is not None
            and model_gap is not None
            and model_prob >= self.EXTREME_MODEL_PROB
            and model_gap >= self.EXTREME_GAP
            and not triple_is_value
            and not is_value_sniper
        ):
            score -= 24.0
            vetoes.append("extreme_probability_without_evidence")

        if market in {"HT", "HTFT", "OE"} and score < 86.0:
            vetoes.append("volatile_market_requires_exceptional_evidence")

        # Cross-market reversal risk for MS picks.
        # If the HTFT model says the *opposite* of our MS pick is likely (a
        # reversal scenario like 1/2 or 2/1), the MS pick is fragile even
        # when its own calibrated_confidence is high. The Manchester City
        # 1-0 → 1-2 case is exactly this: MS=1 looked solid but HTFT 1/2
        # carried real probability mass that the MS scorer ignored.
        if market == "MS" and pick in ("1", "2"):
            board = package.get("market_board") or {}
            htft_payload = board.get("HTFT") if isinstance(board, dict) else None
            htft_probs = (
                htft_payload.get("probs", {})
                if isinstance(htft_payload, dict) else {}
            )
            risk_payload = package.get("risk") or {}
            if not htft_probs and isinstance(risk_payload, dict):
                htft_probs = risk_payload.get("ht_ft_probs", {}) or {}
            # Reversal outcomes that would make this MS pick lose despite
            # the team leading at half-time / trailing at half-time.
            if pick == "1":
                # Picked home win. Threats: 1/2 (led, lost) and 1/X (led, drew).
                reversal_keys = ("1/2", "1/X")
                drift_keys = ("X/2",)
            else:
                # Picked away win. Threats: 2/1, 2/X. Drift: X/1.
                reversal_keys = ("2/1", "2/X")
                drift_keys = ("X/1",)
            reversal_prob = sum(
                self._safe_float(htft_probs.get(key), 0.0) or 0.0
                for key in reversal_keys
            )
            drift_prob = sum(
                self._safe_float(htft_probs.get(key), 0.0) or 0.0
                for key in drift_keys
            )
            # Drift outcomes count at half weight.
            combined_risk = reversal_prob + 0.5 * drift_prob
            if combined_risk >= 0.25:
                score -= 28.0
                vetoes.append("htft_reversal_risk_high")
                issues.append(f"htft_reversal_prob={combined_risk:.2f}")
            elif combined_risk >= 0.15:
                score -= 14.0
                issues.append(f"htft_reversal_prob_moderate={combined_risk:.2f}")
            elif combined_risk >= 0.10:
                score -= 6.0
                issues.append(f"htft_reversal_prob_minor={combined_risk:.2f}")

        # Sniper override: bypass eligible vetoes when value sniper triggered
        sniper_bypassed: List[str] = []
        if is_value_sniper and vetoes:
            sniper_bypassed = [v for v in vetoes if v in self.SNIPER_BYPASSABLE_VETOES]
            vetoes = [v for v in vetoes if v not in self.SNIPER_BYPASSABLE_VETOES]
            if sniper_bypassed:
                positives.append("sniper_bypassed_soft_vetoes")

        score = max(0.0, min(100.0, score))
        action = "BET"
        if vetoes:
            # B-1: when only veto is odds_below_minimum, switch to WATCH_NO_VALUE
            # so user still sees model commentary instead of blank rejection.
            if vetoes == ["odds_below_minimum"]:
                action = "WATCH_NO_VALUE"
            else:
                action = "REJECT"
        elif score < self.MIN_WATCH_SCORE and not is_value_sniper:
            action = "REJECT"
        elif score < self.MIN_BET_SCORE and not is_value_sniper:
            action = "WATCH"

        row["betting_brain"] = {
            "action": action,
            "score": round(score, 1),
            "summary": self._summary(action, market, pick, positives, issues, vetoes),
            "positives": positives[:5],
            "issues": issues[:6],
            "vetoes": vetoes[:6],
            "sniper_bypassed": sniper_bypassed,
            "trap_market_flag": trap_market_flag,
            "trap_market_gap": trap_market_gap,
            "model_prob": round(model_prob, 4) if model_prob is not None else None,
            "implied_prob": round(implied, 4),
            "model_market_gap": round(model_gap, 4) if model_gap is not None else None,
            "v27_prob": round(v27_prob, 4) if v27_prob is not None else None,
            "divergence": round(divergence, 4) if divergence is not None else None,
            "triple_key": triple_key,
            "triple_value": triple,
        }
        if action != "BET":
            self._force_no_bet(row, f"betting_brain_{action.lower()}")
        else:
            row["is_guaranteed"] = bool(score >= 82.0)
            row["pick_reason"] = "betting_brain_approved"
            row["stake_units"] = self._brain_stake(row, score)
            row["bet_grade"] = "A" if score >= 82.0 else "B"
            row["playable"] = True
        self._append_reason(row, f"betting_brain_{action.lower()}_{round(score)}")
        return row

    def _collect_rows(self, package: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Gather the unique candidate rows of ``package`` keyed by market:pick.

        ``main_pick`` / ``value_pick`` are taken first; rows from
        ``supporting_picks`` / ``bet_summary`` are merged into them.
        """
        rows: Dict[str, Dict[str, Any]] = {}
        for source in ("main_pick", "value_pick"):
            item = package.get(source)
            if isinstance(item, dict) and item.get("market"):
                rows[self._row_key(item)] = dict(item)
        for source in ("supporting_picks", "bet_summary"):
            for item in package.get(source) or []:
                if isinstance(item, dict) and item.get("market"):
                    key = self._row_key(item)
                    rows[key] = self._merge_row(rows.get(key), item)
        # B-2: ensure every MS outcome has an entry — give the user the
        # model's read on the other outcomes even when upstream filtered
        # them out.
        self._inject_reference_rows(rows, package)
        return list(rows.values())

    def _inject_reference_rows(
        self,
        rows: Dict[str, Dict[str, Any]],
        package: Dict[str, Any],
    ) -> None:
        """Add non-playable MS reference rows for outcomes missing from ``rows``.

        Probabilities come from ``package["market_board"]["MS"]["probs"]``;
        the row's odds are the fair odds ``1 / prob`` (0.0 when prob <= 0.01).
        ``rows`` is mutated in place.
        """
        market_board = package.get("market_board") or {}
        ms_board = market_board.get("MS") if isinstance(market_board, dict) else None
        if not isinstance(ms_board, dict):
            return
        probs = ms_board.get("probs") if isinstance(ms_board.get("probs"), dict) else {}
        if not probs:
            return
        for pick in ("1", "X", "2"):
            key = f"MS:{pick}"
            if key in rows:
                continue
            prob = self._safe_float(probs.get(pick), 0.0)
            if prob is None or prob <= 0.0:
                continue
            # An odds lookup on existing MS rows used to live here, but any
            # pick with an existing row is skipped above, so only the
            # probability-implied odds were ever used.
            confidence = round(prob * 100.0, 1)
            rows[key] = {
                "market": "MS",
                "pick": pick,
                "probability": round(prob, 4),
                "confidence": confidence,
                "raw_confidence": confidence,
                "calibrated_confidence": confidence,
                "odds": round(1.0 / prob, 2) if prob > 0.01 else 0.0,
                "is_underdog_reference": True,
                "playable": False,
                "stake_units": 0.0,
                "bet_grade": "PASS",
                "decision_reasons": ["underdog_reference_for_completeness"],
            }

    @staticmethod
    def _merge_row(existing: Optional[Dict[str, Any]], incoming: Dict[str, Any]) -> Dict[str, Any]:
        """Merge two rows for the same key; non-None ``existing`` values win.

        Reason lists are concatenated (existing first) and de-duplicated
        while preserving order.
        """
        if existing is None:
            return dict(incoming)
        merged = dict(incoming)
        merged.update({k: v for k, v in existing.items() if v is not None})
        for key in ("decision_reasons", "reasons"):
            reasons = list(existing.get(key) or []) + list(incoming.get(key) or [])
            if reasons:
                merged[key] = list(dict.fromkeys(reasons))
        return merged

    def _pick_value_candidate(self, rows: Dict[str, Dict[str, Any]], main_key: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the best BET/WATCH row (odds >= 1.60) other than the main pick."""
        candidates = [
            row for key, row in rows.items()
            if key != main_key
            and row.get("betting_brain", {}).get("action") in {"BET", "WATCH"}
            and (self._safe_float(row.get("odds"), 0.0) or 0.0) >= 1.60
        ]
        candidates.sort(key=self._candidate_sort_key, reverse=True)
        return dict(candidates[0]) if candidates else None

    def _summary_item(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Project a judged row onto the fields exposed in ``bet_summary``."""
        reasons = list(row.get("decision_reasons") or row.get("reasons") or [])
        return {
            "market": row.get("market"),
            "pick": row.get("pick"),
            "raw_confidence": row.get("raw_confidence", row.get("confidence")),
            "calibrated_confidence": row.get("calibrated_confidence", row.get("confidence")),
            "bet_grade": row.get("bet_grade", "PASS"),
            "playable": bool(row.get("playable")),
            "stake_units": float(row.get("stake_units", 0.0) or 0.0),
            "play_score": row.get("play_score", 0.0),
            "ev_edge": row.get("ev_edge", row.get("edge", 0.0)),
            "implied_prob": row.get("implied_prob", 0.0),
            "odds_reliability": row.get("odds_reliability", 0.35),
            "odds": row.get("odds", 0.0),
            "reasons": reasons[:6],
            "is_underdog_reference": bool(row.get("is_underdog_reference")),
            "betting_brain": row.get("betting_brain"),
        }

    @staticmethod
    def _candidate_sort_key(row: Dict[str, Any]) -> Tuple[float, float, float]:
        """Sort key: action rank (BET > WATCH > rest), brain score, play score."""
        brain = row.get("betting_brain") or {}
        action_boost = {"BET": 2.0, "WATCH": 1.0, "REJECT": 0.0}.get(str(brain.get("action")), 0.0)
        return (
            action_boost,
            float(brain.get("score", 0.0) or 0.0),
            float(row.get("play_score", 0.0) or 0.0),
        )

    @staticmethod
    def _row_key(row: Optional[Dict[str, Any]]) -> str:
        """Return the ``"<market>:<pick>"`` identity of a row ("" for non-dicts)."""
        if not isinstance(row, dict):
            return ""
        return f"{row.get('market')}:{row.get('pick')}"

    def _force_no_bet(self, row: Dict[str, Any], reason: str) -> None:
        """Mark ``row`` as unplayable in place and record ``reason``."""
        row["playable"] = False
        row["stake_units"] = 0.0
        row["bet_grade"] = "PASS"
        row["is_guaranteed"] = False
        row["pick_reason"] = reason
        if row.get("signal_tier") == "CORE":
            row["signal_tier"] = "PASS"
        self._append_reason(row, reason)

    @staticmethod
    def _append_reason(row: Dict[str, Any], reason: str) -> None:
        """Append ``reason`` to the row's reason list, keeping the first six.

        Writes to ``decision_reasons`` when that key exists, else ``reasons``.
        """
        key = "decision_reasons" if "decision_reasons" in row else "reasons"
        reasons = list(row.get(key) or [])
        if reason not in reasons:
            reasons.append(reason)
        row[key] = reasons[:6]

    def _brain_stake(self, row: Dict[str, Any], score: float) -> float:
        """Return the stake for an approved row, capped by the brain score.

        Cap is 2.0 for score >= 82, 1.2 for 78 <= score < 82, 0.8 below 78.
        An existing positive stake is kept when under the cap; the result is
        never below 0.25, and is 0.0 when the odds are not above 1.0.
        """
        existing = self._safe_float(row.get("stake_units"), 0.0) or 0.0
        odds = self._safe_float(row.get("odds"), 0.0) or 0.0
        if odds <= 1.0:
            return 0.0
        cap = 2.0 if score >= 82.0 else 1.2
        if score < 78.0:
            cap = 0.8
        return round(max(0.25, min(existing if existing > 0 else cap, cap)), 1)

    @staticmethod
    def _decision_band(main_pick: Optional[Dict[str, Any]]) -> str:
        """Map the main pick's brain score to HIGH (>=82), MEDIUM (>=72) or LOW."""
        if not main_pick:
            return "LOW"
        score = float((main_pick.get("betting_brain") or {}).get("score", 0.0) or 0.0)
        if score >= 82.0:
            return "HIGH"
        if score >= 72.0:
            return "MEDIUM"
        return "LOW"

    @staticmethod
    def _top_decisions(decisions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return a compact view of the five highest-scoring decisions."""
        ordered = sorted(decisions, key=lambda d: float(d.get("score", 0.0) or 0.0), reverse=True)
        return [
            {
                "action": item.get("action"),
                "score": item.get("score"),
                "summary": item.get("summary"),
                "vetoes": item.get("vetoes", []),
                "issues": item.get("issues", []),
            }
            for item in ordered[:5]
        ]

    @staticmethod
    def _summary(action: str, market: str, pick: str, positives: List[str], issues: List[str], vetoes: List[str]) -> str:
        """Build the one-line human-readable explanation for a verdict."""
        if action == "BET":
            return f"{market} {pick} approved: evidence is aligned enough for a controlled stake."
        if action == "WATCH":
            return f"{market} {pick} is interesting but not clean enough for stake."
        if action == "WATCH_NO_VALUE":
            return f"{market} {pick}: model favoriyle hemfikir, fakat oran ({', '.join(vetoes[:1]) or 'düşük'}) bahis için yetersiz."
        if vetoes:
            return f"{market} {pick} rejected: {', '.join(vetoes[:3])}."
        if issues:
            return f"{market} {pick} rejected: {', '.join(issues[:3])}."
        return f"{market} {pick} rejected by evidence score."

    def _market_probability(self, row: Dict[str, Any], package: Dict[str, Any]) -> Optional[float]:
        """Return the row's own ``probability`` or look it up on ``market_board``."""
        direct = self._safe_float(row.get("probability"))
        if direct is not None:
            return direct
        board = package.get("market_board") or {}
        payload = board.get(str(row.get("market") or "")) if isinstance(board, dict) else None
        probs = payload.get("probs") if isinstance(payload, dict) else None
        if not isinstance(probs, dict):
            return None
        key = self._prob_key(str(row.get("market") or ""), str(row.get("pick") or ""))
        return self._safe_float(probs.get(key)) if key else None

    def _v27_probability(self, market: str, pick: str, v27_engine: Dict[str, Any]) -> Optional[float]:
        """Return the v27 engine's probability for MS / DC / OU25 picks, else None.

        DC is derived by summing the two matching MS legs, with missing legs
        counted as 0.0.
        """
        predictions = v27_engine.get("predictions") or {}
        ms = predictions.get("ms") or {}
        ou25 = predictions.get("ou25") or {}
        if market == "MS":
            return self._safe_float(ms.get({"1": "home", "X": "draw", "2": "away"}.get(pick, "")))
        if market == "DC":
            home = self._safe_float(ms.get("home"), 0.0) or 0.0
            draw = self._safe_float(ms.get("draw"), 0.0) or 0.0
            away = self._safe_float(ms.get("away"), 0.0) or 0.0
            return {"1X": home + draw, "X2": draw + away, "12": home + away}.get(pick)
        if market == "OU25":
            key = self._prob_key(market, pick)
            return self._safe_float(ou25.get(key)) if key else None
        return None

    @staticmethod
    def _parse_score(text: str) -> Optional[Tuple[int, int]]:
        """Parse ``"1-0"`` / ``"1:0"`` style text into ``(home, away)``.

        Returns None for empty text, text without a known separator,
        non-integer parts or negative goals.
        """
        # Hyphen, colon, en dash, em dash. An empty separator must never be
        # tried: ``str.split("")`` raises ValueError.
        for sep in ("-", ":", "\u2013", "\u2014"):
            if sep not in text:
                continue
            left, _, right = text.partition(sep)
            try:
                home, away = int(left.strip()), int(right.strip())
            except ValueError:
                return None
            if home < 0 or away < 0:
                return None
            return home, away
        return None

    def _is_score_coherent(self, row: Dict[str, Any], coherent_set: set) -> bool:
        """Tell whether ``row`` would win at the predicted score.

        Tries the exact (market, pick) pair first, then the canonical pick
        token from ``_prob_key`` so free-form picks such as "Over 2.5" match.
        """
        market = str(row.get("market") or "")
        pick = str(row.get("pick") or "")
        if (market, pick) in coherent_set:
            return True
        canonical = self._prob_key(market, pick)
        return canonical is not None and (market, canonical) in coherent_set

    def _score_consistent_markets(self, package: Dict[str, Any]) -> Optional[set]:
        """Build the set of (market, pick) tuples that WOULD WIN if the
        model's own score prediction came true.

        This is used as a coherence gate: if the model predicts 1-0 but also
        wants to play OU25 over, those two beliefs contradict each other.

        Returns None if the score prediction is missing or malformed; in
        that case the coherence check is skipped.
        """
        score_pred = package.get("score_prediction")
        if not isinstance(score_pred, dict):
            return None
        ft_raw = str(score_pred.get("ft") or score_pred.get("full_time") or "").strip()
        ht_raw = str(score_pred.get("ht") or score_pred.get("half_time") or "").strip()
        ft = self._parse_score(ft_raw)
        if ft is None:
            return None
        ht = self._parse_score(ht_raw)
        fh, fa = ft
        total = fh + fa
        consistent: set = set()

        # MS / 1X2 — single outcome
        ft_outcome = "1" if fh > fa else "2" if fh < fa else "X"
        consistent.add(("MS", ft_outcome))
        consistent.add(("ML", ft_outcome))

        # DC — two of three legs win at any score
        if fh >= fa:
            consistent.add(("DC", "1X"))
        if fh <= fa:
            consistent.add(("DC", "X2"))
        if fh != fa:
            consistent.add(("DC", "12"))

        # Over/Under main lines. Lines are x.5 and totals are integers, so
        # exactly one side wins on every line.
        for line, market in ((0.5, "OU05"), (1.5, "OU15"),
                             (2.5, "OU25"), (3.5, "OU35"), (4.5, "OU45")):
            side = self._OVER_PICKS if total > line else self._UNDER_PICKS
            consistent.update((market, p) for p in side)

        # BTTS — both teams score
        btts_side = self._BTTS_YES_PICKS if fh > 0 and fa > 0 else self._BTTS_NO_PICKS
        consistent.update(("BTTS", p) for p in btts_side)

        # OE — total goals odd/even
        oe_side = self._ODD_PICKS if total % 2 == 1 else self._EVEN_PICKS
        consistent.update(("OE", p) for p in oe_side)

        # HT-only markets (need HT score)
        if ht is not None:
            hh, ha = ht
            ht_total = hh + ha
            ht_outcome = "1" if hh > ha else "2" if hh < ha else "X"
            consistent.add(("HT", ht_outcome))
            for line, market in ((0.5, "HT_OU05"), (1.5, "HT_OU15"), (2.5, "HT_OU25")):
                side = self._OVER_PICKS if ht_total > line else self._UNDER_PICKS
                consistent.update((market, p) for p in side)
            # HTFT — single combo, with and without the slash
            consistent.add(("HTFT", f"{ht_outcome}/{ft_outcome}"))
            consistent.add(("HTFT", f"{ht_outcome}{ft_outcome}"))
        return consistent

    def _triple_value(self, package: Dict[str, Any], key: Optional[str]) -> Optional[Dict[str, Any]]:
        """Return ``v27_engine["triple_value"][key]`` when it is a dict, else None."""
        if not key:
            return None
        value = ((package.get("v27_engine") or {}).get("triple_value") or {}).get(key)
        return value if isinstance(value, dict) else None

    def _triple_key(self, market: str, pick: str) -> Optional[str]:
        """Map (market, pick) to its key in the v27 ``triple_value`` table.

        Only the sides listed below have a key; every other pick yields None.
        """
        prob_key = self._prob_key(market, pick)
        if market == "MS":
            return {"1": "home", "2": "away"}.get(pick)
        if market == "DC" and pick.upper() in {"1X", "X2", "12"}:
            return f"dc_{pick.lower()}"
        if market in {"OU15", "OU25", "OU35"} and prob_key == "over":
            return f"{market.lower()}_over"
        if market == "BTTS" and prob_key == "yes":
            return "btts_yes"
        if market == "HT":
            return {"1": "ht_home", "2": "ht_away"}.get(pick)
        if market in {"HT_OU05", "HT_OU15"} and prob_key == "over":
            return f"{market.lower()}_over"
        if market == "OE" and prob_key == "odd":
            return "oe_odd"
        if market == "CARDS" and prob_key == "over":
            return "cards_over"
        if market == "HTFT" and "/" in pick:
            return f"htft_{pick.replace('/', '').lower()}"
        return None

    @staticmethod
    def _prob_key(market: str, pick: str) -> Optional[str]:
        """Normalise a pick label to the key used in probability tables.

        Text picks are matched by casefolded substring (Turkish and English
        spellings); returns None when the market or pick is not recognised.
        """
        norm = str(pick or "").strip().casefold()
        if market in {"MS", "HT", "HCAP"}:
            return pick if pick in {"1", "X", "2"} else None
        if market == "DC":
            return pick.upper() if pick.upper() in {"1X", "X2", "12"} else None
        if market in {"OU15", "OU25", "OU35", "HT_OU05", "HT_OU15", "CARDS"}:
            if "over" in norm or "ust" in norm or "üst" in norm:
                return "over"
            if "under" in norm or "alt" in norm:
                return "under"
        if market == "BTTS":
            if "yes" in norm or "var" in norm:
                return "yes"
            if "no" in norm or "yok" in norm:
                return "no"
        if market == "OE":
            if "odd" in norm or "tek" in norm:
                return "odd"
            if "even" in norm or "cift" in norm or "çift" in norm:
                return "even"
        if market == "HTFT" and "/" in pick:
            return pick
        return None

    @staticmethod
    def _safe_float(value: Any, default: Optional[float] = None) -> Optional[float]:
        """Convert ``value`` to float, returning ``default`` on TypeError/ValueError."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default