"""
health.py

Bot health monitoring.

Tracks task execution status, WebSocket connectivity, and game processing
metrics. Writes periodic heartbeat to STORAGE/bot_health.json.
"""

# Standard Library Imports
import copy
import json
import logging
import time
from collections import deque
from pathlib import Path

# Third-Party Library Imports
import aiofiles
import aiosqlite

# Local Module Imports
from .utils import STORAGE_DIR, SQ_BATTLES_DB_PATH, get_bot

HEALTH_PATH = STORAGE_DIR / "bot_health.json"

# Lengths (in seconds) of the two rolling windows reported in the heartbeat.
_SECONDS_PER_HOUR = 3600
_SECONDS_PER_DAY = 86400

# Rolling window for games-processed counters
_games_timestamps: deque[float] = deque()

# Single in-memory health record; every function below reads or mutates it.
_health_state: dict = {
    "bot_started_at": None,
    "guild_count": 0,
    "last_heartbeat": 0,
    "tasks": {},
    "websocket": {},
    "games_processed_1h": 0,
    "games_processed_24h": 0,
}


def init_health(started_at: float, guild_count: int) -> None:
    """Initialize health state on bot startup.

    Args:
        started_at: Value stored under ``bot_started_at``.
        guild_count: Value stored under ``guild_count``.
    """
    _health_state["bot_started_at"] = started_at
    _health_state["guild_count"] = guild_count


async def record_task_run(task_name: str, success: bool, error: str = "") -> None:
    """Record a task execution result.

    Creates the per-task entry on first use, then updates its last-run time
    and run counter. A failure also bumps ``error_count`` and stores the
    error text; a success leaves the previous ``last_error`` in place.

    Args:
        task_name: Key of the task inside ``_health_state["tasks"]``.
        success: Whether the run succeeded.
        error: Error description for a failed run (truncated to 200 chars).
    """
    entry = _health_state["tasks"].setdefault(task_name, {
        "status": "unknown",
        "last_run": 0,
        "run_count": 0,
        "error_count": 0,
        "last_error": "",
    })
    entry["last_run"] = time.time()
    entry["run_count"] += 1
    if success:
        entry["status"] = "ok"
    else:
        entry["status"] = "error"
        entry["error_count"] += 1
        # Cap the stored message so the heartbeat file stays small.
        entry["last_error"] = str(error)[:200]


async def record_ws_message(ws_name: str) -> None:
    """Record a WebSocket message receipt.

    Marks the socket as connected, stamps the receipt time and increments
    its processed-message counter, creating the entry on first use.

    Args:
        ws_name: Key of the socket inside ``_health_state["websocket"]``.
    """
    entry = _health_state["websocket"].setdefault(ws_name, {
        "connected": True,
        "last_message_at": 0,
        "messages_processed": 0,
    })
    entry["connected"] = True
    entry["last_message_at"] = time.time()
    entry["messages_processed"] += 1


def record_ws_disconnect(ws_name: str) -> None:
    """Mark a WebSocket as disconnected.

    Does nothing when no entry exists yet for ``ws_name``.
    """
    entry = _health_state["websocket"].get(ws_name)
    if entry:
        entry["connected"] = False


def record_game_processed() -> None:
    """Record that a game was processed (for hourly/daily counters)."""
    _games_timestamps.append(time.time())


def _prune_games_window() -> tuple[int, int]:
    """Count games in the last 1h and 24h, pruning old entries.

    Returns:
        ``(count_1h, count_24h)``.
    """
    now = time.time()

    # Timestamps are appended in call order, so expired ones sit at the left.
    cutoff_24h = now - _SECONDS_PER_DAY
    while _games_timestamps and _games_timestamps[0] < cutoff_24h:
        _games_timestamps.popleft()

    cutoff_1h = now - _SECONDS_PER_HOUR
    count_1h = sum(1 for ts in _games_timestamps if ts >= cutoff_1h)
    return count_1h, len(_games_timestamps)


def _refresh_live_fields() -> None:
    """Update the computed fields of ``_health_state`` in place.

    Shared by ``write_heartbeat`` and ``get_health_snapshot`` so both report
    the same guild count, heartbeat time and rolling game counters.
    """
    try:
        bot = get_bot()
        _health_state["guild_count"] = len(bot.guilds)
    except Exception as exc:
        # Best-effort: keep the previous guild count, but leave a trace
        # instead of swallowing the failure silently.
        logging.debug("[HEALTH] Could not refresh guild count: %s", exc)

    # NOTE(review): this also runs for get_health_snapshot(), so taking a
    # snapshot bumps last_heartbeat even though no file was written --
    # confirm that is intended.
    _health_state["last_heartbeat"] = time.time()

    games_1h, games_24h = _prune_games_window()
    _health_state["games_processed_1h"] = games_1h
    _health_state["games_processed_24h"] = games_24h


async def write_heartbeat() -> None:
    """Dump current health state to HEALTH_PATH as JSON.

    Failures while serializing or writing are logged and not raised.
    """
    _refresh_live_fields()
    try:
        # Serialize before opening the file: opening in "w" mode truncates
        # it, so a serialization error must not cost us the last heartbeat.
        payload = json.dumps(_health_state, indent=2, default=str)
        HEALTH_PATH.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(HEALTH_PATH, "w", encoding="utf-8") as f:
            await f.write(payload)
    except Exception as e:
        logging.error("[HEALTH] Failed to write heartbeat: %s", e)


async def get_health_snapshot() -> dict:
    """Return current health state dict (live, not from file).

    The result is a deep copy, so the nested ``tasks`` and ``websocket``
    dicts can be read or modified by the caller without touching (or being
    changed by later updates to) the module's live state.
    """
    _refresh_live_fields()
    return copy.deepcopy(_health_state)


async def get_recent_ttl_stats(limit: int = 30) -> dict:
    """Return receive-delay stats for the most recent completed games.

    Reads up to ``limit`` rows of ``match_summary`` (newest ``endtime_unix``
    first) where both timestamps are present. The delay of a row is
    ``received_unix - endtime_unix``, clamped at zero.

    Args:
        limit: Maximum number of rows to sample.

    Returns:
        Dict with ``sample_size``, ``avg_delay``, ``min_delay``,
        ``max_delay`` and ``last_received_ts``. When no rows match,
        ``sample_size`` is 0 and every other value is ``None``.
    """
    async with aiosqlite.connect(SQ_BATTLES_DB_PATH, timeout=10.0) as db:
        rows = list(await db.execute_fetchall(
            "SELECT endtime_unix, received_unix FROM match_summary "
            "WHERE received_unix IS NOT NULL AND endtime_unix IS NOT NULL "
            "ORDER BY endtime_unix DESC LIMIT ?",
            (limit,),
        ))

    if not rows:
        return {
            "sample_size": 0,
            "avg_delay": None,
            "min_delay": None,
            "max_delay": None,
            "last_received_ts": None,
        }

    # A negative difference (received before the recorded end) counts as 0.
    delays = [max(int(received) - int(ended), 0) for ended, received in rows]
    return {
        "sample_size": len(delays),
        "avg_delay": int(sum(delays) / len(delays)),
        "min_delay": min(delays),
        "max_delay": max(delays),
        "last_received_ts": max(int(received) for _, received in rows),
    }