diff --git a/app/db.py b/app/db.py
index 031b8b50d..e73dd7d12 100644
--- a/app/db.py
+++ b/app/db.py
@@ -4,6 +4,7 @@ ClickHouse database operations.
import json
import logging
+import math
import threading
import time
import clickhouse_connect
@@ -816,3 +817,460 @@ def get_hot_cold_players(n: int = 5) -> dict:
cold = [p for p in all_players if p["pnl"] < 0][-n:]
cold.reverse() # most negative first
return {"hot": hot, "cold": cold}
+
+
+# ---------------------------------------------------------------------------
+# Prediction helpers (private, called inside the locked main function)
+# ---------------------------------------------------------------------------
+
# Canonical chair identifiers; order matters (used for tie-breaking in max()).
CHAIR_LABELS = ("A", "B", "C")
+
+
+def _normal_cdf(x):
+ """Abramowitz-Stegun approximation of the standard normal CDF."""
+ if x < -8:
+ return 0.0
+ if x > 8:
+ return 1.0
+ t = 1.0 / (1.0 + 0.2316419 * abs(x))
+ d = 0.3989422804014327 # 1/sqrt(2*pi)
+ p = d * math.exp(-x * x / 2.0) * (
+ t * (0.319381530 + t * (-0.356563782 + t * (1.781477937 + t * (-1.821255978 + t * 1.330274429))))
+ )
+ return 1.0 - p if x > 0 else p
+
+
def _markov_matrix_1(winners):
    """1st-order Markov transition matrix P(next | last)."""
    counts = {src: {dst: 0 for dst in CHAIR_LABELS} for src in CHAIR_LABELS}
    # Tally consecutive winner pairs, ignoring anything outside A/B/C.
    for prev, cur in zip(winners, winners[1:]):
        if prev in counts and cur in CHAIR_LABELS:
            counts[prev][cur] += 1
    matrix = {}
    for src in CHAIR_LABELS:
        row_total = sum(counts[src].values())
        if row_total:
            matrix[src] = {dst: round(counts[src][dst] / row_total, 4) for dst in CHAIR_LABELS}
        else:
            # No observed transitions out of this chair yet.
            matrix[src] = {dst: 0 for dst in CHAIR_LABELS}
    return matrix, counts
+
+
def _markov_matrix_2(winners):
    """2nd-order Markov transition matrix P(next | last two)."""
    # One bucket per ordered chair pair, keyed "AB", "AC", ...
    counts = {
        a + b: {nxt: 0 for nxt in CHAIR_LABELS}
        for a in CHAIR_LABELS
        for b in CHAIR_LABELS
    }
    for i in range(2, len(winners)):
        pair = winners[i - 2] + winners[i - 1]
        nxt = winners[i]
        if pair in counts and nxt in CHAIR_LABELS:
            counts[pair][nxt] += 1
    matrix = {}
    for pair, row in counts.items():
        row_total = sum(row.values())
        matrix[pair] = {
            nxt: round(row[nxt] / row_total, 4) if row_total else 0
            for nxt in CHAIR_LABELS
        }
    return matrix, counts
+
+
+def _autocorrelation(winners, max_lag=5):
+ """Pearson autocorrelation at lags 1..max_lag. Chairs encoded A=0,B=1,C=2."""
+ mapping = {"A": 0, "B": 1, "C": 2}
+ seq = [mapping.get(w, 0) for w in winners]
+ n = len(seq)
+ if n < max_lag + 2:
+ return [{"lag": i + 1, "r": 0, "significant": False} for i in range(max_lag)]
+ mean = sum(seq) / n
+ var = sum((x - mean) ** 2 for x in seq)
+ results = []
+ for lag in range(1, max_lag + 1):
+ if var == 0:
+ results.append({"lag": lag, "r": 0, "significant": False})
+ continue
+ cov = sum((seq[i] - mean) * (seq[i + lag] - mean) for i in range(n - lag))
+ r = round(cov / var, 4)
+ threshold = 1.96 / math.sqrt(n)
+ results.append({"lag": lag, "r": r, "significant": abs(r) > threshold})
+ return results
+
+
def _chi_squared_test(winners):
    """Chi-squared goodness-of-fit for uniform chair distribution (df=2)."""
    n = len(winners)
    if n == 0:
        # No data: report a perfectly non-significant result.
        return {"chi2": 0, "p_value": 1, "significant": False, "counts": {c: 0 for c in CHAIR_LABELS}}
    observed = {c: 0 for c in CHAIR_LABELS}
    for w in winners:
        if w in observed:
            observed[w] += 1
    expected = n / 3.0
    chi2 = sum((observed[c] - expected) ** 2 / expected for c in CHAIR_LABELS)
    # With 2 degrees of freedom the chi2 survival function is exactly exp(-x/2).
    p_value = math.exp(-chi2 / 2.0)
    return {
        "chi2": round(chi2, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "counts": observed,
        "expected": round(expected, 1),
    }
+
+
def _runs_test(winners):
    """Wald-Wolfowitz runs test for randomness.

    Counts maximal runs of identical chairs and compares the observed run
    count with its expectation under a random shuffle of the same multiset
    of chairs (multi-category generalization of the runs test).

    Returns a dict with the run count, z-score, two-sided p-value and a
    human-readable interpretation.
    """
    if len(winners) < 10:
        return {"runs": 0, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    # Count runs (maximal sequences of the same chair).
    runs = 1
    for i in range(1, len(winners)):
        if winners[i] != winners[i - 1]:
            runs += 1
    n = len(winners)
    counts = {c: 0 for c in CHAIR_LABELS}
    for w in winners:
        if w in counts:
            counts[w] += 1
    # Expected runs and variance for k categories.
    n_vals = [counts[c] for c in CHAIR_LABELS if counts[c] > 0]
    sum_ni2 = sum(ni ** 2 for ni in n_vals)
    # E[R] = n + 1 - (sum ni^2) / n.
    expected_runs = 1 + (n * n - sum_ni2) / n
    if n <= 1:
        return {"runs": runs, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    # Var[R] = [S2*(S2 + n(n+1)) - 2n*S3 - n^3] / (n^2 (n-1)) with
    # S2 = sum ni^2, S3 = sum ni^3.
    # BUG FIX: the original used n*n instead of n*(n+1) in the first term,
    # understating the variance and inflating |z|.
    var_num = sum_ni2 * (sum_ni2 + n * (n + 1)) - 2 * n * sum(ni ** 3 for ni in n_vals) - n ** 3
    var_den = n * n * (n - 1)
    variance = var_num / var_den if var_den > 0 else 1
    if variance <= 0:
        return {"runs": runs, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    z = (runs - expected_runs) / math.sqrt(variance)
    p_value = 2 * (1 - _normal_cdf(abs(z)))
    if p_value < 0.05:
        # Sign of z gives the direction of the deviation from randomness.
        interpretation = "Too few runs (streaky)" if z < 0 else "Too many runs (alternating)"
    else:
        interpretation = "Random (no significant pattern)"
    return {
        "runs": runs,
        "expected_runs": round(expected_runs, 1),
        "z_score": round(z, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "interpretation": interpretation,
    }
+
+
def _bayesian_prediction(winners, markov1, markov2):
    """Weighted Bayesian prediction combining 5 signals.

    Args:
        winners: chronological list of winning chairs ("A"/"B"/"C").
        markov1: 1st-order transition matrix from _markov_matrix_1.
        markov2: 2nd-order transition matrix from _markov_matrix_2.

    Returns:
        (combined, signal_detail): normalized next-chair probabilities and
        a per-signal breakdown with weights.
    """
    if len(winners) < 3:
        # Not enough history: uniform prior, no signal breakdown.
        return {c: round(1 / 3, 4) for c in CHAIR_LABELS}, {}

    # Signal 1: Base rate (overall frequency) — 20%
    total = len(winners)
    base = {c: winners.count(c) / total for c in CHAIR_LABELS}

    # Signal 2: 1st-order Markov — 30%
    last = winners[-1]
    m1 = markov1.get(last, {c: 1 / 3 for c in CHAIR_LABELS})

    # Signal 3: 2nd-order Markov — 25%
    key2 = f"{winners[-2]}{winners[-1]}"
    m2 = markov2.get(key2, {c: 1 / 3 for c in CHAIR_LABELS})

    # Signal 4: Recent 20-game frequency — 15%
    recent = winners[-20:] if len(winners) >= 20 else winners
    recent_total = len(recent)
    rec = {c: recent.count(c) / recent_total for c in CHAIR_LABELS}

    # Signal 5: Streak momentum/regression — 10%
    streak_chair = winners[-1]
    streak_len = 0
    for w in reversed(winners):
        if w == streak_chair:
            streak_len += 1
        else:
            break
    # Regression to mean: longer streaks → lower probability of continuation.
    # BUG FIX: the original gave the non-streak chairs 0 and then normalized,
    # which collapsed this signal to "streak chair with probability 1.0"
    # regardless of streak length.  Spread the remaining mass evenly over the
    # other two chairs instead; the result already sums to 1.
    continue_p = max(0.1, 1 / 3 - streak_len * 0.05)
    streak = {
        c: continue_p if c == streak_chair else (1 - continue_p) / 2
        for c in CHAIR_LABELS
    }

    weights = {"base_rate": 0.20, "markov_1": 0.30, "markov_2": 0.25, "recent_20": 0.15, "streak": 0.10}
    signals = {"base_rate": base, "markov_1": m1, "markov_2": m2, "recent_20": rec, "streak": streak}

    # Weighted average of all signals, then renormalize defensively.
    combined = {c: 0 for c in CHAIR_LABELS}
    for sig_name, weight in weights.items():
        for c in CHAIR_LABELS:
            combined[c] += weight * signals[sig_name].get(c, 1 / 3)

    c_total = sum(combined.values())
    if c_total > 0:
        combined = {c: round(combined[c] / c_total, 4) for c in CHAIR_LABELS}

    # Round signal values for output.
    signal_detail = {}
    for sig_name, sig_vals in signals.items():
        signal_detail[sig_name] = {
            "weight": weights[sig_name],
            "probs": {c: round(sig_vals.get(c, 0), 4) for c in CHAIR_LABELS},
        }

    return combined, signal_detail
+
+
def _card_value_distribution(cards_data):
    """Count of each card value (A–K) per chair."""
    value_names = ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"]
    dist = {chair: {name: 0 for name in value_names} for chair in CHAIR_LABELS}
    for raw_json, _winner in cards_data:
        try:
            player_infos = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            # Skip rows with missing or corrupt card payloads.
            continue
        for player in player_infos:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                if value and value in dist[chair]:
                    dist[chair][value] += 1
    return {"labels": value_names, "chairs": dist}
+
+
def _face_card_frequency(cards_data):
    """Percentage of face cards (J, Q, K, A) per chair."""
    face_vals = {"J", "Q", "K", "A"}
    face_counts = {chair: 0 for chair in CHAIR_LABELS}
    total_counts = {chair: 0 for chair in CHAIR_LABELS}
    for raw_json, _winner in cards_data:
        try:
            player_infos = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            # Skip rows with missing or corrupt card payloads.
            continue
        for player in player_infos:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in face_counts:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                if not value:
                    continue
                total_counts[chair] += 1
                if value in face_vals:
                    face_counts[chair] += 1
    result = {}
    for chair in CHAIR_LABELS:
        total = total_counts[chair]
        pct = round(face_counts[chair] / total * 100, 2) if total else 0
        result[chair] = {"face_cards": face_counts[chair], "total_cards": total, "pct": pct}
    return result
+
+
def _suit_distribution(cards_data):
    """Suit counts per chair."""
    suit_names = ["\u2660", "\u2665", "\u2663", "\u2666"]  # spade, heart, club, diamond
    dist = {chair: {suit: 0 for suit in suit_names} for chair in CHAIR_LABELS}
    for raw_json, _winner in cards_data:
        try:
            player_infos = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            # Skip rows with missing or corrupt card payloads.
            continue
        for player in player_infos:
            chair = config.CHAIRS.get(player.get("country"))
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                suit = config.SUITS.get(card.get("cardColor"))
                if suit and suit in dist[chair]:
                    dist[chair][suit] += 1
    return {"labels": suit_names, "chairs": dist}
+
+
def _winning_card_patterns(cards_data):
    """Top 20 individual cards appearing in winning hands."""
    card_counts = {}
    for raw_json, winner in cards_data:
        try:
            player_infos = json.loads(raw_json)
        except (json.JSONDecodeError, TypeError):
            # Skip rows with missing or corrupt card payloads.
            continue
        # Winner is stored as a chair_id (1=C, 2=B, 3=A); resolve it once
        # per game and tally only the winning player's cards.
        winning_chair = config.CHAIRS.get(winner)
        for player in player_infos:
            chair = config.CHAIRS.get(player.get("country"))
            if chair is None or chair != winning_chair:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"))
                suit = config.SUITS.get(card.get("cardColor"))
                if value and suit:
                    label = f"{value}{suit}"
                    card_counts[label] = card_counts.get(label, 0) + 1
    top = sorted(card_counts.items(), key=lambda item: item[1], reverse=True)[:20]
    return [{"card": name, "count": count} for name, count in top]
+
+
def _backtest_theories(winners):
    """Backtest all prediction theories on historical data.

    Walks the winner sequence forward from a 30-game warmup, makes each
    theory's pick from history only, and scores it against the actual
    winner.  Returns per-theory accuracy plus a rolling-accuracy series
    (trailing window of 200 games) for charting.
    """
    warmup = 30
    if len(winners) <= warmup:
        return {"error": "Not enough data for backtesting"}

    theories = ["base_rate", "markov_1", "markov_2", "recent_20", "streak", "combined"]
    correct = {t: 0 for t in theories}
    total_tested = 0
    rolling = {t: [] for t in theories}  # per-game hit/miss, feeds rolling accuracy

    for i in range(warmup, len(winners)):
        history = winners[:i]
        actual = winners[i]
        total_tested += 1

        total_h = len(history)
        # Base rate
        base = {c: history.count(c) / total_h for c in CHAIR_LABELS}
        base_pick = max(CHAIR_LABELS, key=lambda c: base[c])

        # Markov-1
        m1, _ = _markov_matrix_1(history)
        last = history[-1]
        m1_probs = m1.get(last, {c: 1 / 3 for c in CHAIR_LABELS})
        m1_pick = max(CHAIR_LABELS, key=lambda c: m1_probs.get(c, 0))

        # Markov-2
        m2, _ = _markov_matrix_2(history)
        key2 = f"{history[-2]}{history[-1]}"
        m2_probs = m2.get(key2, {c: 1 / 3 for c in CHAIR_LABELS})
        m2_pick = max(CHAIR_LABELS, key=lambda c: m2_probs.get(c, 0))

        # Recent-20
        recent = history[-20:] if len(history) >= 20 else history
        rec = {c: recent.count(c) / len(recent) for c in CHAIR_LABELS}
        rec_pick = max(CHAIR_LABELS, key=lambda c: rec[c])

        # Streak (regression to the mean).
        streak_chair = history[-1]
        streak_len = 0
        for w in reversed(history):
            if w == streak_chair:
                streak_len += 1
            else:
                break
        # BUG FIX: the original zeroed the non-streak chairs and then
        # normalized, which made this theory always pick the streak chair
        # with probability 1.0, so its backtest score was meaningless.
        # Spread the remaining mass over the other chairs so longer streaks
        # genuinely discount continuation (mirrors _bayesian_prediction).
        continue_p = max(0.1, 1 / 3 - streak_len * 0.05)
        streak_probs = {
            c: continue_p if c == streak_chair else (1 - continue_p) / 2
            for c in CHAIR_LABELS
        }
        streak_pick = max(CHAIR_LABELS, key=lambda c: streak_probs[c])

        # Combined Bayesian
        weights = {"base_rate": 0.20, "markov_1": 0.30, "markov_2": 0.25, "recent_20": 0.15, "streak": 0.10}
        signals = {"base_rate": base, "markov_1": m1_probs, "markov_2": m2_probs, "recent_20": rec, "streak": streak_probs}
        combined = {c: 0 for c in CHAIR_LABELS}
        for sig_name, weight in weights.items():
            for c in CHAIR_LABELS:
                combined[c] += weight * signals[sig_name].get(c, 1 / 3)
        combined_pick = max(CHAIR_LABELS, key=lambda c: combined[c])

        picks = {
            "base_rate": base_pick, "markov_1": m1_pick, "markov_2": m2_pick,
            "recent_20": rec_pick, "streak": streak_pick, "combined": combined_pick,
        }
        for t in theories:
            hit = 1 if picks[t] == actual else 0
            correct[t] += hit
            rolling[t].append(hit)

    accuracy = {t: round(correct[t] / total_tested * 100, 2) if total_tested else 0 for t in theories}

    # Rolling accuracy over the trailing 200-game window, computed with a
    # running sum instead of re-summing the window at every step.
    window = 200
    rolling_accuracy = {}
    for t in theories:
        data = rolling[t]
        series = []
        running = 0
        for j, hit in enumerate(data):
            running += hit
            if j >= window:
                running -= data[j - window]
            span = min(j + 1, window)
            series.append(round(running / span * 100, 2))
        # Only keep the last `window` points for the chart.
        rolling_accuracy[t] = series[-window:]

    return {
        "total_tested": total_tested,
        "accuracy": accuracy,
        "rolling_accuracy": rolling_accuracy,
        "random_baseline": 33.33,
    }
+
+
@_with_lock
def get_prediction_analysis() -> dict:
    """Run all prediction/game-theory analysis and return results."""
    client = get_client()

    # Full winner sequence, oldest first, mapped to chair labels (A/B/C);
    # anything unmappable is dropped before analysis.
    result = client.query("SELECT winner FROM games ORDER BY game_no ASC")
    winners = [
        w
        for w in (config.CHAIRS.get(row[0], "?") for row in result.result_rows)
        if w in CHAIR_LABELS
    ]

    # Card payloads for the most recent 500 games that have them.
    cards_result = client.query(
        "SELECT cards_json, winner FROM games WHERE cards_json != '' ORDER BY game_no DESC LIMIT 500"
    )
    cards_data = [(row[0], row[1]) for row in cards_result.result_rows]

    markov1, markov1_counts = _markov_matrix_1(winners)
    markov2, markov2_counts = _markov_matrix_2(winners)
    prediction, signals = _bayesian_prediction(winners, markov1, markov2)

    return {
        "total_games": len(winners),
        "last_winners": winners[-10:],
        "prediction": prediction,
        "signals": signals,
        "markov1": {"matrix": markov1, "counts": {k: dict(v) for k, v in markov1_counts.items()}},
        "markov2": {"matrix": markov2, "counts": {k: dict(v) for k, v in markov2_counts.items()}},
        "autocorrelation": _autocorrelation(winners),
        "chi_squared": _chi_squared_test(winners),
        "runs_test": _runs_test(winners),
        "backtest": _backtest_theories(winners),
        "card_values": _card_value_distribution(cards_data),
        "face_cards": _face_card_frequency(cards_data),
        "suits": _suit_distribution(cards_data),
        "winning_cards": _winning_card_patterns(cards_data),
    }
diff --git a/app/server.py b/app/server.py
index 460078294..af29ca3ff 100644
--- a/app/server.py
+++ b/app/server.py
@@ -43,6 +43,8 @@ class WebServer:
self.app.router.add_get("/api/analytics", self._handle_analytics)
self.app.router.add_get("/patterns", self._handle_patterns_page)
self.app.router.add_get("/api/patterns", self._handle_patterns)
+ self.app.router.add_get("/predictions", self._handle_predictions_page)
+ self.app.router.add_get("/api/predictions", self._handle_predictions)
self.app.router.add_get("/ws", self._handle_ws)
self.app.router.add_static("/static/", STATIC_DIR, name="static")
@@ -104,6 +106,18 @@ class WebServer:
log.error("Pattern analysis query failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
+ async def _handle_predictions_page(self, request: web.Request) -> web.Response:
+ path = os.path.join(STATIC_DIR, "predictions.html")
+ return web.FileResponse(path)
+
+ async def _handle_predictions(self, request: web.Request) -> web.Response:
+ try:
+ data = await _run_sync(db.get_prediction_analysis)
+ return web.json_response(data)
+ except Exception as e:
+ log.error("Prediction analysis query failed: %s", e)
+ return web.json_response({"error": str(e)}, status=500)
+
async def _handle_analytics(self, request: web.Request) -> web.Response:
period = request.query.get("period", "all")
if period not in ("1h", "6h", "24h", "7d", "all"):
diff --git a/static/analytics.html b/static/analytics.html
index 0524bb656..4681982e2 100644
--- a/static/analytics.html
+++ b/static/analytics.html
@@ -215,6 +215,7 @@
diff --git a/static/index.html b/static/index.html
index 35f954c78..0be961ffb 100644
--- a/static/index.html
+++ b/static/index.html
@@ -603,6 +603,7 @@
diff --git a/static/patterns.html b/static/patterns.html
index ca2c0f7b6..7de58d5c9 100644
--- a/static/patterns.html
+++ b/static/patterns.html
@@ -123,6 +123,7 @@
diff --git a/static/predictions.html b/static/predictions.html
new file mode 100644
index 000000000..49e504e14
--- /dev/null
+++ b/static/predictions.html
@@ -0,0 +1,528 @@
+
+
+
+
+
+Predictions & Game Theory
+
+
+
+
+
+
+
Loading prediction data...
+
+
+
+
+
Bayesian Next-Chair Prediction
+
+
+
+
+
+
+
Markov Transition Matrices
+
+
+
1st Order — P(next | last)
+
+
+
+
2nd Order — P(next | last two)
+
+
+
+
+
+
+
+
Autocorrelation (Lags 1–5)
+
+
+
+
+
+
Statistical Tests
+
+
+
Chi-Squared Goodness of Fit
+
+
+
+
Wald-Wolfowitz Runs Test
+
+
+
+
+
+
+
+
Theory Backtesting
+
+
+
Rolling Accuracy (Last 200 Games)
+
+
+
+
+
+
+
Card Value Distribution by Chair (Last 500 Games)
+
+
+
+
+
+
Face Card Frequency (J, Q, K, A)
+
+
+
+
+
+
Suit Distribution by Chair
+
+
+
+
+
+
Top 20 Cards in Winning Hands
+
+
+
+
+
+
+
+
+