Files
3pmonitor/app/server.py
Junaid Saeed Uppal 85f44e6a22 Initial commit: Teen Patti live monitor with analytics
Live dashboard with real-time WebSocket updates, analytics page with
time-filtered stats, ClickHouse storage, and Caddy reverse proxy.
2026-02-21 22:36:40 +05:00

192 lines
7.6 KiB
Python

"""
aiohttp web server + browser WebSocket.
Serves the dashboard and pushes real-time events to connected browsers.
All blocking DB calls run in a thread executor to avoid blocking the event loop.
"""
import asyncio
import json
import logging
import os
from functools import partial
from aiohttp import web
from . import config
from . import db
log = logging.getLogger(__name__)
STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static")
async def _run_sync(fn, *args):
"""Run a blocking function in the default thread executor."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, partial(fn, *args))
class WebServer:
def __init__(self):
self.app = web.Application()
self.clients: set[web.WebSocketResponse] = set()
self._setup_routes()
def _setup_routes(self):
self.app.router.add_get("/", self._handle_index)
self.app.router.add_get("/api/history", self._handle_history)
self.app.router.add_get("/api/leaderboard", self._handle_leaderboard)
self.app.router.add_get("/api/stats", self._handle_stats)
self.app.router.add_get("/api/user/{user_id}", self._handle_user)
self.app.router.add_get("/api/hot-cold", self._handle_hot_cold)
self.app.router.add_get("/analytics", self._handle_analytics_page)
self.app.router.add_get("/api/analytics", self._handle_analytics)
self.app.router.add_get("/ws", self._handle_ws)
self.app.router.add_static("/static/", STATIC_DIR, name="static")
async def _handle_index(self, request: web.Request) -> web.Response:
index_path = os.path.join(STATIC_DIR, "index.html")
return web.FileResponse(index_path)
async def _handle_history(self, request: web.Request) -> web.Response:
n = int(request.query.get("n", 50))
try:
games = await _run_sync(db.get_recent_games, n)
return web.json_response(games)
except Exception as e:
log.error("History query failed: %s", e)
return web.json_response([], status=500)
async def _handle_leaderboard(self, request: web.Request) -> web.Response:
n = int(request.query.get("n", 10))
try:
leaders = await _run_sync(db.get_leaderboard, n)
return web.json_response(leaders)
except Exception as e:
log.error("Leaderboard query failed: %s", e)
return web.json_response([], status=500)
async def _handle_stats(self, request: web.Request) -> web.Response:
try:
dist = await _run_sync(db.get_win_distribution)
biggest = await _run_sync(db.get_biggest_winner)
return web.json_response({"win_distribution": dist, "biggest_winner": biggest})
except Exception as e:
log.error("Stats query failed: %s", e)
return web.json_response({}, status=500)
async def _handle_user(self, request: web.Request) -> web.Response:
uid = int(request.match_info["user_id"])
try:
detail = await _run_sync(db.get_user_detail, uid)
if detail:
return web.json_response(detail)
return web.json_response({"error": "User not found"}, status=404)
except Exception as e:
log.error("User detail query failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
async def _handle_analytics_page(self, request: web.Request) -> web.Response:
path = os.path.join(STATIC_DIR, "analytics.html")
return web.FileResponse(path)
async def _handle_analytics(self, request: web.Request) -> web.Response:
period = request.query.get("period", "all")
if period not in ("1h", "6h", "24h", "7d", "all"):
return web.json_response({"error": "Invalid period"}, status=400)
try:
data = await _run_sync(db.get_analytics, period)
return web.json_response(data)
except Exception as e:
log.error("Analytics query failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
async def _handle_hot_cold(self, request: web.Request) -> web.Response:
try:
data = await _run_sync(db.get_hot_cold_players)
return web.json_response(data)
except Exception as e:
log.error("Hot/cold query failed: %s", e)
return web.json_response({"hot": [], "cold": []}, status=500)
async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
self.clients.add(ws)
log.info("Browser connected (%d total)", len(self.clients))
# Send initial data (all DB calls in executor)
try:
if not ws.closed:
games = await _run_sync(db.get_recent_games, 50)
await ws.send_json({"type": "history", "data": games})
if not ws.closed:
dist = await _run_sync(db.get_win_distribution)
await ws.send_json({"type": "win_distribution", "data": dist})
if not ws.closed:
leaders = await _run_sync(db.get_leaderboard, 10)
await ws.send_json({"type": "leaderboard", "data": leaders})
if not ws.closed:
biggest = await _run_sync(db.get_biggest_winner)
if biggest:
await ws.send_json({"type": "biggest_winner", "data": biggest})
if not ws.closed:
hot_cold = await _run_sync(db.get_hot_cold_players)
await ws.send_json({"type": "hot_cold", "data": hot_cold})
except Exception as e:
log.warning("Failed to send initial data: %s", e)
try:
async for msg in ws:
pass # Browser doesn't send data, just listens
finally:
self.clients.discard(ws)
log.info("Browser disconnected (%d remaining)", len(self.clients))
return ws
async def broadcast(self, event_type: str, data: dict):
"""Push event to all connected browsers."""
if not self.clients:
return
payload = json.dumps({"type": event_type, "data": data})
stale = set()
for ws in self.clients:
try:
await ws.send_str(payload)
except Exception:
stale.add(ws)
self.clients -= stale
async def push_refresh(self):
"""Compute stats in executor and push over WS — no browser HTTP needed."""
try:
dist, leaders, biggest, hot_cold = await asyncio.gather(
_run_sync(db.get_win_distribution),
_run_sync(db.get_leaderboard, 10),
_run_sync(db.get_biggest_winner),
_run_sync(db.get_hot_cold_players),
)
await self.broadcast("win_distribution", dist)
await self.broadcast("leaderboard", leaders)
if biggest:
await self.broadcast("biggest_winner", biggest)
await self.broadcast("hot_cold", hot_cold)
except Exception as e:
log.warning("push_refresh failed: %s", e)
async def run(self):
runner = web.AppRunner(self.app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", config.WEB_PORT)
await site.start()
log.info("Web server listening on http://0.0.0.0:%s", config.WEB_PORT)
# Keep running until cancelled
try:
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
pass
finally:
await runner.cleanup()