"""
ClickHouse database operations.

All reads and writes go through one lazily-created client (see get_client)
and are serialized on a module-wide lock.

Winner / chair codes used in the SQL below: 3 = chair A, 2 = chair B,
1 = chair C (display names are resolved through ``config.CHAIRS``).
"""

import functools
import json
import logging
import math
import threading
import time

import clickhouse_connect

from . import config

log = logging.getLogger(__name__)

_client = None
_lock = threading.Lock()
_migrations_applied = False

# Connection retry policy for get_client() while ClickHouse is not ready yet.
_CONNECT_ATTEMPTS = 30
_CONNECT_RETRY_DELAY_S = 2

_CHAIR_NAMES = ("A", "B", "C")

# games table layout: required fields first, then optional fields with the
# value inserted when the caller omits them.
_GAME_REQUIRED_FIELDS = ("game_no", "winner", "total_pot")
_GAME_FIELD_DEFAULTS = {
    "bet_a": 0,
    "bet_b": 0,
    "bet_c": 0,
    "hand_a": "",
    "hand_b": "",
    "hand_c": "",
    "hand_type_a": 0,
    "hand_type_b": 0,
    "hand_type_c": 0,
    "cards_json": "",
    "duration_s": 0,
}
_GAME_COLUMNS = (*_GAME_REQUIRED_FIELDS, *_GAME_FIELD_DEFAULTS)
# SELECT list understood by _game_row_to_dict().
_GAME_SELECT = ", ".join(_GAME_COLUMNS) + ", created_at"

# users table layout: user_id is required, everything else has a default.
_USER_FIELD_DEFAULTS = {
    "nick_name": "",
    "rich_level": 0,
    "actor_level": 0,
    "gender": 0,
    "consume_total": 0,
    "earn_total": 0,
    "is_actor": 0,
    "portrait": "",
}
_USER_COLUMNS = ("user_id", *_USER_FIELD_DEFAULTS)
_USER_SELECT = ", ".join(_USER_COLUMNS)

# upsert_user() appends a row per call, so `users` may hold several rows per
# user_id unless the table engine collapses them. Joining this one-row-per-id
# view instead of the raw table keeps such duplicates from multiplying bet
# rows (and with them every count()/sum()) in the leaderboard-style queries.
_USERS_BY_ID_SQL = (
    "(SELECT user_id, any(nick_name) AS nick_name, any(portrait) AS portrait,"
    " any(rich_level) AS rich_level FROM users GROUP BY user_id)"
)

# Session P&L over `bets b JOIN games g`. With the fixed 2.9x payout a winning
# bet nets +1.9x its amount and a losing bet -1.0x.
_PNL_SQL = (
    "toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9"
    " - sumIf(b.bet_amount, b.chair != g.winner))"
)

# Leaderboard row layout returned by _query_leaderboard().
_LEADER_KEYS = (
    "user_id", "nick_name", "total_bets", "wins", "losses", "pnl", "total_wagered",
)

# Time windows accepted by get_analytics(). Only these fixed fragments are
# ever interpolated into SQL, so the caller-supplied period cannot inject.
_PERIOD_CUTOFFS = {
    "1h": "now() - INTERVAL 1 HOUR",
    "6h": "now() - INTERVAL 6 HOUR",
    "24h": "now() - INTERVAL 24 HOUR",
    "7d": "now() - INTERVAL 7 DAY",
}


def get_client():
    """Return the shared ClickHouse client, connecting on first use.

    Connection is retried up to ``_CONNECT_ATTEMPTS`` times,
    ``_CONNECT_RETRY_DELAY_S`` seconds apart. The client is cached only after
    it answers ``ping()``, so a failed start never leaves an unusable client
    behind for later callers.

    Raises:
        RuntimeError: if every attempt fails (chained to the last error).
    """
    global _client
    if _client is not None:
        return _client

    last_error = None
    for attempt in range(1, _CONNECT_ATTEMPTS + 1):
        try:
            client = clickhouse_connect.get_client(
                host=config.CLICKHOUSE_HOST,
                port=config.CLICKHOUSE_PORT,
            )
            # clickhouse-connect's ping() returns False on failure instead of
            # raising, so its result has to be checked explicitly.
            if not client.ping():
                raise ConnectionError("ping failed")
        except Exception as e:
            last_error = e
            log.warning("ClickHouse not ready (attempt %d): %s", attempt, e)
            if attempt < _CONNECT_ATTEMPTS:  # no point sleeping after the last try
                time.sleep(_CONNECT_RETRY_DELAY_S)
            continue
        _client = client
        log.info("Connected to ClickHouse at %s:%s",
                 config.CLICKHOUSE_HOST, config.CLICKHOUSE_PORT)
        return _client
    raise RuntimeError("Could not connect to ClickHouse") from last_error


def run_migrations():
    """Run one-time data migrations on startup.

    Each migration is recorded by name in the ``_migrations`` table and is
    skipped once recorded; after the first call in this process the function
    returns immediately. It holds the module lock like every other ClickHouse
    operation because it uses the same shared client.
    """
    global _migrations_applied
    with _lock:
        if _migrations_applied:
            return
        client = get_client()
        client.command(
            "CREATE TABLE IF NOT EXISTS _migrations ("
            " name String, applied_at DateTime DEFAULT now()"
            ") ENGINE = MergeTree() ORDER BY name"
        )
        result = client.query(
            "SELECT count() FROM _migrations WHERE name = 'swap_ac_chairs'"
        )
        if result.result_rows[0][0] == 0:
            log.info("Running migration: swap_ac_chairs")
            # Swap every chair-A column with its chair-C counterpart. Relies on
            # ClickHouse evaluating all UPDATE assignments against the
            # pre-mutation row, which makes pairwise `x = y, y = x` a swap.
            # NOTE(review): ALTER ... UPDATE runs as an asynchronous mutation
            # unless mutations_sync is set, so the marker row below may be
            # written before the rewrite has finished.
            client.command(
                "ALTER TABLE games UPDATE "
                "hand_a = hand_c, hand_c = hand_a, "
                "bet_a = bet_c, bet_c = bet_a, "
                "hand_type_a = hand_type_c, hand_type_c = hand_type_a "
                "WHERE 1=1"
            )
            client.insert("_migrations", [["swap_ac_chairs"]], column_names=["name"])
            log.info("Migration swap_ac_chairs applied")
        _migrations_applied = True


def _with_lock(fn):
    """Decorator to serialize all ClickHouse operations on the module lock.

    Uses functools.wraps so the wrapped function keeps its name, docstring
    and ``__wrapped__`` reference.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        with _lock:
            return fn(*args, **kwargs)
    return wrapper


# --------------------------------------------------------------------------
# Shared query helpers (callers must already hold _lock).
# --------------------------------------------------------------------------

def _game_row_to_dict(row) -> dict:
    """Convert a row selected with ``_GAME_SELECT`` into a dict; created_at as str."""
    game = dict(zip(_GAME_COLUMNS, row))
    game["created_at"] = str(row[len(_GAME_COLUMNS)])
    return game


def _leader_row_to_dict(row) -> dict:
    """Convert a leaderboard row; falls back to the user_id when no nickname."""
    leader = dict(zip(_LEADER_KEYS, row))
    leader["nick_name"] = row[1] or str(row[0])
    return leader


def _query_leaderboard(client, limit: int, bet_where: str = "") -> list[dict]:
    """Top *limit* users by P&L over bets on completed games (min. 3 bets).

    *bet_where* is an optional, trusted ``WHERE`` clause over the ``b``
    (bets) alias; it is interpolated verbatim.
    """
    result = client.query(
        f"""
        SELECT
            b.user_id,
            any(u.nick_name) AS nick_name,
            count() AS total_bets,
            countIf(b.chair = g.winner) AS wins,
            countIf(b.chair != g.winner) AS losses,
            {_PNL_SQL} AS pnl,
            sum(b.bet_amount) AS total_wagered
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
        LEFT JOIN {_USERS_BY_ID_SQL} AS u ON b.user_id = u.user_id
        {bet_where}
        GROUP BY b.user_id
        HAVING total_bets >= 3
        ORDER BY pnl DESC
        LIMIT {{n:UInt32}}
        """,
        parameters={"n": limit},
    )
    return [_leader_row_to_dict(row) for row in result.result_rows]


def _chair_win_counts(client, time_where: str = "") -> dict:
    """Win count per known chair name (A/B/C), optionally time-filtered."""
    result = client.query(
        f"SELECT winner, count() AS cnt FROM games {time_where} "
        "GROUP BY winner ORDER BY winner"
    )
    dist = {ch: 0 for ch in _CHAIR_NAMES}
    for winner, cnt in result.result_rows:
        chair = config.CHAIRS.get(winner, "?")
        if chair in dist:
            dist[chair] = cnt
    return dist


def _bet_rank_counts(client, cutoff_expr: str | None = None) -> dict:
    """How often the winning chair held the highest / middle / lowest pool.

    Only games with any money on the table count. A winner whose bet ties
    both the maximum and the minimum (all pools equal) is counted in both
    "high" and "low", because the comparisons are inclusive.
    """
    conditions = "bet_a + bet_b + bet_c > 0"
    if cutoff_expr:
        conditions = f"created_at >= {cutoff_expr} AND {conditions}"
    result = client.query(
        f"""
        SELECT
            countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high,
            countIf(winner_bet > least(bet_a, bet_b, bet_c)
                    AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid,
            countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low
        FROM (
            SELECT bet_a, bet_b, bet_c,
                   multiIf(winner = 3, bet_a, winner = 2, bet_b, bet_c) AS winner_bet
            FROM games
            WHERE {conditions}
        )
        """
    )
    if not result.result_rows:
        return {"high": 0, "mid": 0, "low": 0}
    high, mid, low = result.result_rows[0]
    return {"high": high, "mid": mid, "low": low}


def _winning_hand_types(client, time_where: str = "") -> dict:
    """Count of winning hands per hand-type name (hand_type 0 is skipped)."""
    result = client.query(
        f"""
        SELECT hand_type, count() AS cnt
        FROM (
            SELECT multiIf(winner = 3, hand_type_a, winner = 2, hand_type_b, hand_type_c)
                   AS hand_type
            FROM games
            {time_where}
        )
        WHERE hand_type > 0
        GROUP BY hand_type
        ORDER BY hand_type
        """
    )
    return {
        config.HAND_TYPES.get(hand_type, f"Type {hand_type}"): cnt
        for hand_type, cnt in result.result_rows
    }


# --------------------------------------------------------------------------
# Writes
# --------------------------------------------------------------------------

@_with_lock
def insert_game(game: dict):
    """Insert a completed round into the games table.

    ``game_no``, ``winner`` and ``total_pot`` are required (KeyError if
    missing); every other column falls back to 0 or "" when absent.
    """
    client = get_client()
    row = [game[col] for col in _GAME_REQUIRED_FIELDS]
    row += [game.get(col, default) for col, default in _GAME_FIELD_DEFAULTS.items()]
    client.insert("games", [row], column_names=list(_GAME_COLUMNS))


@_with_lock
def insert_bet(bet: dict):
    """Insert an individual user bet into the bets table (all fields required)."""
    client = get_client()
    columns = ["game_no", "user_id", "chair", "bet_amount", "total_bet"]
    client.insert("bets", [[bet[col] for col in columns]], column_names=columns)


@_with_lock
def upsert_user(user: dict):
    """Insert/update a user profile.

    ``user_id`` is required; other fields default to 0 or "".
    NOTE(review): this is a plain INSERT — update semantics depend on the
    users table engine collapsing rows per user_id; confirm against schema.
    """
    client = get_client()
    row = [user["user_id"]]
    row += [user.get(col, default) for col, default in _USER_FIELD_DEFAULTS.items()]
    client.insert("users", [row], column_names=list(_USER_COLUMNS))


# --------------------------------------------------------------------------
# Reads
# --------------------------------------------------------------------------

@_with_lock
def get_recent_games(n: int = 50) -> list[dict]:
    """Get the last *n* completed games, highest game_no first."""
    client = get_client()
    result = client.query(
        f"SELECT {_GAME_SELECT} FROM games ORDER BY game_no DESC LIMIT {{n:UInt32}}",
        parameters={"n": n},
    )
    return [_game_row_to_dict(row) for row in result.result_rows]


@_with_lock
def get_leaderboard(n: int = 10) -> list[dict]:
    """
    Get top N users by P&L.

    P&L = sum of winning bets * 1.9 - sum of losing bets.
    With 2.9x fixed payout: win → +1.9x bet, loss → -1.0x bet.
    Only bets on games present in ``games`` count; users need >= 3 of them.
    """
    return _query_leaderboard(get_client(), n)


@_with_lock
def get_win_distribution() -> dict:
    """Get win counts per chair + bet rank distribution over all games."""
    client = get_client()
    chairs = _chair_win_counts(client)
    bet_rank = _bet_rank_counts(client)
    return {"chairs": chairs, "bet_rank": bet_rank}


@_with_lock
def get_user_name(user_id: int) -> str | None:
    """Lookup a user's nickname in the users table; None if absent or empty.

    NOTE(review): ``LIMIT 1`` without ORDER BY — if several rows exist for the
    user, which nickname is returned is unspecified.
    """
    client = get_client()
    result = client.query(
        "SELECT nick_name FROM users WHERE user_id = {uid:UInt64} LIMIT 1",
        parameters={"uid": user_id},
    )
    if result.result_rows:
        return result.result_rows[0][0] or None
    return None


@_with_lock
def get_user_detail(user_id: int) -> dict | None:
    """Get full user profile + session betting stats.

    Returns None when the user has no row in ``users``. Otherwise returns the
    profile columns plus total_bets, total_wagered, rounds_played, wins,
    losses, pnl and ``recent_bets`` (the 20 most recent bets).
    """
    client = get_client()

    # Profile
    result = client.query(
        f"SELECT {_USER_SELECT} FROM users WHERE user_id = {{uid:UInt64}} LIMIT 1",
        parameters={"uid": user_id},
    )
    if not result.result_rows:
        return None
    profile = dict(zip(_USER_COLUMNS, result.result_rows[0]))

    # Session betting stats. Bets whose game has no row in `games` come back
    # from the LEFT JOIN with winner 0 (or NULL under join_use_nulls); the
    # `g.winner > 0` guards keep such unsettled bets out of wins, losses and
    # P&L instead of counting them as losses.
    stats_result = client.query(
        """
        SELECT
            count() AS total_bets,
            sum(b.bet_amount) AS total_wagered,
            countDistinct(b.game_no) AS rounds_played,
            countIf(g.winner > 0 AND b.chair = g.winner) AS wins,
            countIf(g.winner > 0 AND b.chair != g.winner) AS losses,
            toInt64(sumIf(b.bet_amount, g.winner > 0 AND b.chair = g.winner) * 1.9
                    - sumIf(b.bet_amount, g.winner > 0 AND b.chair != g.winner)) AS pnl
        FROM bets b
        LEFT JOIN games g ON b.game_no = g.game_no
        WHERE b.user_id = {uid:UInt64}
        """,
        parameters={"uid": user_id},
    )
    if stats_result.result_rows:
        stat_keys = ("total_bets", "total_wagered", "rounds_played", "wins", "losses", "pnl")
        profile.update(zip(stat_keys, stats_result.result_rows[0]))

    # Recent bets
    bets_result = client.query(
        """
        SELECT b.game_no, b.chair, b.bet_amount, b.total_bet, g.winner, b.created_at
        FROM bets b
        LEFT JOIN games g ON b.game_no = g.game_no
        WHERE b.user_id = {uid:UInt64}
        ORDER BY b.created_at DESC
        LIMIT 20
        """,
        parameters={"uid": user_id},
    )
    profile["recent_bets"] = [
        {
            "game_no": game_no,
            "chair": chair,
            "chair_name": config.CHAIRS.get(chair, "?"),
            "bet_amount": bet_amount,
            "total_bet": total_bet,
            "winner": winner,
            # A falsy winner means the game has no result row yet.
            "won": chair == winner if winner else None,
            "created_at": str(created_at),
        }
        for game_no, chair, bet_amount, total_bet, winner, created_at
        in bets_result.result_rows
    ]
    return profile


@_with_lock
def get_biggest_winner() -> dict | None:
    """Get the single biggest winner by P&L this session (min. 3 settled bets)."""
    client = get_client()
    result = client.query(
        f"""
        SELECT
            b.user_id,
            any(u.nick_name) AS nick_name,
            any(u.portrait) AS portrait,
            any(u.rich_level) AS rich_level,
            count() AS total_bets,
            countIf(b.chair = g.winner) AS wins,
            {_PNL_SQL} AS pnl,
            sum(b.bet_amount) AS total_wagered
        FROM bets b
        JOIN games g ON b.game_no = g.game_no
        LEFT JOIN {_USERS_BY_ID_SQL} AS u ON b.user_id = u.user_id
        GROUP BY b.user_id
        HAVING total_bets >= 3
        ORDER BY pnl DESC
        LIMIT 1
        """
    )
    if not result.result_rows:
        return None
    row = result.result_rows[0]
    return {
        "user_id": row[0],
        "nick_name": row[1] or str(row[0]),
        "portrait": row[2] or "",
        "rich_level": row[3],
        "total_bets": row[4],
        "wins": row[5],
        "pnl": row[6],
        "total_wagered": row[7],
    }


def _analytics_summary(client, time_where: str) -> dict:
    """Game/bet totals for get_analytics(); *time_where* filters both tables.

    The average pot is computed here instead of with toInt64(avg(...)):
    avg() over an empty window is NaN, and NaN -> Int64 is undefined.
    int() truncates exactly like toInt64 did for non-empty windows.
    """
    games_rows = client.query(
        f"SELECT count(), sum(total_pot) FROM games {time_where}"
    ).result_rows
    total_games, total_volume = games_rows[0] if games_rows else (0, 0)
    total_volume = int(total_volume or 0)

    bets_rows = client.query(
        f"SELECT count(), countDistinct(user_id) FROM bets {time_where}"
    ).result_rows
    total_bets, unique_bettors = bets_rows[0] if bets_rows else (0, 0)

    return {
        "total_games": total_games,
        "total_volume": total_volume,
        "avg_pot": int(total_volume / total_games) if total_games else 0,
        "total_bets_placed": total_bets,
        "unique_bettors": unique_bettors,
    }


def _hourly_volume(client, time_where: str) -> list[dict]:
    """Games played and pot volume per clock hour, oldest first."""
    result = client.query(
        f"""
        SELECT toStartOfHour(created_at) AS hour, count() AS games, sum(total_pot) AS volume
        FROM games
        {time_where}
        GROUP BY hour
        ORDER BY hour
        """
    )
    return [
        {"hour": str(hour), "games": games, "volume": int(volume or 0)}
        for hour, games, volume in result.result_rows
    ]


@_with_lock
def get_analytics(period: str = "all") -> dict:
    """Get all analytics data for a given time period.

    Args:
        period: "1h", "6h", "24h" or "7d"; any other value (including the
            default "all") applies no time filter.

    Returns:
        dict with summary, win_distribution, hand_type_distribution,
        leaderboard (top 20), hourly_volume and games (latest 200).
    """
    client = get_client()
    cutoff_expr = _PERIOD_CUTOFFS.get(period)
    # Unqualified filter for single-table queries; `b.`-qualified for joins.
    time_where = f"WHERE created_at >= {cutoff_expr}" if cutoff_expr else ""
    bet_where = f"WHERE b.created_at >= {cutoff_expr}" if cutoff_expr else ""

    summary = _analytics_summary(client, time_where)
    win_distribution = {
        "chairs": _chair_win_counts(client, time_where),
        "bet_rank": _bet_rank_counts(client, cutoff_expr),
    }
    hand_type_distribution = _winning_hand_types(client, time_where)
    leaderboard = _query_leaderboard(client, 20, bet_where)
    hourly_volume = _hourly_volume(client, time_where)
    games_result = client.query(
        f"SELECT {_GAME_SELECT} FROM games {time_where} ORDER BY game_no DESC LIMIT 200"
    )
    games = [_game_row_to_dict(row) for row in games_result.result_rows]

    return {
        "summary": summary,
        "win_distribution": win_distribution,
        "hand_type_distribution": hand_type_distribution,
        "leaderboard": leaderboard,
        "hourly_volume": hourly_volume,
        "games": games,
    }


# --------------------------------------------------------------------------
# Pattern analysis helpers (callers must already hold _lock).
# --------------------------------------------------------------------------

def _chair_bias(client) -> dict:
    """All-time wins and win percentage per chair, plus "total_games"."""
    result = client.query(
        "SELECT winner, count() AS cnt FROM games GROUP BY winner ORDER BY winner"
    )
    chair_wins = {}
    total_games = 0
    for winner, cnt in result.result_rows:
        chair_wins[config.CHAIRS.get(winner, "?")] = cnt
        total_games += cnt  # unknown winner codes still count toward the total

    bias = {"total_games": total_games}
    for ch in _CHAIR_NAMES:
        wins = chair_wins.get(ch, 0)
        pct = round(wins / total_games * 100, 2) if total_games else 0
        bias[ch] = {"wins": wins, "pct": pct}
    return bias


def _per_chair_highest_bet(client) -> dict:
    """Per chair: games where it held a (possibly tied) top pool, and how often it won then."""
    result = client.query(
        """
        SELECT
            countIf(bet_a >= greatest(bet_a, bet_b, bet_c) AND bet_a > 0) AS a_high,
            countIf(bet_a >= greatest(bet_a, bet_b, bet_c) AND bet_a > 0 AND winner = 3) AS a_win,
            countIf(bet_b >= greatest(bet_a, bet_b, bet_c) AND bet_b > 0) AS b_high,
            countIf(bet_b >= greatest(bet_a, bet_b, bet_c) AND bet_b > 0 AND winner = 2) AS b_win,
            countIf(bet_c >= greatest(bet_a, bet_b, bet_c) AND bet_c > 0) AS c_high,
            countIf(bet_c >= greatest(bet_a, bet_b, bet_c) AND bet_c > 0 AND winner = 1) AS c_win
        FROM games
        WHERE bet_a + bet_b + bet_c > 0
        """
    )
    per_chair = {}
    if result.result_rows:
        row = result.result_rows[0]
        for i, ch in enumerate(_CHAIR_NAMES):
            has, wins = row[2 * i], row[2 * i + 1]
            per_chair[ch] = {
                "has_highest": has,
                "wins": wins,
                "win_pct": round(wins / has * 100, 2) if has else 0,
            }
    return per_chair


def _hand_types_by_chair(client) -> dict:
    """Distribution of all dealt hand types (not only winners) per chair."""
    # In ClickHouse, ORDER BY after UNION ALL binds to the last SELECT only,
    # where `chair`/`ht` are not defined; wrapping the union in a subquery
    # sorts the combined result as intended.
    result = client.query(
        """
        SELECT chair, ht, cnt
        FROM (
            SELECT 'A' AS chair, hand_type_a AS ht, count() AS cnt
            FROM games WHERE hand_type_a > 0 GROUP BY hand_type_a
            UNION ALL
            SELECT 'B' AS chair, hand_type_b AS ht, count() AS cnt
            FROM games WHERE hand_type_b > 0 GROUP BY hand_type_b
            UNION ALL
            SELECT 'C' AS chair, hand_type_c AS ht, count() AS cnt
            FROM games WHERE hand_type_c > 0 GROUP BY hand_type_c
        )
        ORDER BY chair, ht
        """
    )
    by_chair = {ch: {} for ch in _CHAIR_NAMES}
    for chair, hand_type, cnt in result.result_rows:
        by_chair[chair][config.HAND_TYPES.get(hand_type, f"Type {hand_type}")] = cnt
    return by_chair


def _pot_buckets(client) -> dict:
    """Chair win counts per pot-size quartile bucket, plus "_ranges" labels.

    Returns {} when the quartiles are unavailable: quantile() over an empty
    table is NaN, which cannot be converted into integer bucket bounds.
    """
    qr = client.query(
        """
        SELECT
            quantile(0.25)(total_pot) AS q1,
            quantile(0.5)(total_pot) AS q2,
            quantile(0.75)(total_pot) AS q3
        FROM games
        """
    )
    if not qr.result_rows:
        return {}
    quartiles = qr.result_rows[0]
    if any(q is None or math.isnan(q) for q in quartiles):
        return {}
    q1, q2, q3 = (int(q) for q in quartiles)

    br = client.query(
        f"""
        SELECT
            multiIf(
                total_pot <= {q1}, 'small',
                total_pot <= {q2}, 'medium',
                total_pot <= {q3}, 'large',
                'whale'
            ) AS bucket,
            winner,
            count() AS cnt
        FROM games
        GROUP BY bucket, winner
        """
    )
    buckets = {}
    for bucket, chair_id, cnt in br.result_rows:
        counts = buckets.setdefault(bucket, {"A": 0, "B": 0, "C": 0, "total": 0})
        counts[config.CHAIRS.get(chair_id, "?")] = cnt
        counts["total"] += cnt
    buckets["_ranges"] = {
        "small": f"0–{q1}",
        "medium": f"{q1+1}–{q2}",
        "large": f"{q2+1}–{q3}",
        "whale": f">{q3}",
    }
    return buckets


def _streaks(client) -> dict:
    """Longest and current (ending at the latest game) win streak per chair."""
    result = client.query("SELECT winner FROM games ORDER BY game_no ASC")
    winners = [config.CHAIRS.get(r[0], "?") for r in result.result_rows]
    streaks = {}
    for ch in _CHAIR_NAMES:
        longest = run = 0
        for w in winners:
            run = run + 1 if w == ch else 0
            longest = max(longest, run)
        # After the scan, `run` is the streak that ends at the most recent game.
        streaks[ch] = {"max_streak": longest, "current_streak": run}
    return streaks


def _hourly_win_patterns(client) -> dict:
    """Chair win counts per hour of day (keys are the hour as a string)."""
    result = client.query(
        """
        SELECT toHour(created_at) AS hr, winner, count() AS cnt
        FROM games
        GROUP BY hr, winner
        ORDER BY hr, winner
        """
    )
    hourly = {}
    for hr, winner, cnt in result.result_rows:
        counts = hourly.setdefault(str(hr), {"A": 0, "B": 0, "C": 0, "total": 0})
        counts[config.CHAIRS.get(winner, "?")] = cnt
        counts["total"] += cnt
    return hourly


def _recent_win_distribution(client) -> tuple[dict, int]:
    """Chair win counts over the last 100 games and how many games that was."""
    result = client.query(
        """
        SELECT winner, count() AS cnt
        FROM (
            SELECT winner FROM games ORDER BY game_no DESC LIMIT 100
        )
        GROUP BY winner
        """
    )
    dist = {ch: 0 for ch in _CHAIR_NAMES}
    total = 0
    for winner, cnt in result.result_rows:
        chair = config.CHAIRS.get(winner, "?")
        if chair in dist:
            dist[chair] = cnt
        total += cnt  # every recent game, like total_games on the all-time side
    return dist, total


@_with_lock
def get_pattern_analysis() -> dict:
    """Run all pattern analysis queries and return a single dict.

    Keys: chair_bias, bet_rank, per_chair_rank, hand_types_by_chair,
    hand_type_wins, pot_buckets, streaks, hourly, recent_vs_all.
    """
    client = get_client()
    chair_bias = _chair_bias(client)
    bet_rank = _bet_rank_counts(client)
    per_chair_rank = _per_chair_highest_bet(client)
    hand_types_by_chair = _hand_types_by_chair(client)
    hand_type_wins = _winning_hand_types(client)
    pot_buckets = _pot_buckets(client)
    streaks = _streaks(client)
    hourly = _hourly_win_patterns(client)
    recent_dist, recent_total = _recent_win_distribution(client)

    return {
        "chair_bias": chair_bias,
        "bet_rank": bet_rank,
        "per_chair_rank": per_chair_rank,
        "hand_types_by_chair": hand_types_by_chair,
        "hand_type_wins": hand_type_wins,
        "pot_buckets": pot_buckets,
        "streaks": streaks,
        "hourly": hourly,
        "recent_vs_all": {
            "recent": {"dist": recent_dist, "total": recent_total},
            "all": {
                "dist": {ch: chair_bias[ch]["wins"] for ch in _CHAIR_NAMES},
                "total": chair_bias["total_games"],
            },
        },
    }


@_with_lock
def get_hot_cold_players(n: int = 5) -> dict:
    """
    Get players with highest and lowest P&L over their last 10 bets.

    Only bets on games present in ``games`` count, and a player needs at
    least 5 of them. Returns {"hot": [...], "cold": [...]}: up to *n*
    players with positive P&L (best first) and up to *n* with negative P&L
    (worst first).
    """
    client = get_client()
    sql = f"""
        WITH ranked AS (
            SELECT
                b.user_id, b.game_no, b.chair, b.bet_amount, g.winner,
                row_number() OVER (PARTITION BY b.user_id ORDER BY b.created_at DESC) AS rn
            FROM bets b
            JOIN games g ON b.game_no = g.game_no
        ),
        last10 AS (
            SELECT * FROM ranked WHERE rn <= 10
        )
        SELECT
            l.user_id,
            any(u.nick_name) AS nick_name,
            count() AS total_bets,
            countIf(l.chair = l.winner) AS wins,
            toInt64(sumIf(l.bet_amount, l.chair = l.winner) * 1.9
                    - sumIf(l.bet_amount, l.chair != l.winner)) AS pnl
        FROM last10 l
        LEFT JOIN {_USERS_BY_ID_SQL} AS u ON l.user_id = u.user_id
        GROUP BY l.user_id
        HAVING total_bets >= 5
        ORDER BY pnl DESC
    """
    result = client.query(sql)
    players = [
        {
            "user_id": user_id,
            "nick_name": nick_name or str(user_id),
            "total_bets": total_bets,
            "wins": wins,
            "pnl": pnl,
        }
        for user_id, nick_name, total_bets, wins, pnl in result.result_rows
    ]
    hot = [p for p in players if p["pnl"] > 0][:n]
    # players is sorted by pnl DESC, so walking it backwards yields the most
    # negative first; slicing from the front (unlike a `[-n:]` tail slice)
    # also returns nothing for n == 0.
    cold = [p for p in reversed(players) if p["pnl"] < 0][:n]
    return {"hot": hot, "cold": cold}