From f6f4e33a65ec941af214603fa85c5e910063b75c Mon Sep 17 00:00:00 2001 From: NotSoToothless <67082114+FURRO404@users.noreply.github.com> Date: Sat, 23 May 2026 17:16:53 -0700 Subject: [PATCH] update to handle new structure from spectra, no more gobs (#1266) --- BOT/autologging.py | 54 +--- BOT/lux_apis.py | 145 ++--------- BOT/receiver_bridge.py | 13 +- BOT/{gob.py => render_replay.py} | 430 +++++++++++++++++++++++++++---- BOT/srebot_external.py | 4 +- BOT/task_executors.py | 11 +- BOT/tasks.py | 30 +-- BOT/utils.py | 47 +++- requirements.txt | 1 - test_spectra.py | 17 +- web/server.js | 20 +- 11 files changed, 459 insertions(+), 313 deletions(-) rename BOT/{gob.py => render_replay.py} (90%) diff --git a/BOT/autologging.py b/BOT/autologging.py index 843cd94..d425abe 100644 --- a/BOT/autologging.py +++ b/BOT/autologging.py @@ -23,17 +23,15 @@ import aiofiles import aiohttp import aiosqlite import discord -import pygob # Local Module Imports from . import utils from data_parser import LangTableReader from .game_api import get_point_diff -from .gob import load_gob_file, render_gob +from .render_replay import load_gob_file, render_gob from .health import record_game_processed, record_ws_message -from .receiver_bridge import publish_gob_payload, publish_replay_batch +from .receiver_bridge import publish_replay_batch from .utils import t, lang_from_features -from .lux_apis import _gob_to_dict from .scoreboard import create_scoreboard from .utils import ( STORAGE_DIR, @@ -452,7 +450,7 @@ async def process_ws_replays(replays: list[dict]): if not hex_id: continue - # Skip if already processed (check for replay_data.json, not just dir — GOB ws may create the dir first) + # Skip if already processed replay_dir = replay_session_dir(hex_id) if (replay_dir / "replay_data.json").exists(): continue @@ -494,14 +492,14 @@ async def process_ws_replays(replays: list[dict]): hex_id, local_data, received_time=now_ts, - end_time=replay.get('end_ts', now_ts), + end_time=int(replay.get("end_ts") or now_ts), ) local_data["scoreboard_context"] = scoreboard_context forwarded_replays.append(local_data) validated_games.append({ "sessionIdHex": hex_id, - "endTime": replay.get('end_ts', now_ts), + "endTime": int(replay.get("end_ts") or now_ts), "missionName": local_data.get("map", ""), "receivedTime": now_ts, "scoreboard_context": scoreboard_context, @@ -1296,36 +1294,8 @@ def build_scoreboard_view(guild_id: int, session_id: str, lang: str = "en") -> d return view -async def handle_gob_message(compressed: bytes, decompressed: bytes) -> None: - """Save a received GOB replay (zstd-compressed) to disk for on-demand video generation.""" - try: - replay = pygob.load(decompressed) - d = _gob_to_dict(replay) - session_id = d.get("SessionID") - if not session_id: - return - hex_id = format(session_id, 'x') - replay_dir = replay_session_dir(hex_id) - replay_dir.mkdir(parents=True, exist_ok=True) - gob_path = replay_dir / "replay.gob" - if not gob_path.exists(): - gob_path.write_bytes(compressed) - logging.info(f"[GOB] Saved {hex_id} ({len(compressed)} bytes compressed)") - await record_ws_message("sqb_gob") - try: - await publish_gob_payload({ - "session_id": hex_id, - "payload": d, - "compressed_size": len(compressed), - }) - except Exception as bridge_error: - logging.warning(f"[BRIDGE] Failed to forward GOB payload for {hex_id}: {bridge_error}") - except Exception as e: - logging.error(f"[GOB] Save error: {e}") - - async def handle_view_video(interaction: discord.Interaction, session_id: str): - """Callback for 'View Video' - renders GOB replay to MP4, sends ephemerally.""" + """Callback for 'View Video' - renders replay JSON to MP4, sends ephemerally.""" try: try: await interaction.response.defer(thinking=True, ephemeral=True) @@ -1336,10 +1306,10 @@ async def handle_view_video(interaction: discord.Interaction, session_id: str): _lang = lang_from_features(_gf) replay_dir = replay_session_dir(session_id) - gob_path = replay_dir / "replay.gob" + replay_json_path = replay_dir / "replay_data.json" video_path = replay_dir / "replay_video.mp4" - if not gob_path.exists(): + if not replay_json_path.exists(): await interaction.followup.send( t(_lang, "autolog.replay_not_available"), ephemeral=True @@ -1356,15 +1326,15 @@ async def handle_view_video(interaction: discord.Interaction, session_id: str): return try: def _generate(): - d = load_gob_file(gob_path) + d = load_gob_file(replay_json_path) render_gob(d, video_path) - logging.info(f"GOB ({session_id}) RENDER START") + logging.info(f"REPLAY ({session_id}) RENDER START") async with _video_render_sem: await asyncio.get_event_loop().run_in_executor(None, _generate) - logging.info(f"GOB ({session_id}) RENDER END (Success)") + logging.info(f"REPLAY ({session_id}) RENDER END (Success)") except Exception as e: - logging.info(f"GOB ({session_id}) RENDER END (Fail)") + logging.info(f"REPLAY ({session_id}) RENDER END (Fail)") # Clean up broken/partial mp4 so it doesn't get cached if video_path.exists(): video_path.unlink(missing_ok=True) diff --git a/BOT/lux_apis.py b/BOT/lux_apis.py index 30f6599..b90fb2b 100644 --- a/BOT/lux_apis.py +++ b/BOT/lux_apis.py @@ -15,20 +15,10 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional # Third-Party Library Imports import aiohttp -import pygob import zstandard as zstd from dotenv import load_dotenv from websockets.asyncio.client import connect as wsconnect -# Local Module Imports -try: - from data_parser import LangTableReader - from .utils import REPLAYS_DIR -except ImportError: - LangTableReader = None # Running directly, not as module - REPLAYS_DIR = None - - # Load environment variables load_dotenv() @@ -41,31 +31,9 @@ _replay_queue: asyncio.Queue = asyncio.Queue() WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "") API_KEY = os.getenv("SPECTRA_API_KEY", "") SPECTRA_API_URL = os.getenv("SPECTRA_API_URL", "") -WS_GOB_URL = os.getenv("SPECTRA_WS_GOB_URL", "") LEADERBOARD_PATH = "/v1/game/leaderboard" REPLAY_SORT_PATH = "/v1/replays/sort" -# Initialize translation reader for vehicle names -translate = LangTableReader("English") if LangTableReader else None - - -def _gob_to_dict(obj: object) -> Any: - """Recursively convert pygob namedtuples to plain dicts for JSON serialization.""" - fields = getattr(obj, '_fields', None) - if isinstance(obj, tuple) and fields is not None: - return {f: _gob_to_dict(getattr(obj, f)) for f in fields} - elif isinstance(obj, list): - return [_gob_to_dict(i) for i in obj] - elif isinstance(obj, dict): - return { - (k.decode('utf-8', errors='replace') if isinstance(k, bytes) else k): _gob_to_dict(v) - for k, v in obj.items() - } - elif isinstance(obj, bytes): - return obj.decode('utf-8', errors='replace') - return obj - - def normalize_ws_message(data: Any) -> Optional[List[Dict[str, Any]]]: """ Normalize WebSocket message to list of replay dicts. @@ -104,7 +72,9 @@ async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitabl Args: callback: Async function to call with normalized replay data """ - headers = {'Authorization': API_KEY} + auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" + headers = {"Authorization": auth_value} + decompressor = zstd.ZstdDecompressor() reconnect_delay = 1 # Start queue processor as background task @@ -116,12 +86,23 @@ async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitabl logger.info(f"WebSocket connected to {label}") async for message in ws: try: - data = json.loads(message) + if isinstance(message, str): + raw = message.encode("utf-8") + else: + raw = bytes(message) + + text: str + try: + text = decompressor.decompress(raw, max_output_size=64 * 1024 * 1024).decode("utf-8") + except zstd.ZstdError: + text = raw.decode("utf-8") + + data = json.loads(text) replays = normalize_ws_message(data) if replays: await _replay_queue.put(replays) except json.JSONDecodeError: - logger.warning(f"Invalid JSON from WS: {message[:100]}") + logger.warning("Invalid JSON from WS frame") except Exception as e: logger.error(f"Error processing WS message: {e}") @@ -326,98 +307,6 @@ async def test_fetch_replay_by_id(): print("No data returned.") -async def ws_gob_listener(callback: Callable[[bytes, bytes], Awaitable[None]]) -> None: - """ - Maintain persistent WebSocket connection to the Spectra SQB .gob endpoint. - Server pushes raw zstd-compressed .gob binary after each SQB replay is parsed. - Client does not send messages. - - Args: - callback: Async function called with (compressed_bytes, decompressed_bytes) - """ - auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" - headers = {"Authorization": auth_value} - decompressor = zstd.ZstdDecompressor() - reconnect_delay = 1 - - async def _connect_gob(url: str, label: str): - logger.info(f"GOB WS attempting connect → {url}") - async with wsconnect(url, additional_headers=headers) as ws: - logger.info(f"WebSocket connected to {label}") - reconnect_delay_ref = 1 # noqa: F841 — reset handled by caller - async for message in ws: - try: - raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode() - data = decompressor.decompress(raw) - await callback(raw, data) - except zstd.ZstdError as e: - logger.error(f"zstd decompression failed: {e}") - except Exception as e: - logger.error(f"Error processing GOB message: {e}") - - while True: - primary_url = WS_GOB_URL - primary_label = "Spectra GOB endpoint" - - try: - await _connect_gob(primary_url, primary_label) - except Exception as e: - logger.error(f"GOB WebSocket error ({primary_label}): {e}") - - logger.info(f"GOB WS reconnecting in {reconnect_delay}s...") - await asyncio.sleep(reconnect_delay) - reconnect_delay = min(reconnect_delay * 2, 30) - - -async def test_gob_ws(): - """ - Connect to the SQB GOB WebSocket and dump received messages to files. - Each decompressed .gob blob is written to STORAGE/REPLAYS/.gob for inspection. - """ - from pathlib import Path - if REPLAYS_DIR is None: - raise RuntimeError("REPLAYS_DIR is not configured") - replays_dir = REPLAYS_DIR - replays_dir.mkdir(parents=True, exist_ok=True) - - auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" - print(f"Connecting to {WS_GOB_URL}") - print(f"API Key configured: {'Yes' if API_KEY else 'No'}") - print(f"Saving to {replays_dir}") - print("Waiting for messages (Ctrl+C to stop)...\n") - - decompressor = zstd.ZstdDecompressor() - count = 0 - - async with wsconnect(WS_GOB_URL, additional_headers={"Authorization": auth_value}) as ws: - print("Connected.") - async for message in ws: - raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode() - print(f"[{count}] Received {len(raw)} bytes (compressed)") - data = b"" - try: - data = decompressor.decompress(raw) - print(f"[{count}] Decompressed to {len(data)} bytes") - replay = pygob.load(data) - d = _gob_to_dict(replay) - session_id = d.get("SessionID", count) - out = replays_dir / f"{session_id}.json" - out.write_text(json.dumps(d, indent=2, default=str), encoding="utf-8") - print(f"[{count}] Decoded and written to {out}\n") - except zstd.ZstdError as e: - print(f"[{count}] zstd decompression failed: {e}") - out = replays_dir / f"gob_{count}_raw.bin" - out.write_bytes(raw) - print(f"[{count}] Raw bytes written to {out}\n") - except Exception as e: - print(f"[{count}] gob decode failed: {e}") - if data: - out = replays_dir / f"gob_{count}.gob" - out.write_bytes(data) - print(f"[{count}] Raw gob written to {out}\n") - count += 1 - - if __name__ == "__main__": # Setup for direct execution import sys @@ -434,5 +323,3 @@ if __name__ == "__main__": mode = sys.argv[1] if len(sys.argv) > 1 else "replay" if mode == "replay": asyncio.run(test_fetch_replay_by_id()) - elif mode == "gob": - asyncio.run(test_gob_ws()) diff --git a/BOT/receiver_bridge.py b/BOT/receiver_bridge.py index a09bd94..3d6996a 100644 --- a/BOT/receiver_bridge.py +++ b/BOT/receiver_bridge.py @@ -6,7 +6,7 @@ Bridge helpers for external SREBOT transfer. This module provides two pieces: 1. A formal SREBOT API client that external consumers can use to query the SREBOT HTTP API. -2. A persistent outbox for replay and GOB payloads so the external bridge +2. A persistent outbox for replay payloads so the external bridge service can fan them out over websocket. """ @@ -210,17 +210,6 @@ async def publish_replay_batch(replays: list[dict[str, Any]]) -> None: await _append_external_envelope(envelope) -async def publish_gob_payload(payload: dict[str, Any]) -> None: - """Queue a GOB payload for websocket delivery by the external bridge.""" - envelope = { - "type": "spectra.gob", - "version": 1, - "source": "srebot", - "payload": payload, - } - await _append_external_envelope(envelope) - - async def publish_event(event_type: str, payload: dict[str, Any]) -> None: """Generic queue helper for future bridge events.""" envelope = { diff --git a/BOT/gob.py b/BOT/render_replay.py similarity index 90% rename from BOT/gob.py rename to BOT/render_replay.py index f42f6cd..32b8fac 100644 --- a/BOT/gob.py +++ b/BOT/render_replay.py @@ -1,12 +1,12 @@ """ -gob.py +render_replay.py -Handles GOB replay files: renders MP4 videos and exports slim JSON for the +Handles replay JSON files: renders MP4 videos and exports slim JSON for the web canvas replay viewer. Output mode is picked from the output file extension. Usage: - python -m BOT.gob # render video - python -m BOT.gob # export json + python -m BOT.render_replay # render video + python -m BOT.render_replay # export json Public API: render_gob(d, out_path, fps, speed, n_workers, progress_cb) @@ -29,8 +29,6 @@ from typing import Any, Callable, Optional # Third-Party Library Imports import numpy as np -import pygob -import zstandard as zstd from . import SHARED_DIR from .utils import REPLAYS_DIR @@ -439,7 +437,7 @@ def blit_batch(buf: np.ndarray, items: list[tuple[Sprite, int, int]]) -> None: @dataclass class VideoCtx: - """Pre-computed rendering context for one GOB replay video. + """Pre-computed rendering context for one replay video. Holds all interpolated positions, colors, death states, kill/damage events, label sprites, and the baked background for every frame so that @@ -1506,7 +1504,7 @@ def precompute_kills(kills: list[dict], xfm: CoordTransform, offset_x/y: crop origin to shift coordinates into crop-space. pid_pos: optional {PlayerID: (px_arr, py_arr)} for tracking moving entities. When provided, kill lines follow the entity's interpolated position - each frame instead of using the static GOB snapshot position. + each frame instead of using the static kill snapshot position. """ out: list[list[tuple]] = [[] for _ in range(n_frames)] kill_f = int(math.ceil(KILL_TTL * fps / 1000.0)) @@ -1565,7 +1563,7 @@ def precompute_damages(damages: list[dict], active: list[dict], up from px_all/py_all at the damage time. Args: - damages: Raw damage report dicts from the GOB replay. + damages: Raw damage report dicts from the replay. active: Active player dicts (used for PlayerID-to-index mapping). px_all: Precomputed pixel X positions, shape (n_players, n_frames). py_all: Precomputed pixel Y positions, shape (n_players, n_frames). @@ -2548,10 +2546,10 @@ def render_gob( progress_cb: Optional[Callable[[int], None]] = None, ) -> None: """ - Render a GOB replay dict to an MP4 file. + Render a replay dict to an MP4 file. Args: - d: Parsed GOB replay dict (from _gob_to_dict or json.load) + d: Parsed replay dict (normalized by load_gob_file) out_path: Output MP4 path fps: Frames per second speed: Playback speed multiplier @@ -2931,41 +2929,365 @@ def render_gob( print(f"\nDone → {out_path} ({sz:.1f} MB)") -# ── GOB loading helpers ─────────────────────────────────────────────────────── +# ── Replay loading helpers ──────────────────────────────────────────────────── -def _decode_gob_bytes(raw: bytes) -> str: - """Decode replay byte strings while trimming fixed-width padding bytes.""" - core = raw.split(b"\x00", 1)[0] - text = core.decode("utf-8", errors="replace") - return text.rstrip("".join(chr(i) for i in range(0x00, 0x20)) + "\x7f") +def _to_int(value: Any, default: int = 0) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default -def _gob_to_dict(obj: object) -> Any: - """Recursively convert pygob namedtuples to plain dicts.""" - if isinstance(obj, tuple) and hasattr(obj, '_fields'): # type: ignore[union-attr] - return {f: _gob_to_dict(getattr(obj, f)) for f in obj._fields} # type: ignore[union-attr] - elif isinstance(obj, list): - return [_gob_to_dict(i) for i in obj] - elif isinstance(obj, dict): +def _unit_to_model_name(unit_name: str) -> str: + internal = (unit_name or "").strip() + if not internal: + return "tankModels/unknown" + tags = _get_unit_tags(internal) or [] + tag_set = set(tags) + if "type_strike_ucav" in tag_set or "ucav" in internal.lower(): + return f"airModels/{internal}_ucav" + if tag_set & { + "air", + "aircraft", + "helicopter", + "type_jet_bomber", + "type_bomber", + "type_strike_aircraft", + "type_jet_fighter", + "type_fighter", + "type_helicopter", + }: + return f"airModels/{internal}" + return f"tankModels/{internal}" + + +def _path_sample_to_dict(sample: Any) -> dict[str, float] | None: + if isinstance(sample, dict): + if {"Time", "X", "Z"}.issubset(sample.keys()): + return { + "Time": float(sample.get("Time", 0.0)), + "X": float(sample.get("X", 0.0)), + "Y": float(sample.get("Y", 0.0)), + "Z": float(sample.get("Z", 0.0)), + } + if {"t", "x", "z"}.issubset(sample.keys()): + return { + "Time": float(sample.get("t", 0.0)), + "X": float(sample.get("x", 0.0)), + "Y": float(sample.get("y", 0.0)), + "Z": float(sample.get("z", 0.0)), + } + return None + + if isinstance(sample, (list, tuple)) and len(sample) >= 4: return { - (_decode_gob_bytes(k) if isinstance(k, bytes) else k): _gob_to_dict(v) - for k, v in obj.items() + "Time": float(sample[0]), + "X": float(sample[1]), + "Y": float(sample[2]), + "Z": float(sample[3]), } - elif isinstance(obj, bytes): - return _decode_gob_bytes(obj) - return obj + return None -def load_gob_file(gob_path: Path) -> dict[str, Any]: - """Load a .gob (zstd-compressed) or .json replay file and return the dict.""" - raw = gob_path.read_bytes() - if gob_path.suffix == ".json": - return json.loads(raw) - # zstd-compressed gob binary - decompressor = zstd.ZstdDecompressor() - data = decompressor.decompress(raw, max_output_size=200 * 1024 * 1024) - replay = pygob.load(data) - return _gob_to_dict(replay) # type: ignore[return-value] +def _position_at_time(path: list[dict[str, float]], time_ms: float) -> dict[str, float] | None: + if not path: + return None + if time_ms <= path[0]["Time"]: + return path[0] + prev = path[0] + for pt in path[1:]: + if pt["Time"] >= time_ms: + return pt + prev = pt + return prev + + +def _convert_ws_replay_to_render_dict(replay: dict[str, Any]) -> dict[str, Any]: + players_src = replay.get("players") or {} + if not isinstance(players_src, dict): + players_src = {} + + players_out: list[dict[str, Any]] = [] + winner_tag = str(replay.get("winner") or "") + loser_tag = str(replay.get("loser") or "") + winner_team = 0 + loser_team = 0 + + for uid_str, pdata in players_src.items(): + if not isinstance(pdata, dict): + continue + pid = _to_int(pdata.get("uid") or uid_str, 0) + team = _to_int(pdata.get("team"), 0) + tag = str(pdata.get("tag") or "") + if winner_team == 0 and tag == winner_tag: + winner_team = team + if loser_team == 0 and tag == loser_tag: + loser_team = team + players_out.append({ + "PlayerID": pid, + "Name": str(pdata.get("name") or f"Player#{pid}"), + "Team": team, + "Clan": tag, + }) + + if winner_team == 0: + winner_team = 1 if loser_team != 1 else 2 + + entities_src = replay.get("entities") or [] + if not isinstance(entities_src, list): + entities_src = [] + + entities_out: list[dict[str, Any]] = [] + uid_to_entity_index: dict[int, int] = {} + + for idx, ent in enumerate(entities_src, start=1): + if not isinstance(ent, dict): + continue + uid = _to_int(ent.get("uid"), 0) + unit = str(ent.get("unit") or "") + path_raw = ent.get("path") or [] + if not isinstance(path_raw, list): + continue + path: list[dict[str, float]] = [] + for sample in path_raw: + parsed = _path_sample_to_dict(sample) + if parsed is not None: + path.append(parsed) + if not path: + continue + entity_index = _to_int(ent.get("entity_index") or ent.get("entityIndex"), idx) + uid_to_entity_index.setdefault(uid, entity_index) + entities_out.append({ + "EntityIndex": entity_index, + "PlayerID": uid, + "ModelName": _unit_to_model_name(unit), + "Path": path, + }) + + entity_paths_by_uid: dict[int, list[dict[str, float]]] = { + e["PlayerID"]: e["Path"] for e in entities_out if e.get("PlayerID") + } + + events = replay.get("events") or {} + if isinstance(events, str): + try: + events = json.loads(events) + except json.JSONDecodeError: + events = {} + if not isinstance(events, dict): + events = {} + + kills_out: list[dict[str, Any]] = [] + for kill in (events.get("kills") or []): + if not isinstance(kill, dict): + continue + victim_id = _to_int(kill.get("offended_uid"), 0) + killer_id = _to_int(kill.get("offender_uid"), 0) + kill_time = float(kill.get("time") or 0.0) + victim_path = entity_paths_by_uid.get(victim_id, []) + killer_path = entity_paths_by_uid.get(killer_id, []) + victim_pos = _position_at_time(victim_path, kill_time) + killer_pos = _position_at_time(killer_path, kill_time) + payload: dict[str, Any] = { + "Time": kill_time, + "VictimID": victim_id, + "KillerID": killer_id, + "VictimEntityIndex": uid_to_entity_index.get(victim_id, 0), + "Weapon": str(kill.get("used_weapon") or kill.get("weapon") or ""), + "VictimModel": _unit_to_model_name(str(kill.get("offended_unit") or "")), + "KillerModel": _unit_to_model_name(str(kill.get("offender_unit") or "")), + "crashed": bool(kill.get("crashed", False)), + } + if victim_pos: + payload["VictimPosition"] = { + "X": float(victim_pos["X"]), + "Y": float(victim_pos["Y"]), + "Z": float(victim_pos["Z"]), + } + if killer_pos: + payload["KillerPosition"] = { + "X": float(killer_pos["X"]), + "Y": float(killer_pos["Y"]), + "Z": float(killer_pos["Z"]), + } + kills_out.append(payload) + + damages_out: list[dict[str, Any]] = [] + for dmg in (events.get("damage") or []): + if not isinstance(dmg, dict): + continue + damages_out.append({ + "Time": float(dmg.get("time") or 0.0), + "OffenderID": _to_int(dmg.get("offender_uid"), 0), + "OffendedID": _to_int(dmg.get("offended_uid"), 0), + "OffenderModel": _unit_to_model_name(str(dmg.get("offender_unit") or "")), + "OffendedModel": _unit_to_model_name(str(dmg.get("offended_unit") or "")), + "Afire": bool(dmg.get("afire", False)), + }) + + mission_mode = str(replay.get("mission_mode") or "") + difficulty = str(replay.get("difficulty") or "") + battle_type = mission_mode or difficulty + + return { + "SessionID": _to_int(replay.get("_id") or replay.get("id"), 0), + "TeamWon": winner_team, + "Mission": { + "Level": str(replay.get("level_path") or ""), + "LevelSettings": str(replay.get("mission_path") or ""), + "BattleType": battle_type, + }, + "Players": players_out, + "Entities": entities_out, + "Kills": kills_out, + "DamageReports": damages_out, + } + + +def _convert_local_replay_to_render_dict(replay: dict[str, Any]) -> dict[str, Any]: + teams_src = replay.get("teams") or [] + if not isinstance(teams_src, list): + teams_src = [] + + players_out: list[dict[str, Any]] = [] + winner_sq = str(replay.get("winning_team_squadron") or "") + winner_team = 1 + + for idx, team in enumerate(teams_src[:2], start=1): + if not isinstance(team, dict): + continue + team_sq = str(team.get("squadron") or "") + if team_sq and team_sq == winner_sq: + winner_team = idx + for p in (team.get("players") or []): + if not isinstance(p, dict): + continue + pid = _to_int(p.get("uid"), 0) + if pid <= 0: + continue + players_out.append({ + "PlayerID": pid, + "Name": str(p.get("nick") or f"Player#{pid}"), + "Team": idx, + "Clan": str(team.get("squadron_tagged") or team_sq), + }) + + entities_src = replay.get("entities") or [] + if not isinstance(entities_src, list): + entities_src = [] + + entities_out: list[dict[str, Any]] = [] + uid_to_entity_index: dict[int, int] = {} + for idx, ent in enumerate(entities_src, start=1): + if not isinstance(ent, dict): + continue + uid = _to_int(ent.get("uid"), 0) + unit = str(ent.get("unit") or "") + path_raw = ent.get("path") or [] + if not isinstance(path_raw, list): + continue + path: list[dict[str, float]] = [] + for sample in path_raw: + parsed = _path_sample_to_dict(sample) + if parsed is not None: + path.append(parsed) + if not path: + continue + entity_index = _to_int(ent.get("entity_index") or ent.get("entityIndex"), idx) + uid_to_entity_index.setdefault(uid, entity_index) + entities_out.append({ + "EntityIndex": entity_index, + "PlayerID": uid, + "ModelName": _unit_to_model_name(unit), + "Path": path, + }) + + entity_paths_by_uid: dict[int, list[dict[str, float]]] = { + e["PlayerID"]: e["Path"] for e in entities_out if e.get("PlayerID") + } + + events = replay.get("events") or {} + if isinstance(events, str): + try: + events = json.loads(events) + except json.JSONDecodeError: + events = {} + if not isinstance(events, dict): + events = {} + + kills_out: list[dict[str, Any]] = [] + for kill in (events.get("kills") or []): + if not isinstance(kill, dict): + continue + victim_id = _to_int(kill.get("offended_uid"), 0) + killer_id = _to_int(kill.get("offender_uid"), 0) + kill_time = float(kill.get("time") or 0.0) + victim_path = entity_paths_by_uid.get(victim_id, []) + killer_path = entity_paths_by_uid.get(killer_id, []) + victim_pos = _position_at_time(victim_path, kill_time) + killer_pos = _position_at_time(killer_path, kill_time) + payload: dict[str, Any] = { + "Time": kill_time, + "VictimID": victim_id, + "KillerID": killer_id, + "VictimEntityIndex": uid_to_entity_index.get(victim_id, 0), + "Weapon": str(kill.get("used_weapon") or kill.get("weapon") or ""), + "VictimModel": _unit_to_model_name(str(kill.get("offended_unit") or "")), + "KillerModel": _unit_to_model_name(str(kill.get("offender_unit") or "")), + "crashed": bool(kill.get("crashed", False)), + } + if victim_pos: + payload["VictimPosition"] = { + "X": float(victim_pos["X"]), + "Y": float(victim_pos["Y"]), + "Z": float(victim_pos["Z"]), + } + if killer_pos: + payload["KillerPosition"] = { + "X": float(killer_pos["X"]), + "Y": float(killer_pos["Y"]), + "Z": float(killer_pos["Z"]), + } + kills_out.append(payload) + + damages_out: list[dict[str, Any]] = [] + for dmg in (events.get("damage") or []): + if not isinstance(dmg, dict): + continue + damages_out.append({ + "Time": float(dmg.get("time") or 0.0), + "OffenderID": _to_int(dmg.get("offender_uid"), 0), + "OffendedID": _to_int(dmg.get("offended_uid"), 0), + "OffenderModel": _unit_to_model_name(str(dmg.get("offender_unit") or "")), + "OffendedModel": _unit_to_model_name(str(dmg.get("offended_unit") or "")), + "Afire": bool(dmg.get("afire", False)), + }) + + return { + "SessionID": _to_int(replay.get("session_id_dec") or replay.get("session_id_hex"), 0), + "TeamWon": winner_team, + "Mission": { + "Level": str(replay.get("level_path") or ""), + "LevelSettings": str(replay.get("mission_path") or ""), + "BattleType": str(replay.get("mode") or replay.get("difficulty") or ""), + }, + "Players": players_out, + "Entities": entities_out, + "Kills": kills_out, + "DamageReports": damages_out, + } + + +def load_gob_file(replay_path: Path) -> dict[str, Any]: + """Load a replay .json file and normalize it for render/export routines.""" + data = json.loads(replay_path.read_text(encoding="utf-8")) + if isinstance(data, dict) and {"Players", "Entities", "Mission"}.issubset(data.keys()): + return data + if isinstance(data, dict): + if {"teams", "events", "entities"}.issubset(data.keys()): + return _convert_local_replay_to_render_dict(data) + return _convert_ws_replay_to_render_dict(data) + raise ValueError(f"Unsupported replay payload in {replay_path}") # ── JSON export (slim dict for the web canvas replay viewer) ────────────────── @@ -3067,9 +3389,9 @@ def _resolve_drone_team(drone_entity: dict, ground_entities: list[dict], return best_team -def export_replay_json(gob_path: Path) -> dict: - """Load a GOB file and produce a slim dict for the web viewer.""" - d = load_gob_file(gob_path) +def export_replay_json(replay_path: Path) -> dict: + """Load a replay file and produce a slim dict for the web viewer.""" + d = load_gob_file(replay_path) players_by_id = {p["PlayerID"]: p for p in d.get("Players", [])} team_won = d.get("TeamWon", 0) @@ -3244,14 +3566,14 @@ def export_replay_json(gob_path: Path) -> dict: # ── Main (CLI wrapper) ───────────────────────────────────────────────────────── def main(): - """CLI entry point: render a GOB replay to MP4, or export a slim JSON for the web viewer. + """CLI entry point: render a replay JSON to MP4, or export a slim viewer JSON. Output mode is selected by the output file extension: `.json` → json export, anything else → mp4 render. Supports --profile for cProfile hotspot analysis. """ import argparse - parser = argparse.ArgumentParser(description="Render GOB replay to MP4") - parser.add_argument("gob", nargs="?", help="Path to .gob or .json replay") + parser = argparse.ArgumentParser(description="Render replay_data JSON to MP4") + parser.add_argument("replay", nargs="?", help="Path to replay_data .json") parser.add_argument("out", nargs="?", help="Output .mp4 path") parser.add_argument("--fps", type=int, default=FPS) parser.add_argument("--speed", type=float, default=SPEED) @@ -3260,30 +3582,30 @@ def main(): help="Run with cProfile and print top 40 hotspots") args = parser.parse_args() - if args.gob: - gob_path = Path(args.gob) + if args.replay: + replay_path = Path(args.replay) else: - candidates = sorted(REPLAYS_DIR.glob("*/replay.gob")) + candidates = sorted(REPLAYS_DIR.glob("*/replay_data.json")) if not candidates: candidates = sorted(REPLAYS_DIR.glob("*.json")) if not candidates: - sys.exit(f"No .gob or .json files in {REPLAYS_DIR}") - gob_path = candidates[0] + sys.exit(f"No replay .json files in {REPLAYS_DIR}") + replay_path = candidates[0] - out_path = Path(args.out) if args.out else gob_path.parent / "replay_video.mp4" + out_path = Path(args.out) if args.out else replay_path.parent / "replay_video.mp4" if out_path.suffix.lower() == ".json": - data = export_replay_json(gob_path) + data = export_replay_json(replay_path) raw = json.dumps(data, separators=(",", ":")) out_path.write_text(raw, encoding="utf-8") print(f"Exported {len(raw):,} bytes to {out_path}") return - print(f"Input : {gob_path}") + print(f"Input : {replay_path}") print(f"Output : {out_path}") print(f"Settings : {args.fps}fps {args.speed:.0f}× {args.workers} threads") - d = load_gob_file(gob_path) + d = load_gob_file(replay_path) if args.profile: import cProfile diff --git a/BOT/srebot_external.py b/BOT/srebot_external.py index 05b76a8..07e42b0 100644 --- a/BOT/srebot_external.py +++ b/BOT/srebot_external.py @@ -3,7 +3,7 @@ This PM2-managed process does two things: 1. Proxies read-only SREBOT queries on the external port. -2. Broadcasts SREBOT replay/GOB envelopes over websocket to any connected +2. Broadcasts SREBOT replay envelopes over websocket to any connected client. """ @@ -165,7 +165,7 @@ async def root(_: web.Request) -> web.Response: return web.json_response( { "service": "srebot-external", - "message": "Use /api/* for queries and /ws/srebot for replay/gob events.", + "message": "Use /api/* for queries and /ws/srebot for replay events.", } ) diff --git a/BOT/task_executors.py b/BOT/task_executors.py index a7ce51e..2432d0b 100644 --- a/BOT/task_executors.py +++ b/BOT/task_executors.py @@ -285,14 +285,14 @@ async def cleanup_replays(): """ Cleans up replay directories in STORAGE/REPLAYS/: - After 12 hours: deletes regenerable files (PNGs, MP4s) - - After 48 hours: deletes entire directory (GOB + JSON included) + - After 48 hours: deletes entire directory (replay JSON included) - Age is determined from the mtime of replay.gob / replay_data.json (written + Age is determined from the mtime of replay_data.json (written once at capture time), not the directory mtime — directory mtime is bumped whenever files inside are added or removed (including by this cleanup), which would otherwise keep dirs perpetually "fresh". """ - KEEP_FILES = {"replay.gob", "replay_data.json"} + KEEP_FILES = {"replay_data.json"} def _sync_cleanup_replays(): """Synchronous helper that walks replay dirs and deletes stale files.""" @@ -312,11 +312,8 @@ async def cleanup_replays(): if not entry_path.is_dir(): continue - gob_path = entry_path / "replay.gob" json_path = entry_path / "replay_data.json" - if gob_path.exists(): - entry_mtime = gob_path.stat().st_mtime - elif json_path.exists(): + if json_path.exists(): entry_mtime = json_path.stat().st_mtime else: entry_mtime = entry_path.stat().st_mtime diff --git a/BOT/tasks.py b/BOT/tasks.py index 995f9d0..adaae13 100644 --- a/BOT/tasks.py +++ b/BOT/tasks.py @@ -24,7 +24,7 @@ from discord.ext import tasks # Local Module Imports from . import lux_apis -from .autologging import handle_ws_replays, handle_gob_message +from .autologging import handle_ws_replays from .health import get_recent_ttl_stats, record_task_run, write_heartbeat from .meta_manager import process_all_players, sync_all_guild_metas from .task_executors import ( @@ -501,32 +501,6 @@ async def after_ws_autolog(): ws_autolog_task.start() -# ============================================================================ -# WEBSOCKET GOB LISTENER TASK -# ============================================================================ - -@tasks.loop(count=1) -async def ws_gob_task(): - """ - Single-run task that maintains persistent WebSocket connection to the GOB endpoint. - Saves incoming compressed GOB replays to disk for on-demand video generation. - """ - await lux_apis.ws_gob_listener(handle_gob_message) - - -@ws_gob_task.before_loop -async def before_ws_gob(): - await get_bot().wait_until_ready() - - -@ws_gob_task.after_loop -async def after_ws_gob(): - if ws_gob_task.failed(): - logging.error("[GOB] ws_gob_task died, restarting in 10s...") - await asyncio.sleep(10) - ws_gob_task.start() - - # ============================================================================ # SQUADRON POINTS CONTINUOUS UPDATER # ============================================================================ @@ -729,7 +703,6 @@ async def start_all_tasks(): ttl_alert_task.start() # Phase 2: WebSocket listeners ws_autolog_task.start() - ws_gob_task.start() # Phase 3: Heavy DB tasks (background — doesn't block on_ready) asyncio.create_task(_startup_heavy_init()) @@ -747,7 +720,6 @@ def stop_all_tasks(): update_squadrons_db_task.cancel() update_meta_data_task.cancel() ws_autolog_task.cancel() - ws_gob_task.cancel() squadron_points_loop_task.cancel() sync_guild_metas_task.cancel() health_heartbeat_task.cancel() diff --git a/BOT/utils.py b/BOT/utils.py index 5f47ca5..8eee681 100644 --- a/BOT/utils.py +++ b/BOT/utils.py @@ -1216,11 +1216,15 @@ def transform_to_local_format(api_data: Dict[str, Any]) -> Optional[Dict[str, An replay = api_data["completed"][0] - winner_winged = replay.get("winner") - loser_winged = replay.get("loser") + winner_winged = str(replay.get("winner") or "") + loser_winged = str(replay.get("loser") or "") - winner_squadron = winner_winged[1:-1] if winner_winged else "" - loser_squadron = loser_winged[1:-1] if loser_winged else "" + def _normalize_squadron_tag(raw: str) -> str: + cleaned = re.sub(r"[^A-Za-z0-9_-]", "", raw or "") + return cleaned or (raw or "").strip() + + winner_squadron = _normalize_squadron_tag(winner_winged) + loser_squadron = _normalize_squadron_tag(loser_winged) is_draw = replay.get("draw", False) @@ -1310,7 +1314,7 @@ def transform_to_local_format(api_data: Dict[str, Any]) -> Optional[Dict[str, An "offended_uid": str(kill["offended_uid"]) if kill.get("offended_uid") is not None else None, "offended_unit": kill.get("offended_unit"), "crashed": kill.get("crashed", False), - "weapon": kill.get("weapon", ""), + "weapon": kill.get("used_weapon", "") or kill.get("weapon", ""), "afire": False, }) @@ -1402,9 +1406,23 @@ def transform_to_local_format(api_data: Dict[str, Any]) -> Optional[Dict[str, An f"{prefix}[{time_str}] {sq_tag:<7} {name} ({vehicle}) damaged {afire}{victim_name} ({victim_vehicle})" ) - raw_id = replay.get("_id") - start_ts = replay.get("start_ts") or 0 - end_ts = replay.get("end_ts") or 0 + raw_id = replay.get("_id") + if raw_id is None: + raw_id = replay.get("id") + start_ts = int(replay.get("start_ts") or 0) + end_ts = int(replay.get("end_ts") or 0) + + mission_name = str(replay.get("mission_name") or "").strip() + if not mission_name: + mission_name = str(replay.get("level_path") or "").strip() + + mission_mode = str(replay.get("mission_mode") or "").strip() + if not mission_mode: + mission_mode = str(replay.get("difficulty") or "").strip() + + duration = replay.get("duration") + if duration is None: + duration = max(0, end_ts - start_ts) session_id_dec = str(raw_id) if raw_id is not None else "" try: @@ -1420,9 +1438,11 @@ def transform_to_local_format(api_data: Dict[str, Any]) -> Optional[Dict[str, An "session_id_dec": session_id_dec, "session_id_hex": session_id_hex, "timestamp": end_ts, - "map": replay.get("mission_name", ""), - "mode": replay.get("mission_mode", ""), - "duration": end_ts - start_ts, + "start_ts": start_ts, + "end_ts": end_ts, + "map": mission_name, + "mode": mission_mode, + "duration": duration, "draw": is_draw, "teams": [ { @@ -1444,6 +1464,11 @@ def transform_to_local_format(api_data: Dict[str, Any]) -> Optional[Dict[str, An ], "chat_log": chat_log, "battle_log": battle_log, + "events": raw_events, + "entities": replay.get("entities", []), + "level_path": replay.get("level_path"), + "mission_path": replay.get("mission_path"), + "difficulty": replay.get("difficulty"), "type": replay.get("type", ""), } diff --git a/requirements.txt b/requirements.txt index 0758beb..7c41112 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ requests>=2.32.3,<3.0.0 beautifulsoup4>=4.12.3,<5.0.0 lxml>=5.0.0 zstandard -pygob @ git+https://github.com/mgeisler/pygob.git lz4==4.3.3 aiofiles aiohttp diff --git a/test_spectra.py b/test_spectra.py index 368e005..cebe92a 100644 --- a/test_spectra.py +++ b/test_spectra.py @@ -10,7 +10,6 @@ load_dotenv() API_KEY = os.getenv("SPECTRA_API_KEY", "") BASE_URL = os.getenv("SPECTRA_API_URL", "") WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "") -WS_GOB_URL = os.getenv("SPECTRA_WS_GOB_URL", "") async def test_http(): @@ -42,27 +41,13 @@ async def test_ws_sqb(): print(f"[WS-SQB] Failed: {type(e).__name__}: {e}") -async def test_ws_gob(): - """Test GOB WebSocket connection.""" - url = WS_GOB_URL - print(f"\n[WS-GOB] Connecting to {url}") - try: - async with connect(url, additional_headers={"Authorization": API_KEY}, open_timeout=15) as ws: - print("[WS-GOB] Connected! Waiting for message (10s)...") - msg = await asyncio.wait_for(ws.recv(), timeout=10) - print(f"[WS-GOB] Got message ({len(msg)} bytes)") - except Exception as e: - print(f"[WS-GOB] Failed: {type(e).__name__}: {e}") - - async def main(): print(f"API Key configured: {'Yes' if API_KEY else 'No'}") print(f"Base URL: {BASE_URL}") print(f"WS URL: {WS_URL}") - print(f"WS GOB URL: {WS_GOB_URL}\n") + print() await test_http() await test_ws_sqb() - await test_ws_gob() if __name__ == "__main__": diff --git a/web/server.js b/web/server.js index d0ef30b..597795c 100644 --- a/web/server.js +++ b/web/server.js @@ -1828,7 +1828,7 @@ app.get('/api/match/:sessionId/video', async (req, res) => { const sessionDir = resolveReplaySessionDir(sessionId); const videoPath = path.join(sessionDir, 'replay_video.mp4'); - const gobPath = path.join(sessionDir, 'replay.gob'); + const replayPath = path.join(sessionDir, 'replay_data.json'); // 1. Serve from disk if cached if (fs.existsSync(videoPath)) { @@ -1837,9 +1837,9 @@ app.get('/api/match/:sessionId/video', async (req, res) => { }); } - // 2. Check if compressed gob exists for generation - if (!fs.existsSync(gobPath)) { - return res.status(404).json({ available: false, reason: 'No GOB replay data available for this session' }); + // 2. Check if replay JSON exists for generation + if (!fs.existsSync(replayPath)) { + return res.status(404).json({ available: false, reason: 'No replay data available for this session' }); } // 3. Generate video on demand (decompress + render) @@ -1850,7 +1850,7 @@ app.get('/api/match/:sessionId/video', async (req, res) => { try { await new Promise((resolve, reject) => { const pythonBin = path.join(__dirname, '..', '.venv', 'bin', 'python'); - execFile(pythonBin, ['-m', 'BOT.gob', gobPath, videoPath], { + execFile(pythonBin, ['-m', 'BOT.render_replay', replayPath, videoPath], { timeout: 120000, cwd: path.join(__dirname, '..') }, (error, stdout, stderr) => { @@ -1891,7 +1891,7 @@ app.get('/api/match/:sessionId/replay-canvas', async (req, res) => { const sessionDir = resolveReplaySessionDir(sessionId); const jsonPath = path.join(sessionDir, 'replay_canvas.json'); - const gobPath = path.join(sessionDir, 'replay.gob'); + const replayPath = path.join(sessionDir, 'replay_data.json'); // 1. Serve from disk if cached if (fs.existsSync(jsonPath)) { @@ -1900,9 +1900,9 @@ app.get('/api/match/:sessionId/replay-canvas', async (req, res) => { }); } - // 2. Check if GOB exists - if (!fs.existsSync(gobPath)) { - return res.status(404).json({ available: false, reason: 'No GOB replay data available' }); + // 2. Check if replay JSON exists + if (!fs.existsSync(replayPath)) { + return res.status(404).json({ available: false, reason: 'No replay data available' }); } // 3. Generate on demand @@ -1913,7 +1913,7 @@ app.get('/api/match/:sessionId/replay-canvas', async (req, res) => { try { await new Promise((resolve, reject) => { const pythonBin = path.join(__dirname, '..', '.venv', 'bin', 'python'); - execFile(pythonBin, ['-m', 'BOT.gob', gobPath, jsonPath], { + execFile(pythonBin, ['-m', 'BOT.render_replay', replayPath, jsonPath], { timeout: 30000, cwd: path.join(__dirname, '..') }, (error, stdout, stderr) => {