#!/usr/bin/env python3
"""
Backfill match_summary for replay files that were saved as .json.gz but never
written to the DB — caused by process_match_summaries opening .gz files as
plain text (fixed in the same PR as this script).

Scans STORAGE/REPLAYS/SRE/ for all replay_data.json.gz files, skips any
session_id already present in match_summary, then calls
process_match_summaries for the remainder. Dedup is also enforced by the
table's PRIMARY KEY so re-running this script is safe.

Usage (from SREBOT root):
    python scripts/backfill_match_summary.py [--dry-run]
"""
from __future__ import annotations

import argparse
import asyncio
import gzip
import json
import logging
import os
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

# ── Bootstrap: make BOT importable ──────────────────────────────────────────
_repo_root = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_repo_root))

try:
    from dotenv import load_dotenv

    load_dotenv(_repo_root / ".env")
except ImportError:
    # python-dotenv is optional; without it the process environment is used as-is.
    pass

from BOT.utils import REPLAYS_DIR, SQ_BATTLES_DB_PATH
from BOT.autologging import process_match_summaries

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
)
log = logging.getLogger("backfill")


def _already_in_db(db_path: Path) -> set[str]:
    """Return the set of session_ids already present in match_summary.

    Returns an empty set when the database file does not exist.

    Raises:
        sqlite3.Error: if the database exists but the query fails (for
            example, the match_summary table is missing).
    """
    if not db_path.exists():
        return set()
    # sqlite3's own context manager only commits/rolls back the transaction;
    # closing() is what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as con:
        cursor = con.execute("SELECT session_id FROM match_summary")
        return {row[0] for row in cursor}


def _load_replay(gz_path: Path) -> dict | None:
    """Read and decode one gzip-compressed JSON replay file.

    Returns the decoded JSON object, or None (after logging a warning) when
    the file cannot be read, decompressed or parsed, or when its top-level
    JSON value is not an object.
    """
    try:
        replay = json.loads(gzip.decompress(gz_path.read_bytes()))
    except Exception as e:
        # Deliberately broad: one bad file must not abort the whole backfill.
        log.warning("Failed to read %s: %s", gz_path, e)
        return None
    if not isinstance(replay, dict):
        # Valid JSON but not an object; downstream code relies on dict.get().
        log.warning(
            "Failed to read %s: %s",
            gz_path,
            "top-level JSON value is not an object",
        )
        return None
    return replay


def _coerce_epoch(value: object) -> int:
    """Convert *value* to an integer timestamp, returning 0 if impossible."""
    try:
        return int(value)  # ints, floats and plain integer strings
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        return int(float(value))  # strings such as "1700000000.5"
    except (TypeError, ValueError, OverflowError):
        return 0


def _game_dict_from_replay(replay: dict, gz_path: Path) -> dict | None:
    """Build the game dict passed to process_match_summaries.

    Args:
        replay: Decoded replay JSON object.
        gz_path: Path of the replay file; its parent directory name and its
            mtime serve as fallbacks for the session id and the end time.

    Returns:
        A dict with the keys ``sessionIdHex``, ``endTime``, ``missionName``
        and ``receivedTime``, or None when no session id can be determined.
    """
    session_id = replay.get("session_id_hex", "")
    if not session_id:
        # Fall back to directory name (which IS the hex id)
        session_id = gz_path.parent.name
    if not session_id:
        return None

    raw_end = replay.get("end_ts") or replay.get("timestamp") or 0
    end_time = _coerce_epoch(raw_end)
    if not end_time:
        # Use file mtime as a last resort (also covers unparseable values)
        end_time = int(gz_path.stat().st_mtime)

    mission_name = str(replay.get("map") or "")
    return {
        "sessionIdHex": session_id,
        "endTime": end_time,
        "missionName": mission_name,
        "receivedTime": end_time,
    }


async def main(dry_run: bool) -> None:
    """Find replays missing from match_summary and backfill them.

    Args:
        dry_run: When true, only report what would be backfilled; no replay
            file is read and process_match_summaries is never called.

    Exits the process with status 1 if REPLAYS_DIR does not exist.
    """
    if not REPLAYS_DIR.exists():
        log.error("REPLAYS_DIR does not exist: %s", REPLAYS_DIR)
        sys.exit(1)

    already = _already_in_db(SQ_BATTLES_DB_PATH)
    log.info("match_summary already has %d entries", len(already))

    gz_files = sorted(REPLAYS_DIR.glob("*/replay_data.json.gz"))
    log.info("Found %d replay files in %s", len(gz_files), REPLAYS_DIR)

    # Session ID is the directory name — no file reads needed to find missing ones
    missing_paths = [p for p in gz_files if p.parent.name not in already]
    skipped = len(gz_files) - len(missing_paths)
    log.info(
        "Summary: %d to backfill | %d already in DB",
        len(missing_paths),
        skipped,
    )

    if not missing_paths:
        log.info("Nothing to do.")
        return

    if dry_run:
        log.info("[DRY RUN] Would backfill %d matches — not writing.", len(missing_paths))
        for p in missing_paths[:10]:
            log.info(" %s", p.parent.name)
        if len(missing_paths) > 10:
            log.info(" ... and %d more", len(missing_paths) - 10)
        return

    # Read files and write in batches of 50
    batch_size = 50
    total = len(missing_paths)
    done = 0
    unreadable = 0

    for start in range(0, total, batch_size):
        batch: list[dict] = []
        for gz_path in missing_paths[start : start + batch_size]:
            replay = _load_replay(gz_path)
            game = None if replay is None else _game_dict_from_replay(replay, gz_path)
            if game is None:
                # Unreadable file or no usable session id: count it and move on.
                unreadable += 1
                continue
            batch.append(game)

        if batch:
            await process_match_summaries(batch)
            done += len(batch)
            log.info("Backfilled %d / %d", done, total)

    log.info("Done. %d written, %d unreadable.", done, unreadable)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Scan and report what would be backfilled without writing anything.",
    )
    args = parser.parse_args()
    asyncio.run(main(dry_run=args.dry_run))