#!/usr/bin/env python3 """ tss_ws.py Standalone TSS WebSocket listener for TSSBOT. Each received game is written to: STORAGE_VOL_PATH/REPLAYS/TSS//replay_data.json.gz Run modes: python tss_ws.py — listen forever, write every game python tss_ws.py --one — capture exactly one game, write it, then exit Reads SPECTRA_WS_TSS_URL, SPECTRA_API_KEY, and STORAGE_VOL_PATH from TSSBOT/.env. """ from __future__ import annotations import asyncio import gzip import json import logging import os import sys import time from pathlib import Path from typing import Any, Callable, Awaitable, List, Dict, Optional import zstandard as zstd from dotenv import load_dotenv from websockets.asyncio.client import connect as wsconnect from BOT.storage import insert_match, insert_player_games _HERE = Path(__file__).resolve().parent load_dotenv(dotenv_path=_HERE / ".env") logging.basicConfig( level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [tss-ws] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger("tss-ws") WS_URL = os.getenv("SPECTRA_WS_TSS_URL", "").strip() API_KEY = os.getenv("SPECTRA_API_KEY", "").strip() _storage_raw = os.getenv("STORAGE_VOL_PATH", "").strip() if not _storage_raw: log.error("STORAGE_VOL_PATH is not set in TSSBOT/.env — aborting") sys.exit(1) REPLAYS_DIR = Path(_storage_raw) / "REPLAYS" / "TSS" REPLAYS_DIR.mkdir(parents=True, exist_ok=True) def _auth_header() -> str: return API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" def _session_id(game: Dict[str, Any]) -> str: """Return a hex session ID, converting decimal string/int IDs from Spectra.""" raw = game.get("_id") or game.get("id") if raw is None: return f"unknown_{int(time.time())}" try: return hex(int(raw))[2:].lower() except (ValueError, TypeError): s = str(raw).strip().lower() return s[2:] if s.startswith("0x") else s def _write_game(game: Dict[str, Any]) -> Path: """Normalize _id to hex, then write to REPLAYS/TSS//replay_data.json.gz.""" sid = _session_id(game) game["_id"] = sid # hex from this point forward session_dir = REPLAYS_DIR / sid session_dir.mkdir(parents=True, exist_ok=True) out = session_dir / "replay_data.json.gz" raw = json.dumps(game, ensure_ascii=False).encode("utf-8") with gzip.open(out, "wb") as fh: fh.write(raw) return out def normalize(data: Any) -> Optional[List[Dict[str, Any]]]: """Coerce whatever the WS sends into a list of game dicts.""" if isinstance(data, list) and data and isinstance(data[0], dict): return data if isinstance(data, dict): if "_id" in data or "id" in data: return [data] if "completed" in data: return data["completed"] log.warning("Unknown WS frame shape: %s", type(data)) return None async def listen( on_game: Callable[[Dict[str, Any]], Awaitable[None]], *, stop_after: int = 0, ) -> None: """ Maintain a persistent WSS connection to the TSS endpoint. Args: on_game: Async callback invoked once per game dict received. stop_after: If > 0, stop after this many games have been delivered. """ if not WS_URL: log.error("SPECTRA_WS_TSS_URL is not set in TSSBOT/.env — aborting") return headers = {"Authorization": _auth_header()} decompressor = zstd.ZstdDecompressor() reconnect_delay = 1 received = 0 while True: try: log.info("Connecting → %s", WS_URL) async with wsconnect( WS_URL, additional_headers=headers, max_size=32 * 1024 * 1024, ) as ws: log.info("Connected.") reconnect_delay = 1 # reset on successful connect async for message in ws: raw = message.encode("utf-8") if isinstance(message, str) else bytes(message) try: text = decompressor.decompress(raw, max_output_size=64 * 1024 * 1024).decode("utf-8") except zstd.ZstdError: text = raw.decode("utf-8") try: data = json.loads(text) except json.JSONDecodeError: log.warning("Bad JSON frame, skipping") continue games = normalize(data) if not games: continue for game in games: received += 1 await on_game(game) if stop_after and received >= stop_after: log.info("Captured %d game(s), done.", received) return except Exception as exc: log.error("WS error: %s", exc) log.info("Reconnecting in %ds...", reconnect_delay) await asyncio.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 2, 30) # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- async def _handle_game(game: Dict[str, Any]) -> None: out = _write_game(game) sid = game["_id"] # hex, normalized by _write_game log.info("Saved game %s → %s", sid, out) try: await insert_match(game) await insert_player_games(game) log.info("Stored game %s in DB", sid) except Exception as exc: log.error("DB insert failed for %s: %s", sid, exc) print(json.dumps(game, indent=2, ensure_ascii=False)) print("-" * 80) async def main() -> None: one_shot = "--one" in sys.argv if one_shot: log.info("--one mode: will exit after the first game arrives") await listen(_handle_game, stop_after=1) else: log.info("Listening forever (Ctrl-C to stop)") await listen(_handle_game) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: log.info("Stopped.")