update tss and sre replay area (#1269)
This commit is contained in:
@@ -0,0 +1,184 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
tss_ws.py
|
||||||
|
|
||||||
|
Standalone TSS WebSocket listener for TSSBOT.
|
||||||
|
|
||||||
|
Each received game is written to:
|
||||||
|
STORAGE_VOL_PATH/REPLAYS/TSS/<session_id>/replay_data.json.gz
|
||||||
|
|
||||||
|
Run modes:
|
||||||
|
python tss_ws.py — listen forever, write every game
|
||||||
|
python tss_ws.py --one — capture exactly one game, write it, then exit
|
||||||
|
|
||||||
|
Reads SPECTRA_WS_TSS_URL, SPECTRA_API_KEY, and STORAGE_VOL_PATH from TSSBOT/.env.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Callable, Awaitable, List, Dict, Optional
|
||||||
|
|
||||||
|
import zstandard as zstd
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from websockets.asyncio.client import connect as wsconnect
|
||||||
|
|
||||||
|
_HERE = Path(__file__).resolve().parent
|
||||||
|
load_dotenv(dotenv_path=_HERE / ".env")
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="[%(asctime)s] [%(levelname)s] [tss-ws] %(message)s",
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S",
|
||||||
|
)
|
||||||
|
log = logging.getLogger("tss-ws")
|
||||||
|
|
||||||
|
WS_URL = os.getenv("SPECTRA_WS_TSS_URL", "").strip()
|
||||||
|
API_KEY = os.getenv("SPECTRA_API_KEY", "").strip()
|
||||||
|
|
||||||
|
_storage_raw = os.getenv("STORAGE_VOL_PATH", "").strip()
|
||||||
|
if not _storage_raw:
|
||||||
|
log.error("STORAGE_VOL_PATH is not set in TSSBOT/.env — aborting")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
REPLAYS_DIR = Path(_storage_raw) / "REPLAYS" / "TSS"
|
||||||
|
REPLAYS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _auth_header() -> str:
|
||||||
|
return API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
|
||||||
|
|
||||||
|
|
||||||
|
def _session_id(game: Dict[str, Any]) -> str:
|
||||||
|
"""Extract a filesystem-safe session ID from a game dict."""
|
||||||
|
raw = game.get("_id") or game.get("id")
|
||||||
|
if raw is None:
|
||||||
|
return f"unknown_{int(time.time())}"
|
||||||
|
# Numeric ID → hex string (matches SREBOT convention)
|
||||||
|
if isinstance(raw, int):
|
||||||
|
return hex(raw)[2:].lower()
|
||||||
|
s = str(raw).strip().lower()
|
||||||
|
return s[2:] if s.startswith("0x") else s
|
||||||
|
|
||||||
|
|
||||||
|
def _write_game(game: Dict[str, Any]) -> Path:
|
||||||
|
"""Write game dict to REPLAYS/TSS/<session_id>/replay_data.json.gz."""
|
||||||
|
sid = _session_id(game)
|
||||||
|
session_dir = REPLAYS_DIR / sid
|
||||||
|
session_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
out = session_dir / "replay_data.json.gz"
|
||||||
|
raw = json.dumps(game, ensure_ascii=False).encode("utf-8")
|
||||||
|
with gzip.open(out, "wb") as fh:
|
||||||
|
fh.write(raw)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def normalize(data: Any) -> Optional[List[Dict[str, Any]]]:
|
||||||
|
"""Coerce whatever the WS sends into a list of game dicts."""
|
||||||
|
if isinstance(data, list) and data and isinstance(data[0], dict):
|
||||||
|
return data
|
||||||
|
if isinstance(data, dict):
|
||||||
|
if "_id" in data or "id" in data:
|
||||||
|
return [data]
|
||||||
|
if "completed" in data:
|
||||||
|
return data["completed"]
|
||||||
|
log.warning("Unknown WS frame shape: %s", type(data))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def listen(
|
||||||
|
on_game: Callable[[Dict[str, Any]], Awaitable[None]],
|
||||||
|
*,
|
||||||
|
stop_after: int = 0,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Maintain a persistent WSS connection to the TSS endpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
on_game: Async callback invoked once per game dict received.
|
||||||
|
stop_after: If > 0, stop after this many games have been delivered.
|
||||||
|
"""
|
||||||
|
if not WS_URL:
|
||||||
|
log.error("SPECTRA_WS_TSS_URL is not set in TSSBOT/.env — aborting")
|
||||||
|
return
|
||||||
|
|
||||||
|
headers = {"Authorization": _auth_header()}
|
||||||
|
decompressor = zstd.ZstdDecompressor()
|
||||||
|
reconnect_delay = 1
|
||||||
|
received = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
log.info("Connecting → %s", WS_URL)
|
||||||
|
async with wsconnect(
|
||||||
|
WS_URL,
|
||||||
|
additional_headers=headers,
|
||||||
|
max_size=32 * 1024 * 1024,
|
||||||
|
) as ws:
|
||||||
|
log.info("Connected.")
|
||||||
|
reconnect_delay = 1 # reset on successful connect
|
||||||
|
async for message in ws:
|
||||||
|
raw = message.encode("utf-8") if isinstance(message, str) else bytes(message)
|
||||||
|
try:
|
||||||
|
text = decompressor.decompress(raw, max_output_size=64 * 1024 * 1024).decode("utf-8")
|
||||||
|
except zstd.ZstdError:
|
||||||
|
text = raw.decode("utf-8")
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = json.loads(text)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
log.warning("Bad JSON frame, skipping")
|
||||||
|
continue
|
||||||
|
|
||||||
|
games = normalize(data)
|
||||||
|
if not games:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for game in games:
|
||||||
|
received += 1
|
||||||
|
await on_game(game)
|
||||||
|
if stop_after and received >= stop_after:
|
||||||
|
log.info("Captured %d game(s), done.", received)
|
||||||
|
return
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
log.error("WS error: %s", exc)
|
||||||
|
|
||||||
|
log.info("Reconnecting in %ds...", reconnect_delay)
|
||||||
|
await asyncio.sleep(reconnect_delay)
|
||||||
|
reconnect_delay = min(reconnect_delay * 2, 30)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CLI entry point
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _handle_game(game: Dict[str, Any]) -> None:
|
||||||
|
out = _write_game(game)
|
||||||
|
sid = _session_id(game)
|
||||||
|
log.info("Saved game %s → %s", sid, out)
|
||||||
|
print(json.dumps(game, indent=2, ensure_ascii=False))
|
||||||
|
print("-" * 80)
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
one_shot = "--one" in sys.argv
|
||||||
|
if one_shot:
|
||||||
|
log.info("--one mode: will exit after the first game arrives")
|
||||||
|
await listen(_handle_game, stop_after=1)
|
||||||
|
else:
|
||||||
|
log.info("Listening forever (Ctrl-C to stop)")
|
||||||
|
await listen(_handle_game)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
log.info("Stopped.")
|
||||||
Reference in New Issue
Block a user