Files
3pmonitor/app/db.py
Junaid Saeed Uppal 2c7cd67ab7 fix streak signal, reweight predictions, and reorder UI
- Fix streak signal: was giving 100% to streak chair after normalization
  (non-streak chairs were 0), now properly distributes probability with
  streak chair getting less as streak grows (actual mean reversion)
- Change recent window from 20 to 50 games
- Reweight signals based on backtest: base_rate 0.15→0.20 (best performer),
  recent 0.10→0.15, streak 0.10→0.05, balance 0.15→0.10
- Move Live Market Sentiment above Signal Breakdown
2026-02-26 10:36:53 +05:00

1616 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ClickHouse database operations.
"""
import functools
import json
import logging
import math
import threading
import time

import clickhouse_connect

from . import config
log = logging.getLogger(__name__)
_client = None
_lock = threading.Lock()
_migrations_applied = False
def get_client():
    """Return the shared ClickHouse client, connecting on first use.

    Retries the connection up to 30 times (2 s apart) so the app can
    start before ClickHouse is ready.

    Raises:
        RuntimeError: if the server never becomes reachable.
    """
    global _client
    if _client is None:
        for attempt in range(30):
            try:
                # Connect into a local first: the old code assigned the
                # module global before ping(), so a client whose ping
                # failed stayed cached and was returned broken on every
                # subsequent call.
                client = clickhouse_connect.get_client(
                    host=config.CLICKHOUSE_HOST,
                    port=config.CLICKHOUSE_PORT,
                )
                client.ping()
                _client = client
                log.info("Connected to ClickHouse at %s:%s", config.CLICKHOUSE_HOST, config.CLICKHOUSE_PORT)
                return _client
            except Exception as e:
                log.warning("ClickHouse not ready (attempt %d): %s", attempt + 1, e)
                time.sleep(2)
        raise RuntimeError("Could not connect to ClickHouse")
    return _client
def run_migrations():
    """Run one-time data migrations on startup.

    A `_migrations` ledger table records each named migration so it is
    applied at most once across restarts; the module-level flag
    short-circuits repeat calls within the same process.
    """
    global _migrations_applied
    if _migrations_applied:
        return
    client = get_client()
    # Ledger of applied migrations (created on first ever run).
    client.command(
        "CREATE TABLE IF NOT EXISTS _migrations ("
        " name String, applied_at DateTime DEFAULT now()"
        ") ENGINE = MergeTree() ORDER BY name"
    )
    result = client.query(
        "SELECT count() FROM _migrations WHERE name = 'swap_ac_chairs'"
    )
    if result.result_rows[0][0] == 0:
        log.info("Running migration: swap_ac_chairs")
        # Swap every A/C column pair in place via a ClickHouse mutation
        # (simultaneous assignment, so no temp columns are needed).
        client.command(
            "ALTER TABLE games UPDATE "
            "hand_a = hand_c, hand_c = hand_a, "
            "bet_a = bet_c, bet_c = bet_a, "
            "hand_type_a = hand_type_c, hand_type_c = hand_type_a "
            "WHERE 1=1"
        )
        # Record the migration so it never runs again.
        client.insert("_migrations", [["swap_ac_chairs"]], column_names=["name"])
        log.info("Migration swap_ac_chairs applied")
    # Ensure visitors table exists (for existing deployments)
    client.command(
        "CREATE TABLE IF NOT EXISTS visitors ("
        " ip String, country String, path String, method String,"
        " user_agent String, referer String, accept_lang String,"
        " created_at DateTime DEFAULT now()"
        ") ENGINE = MergeTree() ORDER BY (created_at, ip)"
        " TTL created_at + INTERVAL 90 DAY"
    )
    _migrations_applied = True
def _with_lock(fn):
"""Decorator to serialize all ClickHouse operations."""
def wrapper(*args, **kwargs):
with _lock:
return fn(*args, **kwargs)
wrapper.__name__ = fn.__name__
return wrapper
@_with_lock
def insert_game(game: dict):
    """Insert a completed round into the games table.

    game_no, winner and total_pot are required; every other column
    falls back to an empty-string/zero default when absent.
    """
    columns = [
        "game_no", "winner", "total_pot",
        "bet_a", "bet_b", "bet_c",
        "hand_a", "hand_b", "hand_c",
        "hand_type_a", "hand_type_b", "hand_type_c",
        "cards_json", "duration_s",
    ]
    defaults = {
        "bet_a": 0, "bet_b": 0, "bet_c": 0,
        "hand_a": "", "hand_b": "", "hand_c": "",
        "hand_type_a": 0, "hand_type_b": 0, "hand_type_c": 0,
        "cards_json": "", "duration_s": 0,
    }
    # Required columns index directly (KeyError if missing); the rest
    # use their declared default.
    row = [
        game.get(col, defaults[col]) if col in defaults else game[col]
        for col in columns
    ]
    get_client().insert("games", [row], column_names=columns)
@_with_lock
def insert_bet(bet: dict):
    """Insert an individual user bet into the bets table.

    All five fields are required; a missing key raises KeyError.
    """
    columns = ["game_no", "user_id", "chair", "bet_amount", "total_bet"]
    row = [bet[col] for col in columns]
    get_client().insert("bets", [row], column_names=columns)
@_with_lock
def upsert_user(user: dict):
    """Insert/update a user profile.

    user_id is required; all other profile fields default to empty
    string / zero when absent.
    """
    columns = [
        "user_id", "nick_name", "rich_level", "actor_level",
        "gender", "consume_total", "earn_total", "is_actor", "portrait",
    ]
    defaults = {
        "nick_name": "", "rich_level": 0, "actor_level": 0, "gender": 0,
        "consume_total": 0, "earn_total": 0, "is_actor": 0, "portrait": "",
    }
    row = [
        user[col] if col == "user_id" else user.get(col, defaults[col])
        for col in columns
    ]
    get_client().insert("users", [row], column_names=columns)
@_with_lock
def insert_visitors(batch: list[dict]):
    """Bulk insert visitor records.

    No-op for an empty batch; every missing field defaults to "".
    """
    if not batch:
        return
    fields = ["ip", "country", "path", "method", "user_agent", "referer", "accept_lang"]
    rows = [[visit.get(field, "") for field in fields] for visit in batch]
    get_client().insert("visitors", rows, column_names=fields)
@_with_lock
def get_recent_visitors(limit: int = 200) -> list[dict]:
    """Get recent visitor log entries, newest first."""
    result = get_client().query(
        "SELECT ip, country, path, method, user_agent, referer, accept_lang, created_at "
        "FROM visitors ORDER BY created_at DESC LIMIT {limit:UInt32}",
        parameters={"limit": limit},
    )
    keys = ("ip", "country", "path", "method", "user_agent", "referer", "accept_lang")
    # created_at is stringified for JSON serialization downstream.
    return [
        {**dict(zip(keys, row[:7])), "created_at": str(row[7])}
        for row in result.result_rows
    ]
@_with_lock
def get_recent_games(n: int = 50) -> list[dict]:
    """Get last N completed games, newest first."""
    result = get_client().query(
        "SELECT game_no, winner, total_pot, bet_a, bet_b, bet_c, "
        "hand_a, hand_b, hand_c, hand_type_a, hand_type_b, hand_type_c, "
        "cards_json, duration_s, created_at "
        "FROM games ORDER BY game_no DESC LIMIT {n:UInt32}",
        parameters={"n": n},
    )
    keys = (
        "game_no", "winner", "total_pot", "bet_a", "bet_b", "bet_c",
        "hand_a", "hand_b", "hand_c", "hand_type_a", "hand_type_b", "hand_type_c",
        "cards_json", "duration_s",
    )
    games = []
    for row in result.result_rows:
        record = dict(zip(keys, row))
        # created_at is stringified for JSON serialization downstream.
        record["created_at"] = str(row[14])
        games.append(record)
    return games
@_with_lock
def get_leaderboard(n: int = 10) -> list[dict]:
    """
    Get top N users by P&L.
    P&L = sum of winning bets * 1.9 - sum of losing bets.
    With 2.9x fixed payout: win → +1.9x bet, loss → -1.0x bet.
    """
    result = get_client().query(
        """
        SELECT
            b.user_id,
            any(u.nick_name) AS nick_name,
            count() AS total_bets,
            countIf(b.chair = g.winner) AS wins,
            countIf(b.chair != g.winner) AS losses,
            toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9
                    - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl,
            sum(b.bet_amount) AS total_wagered
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
        LEFT JOIN users u ON b.user_id = u.user_id
        GROUP BY b.user_id
        HAVING total_bets >= 3
        ORDER BY pnl DESC
        LIMIT {n:UInt32}
        """,
        parameters={"n": n},
    )
    keys = ("user_id", "nick_name", "total_bets", "wins", "losses", "pnl", "total_wagered")
    leaders = []
    for row in result.result_rows:
        entry = dict(zip(keys, row))
        # Fall back to the numeric id when no nickname is cached.
        entry["nick_name"] = entry["nick_name"] or str(entry["user_id"])
        leaders.append(entry)
    return leaders
@_with_lock
def get_win_distribution() -> dict:
    """Get win counts per chair + bet rank distribution."""
    client = get_client()
    winner_rows = client.query(
        "SELECT winner, count() AS cnt FROM games GROUP BY winner ORDER BY winner"
    ).result_rows
    chairs = {"A": 0, "B": 0, "C": 0}
    for winner_id, cnt in winner_rows:
        label = config.CHAIRS.get(winner_id, "?")
        if label in chairs:
            chairs[label] = cnt
    # Bet rank distribution: how often the winning chair had high/mid/low bet
    rank_rows = client.query(
        """
        SELECT
            countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high,
            countIf(winner_bet > least(bet_a, bet_b, bet_c)
                AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid,
            countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low
        FROM (
            SELECT
                bet_a, bet_b, bet_c,
                multiIf(winner = 3, bet_a, winner = 2, bet_b, bet_c) AS winner_bet
            FROM games
            WHERE bet_a + bet_b + bet_c > 0
        )
        """
    ).result_rows
    bet_rank = {"high": 0, "mid": 0, "low": 0}
    if rank_rows:
        bet_rank = dict(zip(("high", "mid", "low"), rank_rows[0]))
    return {"chairs": chairs, "bet_rank": bet_rank}
@_with_lock
def get_user_name(user_id: int) -> str | None:
    """Lookup user nickname from cache; None if unknown or blank."""
    rows = get_client().query(
        "SELECT nick_name FROM users WHERE user_id = {uid:UInt64} LIMIT 1",
        parameters={"uid": user_id},
    ).result_rows
    if not rows:
        return None
    # Normalize an empty cached nickname to None as well.
    return rows[0][0] or None
@_with_lock
def get_user_detail(user_id: int) -> dict | None:
    """Get full user profile + session betting stats.

    Returns None when the user is not in the cache; otherwise a dict
    with the cached profile fields, aggregate betting stats, and the
    user's 20 most recent bets annotated with the round outcome.
    """
    client = get_client()
    # Profile
    result = client.query(
        "SELECT user_id, nick_name, rich_level, actor_level, gender, "
        "consume_total, earn_total, is_actor, portrait "
        "FROM users WHERE user_id = {uid:UInt64} LIMIT 1",
        parameters={"uid": user_id},
    )
    if not result.result_rows:
        return None
    row = result.result_rows[0]
    profile = {
        "user_id": row[0],
        "nick_name": row[1],
        "rich_level": row[2],
        "actor_level": row[3],
        "gender": row[4],
        "consume_total": row[5],
        "earn_total": row[6],
        "is_actor": row[7],
        "portrait": row[8],
    }
    # Session betting stats
    # P&L convention matches get_leaderboard: a win pays 1.9x the
    # stake, a loss costs the full stake.
    stats_result = client.query(
        """
        SELECT
            count() AS total_bets,
            sum(b.bet_amount) AS total_wagered,
            countDistinct(b.game_no) AS rounds_played,
            countIf(b.chair = g.winner) AS wins,
            countIf(b.chair != g.winner) AS losses,
            toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9
                    - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl
        FROM bets b
        LEFT JOIN games g ON b.game_no = g.game_no
        WHERE b.user_id = {uid:UInt64}
        """,
        parameters={"uid": user_id},
    )
    if stats_result.result_rows:
        sr = stats_result.result_rows[0]
        profile["total_bets"] = sr[0]
        profile["total_wagered"] = sr[1]
        profile["rounds_played"] = sr[2]
        profile["wins"] = sr[3]
        profile["losses"] = sr[4]
        profile["pnl"] = sr[5]
    # Recent bets
    bets_result = client.query(
        """
        SELECT b.game_no, b.chair, b.bet_amount, b.total_bet,
               g.winner, b.created_at
        FROM bets b
        LEFT JOIN games g ON b.game_no = g.game_no
        WHERE b.user_id = {uid:UInt64}
        ORDER BY b.created_at DESC
        LIMIT 20
        """,
        parameters={"uid": user_id},
    )
    profile["recent_bets"] = []
    for br in bets_result.result_rows:
        profile["recent_bets"].append({
            "game_no": br[0],
            "chair": br[1],
            "chair_name": config.CHAIRS.get(br[1], "?"),
            "bet_amount": br[2],
            "total_bet": br[3],
            "winner": br[4],
            # None when the round outcome is missing/falsy (LEFT JOIN miss).
            "won": br[1] == br[4] if br[4] else None,
            "created_at": str(br[5]),
        })
    return profile
@_with_lock
def get_biggest_winner() -> dict | None:
    """Get the single biggest winner by P&L this session, or None."""
    result = get_client().query(
        """
        SELECT
            b.user_id,
            any(u.nick_name) AS nick_name,
            any(u.portrait) AS portrait,
            any(u.rich_level) AS rich_level,
            count() AS total_bets,
            countIf(b.chair = g.winner) AS wins,
            toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9
                    - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl,
            sum(b.bet_amount) AS total_wagered
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
        LEFT JOIN users u ON b.user_id = u.user_id
        GROUP BY b.user_id
        HAVING total_bets >= 3
        ORDER BY pnl DESC
        LIMIT 1
        """
    )
    if not result.result_rows:
        return None
    keys = (
        "user_id", "nick_name", "portrait", "rich_level",
        "total_bets", "wins", "pnl", "total_wagered",
    )
    winner = dict(zip(keys, result.result_rows[0]))
    # Normalize missing profile fields from the LEFT JOIN.
    winner["nick_name"] = winner["nick_name"] or str(winner["user_id"])
    winner["portrait"] = winner["portrait"] or ""
    return winner
@_with_lock
def get_analytics(period: str = "all") -> dict:
    """Get all analytics data for a given time period.

    period is one of "1h", "6h", "24h", "7d"; any other value (e.g.
    "all") applies no time filter.  Returns summary stats, win
    distribution, winning-hand-type distribution, leaderboard, hourly
    volume, and the latest games list in one dict.
    """
    client = get_client()
    # Whitelist of period -> SQL cutoff expression.  The f-string
    # splicing below is safe because cutoff_expr can only come from
    # this fixed table, never from user input.
    intervals = {
        "1h": "now() - INTERVAL 1 HOUR",
        "6h": "now() - INTERVAL 6 HOUR",
        "24h": "now() - INTERVAL 24 HOUR",
        "7d": "now() - INTERVAL 7 DAY",
    }
    cutoff_expr = intervals.get(period)
    game_where = f"WHERE created_at >= {cutoff_expr}" if cutoff_expr else ""
    bet_where = f"WHERE b.created_at >= {cutoff_expr}" if cutoff_expr else ""
    # 1. Summary stats
    summary_result = client.query(
        f"SELECT count(), sum(total_pot), toInt64(avg(total_pot)) FROM games {game_where}"
    )
    sr = summary_result.result_rows[0] if summary_result.result_rows else (0, 0, 0)
    # Bet counts
    bet_summary = client.query(
        f"SELECT count(), countDistinct(user_id) FROM bets {'WHERE created_at >= ' + cutoff_expr if cutoff_expr else ''}"
    )
    bs = bet_summary.result_rows[0] if bet_summary.result_rows else (0, 0)
    summary = {
        "total_games": sr[0],
        # sum()/avg() are NULL (None) over an empty set — coerce to 0.
        "total_volume": int(sr[1] or 0),
        "avg_pot": int(sr[2] or 0),
        "total_bets_placed": bs[0],
        "unique_bettors": bs[1],
    }
    # 2. Win distribution (chairs + bet rank)
    dist_result = client.query(
        f"SELECT winner, count() AS cnt FROM games {game_where} GROUP BY winner ORDER BY winner"
    )
    chairs_dist = {"A": 0, "B": 0, "C": 0}
    for row in dist_result.result_rows:
        chair = config.CHAIRS.get(row[0], "?")
        if chair in chairs_dist:
            chairs_dist[chair] = row[1]
    rank_result = client.query(
        f"""
        SELECT
            countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high,
            countIf(winner_bet > least(bet_a, bet_b, bet_c)
                AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid,
            countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low
        FROM (
            SELECT
                bet_a, bet_b, bet_c,
                multiIf(winner = 3, bet_a, winner = 2, bet_b, bet_c) AS winner_bet
            FROM games
            {game_where + ' AND' if game_where else 'WHERE'} bet_a + bet_b + bet_c > 0
        )
        """
    )
    bet_rank = {"high": 0, "mid": 0, "low": 0}
    if rank_result.result_rows:
        rr = rank_result.result_rows[0]
        bet_rank = {"high": rr[0], "mid": rr[1], "low": rr[2]}
    win_distribution = {"chairs": chairs_dist, "bet_rank": bet_rank}
    # 3. Hand type distribution (winning hand types)
    hand_type_result = client.query(
        f"""
        SELECT hand_type, count() AS cnt FROM (
            SELECT multiIf(winner = 3, hand_type_a, winner = 2, hand_type_b, hand_type_c) AS hand_type
            FROM games
            {game_where}
        )
        WHERE hand_type > 0
        GROUP BY hand_type
        ORDER BY hand_type
        """
    )
    hand_type_distribution = {}
    for row in hand_type_result.result_rows:
        type_name = config.HAND_TYPES.get(row[0], f"Type {row[0]}")
        hand_type_distribution[type_name] = row[1]
    # 4. Leaderboard
    leaderboard_result = client.query(
        f"""
        SELECT
            b.user_id,
            any(u.nick_name) AS nick_name,
            count() AS total_bets,
            countIf(b.chair = g.winner) AS wins,
            countIf(b.chair != g.winner) AS losses,
            toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9
                    - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl,
            sum(b.bet_amount) AS total_wagered
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
        LEFT JOIN users u ON b.user_id = u.user_id
        {bet_where}
        GROUP BY b.user_id
        HAVING total_bets >= 3
        ORDER BY pnl DESC
        LIMIT 20
        """
    )
    leaderboard = []
    for row in leaderboard_result.result_rows:
        leaderboard.append({
            "user_id": row[0],
            "nick_name": row[1] or str(row[0]),
            "total_bets": row[2],
            "wins": row[3],
            "losses": row[4],
            "pnl": row[5],
            "total_wagered": row[6],
        })
    # 5. Hourly volume
    hourly_result = client.query(
        f"""
        SELECT
            toStartOfHour(created_at) AS hour,
            count() AS games,
            sum(total_pot) AS volume
        FROM games
        {game_where}
        GROUP BY hour
        ORDER BY hour
        """
    )
    hourly_volume = []
    for row in hourly_result.result_rows:
        hourly_volume.append({
            "hour": str(row[0]),
            "games": row[1],
            "volume": int(row[2] or 0),
        })
    # 6. Games list
    games_result = client.query(
        f"""
        SELECT game_no, winner, total_pot, bet_a, bet_b, bet_c,
               hand_a, hand_b, hand_c, hand_type_a, hand_type_b, hand_type_c,
               cards_json, duration_s, created_at
        FROM games
        {game_where}
        ORDER BY game_no DESC
        LIMIT 200
        """
    )
    games = []
    for row in games_result.result_rows:
        games.append({
            "game_no": row[0],
            "winner": row[1],
            "total_pot": row[2],
            "bet_a": row[3],
            "bet_b": row[4],
            "bet_c": row[5],
            "hand_a": row[6],
            "hand_b": row[7],
            "hand_c": row[8],
            "hand_type_a": row[9],
            "hand_type_b": row[10],
            "hand_type_c": row[11],
            "cards_json": row[12],
            "duration_s": row[13],
            "created_at": str(row[14]),
        })
    return {
        "summary": summary,
        "win_distribution": win_distribution,
        "hand_type_distribution": hand_type_distribution,
        "leaderboard": leaderboard,
        "hourly_volume": hourly_volume,
        "games": games,
    }
@_with_lock
def get_pattern_analysis() -> dict:
    """Run all pattern analysis queries and return a single dict.

    Sections: chair win bias, bet-rank win rates, per-chair bet-rank
    win rates, dealt hand types per chair, winning hand types, pot-size
    buckets, streaks, hourly win rates, and recent-vs-all distribution.
    """
    client = get_client()
    # 1. Chair win bias
    result = client.query(
        "SELECT winner, count() AS cnt FROM games GROUP BY winner ORDER BY winner"
    )
    chair_wins = {}
    total_games = 0
    for row in result.result_rows:
        chair = config.CHAIRS.get(row[0], "?")
        chair_wins[chair] = row[1]
        total_games += row[1]
    chair_bias = {"total_games": total_games}
    for ch in ("A", "B", "C"):
        wins = chair_wins.get(ch, 0)
        pct = round(wins / total_games * 100, 2) if total_games else 0
        chair_bias[ch] = {"wins": wins, "pct": pct}
    # 2. Bet rank analysis — how often the highest/mid/lowest bet chair wins
    rank_result = client.query("""
        SELECT
            countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high,
            countIf(winner_bet > least(bet_a, bet_b, bet_c)
                AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid,
            countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low
        FROM (
            SELECT bet_a, bet_b, bet_c,
                multiIf(winner = 3, bet_a, winner = 2, bet_b, bet_c) AS winner_bet
            FROM games WHERE bet_a + bet_b + bet_c > 0
        )
    """)
    bet_rank = {"high": 0, "mid": 0, "low": 0}
    if rank_result.result_rows:
        r = rank_result.result_rows[0]
        bet_rank = {"high": r[0], "mid": r[1], "low": r[2]}
    # 3. Per-chair bet rank — when chair X has max bet, how often does X win?
    # Winner ids in SQL: 3=A, 2=B, 1=C (matches the multiIf mappings above).
    pcr = client.query("""
        SELECT
            countIf(bet_a >= greatest(bet_a, bet_b, bet_c) AND bet_a > 0) AS a_high,
            countIf(bet_a >= greatest(bet_a, bet_b, bet_c) AND bet_a > 0 AND winner = 3) AS a_win,
            countIf(bet_b >= greatest(bet_a, bet_b, bet_c) AND bet_b > 0) AS b_high,
            countIf(bet_b >= greatest(bet_a, bet_b, bet_c) AND bet_b > 0 AND winner = 2) AS b_win,
            countIf(bet_c >= greatest(bet_a, bet_b, bet_c) AND bet_c > 0) AS c_high,
            countIf(bet_c >= greatest(bet_a, bet_b, bet_c) AND bet_c > 0 AND winner = 1) AS c_win
        FROM games WHERE bet_a + bet_b + bet_c > 0
    """)
    per_chair_rank = {}
    if pcr.result_rows:
        r = pcr.result_rows[0]
        # Row layout is (a_high, a_win, b_high, b_win, c_high, c_win).
        for i, ch in enumerate(("A", "B", "C")):
            has = r[i * 2]
            wins = r[i * 2 + 1]
            per_chair_rank[ch] = {
                "has_highest": has, "wins": wins,
                "win_pct": round(wins / has * 100, 2) if has else 0,
            }
    # 4. Hand type distribution by chair (all dealt hands, not just winners)
    ht_result = client.query("""
        SELECT 'A' AS chair, hand_type_a AS ht, count() AS cnt
        FROM games WHERE hand_type_a > 0 GROUP BY ht
        UNION ALL
        SELECT 'B', hand_type_b, count()
        FROM games WHERE hand_type_b > 0 GROUP BY hand_type_b
        UNION ALL
        SELECT 'C', hand_type_c, count()
        FROM games WHERE hand_type_c > 0 GROUP BY hand_type_c
        ORDER BY 1, 2
    """)
    hand_types_by_chair = {"A": {}, "B": {}, "C": {}}
    for row in ht_result.result_rows:
        ch = row[0]
        type_name = config.HAND_TYPES.get(row[1], f"Type {row[1]}")
        hand_types_by_chair[ch][type_name] = row[2]
    # 5. Hand type win rates (winning hand type distribution)
    htw = client.query("""
        SELECT hand_type, count() AS cnt FROM (
            SELECT multiIf(winner = 3, hand_type_a, winner = 2, hand_type_b, hand_type_c) AS hand_type
            FROM games
        ) WHERE hand_type > 0
        GROUP BY hand_type ORDER BY hand_type
    """)
    hand_type_wins = {}
    for row in htw.result_rows:
        type_name = config.HAND_TYPES.get(row[0], f"Type {row[0]}")
        hand_type_wins[type_name] = row[1]
    # 6. Pot size buckets — win rates by pot quartile
    qr = client.query("""
        SELECT
            quantile(0.25)(total_pot) AS q1,
            quantile(0.5)(total_pot) AS q2,
            quantile(0.75)(total_pot) AS q3
        FROM games
    """)
    pot_buckets = {}
    if qr.result_rows:
        # Quartile cutoffs are server-computed numbers coerced to int,
        # so embedding them in the SQL text is safe.
        q1, q2, q3 = int(qr.result_rows[0][0]), int(qr.result_rows[0][1]), int(qr.result_rows[0][2])
        br = client.query(f"""
            SELECT
                multiIf(
                    total_pot <= {q1}, 'small',
                    total_pot <= {q2}, 'medium',
                    total_pot <= {q3}, 'large',
                    'whale'
                ) AS bucket,
                winner, count() AS cnt
            FROM games GROUP BY bucket, winner
        """)
        for row in br.result_rows:
            bucket, chair_id, cnt = row[0], row[1], row[2]
            chair = config.CHAIRS.get(chair_id, "?")
            if bucket not in pot_buckets:
                pot_buckets[bucket] = {"A": 0, "B": 0, "C": 0, "total": 0}
            pot_buckets[bucket][chair] = cnt
            pot_buckets[bucket]["total"] += cnt
        # NOTE(review): these labels render as e.g. "0123" — a range
        # separator (likely an en dash) appears to have been lost from
        # the f-strings; confirm the intended format before changing.
        pot_buckets["_ranges"] = {
            "small": f"0{q1}", "medium": f"{q1+1}{q2}",
            "large": f"{q2+1}{q3}", "whale": f">{q3}",
        }
    # 7. Streak analysis — compute in Python from ordered winners
    streak_result = client.query(
        "SELECT winner FROM games ORDER BY game_no ASC"
    )
    winners_list = [config.CHAIRS.get(r[0], "?") for r in streak_result.result_rows]
    streaks = {}
    for ch in ("A", "B", "C"):
        max_s = cur = 0
        for w in winners_list:
            if w == ch:
                cur += 1
                max_s = max(max_s, cur)
            else:
                cur = 0
        # current streak from the end
        current = 0
        for w in reversed(winners_list):
            if w == ch:
                current += 1
            else:
                break
        streaks[ch] = {"max_streak": max_s, "current_streak": current}
    # 8. Hourly patterns — win rates by hour of day
    hr_result = client.query("""
        SELECT toHour(created_at) AS hr, winner, count() AS cnt
        FROM games GROUP BY hr, winner ORDER BY hr, winner
    """)
    hourly = {}
    for row in hr_result.result_rows:
        h = str(row[0])
        chair = config.CHAIRS.get(row[1], "?")
        if h not in hourly:
            hourly[h] = {"A": 0, "B": 0, "C": 0, "total": 0}
        hourly[h][chair] = row[2]
        hourly[h]["total"] += row[2]
    # 9. Recent (last 100) vs overall
    recent = client.query("""
        SELECT winner, count() AS cnt FROM (
            SELECT winner FROM games ORDER BY game_no DESC LIMIT 100
        ) GROUP BY winner
    """)
    recent_dist = {"A": 0, "B": 0, "C": 0}
    recent_total = 0
    for row in recent.result_rows:
        chair = config.CHAIRS.get(row[0], "?")
        # Total counts only recognized chairs so it matches the dist.
        if chair in recent_dist:
            recent_dist[chair] = row[1]
            recent_total += row[1]
    return {
        "chair_bias": chair_bias,
        "bet_rank": bet_rank,
        "per_chair_rank": per_chair_rank,
        "hand_types_by_chair": hand_types_by_chair,
        "hand_type_wins": hand_type_wins,
        "pot_buckets": pot_buckets,
        "streaks": streaks,
        "hourly": hourly,
        "recent_vs_all": {
            "recent": {"dist": recent_dist, "total": recent_total},
            "all": {
                "dist": {ch: chair_bias[ch]["wins"] for ch in ("A", "B", "C")},
                "total": total_games,
            },
        },
    }
@_with_lock
def get_hot_cold_players(n: int = 5) -> dict:
    """
    Get players with highest and lowest P&L over their last 10 bets.
    Returns {"hot": [...], "cold": [...]}.
    """
    sql = """
    WITH ranked AS (
        SELECT
            b.user_id,
            b.game_no,
            b.chair,
            b.bet_amount,
            g.winner,
            row_number() OVER (PARTITION BY b.user_id ORDER BY b.created_at DESC) AS rn
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
    ),
    last10 AS (
        SELECT * FROM ranked WHERE rn <= 10
    )
    SELECT
        l.user_id,
        any(u.nick_name) AS nick_name,
        count() AS total_bets,
        countIf(l.chair = l.winner) AS wins,
        toInt64(sumIf(l.bet_amount, l.chair = l.winner) * 1.9
                - sumIf(l.bet_amount, l.chair != l.winner)) AS pnl
    FROM last10 l
    LEFT JOIN users u ON l.user_id = u.user_id
    GROUP BY l.user_id
    HAVING total_bets >= 5
    ORDER BY pnl DESC
    """
    keys = ("user_id", "nick_name", "total_bets", "wins", "pnl")
    players = []
    for row in get_client().query(sql).result_rows:
        entry = dict(zip(keys, row))
        entry["nick_name"] = entry["nick_name"] or str(entry["user_id"])
        players.append(entry)
    # Rows arrive sorted by pnl descending: winners from the top,
    # losers from the bottom.
    hot = [p for p in players if p["pnl"] > 0][:n]
    cold = [p for p in players if p["pnl"] < 0][-n:]
    cold.reverse()  # most negative first
    return {"hot": hot, "cold": cold}
# ---------------------------------------------------------------------------
# Prediction helpers (private, called inside the locked main function)
# ---------------------------------------------------------------------------
# Chair display labels; DB winner ids map onto these via config.CHAIRS.
CHAIR_LABELS = ("A", "B", "C")
def _normal_cdf(x):
"""Abramowitz-Stegun approximation of the standard normal CDF."""
if x < -8:
return 0.0
if x > 8:
return 1.0
t = 1.0 / (1.0 + 0.2316419 * abs(x))
d = 0.3989422804014327 # 1/sqrt(2*pi)
p = d * math.exp(-x * x / 2.0) * (
t * (0.319381530 + t * (-0.356563782 + t * (1.781477937 + t * (-1.821255978 + t * 1.330274429))))
)
return 1.0 - p if x > 0 else p
def _markov_matrix_1(winners):
    """1st-order Markov transition matrix P(next | last).

    Returns (matrix, counts): row-normalized probabilities (4 dp) and
    the raw transition counts.  Rows with no observations are all zero.
    """
    counts = {src: dict.fromkeys(CHAIR_LABELS, 0) for src in CHAIR_LABELS}
    for prev, cur in zip(winners, winners[1:]):
        if prev in counts and cur in CHAIR_LABELS:
            counts[prev][cur] += 1
    matrix = {}
    for src in CHAIR_LABELS:
        total = sum(counts[src].values())
        matrix[src] = {
            dst: round(counts[src][dst] / total, 4) if total else 0
            for dst in CHAIR_LABELS
        }
    return matrix, counts
def _markov_matrix_2(winners):
    """2nd-order Markov transition matrix P(next | last two).

    Keys are two-character chair pairs like "AB".  Returns (matrix,
    counts) in the same shape convention as _markov_matrix_1.
    """
    counts = {
        f"{a}{b}": dict.fromkeys(CHAIR_LABELS, 0)
        for a in CHAIR_LABELS
        for b in CHAIR_LABELS
    }
    for first, second, nxt in zip(winners, winners[1:], winners[2:]):
        key = f"{first}{second}"
        if key in counts and nxt in CHAIR_LABELS:
            counts[key][nxt] += 1
    matrix = {}
    for key, dst_counts in counts.items():
        total = sum(dst_counts.values())
        matrix[key] = {
            dst: round(dst_counts[dst] / total, 4) if total else 0
            for dst in CHAIR_LABELS
        }
    return matrix, counts
def _autocorrelation(winners, max_lag=5):
"""Pearson autocorrelation at lags 1..max_lag. Chairs encoded A=0,B=1,C=2."""
mapping = {"A": 0, "B": 1, "C": 2}
seq = [mapping.get(w, 0) for w in winners]
n = len(seq)
if n < max_lag + 2:
return [{"lag": i + 1, "r": 0, "significant": False} for i in range(max_lag)]
mean = sum(seq) / n
var = sum((x - mean) ** 2 for x in seq)
results = []
for lag in range(1, max_lag + 1):
if var == 0:
results.append({"lag": lag, "r": 0, "significant": False})
continue
cov = sum((seq[i] - mean) * (seq[i + lag] - mean) for i in range(n - lag))
r = round(cov / var, 4)
threshold = 1.96 / math.sqrt(n)
results.append({"lag": lag, "r": r, "significant": abs(r) > threshold})
return results
def _chi_squared_test(winners):
    """Chi-squared goodness-of-fit for uniform chair distribution (df=2)."""
    total = len(winners)
    if total == 0:
        return {"chi2": 0, "p_value": 1, "significant": False, "counts": {c: 0 for c in CHAIR_LABELS}}
    observed = dict.fromkeys(CHAIR_LABELS, 0)
    for w in winners:
        if w in observed:
            observed[w] += 1
    expected = total / 3.0
    chi2 = sum((cnt - expected) ** 2 / expected for cnt in observed.values())
    # For df=2 the chi-squared survival function has the closed form exp(-x/2).
    p_value = math.exp(-chi2 / 2.0)
    return {
        "chi2": round(chi2, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "counts": observed,
        "expected": round(expected, 1),
    }
def _runs_test(winners):
    """Wald-Wolfowitz runs test for randomness.

    Returns the observed run count, its expectation under randomness,
    a z-score, a two-tailed p-value, and a plain-English interpretation.
    Needs at least 10 observations.
    """
    if len(winners) < 10:
        return {"runs": 0, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    # Count runs (sequences of same chair)
    runs = 1
    for i in range(1, len(winners)):
        if winners[i] != winners[i - 1]:
            runs += 1
    n = len(winners)
    counts = {c: 0 for c in CHAIR_LABELS}
    for w in winners:
        if w in counts:
            counts[w] += 1
    # Expected runs and variance for k categories
    # (multi-category generalization of the two-sample runs-test moments).
    n_vals = [counts[c] for c in CHAIR_LABELS if counts[c] > 0]
    sum_ni2 = sum(ni ** 2 for ni in n_vals)
    expected_runs = 1 + (n * n - sum_ni2) / n
    if n <= 1:
        return {"runs": runs, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    var_num = sum_ni2 * (sum_ni2 + n * n) - 2 * n * sum(ni ** 3 for ni in n_vals) - n ** 3
    var_den = n * n * (n - 1)
    variance = var_num / var_den if var_den > 0 else 1
    if variance <= 0:
        return {"runs": runs, "z_score": 0, "p_value": 1, "interpretation": "Not enough data"}
    z = (runs - expected_runs) / math.sqrt(variance)
    # Two-tailed p-value via the normal approximation.
    p_value = 2 * (1 - _normal_cdf(abs(z)))
    if p_value < 0.05:
        # Negative z = fewer runs than expected (streaky); positive z =
        # more runs than expected (alternating).
        interpretation = "Too few runs (streaky)" if z < 0 else "Too many runs (alternating)"
    else:
        interpretation = "Random (no significant pattern)"
    return {
        "runs": runs,
        "expected_runs": round(expected_runs, 1),
        "z_score": round(z, 4),
        "p_value": round(p_value, 6),
        "significant": p_value < 0.05,
        "interpretation": interpretation,
    }
def _bayesian_prediction(winners, markov1, markov2):
    """Weighted Bayesian prediction combining six signals.

    winners is the chronological chair-label list; markov1/markov2 are
    the matrices from _markov_matrix_1/_markov_matrix_2.  Returns
    (combined_probs, signal_detail): normalized per-chair probabilities
    (4 dp) and each signal's weight + per-chair probabilities.
    """
    # Too little history: uniform prediction, no signal breakdown.
    if len(winners) < 3:
        return {c: round(1 / 3, 4) for c in CHAIR_LABELS}, {}
    # Signal 1: Base rate (overall frequency) — 20%
    total = len(winners)
    base = {c: winners.count(c) / total for c in CHAIR_LABELS}
    # Signal 2: 1st-order Markov — 25%
    last = winners[-1]
    m1 = markov1.get(last, {c: 1 / 3 for c in CHAIR_LABELS})
    # Signal 3: 2nd-order Markov — 25%
    key2 = f"{winners[-2]}{winners[-1]}"
    m2 = markov2.get(key2, {c: 1 / 3 for c in CHAIR_LABELS})
    # Signal 4: Recent 50-game frequency — 15%
    recent = winners[-50:] if len(winners) >= 50 else winners
    recent_total = len(recent)
    rec = {c: recent.count(c) / recent_total for c in CHAIR_LABELS}
    # Signal 5: Streak regression — 5%
    streak_chair = winners[-1]
    streak_len = 0
    for w in reversed(winners):
        if w == streak_chair:
            streak_len += 1
        else:
            break
    # Regression to mean: longer streaks → lower probability of continuation
    # (floored at 0.1 so the streak chair never drops to zero).
    cont_prob = max(0.1, 1 / 3 - streak_len * 0.05)
    other_prob = (1 - cont_prob) / 2
    streak = {c: (cont_prob if c == streak_chair else other_prob) for c in CHAIR_LABELS}
    # Signal 6: Balance / Mean Reversion — 10%
    # Look at last 50 games, invert frequencies to favor under-represented chairs
    window = min(50, len(winners))
    recent_50 = winners[-window:]
    freq = {c: recent_50.count(c) / window for c in CHAIR_LABELS}
    balance = {c: max(0.01, 2 / 3 - freq[c]) for c in CHAIR_LABELS}
    bal_total = sum(balance.values())
    balance = {c: balance[c] / bal_total for c in CHAIR_LABELS}
    weights = {"base_rate": 0.20, "markov_1": 0.25, "markov_2": 0.25, "recent_50": 0.15, "streak": 0.05, "balance": 0.10}
    signals = {"base_rate": base, "markov_1": m1, "markov_2": m2, "recent_50": rec, "streak": streak, "balance": balance}
    combined = {c: 0 for c in CHAIR_LABELS}
    for sig_name, weight in weights.items():
        for c in CHAIR_LABELS:
            combined[c] += weight * signals[sig_name].get(c, 1 / 3)
    # Normalize
    c_total = sum(combined.values())
    if c_total > 0:
        combined = {c: round(combined[c] / c_total, 4) for c in CHAIR_LABELS}
    # Round signal values for output
    signal_detail = {}
    for sig_name, sig_vals in signals.items():
        signal_detail[sig_name] = {
            "weight": weights[sig_name],
            "probs": {c: round(sig_vals.get(c, 0), 4) for c in CHAIR_LABELS},
        }
    return combined, signal_detail
def _card_value_distribution(cards_data):
    """Count of each card value (A through K) per chair.

    cards_data is an iterable of (cards_json, winner) pairs; rows whose
    JSON fails to parse are skipped.
    """
    value_names = ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"]
    dist = {chair: dict.fromkeys(value_names, 0) for chair in CHAIR_LABELS}
    for cards_json_str, _winner in cards_data:
        try:
            players = json.loads(cards_json_str)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"), None)
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"), None)
                if value and value in dist[chair]:
                    dist[chair][value] += 1
    return {"labels": value_names, "chairs": dist}
def _face_card_frequency(cards_data):
    """Percentage of face cards (J, Q, K, A) per chair.

    Returns {chair: {"face_cards", "total_cards", "pct"}}; pct is 0
    when a chair has no recorded cards.
    """
    face_vals = {"J", "Q", "K", "A"}
    face_counts = dict.fromkeys(CHAIR_LABELS, 0)
    total_counts = dict.fromkeys(CHAIR_LABELS, 0)
    for cards_json_str, _winner in cards_data:
        try:
            players = json.loads(cards_json_str)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"), None)
            if chair not in face_counts:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"), None)
                if value:
                    total_counts[chair] += 1
                    if value in face_vals:
                        face_counts[chair] += 1
    return {
        chair: {
            "face_cards": face_counts[chair],
            "total_cards": total_counts[chair],
            "pct": round(face_counts[chair] / total_counts[chair] * 100, 2) if total_counts[chair] else 0,
        }
        for chair in CHAIR_LABELS
    }
def _suit_distribution(cards_data):
    """Suit counts per chair (spade, heart, club, diamond)."""
    suit_names = ["\u2660", "\u2665", "\u2663", "\u2666"]
    dist = {chair: dict.fromkeys(suit_names, 0) for chair in CHAIR_LABELS}
    for cards_json_str, _winner in cards_data:
        try:
            players = json.loads(cards_json_str)
        except (json.JSONDecodeError, TypeError):
            continue
        for player in players:
            chair = config.CHAIRS.get(player.get("country"), None)
            if chair not in dist:
                continue
            for card in player.get("cards", []):
                suit = config.SUITS.get(card.get("cardColor"), None)
                if suit and suit in dist[chair]:
                    dist[chair][suit] += 1
    return {"labels": suit_names, "chairs": dist}
def _winning_card_patterns(cards_data):
    """Top 20 individual cards appearing in winning hands."""
    card_counts = {}
    for cards_json_str, winner in cards_data:
        try:
            players = json.loads(cards_json_str)
        except (json.JSONDecodeError, TypeError):
            continue
        # winner is stored as a chair_id (1=C, 2=B, 3=A); resolve it once
        # per game instead of once per player.
        winning_chair = config.CHAIRS.get(winner)
        for player in players:
            chair = config.CHAIRS.get(player.get("country"), None)
            if chair is None or chair != winning_chair:
                continue
            for card in player.get("cards", []):
                value = config.VALUES.get(card.get("cardValue"), None)
                suit = config.SUITS.get(card.get("cardColor"), None)
                if value and suit:
                    label = f"{value}{suit}"
                    card_counts[label] = card_counts.get(label, 0) + 1
    ranked = sorted(card_counts.items(), key=lambda item: item[1], reverse=True)[:20]
    return [{"card": card, "count": count} for card, count in ranked]
def _backtest_theories(winners: list) -> dict:
    """Backtest all prediction theories on historical data.

    Walks the winner sequence forward from index `warmup`, re-deriving every
    signal from only the games before each index (no lookahead), then scores
    each theory: top pick correct = full hit (1.0), second pick correct =
    semi hit (0.5).

    Returns a dict with per-theory accuracy percentages, raw hit counts and a
    rolling-accuracy series, or {"error": ...} when history is too short.
    """
    warmup = 30  # need some history before the signals are meaningful
    if len(winners) <= warmup:
        return {"error": "Not enough data for backtesting"}
    theories = ["base_rate", "markov_1", "markov_2", "recent_50", "streak", "balance", "combined"]
    full_hits = {t: 0 for t in theories}
    semi_hits = {t: 0 for t in theories}
    total_tested = 0
    rolling = {t: [] for t in theories}  # rolling accuracy over last 200
    for i in range(warmup, len(winners)):
        history = winners[:i]  # only games strictly before game i
        actual = winners[i]
        total_tested += 1
        total_h = len(history)
        # Base rate: long-run win frequency of each chair
        base = {c: history.count(c) / total_h for c in CHAIR_LABELS}
        base_ranked = sorted(CHAIR_LABELS, key=lambda c: base[c], reverse=True)
        # Markov-1: transition probabilities conditioned on the last winner;
        # unseen states fall back to the uniform 1/3 distribution
        m1, _ = _markov_matrix_1(history)
        last = history[-1]
        m1_probs = m1.get(last, {c: 1 / 3 for c in CHAIR_LABELS})
        m1_ranked = sorted(CHAIR_LABELS, key=lambda c: m1_probs.get(c, 0), reverse=True)
        # Markov-2: conditioned on the last two winners
        m2, _ = _markov_matrix_2(history)
        key2 = f"{history[-2]}{history[-1]}"  # safe: len(history) >= warmup >= 2
        m2_probs = m2.get(key2, {c: 1 / 3 for c in CHAIR_LABELS})
        m2_ranked = sorted(CHAIR_LABELS, key=lambda c: m2_probs.get(c, 0), reverse=True)
        # Recent-50: win frequency over the last 50 games only
        recent = history[-50:] if len(history) >= 50 else history
        rec = {c: recent.count(c) / len(recent) for c in CHAIR_LABELS}
        rec_ranked = sorted(CHAIR_LABELS, key=lambda c: rec[c], reverse=True)
        # Streak: mean reversion — the longer the current streak, the less
        # probability the streaking chair keeps (floored at 0.1); the
        # remainder is split evenly between the other two chairs
        streak_chair = history[-1]
        streak_len = 0
        for w in reversed(history):
            if w == streak_chair:
                streak_len += 1
            else:
                break
        cont_prob = max(0.1, 1 / 3 - streak_len * 0.05)
        other_prob = (1 - cont_prob) / 2
        streak_probs = {c: (cont_prob if c == streak_chair else other_prob) for c in CHAIR_LABELS}
        streak_ranked = sorted(CHAIR_LABELS, key=lambda c: streak_probs[c], reverse=True)
        # Balance / Mean Reversion: favor chairs under-represented in the
        # recent window (2/3 - freq keeps weights positive, then normalize)
        bal_window = min(50, len(history))
        bal_recent = history[-bal_window:]
        bal_freq = {c: bal_recent.count(c) / bal_window for c in CHAIR_LABELS}
        bal_probs = {c: max(0.01, 2 / 3 - bal_freq[c]) for c in CHAIR_LABELS}
        bal_t = sum(bal_probs.values())
        bal_probs = {c: bal_probs[c] / bal_t for c in CHAIR_LABELS}
        bal_ranked = sorted(CHAIR_LABELS, key=lambda c: bal_probs[c], reverse=True)
        # Combined Bayesian: weighted blend of all signals; weights were tuned
        # from earlier backtest runs and sum to 1.0
        combined = {c: 0 for c in CHAIR_LABELS}
        weights = {"base_rate": 0.20, "markov_1": 0.25, "markov_2": 0.25, "recent_50": 0.15, "streak": 0.05, "balance": 0.10}
        signals = {"base_rate": base, "markov_1": m1_probs, "markov_2": m2_probs, "recent_50": rec, "streak": streak_probs, "balance": bal_probs}
        for sig_name, weight in weights.items():
            for c in CHAIR_LABELS:
                combined[c] += weight * signals[sig_name].get(c, 1 / 3)
        combined_ranked = sorted(CHAIR_LABELS, key=lambda c: combined[c], reverse=True)
        ranked = {
            "base_rate": base_ranked, "markov_1": m1_ranked, "markov_2": m2_ranked,
            "recent_50": rec_ranked, "streak": streak_ranked, "balance": bal_ranked,
            "combined": combined_ranked,
        }
        # Score every theory for this game: 1.0 full hit, 0.5 semi, else 0.0
        for t in theories:
            pick = ranked[t][0]
            second = ranked[t][1]
            if pick == actual:
                full_hits[t] += 1
                rolling[t].append(1.0)
            elif second == actual:
                semi_hits[t] += 1
                rolling[t].append(0.5)
            else:
                rolling[t].append(0.0)
    # Overall accuracy as a percentage, counting semi hits at half weight
    accuracy = {
        t: round((full_hits[t] + semi_hits[t] * 0.5) / total_tested * 100, 2) if total_tested else 0
        for t in theories
    }
    # Rolling accuracy over last 200 games
    window = 200
    rolling_accuracy = {t: [] for t in theories}
    for t in theories:
        data = rolling[t]
        for j in range(len(data)):
            start = max(0, j - window + 1)
            chunk = data[start:j + 1]
            rolling_accuracy[t].append(round(sum(chunk) / len(chunk) * 100, 2))
    # Only keep last 200 points for the chart
    for t in theories:
        rolling_accuracy[t] = rolling_accuracy[t][-window:]
    return {
        "total_tested": total_tested,
        "accuracy": accuracy,
        "full_hits": {t: full_hits[t] for t in theories},
        "semi_hits": {t: semi_hits[t] for t in theories},
        "rolling_accuracy": rolling_accuracy,
        "random_baseline": 33.33,
    }
def _compute_whale_public_picks(client, game_nos):
    """Compute whale pick and public pick for a list of game_nos from bets data.

    "Whales" are the top-5 bettors of a game by total stake; every remaining
    bettor is the "public". Each group's pick is the chair that received the
    most of that group's money; ties resolve by CHAIR_LABELS order (stable
    sort).

    Returns {game_no: {whale_pick, whale_second_pick, public_pick,
    public_second_pick, whale_count, public_count, bettor_counts}}.
    """
    if not game_nos:
        return {}
    # NOTE(review): game_nos are interpolated directly into the SQL below.
    # They come from our own games table (integers per the callers), so this
    # looks safe, but a parameterized query would be more defensive — confirm
    # game_no is always an int upstream.
    game_nos_csv = ",".join(str(g) for g in game_nos)
    result = client.query(
        f"SELECT game_no, user_id, chair, sum(bet_amount) AS total_user_bet "
        f"FROM bets WHERE game_no IN ({game_nos_csv}) "
        f"GROUP BY game_no, user_id, chair"
    )
    # Collect per-game, per-user, per-chair totals
    # game_data[game_no][user_id][chair] = amount
    game_data = {}
    for row in result.result_rows:
        gno, uid, chair_id, amount = row[0], row[1], row[2], row[3]
        if gno not in game_data:
            game_data[gno] = {}
        if uid not in game_data[gno]:
            game_data[gno][uid] = {}
        chair_name = config.CHAIRS.get(chair_id, "?")
        if chair_name in CHAIR_LABELS:  # drop bets on unknown chair ids
            game_data[gno][uid][chair_name] = game_data[gno][uid].get(chair_name, 0) + amount
    picks = {}
    for gno in game_nos:
        users = game_data.get(gno, {})
        if not users:
            # No bet data for this game: emit an empty-but-complete record so
            # callers can read every field without special-casing.
            picks[gno] = {"whale_pick": None, "whale_second_pick": None,
                          "public_pick": None, "public_second_pick": None,
                          "whale_count": 0, "public_count": 0,
                          "bettor_counts": {"A": 0, "B": 0, "C": 0}}
            continue
        # Rank users by total bet across all chairs
        user_totals = []
        for uid, chairs in users.items():
            user_totals.append((uid, sum(chairs.values())))
        user_totals.sort(key=lambda x: x[1], reverse=True)
        # Top-5 bettors are whales; everyone else is the public
        whale_uids = set(uid for uid, _ in user_totals[:5])
        pub_uids = set(uid for uid, _ in user_totals[5:])
        # Sum whale money per chair
        whale_money = {"A": 0, "B": 0, "C": 0}
        for uid in whale_uids:
            for c in CHAIR_LABELS:
                whale_money[c] += users[uid].get(c, 0)
        whale_total = sum(whale_money.values())
        if whale_total > 0:
            whale_ranked = sorted(CHAIR_LABELS, key=lambda c: whale_money[c], reverse=True)
            whale_pick = whale_ranked[0]
            whale_second_pick = whale_ranked[1]
        else:
            whale_pick = None
            whale_second_pick = None
        # Sum public money per chair
        pub_money = {"A": 0, "B": 0, "C": 0}
        for uid in pub_uids:
            for c in CHAIR_LABELS:
                pub_money[c] += users[uid].get(c, 0)
        pub_total = sum(pub_money.values())
        total_bettors = len(user_totals)
        # A public pick only exists when there are bettors beyond the whales
        if pub_total > 0 and total_bettors > 5:
            pub_ranked = sorted(CHAIR_LABELS, key=lambda c: pub_money[c], reverse=True)
            public_pick = pub_ranked[0]
            public_second_pick = pub_ranked[1]
        else:
            public_pick = None
            public_second_pick = None
        # Count bettors per chair (one user may bet on several chairs)
        bettor_counts = {"A": 0, "B": 0, "C": 0}
        for uid, chairs in users.items():
            for c in CHAIR_LABELS:
                if chairs.get(c, 0) > 0:
                    bettor_counts[c] += 1
        picks[gno] = {
            "whale_pick": whale_pick,
            "whale_second_pick": whale_second_pick,
            "public_pick": public_pick,
            "public_second_pick": public_second_pick,
            "whale_count": len(whale_uids),
            "public_count": len(pub_uids),
            "bettor_counts": bettor_counts,
        }
    return picks
def _last_n_predictions(winners, n=20):
    """Get detailed prediction vs actual for the last N games.

    Each entry re-derives the Bayesian prediction using only the games before
    that index, then records whether the top or second-ranked pick matched
    the actual winner. Returns [] while fewer than `warmup` games exist.
    """
    warmup = 30
    if len(winners) <= warmup:
        return []
    out = []
    for i in range(max(warmup, len(winners) - n), len(winners)):
        history, actual = winners[:i], winners[i]
        m1, _ = _markov_matrix_1(history)
        m2, _ = _markov_matrix_2(history)
        probs, _ = _bayesian_prediction(history, m1, m2)
        order = sorted(CHAIR_LABELS, key=lambda c: probs[c], reverse=True)
        top, runner_up = order[0], order[1]
        out.append({
            "index": i,
            "predicted": top,
            "second_predicted": runner_up,
            "actual": actual,
            "correct": top == actual,
            "semi_correct": top != actual and runner_up == actual,
            "probs": {c: round(probs[c], 4) for c in CHAIR_LABELS},
        })
    return out
@_with_lock
def get_prediction_analysis() -> dict:
    """Run all prediction/game-theory analysis and return results.

    Loads the full winner history plus card data for the last 500 games, runs
    every statistical signal (Markov chains, autocorrelation, chi-squared,
    runs test), produces the combined Bayesian prediction, backtests all
    theories, and merges whale/public betting picks into the recent-games
    breakdown. Serialized via _with_lock because the shared client is not
    used concurrently elsewhere in this module.
    """
    client = get_client()
    # Query 1: Full winner sequence with game numbers
    result = client.query("SELECT game_no, winner FROM games ORDER BY game_no ASC")
    # Both lists apply the same CHAIR_LABELS filter so their indexes line up.
    game_nos = [r[0] for r in result.result_rows if config.CHAIRS.get(r[1], "?") in CHAIR_LABELS]
    winners = [config.CHAIRS.get(r[1], "?") for r in result.result_rows]
    winners = [w for w in winners if w in CHAIR_LABELS]  # filter unknowns
    # Query 2: Card data for last 500 games
    cards_result = client.query(
        "SELECT cards_json, winner FROM games WHERE cards_json != '' ORDER BY game_no DESC LIMIT 500"
    )
    cards_data = [(r[0], r[1]) for r in cards_result.result_rows]
    # Markov matrices
    markov1, markov1_counts = _markov_matrix_1(winners)
    markov2, markov2_counts = _markov_matrix_2(winners)
    # Autocorrelation
    autocorrelation = _autocorrelation(winners)
    # Chi-squared test
    chi_squared = _chi_squared_test(winners)
    # Runs test
    runs_test = _runs_test(winners)
    # Bayesian prediction
    prediction, signals = _bayesian_prediction(winners, markov1, markov2)
    # Backtesting
    backtest = _backtest_theories(winners)
    # Last 50 prediction vs actual
    last_50_raw = _last_n_predictions(winners, 50)
    # Attach game_nos to last_50
    for entry in last_50_raw:
        idx = entry["index"]
        entry["game_no"] = game_nos[idx] if idx < len(game_nos) else 0
    # Merge whale/public picks into last_50
    last_50_game_nos = [e["game_no"] for e in last_50_raw if e.get("game_no")]
    wp_data = _compute_whale_public_picks(client, last_50_game_nos)
    for entry in last_50_raw:
        gno = entry.get("game_no", 0)
        wp = wp_data.get(gno, {})
        entry["whale_pick"] = wp.get("whale_pick")
        entry["whale_second_pick"] = wp.get("whale_second_pick")
        entry["public_pick"] = wp.get("public_pick")
        entry["public_second_pick"] = wp.get("public_second_pick")
        entry["bettor_counts"] = wp.get("bettor_counts", {"A": 0, "B": 0, "C": 0})
        actual = entry["actual"]
        # hit = top pick correct; semi = only the second pick was correct;
        # None when the group made no pick for that game.
        entry["whale_hit"] = (wp.get("whale_pick") == actual) if wp.get("whale_pick") else None
        entry["whale_semi"] = (not entry["whale_hit"] and wp.get("whale_second_pick") == actual) if wp.get("whale_pick") else None
        entry["public_hit"] = (wp.get("public_pick") == actual) if wp.get("public_pick") else None
        entry["public_semi"] = (not entry["public_hit"] and wp.get("public_second_pick") == actual) if wp.get("public_pick") else None
    # Card analysis
    card_values = _card_value_distribution(cards_data)
    face_cards = _face_card_frequency(cards_data)
    suits = _suit_distribution(cards_data)
    winning_cards = _winning_card_patterns(cards_data)
    # Bet rank analysis — how often the winning chair had high/mid/low bet
    rank_result = client.query("""
        SELECT
            countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high,
            countIf(winner_bet > least(bet_a, bet_b, bet_c)
                    AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid,
            countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low
        FROM (
            SELECT bet_a, bet_b, bet_c,
                   multiIf(winner = 3, bet_a, winner = 2, bet_b, bet_c) AS winner_bet
            FROM games WHERE bet_a + bet_b + bet_c > 0
        )
    """)
    bet_rank = {"high": 0, "mid": 0, "low": 0}
    if rank_result.result_rows:
        r = rank_result.result_rows[0]
        bet_rank = {"high": r[0], "mid": r[1], "low": r[2]}
    return {
        "total_games": len(winners),
        "last_winners": winners[-10:] if len(winners) >= 10 else winners,
        # NOTE(review): key name predates the move from 20 to 50 entries; kept
        # for frontend compatibility — it actually holds the last 50 games.
        "last_20_predictions": last_50_raw,
        "prediction": prediction,
        "signals": signals,
        "markov1": {"matrix": markov1, "counts": {k: dict(v) for k, v in markov1_counts.items()}},
        "markov2": {"matrix": markov2, "counts": {k: dict(v) for k, v in markov2_counts.items()}},
        "autocorrelation": autocorrelation,
        "chi_squared": chi_squared,
        "runs_test": runs_test,
        "backtest": backtest,
        "bet_rank": bet_rank,
        "card_values": card_values,
        "face_cards": face_cards,
        "suits": suits,
        "winning_cards": winning_cards,
    }
@_with_lock
def get_prediction_history(limit: int = 100) -> dict:
    """Get prediction history with whale/public picks and accuracy summary.

    Re-derives the model's prediction for the last `limit` games, merges in
    whale/public betting picks per game, and aggregates hit/semi-hit counts
    (semi = second-ranked pick was correct, scored at half weight) for the
    model, whales, and the public.
    """
    client = get_client()
    # Full winner sequence with game numbers
    result = client.query("SELECT game_no, winner FROM games ORDER BY game_no ASC")
    # Both lists apply the same CHAIR_LABELS filter so their indexes line up.
    game_nos = [r[0] for r in result.result_rows if config.CHAIRS.get(r[1], "?") in CHAIR_LABELS]
    winners = [config.CHAIRS.get(r[1], "?") for r in result.result_rows]
    winners = [w for w in winners if w in CHAIR_LABELS]
    # Get last N predictions
    predictions = _last_n_predictions(winners, limit)
    # Attach game_nos
    for entry in predictions:
        idx = entry["index"]
        entry["game_no"] = game_nos[idx] if idx < len(game_nos) else 0
    # Compute whale/public picks
    pred_game_nos = [e["game_no"] for e in predictions if e.get("game_no")]
    wp_data = _compute_whale_public_picks(client, pred_game_nos)
    # Merge and compute accuracy
    whale_hits = 0
    whale_semi_hits = 0
    whale_total = 0
    public_hits = 0
    public_semi_hits = 0
    public_total = 0
    model_full_hits = 0
    model_semi_hits = 0
    for entry in predictions:
        gno = entry.get("game_no", 0)
        wp = wp_data.get(gno, {})
        entry["whale_pick"] = wp.get("whale_pick")
        entry["whale_second_pick"] = wp.get("whale_second_pick")
        entry["public_pick"] = wp.get("public_pick")
        entry["public_second_pick"] = wp.get("public_second_pick")
        entry["bettor_counts"] = wp.get("bettor_counts", {"A": 0, "B": 0, "C": 0})
        actual = entry["actual"]
        # hit = top pick correct; semi = only the second pick was correct;
        # None when the group made no pick for that game.
        entry["whale_hit"] = (wp.get("whale_pick") == actual) if wp.get("whale_pick") else None
        entry["whale_semi"] = (not entry["whale_hit"] and wp.get("whale_second_pick") == actual) if wp.get("whale_pick") else None
        entry["public_hit"] = (wp.get("public_pick") == actual) if wp.get("public_pick") else None
        entry["public_semi"] = (not entry["public_hit"] and wp.get("public_second_pick") == actual) if wp.get("public_pick") else None
        # Remove internal index from output
        del entry["index"]
        # Accumulate accuracy
        if entry["correct"]:
            model_full_hits += 1
        elif entry["semi_correct"]:
            model_semi_hits += 1
        # Whale/public totals only count games where that group made a pick
        if entry["whale_hit"] is not None:
            whale_total += 1
            if entry["whale_hit"]:
                whale_hits += 1
            elif entry.get("whale_semi"):
                whale_semi_hits += 1
        if entry["public_hit"] is not None:
            public_total += 1
            if entry["public_hit"]:
                public_hits += 1
            elif entry.get("public_semi"):
                public_semi_hits += 1
    total_pred = len(predictions)
    # Semi hits count at half weight in the score
    model_score = model_full_hits + model_semi_hits * 0.5
    whale_score = whale_hits + whale_semi_hits * 0.5
    public_score = public_hits + public_semi_hits * 0.5
    return {
        "total_games": len(winners),
        "limit": limit,
        "predictions": predictions,
        "accuracy": {
            "model": {
                "hits": model_full_hits,
                "semi": model_semi_hits,
                "total": total_pred,
                "pct": round(model_score / total_pred * 100, 1) if total_pred else 0,
            },
            "whale": {
                "hits": whale_hits,
                "semi": whale_semi_hits,
                "total": whale_total,
                "pct": round(whale_score / whale_total * 100, 1) if whale_total else 0,
            },
            "public": {
                "hits": public_hits,
                "semi": public_semi_hits,
                "total": public_total,
                "pct": round(public_score / public_total * 100, 1) if public_total else 0,
            },
        },
    }