""" WebSocket client to StreamKar game room. Connects anonymously, captures USER_BET and GAME_WIN events. """ import asyncio import json import logging import time from datetime import datetime, timezone import websockets from . import config from .auth import get_socket_address, auto_login, get_user_profile from . import db log = logging.getLogger(__name__) class StreamKarWSClient: def __init__(self, broadcast_fn): """ broadcast_fn: async callable(event_type: str, data: dict) to push to browser clients. """ self.broadcast = broadcast_fn self.room_id = config.DEFAULT_ROOM_ID self.should_run = True self.reconnect_delay = 2.0 self._known_users: set[int] = set() self._user_id = 0 self._token = None self._session_wagered: dict[int, int] = {} # user_id -> total wagered this session async def _ensure_auth(self): if not self._token: self._user_id, self._token = auto_login() async def _fetch_user_profile(self, target_uid: int): """Fetch and cache user profile in background.""" if target_uid in self._known_users or target_uid == 0: return self._known_users.add(target_uid) try: await self._ensure_auth() profile = get_user_profile(target_uid, self._user_id, self._token) if profile: db.upsert_user(profile) log.debug("Fetched profile for %s: %s", target_uid, profile.get("nick_name")) except Exception as e: log.warning("Failed to fetch profile for %s: %s", target_uid, e) def _build_login_msg(self, room_source: int) -> dict: return { "p": config.PLATFORM, "a": config.APP_ID, "v": config.VERSION_CODE, "l": "en", "MsgTag": config.MSGTAG_ROOM_LOGIN, "roomId": self.room_id, "roomSource": room_source, "appId": config.APP_ID, "softVersion": str(config.VERSION_CODE), "container": 1, "enterFrom": "room_list", } async def _handle_message(self, msg: dict): tag = msg.get("MsgTag") if tag is None: return tag = int(tag) if tag == config.MSGTAG_USER_BET: bet_info = msg.get("betInfo", {}) uid = msg.get("userId", 0) game_no = msg.get("gameNo", 0) chair = bet_info.get("betCard", 0) amount = bet_info.get("betAmount", 0) total = bet_info.get("totalBet", 0) # Insert into ClickHouse try: db.insert_bet({ "game_no": game_no, "user_id": uid, "chair": chair, "bet_amount": amount, "total_bet": total, }) except Exception as e: log.warning("Failed to insert bet: %s", e) # Track session wagered self._session_wagered[uid] = self._session_wagered.get(uid, 0) + amount # Resolve user name nick = db.get_user_name(uid) if uid else None # Fetch profile async if unknown if not nick and uid: asyncio.create_task(self._fetch_user_profile(uid)) # Broadcast to browser await self.broadcast("user_bet", { "game_no": game_no, "user_id": uid, "nick_name": nick or str(uid), "chair": chair, "chair_name": config.CHAIRS.get(chair, "?"), "bet_amount": amount, "total_bet": total, "session_wagered": self._session_wagered.get(uid, 0), "timestamp": datetime.now(timezone.utc).isoformat(), }) elif tag == config.MSGTAG_GAME_WIN: log.info("GAME_WIN broadcast received") await self.broadcast("game_win_ws", { "raw": {k: v for k, v in msg.items() if k not in ("p", "a", "v", "l", "MsgTag")}, "timestamp": datetime.now(timezone.utc).isoformat(), }) elif tag == config.MSGTAG_LOGIN_ACK: log.info("WS login acknowledged — receiving room broadcasts") async def _connect_and_listen(self, ws_url: str, room_source: int): log.info("Connecting to %s ...", ws_url) try: async with websockets.connect( ws_url, additional_headers={"User-Agent": "okhttp/4.9.3"}, ping_interval=30, ping_timeout=10, close_timeout=5, ) as ws: self.reconnect_delay = 2.0 login_msg = self._build_login_msg(room_source) await ws.send(json.dumps(login_msg)) log.info("WS login sent for room %s", self.room_id) async for raw in ws: if not self.should_run: break if isinstance(raw, bytes): try: raw = raw.decode("utf-8") except UnicodeDecodeError: continue try: msg = json.loads(raw) except json.JSONDecodeError: continue await self._handle_message(msg) except websockets.ConnectionClosed as e: log.warning("WS connection closed: code=%s reason=%s", e.code, e.reason) except Exception as e: log.warning("WS error: %s: %s", type(e).__name__, e) async def run(self): """Connect with automatic reconnection.""" while self.should_run: try: await self._ensure_auth() socket_data = get_socket_address( self.room_id, user_id=self._user_id, token=self._token ) ws_url = socket_data.get("ws") if not ws_url: log.error("No WS URL returned, retrying in %ss", self.reconnect_delay) await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 1.5, 60) continue room_source = socket_data.get("roomSource", 9) await self._connect_and_listen(ws_url, room_source) except Exception as e: log.error("WS run error: %s", e) if not self.should_run: break log.info("Reconnecting WS in %ss...", self.reconnect_delay) await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 1.5, 60) def stop(self): self.should_run = False