""" HTTP polling for Teen Patti game state. Polls FuncTag 86000041 every 3 seconds, tracks game phases, inserts completed rounds into ClickHouse, broadcasts to browser. """ import asyncio import json import logging import time from datetime import datetime from . import config from .auth import auto_login, get_full_state from . import db log = logging.getLogger(__name__) def _format_card(card: dict) -> str: v = config.VALUES.get(card["cardValue"], str(card["cardValue"])) s = config.SUITS.get(card["cardColor"], "?") return f"{v}{s}" def _format_hand(cards: list[dict]) -> str: return " ".join(_format_card(c) for c in cards) class GamePoller: def __init__(self, broadcast_fn, push_refresh_fn=None): self.broadcast = broadcast_fn self.push_refresh = push_refresh_fn self.user_id = 0 self.token = "" self.should_run = True self.current_round = None self.last_status = None self.round_data: dict = {} self.errors = 0 self.rounds_recorded = 0 def _login(self): self.user_id, self.token = auto_login() log.info("Poller logged in: userId=%s", self.user_id) def _poll(self) -> dict | None: try: data = get_full_state(self.user_id, self.token) tag = data.get("TagCode", "") if tag == "30001005": log.warning("Token expired, re-authenticating...") self._login() data = get_full_state(self.user_id, self.token) self.errors = 0 return data except Exception as e: self.errors += 1 log.warning("Poll failed (%dx): %s", self.errors, e) if self.errors > 10: log.error("Too many consecutive errors, re-authenticating") try: self._login() except Exception: pass self.errors = 0 return None async def _process(self, data: dict): gi = data.get("gameInfo", {}) gn = gi.get("gameNo") gs = gi.get("gameStatus") init = data.get("initData", {}) if gn is None: return # New round detected if gn != self.current_round: # Save previous round if complete if self.round_data.get("gameResult"): self._save_round() # Push updated stats over WS (non-blocking) if self.push_refresh: await self.push_refresh() self.current_round = gn self.last_status = None self.round_data = { "gameNo": gn, "time_start_ts": time.time(), } log.info("Round #%s started", gn) # Build current state for browser end_time = gi.get("endTime", 0) current_time = init.get("currentTime", int(time.time() * 1000)) remaining_ms = max(0, end_time - current_time) bets = gi.get("betInfos", {}).get("betInfoArry", []) total_pot = gi.get("betInfos", {}).get("total", 0) bet_map = {} for b in bets: chair = b["country"] bet_map[config.CHAIRS.get(chair, "?")] = int(b.get("totalBet", 0) or 0) # Cards (available in REVEALING/ENDED) cards = {} for p in gi.get("playerCardInfos", []): chair = config.CHAIRS.get(p["country"], "?") cards[chair] = { "hand": _format_hand(p["cards"]), "hand_type": config.HAND_TYPES.get(p["cardType"], "?"), "hand_type_id": p["cardType"], "cards_raw": p["cards"], } game_state = { "game_no": gn, "status": gs, "status_name": config.STATUS_NAMES.get(gs, "?"), "remaining_s": round(remaining_ms / 1000, 1), "total_pot": total_pot, "bets": bet_map, "cards": cards, "winner": gi.get("gameResult"), "winner_name": config.CHAIRS.get(gi.get("gameResult"), None), } # Broadcast current state await self.broadcast("game_state", game_state) # Status transitions if gs != self.last_status: prev = self.last_status self.last_status = gs if gs == 2: # Revealing — capture result self._capture_result(gi) await self.broadcast("cards_revealed", { "game_no": gn, "cards": cards, "winner": gi.get("gameResult"), "winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"), }) elif gs == 3: # Ended self._capture_result(gi) self.round_data["time_end_ts"] = time.time() start_ts = self.round_data.get("time_start_ts", 0) self.round_data["duration_s"] = round(time.time() - start_ts) self._save_round() await self.broadcast("round_result", { "game_no": gn, "winner": gi.get("gameResult"), "winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"), "total_pot": total_pot, "bets": bet_map, "cards": cards, "duration_s": self.round_data["duration_s"], }) # Push updated stats over WS if self.push_refresh: await self.push_refresh() def _capture_result(self, gi: dict): self.round_data["gameResult"] = gi.get("gameResult") bets = gi.get("betInfos", {}).get("betInfoArry", []) self.round_data["total_pot"] = gi.get("betInfos", {}).get("total", 0) for b in bets: chair = b["country"] self.round_data[f"bet_{config.CHAIRS.get(chair, '?').lower()}"] = ( int(b.get("totalBet", 0) or 0) ) for p in gi.get("playerCardInfos", []): chair_name = config.CHAIRS.get(p["country"], "?").lower() self.round_data[f"hand_{chair_name}"] = _format_hand(p["cards"]) self.round_data[f"hand_type_{chair_name}"] = p["cardType"] self.round_data["cards_json"] = json.dumps(gi.get("playerCardInfos", [])) def _save_round(self): rd = self.round_data if not rd.get("gameResult"): return try: db.insert_game({ "game_no": rd["gameNo"], "winner": rd["gameResult"], "total_pot": rd.get("total_pot", 0), "bet_a": rd.get("bet_a", 0), "bet_b": rd.get("bet_b", 0), "bet_c": rd.get("bet_c", 0), "hand_a": rd.get("hand_a", ""), "hand_b": rd.get("hand_b", ""), "hand_c": rd.get("hand_c", ""), "hand_type_a": rd.get("hand_type_a", 0), "hand_type_b": rd.get("hand_type_b", 0), "hand_type_c": rd.get("hand_type_c", 0), "cards_json": rd.get("cards_json", ""), "duration_s": rd.get("duration_s", 0), }) self.rounds_recorded += 1 log.info("Round #%s saved (%d total)", rd["gameNo"], self.rounds_recorded) except Exception as e: log.error("Failed to save round #%s: %s", rd["gameNo"], e) async def run(self): loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._login) log.info("Game poller started (interval=%ss)", config.POLL_INTERVAL) while self.should_run: data = await loop.run_in_executor(None, self._poll) if data: await self._process(data) await asyncio.sleep(config.POLL_INTERVAL) # Save last round if self.round_data.get("gameResult"): self.round_data.setdefault("time_end_ts", time.time()) start_ts = self.round_data.get("time_start_ts", 0) self.round_data.setdefault("duration_s", round(time.time() - start_ts)) self._save_round() log.info("Poller stopped. %d rounds recorded.", self.rounds_recorded) def stop(self): self.should_run = False