add predictions page with game theory analysis and card stats

Bayesian next-chair predictor (Markov chains, base rate, streak regression),
statistical tests (chi-squared, runs test, autocorrelation), theory
backtesting with rolling accuracy, and card-level analysis (value/suit
distribution, face card frequency, top winning cards).
This commit is contained in:
2026-02-25 23:16:37 +05:00
parent d8ec792a88
commit b07b073cc0
6 changed files with 1003 additions and 0 deletions

458
app/db.py
View File

@@ -4,6 +4,7 @@ ClickHouse database operations.
import json
import logging
import math
import threading
import time
import clickhouse_connect
@@ -816,3 +817,460 @@ def get_hot_cold_players(n: int = 5) -> dict:
cold = [p for p in all_players if p["pnl"] < 0][-n:]
cold.reverse() # most negative first
return {"hot": hot, "cold": cold}
# ---------------------------------------------------------------------------
# Prediction helpers (private, called inside the locked main function)
# ---------------------------------------------------------------------------
# Canonical chair identifiers; every winner sequence handled below is a list
# of these single-letter labels, and all per-chair dicts are keyed by them.
CHAIR_LABELS = ("A", "B", "C")
def _normal_cdf(x):
"""Abramowitz-Stegun approximation of the standard normal CDF."""
if x < -8:
return 0.0
if x > 8:
return 1.0
t = 1.0 / (1.0 + 0.2316419 * abs(x))
d = 0.3989422804014327 # 1/sqrt(2*pi)
p = d * math.exp(-x * x / 2.0) * (
t * (0.319381530 + t * (-0.356563782 + t * (1.781477937 + t * (-1.821255978 + t * 1.330274429))))
)
return 1.0 - p if x > 0 else p
def _markov_matrix_1(winners):
    """1st-order Markov transition matrix P(next | last).

    Returns (probs, tallies): row-normalized probabilities rounded to 4
    decimals, plus the raw transition counts. Rows with no observations
    are all zero.
    """
    tallies = {src: dict.fromkeys(CHAIR_LABELS, 0) for src in CHAIR_LABELS}
    # Walk consecutive winner pairs; unknown labels are skipped.
    for prev, cur in zip(winners, winners[1:]):
        if prev in tallies and cur in CHAIR_LABELS:
            tallies[prev][cur] += 1
    probs = {}
    for src in CHAIR_LABELS:
        row_sum = sum(tallies[src].values())
        probs[src] = {
            dst: round(tallies[src][dst] / row_sum, 4) if row_sum else 0
            for dst in CHAIR_LABELS
        }
    return probs, tallies
def _markov_matrix_2(winners):
    """2nd-order Markov transition matrix P(next | last two).

    Keys are two-letter strings like "AB" (previous two winners in order).
    Returns (probs, tallies) with the same row-normalization and rounding
    conventions as `_markov_matrix_1`.
    """
    pair_keys = [f"{a}{b}" for a in CHAIR_LABELS for b in CHAIR_LABELS]
    tallies = {key: dict.fromkeys(CHAIR_LABELS, 0) for key in pair_keys}
    # Slide a 3-wide window over the sequence; unknown labels are skipped.
    for first, second, nxt in zip(winners, winners[1:], winners[2:]):
        key = f"{first}{second}"
        if key in tallies and nxt in CHAIR_LABELS:
            tallies[key][nxt] += 1
    probs = {}
    for key, row in tallies.items():
        row_sum = sum(row.values())
        probs[key] = {
            dst: round(row[dst] / row_sum, 4) if row_sum else 0
            for dst in CHAIR_LABELS
        }
    return probs, tallies
def _autocorrelation(winners, max_lag=5):
"""Pearson autocorrelation at lags 1..max_lag. Chairs encoded A=0,B=1,C=2."""
mapping = {"A": 0, "B": 1, "C": 2}
seq = [mapping.get(w, 0) for w in winners]
n = len(seq)
if n < max_lag + 2:
return [{"lag": i + 1, "r": 0, "significant": False} for i in range(max_lag)]
mean = sum(seq) / n
var = sum((x - mean) ** 2 for x in seq)
results = []
for lag in range(1, max_lag + 1):
if var == 0:
results.append({"lag": lag, "r": 0, "significant": False})
continue
cov = sum((seq[i] - mean) * (seq[i + lag] - mean) for i in range(n - lag))
r = round(cov / var, 4)
threshold = 1.96 / math.sqrt(n)
results.append({"lag": lag, "r": r, "significant": abs(r) > threshold})
return results
def _chi_squared_test(winners):
    """Chi-squared goodness-of-fit for uniform chair distribution (df=2).

    Uses the df=2 closed form p = exp(-chi2/2). Unknown labels contribute to
    n but not to any chair's observed count.
    """
    n = len(winners)
    if not winners:
        return {"chi2": 0, "p_value": 1, "significant": False, "counts": dict.fromkeys(CHAIR_LABELS, 0)}
    observed = dict.fromkeys(CHAIR_LABELS, 0)
    for w in winners:
        if w in observed:
            observed[w] += 1
    expected = n / 3.0
    chi2 = sum((cnt - expected) ** 2 / expected for cnt in observed.values())
    # For df=2 the chi-squared survival function has this exact closed form.
    p_value = math.exp(-chi2 / 2.0)
    return {
        "chi2": round(chi2, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "counts": observed,
        "expected": round(expected, 1),
    }
def _runs_test(winners):
    """Wald-Wolfowitz runs test for randomness (k-category generalization).

    A "run" is a maximal stretch of consecutive wins by the same chair. The
    observed run count is compared against its expectation under random
    ordering with a two-sided z-test.

    Returns a dict with the run count, expected runs, z-score, p-value,
    significance flag, and a plain-English interpretation; sequences shorter
    than 10 report "Not enough data".
    """
    n = len(winners)
    if n < 10:
        return {"runs": 0, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    # Count maximal same-chair runs.
    runs = 1
    for i in range(1, n):
        if winners[i] != winners[i - 1]:
            runs += 1
    counts = {c: 0 for c in CHAIR_LABELS}
    for w in winners:
        if w in counts:
            counts[w] += 1
    n_vals = [counts[c] for c in CHAIR_LABELS if counts[c] > 0]
    sum_ni2 = sum(ni ** 2 for ni in n_vals)
    # Expected runs for k categories: E[R] = n + 1 - (sum ni^2)/n.
    expected_runs = 1 + (n * n - sum_ni2) / n
    # Variance (k-category Wald-Wolfowitz formula), with S_k = sum ni^k:
    #   Var[R] = [S2*(S2 + n(n+1)) - 2n*S3 - n^3] / [n^2 * (n-1)]
    # BUG FIX: the first term previously used n*n instead of n*(n+1),
    # underestimating the variance and inflating |z| / shrinking p-values.
    var_num = sum_ni2 * (sum_ni2 + n * (n + 1)) - 2 * n * sum(ni ** 3 for ni in n_vals) - n ** 3
    var_den = n * n * (n - 1)
    variance = var_num / var_den if var_den > 0 else 1
    if variance <= 0:
        return {"runs": runs, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    z = (runs - expected_runs) / math.sqrt(variance)
    p_value = 2 * (1 - _normal_cdf(abs(z)))  # two-sided
    if p_value < 0.05:
        interpretation = "Too few runs (streaky)" if z < 0 else "Too many runs (alternating)"
    else:
        interpretation = "Random (no significant pattern)"
    return {
        "runs": runs,
        "expected_runs": round(expected_runs, 1),
        "z_score": round(z, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "interpretation": interpretation,
    }
def _bayesian_prediction(winners, markov1, markov2):
    """Weighted Bayesian prediction combining 5 signals.

    Blends base rate (20%), 1st-order Markov (30%), 2nd-order Markov (25%),
    recent-20 frequency (15%) and streak regression (10%), then normalizes.
    Returns (combined_probs, per_signal_detail); fewer than 3 games yields a
    uniform prediction with empty detail.
    """
    third = 1 / 3
    if len(winners) < 3:
        return {c: round(third, 4) for c in CHAIR_LABELS}, {}
    uniform = {c: third for c in CHAIR_LABELS}
    n = len(winners)
    # Signal 1: overall chair frequency (base rate).
    base = {c: winners.count(c) / n for c in CHAIR_LABELS}
    # Signals 2 & 3: Markov lookups conditioned on the last one/two winners;
    # unseen contexts fall back to the uniform distribution.
    m1 = markov1.get(winners[-1], uniform)
    m2 = markov2.get(f"{winners[-2]}{winners[-1]}", uniform)
    # Signal 4: frequency over the most recent 20 games.
    window = winners[-20:]
    rec = {c: window.count(c) / len(window) for c in CHAIR_LABELS}
    # Signal 5: streak regression — discount the chair currently on a streak.
    leader = winners[-1]
    run_len = 0
    for w in reversed(winners):
        if w != leader:
            break
        run_len += 1
    raw = {c: max(0.1, third - run_len * 0.05) if c == leader else 0 for c in CHAIR_LABELS}
    # NOTE(review): every non-leader entry is 0, so normalizing always yields
    # a one-hot on the streak chair — the 0.05-per-game discount cancels out.
    raw_sum = sum(raw.values())
    if raw_sum > 0:
        streak = {c: raw[c] / raw_sum for c in CHAIR_LABELS}
    else:
        streak = {c: third for c in CHAIR_LABELS}
    weights = {"base_rate": 0.20, "markov_1": 0.30, "markov_2": 0.25, "recent_20": 0.15, "streak": 0.10}
    signals = {"base_rate": base, "markov_1": m1, "markov_2": m2, "recent_20": rec, "streak": streak}
    # Weighted blend per chair, then renormalize to a probability vector.
    combined = {
        c: sum(weight * signals[name].get(c, third) for name, weight in weights.items())
        for c in CHAIR_LABELS
    }
    mass = sum(combined.values())
    if mass > 0:
        combined = {c: round(combined[c] / mass, 4) for c in CHAIR_LABELS}
    # Rounded per-signal breakdown for the UI.
    detail = {
        name: {
            "weight": weights[name],
            "probs": {c: round(vals.get(c, 0), 4) for c in CHAIR_LABELS},
        }
        for name, vals in signals.items()
    }
    return combined, detail
def _card_value_distribution(cards_data):
    """Count of each card value (A through K) per chair.

    `cards_data` is an iterable of (cards_json, winner) rows; malformed or
    empty JSON payloads are skipped silently.
    """
    value_names = ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"]
    dist = {chair: dict.fromkeys(value_names, 0) for chair in CHAIR_LABELS}
    for raw_json, _winner in cards_data:
        try:
            players = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                if value and value in dist[chair]:
                    dist[chair][value] += 1
    return {"labels": value_names, "chairs": dist}
def _face_card_frequency(cards_data):
    """Percentage of face cards (J, Q, K, A) per chair.

    Returns per-chair dicts with raw face/total counts and the percentage
    (0 when a chair has no recorded cards).
    """
    face_values = {"J", "Q", "K", "A"}
    faces = dict.fromkeys(CHAIR_LABELS, 0)
    totals = dict.fromkeys(CHAIR_LABELS, 0)
    for raw_json, _winner in cards_data:
        try:
            players = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in faces:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                if not value:
                    continue
                totals[chair] += 1
                if value in face_values:
                    faces[chair] += 1
    out = {}
    for chair in CHAIR_LABELS:
        total = totals[chair]
        pct = round(faces[chair] / total * 100, 2) if total else 0
        out[chair] = {"face_cards": faces[chair], "total_cards": total, "pct": pct}
    return out
def _suit_distribution(cards_data):
    """Suit counts per chair.

    Suits are keyed by their unicode glyphs (spade, heart, club, diamond);
    malformed JSON rows are skipped silently.
    """
    suit_names = ["\u2660", "\u2665", "\u2663", "\u2666"]
    dist = {chair: dict.fromkeys(suit_names, 0) for chair in CHAIR_LABELS}
    for raw_json, _winner in cards_data:
        try:
            players = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                suit = config.SUITS.get(card.get("cardColor"))
                if suit and suit in dist[chair]:
                    dist[chair][suit] += 1
    return {"labels": suit_names, "chairs": dist}
def _winning_card_patterns(cards_data):
    """Top 20 individual cards appearing in winning hands.

    Only cards held by the round's winning chair are tallied; cards with an
    unmapped value or suit are ignored. Ties keep first-seen order (stable
    sort).
    """
    tallies = {}
    for raw_json, winner in cards_data:
        try:
            players = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            continue
        # The winner column maps through the same chair table as players;
        # hoisted out of the per-player loop since it is row-constant.
        winning_chair = config.CHAIRS.get(winner)
        for player in players:
            chair = config.CHAIRS.get(player.get("country"))
            if chair is None or chair != winning_chair:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                suit = config.SUITS.get(card.get("cardColor"))
                if value and suit:
                    name = f"{value}{suit}"
                    tallies[name] = tallies.get(name, 0) + 1
    ranked = sorted(tallies.items(), key=lambda item: item[1], reverse=True)[:20]
    return [{"card": card, "count": count} for card, count in ranked]
def _backtest_theories(winners):
    """Backtest all prediction theories on historical data.

    Walks the winner sequence forward from game `warmup`, re-derives each
    theory's pick using only the history strictly before that game, and
    scores it against the actual winner.

    Returns overall accuracy per theory, a rolling-accuracy series (trailing
    window of up to 200 games, last 200 points kept) for charting, and the
    33.33% random baseline — or {"error": ...} when history is too short.
    """
    warmup = 30  # minimum history before the first scored prediction
    if len(winners) <= warmup:
        return {"error": "Not enough data for backtesting"}
    theories = ["base_rate", "markov_1", "markov_2", "recent_20", "streak", "combined"]
    correct = {t: 0 for t in theories}
    total_tested = 0
    rolling = {t: [] for t in theories}  # per-game 0/1 hit flags, windowed later
    # NOTE(review): Markov matrices are rebuilt from scratch on every
    # iteration, making this loop O(n^2) over the full winner history.
    for i in range(warmup, len(winners)):
        history = winners[:i]
        actual = winners[i]
        total_tested += 1
        total_h = len(history)
        # Theory 1: pick the chair with the highest overall frequency.
        base = {c: history.count(c) / total_h for c in CHAIR_LABELS}
        base_pick = max(CHAIR_LABELS, key=lambda c: base[c])
        # Theory 2: 1st-order Markov, conditioned on the previous winner.
        m1, _ = _markov_matrix_1(history)
        last = history[-1]
        m1_probs = m1.get(last, {c: 1 / 3 for c in CHAIR_LABELS})
        m1_pick = max(CHAIR_LABELS, key=lambda c: m1_probs.get(c, 0))
        # Theory 3: 2nd-order Markov, conditioned on the previous two winners.
        m2, _ = _markov_matrix_2(history)
        key2 = f"{history[-2]}{history[-1]}"
        m2_probs = m2.get(key2, {c: 1 / 3 for c in CHAIR_LABELS})
        m2_pick = max(CHAIR_LABELS, key=lambda c: m2_probs.get(c, 0))
        # Theory 4: frequency over the 20 most recent games only.
        recent = history[-20:] if len(history) >= 20 else history
        rec = {c: recent.count(c) / len(recent) for c in CHAIR_LABELS}
        rec_pick = max(CHAIR_LABELS, key=lambda c: rec[c])
        # Theory 5: streak regression — discount the chair on the current streak.
        streak_chair = history[-1]
        streak_len = 0
        for w in reversed(history):
            if w == streak_chair:
                streak_len += 1
            else:
                break
        streak_probs = {}
        for c in CHAIR_LABELS:
            if c == streak_chair:
                streak_probs[c] = max(0.1, 1 / 3 - streak_len * 0.05)
            else:
                streak_probs[c] = 0
        # Normalize; since non-streak chairs are 0 this always yields a
        # one-hot on the streak chair (the 0.05 discount cancels out),
        # mirroring the same behavior in _bayesian_prediction.
        s_total = sum(streak_probs.values())
        if s_total > 0:
            streak_probs = {c: streak_probs[c] / s_total for c in CHAIR_LABELS}
        else:
            streak_probs = {c: 1 / 3 for c in CHAIR_LABELS}
        streak_pick = max(CHAIR_LABELS, key=lambda c: streak_probs[c])
        # Theory 6: weighted blend of the five signals (same weights as the
        # live _bayesian_prediction).
        combined = {c: 0 for c in CHAIR_LABELS}
        weights = {"base_rate": 0.20, "markov_1": 0.30, "markov_2": 0.25, "recent_20": 0.15, "streak": 0.10}
        signals = {"base_rate": base, "markov_1": m1_probs, "markov_2": m2_probs, "recent_20": rec, "streak": streak_probs}
        for sig_name, weight in weights.items():
            for c in CHAIR_LABELS:
                combined[c] += weight * signals[sig_name].get(c, 1 / 3)
        combined_pick = max(CHAIR_LABELS, key=lambda c: combined[c])
        picks = {
            "base_rate": base_pick, "markov_1": m1_pick, "markov_2": m2_pick,
            "recent_20": rec_pick, "streak": streak_pick, "combined": combined_pick,
        }
        for t in theories:
            hit = 1 if picks[t] == actual else 0
            if picks[t] == actual:
                correct[t] += 1
            rolling[t].append(hit)
    accuracy = {t: round(correct[t] / total_tested * 100, 2) if total_tested else 0 for t in theories}
    # Rolling accuracy: mean hit-rate over a trailing window of up to 200 games.
    window = 200
    rolling_accuracy = {t: [] for t in theories}
    for t in theories:
        data = rolling[t]
        for j in range(len(data)):
            start = max(0, j - window + 1)
            chunk = data[start:j + 1]
            rolling_accuracy[t].append(round(sum(chunk) / len(chunk) * 100, 2))
    # Only keep last 200 points for the chart
    for t in theories:
        rolling_accuracy[t] = rolling_accuracy[t][-window:]
    return {
        "total_tested": total_tested,
        "accuracy": accuracy,
        "rolling_accuracy": rolling_accuracy,
        "random_baseline": 33.33,
    }
@_with_lock
def get_prediction_analysis() -> dict:
    """Run all prediction/game-theory analysis and return results.

    Pulls the full chronological winner sequence plus card payloads for the
    most recent 500 games, then fans out to the pure helper functions for
    Markov matrices, statistical tests, the Bayesian prediction, backtesting,
    and card-level analysis.
    """
    client = get_client()
    # Full winner sequence mapped to chair labels; unknown chairs are dropped.
    winner_rows = client.query("SELECT winner FROM games ORDER BY game_no ASC").result_rows
    winners = [w for w in (config.CHAIRS.get(row[0], "?") for row in winner_rows) if w in CHAIR_LABELS]
    # Raw card payloads for the last 500 games.
    card_rows = client.query(
        "SELECT cards_json, winner FROM games WHERE cards_json != '' ORDER BY game_no DESC LIMIT 500"
    ).result_rows
    cards_data = [(row[0], row[1]) for row in card_rows]
    m1_probs, m1_counts = _markov_matrix_1(winners)
    m2_probs, m2_counts = _markov_matrix_2(winners)
    prediction, signals = _bayesian_prediction(winners, m1_probs, m2_probs)
    return {
        "total_games": len(winners),
        "last_winners": winners[-10:],
        "prediction": prediction,
        "signals": signals,
        "markov1": {"matrix": m1_probs, "counts": {k: dict(v) for k, v in m1_counts.items()}},
        "markov2": {"matrix": m2_probs, "counts": {k: dict(v) for k, v in m2_counts.items()}},
        "autocorrelation": _autocorrelation(winners),
        "chi_squared": _chi_squared_test(winners),
        "runs_test": _runs_test(winners),
        "backtest": _backtest_theories(winners),
        "card_values": _card_value_distribution(cards_data),
        "face_cards": _face_card_frequency(cards_data),
        "suits": _suit_distribution(cards_data),
        "winning_cards": _winning_card_patterns(cards_data),
    }