whoops (#1271)
This commit is contained in:
+3
-5
@@ -2054,11 +2054,9 @@ async def process_match_summaries(new_games: List[Dict[str, Any]]) -> None:
|
|||||||
# Load replay JSON
|
# Load replay JSON
|
||||||
replay_path = replay_data_path(session_id)
|
replay_path = replay_data_path(session_id)
|
||||||
try:
|
try:
|
||||||
async with aiofiles.open(replay_path, "r", encoding="utf-8") as f:
|
raw = await asyncio.to_thread(replay_path.read_bytes)
|
||||||
replay_data = json.loads(await f.read())
|
replay_data = json.loads(gzip.decompress(raw))
|
||||||
except FileNotFoundError:
|
except (FileNotFoundError, OSError, json.JSONDecodeError):
|
||||||
continue
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Translate vehicle names & sanitize
|
# Translate vehicle names & sanitize
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
@@ -205,6 +206,7 @@ async def publish_replay_batch(replays: list[dict[str, Any]]) -> None:
|
|||||||
"type": "spectra.replay_batch",
|
"type": "spectra.replay_batch",
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"source": "srebot",
|
"source": "srebot",
|
||||||
|
"sent_at": time.time(),
|
||||||
"payload": {"replays": replays},
|
"payload": {"replays": replays},
|
||||||
}
|
}
|
||||||
await _append_external_envelope(envelope)
|
await _append_external_envelope(envelope)
|
||||||
@@ -216,6 +218,7 @@ async def publish_event(event_type: str, payload: dict[str, Any]) -> None:
|
|||||||
"type": event_type,
|
"type": event_type,
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"source": "srebot",
|
"source": "srebot",
|
||||||
|
"sent_at": time.time(),
|
||||||
"payload": payload,
|
"payload": payload,
|
||||||
}
|
}
|
||||||
await _append_external_envelope(envelope)
|
await _append_external_envelope(envelope)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ from pathlib import Path
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
import zstandard as zstd
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
@@ -73,6 +74,8 @@ HOP_BY_HOP_HEADERS = {
|
|||||||
CONNECTED_WEBSOCKETS: set[web.WebSocketResponse] = set()
|
CONNECTED_WEBSOCKETS: set[web.WebSocketResponse] = set()
|
||||||
CONNECTED_LOCK = asyncio.Lock()
|
CONNECTED_LOCK = asyncio.Lock()
|
||||||
|
|
||||||
|
_compressor = zstd.ZstdCompressor(level=3)
|
||||||
|
|
||||||
|
|
||||||
def _auth_ok(request: web.Request) -> bool:
|
def _auth_ok(request: web.Request) -> bool:
|
||||||
if not SETTINGS.bearer_token:
|
if not SETTINGS.bearer_token:
|
||||||
@@ -201,7 +204,8 @@ async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
|
|||||||
|
|
||||||
|
|
||||||
async def _broadcast(envelope: dict[str, Any]) -> None:
|
async def _broadcast(envelope: dict[str, Any]) -> None:
|
||||||
serialized = json.dumps(envelope, ensure_ascii=False, separators=(",", ":"))
|
raw = json.dumps(envelope, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
|
||||||
|
payload = _compressor.compress(raw)
|
||||||
async with CONNECTED_LOCK:
|
async with CONNECTED_LOCK:
|
||||||
targets = list(CONNECTED_WEBSOCKETS)
|
targets = list(CONNECTED_WEBSOCKETS)
|
||||||
|
|
||||||
@@ -215,7 +219,7 @@ async def _broadcast(envelope: dict[str, Any]) -> None:
|
|||||||
dead: list[web.WebSocketResponse] = []
|
dead: list[web.WebSocketResponse] = []
|
||||||
for ws in targets:
|
for ws in targets:
|
||||||
try:
|
try:
|
||||||
await ws.send_str(serialized)
|
await ws.send_bytes(payload)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Failed to send websocket envelope",
|
"Failed to send websocket envelope",
|
||||||
@@ -233,6 +237,8 @@ async def _broadcast(envelope: dict[str, Any]) -> None:
|
|||||||
extra={
|
extra={
|
||||||
"event_type": envelope.get("type"),
|
"event_type": envelope.get("type"),
|
||||||
"clients": len(targets) - len(dead),
|
"clients": len(targets) - len(dead),
|
||||||
|
"raw_bytes": len(raw),
|
||||||
|
"compressed_bytes": len(payload),
|
||||||
"payload_keys": list((envelope.get("payload") or {}).keys())[:8],
|
"payload_keys": list((envelope.get("payload") or {}).keys())[:8],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -0,0 +1,160 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Backfill match_summary for replay files that were saved as .json.gz but never
|
||||||
|
written to the DB — caused by process_match_summaries opening .gz files as
|
||||||
|
plain text (fixed in the same PR as this script).
|
||||||
|
|
||||||
|
Scans STORAGE/REPLAYS/SRE/ for all replay_data.json.gz files, skips any
|
||||||
|
session_id already present in match_summary, then calls process_match_summaries
|
||||||
|
for the remainder. Dedup is also enforced by the table's PRIMARY KEY so
|
||||||
|
re-running this script is safe.
|
||||||
|
|
||||||
|
Usage (from SREBOT root):
|
||||||
|
python scripts/backfill_match_summary.py [--dry-run]
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# ── Bootstrap: make BOT importable ──────────────────────────────────────────
|
||||||
|
_repo_root = Path(__file__).resolve().parents[1]
|
||||||
|
sys.path.insert(0, str(_repo_root))
|
||||||
|
|
||||||
|
try:
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(_repo_root / ".env")
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
from BOT.utils import REPLAYS_DIR, SQ_BATTLES_DB_PATH
|
||||||
|
from BOT.autologging import process_match_summaries
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="[%(asctime)s] [%(levelname)s] %(message)s",
|
||||||
|
)
|
||||||
|
log = logging.getLogger("backfill")
|
||||||
|
|
||||||
|
|
||||||
|
def _already_in_db(db_path: Path) -> set[str]:
|
||||||
|
"""Return the set of session_ids already present in match_summary."""
|
||||||
|
if not db_path.exists():
|
||||||
|
return set()
|
||||||
|
with sqlite3.connect(db_path) as con:
|
||||||
|
rows = con.execute("SELECT session_id FROM match_summary").fetchall()
|
||||||
|
return {r[0] for r in rows}
|
||||||
|
|
||||||
|
|
||||||
|
def _load_replay(gz_path: Path) -> dict | None:
|
||||||
|
try:
|
||||||
|
raw = gz_path.read_bytes()
|
||||||
|
return json.loads(gzip.decompress(raw))
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("Failed to read %s: %s", gz_path, e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _game_dict_from_replay(replay: dict, gz_path: Path) -> dict | None:
|
||||||
|
session_id = replay.get("session_id_hex", "")
|
||||||
|
if not session_id:
|
||||||
|
# Fall back to directory name (which IS the hex id)
|
||||||
|
session_id = gz_path.parent.name
|
||||||
|
if not session_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
end_time = int(replay.get("end_ts") or replay.get("timestamp") or 0)
|
||||||
|
if not end_time:
|
||||||
|
# Use file mtime as a last resort
|
||||||
|
end_time = int(gz_path.stat().st_mtime)
|
||||||
|
|
||||||
|
mission_name = str(replay.get("map") or "")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"sessionIdHex": session_id,
|
||||||
|
"endTime": end_time,
|
||||||
|
"missionName": mission_name,
|
||||||
|
"receivedTime": end_time,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def main(dry_run: bool) -> None:
|
||||||
|
if not REPLAYS_DIR.exists():
|
||||||
|
log.error("REPLAYS_DIR does not exist: %s", REPLAYS_DIR)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
already = _already_in_db(SQ_BATTLES_DB_PATH)
|
||||||
|
log.info("match_summary already has %d entries", len(already))
|
||||||
|
|
||||||
|
gz_files = sorted(REPLAYS_DIR.glob("*/replay_data.json.gz"))
|
||||||
|
log.info("Found %d replay files in %s", len(gz_files), REPLAYS_DIR)
|
||||||
|
|
||||||
|
pending: list[dict] = []
|
||||||
|
skipped = 0
|
||||||
|
unreadable = 0
|
||||||
|
|
||||||
|
for gz_path in gz_files:
|
||||||
|
session_id = gz_path.parent.name
|
||||||
|
|
||||||
|
if session_id in already:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
replay = _load_replay(gz_path)
|
||||||
|
if replay is None:
|
||||||
|
unreadable += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
game = _game_dict_from_replay(replay, gz_path)
|
||||||
|
if game is None:
|
||||||
|
log.warning("Could not extract game dict from %s", gz_path)
|
||||||
|
unreadable += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
pending.append(game)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Summary: %d to backfill | %d already in DB | %d unreadable",
|
||||||
|
len(pending), skipped, unreadable,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not pending:
|
||||||
|
log.info("Nothing to do.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
log.info("[DRY RUN] Would backfill %d matches — not writing.", len(pending))
|
||||||
|
for g in pending[:10]:
|
||||||
|
log.info(" %s endTime=%s map=%r", g["sessionIdHex"], g["endTime"], g["missionName"])
|
||||||
|
if len(pending) > 10:
|
||||||
|
log.info(" ... and %d more", len(pending) - 10)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Process in batches of 50 to keep memory flat and give progress feedback
|
||||||
|
batch_size = 50
|
||||||
|
total = len(pending)
|
||||||
|
done = 0
|
||||||
|
for i in range(0, total, batch_size):
|
||||||
|
batch = pending[i : i + batch_size]
|
||||||
|
await process_match_summaries(batch)
|
||||||
|
done += len(batch)
|
||||||
|
log.info("Backfilled %d / %d", done, total)
|
||||||
|
|
||||||
|
log.info("Done. %d match_summary rows written.", total)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(description=__doc__)
|
||||||
|
parser.add_argument(
|
||||||
|
"--dry-run", action="store_true",
|
||||||
|
help="Scan and report what would be backfilled without writing anything."
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
asyncio.run(main(dry_run=args.dry_run))
|
||||||
Reference in New Issue
Block a user