"""
health.py

Bot health monitoring. Tracks task execution status, WebSocket connectivity,
and game processing metrics. Writes periodic heartbeat to
STORAGE/bot_health.json.
"""

# Standard Library Imports
import copy
import json
import logging
import time
from collections import deque
from pathlib import Path

# Third-Party Library Imports
import aiofiles

# Local Module Imports
from .utils import STORAGE_DIR, get_bot

HEALTH_PATH = STORAGE_DIR / "bot_health.json"

# Window sizes (in seconds) for the games-processed counters.
_SECONDS_PER_HOUR = 3600
_SECONDS_PER_DAY = 86400

# Rolling window for games-processed counters
_games_timestamps: deque[float] = deque()

_health_state: dict = {
    "bot_started_at": None,
    "guild_count": 0,
    "last_heartbeat": 0,
    "tasks": {},
    "websocket": {},
    "games_processed_1h": 0,
    "games_processed_24h": 0,
}


def init_health(started_at: float, guild_count: int) -> None:
    """Initialize health state on bot startup.

    Args:
        started_at: Value stored under ``"bot_started_at"``.
        guild_count: Value stored under ``"guild_count"``.
    """
    _health_state["bot_started_at"] = started_at
    _health_state["guild_count"] = guild_count


async def record_task_run(task_name: str, success: bool, error: str = "") -> None:
    """Record a task execution result.

    Creates the per-task entry on first use, stamps ``last_run`` with the
    current time and increments ``run_count``. On failure it also increments
    ``error_count`` and stores the error text.

    Args:
        task_name: Key of the task inside the ``"tasks"`` section.
        success: Whether the run succeeded.
        error: Error description; only used when ``success`` is false.
    """
    entry = _health_state["tasks"].setdefault(task_name, {
        "status": "unknown",
        "last_run": 0,
        "run_count": 0,
        "error_count": 0,
        "last_error": "",
    })
    entry["last_run"] = time.time()
    entry["run_count"] += 1
    if success:
        # "last_error" is intentionally left untouched on success, so the
        # most recent failure stays visible.
        entry["status"] = "ok"
    else:
        entry["status"] = "error"
        entry["error_count"] += 1
        # Cap the stored message at 200 characters.
        entry["last_error"] = str(error)[:200]


async def record_ws_message(ws_name: str) -> None:
    """Record a WebSocket message receipt.

    Creates the per-socket entry on first use, marks the socket as connected,
    stamps ``last_message_at`` with the current time and increments
    ``messages_processed``.

    Args:
        ws_name: Key of the socket inside the ``"websocket"`` section.
    """
    entry = _health_state["websocket"].setdefault(ws_name, {
        "connected": True,
        "last_message_at": 0,
        "messages_processed": 0,
    })
    entry["connected"] = True
    entry["last_message_at"] = time.time()
    entry["messages_processed"] += 1


def record_ws_disconnect(ws_name: str) -> None:
    """Mark a WebSocket as disconnected.

    Does nothing when no entry exists for ``ws_name`` (i.e. no message was
    ever recorded for it).
    """
    entry = _health_state["websocket"].get(ws_name)
    if entry:
        entry["connected"] = False


def record_game_processed() -> None:
    """Record that a game was processed (for hourly/daily counters)."""
    _games_timestamps.append(time.time())


def _prune_games_window() -> tuple[int, int]:
    """Count games in the last 1h and 24h, pruning old entries.

    Timestamps older than 24 hours are dropped from the left end of the
    rolling window; this relies on entries having been appended in
    chronological order.

    Returns:
        A ``(count_1h, count_24h)`` tuple.
    """
    now = time.time()

    cutoff_24h = now - _SECONDS_PER_DAY
    while _games_timestamps and _games_timestamps[0] < cutoff_24h:
        _games_timestamps.popleft()

    cutoff_1h = now - _SECONDS_PER_HOUR
    count_1h = sum(1 for ts in _games_timestamps if ts >= cutoff_1h)
    # Everything left in the deque is within the 24h window.
    return count_1h, len(_games_timestamps)


def _refresh_health_state() -> None:
    """Update the derived fields of the health state in place.

    Refreshes the guild count (best effort), stamps ``last_heartbeat`` with
    the current time and recomputes the 1h/24h games-processed counters.
    """
    try:
        _health_state["guild_count"] = len(get_bot().guilds)
    except Exception as e:
        # Best effort: keep the previously stored guild count. A named logger
        # is used so this debug line never triggers logging.basicConfig().
        logging.getLogger(__name__).debug(
            "[HEALTH] Could not refresh guild count: %s", e
        )

    _health_state["last_heartbeat"] = time.time()

    games_1h, games_24h = _prune_games_window()
    _health_state["games_processed_1h"] = games_1h
    _health_state["games_processed_24h"] = games_24h


async def write_heartbeat() -> None:
    """Dump current health state to HEALTH_PATH as JSON.

    Failures while serializing or writing are logged and not re-raised.
    """
    _refresh_health_state()

    try:
        # Serialize before opening the file: opening in "w" mode truncates
        # it, so a serialization error must not leave an empty file behind.
        payload = json.dumps(_health_state, indent=2, default=str)
        HEALTH_PATH.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(HEALTH_PATH, "w", encoding="utf-8") as f:
            await f.write(payload)
    except Exception as e:
        logging.error("[HEALTH] Failed to write heartbeat: %s", e)


async def get_health_snapshot() -> dict:
    """Return current health state dict (live, not from file).

    The state is refreshed first (guild count, heartbeat time, game
    counters). The returned dict is a deep copy, so callers can neither
    mutate the module's internal state nor see it change afterwards.
    """
    _refresh_health_state()
    return copy.deepcopy(_health_state)