Live dashboard with real-time WebSocket updates, analytics page with time-filtered stats, ClickHouse storage, and Caddy reverse proxy.
188 lines
6.7 KiB
Python
188 lines
6.7 KiB
Python
"""
|
|
WebSocket client to StreamKar game room.
|
|
Connects anonymously, captures USER_BET and GAME_WIN events.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
import websockets
|
|
|
|
from . import config
|
|
from .auth import get_socket_address, auto_login, get_user_profile
|
|
from . import db
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamKarWSClient:
    """WebSocket client for a single StreamKar game room.

    Logs in to the room identified by ``config.DEFAULT_ROOM_ID``, listens for
    room broadcasts, stores USER_BET events through ``db`` and forwards
    USER_BET / GAME_WIN events to browser clients via ``broadcast_fn``.
    """

    def __init__(self, broadcast_fn):
        """
        broadcast_fn: async callable(event_type: str, data: dict) to push to browser clients.
        """
        self.broadcast = broadcast_fn
        self.room_id = config.DEFAULT_ROOM_ID
        self.should_run = True
        # Seconds to wait before the next reconnect attempt; grows by 1.5x per
        # attempt (capped at 60) and is reset to 2.0 once a connection opens.
        self.reconnect_delay = 2.0
        # User ids for which a profile fetch has already been attempted.
        self._known_users: set[int] = set()
        self._user_id = 0
        self._token = None
        self._session_wagered: dict[int, int] = {}  # user_id -> total wagered this session
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set = set()

    async def _ensure_auth(self) -> None:
        """Log in once and cache the resulting user id and token."""
        if not self._token:
            # NOTE(review): auto_login() is called synchronously; if it does
            # network I/O it blocks the event loop while it runs — confirm.
            self._user_id, self._token = auto_login()

    def _spawn_background(self, coro):
        """Schedule *coro* as a task and hold a reference until it completes.

        Returns the created ``asyncio.Task``.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _fetch_user_profile(self, target_uid: int) -> None:
        """Fetch and cache user profile in background."""
        if target_uid in self._known_users or target_uid == 0:
            return
        # Marked before the fetch so that further bets by the same user do not
        # start duplicate lookups. A failed fetch is therefore not retried for
        # the lifetime of this client.
        self._known_users.add(target_uid)
        try:
            await self._ensure_auth()
            profile = get_user_profile(target_uid, self._user_id, self._token)
            if profile:
                db.upsert_user(profile)
                log.debug("Fetched profile for %s: %s", target_uid, profile.get("nick_name"))
        except Exception as e:
            # Best effort: a missing profile only affects the displayed name.
            log.warning("Failed to fetch profile for %s: %s", target_uid, e)

    def _build_login_msg(self, room_source: int) -> dict:
        """Return the room-login payload sent right after the socket opens."""
        return {
            "p": config.PLATFORM,
            "a": config.APP_ID,
            "v": config.VERSION_CODE,
            "l": "en",
            "MsgTag": config.MSGTAG_ROOM_LOGIN,
            "roomId": self.room_id,
            "roomSource": room_source,
            "appId": config.APP_ID,
            "softVersion": str(config.VERSION_CODE),
            "container": 1,
            "enterFrom": "room_list",
        }

    async def _handle_message(self, msg: dict) -> None:
        """Dispatch one decoded server message by its ``MsgTag``.

        Messages that are not JSON objects, carry no ``MsgTag``, or carry a
        ``MsgTag`` that is not an integer are ignored, so a single malformed
        frame does not raise out of the receive loop.
        """
        # json.loads may yield a list/str/number; only objects carry a MsgTag.
        if not isinstance(msg, dict):
            return
        tag = msg.get("MsgTag")
        if tag is None:
            return
        try:
            tag = int(tag)
        except (TypeError, ValueError):
            return

        if tag == config.MSGTAG_USER_BET:
            await self._handle_user_bet(msg)

        elif tag == config.MSGTAG_GAME_WIN:
            log.info("GAME_WIN broadcast received")
            await self.broadcast("game_win_ws", {
                # Forward the payload without the protocol envelope fields.
                "raw": {k: v for k, v in msg.items()
                        if k not in ("p", "a", "v", "l", "MsgTag")},
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })

        elif tag == config.MSGTAG_LOGIN_ACK:
            log.info("WS login acknowledged — receiving room broadcasts")

    async def _handle_user_bet(self, msg: dict) -> None:
        """Persist a USER_BET event, update session totals and broadcast it."""
        # "betInfo" may be present but null; treat anything that is not an
        # object as empty so the defaults below apply.
        bet_info = msg.get("betInfo") or {}
        if not isinstance(bet_info, dict):
            bet_info = {}
        uid = msg.get("userId", 0)
        game_no = msg.get("gameNo", 0)
        chair = bet_info.get("betCard", 0)
        amount = bet_info.get("betAmount", 0)
        total = bet_info.get("totalBet", 0)

        # Insert into ClickHouse
        try:
            db.insert_bet({
                "game_no": game_no,
                "user_id": uid,
                "chair": chair,
                "bet_amount": amount,
                "total_bet": total,
            })
        except Exception as e:
            # A storage failure must not stop the live broadcast below.
            log.warning("Failed to insert bet: %s", e)

        # Track session wagered
        self._session_wagered[uid] = self._session_wagered.get(uid, 0) + amount

        # Resolve user name
        nick = db.get_user_name(uid) if uid else None
        # Fetch profile async if unknown
        if not nick and uid:
            self._spawn_background(self._fetch_user_profile(uid))

        # Broadcast to browser
        await self.broadcast("user_bet", {
            "game_no": game_no,
            "user_id": uid,
            "nick_name": nick or str(uid),
            "chair": chair,
            "chair_name": config.CHAIRS.get(chair, "?"),
            "bet_amount": amount,
            "total_bet": total,
            "session_wagered": self._session_wagered.get(uid, 0),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    async def _connect_and_listen(self, ws_url: str, room_source: int) -> None:
        """Open one WebSocket session, log in, and process messages until it ends.

        Connection errors are logged and swallowed; the caller decides whether
        to reconnect.
        """
        log.info("Connecting to %s ...", ws_url)
        try:
            async with websockets.connect(
                ws_url,
                additional_headers={"User-Agent": "okhttp/4.9.3"},
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
            ) as ws:
                # Connected: start the next backoff sequence from the minimum.
                self.reconnect_delay = 2.0
                login_msg = self._build_login_msg(room_source)
                await ws.send(json.dumps(login_msg))
                log.info("WS login sent for room %s", self.room_id)

                async for raw in ws:
                    # The stop flag is only observed when a frame arrives.
                    if not self.should_run:
                        break
                    if isinstance(raw, bytes):
                        try:
                            raw = raw.decode("utf-8")
                        except UnicodeDecodeError:
                            continue
                    try:
                        msg = json.loads(raw)
                    except json.JSONDecodeError:
                        continue
                    await self._handle_message(msg)

        except websockets.ConnectionClosed as e:
            log.warning("WS connection closed: code=%s reason=%s", e.code, e.reason)
        except Exception as e:
            log.warning("WS error: %s: %s", type(e).__name__, e)

    async def run(self) -> None:
        """Connect with automatic reconnection."""
        while self.should_run:
            try:
                await self._ensure_auth()
                socket_data = get_socket_address(
                    self.room_id, user_id=self._user_id, token=self._token
                )
                ws_url = socket_data.get("ws")
                if not ws_url:
                    log.error("No WS URL returned, retrying in %ss", self.reconnect_delay)
                    await asyncio.sleep(self.reconnect_delay)
                    self.reconnect_delay = min(self.reconnect_delay * 1.5, 60)
                    continue
                room_source = socket_data.get("roomSource", 9)
                await self._connect_and_listen(ws_url, room_source)
            except Exception as e:
                # Top-level boundary: log and fall through to the backoff below.
                log.error("WS run error: %s", e)

            if not self.should_run:
                break
            log.info("Reconnecting WS in %ss...", self.reconnect_delay)
            await asyncio.sleep(self.reconnect_delay)
            self.reconnect_delay = min(self.reconnect_delay * 1.5, 60)

    def stop(self) -> None:
        """Ask the run loop to exit; takes effect at the next flag check."""
        self.should_run = False
|