32e747212f
* feat(tssbot): build_match_logs + match_logs persistence * feat(tssbot): create match_logs table and write logs at ingest * feat(tssbot): one-time match_logs backfill script * feat(srebot): persist chat/battle logs to match_logs (parity, no backfill) * feat(tssbot): Battle/Chat Log buttons on Discord scoreboards
198 lines
6.4 KiB
Python
198 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
tss_ws.py
|
|
|
|
Standalone TSS WebSocket listener for TSSBOT.
|
|
|
|
Each received game is written to:
|
|
STORAGE_VOL_PATH/REPLAYS/TSS/<session_id>/replay_data.json.gz
|
|
|
|
Run modes:
|
|
python tss_ws.py — listen forever, write every game
|
|
python tss_ws.py --one — capture exactly one game, write it, then exit
|
|
|
|
Reads SPECTRA_WS_TSS_URL, SPECTRA_API_KEY, and STORAGE_VOL_PATH from TSSBOT/.env.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import gzip
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Awaitable, List, Dict, Optional
|
|
|
|
from dotenv import load_dotenv
|
|
from websockets.asyncio.client import connect as wsconnect
|
|
|
|
from BOT.storage import insert_match, insert_player_games, upsert_tss_teams
|
|
from BOT.autologging import process_game as autolog_process_game
|
|
from spectra_ws_payload import SpectraPayloadError, decode_spectra_ws_payload
|
|
|
|
# Directory holding this script; the .env file is loaded from right next to it.
_HERE = Path(__file__).resolve().parent
load_dotenv(dotenv_path=_HERE.joinpath(".env"))

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] [tss-ws] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("tss-ws")

# Connection settings. Both fall back to "" when unset; listen() refuses to
# start without WS_URL.
WS_URL = os.environ.get("SPECTRA_WS_TSS_URL", "").strip()
API_KEY = os.environ.get("SPECTRA_API_KEY", "").strip()

# The storage root is mandatory: without it there is nowhere to write replays,
# so the process exits at import time.
_storage_raw = os.environ.get("STORAGE_VOL_PATH", "").strip()
if not _storage_raw:
    log.error("STORAGE_VOL_PATH is not set in TSSBOT/.env — aborting")
    sys.exit(1)

# Every game is written below <STORAGE_VOL_PATH>/REPLAYS/TSS.
REPLAYS_DIR = Path(_storage_raw, "REPLAYS", "TSS")
REPLAYS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def _auth_header() -> str:
    """Return the Authorization header value built from API_KEY.

    The key is used verbatim when it already starts with ``"Bearer "``;
    otherwise that scheme prefix is prepended.
    """
    if API_KEY.startswith("Bearer "):
        return API_KEY
    return "Bearer " + API_KEY
|
|
|
|
|
|
def _session_id(game: Dict[str, Any]) -> str:
    """Return a filesystem-safe hex session ID for *game*.

    ``_id`` is preferred over ``id``.  Integer or decimal-string IDs are
    converted to lowercase hex without the ``0x`` prefix; any other value is
    stringified, stripped, lower-cased and has a leading ``0x`` removed.
    When no usable ID is present the result is ``unknown_<unix-seconds>``.

    The result is used as a directory name while the payload arrives over the
    WebSocket, so every character that is not alphanumeric, ``.``, ``_`` or
    ``-`` is replaced by ``_`` (this removes path separators), and IDs that
    reduce to nothing or to dots only (``""``, ``"."``, ``".."``) fall back
    to the ``unknown_`` form instead of pointing at a parent directory.
    """
    raw = game.get("_id") or game.get("id")
    if raw is None:
        return f"unknown_{int(time.time())}"
    try:
        # format() rather than hex()[2:]: slicing mangles negative values
        # ("-0x5"[2:] == "x5").
        return format(int(raw), "x")
    except (ValueError, TypeError, OverflowError):
        # Not a decimal number (OverflowError covers float infinity):
        # fall through and treat the value as text.
        text = str(raw).strip().lower()
    if text.startswith("0x"):
        text = text[2:]
    safe = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in text)
    if not safe.strip("."):
        return f"unknown_{int(time.time())}"
    return safe
|
|
|
|
|
|
def _write_game(game: Dict[str, Any]) -> Path:
    """Normalize ``game["_id"]`` to hex and write the game to disk.

    The game is serialized as UTF-8 JSON and stored gzip-compressed at
    ``REPLAYS_DIR/<session_id>/replay_data.json.gz``.  ``game`` is mutated:
    its ``_id`` is overwritten with the normalized session ID.

    The payload is first written to a temporary sibling file and then renamed
    over the final name, so an interrupted write (crash, full disk) cannot
    leave a truncated ``replay_data.json.gz`` where readers expect a
    complete one.

    Returns:
        Path of the written ``replay_data.json.gz``.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    sid = _session_id(game)
    game["_id"] = sid  # hex from this point forward
    session_dir = REPLAYS_DIR / sid
    session_dir.mkdir(parents=True, exist_ok=True)
    out = session_dir / "replay_data.json.gz"
    tmp = session_dir / "replay_data.json.gz.tmp"
    raw = json.dumps(game, ensure_ascii=False).encode("utf-8")
    try:
        with gzip.open(tmp, "wb") as fh:
            fh.write(raw)
        # Same directory, so this is a plain rename (atomic on POSIX).
        tmp.replace(out)
    except BaseException:
        # Do not leave a half-written temp file behind on any failure.
        tmp.unlink(missing_ok=True)
        raise
    return out
|
|
|
|
|
|
def normalize(data: Any) -> Optional[List[Dict[str, Any]]]:
    """Coerce whatever the WS sends into a list of game dicts.

    Recognized frame shapes:
      * a non-empty list whose first element is a dict: the games themselves;
      * a dict containing ``_id`` or ``id``: a single game;
      * a dict containing ``completed``: a list of games, or one game dict.

    Non-dict entries inside a list are dropped (with a warning) so the result
    always matches the annotated return type and callers can use dict methods
    on every element.

    Returns:
        The list of game dicts (possibly empty), or ``None`` when the frame
        shape is not recognized.
    """
    candidates = None
    if isinstance(data, list):
        if data and isinstance(data[0], dict):
            candidates = data
    elif isinstance(data, dict):
        if "_id" in data or "id" in data:
            return [data]
        if "completed" in data:
            completed = data["completed"]
            if isinstance(completed, dict):
                # A lone game under "completed": wrap it instead of letting
                # the caller iterate over its keys.
                return [completed]
            if isinstance(completed, list):
                candidates = completed
            else:
                # Report the offending value's type rather than the frame's.
                data = completed

    if candidates is None:
        log.warning("Unknown WS frame shape: %s", type(data))
        return None

    games = [entry for entry in candidates if isinstance(entry, dict)]
    dropped = len(candidates) - len(games)
    if dropped:
        log.warning("Dropped %d non-dict entries from WS frame", dropped)
    return games
|
|
|
|
|
|
async def listen(
    on_game: Callable[[Dict[str, Any]], Awaitable[None]],
    *,
    stop_after: int = 0,
) -> None:
    """
    Maintain a persistent WSS connection to the TSS endpoint.

    Each frame is decoded, normalized into game dicts and handed to
    ``on_game`` one game at a time.  When the connection drops or errors, the
    loop reconnects after a delay that starts at 1 second, doubles after every
    attempt up to 30 seconds, and resets to 1 second on a successful connect.

    A failure inside ``on_game`` is logged and that game is skipped; it does
    not close the connection or discard the remaining games of the frame, and
    it does not count toward ``stop_after``.

    Args:
        on_game:    Async callback invoked once per game dict received.
        stop_after: If > 0, stop after this many games have been delivered.

    Returns:
        None.  Returns immediately (after logging an error) when WS_URL is
        empty; otherwise only returns once ``stop_after`` games were delivered.
    """
    if not WS_URL:
        log.error("SPECTRA_WS_TSS_URL is not set in TSSBOT/.env — aborting")
        return

    headers = {"Authorization": _auth_header()}
    reconnect_delay = 1
    received = 0

    while True:
        try:
            log.info("Connecting → %s", WS_URL)
            async with wsconnect(
                WS_URL,
                additional_headers=headers,
                max_size=32 * 1024 * 1024,
            ) as ws:
                log.info("Connected.")
                reconnect_delay = 1  # reset on successful connect
                async for message in ws:
                    try:
                        data = decode_spectra_ws_payload(message)
                    except SpectraPayloadError as exc:
                        log.warning("Bad Spectra WS frame, skipping: %s", exc)
                        continue

                    games = normalize(data)
                    if not games:
                        continue

                    for game in games:
                        try:
                            await on_game(game)
                        except Exception:
                            # A bad game or a failing callback must not tear
                            # down a healthy connection or lose the rest of
                            # this frame.
                            log.exception("on_game callback failed, skipping game")
                            continue
                        # Count only games the callback actually accepted.
                        received += 1
                        if stop_after and received >= stop_after:
                            log.info("Captured %d game(s), done.", received)
                            return

        except Exception as exc:
            log.error("WS error: %s", exc)

        log.info("Reconnecting in %ds...", reconnect_delay)
        await asyncio.sleep(reconnect_delay)
        reconnect_delay = min(reconnect_delay * 2, 30)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _handle_game(game: Dict[str, Any]) -> None:
    """Process one received game: save, store in the DB, autolog, then print.

    Steps, in order:
      1. Write the replay file via ``_write_game`` (which also rewrites
         ``game["_id"]`` to the hex session ID).
      2. Insert match, player-game and team rows, then build and upsert the
         chat/battle logs.  Any failure here is logged and swallowed.
      3. Run the autolog hook; failures are likewise logged and swallowed.
      4. Print the game as indented JSON followed by a separator line.
    """
    replay_path = _write_game(game)
    session_id = game["_id"]  # hex, normalized by _write_game
    log.info("Saved game %s → %s", session_id, replay_path)

    try:
        await insert_match(game)
        await insert_player_games(game)
        await upsert_tss_teams(game)
        # Imported only at this point, after the inserts above have run.
        from BOT.match_logs import build_match_logs, upsert_match_logs
        from BOT.storage import TSS_BATTLES_DB_PATH

        chat, battle = build_match_logs(game)
        await upsert_match_logs(TSS_BATTLES_DB_PATH, session_id, chat, battle)
    except Exception as db_error:
        log.error("DB insert failed for %s: %s", session_id, db_error)
    else:
        log.info("Stored game %s in DB", session_id)

    # Autolog match/dispatch (no-ops in standalone mode where no bot is set).
    try:
        await autolog_process_game(game)
    except Exception as autolog_error:
        log.error("autolog failed for %s: %s", session_id, autolog_error)

    rendered = json.dumps(game, indent=2, ensure_ascii=False)
    print(rendered)
    print("-" * 80)
|
|
|
|
|
|
async def main() -> None:
    """CLI entry coroutine: listen forever, or stop after one game with ``--one``."""
    if "--one" not in sys.argv:
        log.info("Listening forever (Ctrl-C to stop)")
        await listen(_handle_game)
        return

    log.info("--one mode: will exit after the first game arrives")
    await listen(_handle_game, stop_after=1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run main() to completion; Ctrl-C is treated as a
    # normal shutdown and only logged.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Stopped.")
|