Initial commit: Teen Patti live monitor with analytics

Live dashboard with real-time WebSocket updates, analytics page with
time-filtered stats, ClickHouse storage, and Caddy reverse proxy.
This commit is contained in:
2026-02-21 22:36:40 +05:00
commit 85f44e6a22
16 changed files with 3780 additions and 0 deletions

212
app/auth.py Normal file
View File

@@ -0,0 +1,212 @@
"""
Authentication and API helpers for StreamKar.
Handles auto_login, sv signature computation, and API calls.
"""
import hashlib
import json
import logging
import ssl
import time
import urllib.parse
import urllib.request
from typing import Optional
from . import config
log = logging.getLogger(__name__)
_nonce_counter = 0
_ssl_ctx = ssl.create_default_context()
def _get_nonce() -> int:
global _nonce_counter
_nonce_counter += 1
return _nonce_counter
def compute_sv(params_dict: dict) -> str:
"""Compute the 'sv' signature verification parameter."""
params = {k: str(v) for k, v in params_dict.items() if k != "sv"}
sorted_keys = sorted(params.keys(), key=str.lower)
parts = ""
last_value = ""
for i, key in enumerate(sorted_keys):
parts += key + ":"
if i == len(sorted_keys) - 1:
last_value = params[key]
else:
parts += params[key]
full_string = parts + last_value + config.EM5_SALT
md5_hash = hashlib.md5(full_string.encode()).digest()
value = int.from_bytes(md5_hash, "big")
result = []
for i in range(26):
shift = 128 - 5 - (i * 5)
if shift >= 0:
idx = (value >> shift) & 0x1F
else:
idx = (value << (-shift)) & 0x1F
result.append(config.EM5_ALPHABET[idx])
return "".join(result)
def auto_login() -> tuple[int, str]:
"""Login using credentials file. Returns (userId, token)."""
with open(config.SK_CREDENTIALS_FILE) as f:
creds = json.load(f)
payload = {
"platform": config.PLATFORM,
"a": config.APP_ID,
"c": int(config.CHANNEL_CODE),
"v": config.VERSION_CODE,
"l": "en",
"deviceModel": creds.get("deviceModel", "generic"),
"deviceUId": creds["deviceUId"],
"FuncTag": 40000002,
"loginType": creds.get("loginType", 48),
"autoLogin": 1,
"uuid": creds["uuid"],
}
if creds.get("sessionId"):
payload["sessionId"] = creds["sessionId"]
payload["sv"] = compute_sv(payload)
payload_json = json.dumps(payload, separators=(",", ":"))
qs = urllib.parse.urlencode({"parameter": payload_json})
url = f"{config.API_ENTRANCE}?{qs}"
req = urllib.request.Request(url, headers={
"User-Agent": "okhttp/4.9.3",
"Accept-Encoding": "gzip",
})
with urllib.request.urlopen(req, timeout=15, context=_ssl_ctx) as resp:
data = json.loads(resp.read())
tag_code = data.get("TagCode")
if tag_code != "00000000" or not data.get("token"):
raise RuntimeError(f"Login failed: TagCode={tag_code}")
user_id = data["userId"]
token = data["token"]
log.info("Logged in: userId=%s", user_id)
return user_id, token
def build_auth_headers(user_id: int = 0, token: Optional[str] = None) -> dict:
"""Build auth headers for native API requests."""
headers = {
"AccessKeyId": config.ACCESS_KEY_ID,
"Nonce": str(_get_nonce()),
"Timestamp": str(int(time.time() * 1000)),
"platform": str(config.PLATFORM),
"a": str(config.APP_ID),
"c": config.CHANNEL_CODE,
"v": str(config.VERSION_CODE),
"l": "en",
"sk-kimi": config.SK_KIMI,
"userId": str(user_id),
"User-Agent": "okhttp/4.9.3",
"Accept-Encoding": "gzip",
}
if token:
headers["token"] = token
return headers
def get_socket_address(room_id: int, user_id: int = 0, token: Optional[str] = None) -> dict:
"""Fetch WebSocket URL for a room via the socket-address API."""
import requests
params = {
"softVersion": 10080,
"c": config.CHANNEL_CODE,
"v": config.VERSION_CODE,
"appId": config.APP_ID,
"userId": user_id,
"roomId": room_id,
"platform": config.PLATFORM,
}
headers = build_auth_headers(user_id=user_id, token=token)
resp = requests.get(f"{config.SOCKET_BASE}/", params=params, headers=headers, timeout=15)
data = resp.json()
ws_url = data.get("ws", "")
if ws_url:
log.info("WebSocket URL: %s", ws_url)
else:
log.error("No WebSocket URL in response: %s", json.dumps(data)[:300])
return data
def call_api(func_tag: int, user_id: int, token: str, extra: dict = None) -> dict:
"""Call the game web API (meShowApi)."""
params = {
"FuncTag": func_tag,
"userId": user_id,
"token": token,
"platform": 1,
"c": 100,
"a": 1,
}
if extra:
params.update(extra)
qs = urllib.parse.urlencode({
"parameter": json.dumps(params),
"_": str(int(time.time() * 1000)),
})
url = f"{config.API_URL}?{qs}"
req = urllib.request.Request(url, headers={
"User-Agent": "Mozilla/5.0 (Linux; Android 15; SM-G990E) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Mobile Safari/537.36",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=10, context=_ssl_ctx) as resp:
return json.loads(resp.read())
def get_full_state(user_id: int, token: str) -> dict:
"""FuncTag 86000041 — full game state."""
return call_api(86000041, user_id, token)
def get_user_profile(target_user_id: int, user_id: int, token: str) -> dict | None:
"""FuncTag 50001010 — user profile (native API). Returns parsed profile or None."""
import requests
payload = {
"FuncTag": 50001010,
"userId": target_user_id,
"platform": config.PLATFORM,
"a": config.APP_ID,
"c": int(config.CHANNEL_CODE),
"v": config.VERSION_CODE,
"l": "en",
}
payload["sv"] = compute_sv(payload)
payload_json = json.dumps(payload, separators=(",", ":"))
headers = build_auth_headers(user_id=user_id, token=token)
resp = requests.get(
config.API_ENTRANCE,
params={"parameter": payload_json},
headers=headers,
timeout=10,
)
data = resp.json()
users = data.get("userList", [])
if users:
u = users[0]
return {
"user_id": target_user_id,
"nick_name": u.get("nickName", ""),
"rich_level": u.get("richLevel", 0),
"actor_level": u.get("actorLevel", 0),
"gender": u.get("gender", 0),
"consume_total": u.get("consumeTotal", 0),
"earn_total": u.get("earnTotal", 0),
"is_actor": u.get("isActor", 0),
"portrait": u.get("portrait", ""),
}
return None