update to handle new structure from spectra, no more gobs (#1266)
This commit is contained in:
+16
-129
@@ -15,20 +15,10 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||
|
||||
# Third-Party Library Imports
|
||||
import aiohttp
|
||||
import pygob
|
||||
import zstandard as zstd
|
||||
from dotenv import load_dotenv
|
||||
from websockets.asyncio.client import connect as wsconnect
|
||||
|
||||
# Local Module Imports
|
||||
try:
|
||||
from data_parser import LangTableReader
|
||||
from .utils import REPLAYS_DIR
|
||||
except ImportError:
|
||||
LangTableReader = None # Running directly, not as module
|
||||
REPLAYS_DIR = None
|
||||
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
@@ -41,31 +31,9 @@ _replay_queue: asyncio.Queue = asyncio.Queue()
|
||||
WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "")
|
||||
API_KEY = os.getenv("SPECTRA_API_KEY", "")
|
||||
SPECTRA_API_URL = os.getenv("SPECTRA_API_URL", "")
|
||||
WS_GOB_URL = os.getenv("SPECTRA_WS_GOB_URL", "")
|
||||
LEADERBOARD_PATH = "/v1/game/leaderboard"
|
||||
REPLAY_SORT_PATH = "/v1/replays/sort"
|
||||
|
||||
# Initialize translation reader for vehicle names
|
||||
translate = LangTableReader("English") if LangTableReader else None
|
||||
|
||||
|
||||
def _gob_to_dict(obj: object) -> Any:
|
||||
"""Recursively convert pygob namedtuples to plain dicts for JSON serialization."""
|
||||
fields = getattr(obj, '_fields', None)
|
||||
if isinstance(obj, tuple) and fields is not None:
|
||||
return {f: _gob_to_dict(getattr(obj, f)) for f in fields}
|
||||
elif isinstance(obj, list):
|
||||
return [_gob_to_dict(i) for i in obj]
|
||||
elif isinstance(obj, dict):
|
||||
return {
|
||||
(k.decode('utf-8', errors='replace') if isinstance(k, bytes) else k): _gob_to_dict(v)
|
||||
for k, v in obj.items()
|
||||
}
|
||||
elif isinstance(obj, bytes):
|
||||
return obj.decode('utf-8', errors='replace')
|
||||
return obj
|
||||
|
||||
|
||||
def normalize_ws_message(data: Any) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Normalize WebSocket message to list of replay dicts.
|
||||
@@ -104,7 +72,9 @@ async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitabl
|
||||
Args:
|
||||
callback: Async function to call with normalized replay data
|
||||
"""
|
||||
headers = {'Authorization': API_KEY}
|
||||
auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
|
||||
headers = {"Authorization": auth_value}
|
||||
decompressor = zstd.ZstdDecompressor()
|
||||
reconnect_delay = 1
|
||||
|
||||
# Start queue processor as background task
|
||||
@@ -116,12 +86,23 @@ async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitabl
|
||||
logger.info(f"WebSocket connected to {label}")
|
||||
async for message in ws:
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if isinstance(message, str):
|
||||
raw = message.encode("utf-8")
|
||||
else:
|
||||
raw = bytes(message)
|
||||
|
||||
text: str
|
||||
try:
|
||||
text = decompressor.decompress(raw, max_output_size=64 * 1024 * 1024).decode("utf-8")
|
||||
except zstd.ZstdError:
|
||||
text = raw.decode("utf-8")
|
||||
|
||||
data = json.loads(text)
|
||||
replays = normalize_ws_message(data)
|
||||
if replays:
|
||||
await _replay_queue.put(replays)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid JSON from WS: {message[:100]}")
|
||||
logger.warning("Invalid JSON from WS frame")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing WS message: {e}")
|
||||
|
||||
@@ -326,98 +307,6 @@ async def test_fetch_replay_by_id():
|
||||
print("No data returned.")
|
||||
|
||||
|
||||
async def ws_gob_listener(callback: Callable[[bytes, bytes], Awaitable[None]]) -> None:
|
||||
"""
|
||||
Maintain persistent WebSocket connection to the Spectra SQB .gob endpoint.
|
||||
Server pushes raw zstd-compressed .gob binary after each SQB replay is parsed.
|
||||
Client does not send messages.
|
||||
|
||||
Args:
|
||||
callback: Async function called with (compressed_bytes, decompressed_bytes)
|
||||
"""
|
||||
auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
|
||||
headers = {"Authorization": auth_value}
|
||||
decompressor = zstd.ZstdDecompressor()
|
||||
reconnect_delay = 1
|
||||
|
||||
async def _connect_gob(url: str, label: str):
|
||||
logger.info(f"GOB WS attempting connect → {url}")
|
||||
async with wsconnect(url, additional_headers=headers) as ws:
|
||||
logger.info(f"WebSocket connected to {label}")
|
||||
reconnect_delay_ref = 1 # noqa: F841 — reset handled by caller
|
||||
async for message in ws:
|
||||
try:
|
||||
raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode()
|
||||
data = decompressor.decompress(raw)
|
||||
await callback(raw, data)
|
||||
except zstd.ZstdError as e:
|
||||
logger.error(f"zstd decompression failed: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing GOB message: {e}")
|
||||
|
||||
while True:
|
||||
primary_url = WS_GOB_URL
|
||||
primary_label = "Spectra GOB endpoint"
|
||||
|
||||
try:
|
||||
await _connect_gob(primary_url, primary_label)
|
||||
except Exception as e:
|
||||
logger.error(f"GOB WebSocket error ({primary_label}): {e}")
|
||||
|
||||
logger.info(f"GOB WS reconnecting in {reconnect_delay}s...")
|
||||
await asyncio.sleep(reconnect_delay)
|
||||
reconnect_delay = min(reconnect_delay * 2, 30)
|
||||
|
||||
|
||||
async def test_gob_ws():
|
||||
"""
|
||||
Connect to the SQB GOB WebSocket and dump received messages to files.
|
||||
Each decompressed .gob blob is written to STORAGE/REPLAYS/<session_id>.gob for inspection.
|
||||
"""
|
||||
from pathlib import Path
|
||||
if REPLAYS_DIR is None:
|
||||
raise RuntimeError("REPLAYS_DIR is not configured")
|
||||
replays_dir = REPLAYS_DIR
|
||||
replays_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
|
||||
print(f"Connecting to {WS_GOB_URL}")
|
||||
print(f"API Key configured: {'Yes' if API_KEY else 'No'}")
|
||||
print(f"Saving to {replays_dir}")
|
||||
print("Waiting for messages (Ctrl+C to stop)...\n")
|
||||
|
||||
decompressor = zstd.ZstdDecompressor()
|
||||
count = 0
|
||||
|
||||
async with wsconnect(WS_GOB_URL, additional_headers={"Authorization": auth_value}) as ws:
|
||||
print("Connected.")
|
||||
async for message in ws:
|
||||
raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode()
|
||||
print(f"[{count}] Received {len(raw)} bytes (compressed)")
|
||||
data = b""
|
||||
try:
|
||||
data = decompressor.decompress(raw)
|
||||
print(f"[{count}] Decompressed to {len(data)} bytes")
|
||||
replay = pygob.load(data)
|
||||
d = _gob_to_dict(replay)
|
||||
session_id = d.get("SessionID", count)
|
||||
out = replays_dir / f"{session_id}.json"
|
||||
out.write_text(json.dumps(d, indent=2, default=str), encoding="utf-8")
|
||||
print(f"[{count}] Decoded and written to {out}\n")
|
||||
except zstd.ZstdError as e:
|
||||
print(f"[{count}] zstd decompression failed: {e}")
|
||||
out = replays_dir / f"gob_{count}_raw.bin"
|
||||
out.write_bytes(raw)
|
||||
print(f"[{count}] Raw bytes written to {out}\n")
|
||||
except Exception as e:
|
||||
print(f"[{count}] gob decode failed: {e}")
|
||||
if data:
|
||||
out = replays_dir / f"gob_{count}.gob"
|
||||
out.write_bytes(data)
|
||||
print(f"[{count}] Raw gob written to {out}\n")
|
||||
count += 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Setup for direct execution
|
||||
import sys
|
||||
@@ -434,5 +323,3 @@ if __name__ == "__main__":
|
||||
mode = sys.argv[1] if len(sys.argv) > 1 else "replay"
|
||||
if mode == "replay":
|
||||
asyncio.run(test_fetch_replay_by_id())
|
||||
elif mode == "gob":
|
||||
asyncio.run(test_gob_ws())
|
||||
|
||||
Reference in New Issue
Block a user