From 0c99a911e0be29199336ff0682d7dc6c9a72e990 Mon Sep 17 00:00:00 2001 From: NotSoToothless <67082114+FURRO404@users.noreply.github.com> Date: Wed, 27 May 2026 04:29:57 -0700 Subject: [PATCH] whoops (#1271) --- BOT/autologging.py | 8 +- BOT/receiver_bridge.py | 3 + BOT/srebot_external.py | 10 +- scripts/backfill_match_summary.py | 160 ++++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 scripts/backfill_match_summary.py diff --git a/BOT/autologging.py b/BOT/autologging.py index 29f3eb2..9313de8 100644 --- a/BOT/autologging.py +++ b/BOT/autologging.py @@ -2054,11 +2054,9 @@ async def process_match_summaries(new_games: List[Dict[str, Any]]) -> None: # Load replay JSON replay_path = replay_data_path(session_id) try: - async with aiofiles.open(replay_path, "r", encoding="utf-8") as f: - replay_data = json.loads(await f.read()) - except FileNotFoundError: - continue - except json.JSONDecodeError: + raw = await asyncio.to_thread(replay_path.read_bytes) + replay_data = json.loads(gzip.decompress(raw)) + except (FileNotFoundError, OSError, json.JSONDecodeError): continue # Translate vehicle names & sanitize diff --git a/BOT/receiver_bridge.py b/BOT/receiver_bridge.py index 3d6996a..3cfc7e5 100644 --- a/BOT/receiver_bridge.py +++ b/BOT/receiver_bridge.py @@ -16,6 +16,7 @@ import asyncio import json import logging import os +import time from dataclasses import dataclass from pathlib import Path from urllib.parse import quote @@ -205,6 +206,7 @@ async def publish_replay_batch(replays: list[dict[str, Any]]) -> None: "type": "spectra.replay_batch", "version": 1, "source": "srebot", + "sent_at": time.time(), "payload": {"replays": replays}, } await _append_external_envelope(envelope) @@ -216,6 +218,7 @@ async def publish_event(event_type: str, payload: dict[str, Any]) -> None: "type": event_type, "version": 1, "source": "srebot", + "sent_at": time.time(), "payload": payload, } await _append_external_envelope(envelope) diff --git a/BOT/srebot_external.py b/BOT/srebot_external.py index 07e42b0..378d620 100644 --- a/BOT/srebot_external.py +++ b/BOT/srebot_external.py @@ -20,6 +20,7 @@ from pathlib import Path from typing import Any import aiohttp +import zstandard as zstd from aiohttp import web from dotenv import load_dotenv @@ -73,6 +74,8 @@ HOP_BY_HOP_HEADERS = { CONNECTED_WEBSOCKETS: set[web.WebSocketResponse] = set() CONNECTED_LOCK = asyncio.Lock() +_compressor = zstd.ZstdCompressor(level=3) + def _auth_ok(request: web.Request) -> bool: if not SETTINGS.bearer_token: @@ -201,7 +204,8 @@ async def websocket_handler(request: web.Request) -> web.WebSocketResponse: async def _broadcast(envelope: dict[str, Any]) -> None: - serialized = json.dumps(envelope, ensure_ascii=False, separators=(",", ":")) + raw = json.dumps(envelope, ensure_ascii=False, separators=(",", ":")).encode("utf-8") + payload = _compressor.compress(raw) async with CONNECTED_LOCK: targets = list(CONNECTED_WEBSOCKETS) @@ -215,7 +219,7 @@ async def _broadcast(envelope: dict[str, Any]) -> None: dead: list[web.WebSocketResponse] = [] for ws in targets: try: - await ws.send_str(serialized) + await ws.send_bytes(payload) except Exception as exc: logger.warning( "Failed to send websocket envelope", @@ -233,6 +237,8 @@ async def _broadcast(envelope: dict[str, Any]) -> None: extra={ "event_type": envelope.get("type"), "clients": len(targets) - len(dead), + "raw_bytes": len(raw), + "compressed_bytes": len(payload), "payload_keys": list((envelope.get("payload") or {}).keys())[:8], }, ) diff --git a/scripts/backfill_match_summary.py b/scripts/backfill_match_summary.py new file mode 100644 index 0000000..6886848 --- /dev/null +++ b/scripts/backfill_match_summary.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Backfill match_summary for replay files that were saved as .json.gz but never +written to the DB — caused by process_match_summaries opening .gz files as +plain text (fixed in the same PR as this script). + +Scans STORAGE/REPLAYS/SRE/ for all replay_data.json.gz files, skips any +session_id already present in match_summary, then calls process_match_summaries +for the remainder. Dedup is also enforced by the table's PRIMARY KEY so +re-running this script is safe. + +Usage (from SREBOT root): + python scripts/backfill_match_summary.py [--dry-run] +""" +from __future__ import annotations + +import argparse +import asyncio +import gzip +import json +import logging +import os +import sqlite3 +import sys +from pathlib import Path + +# ── Bootstrap: make BOT importable ────────────────────────────────────────── +_repo_root = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(_repo_root)) + +try: + from dotenv import load_dotenv + load_dotenv(_repo_root / ".env") +except ImportError: + pass + +from BOT.utils import REPLAYS_DIR, SQ_BATTLES_DB_PATH +from BOT.autologging import process_match_summaries + +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] [%(levelname)s] %(message)s", +) +log = logging.getLogger("backfill") + + +def _already_in_db(db_path: Path) -> set[str]: + """Return the set of session_ids already present in match_summary.""" + if not db_path.exists(): + return set() + with sqlite3.connect(db_path) as con: + rows = con.execute("SELECT session_id FROM match_summary").fetchall() + return {r[0] for r in rows} + + +def _load_replay(gz_path: Path) -> dict | None: + try: + raw = gz_path.read_bytes() + return json.loads(gzip.decompress(raw)) + except Exception as e: + log.warning("Failed to read %s: %s", gz_path, e) + return None + + +def _game_dict_from_replay(replay: dict, gz_path: Path) -> dict | None: + session_id = replay.get("session_id_hex", "") + if not session_id: + # Fall back to directory name (which IS the hex id) + session_id = gz_path.parent.name + if not session_id: + return None + + end_time = int(replay.get("end_ts") or replay.get("timestamp") or 0) + if not end_time: + # Use file mtime as a last resort + end_time = int(gz_path.stat().st_mtime) + + mission_name = str(replay.get("map") or "") + + return { + "sessionIdHex": session_id, + "endTime": end_time, + "missionName": mission_name, + "receivedTime": end_time, + } + + +async def main(dry_run: bool) -> None: + if not REPLAYS_DIR.exists(): + log.error("REPLAYS_DIR does not exist: %s", REPLAYS_DIR) + sys.exit(1) + + already = _already_in_db(SQ_BATTLES_DB_PATH) + log.info("match_summary already has %d entries", len(already)) + + gz_files = sorted(REPLAYS_DIR.glob("*/replay_data.json.gz")) + log.info("Found %d replay files in %s", len(gz_files), REPLAYS_DIR) + + pending: list[dict] = [] + skipped = 0 + unreadable = 0 + + for gz_path in gz_files: + session_id = gz_path.parent.name + + if session_id in already: + skipped += 1 + continue + + replay = _load_replay(gz_path) + if replay is None: + unreadable += 1 + continue + + game = _game_dict_from_replay(replay, gz_path) + if game is None: + log.warning("Could not extract game dict from %s", gz_path) + unreadable += 1 + continue + + pending.append(game) + + log.info( + "Summary: %d to backfill | %d already in DB | %d unreadable", + len(pending), skipped, unreadable, + ) + + if not pending: + log.info("Nothing to do.") + return + + if dry_run: + log.info("[DRY RUN] Would backfill %d matches — not writing.", len(pending)) + for g in pending[:10]: + log.info(" %s endTime=%s map=%r", g["sessionIdHex"], g["endTime"], g["missionName"]) + if len(pending) > 10: + log.info(" ... and %d more", len(pending) - 10) + return + + # Process in batches of 50 to keep memory flat and give progress feedback + batch_size = 50 + total = len(pending) + done = 0 + for i in range(0, total, batch_size): + batch = pending[i : i + batch_size] + await process_match_summaries(batch) + done += len(batch) + log.info("Backfilled %d / %d", done, total) + + log.info("Done. %d match_summary rows written.", total) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--dry-run", action="store_true", + help="Scan and report what would be backfilled without writing anything." + ) + args = parser.parse_args() + asyncio.run(main(dry_run=args.dry_run))