# Files
# 2026-05-17 12:11:11 +01:00
#
# 163 lines
# 4.8 KiB
# Python

"""
health.py
Bot health monitoring. Tracks task execution status, WebSocket connectivity,
and game processing metrics. Writes periodic heartbeat to STORAGE/bot_health.json.
"""
# Standard Library Imports
import json
import logging
import time
from collections import deque
from pathlib import Path
# Third-Party Library Imports
import aiofiles
import aiosqlite
# Local Module Imports
from .utils import STORAGE_DIR, SQ_BATTLES_DB_PATH, get_bot
# Destination of the JSON heartbeat file.
HEALTH_PATH = STORAGE_DIR / "bot_health.json"

# time.time() stamps of processed games, appended on the right and pruned
# from the left; source of the rolling 1h / 24h counters.
_games_timestamps: deque[float] = deque()

# Shared in-memory health record. The functions in this module mutate it in
# place, and the heartbeat serializes it to HEALTH_PATH.
_health_state: dict = dict(
    bot_started_at=None,
    guild_count=0,
    last_heartbeat=0,
    tasks={},
    websocket={},
    games_processed_1h=0,
    games_processed_24h=0,
)
def init_health(started_at: float, guild_count: int) -> None:
    """Seed the shared health state at bot startup.

    Args:
        started_at: Value stored under ``"bot_started_at"``.
        guild_count: Value stored under ``"guild_count"``.
    """
    _health_state.update(
        bot_started_at=started_at,
        guild_count=guild_count,
    )
async def record_task_run(task_name: str, success: bool, error: str = "") -> None:
    """Update the per-task statistics after one execution of ``task_name``.

    A fresh statistics record is created the first time a task name is seen.
    Every call stamps ``last_run`` and bumps ``run_count``; a failed run also
    bumps ``error_count`` and stores the error text truncated to 200
    characters.

    Args:
        task_name: Key of the task inside the ``"tasks"`` section.
        success: Whether the run completed without error.
        error: Error description; only recorded when ``success`` is false.
    """
    tasks = _health_state["tasks"]
    if task_name not in tasks:
        tasks[task_name] = {
            "status": "unknown",
            "last_run": 0,
            "run_count": 0,
            "error_count": 0,
            "last_error": "",
        }
    stats = tasks[task_name]

    stats["last_run"] = time.time()
    stats["run_count"] += 1

    if not success:
        stats["status"] = "error"
        stats["error_count"] += 1
        stats["last_error"] = str(error)[:200]
        return

    # A successful run leaves any previously stored last_error untouched.
    stats["status"] = "ok"
async def record_ws_message(ws_name: str) -> None:
    """Note that a message arrived on the WebSocket called ``ws_name``.

    Creates the socket's record on first use, then marks it connected,
    stamps the arrival time and increments its message counter.
    """
    sockets = _health_state["websocket"]
    if ws_name not in sockets:
        sockets[ws_name] = {
            "connected": True,
            "last_message_at": 0,
            "messages_processed": 0,
        }
    stats = sockets[ws_name]

    stats["messages_processed"] += 1
    stats["last_message_at"] = time.time()
    stats["connected"] = True
def record_ws_disconnect(ws_name: str) -> None:
    """Flag the WebSocket ``ws_name`` as no longer connected.

    Does nothing when no record exists for that name (i.e. no message was
    ever recorded for it).
    """
    ws_entry = _health_state["websocket"].get(ws_name)
    if not ws_entry:
        return
    ws_entry["connected"] = False
def record_game_processed() -> None:
    """Add the current time to the rolling window of processed games."""
    processed_at = time.time()
    _games_timestamps.append(processed_at)
def _prune_games_window() -> tuple[int, int]:
    """Drop stale entries from the games window and count what remains.

    Entries older than 24 hours are removed from the left end of the deque.

    Returns:
        ``(games_last_hour, games_last_24h)``.
    """
    current = time.time()
    day_ago = current - 86400
    hour_ago = current - 3600

    # Oldest entries sit on the left, so stop at the first one still in range.
    while _games_timestamps:
        if _games_timestamps[0] < day_ago:
            _games_timestamps.popleft()
        else:
            break

    last_hour = len([stamp for stamp in _games_timestamps if stamp >= hour_ago])
    return last_hour, len(_games_timestamps)
async def write_heartbeat() -> None:
    """Refresh the health state and persist it to HEALTH_PATH as JSON.

    The state is refreshed through ``get_health_snapshot()`` (guild count,
    heartbeat timestamp and rolling game counters), serialized, written to a
    temporary file next to HEALTH_PATH and then moved over the real file.

    Compared with truncating and rewriting HEALTH_PATH in place, this means:
      * a serialization error can no longer leave an empty heartbeat file,
        because the JSON text is produced before any file is opened;
      * a process reading HEALTH_PATH sees either the previous heartbeat or
        the new one, not a half-written document.

    Any failure while serializing or writing is logged and swallowed, so a
    heartbeat problem never propagates to the caller.
    """
    snapshot = await get_health_snapshot()
    try:
        # Serialize first: nothing on disk is touched if this raises.
        payload = json.dumps(snapshot, indent=2, default=str)

        HEALTH_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Same directory as the target so the final rename stays on one
        # filesystem.
        tmp_path = HEALTH_PATH.with_name(HEALTH_PATH.name + ".tmp")
        async with aiofiles.open(tmp_path, "w", encoding="utf-8") as f:
            await f.write(payload)
        # Swap the finished file into place, overwriting the old heartbeat.
        tmp_path.replace(HEALTH_PATH)
    except Exception as e:
        logging.error(f"[HEALTH] Failed to write heartbeat: {e}")
async def get_health_snapshot() -> dict:
    """Refresh the in-memory health state and return an independent copy.

    The data comes from the live module state, not from the heartbeat file.
    Before copying, the guild count (when the bot is reachable), the
    ``last_heartbeat`` timestamp and the rolling 1h / 24h game counters are
    brought up to date.

    Returns:
        A deep copy of the health state. The nested ``"tasks"`` and
        ``"websocket"`` sections are copied too, so later recorder calls
        cannot change a snapshot the caller is still holding or iterating,
        and caller-side edits cannot leak back into the live state.
    """
    import copy  # local import: only this function needs it

    try:
        _health_state["guild_count"] = len(get_bot().guilds)
    except Exception as exc:
        # Best-effort: keep the last known guild count if the bot cannot be
        # queried, but leave a trace instead of failing silently.
        logging.debug("[HEALTH] Could not refresh guild count: %s", exc)

    _health_state["last_heartbeat"] = time.time()
    games_1h, games_24h = _prune_games_window()
    _health_state["games_processed_1h"] = games_1h
    _health_state["games_processed_24h"] = games_24h
    return copy.deepcopy(_health_state)
async def get_recent_ttl_stats(limit: int = 30) -> dict:
    """Summarize the receive delay of the most recently ended matches.

    Reads up to ``limit`` rows from ``match_summary`` (newest ``endtime_unix``
    first, rows with a NULL timestamp excluded). The delay of a row is
    ``received_unix - endtime_unix``, clamped to zero when negative.

    Args:
        limit: Maximum number of rows to sample.

    Returns:
        A dict with ``sample_size``, ``avg_delay`` (truncated to int),
        ``min_delay``, ``max_delay`` and ``last_received_ts``. With no
        matching rows, ``sample_size`` is 0 and the other values are None.
    """
    query = (
        "SELECT endtime_unix, received_unix FROM match_summary "
        "WHERE received_unix IS NOT NULL AND endtime_unix IS NOT NULL "
        "ORDER BY endtime_unix DESC LIMIT ?"
    )
    async with aiosqlite.connect(SQ_BATTLES_DB_PATH, timeout=10.0) as db:
        fetched = await db.execute_fetchall(query, (limit,))
        rows = list(fetched)

    if not rows:
        return {
            "sample_size": 0,
            "avg_delay": None,
            "min_delay": None,
            "max_delay": None,
            "last_received_ts": None,
        }

    delays = []
    for ended, received in rows:
        gap = int(received) - int(ended)
        # A result received "before" the match ended counts as zero delay.
        delays.append(gap if gap > 0 else 0)

    sample_size = len(delays)
    newest_received = max(int(received) for _, received in rows)
    return {
        "sample_size": sample_size,
        "avg_delay": int(sum(delays) / sample_size),
        "min_delay": min(delays),
        "max_delay": max(delays),
        "last_received_ts": newest_received,
    }