commit 85f44e6a22cea2a6359bbad12c4ff73c6d4841b6 Author: Junaid Saeed Uppal Date: Sat Feb 21 22:36:40 2026 +0500 Initial commit: Teen Patti live monitor with analytics Live dashboard with real-time WebSocket updates, analytics page with time-filtered stats, ClickHouse storage, and Caddy reverse proxy. diff --git a/Caddyfile b/Caddyfile new file mode 100644 index 000000000..da7084f1e --- /dev/null +++ b/Caddyfile @@ -0,0 +1,9 @@ +:8443 { + tls internal + + reverse_proxy app:8765 { + transport http { + versions 1.1 + } + } +} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..b2cd41122 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.13-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY app/ ./app/ +COPY static/ ./static/ +CMD ["python", "-m", "app.main"] diff --git a/README.md b/README.md new file mode 100644 index 000000000..41f94e62a --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# Teen Patti Live Monitor + +Real-time Teen Patti game monitor with live dashboard and analytics. + +## Quick Start + +```bash +docker compose up --build +``` + +- **Live Dashboard:** https://localhost:8443 +- **Analytics:** https://localhost:8443/analytics diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/app/auth.py b/app/auth.py new file mode 100644 index 000000000..08cc1bfda --- /dev/null +++ b/app/auth.py @@ -0,0 +1,212 @@ +""" +Authentication and API helpers for StreamKar. +Handles auto_login, sv signature computation, and API calls. +""" + +import hashlib +import json +import logging +import ssl +import time +import urllib.parse +import urllib.request +from typing import Optional + +from . import config + +log = logging.getLogger(__name__) + +_nonce_counter = 0 +_ssl_ctx = ssl.create_default_context() + + +def _get_nonce() -> int: + global _nonce_counter + _nonce_counter += 1 + return _nonce_counter + + +def compute_sv(params_dict: dict) -> str: + """Compute the 'sv' signature verification parameter.""" + params = {k: str(v) for k, v in params_dict.items() if k != "sv"} + sorted_keys = sorted(params.keys(), key=str.lower) + parts = "" + last_value = "" + for i, key in enumerate(sorted_keys): + parts += key + ":" + if i == len(sorted_keys) - 1: + last_value = params[key] + else: + parts += params[key] + full_string = parts + last_value + config.EM5_SALT + md5_hash = hashlib.md5(full_string.encode()).digest() + value = int.from_bytes(md5_hash, "big") + result = [] + for i in range(26): + shift = 128 - 5 - (i * 5) + if shift >= 0: + idx = (value >> shift) & 0x1F + else: + idx = (value << (-shift)) & 0x1F + result.append(config.EM5_ALPHABET[idx]) + return "".join(result) + + +def auto_login() -> tuple[int, str]: + """Login using credentials file. Returns (userId, token).""" + with open(config.SK_CREDENTIALS_FILE) as f: + creds = json.load(f) + + payload = { + "platform": config.PLATFORM, + "a": config.APP_ID, + "c": int(config.CHANNEL_CODE), + "v": config.VERSION_CODE, + "l": "en", + "deviceModel": creds.get("deviceModel", "generic"), + "deviceUId": creds["deviceUId"], + "FuncTag": 40000002, + "loginType": creds.get("loginType", 48), + "autoLogin": 1, + "uuid": creds["uuid"], + } + if creds.get("sessionId"): + payload["sessionId"] = creds["sessionId"] + payload["sv"] = compute_sv(payload) + + payload_json = json.dumps(payload, separators=(",", ":")) + qs = urllib.parse.urlencode({"parameter": payload_json}) + url = f"{config.API_ENTRANCE}?{qs}" + + req = urllib.request.Request(url, headers={ + "User-Agent": "okhttp/4.9.3", + "Accept-Encoding": "gzip", + }) + + with urllib.request.urlopen(req, timeout=15, context=_ssl_ctx) as resp: + data = json.loads(resp.read()) + + tag_code = data.get("TagCode") + if tag_code != "00000000" or not data.get("token"): + raise RuntimeError(f"Login failed: TagCode={tag_code}") + + user_id = data["userId"] + token = data["token"] + log.info("Logged in: userId=%s", user_id) + return user_id, token + + +def build_auth_headers(user_id: int = 0, token: Optional[str] = None) -> dict: + """Build auth headers for native API requests.""" + headers = { + "AccessKeyId": config.ACCESS_KEY_ID, + "Nonce": str(_get_nonce()), + "Timestamp": str(int(time.time() * 1000)), + "platform": str(config.PLATFORM), + "a": str(config.APP_ID), + "c": config.CHANNEL_CODE, + "v": str(config.VERSION_CODE), + "l": "en", + "sk-kimi": config.SK_KIMI, + "userId": str(user_id), + "User-Agent": "okhttp/4.9.3", + "Accept-Encoding": "gzip", + } + if token: + headers["token"] = token + return headers + + +def get_socket_address(room_id: int, user_id: int = 0, token: Optional[str] = None) -> dict: + """Fetch WebSocket URL for a room via the socket-address API.""" + import requests + + params = { + "softVersion": 10080, + "c": config.CHANNEL_CODE, + "v": config.VERSION_CODE, + "appId": config.APP_ID, + "userId": user_id, + "roomId": room_id, + "platform": config.PLATFORM, + } + headers = build_auth_headers(user_id=user_id, token=token) + resp = requests.get(f"{config.SOCKET_BASE}/", params=params, headers=headers, timeout=15) + data = resp.json() + ws_url = data.get("ws", "") + if ws_url: + log.info("WebSocket URL: %s", ws_url) + else: + log.error("No WebSocket URL in response: %s", json.dumps(data)[:300]) + return data + + +def call_api(func_tag: int, user_id: int, token: str, extra: dict = None) -> dict: + """Call the game web API (meShowApi).""" + params = { + "FuncTag": func_tag, + "userId": user_id, + "token": token, + "platform": 1, + "c": 100, + "a": 1, + } + if extra: + params.update(extra) + qs = urllib.parse.urlencode({ + "parameter": json.dumps(params), + "_": str(int(time.time() * 1000)), + }) + url = f"{config.API_URL}?{qs}" + req = urllib.request.Request(url, headers={ + "User-Agent": "Mozilla/5.0 (Linux; Android 15; SM-G990E) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Mobile Safari/537.36", + "Accept": "application/json", + }) + with urllib.request.urlopen(req, timeout=10, context=_ssl_ctx) as resp: + return json.loads(resp.read()) + + +def get_full_state(user_id: int, token: str) -> dict: + """FuncTag 86000041 — full game state.""" + return call_api(86000041, user_id, token) + + +def get_user_profile(target_user_id: int, user_id: int, token: str) -> dict | None: + """FuncTag 50001010 — user profile (native API). Returns parsed profile or None.""" + import requests + payload = { + "FuncTag": 50001010, + "userId": target_user_id, + "platform": config.PLATFORM, + "a": config.APP_ID, + "c": int(config.CHANNEL_CODE), + "v": config.VERSION_CODE, + "l": "en", + } + payload["sv"] = compute_sv(payload) + payload_json = json.dumps(payload, separators=(",", ":")) + headers = build_auth_headers(user_id=user_id, token=token) + resp = requests.get( + config.API_ENTRANCE, + params={"parameter": payload_json}, + headers=headers, + timeout=10, + ) + data = resp.json() + users = data.get("userList", []) + if users: + u = users[0] + return { + "user_id": target_user_id, + "nick_name": u.get("nickName", ""), + "rich_level": u.get("richLevel", 0), + "actor_level": u.get("actorLevel", 0), + "gender": u.get("gender", 0), + "consume_total": u.get("consumeTotal", 0), + "earn_total": u.get("earnTotal", 0), + "is_actor": u.get("isActor", 0), + "portrait": u.get("portrait", ""), + } + return None diff --git a/app/config.py b/app/config.py new file mode 100644 index 000000000..a8cd9f622 --- /dev/null +++ b/app/config.py @@ -0,0 +1,43 @@ +import os + +# StreamKar API constants (from decompiled APK) +ACCESS_KEY_ID = "cBdjSuFA3gi9Y92wKcCilh9NeloE" +SK_KIMI = "dsgafoaodhafpdnfopauifda352176sn" +APP_ID = 1 +VERSION_CODE = 1337 +VERSION_NAME = "9.18.0" +PLATFORM = 2 +CHANNEL_CODE = "12002" + +API_BASE = "https://api.loee.link" +SOCKET_BASE = "https://into1.loee.link" +API_ENTRANCE = f"{API_BASE}/meShow/entrance" +API_URL = "https://web.kktv9.com/meShowApi" + +DEFAULT_ROOM_ID = 668797247 +POLL_INTERVAL = 1.0 + +# sv signature constants +EM5_SALT = "cc16be4b:346c51d" +EM5_ALPHABET = "AB56DE3C8L2WF4UVM7JRSGPQYZTXK9HN" + +# MsgTags +MSGTAG_ROOM_LOGIN = 10010201 +MSGTAG_LOGIN_ACK = 10010202 +MSGTAG_GAME_WIN = 10010778 +MSGTAG_USER_BET = 10010795 + +# Game constants +SUITS = {1: "\u2660", 2: "\u2665", 3: "\u2663", 4: "\u2666"} +VALUES = {1: "A", 2: "2", 3: "3", 4: "4", 5: "5", 6: "6", 7: "7", + 8: "8", 9: "9", 10: "10", 11: "J", 12: "Q", 13: "K", 14: "A"} +HAND_TYPES = {1: "High Card", 2: "Pair", 3: "Flush", 4: "Straight", + 5: "Straight Flush", 6: "Trail"} +CHAIRS = {1: "A", 2: "B", 3: "C"} +STATUS_NAMES = {0: "NEW", 1: "BETTING", 2: "REVEALING", 3: "ENDED"} + +# Environment +CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost") +CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT", "8123")) +SK_CREDENTIALS_FILE = os.environ.get("SK_CREDENTIALS_FILE", "sk_credentials.json") +WEB_PORT = int(os.environ.get("WEB_PORT", "8765")) diff --git a/app/db.py b/app/db.py new file mode 100644 index 000000000..6d1de1e19 --- /dev/null +++ b/app/db.py @@ -0,0 +1,593 @@ +""" +ClickHouse database operations. +""" + +import json +import logging +import threading +import time +import clickhouse_connect +from . import config + +log = logging.getLogger(__name__) + +_client = None +_lock = threading.Lock() + + +def get_client(): + global _client + if _client is None: + for attempt in range(30): + try: + _client = clickhouse_connect.get_client( + host=config.CLICKHOUSE_HOST, + port=config.CLICKHOUSE_PORT, + ) + _client.ping() + log.info("Connected to ClickHouse at %s:%s", config.CLICKHOUSE_HOST, config.CLICKHOUSE_PORT) + return _client + except Exception as e: + log.warning("ClickHouse not ready (attempt %d): %s", attempt + 1, e) + time.sleep(2) + raise RuntimeError("Could not connect to ClickHouse") + return _client + + +def _with_lock(fn): + """Decorator to serialize all ClickHouse operations.""" + def wrapper(*args, **kwargs): + with _lock: + return fn(*args, **kwargs) + wrapper.__name__ = fn.__name__ + return wrapper + + +@_with_lock +def insert_game(game: dict): + """Insert a completed round into the games table.""" + client = get_client() + client.insert("games", + [[ + game["game_no"], + game["winner"], + game["total_pot"], + game.get("bet_a", 0), + game.get("bet_b", 0), + game.get("bet_c", 0), + game.get("hand_a", ""), + game.get("hand_b", ""), + game.get("hand_c", ""), + game.get("hand_type_a", 0), + game.get("hand_type_b", 0), + game.get("hand_type_c", 0), + game.get("cards_json", ""), + game.get("duration_s", 0), + ]], + column_names=[ + "game_no", "winner", "total_pot", + "bet_a", "bet_b", "bet_c", + "hand_a", "hand_b", "hand_c", + "hand_type_a", "hand_type_b", "hand_type_c", + "cards_json", "duration_s", + ], + ) + + +@_with_lock +def insert_bet(bet: dict): + """Insert an individual user bet into the bets table.""" + client = get_client() + client.insert("bets", + [[ + bet["game_no"], + bet["user_id"], + bet["chair"], + bet["bet_amount"], + bet["total_bet"], + ]], + column_names=["game_no", "user_id", "chair", "bet_amount", "total_bet"], + ) + + +@_with_lock +def upsert_user(user: dict): + """Insert/update a user profile.""" + client = get_client() + client.insert("users", + [[ + user["user_id"], + user.get("nick_name", ""), + user.get("rich_level", 0), + user.get("actor_level", 0), + user.get("gender", 0), + user.get("consume_total", 0), + user.get("earn_total", 0), + user.get("is_actor", 0), + user.get("portrait", ""), + ]], + column_names=[ + "user_id", "nick_name", "rich_level", "actor_level", + "gender", "consume_total", "earn_total", "is_actor", "portrait", + ], + ) + + +@_with_lock +def get_recent_games(n: int = 50) -> list[dict]: + """Get last N completed games.""" + client = get_client() + result = client.query( + "SELECT game_no, winner, total_pot, bet_a, bet_b, bet_c, " + "hand_a, hand_b, hand_c, hand_type_a, hand_type_b, hand_type_c, " + "cards_json, duration_s, created_at " + "FROM games ORDER BY game_no DESC LIMIT {n:UInt32}", + parameters={"n": n}, + ) + games = [] + for row in result.result_rows: + games.append({ + "game_no": row[0], + "winner": row[1], + "total_pot": row[2], + "bet_a": row[3], + "bet_b": row[4], + "bet_c": row[5], + "hand_a": row[6], + "hand_b": row[7], + "hand_c": row[8], + "hand_type_a": row[9], + "hand_type_b": row[10], + "hand_type_c": row[11], + "cards_json": row[12], + "duration_s": row[13], + "created_at": str(row[14]), + }) + return games + + +@_with_lock +def get_leaderboard(n: int = 10) -> list[dict]: + """ + Get top N users by P&L. + P&L = sum of winning bets * 1.9 - sum of losing bets. + With 2.9x fixed payout: win → +1.9x bet, loss → -1.0x bet. + """ + client = get_client() + result = client.query( + """ + SELECT + b.user_id, + any(u.nick_name) AS nick_name, + count() AS total_bets, + countIf(b.chair = g.winner) AS wins, + countIf(b.chair != g.winner) AS losses, + toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9 + - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl, + sum(b.bet_amount) AS total_wagered + FROM bets b + JOIN games g ON b.game_no = g.game_no + LEFT JOIN users u ON b.user_id = u.user_id + GROUP BY b.user_id + HAVING total_bets >= 3 + ORDER BY pnl DESC + LIMIT {n:UInt32} + """, + parameters={"n": n}, + ) + leaders = [] + for row in result.result_rows: + leaders.append({ + "user_id": row[0], + "nick_name": row[1] or str(row[0]), + "total_bets": row[2], + "wins": row[3], + "losses": row[4], + "pnl": row[5], + "total_wagered": row[6], + }) + return leaders + + +@_with_lock +def get_win_distribution() -> dict: + """Get win counts per chair + bet rank distribution.""" + client = get_client() + result = client.query( + "SELECT winner, count() AS cnt FROM games GROUP BY winner ORDER BY winner" + ) + dist = {"A": 0, "B": 0, "C": 0} + for row in result.result_rows: + chair = config.CHAIRS.get(row[0], "?") + if chair in dist: + dist[chair] = row[1] + + # Bet rank distribution: how often the winning chair had high/mid/low bet + rank_result = client.query( + """ + SELECT + countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high, + countIf(winner_bet > least(bet_a, bet_b, bet_c) + AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid, + countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low + FROM ( + SELECT + bet_a, bet_b, bet_c, + multiIf(winner = 1, bet_a, winner = 2, bet_b, bet_c) AS winner_bet + FROM games + WHERE bet_a + bet_b + bet_c > 0 + ) + """ + ) + bet_rank = {"high": 0, "mid": 0, "low": 0} + if rank_result.result_rows: + row = rank_result.result_rows[0] + bet_rank = {"high": row[0], "mid": row[1], "low": row[2]} + + return {"chairs": dist, "bet_rank": bet_rank} + + +@_with_lock +def get_user_name(user_id: int) -> str | None: + """Lookup user nickname from cache.""" + client = get_client() + result = client.query( + "SELECT nick_name FROM users WHERE user_id = {uid:UInt64} LIMIT 1", + parameters={"uid": user_id}, + ) + if result.result_rows: + return result.result_rows[0][0] or None + return None + + +@_with_lock +def get_user_detail(user_id: int) -> dict | None: + """Get full user profile + session betting stats.""" + client = get_client() + # Profile + result = client.query( + "SELECT user_id, nick_name, rich_level, actor_level, gender, " + "consume_total, earn_total, is_actor, portrait " + "FROM users WHERE user_id = {uid:UInt64} LIMIT 1", + parameters={"uid": user_id}, + ) + if not result.result_rows: + return None + row = result.result_rows[0] + profile = { + "user_id": row[0], + "nick_name": row[1], + "rich_level": row[2], + "actor_level": row[3], + "gender": row[4], + "consume_total": row[5], + "earn_total": row[6], + "is_actor": row[7], + "portrait": row[8], + } + + # Session betting stats + stats_result = client.query( + """ + SELECT + count() AS total_bets, + sum(b.bet_amount) AS total_wagered, + countDistinct(b.game_no) AS rounds_played, + countIf(b.chair = g.winner) AS wins, + countIf(b.chair != g.winner) AS losses, + toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9 + - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl + FROM bets b + LEFT JOIN games g ON b.game_no = g.game_no + WHERE b.user_id = {uid:UInt64} + """, + parameters={"uid": user_id}, + ) + if stats_result.result_rows: + sr = stats_result.result_rows[0] + profile["total_bets"] = sr[0] + profile["total_wagered"] = sr[1] + profile["rounds_played"] = sr[2] + profile["wins"] = sr[3] + profile["losses"] = sr[4] + profile["pnl"] = sr[5] + + # Recent bets + bets_result = client.query( + """ + SELECT b.game_no, b.chair, b.bet_amount, b.total_bet, + g.winner, b.created_at + FROM bets b + LEFT JOIN games g ON b.game_no = g.game_no + WHERE b.user_id = {uid:UInt64} + ORDER BY b.created_at DESC + LIMIT 20 + """, + parameters={"uid": user_id}, + ) + profile["recent_bets"] = [] + for br in bets_result.result_rows: + profile["recent_bets"].append({ + "game_no": br[0], + "chair": br[1], + "chair_name": config.CHAIRS.get(br[1], "?"), + "bet_amount": br[2], + "total_bet": br[3], + "winner": br[4], + "won": br[1] == br[4] if br[4] else None, + "created_at": str(br[5]), + }) + + return profile + + +@_with_lock +def get_biggest_winner() -> dict | None: + """Get the single biggest winner by P&L this session.""" + client = get_client() + result = client.query( + """ + SELECT + b.user_id, + any(u.nick_name) AS nick_name, + any(u.portrait) AS portrait, + any(u.rich_level) AS rich_level, + count() AS total_bets, + countIf(b.chair = g.winner) AS wins, + toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9 + - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl, + sum(b.bet_amount) AS total_wagered + FROM bets b + JOIN games g ON b.game_no = g.game_no + LEFT JOIN users u ON b.user_id = u.user_id + GROUP BY b.user_id + HAVING total_bets >= 3 + ORDER BY pnl DESC + LIMIT 1 + """ + ) + if result.result_rows: + row = result.result_rows[0] + return { + "user_id": row[0], + "nick_name": row[1] or str(row[0]), + "portrait": row[2] or "", + "rich_level": row[3], + "total_bets": row[4], + "wins": row[5], + "pnl": row[6], + "total_wagered": row[7], + } + return None + + +@_with_lock +def get_analytics(period: str = "all") -> dict: + """Get all analytics data for a given time period.""" + client = get_client() + + intervals = { + "1h": "now() - INTERVAL 1 HOUR", + "6h": "now() - INTERVAL 6 HOUR", + "24h": "now() - INTERVAL 24 HOUR", + "7d": "now() - INTERVAL 7 DAY", + } + cutoff_expr = intervals.get(period) + game_where = f"WHERE created_at >= {cutoff_expr}" if cutoff_expr else "" + bet_where = f"WHERE b.created_at >= {cutoff_expr}" if cutoff_expr else "" + + # 1. Summary stats + summary_result = client.query( + f"SELECT count(), sum(total_pot), toInt64(avg(total_pot)) FROM games {game_where}" + ) + sr = summary_result.result_rows[0] if summary_result.result_rows else (0, 0, 0) + # Bet counts + bet_summary = client.query( + f"SELECT count(), countDistinct(user_id) FROM bets {'WHERE created_at >= ' + cutoff_expr if cutoff_expr else ''}" + ) + bs = bet_summary.result_rows[0] if bet_summary.result_rows else (0, 0) + summary = { + "total_games": sr[0], + "total_volume": int(sr[1] or 0), + "avg_pot": int(sr[2] or 0), + "total_bets_placed": bs[0], + "unique_bettors": bs[1], + } + + # 2. Win distribution (chairs + bet rank) + dist_result = client.query( + f"SELECT winner, count() AS cnt FROM games {game_where} GROUP BY winner ORDER BY winner" + ) + chairs_dist = {"A": 0, "B": 0, "C": 0} + for row in dist_result.result_rows: + chair = config.CHAIRS.get(row[0], "?") + if chair in chairs_dist: + chairs_dist[chair] = row[1] + + rank_result = client.query( + f""" + SELECT + countIf(winner_bet >= greatest(bet_a, bet_b, bet_c)) AS high, + countIf(winner_bet > least(bet_a, bet_b, bet_c) + AND winner_bet < greatest(bet_a, bet_b, bet_c)) AS mid, + countIf(winner_bet <= least(bet_a, bet_b, bet_c)) AS low + FROM ( + SELECT + bet_a, bet_b, bet_c, + multiIf(winner = 1, bet_a, winner = 2, bet_b, bet_c) AS winner_bet + FROM games + {game_where + ' AND' if game_where else 'WHERE'} bet_a + bet_b + bet_c > 0 + ) + """ + ) + bet_rank = {"high": 0, "mid": 0, "low": 0} + if rank_result.result_rows: + rr = rank_result.result_rows[0] + bet_rank = {"high": rr[0], "mid": rr[1], "low": rr[2]} + + win_distribution = {"chairs": chairs_dist, "bet_rank": bet_rank} + + # 3. Hand type distribution (winning hand types) + hand_type_result = client.query( + f""" + SELECT hand_type, count() AS cnt FROM ( + SELECT multiIf(winner = 1, hand_type_a, winner = 2, hand_type_b, hand_type_c) AS hand_type + FROM games + {game_where} + ) + WHERE hand_type > 0 + GROUP BY hand_type + ORDER BY hand_type + """ + ) + hand_type_distribution = {} + for row in hand_type_result.result_rows: + type_name = config.HAND_TYPES.get(row[0], f"Type {row[0]}") + hand_type_distribution[type_name] = row[1] + + # 4. Leaderboard + leaderboard_result = client.query( + f""" + SELECT + b.user_id, + any(u.nick_name) AS nick_name, + count() AS total_bets, + countIf(b.chair = g.winner) AS wins, + countIf(b.chair != g.winner) AS losses, + toInt64(sumIf(b.bet_amount, b.chair = g.winner) * 1.9 + - sumIf(b.bet_amount, b.chair != g.winner)) AS pnl, + sum(b.bet_amount) AS total_wagered + FROM bets b + JOIN games g ON b.game_no = g.game_no + LEFT JOIN users u ON b.user_id = u.user_id + {bet_where} + GROUP BY b.user_id + HAVING total_bets >= 3 + ORDER BY pnl DESC + LIMIT 20 + """ + ) + leaderboard = [] + for row in leaderboard_result.result_rows: + leaderboard.append({ + "user_id": row[0], + "nick_name": row[1] or str(row[0]), + "total_bets": row[2], + "wins": row[3], + "losses": row[4], + "pnl": row[5], + "total_wagered": row[6], + }) + + # 5. Hourly volume + hourly_result = client.query( + f""" + SELECT + toStartOfHour(created_at) AS hour, + count() AS games, + sum(total_pot) AS volume + FROM games + {game_where} + GROUP BY hour + ORDER BY hour + """ + ) + hourly_volume = [] + for row in hourly_result.result_rows: + hourly_volume.append({ + "hour": str(row[0]), + "games": row[1], + "volume": int(row[2] or 0), + }) + + # 6. Games list + games_result = client.query( + f""" + SELECT game_no, winner, total_pot, bet_a, bet_b, bet_c, + hand_a, hand_b, hand_c, hand_type_a, hand_type_b, hand_type_c, + cards_json, duration_s, created_at + FROM games + {game_where} + ORDER BY game_no DESC + LIMIT 200 + """ + ) + games = [] + for row in games_result.result_rows: + games.append({ + "game_no": row[0], + "winner": row[1], + "total_pot": row[2], + "bet_a": row[3], + "bet_b": row[4], + "bet_c": row[5], + "hand_a": row[6], + "hand_b": row[7], + "hand_c": row[8], + "hand_type_a": row[9], + "hand_type_b": row[10], + "hand_type_c": row[11], + "cards_json": row[12], + "duration_s": row[13], + "created_at": str(row[14]), + }) + + return { + "summary": summary, + "win_distribution": win_distribution, + "hand_type_distribution": hand_type_distribution, + "leaderboard": leaderboard, + "hourly_volume": hourly_volume, + "games": games, + } + + +@_with_lock +def get_hot_cold_players(n: int = 5) -> dict: + """ + Get players with highest and lowest P&L over their last 10 bets. + Returns {"hot": [...], "cold": [...]}. + """ + client = get_client() + sql = """ + WITH ranked AS ( + SELECT + b.user_id, + b.game_no, + b.chair, + b.bet_amount, + g.winner, + row_number() OVER (PARTITION BY b.user_id ORDER BY b.created_at DESC) AS rn + FROM bets b + JOIN games g ON b.game_no = g.game_no + ), + last10 AS ( + SELECT * FROM ranked WHERE rn <= 10 + ) + SELECT + l.user_id, + any(u.nick_name) AS nick_name, + count() AS total_bets, + countIf(l.chair = l.winner) AS wins, + toInt64(sumIf(l.bet_amount, l.chair = l.winner) * 1.9 + - sumIf(l.bet_amount, l.chair != l.winner)) AS pnl + FROM last10 l + LEFT JOIN users u ON l.user_id = u.user_id + GROUP BY l.user_id + HAVING total_bets >= 5 + ORDER BY pnl DESC + """ + result = client.query(sql) + all_players = [] + for row in result.result_rows: + all_players.append({ + "user_id": row[0], + "nick_name": row[1] or str(row[0]), + "total_bets": row[2], + "wins": row[3], + "pnl": row[4], + }) + hot = [p for p in all_players if p["pnl"] > 0][:n] + cold = [p for p in all_players if p["pnl"] < 0][-n:] + cold.reverse() # most negative first + return {"hot": hot, "cold": cold} diff --git a/app/game_poller.py b/app/game_poller.py new file mode 100644 index 000000000..b0b0d1292 --- /dev/null +++ b/app/game_poller.py @@ -0,0 +1,233 @@ +""" +HTTP polling for Teen Patti game state. +Polls FuncTag 86000041 every 3 seconds, tracks game phases, +inserts completed rounds into ClickHouse, broadcasts to browser. +""" + +import asyncio +import json +import logging +import time +from datetime import datetime + +from . import config +from .auth import auto_login, get_full_state +from . import db + +log = logging.getLogger(__name__) + + +def _format_card(card: dict) -> str: + v = config.VALUES.get(card["cardValue"], str(card["cardValue"])) + s = config.SUITS.get(card["cardColor"], "?") + return f"{v}{s}" + + +def _format_hand(cards: list[dict]) -> str: + return " ".join(_format_card(c) for c in cards) + + +class GamePoller: + def __init__(self, broadcast_fn, push_refresh_fn=None): + self.broadcast = broadcast_fn + self.push_refresh = push_refresh_fn + self.user_id = 0 + self.token = "" + self.should_run = True + self.current_round = None + self.last_status = None + self.round_data: dict = {} + self.errors = 0 + self.rounds_recorded = 0 + + def _login(self): + self.user_id, self.token = auto_login() + log.info("Poller logged in: userId=%s", self.user_id) + + def _poll(self) -> dict | None: + try: + data = get_full_state(self.user_id, self.token) + tag = data.get("TagCode", "") + if tag == "30001005": + log.warning("Token expired, re-authenticating...") + self._login() + data = get_full_state(self.user_id, self.token) + self.errors = 0 + return data + except Exception as e: + self.errors += 1 + log.warning("Poll failed (%dx): %s", self.errors, e) + if self.errors > 10: + log.error("Too many consecutive errors, re-authenticating") + try: + self._login() + except Exception: + pass + self.errors = 0 + return None + + async def _process(self, data: dict): + gi = data.get("gameInfo", {}) + gn = gi.get("gameNo") + gs = gi.get("gameStatus") + init = data.get("initData", {}) + + if gn is None: + return + + # New round detected + if gn != self.current_round: + # Save previous round if complete + if self.round_data.get("gameResult"): + self._save_round() + # Push updated stats over WS (non-blocking) + if self.push_refresh: + await self.push_refresh() + + self.current_round = gn + self.last_status = None + self.round_data = { + "gameNo": gn, + "time_start_ts": time.time(), + } + log.info("Round #%s started", gn) + + # Build current state for browser + end_time = gi.get("endTime", 0) + current_time = init.get("currentTime", int(time.time() * 1000)) + remaining_ms = max(0, end_time - current_time) + + bets = gi.get("betInfos", {}).get("betInfoArry", []) + total_pot = gi.get("betInfos", {}).get("total", 0) + bet_map = {} + for b in bets: + chair = b["country"] + bet_map[config.CHAIRS.get(chair, "?")] = int(b.get("totalBet", 0) or 0) + + # Cards (available in REVEALING/ENDED) + cards = {} + for p in gi.get("playerCardInfos", []): + chair = config.CHAIRS.get(p["country"], "?") + cards[chair] = { + "hand": _format_hand(p["cards"]), + "hand_type": config.HAND_TYPES.get(p["cardType"], "?"), + "hand_type_id": p["cardType"], + "cards_raw": p["cards"], + } + + game_state = { + "game_no": gn, + "status": gs, + "status_name": config.STATUS_NAMES.get(gs, "?"), + "remaining_s": round(remaining_ms / 1000, 1), + "total_pot": total_pot, + "bets": bet_map, + "cards": cards, + "winner": gi.get("gameResult"), + "winner_name": config.CHAIRS.get(gi.get("gameResult"), None), + } + + # Broadcast current state + await self.broadcast("game_state", game_state) + + # Status transitions + if gs != self.last_status: + prev = self.last_status + self.last_status = gs + + if gs == 2: + # Revealing — capture result + self._capture_result(gi) + await self.broadcast("cards_revealed", { + "game_no": gn, + "cards": cards, + "winner": gi.get("gameResult"), + "winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"), + }) + + elif gs == 3: + # Ended + self._capture_result(gi) + self.round_data["time_end_ts"] = time.time() + start_ts = self.round_data.get("time_start_ts", 0) + self.round_data["duration_s"] = round(time.time() - start_ts) + + await self.broadcast("round_result", { + "game_no": gn, + "winner": gi.get("gameResult"), + "winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"), + "total_pot": total_pot, + "bets": bet_map, + "cards": cards, + "duration_s": self.round_data["duration_s"], + }) + # Push updated stats over WS + if self.push_refresh: + await self.push_refresh() + + def _capture_result(self, gi: dict): + self.round_data["gameResult"] = gi.get("gameResult") + + bets = gi.get("betInfos", {}).get("betInfoArry", []) + self.round_data["total_pot"] = gi.get("betInfos", {}).get("total", 0) + for b in bets: + chair = b["country"] + self.round_data[f"bet_{config.CHAIRS.get(chair, '?').lower()}"] = ( + int(b.get("totalBet", 0) or 0) + ) + + for p in gi.get("playerCardInfos", []): + chair_name = config.CHAIRS.get(p["country"], "?").lower() + self.round_data[f"hand_{chair_name}"] = _format_hand(p["cards"]) + self.round_data[f"hand_type_{chair_name}"] = p["cardType"] + + self.round_data["cards_json"] = json.dumps(gi.get("playerCardInfos", [])) + + def _save_round(self): + rd = self.round_data + if not rd.get("gameResult"): + return + try: + db.insert_game({ + "game_no": rd["gameNo"], + "winner": rd["gameResult"], + "total_pot": rd.get("total_pot", 0), + "bet_a": rd.get("bet_a", 0), + "bet_b": rd.get("bet_b", 0), + "bet_c": rd.get("bet_c", 0), + "hand_a": rd.get("hand_a", ""), + "hand_b": rd.get("hand_b", ""), + "hand_c": rd.get("hand_c", ""), + "hand_type_a": rd.get("hand_type_a", 0), + "hand_type_b": rd.get("hand_type_b", 0), + "hand_type_c": rd.get("hand_type_c", 0), + "cards_json": rd.get("cards_json", ""), + "duration_s": rd.get("duration_s", 0), + }) + self.rounds_recorded += 1 + log.info("Round #%s saved (%d total)", rd["gameNo"], self.rounds_recorded) + except Exception as e: + log.error("Failed to save round #%s: %s", rd["gameNo"], e) + + async def run(self): + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._login) + log.info("Game poller started (interval=%ss)", config.POLL_INTERVAL) + + while self.should_run: + data = await loop.run_in_executor(None, self._poll) + if data: + await self._process(data) + await asyncio.sleep(config.POLL_INTERVAL) + + # Save last round + if self.round_data.get("gameResult"): + self.round_data.setdefault("time_end_ts", time.time()) + start_ts = self.round_data.get("time_start_ts", 0) + self.round_data.setdefault("duration_s", round(time.time() - start_ts)) + self._save_round() + + log.info("Poller stopped. %d rounds recorded.", self.rounds_recorded) + + def stop(self): + self.should_run = False diff --git a/app/main.py b/app/main.py new file mode 100644 index 000000000..b8d801b85 --- /dev/null +++ b/app/main.py @@ -0,0 +1,61 @@ +""" +Entry point — starts all async tasks concurrently: +1. StreamKar WebSocket client (anonymous, captures user bets) +2. Game state HTTP poller (authenticated, captures full game state) +3. aiohttp web server (serves dashboard, pushes events to browsers) +""" + +import asyncio +import logging +import signal + +from .server import WebServer +from .streamkar_ws import StreamKarWSClient +from .game_poller import GamePoller + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger(__name__) + + +async def main(): + server = WebServer() + ws_client = StreamKarWSClient(broadcast_fn=server.broadcast) + poller = GamePoller(broadcast_fn=server.broadcast, push_refresh_fn=server.push_refresh) + + loop = asyncio.get_event_loop() + + def shutdown(): + log.info("Shutting down...") + ws_client.stop() + poller.stop() + for task in asyncio.all_tasks(loop): + task.cancel() + + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, shutdown) + + log.info("Starting Teen Patti Live Monitor") + log.info("Dashboard: http://localhost:8765") + + tasks = [ + asyncio.create_task(server.run(), name="web_server"), + asyncio.create_task(poller.run(), name="game_poller"), + asyncio.create_task(ws_client.run(), name="ws_client"), + ] + + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + pass + except Exception as e: + log.error("Fatal: %s", e) + finally: + log.info("Stopped.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/app/server.py b/app/server.py new file mode 100644 index 000000000..cb54ba625 --- /dev/null +++ b/app/server.py @@ -0,0 +1,191 @@ +""" +aiohttp web server + browser WebSocket. +Serves the dashboard and pushes real-time events to connected browsers. +All blocking DB calls run in a thread executor to avoid blocking the event loop. +""" + +import asyncio +import json +import logging +import os +from functools import partial + +from aiohttp import web + +from . import config +from . import db + +log = logging.getLogger(__name__) + +STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static") + + +async def _run_sync(fn, *args): + """Run a blocking function in the default thread executor.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, partial(fn, *args)) + + +class WebServer: + def __init__(self): + self.app = web.Application() + self.clients: set[web.WebSocketResponse] = set() + self._setup_routes() + + def _setup_routes(self): + self.app.router.add_get("/", self._handle_index) + self.app.router.add_get("/api/history", self._handle_history) + self.app.router.add_get("/api/leaderboard", self._handle_leaderboard) + self.app.router.add_get("/api/stats", self._handle_stats) + self.app.router.add_get("/api/user/{user_id}", self._handle_user) + self.app.router.add_get("/api/hot-cold", self._handle_hot_cold) + self.app.router.add_get("/analytics", self._handle_analytics_page) + self.app.router.add_get("/api/analytics", self._handle_analytics) + self.app.router.add_get("/ws", self._handle_ws) + self.app.router.add_static("/static/", STATIC_DIR, name="static") + + async def _handle_index(self, request: web.Request) -> web.Response: + index_path = os.path.join(STATIC_DIR, "index.html") + return web.FileResponse(index_path) + + async def _handle_history(self, request: web.Request) -> web.Response: + n = int(request.query.get("n", 50)) + try: + games = await _run_sync(db.get_recent_games, n) + return web.json_response(games) + except Exception as e: + log.error("History query failed: %s", e) + return web.json_response([], status=500) + + async def _handle_leaderboard(self, request: web.Request) -> web.Response: + n = int(request.query.get("n", 10)) + try: + leaders = await _run_sync(db.get_leaderboard, n) + return web.json_response(leaders) + except Exception as e: + log.error("Leaderboard query failed: %s", e) + return web.json_response([], status=500) + + async def _handle_stats(self, request: web.Request) -> web.Response: + try: + dist = await _run_sync(db.get_win_distribution) + biggest = await _run_sync(db.get_biggest_winner) + return web.json_response({"win_distribution": dist, "biggest_winner": biggest}) + except Exception as e: + log.error("Stats query failed: %s", e) + return web.json_response({}, status=500) + + async def _handle_user(self, request: web.Request) -> web.Response: + uid = int(request.match_info["user_id"]) + try: + detail = await _run_sync(db.get_user_detail, uid) + if detail: + return web.json_response(detail) + return web.json_response({"error": "User not found"}, status=404) + except Exception as e: + log.error("User detail query failed: %s", e) + return web.json_response({"error": str(e)}, status=500) + + async def _handle_analytics_page(self, request: web.Request) -> web.Response: + path = os.path.join(STATIC_DIR, "analytics.html") + return web.FileResponse(path) + + async def _handle_analytics(self, request: web.Request) -> web.Response: + period = request.query.get("period", "all") + if period not in ("1h", "6h", "24h", "7d", "all"): + return web.json_response({"error": "Invalid period"}, status=400) + try: + data = await _run_sync(db.get_analytics, period) + return web.json_response(data) + except Exception as e: + log.error("Analytics query failed: %s", e) + return web.json_response({"error": str(e)}, status=500) + + async def _handle_hot_cold(self, request: web.Request) -> web.Response: + try: + data = await _run_sync(db.get_hot_cold_players) + return web.json_response(data) + except Exception as e: + log.error("Hot/cold query failed: %s", e) + return web.json_response({"hot": [], "cold": []}, status=500) + + async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse: + ws = web.WebSocketResponse() + await ws.prepare(request) + self.clients.add(ws) + log.info("Browser connected (%d total)", len(self.clients)) + + # Send initial data (all DB calls in executor) + try: + if not ws.closed: + games = await _run_sync(db.get_recent_games, 50) + await ws.send_json({"type": "history", "data": games}) + if not ws.closed: + dist = await _run_sync(db.get_win_distribution) + await ws.send_json({"type": "win_distribution", "data": dist}) + if not ws.closed: + leaders = await _run_sync(db.get_leaderboard, 10) + await ws.send_json({"type": "leaderboard", "data": leaders}) + if not ws.closed: + biggest = await _run_sync(db.get_biggest_winner) + if biggest: + await ws.send_json({"type": "biggest_winner", "data": biggest}) + if not ws.closed: + hot_cold = await _run_sync(db.get_hot_cold_players) + await ws.send_json({"type": "hot_cold", "data": hot_cold}) + except Exception as e: + log.warning("Failed to send initial data: %s", e) + + try: + async for msg in ws: + pass # Browser doesn't send data, just listens + finally: + self.clients.discard(ws) + log.info("Browser disconnected (%d remaining)", len(self.clients)) + + return ws + + async def broadcast(self, event_type: str, data: dict): + """Push event to all connected browsers.""" + if not self.clients: + return + payload = json.dumps({"type": event_type, "data": data}) + stale = set() + for ws in self.clients: + try: + await ws.send_str(payload) + except Exception: + stale.add(ws) + self.clients -= stale + + async def push_refresh(self): + """Compute stats in executor and push over WS — no browser HTTP needed.""" + try: + dist, leaders, biggest, hot_cold = await asyncio.gather( + _run_sync(db.get_win_distribution), + _run_sync(db.get_leaderboard, 10), + _run_sync(db.get_biggest_winner), + _run_sync(db.get_hot_cold_players), + ) + await self.broadcast("win_distribution", dist) + await self.broadcast("leaderboard", leaders) + if biggest: + await self.broadcast("biggest_winner", biggest) + await self.broadcast("hot_cold", hot_cold) + except Exception as e: + log.warning("push_refresh failed: %s", e) + + async def run(self): + runner = web.AppRunner(self.app) + await runner.setup() + site = web.TCPSite(runner, "0.0.0.0", config.WEB_PORT) + await site.start() + log.info("Web server listening on http://0.0.0.0:%s", config.WEB_PORT) + # Keep running until cancelled + try: + while True: + await asyncio.sleep(3600) + except asyncio.CancelledError: + pass + finally: + await runner.cleanup() diff --git a/app/streamkar_ws.py b/app/streamkar_ws.py new file mode 100644 index 000000000..a847cf158 --- /dev/null +++ b/app/streamkar_ws.py @@ -0,0 +1,187 @@ +""" +WebSocket client to StreamKar game room. +Connects anonymously, captures USER_BET and GAME_WIN events. +""" + +import asyncio +import json +import logging +import time +from datetime import datetime, timezone + +import websockets + +from . import config +from .auth import get_socket_address, auto_login, get_user_profile +from . import db + +log = logging.getLogger(__name__) + + +class StreamKarWSClient: + def __init__(self, broadcast_fn): + """ + broadcast_fn: async callable(event_type: str, data: dict) to push to browser clients. + """ + self.broadcast = broadcast_fn + self.room_id = config.DEFAULT_ROOM_ID + self.should_run = True + self.reconnect_delay = 2.0 + self._known_users: set[int] = set() + self._user_id = 0 + self._token = None + self._session_wagered: dict[int, int] = {} # user_id -> total wagered this session + + async def _ensure_auth(self): + if not self._token: + self._user_id, self._token = auto_login() + + async def _fetch_user_profile(self, target_uid: int): + """Fetch and cache user profile in background.""" + if target_uid in self._known_users or target_uid == 0: + return + self._known_users.add(target_uid) + try: + await self._ensure_auth() + profile = get_user_profile(target_uid, self._user_id, self._token) + if profile: + db.upsert_user(profile) + log.debug("Fetched profile for %s: %s", target_uid, profile.get("nick_name")) + except Exception as e: + log.warning("Failed to fetch profile for %s: %s", target_uid, e) + + def _build_login_msg(self, room_source: int) -> dict: + return { + "p": config.PLATFORM, + "a": config.APP_ID, + "v": config.VERSION_CODE, + "l": "en", + "MsgTag": config.MSGTAG_ROOM_LOGIN, + "roomId": self.room_id, + "roomSource": room_source, + "appId": config.APP_ID, + "softVersion": str(config.VERSION_CODE), + "container": 1, + "enterFrom": "room_list", + } + + async def _handle_message(self, msg: dict): + tag = msg.get("MsgTag") + if tag is None: + return + tag = int(tag) + + if tag == config.MSGTAG_USER_BET: + bet_info = msg.get("betInfo", {}) + uid = msg.get("userId", 0) + game_no = msg.get("gameNo", 0) + chair = bet_info.get("betCard", 0) + amount = bet_info.get("betAmount", 0) + total = bet_info.get("totalBet", 0) + + # Insert into ClickHouse + try: + db.insert_bet({ + "game_no": game_no, + "user_id": uid, + "chair": chair, + "bet_amount": amount, + "total_bet": total, + }) + except Exception as e: + log.warning("Failed to insert bet: %s", e) + + # Track session wagered + self._session_wagered[uid] = self._session_wagered.get(uid, 0) + amount + + # Resolve user name + nick = db.get_user_name(uid) if uid else None + # Fetch profile async if unknown + if not nick and uid: + asyncio.create_task(self._fetch_user_profile(uid)) + + # Broadcast to browser + await self.broadcast("user_bet", { + "game_no": game_no, + "user_id": uid, + "nick_name": nick or str(uid), + "chair": chair, + "chair_name": config.CHAIRS.get(chair, "?"), + "bet_amount": amount, + "total_bet": total, + "session_wagered": self._session_wagered.get(uid, 0), + "timestamp": datetime.now(timezone.utc).isoformat(), + }) + + elif tag == config.MSGTAG_GAME_WIN: + log.info("GAME_WIN broadcast received") + await self.broadcast("game_win_ws", { + "raw": {k: v for k, v in msg.items() + if k not in ("p", "a", "v", "l", "MsgTag")}, + "timestamp": datetime.now(timezone.utc).isoformat(), + }) + + elif tag == config.MSGTAG_LOGIN_ACK: + log.info("WS login acknowledged — receiving room broadcasts") + + async def _connect_and_listen(self, ws_url: str, room_source: int): + log.info("Connecting to %s ...", ws_url) + try: + async with websockets.connect( + ws_url, + additional_headers={"User-Agent": "okhttp/4.9.3"}, + ping_interval=30, + ping_timeout=10, + close_timeout=5, + ) as ws: + self.reconnect_delay = 2.0 + login_msg = self._build_login_msg(room_source) + await ws.send(json.dumps(login_msg)) + log.info("WS login sent for room %s", self.room_id) + + async for raw in ws: + if not self.should_run: + break + if isinstance(raw, bytes): + try: + raw = raw.decode("utf-8") + except UnicodeDecodeError: + continue + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + await self._handle_message(msg) + + except websockets.ConnectionClosed as e: + log.warning("WS connection closed: code=%s reason=%s", e.code, e.reason) + except Exception as e: + log.warning("WS error: %s: %s", type(e).__name__, e) + + async def run(self): + """Connect with automatic reconnection.""" + while self.should_run: + try: + await self._ensure_auth() + socket_data = get_socket_address( + self.room_id, user_id=self._user_id, token=self._token + ) + ws_url = socket_data.get("ws") + if not ws_url: + log.error("No WS URL returned, retrying in %ss", self.reconnect_delay) + await asyncio.sleep(self.reconnect_delay) + self.reconnect_delay = min(self.reconnect_delay * 1.5, 60) + continue + room_source = socket_data.get("roomSource", 9) + await self._connect_and_listen(ws_url, room_source) + except Exception as e: + log.error("WS run error: %s", e) + + if not self.should_run: + break + log.info("Reconnecting WS in %ss...", self.reconnect_delay) + await asyncio.sleep(self.reconnect_delay) + self.reconnect_delay = min(self.reconnect_delay * 1.5, 60) + + def stop(self): + self.should_run = False diff --git a/clickhouse/init.sql b/clickhouse/init.sql new file mode 100644 index 000000000..1930ab4ec --- /dev/null +++ b/clickhouse/init.sql @@ -0,0 +1,42 @@ +CREATE TABLE IF NOT EXISTS games ( + game_no UInt64, + winner UInt8, + total_pot UInt64, + bet_a UInt64, + bet_b UInt64, + bet_c UInt64, + hand_a String, + hand_b String, + hand_c String, + hand_type_a UInt8, + hand_type_b UInt8, + hand_type_c UInt8, + cards_json String, + duration_s UInt32, + created_at DateTime DEFAULT now() +) ENGINE = ReplacingMergeTree() +ORDER BY game_no; + +CREATE TABLE IF NOT EXISTS bets ( + game_no UInt64, + user_id UInt64, + chair UInt8, + bet_amount UInt64, + total_bet UInt64, + created_at DateTime64(3) DEFAULT now64(3) +) ENGINE = MergeTree() +ORDER BY (game_no, user_id, created_at); + +CREATE TABLE IF NOT EXISTS users ( + user_id UInt64, + nick_name String, + rich_level UInt16, + actor_level UInt16, + gender UInt8, + consume_total UInt64, + earn_total UInt64, + is_actor UInt8, + portrait String, + updated_at DateTime DEFAULT now() +) ENGINE = ReplacingMergeTree(updated_at) +ORDER BY user_id; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..6105d6c31 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,41 @@ +services: + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - "8124:8123" + volumes: + - clickhouse_data:/var/lib/clickhouse + - ./clickhouse/init.sql:/docker-entrypoint-initdb.d/init.sql + environment: + - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 + - CLICKHOUSE_PASSWORD= + ulimits: + nofile: + soft: 262144 + hard: 262144 + + app: + build: . + depends_on: + - clickhouse + volumes: + - ../sk_credentials.json:/app/sk_credentials.json:ro + environment: + - CLICKHOUSE_HOST=clickhouse + - SK_CREDENTIALS_FILE=/app/sk_credentials.json + + caddy: + image: caddy:2-alpine + ports: + - "8443:8443" + volumes: + - ./Caddyfile:/etc/caddy/Caddyfile:ro + - caddy_data:/data + - caddy_config:/config + depends_on: + - app + +volumes: + clickhouse_data: + caddy_data: + caddy_config: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..57ad70091 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +aiohttp>=3.9 +websockets>=12.0 +clickhouse-connect>=0.7 +requests>=2.31 diff --git a/static/analytics.html b/static/analytics.html new file mode 100644 index 000000000..7a662a4f8 --- /dev/null +++ b/static/analytics.html @@ -0,0 +1,579 @@ + + + + + +Teen Patti Analytics + + + + + +
+

Teen Patti Analytics

+ Live Dashboard → +
+ +
+ Period: + + + + + + +
+ +
+ +
+
+
Total Games
+
-
+
+
+
Volume
+
-
+
+
+
Avg Pot
+
-
+
+
+
Bets Placed
+
-
+
+
+
Unique Bettors
+
-
+
+
+ + +
+
Volume Over Time
+
+ +
+
+ + +
+
+
Win Distribution
+
+
Winner Bet Rank
+
+
+
+
Hand Type Distribution
+
+
+
+ + +
+
Leaderboard (Top 20 by P&L)
+ + + + + + + + + + + + + +
#NameBetsWinsW%P&LWagered
+
+ + +
+
Game History
+
+ + + + + + + + + + + + + +
#WPotBetHand AHand BHand C
+
+
+
+ + + + diff --git a/static/index.html b/static/index.html new file mode 100644 index 000000000..84b4cbea4 --- /dev/null +++ b/static/index.html @@ -0,0 +1,1566 @@ + + + + + +Teen Patti Live Monitor + + + + + +
+

Teen Patti Live Monitor

+
+ Round + +
+
+ Analytics → +
+ Connecting... +
+
+ +
+ +
+
Current Game
+
+
Total Pot: 0
+
+
+
A
+
0
+
+
+
+
+
+
B
+
0
+
+
+
+
+
+
C
+
0
+
+
+
+
+
+ + + + + + + + + + + + +
Live Bets
+
+ +
+
Leaderboard (P&L)
+
+
+
+ + +
+
Bet Flow
+
+ +
+ + + +
Win Distribution
+
+
+
Chair A
+
0
+
+
+
+
Chair B
+
0
+
+
+
+
Chair C
+
0
+
+
+
+
Winner Bet Size
+
+
+
Low Bet
+
0
+
+
+
+
Mid Bet
+
0
+
+
+
+
High Bet
+
0
+
+
+
+ +
History
+
+ + + + + + + + + + + + + +
#WPotBetHand AHand BHand C
+
+
+
+ + + + + + +