Files
3pmonitor/app/game_poller.py
Junaid Saeed Uppal 85f44e6a22 Initial commit: Teen Patti live monitor with analytics
Live dashboard with real-time WebSocket updates, analytics page with
time-filtered stats, ClickHouse storage, and Caddy reverse proxy.
2026-02-21 22:36:40 +05:00

234 lines
8.2 KiB
Python

"""
HTTP polling for Teen Patti game state.
Polls FuncTag 86000041 every 3 seconds, tracks game phases,
inserts completed rounds into ClickHouse, broadcasts to browser.
"""
import asyncio
import json
import logging
import time
from datetime import datetime
from . import config
from .auth import auto_login, get_full_state
from . import db
log = logging.getLogger(__name__)
def _format_card(card: dict) -> str:
v = config.VALUES.get(card["cardValue"], str(card["cardValue"]))
s = config.SUITS.get(card["cardColor"], "?")
return f"{v}{s}"
def _format_hand(cards: list[dict]) -> str:
return " ".join(_format_card(c) for c in cards)
class GamePoller:
def __init__(self, broadcast_fn, push_refresh_fn=None):
self.broadcast = broadcast_fn
self.push_refresh = push_refresh_fn
self.user_id = 0
self.token = ""
self.should_run = True
self.current_round = None
self.last_status = None
self.round_data: dict = {}
self.errors = 0
self.rounds_recorded = 0
def _login(self):
self.user_id, self.token = auto_login()
log.info("Poller logged in: userId=%s", self.user_id)
def _poll(self) -> dict | None:
try:
data = get_full_state(self.user_id, self.token)
tag = data.get("TagCode", "")
if tag == "30001005":
log.warning("Token expired, re-authenticating...")
self._login()
data = get_full_state(self.user_id, self.token)
self.errors = 0
return data
except Exception as e:
self.errors += 1
log.warning("Poll failed (%dx): %s", self.errors, e)
if self.errors > 10:
log.error("Too many consecutive errors, re-authenticating")
try:
self._login()
except Exception:
pass
self.errors = 0
return None
async def _process(self, data: dict):
gi = data.get("gameInfo", {})
gn = gi.get("gameNo")
gs = gi.get("gameStatus")
init = data.get("initData", {})
if gn is None:
return
# New round detected
if gn != self.current_round:
# Save previous round if complete
if self.round_data.get("gameResult"):
self._save_round()
# Push updated stats over WS (non-blocking)
if self.push_refresh:
await self.push_refresh()
self.current_round = gn
self.last_status = None
self.round_data = {
"gameNo": gn,
"time_start_ts": time.time(),
}
log.info("Round #%s started", gn)
# Build current state for browser
end_time = gi.get("endTime", 0)
current_time = init.get("currentTime", int(time.time() * 1000))
remaining_ms = max(0, end_time - current_time)
bets = gi.get("betInfos", {}).get("betInfoArry", [])
total_pot = gi.get("betInfos", {}).get("total", 0)
bet_map = {}
for b in bets:
chair = b["country"]
bet_map[config.CHAIRS.get(chair, "?")] = int(b.get("totalBet", 0) or 0)
# Cards (available in REVEALING/ENDED)
cards = {}
for p in gi.get("playerCardInfos", []):
chair = config.CHAIRS.get(p["country"], "?")
cards[chair] = {
"hand": _format_hand(p["cards"]),
"hand_type": config.HAND_TYPES.get(p["cardType"], "?"),
"hand_type_id": p["cardType"],
"cards_raw": p["cards"],
}
game_state = {
"game_no": gn,
"status": gs,
"status_name": config.STATUS_NAMES.get(gs, "?"),
"remaining_s": round(remaining_ms / 1000, 1),
"total_pot": total_pot,
"bets": bet_map,
"cards": cards,
"winner": gi.get("gameResult"),
"winner_name": config.CHAIRS.get(gi.get("gameResult"), None),
}
# Broadcast current state
await self.broadcast("game_state", game_state)
# Status transitions
if gs != self.last_status:
prev = self.last_status
self.last_status = gs
if gs == 2:
# Revealing — capture result
self._capture_result(gi)
await self.broadcast("cards_revealed", {
"game_no": gn,
"cards": cards,
"winner": gi.get("gameResult"),
"winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"),
})
elif gs == 3:
# Ended
self._capture_result(gi)
self.round_data["time_end_ts"] = time.time()
start_ts = self.round_data.get("time_start_ts", 0)
self.round_data["duration_s"] = round(time.time() - start_ts)
await self.broadcast("round_result", {
"game_no": gn,
"winner": gi.get("gameResult"),
"winner_name": config.CHAIRS.get(gi.get("gameResult"), "?"),
"total_pot": total_pot,
"bets": bet_map,
"cards": cards,
"duration_s": self.round_data["duration_s"],
})
# Push updated stats over WS
if self.push_refresh:
await self.push_refresh()
def _capture_result(self, gi: dict):
self.round_data["gameResult"] = gi.get("gameResult")
bets = gi.get("betInfos", {}).get("betInfoArry", [])
self.round_data["total_pot"] = gi.get("betInfos", {}).get("total", 0)
for b in bets:
chair = b["country"]
self.round_data[f"bet_{config.CHAIRS.get(chair, '?').lower()}"] = (
int(b.get("totalBet", 0) or 0)
)
for p in gi.get("playerCardInfos", []):
chair_name = config.CHAIRS.get(p["country"], "?").lower()
self.round_data[f"hand_{chair_name}"] = _format_hand(p["cards"])
self.round_data[f"hand_type_{chair_name}"] = p["cardType"]
self.round_data["cards_json"] = json.dumps(gi.get("playerCardInfos", []))
def _save_round(self):
rd = self.round_data
if not rd.get("gameResult"):
return
try:
db.insert_game({
"game_no": rd["gameNo"],
"winner": rd["gameResult"],
"total_pot": rd.get("total_pot", 0),
"bet_a": rd.get("bet_a", 0),
"bet_b": rd.get("bet_b", 0),
"bet_c": rd.get("bet_c", 0),
"hand_a": rd.get("hand_a", ""),
"hand_b": rd.get("hand_b", ""),
"hand_c": rd.get("hand_c", ""),
"hand_type_a": rd.get("hand_type_a", 0),
"hand_type_b": rd.get("hand_type_b", 0),
"hand_type_c": rd.get("hand_type_c", 0),
"cards_json": rd.get("cards_json", ""),
"duration_s": rd.get("duration_s", 0),
})
self.rounds_recorded += 1
log.info("Round #%s saved (%d total)", rd["gameNo"], self.rounds_recorded)
except Exception as e:
log.error("Failed to save round #%s: %s", rd["gameNo"], e)
async def run(self):
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._login)
log.info("Game poller started (interval=%ss)", config.POLL_INTERVAL)
while self.should_run:
data = await loop.run_in_executor(None, self._poll)
if data:
await self._process(data)
await asyncio.sleep(config.POLL_INTERVAL)
# Save last round
if self.round_data.get("gameResult"):
self.round_data.setdefault("time_end_ts", time.time())
start_ts = self.round_data.get("time_start_ts", 0)
self.round_data.setdefault("duration_s", round(time.time() - start_ts))
self._save_round()
log.info("Poller stopped. %d rounds recorded.", self.rounds_recorded)
def stop(self):
self.should_run = False