"""
Authentication and API helpers for StreamKar.
Handles auto_login, sv signature computation, and API calls.
"""
import gzip
import hashlib
import json
import logging
import ssl
import time
import urllib.parse
import urllib.request
from typing import Any, Optional

from . import config

log = logging.getLogger(__name__)

# User-Agent sent on native-API requests (login and header-authenticated calls).
_NATIVE_USER_AGENT = "okhttp/4.9.3"

# First two bytes of every gzip stream; a JSON document can never start with them.
_GZIP_MAGIC = b"\x1f\x8b"

_nonce_counter = 0
_ssl_ctx = ssl.create_default_context()


def _get_nonce() -> int:
    """Return the next value of the module-wide, monotonically increasing nonce."""
    global _nonce_counter
    _nonce_counter += 1
    return _nonce_counter


def _read_json(resp) -> Any:
    """Read the body of an open ``urllib`` response and parse it as JSON.

    ``urllib`` does not undo ``Content-Encoding`` on its own, so a server that
    honours an ``Accept-Encoding: gzip`` request header hands back compressed
    bytes. The body is gunzipped when it starts with the gzip magic number and
    is passed to ``json.loads`` unchanged otherwise.
    """
    body = resp.read()
    if body[:2] == _GZIP_MAGIC:
        body = gzip.decompress(body)
    return json.loads(body)


def compute_sv(params_dict: dict) -> str:
    """Compute the 'sv' signature verification parameter.

    Every entry except an existing ``"sv"`` key is stringified, the keys are
    sorted case-insensitively, and the ``key:value`` pairs are concatenated
    with no separator, followed by ``config.EM5_SALT``. The MD5 digest of that
    string is then rendered as 26 characters of ``config.EM5_ALPHABET``,
    5 bits per character starting from the most significant bit.

    Args:
        params_dict: Request parameters to sign; it is not modified.

    Returns:
        The 26-character signature string.
    """
    params = {k: str(v) for k, v in params_dict.items() if k != "sv"}
    sorted_keys = sorted(params, key=str.lower)

    message = "".join(f"{key}:{params[key]}" for key in sorted_keys)
    message += config.EM5_SALT

    digest = hashlib.md5(message.encode()).digest()
    value = int.from_bytes(digest, "big")

    chars = []
    for i in range(26):
        # 128 digest bits are consumed in 5-bit groups from the top.
        shift = 128 - 5 - (i * 5)
        if shift >= 0:
            idx = (value >> shift) & 0x1F
        else:
            # Final group: only 3 bits remain, so they are left-aligned
            # (zero-padded on the right) inside the 5-bit index.
            idx = (value << (-shift)) & 0x1F
        chars.append(config.EM5_ALPHABET[idx])
    return "".join(chars)


def auto_login() -> tuple[int, str]:
    """Login using credentials file. Returns (userId, token).

    Reads ``config.SK_CREDENTIALS_FILE`` (JSON), builds the signed login
    payload (FuncTag 40000002) and sends it as the ``parameter`` query
    argument of a GET request to ``config.API_ENTRANCE``.

    Raises:
        KeyError: If the credentials file lacks ``deviceUId`` or ``uuid``.
        RuntimeError: If the response ``TagCode`` is not ``"00000000"`` or the
            response carries no token.
    """
    with open(config.SK_CREDENTIALS_FILE, encoding="utf-8") as f:
        creds = json.load(f)

    payload = {
        "platform": config.PLATFORM,
        "a": config.APP_ID,
        "c": int(config.CHANNEL_CODE),
        "v": config.VERSION_CODE,
        "l": "en",
        "deviceModel": creds.get("deviceModel", "generic"),
        "deviceUId": creds["deviceUId"],
        "FuncTag": 40000002,
        "loginType": creds.get("loginType", 48),
        "autoLogin": 1,
        "uuid": creds["uuid"],
    }
    if creds.get("sessionId"):
        payload["sessionId"] = creds["sessionId"]

    # The signature must be computed last so that it covers every other field.
    payload["sv"] = compute_sv(payload)

    payload_json = json.dumps(payload, separators=(",", ":"))
    qs = urllib.parse.urlencode({"parameter": payload_json})
    url = f"{config.API_ENTRANCE}?{qs}"

    req = urllib.request.Request(url, headers={
        "User-Agent": _NATIVE_USER_AGENT,
        "Accept-Encoding": "gzip",
    })
    with urllib.request.urlopen(req, timeout=15, context=_ssl_ctx) as resp:
        # gzip is requested above, so the body may arrive compressed.
        data = _read_json(resp)

    tag_code = data.get("TagCode")
    if tag_code != "00000000" or not data.get("token"):
        raise RuntimeError(f"Login failed: TagCode={tag_code}")

    user_id = data["userId"]
    token = data["token"]
    log.info("Logged in: userId=%s", user_id)
    return user_id, token


def build_auth_headers(user_id: int = 0, token: Optional[str] = None) -> dict:
    """Build auth headers for native API requests.

    Each call consumes a fresh nonce and stamps the current time in
    milliseconds. All header values are strings.

    Args:
        user_id: Sent as the ``userId`` header.
        token: Added as the ``token`` header only when truthy.

    Returns:
        A new dict of HTTP headers.
    """
    headers = {
        "AccessKeyId": config.ACCESS_KEY_ID,
        "Nonce": str(_get_nonce()),
        "Timestamp": str(int(time.time() * 1000)),
        "platform": str(config.PLATFORM),
        "a": str(config.APP_ID),
        # str() keeps this consistent with the neighbouring values and
        # guarantees a string header even if CHANNEL_CODE is configured as int.
        "c": str(config.CHANNEL_CODE),
        "v": str(config.VERSION_CODE),
        "l": "en",
        "sk-kimi": config.SK_KIMI,
        "userId": str(user_id),
        "User-Agent": _NATIVE_USER_AGENT,
        "Accept-Encoding": "gzip",
    }
    if token:
        headers["token"] = token
    return headers


def get_socket_address(room_id: int, user_id: int = 0,
                       token: Optional[str] = None) -> dict:
    """Fetch WebSocket URL for a room via the socket-address API.

    Issues a GET to ``config.SOCKET_BASE`` with the auth headers from
    :func:`build_auth_headers` and logs whether the reply contains a ``ws``
    entry.

    Returns:
        The decoded JSON response, whether or not it contains ``ws``.
    """
    import requests

    params = {
        "softVersion": 10080,
        "c": config.CHANNEL_CODE,
        "v": config.VERSION_CODE,
        "appId": config.APP_ID,
        "userId": user_id,
        "roomId": room_id,
        "platform": config.PLATFORM,
    }
    headers = build_auth_headers(user_id=user_id, token=token)

    resp = requests.get(
        f"{config.SOCKET_BASE}/",
        params=params,
        headers=headers,
        timeout=15,
    )
    data = resp.json()

    ws_url = data.get("ws", "")
    if ws_url:
        log.info("WebSocket URL: %s", ws_url)
    else:
        # Truncate the dump so an unexpected large payload cannot flood the log.
        log.error("No WebSocket URL in response: %s", json.dumps(data)[:300])
    return data


def call_api(func_tag: int, user_id: int, token: str,
             extra: Optional[dict] = None) -> dict:
    """Call the game web API (meShowApi).

    Args:
        func_tag: Value sent as ``FuncTag``.
        user_id: Value sent as ``userId``.
        token: Value sent as ``token``.
        extra: Optional additional parameters; they are merged last and so
            override the base parameters on key collision.

    Returns:
        The decoded JSON response.
    """
    params = {
        "FuncTag": func_tag,
        "userId": user_id,
        "token": token,
        "platform": 1,
        "c": 100,
        "a": 1,
    }
    if extra:
        params.update(extra)

    qs = urllib.parse.urlencode({
        "parameter": json.dumps(params),
        # Millisecond timestamp sent as the "_" query argument.
        "_": str(int(time.time() * 1000)),
    })
    url = f"{config.API_URL}?{qs}"

    req = urllib.request.Request(url, headers={
        "User-Agent": "Mozilla/5.0 (Linux; Android 15; SM-G990E) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Mobile Safari/537.36",
        "Accept": "application/json",
    })
    with urllib.request.urlopen(req, timeout=10, context=_ssl_ctx) as resp:
        return _read_json(resp)


def get_full_state(user_id: int, token: str) -> dict:
    """FuncTag 86000041 — full game state."""
    return call_api(86000041, user_id, token)


def get_user_profile(target_user_id: int, user_id: int,
                     token: str) -> Optional[dict]:
    """FuncTag 50001010 — user profile (native API).

    Args:
        target_user_id: User whose profile is requested (sent in the payload).
        user_id: Caller's user id, sent in the auth headers.
        token: Caller's token, sent in the auth headers.

    Returns:
        A dict with snake_case profile fields taken from the first entry of
        the response's ``userList``, or None when that list is missing or
        empty.
    """
    import requests

    payload = {
        "FuncTag": 50001010,
        "userId": target_user_id,
        "platform": config.PLATFORM,
        "a": config.APP_ID,
        "c": int(config.CHANNEL_CODE),
        "v": config.VERSION_CODE,
        "l": "en",
    }
    payload["sv"] = compute_sv(payload)
    payload_json = json.dumps(payload, separators=(",", ":"))

    headers = build_auth_headers(user_id=user_id, token=token)
    resp = requests.get(
        config.API_ENTRANCE,
        params={"parameter": payload_json},
        headers=headers,
        timeout=10,
    )
    data = resp.json()

    users = data.get("userList", [])
    if not users:
        return None

    u = users[0]
    return {
        "user_id": target_user_id,
        "nick_name": u.get("nickName", ""),
        "rich_level": u.get("richLevel", 0),
        "actor_level": u.get("actorLevel", 0),
        "gender": u.get("gender", 0),
        "consume_total": u.get("consumeTotal", 0),
        "earn_total": u.get("earnTotal", 0),
        "is_actor": u.get("isActor", 0),
        "portrait": u.get("portrait", ""),
    }