Files
3pmonitor/app/streamkar_ws.py
Junaid Saeed Uppal 85f44e6a22 Initial commit: Teen Patti live monitor with analytics
Live dashboard with real-time WebSocket updates, analytics page with
time-filtered stats, ClickHouse storage, and Caddy reverse proxy.
2026-02-21 22:36:40 +05:00

188 lines
6.7 KiB
Python

"""
WebSocket client to StreamKar game room.
Connects anonymously, captures USER_BET and GAME_WIN events.
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
import websockets
from . import config
from .auth import get_socket_address, auto_login, get_user_profile
from . import db
log = logging.getLogger(__name__)
class StreamKarWSClient:
    """Reconnecting WebSocket client for a StreamKar game room.

    The client logs in to the room identified by ``config.DEFAULT_ROOM_ID``,
    listens for room broadcasts, stores every ``USER_BET`` through ``db`` and
    forwards ``USER_BET`` / ``GAME_WIN`` events to ``broadcast_fn``.

    NOTE(review): ``auto_login``, ``get_user_profile``, ``get_socket_address``
    and the ``db`` helpers are called synchronously from coroutines. If they
    perform network or disk I/O they stall the event loop while they run --
    confirm, and consider ``asyncio.to_thread`` once their thread-safety is
    known.
    """

    def __init__(self, broadcast_fn):
        """
        broadcast_fn: async callable(event_type: str, data: dict) to push to browser clients.
        """
        self.broadcast = broadcast_fn
        self.room_id = config.DEFAULT_ROOM_ID
        self.should_run = True
        self.reconnect_delay = 2.0
        # User ids for which a profile fetch has already been attempted.
        self._known_users: set[int] = set()
        self._user_id = 0
        self._token = None
        self._session_wagered: dict[int, int] = {}  # user_id -> total wagered this session
        # Strong references to in-flight background tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._bg_tasks: set[asyncio.Task] = set()

    async def _ensure_auth(self) -> None:
        """Log in once and remember the (user_id, token) pair for later calls."""
        if not self._token:
            self._user_id, self._token = auto_login()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a background task and hold it until it is done."""
        task = asyncio.create_task(coro)
        self._bg_tasks.add(task)
        task.add_done_callback(self._bg_tasks.discard)
        return task

    async def _fetch_user_profile(self, target_uid: int) -> None:
        """Fetch and cache user profile in background.

        Each uid is attempted at most once per client instance: the uid is
        recorded before the fetch starts, so a failed fetch is logged but not
        retried.
        """
        if target_uid in self._known_users or target_uid == 0:
            return
        self._known_users.add(target_uid)
        try:
            await self._ensure_auth()
            profile = get_user_profile(target_uid, self._user_id, self._token)
            if profile:
                db.upsert_user(profile)
                log.debug("Fetched profile for %s: %s", target_uid, profile.get("nick_name"))
        except Exception as e:
            # Best effort: a missing profile only means the raw uid is shown.
            log.warning("Failed to fetch profile for %s: %s", target_uid, e)

    def _build_login_msg(self, room_source: int) -> dict:
        """Return the room-login payload sent right after the socket opens."""
        return {
            "p": config.PLATFORM,
            "a": config.APP_ID,
            "v": config.VERSION_CODE,
            "l": "en",
            "MsgTag": config.MSGTAG_ROOM_LOGIN,
            "roomId": self.room_id,
            "roomSource": room_source,
            "appId": config.APP_ID,
            "softVersion": str(config.VERSION_CODE),
            "container": 1,
            "enterFrom": "room_list",
        }

    async def _handle_message(self, msg: dict) -> None:
        """Dispatch one decoded room message by its ``MsgTag``.

        Messages that are not JSON objects, or whose ``MsgTag`` is missing or
        not convertible to ``int``, are ignored so that a single malformed
        frame cannot tear down the connection.
        """
        if not isinstance(msg, dict):
            return
        try:
            tag = int(msg.get("MsgTag"))
        except (TypeError, ValueError):
            # Covers a missing/None tag as well as non-numeric values.
            return

        if tag == config.MSGTAG_USER_BET:
            await self._on_user_bet(msg)
        elif tag == config.MSGTAG_GAME_WIN:
            await self._on_game_win(msg)
        elif tag == config.MSGTAG_LOGIN_ACK:
            log.info("WS login acknowledged — receiving room broadcasts")

    async def _on_user_bet(self, msg: dict) -> None:
        """Persist a USER_BET event, update session totals and broadcast it."""
        bet_info = msg.get("betInfo")
        if not isinstance(bet_info, dict):
            # An absent or null betInfo is treated as an empty one.
            bet_info = {}
        # `or 0` makes an explicit JSON null behave like a missing key.
        uid = msg.get("userId") or 0
        game_no = msg.get("gameNo") or 0
        chair = bet_info.get("betCard") or 0
        amount = bet_info.get("betAmount") or 0
        total = bet_info.get("totalBet") or 0

        # Insert into ClickHouse
        try:
            db.insert_bet({
                "game_no": game_no,
                "user_id": uid,
                "chair": chair,
                "bet_amount": amount,
                "total_bet": total,
            })
        except Exception as e:
            log.warning("Failed to insert bet: %s", e)

        # Track session wagered
        wagered = self._session_wagered.get(uid, 0) + amount
        self._session_wagered[uid] = wagered

        # Resolve user name; a lookup failure must not drop the socket.
        nick = None
        if uid:
            try:
                nick = db.get_user_name(uid)
            except Exception as e:
                log.warning("Failed to resolve name for %s: %s", uid, e)

        # Fetch profile in the background the first time an unnamed user bets.
        if not nick and uid and uid not in self._known_users:
            self._spawn(self._fetch_user_profile(uid))

        # Broadcast to browser
        await self.broadcast("user_bet", {
            "game_no": game_no,
            "user_id": uid,
            "nick_name": nick or str(uid),
            "chair": chair,
            "chair_name": config.CHAIRS.get(chair, "?"),
            "bet_amount": amount,
            "total_bet": total,
            "session_wagered": wagered,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    async def _on_game_win(self, msg: dict) -> None:
        """Broadcast a GAME_WIN event with the envelope fields stripped."""
        log.info("GAME_WIN broadcast received")
        envelope_keys = ("p", "a", "v", "l", "MsgTag")
        await self.broadcast("game_win_ws", {
            "raw": {k: v for k, v in msg.items() if k not in envelope_keys},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    async def _connect_and_listen(self, ws_url: str, room_source: int) -> None:
        """Open one WebSocket session, log in and process frames until it ends.

        Never raises for connection problems: they are logged and the method
        returns so that ``run`` can schedule a reconnect.
        """
        log.info("Connecting to %s ...", ws_url)
        try:
            async with websockets.connect(
                ws_url,
                additional_headers={"User-Agent": "okhttp/4.9.3"},
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
            ) as ws:
                # Connected: start the next outage from the shortest delay again.
                self.reconnect_delay = 2.0
                login_msg = self._build_login_msg(room_source)
                await ws.send(json.dumps(login_msg))
                log.info("WS login sent for room %s", self.room_id)

                async for raw in ws:
                    if not self.should_run:
                        break
                    if isinstance(raw, bytes):
                        try:
                            raw = raw.decode("utf-8")
                        except UnicodeDecodeError:
                            continue
                    try:
                        msg = json.loads(raw)
                    except json.JSONDecodeError:
                        continue
                    await self._handle_message(msg)
        except websockets.ConnectionClosed as e:
            log.warning("WS connection closed: code=%s reason=%s", e.code, e.reason)
        except Exception as e:
            log.warning("WS error: %s: %s", type(e).__name__, e)

    async def _backoff(self) -> None:
        """Sleep for the current reconnect delay, then grow it by 1.5x up to 60 s."""
        await asyncio.sleep(self.reconnect_delay)
        self.reconnect_delay = min(self.reconnect_delay * 1.5, 60)

    async def run(self) -> None:
        """Connect with automatic reconnection."""
        while self.should_run:
            try:
                await self._ensure_auth()
                socket_data = get_socket_address(
                    self.room_id, user_id=self._user_id, token=self._token
                )
                ws_url = socket_data.get("ws")
                if not ws_url:
                    log.error("No WS URL returned, retrying in %ss", self.reconnect_delay)
                    await self._backoff()
                    continue
                room_source = socket_data.get("roomSource", 9)
                await self._connect_and_listen(ws_url, room_source)
            except Exception as e:
                # Top-level boundary: keep the reconnect loop alive.
                log.error("WS run error: %s", e)

            if not self.should_run:
                break
            log.info("Reconnecting WS in %ss...", self.reconnect_delay)
            await self._backoff()

    def stop(self) -> None:
        """Ask the client to shut down.

        Only clears ``should_run``; the flag is checked when the next frame
        arrives and between reconnect attempts, so an idle open socket is not
        closed immediately.
        """
        self.should_run = False