Files
SREBOT/BOT/health.py
T
FURRO404 2b399fdb81 add SREBOT, SHARED, TSSBOT contents (fixup for #1223)
PR #1223 only staged the deletions of the old paths because the new
top-level directories were still untracked when the commit was authored.
This commit adds the actual restructured tree: SREBOT/ (existing bot),
SHARED/ (vromfs, data_parser, ICONS/MAPS/FONTS, DAGOR_FILES,
update_game_files), and TSSBOT/ (skeleton).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 23:17:02 -07:00

133 lines
3.8 KiB
Python

"""
health.py
Bot health monitoring. Tracks task execution status, WebSocket connectivity,
and game processing metrics. Writes periodic heartbeat to STORAGE/bot_health.json.
"""
# Standard Library Imports
import json
import logging
import time
from collections import deque
from pathlib import Path
# Third-Party Library Imports
import aiofiles
# Local Module Imports
from .utils import STORAGE_DIR, get_bot
# Destination of the JSON heartbeat written by write_heartbeat(); resolved
# relative to STORAGE_DIR (imported from .utils).
HEALTH_PATH = STORAGE_DIR / "bot_health.json"
# time.time() stamps of processed games, appended by record_game_processed().
# _prune_games_window() drops entries older than 24 hours from the left end.
_games_timestamps: deque[float] = deque()

# In-memory health record. The recorder functions in this module mutate it in
# place and write_heartbeat() serializes it to HEALTH_PATH.
_health_state: dict = dict(
    bot_started_at=None,     # set by init_health()
    guild_count=0,
    last_heartbeat=0,        # time.time() of the most recent refresh
    tasks={},                # task name -> run/error counters
    websocket={},            # websocket name -> connectivity counters
    games_processed_1h=0,
    games_processed_24h=0,
)
def init_health(started_at: float, guild_count: int) -> None:
    """Seed the health state when the bot starts.

    Args:
        started_at: Value stored under ``bot_started_at``.
        guild_count: Value stored under ``guild_count``.
    """
    _health_state.update(bot_started_at=started_at, guild_count=guild_count)
async def record_task_run(task_name: str, success: bool, error: str = "") -> None:
    """Update the counters kept for ``task_name`` after one execution.

    Args:
        task_name: Key of the task inside the ``tasks`` section of the state.
        success: Whether the run completed without error.
        error: Failure description; only stored when ``success`` is false.
    """
    tasks = _health_state["tasks"]
    if task_name not in tasks:
        # First sighting of this task: start from a blank record.
        tasks[task_name] = {
            "status": "unknown",
            "last_run": 0,
            "run_count": 0,
            "error_count": 0,
            "last_error": "",
        }
    record = tasks[task_name]

    record["last_run"] = time.time()
    record["run_count"] += 1
    record["status"] = "ok" if success else "error"
    if success:
        return

    record["error_count"] += 1
    # Only the first 200 characters of the error text are retained.
    record["last_error"] = str(error)[:200]
async def record_ws_message(ws_name: str) -> None:
    """Note that a message arrived on the WebSocket called ``ws_name``.

    Marks the socket as connected, stamps the arrival time and bumps its
    processed-message counter.
    """
    sockets = _health_state["websocket"]
    if ws_name not in sockets:
        # First message from this socket: start from a blank record.
        sockets[ws_name] = {
            "connected": True,
            "last_message_at": 0,
            "messages_processed": 0,
        }
    stats = sockets[ws_name]

    stats["connected"] = True
    stats["last_message_at"] = time.time()
    stats["messages_processed"] += 1
def record_ws_disconnect(ws_name: str) -> None:
    """Flag the WebSocket ``ws_name`` as disconnected.

    Sockets that have no (non-empty) record yet are left untouched.
    """
    stats = _health_state["websocket"].get(ws_name)
    if not stats:
        return
    stats["connected"] = False
def record_game_processed() -> None:
    """Append the current time to the games window used for the 1h/24h counters."""
    processed_at = time.time()
    _games_timestamps.append(processed_at)
def _prune_games_window() -> tuple[int, int]:
    """Drop game timestamps older than 24 hours and count what remains.

    Returns:
        A ``(last_hour, last_day)`` pair: the number of retained timestamps
        within the last 3600 seconds, and the total number retained.
    """
    current = time.time()
    day_ago = current - 86400
    hour_ago = current - 3600

    # Timestamps are appended in arrival order, so the oldest is on the left.
    while _games_timestamps:
        oldest = _games_timestamps[0]
        if oldest < day_ago:
            _games_timestamps.popleft()
        else:
            break

    within_hour = [stamp for stamp in _games_timestamps if stamp >= hour_ago]
    return len(within_hour), len(_games_timestamps)
async def write_heartbeat() -> None:
    """Refresh the health counters and dump the state to HEALTH_PATH as JSON.

    This is best effort: a failure to refresh the guild count or to write the
    file is logged and never propagated to the caller.
    """
    try:
        _health_state["guild_count"] = len(get_bot().guilds)
    except Exception as exc:
        # Keep the previously recorded guild count, but leave a trace instead
        # of swallowing the failure silently.
        logging.debug("[HEALTH] Could not refresh guild count: %s", exc)

    _health_state["last_heartbeat"] = time.time()
    games_1h, games_24h = _prune_games_window()
    _health_state["games_processed_1h"] = games_1h
    _health_state["games_processed_24h"] = games_24h

    try:
        # Serialize BEFORE opening the file: mode "w" truncates on open, so a
        # serialization error raised afterwards would wipe the last good
        # heartbeat and leave an empty file behind.
        payload = json.dumps(_health_state, indent=2, default=str)
        HEALTH_PATH.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(HEALTH_PATH, "w", encoding="utf-8") as f:
            await f.write(payload)
    except Exception as e:
        logging.error("[HEALTH] Failed to write heartbeat: %s", e)
async def get_health_snapshot() -> dict:
    """Refresh the health counters and return a copy of the current state.

    The data comes from memory, not from the heartbeat file.

    Returns:
        A dict with the same keys as the module-level health state. The
        ``tasks`` and ``websocket`` sections and their per-name entries are
        copied, so mutating the snapshot does not alter the live state.
    """
    try:
        _health_state["guild_count"] = len(get_bot().guilds)
    except Exception as exc:
        # Keep the previously recorded guild count, but leave a trace instead
        # of swallowing the failure silently.
        logging.debug("[HEALTH] Could not refresh guild count: %s", exc)

    # NOTE(review): this stamps ``last_heartbeat`` even though nothing is
    # written to disk here, so a snapshot always reports a fresh heartbeat.
    # Kept as-is for backward compatibility; confirm this is intended.
    _health_state["last_heartbeat"] = time.time()
    games_1h, games_24h = _prune_games_window()
    _health_state["games_processed_1h"] = games_1h
    _health_state["games_processed_24h"] = games_24h

    snapshot = dict(_health_state)
    # dict() alone is a shallow copy: the nested sections would still be the
    # live objects. Copy them (and their per-name entries) as well.
    for section in ("tasks", "websocket"):
        snapshot[section] = {
            name: dict(entry) for name, entry in _health_state[section].items()
        }
    return snapshot