Files
3pmonitor/app/server.py
Junaid Saeed Uppal 86865166ef add balance/mean-reversion signal and Cloudflare visitor logging
Balance signal (15% weight) favors under-represented chairs over last 50
games. Visitor middleware captures real IPs from CF headers, batched into
ClickHouse with 90-day TTL.
2026-02-26 09:59:27 +05:00

290 lines
12 KiB
Python

"""
aiohttp web server + browser WebSocket.
Serves the dashboard and pushes real-time events to connected browsers.
All blocking DB calls run in a thread executor to avoid blocking the event loop.
"""
import asyncio
import json
import logging
import os
from functools import partial
from aiohttp import web
from . import config
from . import db
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Absolute path of the "static" directory located one level above the
# directory that contains this file.
STATIC_DIR = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "static",
)
async def _run_sync(fn, *args):
    """Call blocking ``fn(*args)`` on the default executor and await its result."""
    # Passing None selects the event loop's default executor; positional
    # arguments are forwarded to ``fn`` by run_in_executor itself.
    return await asyncio.get_running_loop().run_in_executor(None, fn, *args)
class WebServer:
    """Dashboard HTTP server plus WebSocket fan-out to connected browsers.

    Owns the aiohttp application, the set of open browser WebSockets, and an
    in-memory buffer of visitor records that is written to the database in
    batches. All database calls go through ``_run_sync``.
    """

    # Visitor records are inserted as soon as this many are buffered.
    VISITOR_BATCH_SIZE = 20
    # Seconds between periodic flushes of a partially filled visitor buffer.
    VISITOR_FLUSH_INTERVAL = 30
    # Upper bound applied to the ``limit`` of /api/prediction-history.
    MAX_PREDICTION_HISTORY = 500
    # Body sent to clients when a query fails; exception details stay in the
    # log instead of being echoed to the browser.
    _INTERNAL_ERROR = {"error": "Internal server error"}

    def __init__(self):
        self.app = web.Application()
        self.clients: set[web.WebSocketResponse] = set()
        self._visitor_buffer: list[dict] = []
        self._visitor_lock = asyncio.Lock()
        self._setup_routes()
        self.app.middlewares.append(self._make_visitor_middleware())

    # ------------------------------------------------------------------
    # Visitor logging
    # ------------------------------------------------------------------

    def _make_visitor_middleware(self):
        """Build the middleware that records one visitor entry per request.

        Static files and WebSocket upgrades are skipped. The handler runs
        first, so a request whose handler raises propagates before anything
        is recorded.
        """
        server = self

        @web.middleware
        async def visitor_middleware(request: web.Request, handler):
            response = await handler(request)
            path = request.path
            headers = request.headers
            # Skip static files and WebSocket upgrades
            if path.startswith("/static/") or headers.get("Upgrade", "").lower() == "websocket":
                return response
            # Prefer the Cloudflare header, then the first X-Forwarded-For
            # hop, then the socket peer address.
            ip = (
                headers.get("CF-Connecting-IP")
                or headers.get("X-Forwarded-For", "").split(",")[0].strip()
                or request.remote
                or ""
            )
            visitor = {
                "ip": ip,
                "country": headers.get("CF-IPCountry", ""),
                "path": path,
                "method": request.method,
                "user_agent": headers.get("User-Agent", ""),
                "referer": headers.get("Referer", ""),
                "accept_lang": headers.get("Accept-Language", ""),
            }
            batch = await server._drain_visitors(visitor, server.VISITOR_BATCH_SIZE)
            await server._insert_visitors(batch, "Visitor insert failed: %s")
            return response

        return visitor_middleware

    async def _drain_visitors(self, visitor=None, threshold=1):
        """Optionally buffer ``visitor``, then take the buffer if it is full enough.

        Args:
            visitor: Record to append before checking the buffer, or None.
            threshold: Minimum number of buffered records required to drain.

        Returns:
            The drained records, or an empty list when fewer than
            ``threshold`` records are buffered.
        """
        async with self._visitor_lock:
            if visitor is not None:
                self._visitor_buffer.append(visitor)
            if len(self._visitor_buffer) < threshold:
                return []
            batch = list(self._visitor_buffer)
            self._visitor_buffer.clear()
            return batch

    async def _insert_visitors(self, batch, failure_msg):
        """Insert ``batch`` via ``db.insert_visitors``; log instead of raising.

        ``failure_msg`` is a logging format string with one ``%s`` slot for
        the exception. A failed batch is dropped (best effort).
        """
        if not batch:
            return
        try:
            await _run_sync(db.insert_visitors, batch)
        except Exception as e:
            log.warning(failure_msg, e)

    async def _flush_visitors(self):
        """Periodically flush visitor buffer so low-traffic visits aren't lost."""
        while True:
            await asyncio.sleep(self.VISITOR_FLUSH_INTERVAL)
            await self._insert_visitors(
                await self._drain_visitors(), "Visitor flush failed: %s"
            )

    # ------------------------------------------------------------------
    # Routing and shared handler helpers
    # ------------------------------------------------------------------

    def _setup_routes(self):
        """Register page, JSON API, WebSocket and static-file routes."""
        add_get = self.app.router.add_get
        add_get("/", self._handle_index)
        add_get("/api/history", self._handle_history)
        add_get("/api/leaderboard", self._handle_leaderboard)
        add_get("/api/stats", self._handle_stats)
        add_get("/api/user/{user_id}", self._handle_user)
        add_get("/api/hot-cold", self._handle_hot_cold)
        add_get("/analytics", self._handle_analytics_page)
        add_get("/api/analytics", self._handle_analytics)
        add_get("/patterns", self._handle_patterns_page)
        add_get("/api/patterns", self._handle_patterns)
        add_get("/predictions", self._handle_predictions_page)
        add_get("/api/predictions", self._handle_predictions)
        add_get("/api/prediction-history", self._handle_prediction_history)
        add_get("/ws", self._handle_ws)
        self.app.router.add_static("/static/", STATIC_DIR, name="static")

    @staticmethod
    def _parse_count(raw, default, maximum=None):
        """Convert a client-supplied row count to a non-negative int.

        Args:
            raw: Query-string value, or None when the parameter is absent.
            default: Value returned when ``raw`` is None.
            maximum: Optional upper bound; larger values are clamped to it.

        Raises:
            ValueError: If ``raw`` is not an integer or is negative.
        """
        if raw is None:
            return default
        count = int(raw)
        if count < 0:
            raise ValueError(f"negative count: {count}")
        return count if maximum is None else min(count, maximum)

    @staticmethod
    def _bad_request(message):
        """Return a JSON 400 response of the form ``{"error": message}``."""
        return web.json_response({"error": message}, status=400)

    @staticmethod
    def _static_page(filename):
        """Return a file response for ``filename`` inside STATIC_DIR."""
        return web.FileResponse(os.path.join(STATIC_DIR, filename))

    async def _json_query(self, label, fallback, query, *args):
        """Run ``query(*args)`` in the executor and return its result as JSON.

        On any exception the error is logged as "<label> query failed: ..."
        and ``fallback`` is returned with status 500.
        """
        try:
            return web.json_response(await _run_sync(query, *args))
        except Exception as e:
            log.error("%s query failed: %s", label, e)
            return web.json_response(fallback, status=500)

    # ------------------------------------------------------------------
    # HTTP handlers
    # ------------------------------------------------------------------

    async def _handle_index(self, request: web.Request) -> web.StreamResponse:
        return self._static_page("index.html")

    async def _handle_history(self, request: web.Request) -> web.Response:
        """Return recent games; ``?n=`` selects how many (default 50)."""
        # NOTE(review): n has no upper bound here; consider capping it the
        # way /api/prediction-history caps its limit.
        try:
            n = self._parse_count(request.query.get("n"), 50)
        except ValueError:
            return self._bad_request("Invalid n")
        return await self._json_query("History", [], db.get_recent_games, n)

    async def _handle_leaderboard(self, request: web.Request) -> web.Response:
        """Return the leaderboard; ``?n=`` selects how many rows (default 10)."""
        # NOTE(review): n has no upper bound here; consider capping it.
        try:
            n = self._parse_count(request.query.get("n"), 10)
        except ValueError:
            return self._bad_request("Invalid n")
        return await self._json_query("Leaderboard", [], db.get_leaderboard, n)

    async def _handle_stats(self, request: web.Request) -> web.Response:
        """Return the win distribution and the biggest winner in one object."""
        try:
            dist = await _run_sync(db.get_win_distribution)
            biggest = await _run_sync(db.get_biggest_winner)
            return web.json_response({"win_distribution": dist, "biggest_winner": biggest})
        except Exception as e:
            log.error("Stats query failed: %s", e)
            return web.json_response({}, status=500)

    async def _handle_user(self, request: web.Request) -> web.Response:
        """Return detail for one user; 400 on a non-integer id, 404 if unknown."""
        try:
            uid = int(request.match_info["user_id"])
        except ValueError:
            return self._bad_request("Invalid user id")
        try:
            detail = await _run_sync(db.get_user_detail, uid)
            if detail:
                return web.json_response(detail)
            return web.json_response({"error": "User not found"}, status=404)
        except Exception as e:
            log.error("User detail query failed: %s", e)
            return web.json_response(self._INTERNAL_ERROR, status=500)

    async def _handle_analytics_page(self, request: web.Request) -> web.StreamResponse:
        return self._static_page("analytics.html")

    async def _handle_patterns_page(self, request: web.Request) -> web.StreamResponse:
        return self._static_page("patterns.html")

    async def _handle_patterns(self, request: web.Request) -> web.Response:
        return await self._json_query(
            "Pattern analysis", self._INTERNAL_ERROR, db.get_pattern_analysis
        )

    async def _handle_predictions_page(self, request: web.Request) -> web.StreamResponse:
        return self._static_page("predictions.html")

    async def _handle_predictions(self, request: web.Request) -> web.Response:
        return await self._json_query(
            "Prediction analysis", self._INTERNAL_ERROR, db.get_prediction_analysis
        )

    async def _handle_prediction_history(self, request: web.Request) -> web.Response:
        """Return prediction history; ``?limit=`` defaults to 100 and is capped."""
        try:
            limit = self._parse_count(
                request.query.get("limit"), 100, self.MAX_PREDICTION_HISTORY
            )
        except ValueError:
            return self._bad_request("Invalid limit")
        return await self._json_query(
            "Prediction history", self._INTERNAL_ERROR, db.get_prediction_history, limit
        )

    async def _handle_analytics(self, request: web.Request) -> web.Response:
        """Return analytics for ``?period=`` (one of 1h, 6h, 24h, 7d, all)."""
        period = request.query.get("period", "all")
        if period not in ("1h", "6h", "24h", "7d", "all"):
            return self._bad_request("Invalid period")
        return await self._json_query(
            "Analytics", self._INTERNAL_ERROR, db.get_analytics, period
        )

    async def _handle_hot_cold(self, request: web.Request) -> web.Response:
        return await self._json_query(
            "Hot/cold", {"hot": [], "cold": []}, db.get_hot_cold_players
        )

    # ------------------------------------------------------------------
    # WebSocket
    # ------------------------------------------------------------------

    async def _send_initial_state(self, ws: web.WebSocketResponse):
        """Send the current dashboard snapshots to a newly connected socket.

        Stops early once the socket is closed. The biggest-winner message is
        omitted when the query returns nothing.
        """
        # (event type, query, query args, skip the message when result is empty)
        snapshots = (
            ("history", db.get_recent_games, (50,), False),
            ("win_distribution", db.get_win_distribution, (), False),
            ("leaderboard", db.get_leaderboard, (10,), False),
            ("biggest_winner", db.get_biggest_winner, (), True),
            ("hot_cold", db.get_hot_cold_players, (), False),
        )
        for event_type, query, args, skip_if_empty in snapshots:
            if ws.closed:
                return
            data = await _run_sync(query, *args)
            if skip_if_empty and not data:
                continue
            await ws.send_json({"type": event_type, "data": data})

    async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """Accept a browser WebSocket, send initial data, then hold it open."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.clients.add(ws)
        log.info("Browser connected (%d total)", len(self.clients))
        # A single try/finally covers everything after registration so the
        # socket is always removed from self.clients, even if this handler
        # is cancelled while the initial data is being sent.
        try:
            # Send initial data (all DB calls in executor)
            try:
                await self._send_initial_state(ws)
            except Exception as e:
                log.warning("Failed to send initial data: %s", e)
            async for _ in ws:
                pass  # Browser doesn't send data, just listens
        finally:
            self.clients.discard(ws)
            log.info("Browser disconnected (%d remaining)", len(self.clients))
        return ws

    async def broadcast(self, event_type: str, data: dict):
        """Push event to all connected browsers.

        Sockets whose send raises are dropped from ``self.clients``.
        """
        if not self.clients:
            return
        payload = json.dumps({"type": event_type, "data": data})
        stale = set()
        # Iterate over a snapshot: each send awaits, and _handle_ws may add
        # or discard clients in the meantime, which would otherwise raise
        # "Set changed size during iteration".
        for ws in tuple(self.clients):
            try:
                await ws.send_str(payload)
            except Exception:
                stale.add(ws)
        self.clients.difference_update(stale)

    async def push_refresh(self):
        """Compute stats in executor and push over WS — no browser HTTP needed."""
        try:
            dist, leaders, biggest, hot_cold = await asyncio.gather(
                _run_sync(db.get_win_distribution),
                _run_sync(db.get_leaderboard, 10),
                _run_sync(db.get_biggest_winner),
                _run_sync(db.get_hot_cold_players),
            )
            await self.broadcast("win_distribution", dist)
            await self.broadcast("leaderboard", leaders)
            if biggest:
                await self.broadcast("biggest_winner", biggest)
            await self.broadcast("hot_cold", hot_cold)
        except Exception as e:
            log.warning("push_refresh failed: %s", e)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def run(self):
        """Start the site and the visitor flush task; run until cancelled.

        On shutdown the flush task is cancelled and awaited, any visitors
        still buffered are written out, and the runner is cleaned up. The
        runner is also cleaned up if the site fails to start.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        flush_task = None
        try:
            site = web.TCPSite(runner, "0.0.0.0", config.WEB_PORT)
            await site.start()
            log.info("Web server listening on http://0.0.0.0:%s", config.WEB_PORT)
            flush_task = asyncio.create_task(self._flush_visitors())
            # Keep running until cancelled
            try:
                while True:
                    await asyncio.sleep(3600)
            except asyncio.CancelledError:
                pass
        finally:
            if flush_task is not None:
                flush_task.cancel()
                # Wait for the cancellation to land so the task is not left
                # pending when the loop shuts down.
                try:
                    await flush_task
                except asyncio.CancelledError:
                    pass
                # Final flush: don't lose visits buffered since the last one.
                await self._insert_visitors(
                    await self._drain_visitors(), "Visitor flush failed: %s"
                )
            await runner.cleanup()