From ac01217e1684a8c2c7dc223083b02de8a8ab0992 Mon Sep 17 00:00:00 2001 From: NotSoToothless <67082114+FURRO404@users.noreply.github.com> Date: Mon, 25 May 2026 21:24:56 -0700 Subject: [PATCH] update tss and sre replay area (#1269) --- tss_ws.py | 184 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 tss_ws.py diff --git a/tss_ws.py b/tss_ws.py new file mode 100644 index 0000000..17b9a6f --- /dev/null +++ b/tss_ws.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +""" +tss_ws.py + +Standalone TSS WebSocket listener for TSSBOT. + +Each received game is written to: + STORAGE_VOL_PATH/REPLAYS/TSS//replay_data.json.gz + +Run modes: + python tss_ws.py — listen forever, write every game + python tss_ws.py --one — capture exactly one game, write it, then exit + +Reads SPECTRA_WS_TSS_URL, SPECTRA_API_KEY, and STORAGE_VOL_PATH from TSSBOT/.env. +""" +from __future__ import annotations + +import asyncio +import gzip +import json +import logging +import os +import sys +import time +from pathlib import Path +from typing import Any, Callable, Awaitable, List, Dict, Optional + +import zstandard as zstd +from dotenv import load_dotenv +from websockets.asyncio.client import connect as wsconnect + +_HERE = Path(__file__).resolve().parent +load_dotenv(dotenv_path=_HERE / ".env") + +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] [%(levelname)s] [tss-ws] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("tss-ws") + +WS_URL = os.getenv("SPECTRA_WS_TSS_URL", "").strip() +API_KEY = os.getenv("SPECTRA_API_KEY", "").strip() + +_storage_raw = os.getenv("STORAGE_VOL_PATH", "").strip() +if not _storage_raw: + log.error("STORAGE_VOL_PATH is not set in TSSBOT/.env — aborting") + sys.exit(1) + +REPLAYS_DIR = Path(_storage_raw) / "REPLAYS" / "TSS" +REPLAYS_DIR.mkdir(parents=True, exist_ok=True) + + +def _auth_header() -> str: + return API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" + + +def _session_id(game: Dict[str, Any]) -> str: + """Extract a filesystem-safe session ID from a game dict.""" + raw = game.get("_id") or game.get("id") + if raw is None: + return f"unknown_{int(time.time())}" + # Numeric ID → hex string (matches SREBOT convention) + if isinstance(raw, int): + return hex(raw)[2:].lower() + s = str(raw).strip().lower() + return s[2:] if s.startswith("0x") else s + + +def _write_game(game: Dict[str, Any]) -> Path: + """Write game dict to REPLAYS/TSS//replay_data.json.gz.""" + sid = _session_id(game) + session_dir = REPLAYS_DIR / sid + session_dir.mkdir(parents=True, exist_ok=True) + out = session_dir / "replay_data.json.gz" + raw = json.dumps(game, ensure_ascii=False).encode("utf-8") + with gzip.open(out, "wb") as fh: + fh.write(raw) + return out + + +def normalize(data: Any) -> Optional[List[Dict[str, Any]]]: + """Coerce whatever the WS sends into a list of game dicts.""" + if isinstance(data, list) and data and isinstance(data[0], dict): + return data + if isinstance(data, dict): + if "_id" in data or "id" in data: + return [data] + if "completed" in data: + return data["completed"] + log.warning("Unknown WS frame shape: %s", type(data)) + return None + + +async def listen( + on_game: Callable[[Dict[str, Any]], Awaitable[None]], + *, + stop_after: int = 0, +) -> None: + """ + Maintain a persistent WSS connection to the TSS endpoint. + + Args: + on_game: Async callback invoked once per game dict received. + stop_after: If > 0, stop after this many games have been delivered. + """ + if not WS_URL: + log.error("SPECTRA_WS_TSS_URL is not set in TSSBOT/.env — aborting") + return + + headers = {"Authorization": _auth_header()} + decompressor = zstd.ZstdDecompressor() + reconnect_delay = 1 + received = 0 + + while True: + try: + log.info("Connecting → %s", WS_URL) + async with wsconnect( + WS_URL, + additional_headers=headers, + max_size=32 * 1024 * 1024, + ) as ws: + log.info("Connected.") + reconnect_delay = 1 # reset on successful connect + async for message in ws: + raw = message.encode("utf-8") if isinstance(message, str) else bytes(message) + try: + text = decompressor.decompress(raw, max_output_size=64 * 1024 * 1024).decode("utf-8") + except zstd.ZstdError: + text = raw.decode("utf-8") + + try: + data = json.loads(text) + except json.JSONDecodeError: + log.warning("Bad JSON frame, skipping") + continue + + games = normalize(data) + if not games: + continue + + for game in games: + received += 1 + await on_game(game) + if stop_after and received >= stop_after: + log.info("Captured %d game(s), done.", received) + return + + except Exception as exc: + log.error("WS error: %s", exc) + + log.info("Reconnecting in %ds...", reconnect_delay) + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 30) + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + +async def _handle_game(game: Dict[str, Any]) -> None: + out = _write_game(game) + sid = _session_id(game) + log.info("Saved game %s → %s", sid, out) + print(json.dumps(game, indent=2, ensure_ascii=False)) + print("-" * 80) + + +async def main() -> None: + one_shot = "--one" in sys.argv + if one_shot: + log.info("--one mode: will exit after the first game arrives") + await listen(_handle_game, stop_after=1) + else: + log.info("Listening forever (Ctrl-C to stop)") + await listen(_handle_game) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + log.info("Stopped.")