- Add _compute_whale_public_picks() to reconstruct whale/public picks from historical bets - Merge whale_pick, public_pick, whale_hit, public_hit into last_20_predictions - Add get_prediction_history(limit) for lightweight prediction+accuracy data - Add /api/prediction-history endpoint (default 100, max 500) - Add Whale and Public columns with HIT/MISS to Last 20 table in frontend
230 lines
9.5 KiB
Python
230 lines
9.5 KiB
Python
"""
|
|
aiohttp web server + browser WebSocket.
|
|
Serves the dashboard and pushes real-time events to connected browsers.
|
|
All blocking DB calls run in a thread executor to avoid blocking the event loop.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from functools import partial
|
|
|
|
from aiohttp import web
|
|
|
|
from . import config
|
|
from . import db
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static")
|
|
|
|
|
|
async def _run_sync(fn, *args):
|
|
"""Run a blocking function in the default thread executor."""
|
|
loop = asyncio.get_running_loop()
|
|
return await loop.run_in_executor(None, partial(fn, *args))
|
|
|
|
|
|
class WebServer:
|
|
def __init__(self):
|
|
self.app = web.Application()
|
|
self.clients: set[web.WebSocketResponse] = set()
|
|
self._setup_routes()
|
|
|
|
def _setup_routes(self):
|
|
self.app.router.add_get("/", self._handle_index)
|
|
self.app.router.add_get("/api/history", self._handle_history)
|
|
self.app.router.add_get("/api/leaderboard", self._handle_leaderboard)
|
|
self.app.router.add_get("/api/stats", self._handle_stats)
|
|
self.app.router.add_get("/api/user/{user_id}", self._handle_user)
|
|
self.app.router.add_get("/api/hot-cold", self._handle_hot_cold)
|
|
self.app.router.add_get("/analytics", self._handle_analytics_page)
|
|
self.app.router.add_get("/api/analytics", self._handle_analytics)
|
|
self.app.router.add_get("/patterns", self._handle_patterns_page)
|
|
self.app.router.add_get("/api/patterns", self._handle_patterns)
|
|
self.app.router.add_get("/predictions", self._handle_predictions_page)
|
|
self.app.router.add_get("/api/predictions", self._handle_predictions)
|
|
self.app.router.add_get("/api/prediction-history", self._handle_prediction_history)
|
|
self.app.router.add_get("/ws", self._handle_ws)
|
|
self.app.router.add_static("/static/", STATIC_DIR, name="static")
|
|
|
|
async def _handle_index(self, request: web.Request) -> web.Response:
|
|
index_path = os.path.join(STATIC_DIR, "index.html")
|
|
return web.FileResponse(index_path)
|
|
|
|
async def _handle_history(self, request: web.Request) -> web.Response:
|
|
n = int(request.query.get("n", 50))
|
|
try:
|
|
games = await _run_sync(db.get_recent_games, n)
|
|
return web.json_response(games)
|
|
except Exception as e:
|
|
log.error("History query failed: %s", e)
|
|
return web.json_response([], status=500)
|
|
|
|
async def _handle_leaderboard(self, request: web.Request) -> web.Response:
|
|
n = int(request.query.get("n", 10))
|
|
try:
|
|
leaders = await _run_sync(db.get_leaderboard, n)
|
|
return web.json_response(leaders)
|
|
except Exception as e:
|
|
log.error("Leaderboard query failed: %s", e)
|
|
return web.json_response([], status=500)
|
|
|
|
async def _handle_stats(self, request: web.Request) -> web.Response:
|
|
try:
|
|
dist = await _run_sync(db.get_win_distribution)
|
|
biggest = await _run_sync(db.get_biggest_winner)
|
|
return web.json_response({"win_distribution": dist, "biggest_winner": biggest})
|
|
except Exception as e:
|
|
log.error("Stats query failed: %s", e)
|
|
return web.json_response({}, status=500)
|
|
|
|
async def _handle_user(self, request: web.Request) -> web.Response:
|
|
uid = int(request.match_info["user_id"])
|
|
try:
|
|
detail = await _run_sync(db.get_user_detail, uid)
|
|
if detail:
|
|
return web.json_response(detail)
|
|
return web.json_response({"error": "User not found"}, status=404)
|
|
except Exception as e:
|
|
log.error("User detail query failed: %s", e)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_analytics_page(self, request: web.Request) -> web.Response:
|
|
path = os.path.join(STATIC_DIR, "analytics.html")
|
|
return web.FileResponse(path)
|
|
|
|
async def _handle_patterns_page(self, request: web.Request) -> web.Response:
|
|
path = os.path.join(STATIC_DIR, "patterns.html")
|
|
return web.FileResponse(path)
|
|
|
|
async def _handle_patterns(self, request: web.Request) -> web.Response:
|
|
try:
|
|
data = await _run_sync(db.get_pattern_analysis)
|
|
return web.json_response(data)
|
|
except Exception as e:
|
|
log.error("Pattern analysis query failed: %s", e)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_predictions_page(self, request: web.Request) -> web.Response:
|
|
path = os.path.join(STATIC_DIR, "predictions.html")
|
|
return web.FileResponse(path)
|
|
|
|
async def _handle_predictions(self, request: web.Request) -> web.Response:
|
|
try:
|
|
data = await _run_sync(db.get_prediction_analysis)
|
|
return web.json_response(data)
|
|
except Exception as e:
|
|
log.error("Prediction analysis query failed: %s", e)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_prediction_history(self, request: web.Request) -> web.Response:
|
|
limit = min(int(request.query.get("limit", 100)), 500)
|
|
try:
|
|
data = await _run_sync(db.get_prediction_history, limit)
|
|
return web.json_response(data)
|
|
except Exception as e:
|
|
log.error("Prediction history query failed: %s", e)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_analytics(self, request: web.Request) -> web.Response:
|
|
period = request.query.get("period", "all")
|
|
if period not in ("1h", "6h", "24h", "7d", "all"):
|
|
return web.json_response({"error": "Invalid period"}, status=400)
|
|
try:
|
|
data = await _run_sync(db.get_analytics, period)
|
|
return web.json_response(data)
|
|
except Exception as e:
|
|
log.error("Analytics query failed: %s", e)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_hot_cold(self, request: web.Request) -> web.Response:
|
|
try:
|
|
data = await _run_sync(db.get_hot_cold_players)
|
|
return web.json_response(data)
|
|
except Exception as e:
|
|
log.error("Hot/cold query failed: %s", e)
|
|
return web.json_response({"hot": [], "cold": []}, status=500)
|
|
|
|
async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
|
|
ws = web.WebSocketResponse()
|
|
await ws.prepare(request)
|
|
self.clients.add(ws)
|
|
log.info("Browser connected (%d total)", len(self.clients))
|
|
|
|
# Send initial data (all DB calls in executor)
|
|
try:
|
|
if not ws.closed:
|
|
games = await _run_sync(db.get_recent_games, 50)
|
|
await ws.send_json({"type": "history", "data": games})
|
|
if not ws.closed:
|
|
dist = await _run_sync(db.get_win_distribution)
|
|
await ws.send_json({"type": "win_distribution", "data": dist})
|
|
if not ws.closed:
|
|
leaders = await _run_sync(db.get_leaderboard, 10)
|
|
await ws.send_json({"type": "leaderboard", "data": leaders})
|
|
if not ws.closed:
|
|
biggest = await _run_sync(db.get_biggest_winner)
|
|
if biggest:
|
|
await ws.send_json({"type": "biggest_winner", "data": biggest})
|
|
if not ws.closed:
|
|
hot_cold = await _run_sync(db.get_hot_cold_players)
|
|
await ws.send_json({"type": "hot_cold", "data": hot_cold})
|
|
except Exception as e:
|
|
log.warning("Failed to send initial data: %s", e)
|
|
|
|
try:
|
|
async for msg in ws:
|
|
pass # Browser doesn't send data, just listens
|
|
finally:
|
|
self.clients.discard(ws)
|
|
log.info("Browser disconnected (%d remaining)", len(self.clients))
|
|
|
|
return ws
|
|
|
|
async def broadcast(self, event_type: str, data: dict):
|
|
"""Push event to all connected browsers."""
|
|
if not self.clients:
|
|
return
|
|
payload = json.dumps({"type": event_type, "data": data})
|
|
stale = set()
|
|
for ws in self.clients:
|
|
try:
|
|
await ws.send_str(payload)
|
|
except Exception:
|
|
stale.add(ws)
|
|
self.clients -= stale
|
|
|
|
async def push_refresh(self):
|
|
"""Compute stats in executor and push over WS — no browser HTTP needed."""
|
|
try:
|
|
dist, leaders, biggest, hot_cold = await asyncio.gather(
|
|
_run_sync(db.get_win_distribution),
|
|
_run_sync(db.get_leaderboard, 10),
|
|
_run_sync(db.get_biggest_winner),
|
|
_run_sync(db.get_hot_cold_players),
|
|
)
|
|
await self.broadcast("win_distribution", dist)
|
|
await self.broadcast("leaderboard", leaders)
|
|
if biggest:
|
|
await self.broadcast("biggest_winner", biggest)
|
|
await self.broadcast("hot_cold", hot_cold)
|
|
except Exception as e:
|
|
log.warning("push_refresh failed: %s", e)
|
|
|
|
async def run(self):
|
|
runner = web.AppRunner(self.app)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, "0.0.0.0", config.WEB_PORT)
|
|
await site.start()
|
|
log.info("Web server listening on http://0.0.0.0:%s", config.WEB_PORT)
|
|
# Keep running until cancelled
|
|
try:
|
|
while True:
|
|
await asyncio.sleep(3600)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
finally:
|
|
await runner.cleanup()
|