Files
TSSBOT/tss_ws.py
T
NotSoToothless a5abecb918 Auto merge dev → main (#1361)
* fix: don't freeze tournament brackets that overrun their scheduled end

date_end (TSS dateEndTournament) is only the *scheduled* end; tournaments
overrun it. compute_status now trusts in_active (GetActiveTournaments
presence) over date_end, and needs_scan no longer disables re-scans past
date_end + buffer. Fixes brackets stuck while their final was still pending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor: split _store_scan_sync into reusable upsert helpers

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: store_schedule writes meta/matches/standings without wiping battles

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor: extract gather_structure_sync; add fetch_schedule_sync (no battles)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: reconcile_battles gap-fills replay links for played matches only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: debounced schedule refresh coalesces game bursts into one TSS call

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: maybe_link_battle links replays from game match_id with zero TSS calls

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: route received games through maybe_link_battle (fast-path bracket link)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* tss_tournaments: three small final-review cleanups

- link_battle_sync: drop unused team_a_name/team_b_name from SELECT and
  fix status index from r[3] to r[1]
- _replace_battles: add clarifying comment about fast-path caveat and
  that schedule refresh deliberately skips this function
- store_scan / store_schedule: dispatch sqlite writes off the event loop
  via run_in_thread instead of calling sync writers directly

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 04:23:34 -07:00

213 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
tss_ws.py
Standalone TSS WebSocket listener for TSSBOT.
Each received game is written to:
STORAGE_VOL_PATH/REPLAYS/TSS/<session_id>/replay_data.json.gz
Run modes:
python tss_ws.py — listen forever, write every game
python tss_ws.py --one — capture exactly one game, write it, then exit
Reads SPECTRA_WS_TSS_URL, SPECTRA_API_KEY, and STORAGE_VOL_PATH from TSSBOT/.env.
"""
from __future__ import annotations
import asyncio
import gzip
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Callable, Awaitable, List, Dict, Optional
from dotenv import load_dotenv
from websockets.asyncio.client import connect as wsconnect
from BOT.storage import insert_match, insert_player_games, upsert_tss_teams
from BOT.autologging import process_game as autolog_process_game
from BOT.receiver_bridge import publish_replay_batch
from spectra_ws_payload import SpectraPayloadError, decode_spectra_ws_payload
_HERE = Path(__file__).resolve().parent
load_dotenv(dotenv_path=_HERE / ".env")
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [tss-ws] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("tss-ws")
WS_URL = os.getenv("SPECTRA_WS_TSS_URL", "").strip()
API_KEY = os.getenv("SPECTRA_API_KEY", "").strip()
_storage_raw = os.getenv("STORAGE_VOL_PATH", "").strip()
if not _storage_raw:
log.error("STORAGE_VOL_PATH is not set in TSSBOT/.env — aborting")
sys.exit(1)
REPLAYS_DIR = Path(_storage_raw) / "REPLAYS" / "TSS"
REPLAYS_DIR.mkdir(parents=True, exist_ok=True)
def _auth_header() -> str:
return API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
def _session_id(game: Dict[str, Any]) -> str:
"""Return a hex session ID, converting decimal string/int IDs from Spectra."""
raw = game.get("_id") or game.get("id")
if raw is None:
return f"unknown_{int(time.time())}"
try:
return hex(int(raw))[2:].lower()
except (ValueError, TypeError):
s = str(raw).strip().lower()
return s[2:] if s.startswith("0x") else s
def _write_game(game: Dict[str, Any]) -> Path:
"""Normalize _id to hex, then write to REPLAYS/TSS/<session_id>/replay_data.json.gz."""
sid = _session_id(game)
game["_id"] = sid # hex from this point forward
session_dir = REPLAYS_DIR / sid
session_dir.mkdir(parents=True, exist_ok=True)
out = session_dir / "replay_data.json.gz"
raw = json.dumps(game, ensure_ascii=False).encode("utf-8")
with gzip.open(out, "wb") as fh:
fh.write(raw)
return out
def normalize(data: Any) -> Optional[List[Dict[str, Any]]]:
"""Coerce whatever the WS sends into a list of game dicts."""
if isinstance(data, list) and data and isinstance(data[0], dict):
return data
if isinstance(data, dict):
if "_id" in data or "id" in data:
return [data]
if "completed" in data:
return data["completed"]
log.warning("Unknown WS frame shape: %s", type(data))
return None
async def listen(
on_game: Callable[[Dict[str, Any]], Awaitable[None]],
*,
stop_after: int = 0,
) -> None:
"""
Maintain a persistent WSS connection to the TSS endpoint.
Args:
on_game: Async callback invoked once per game dict received.
stop_after: If > 0, stop after this many games have been delivered.
"""
if not WS_URL:
log.error("SPECTRA_WS_TSS_URL is not set in TSSBOT/.env — aborting")
return
headers = {"Authorization": _auth_header()}
reconnect_delay = 1
received = 0
while True:
try:
log.info("Connecting → %s", WS_URL)
async with wsconnect(
WS_URL,
additional_headers=headers,
max_size=32 * 1024 * 1024,
) as ws:
log.info("Connected.")
reconnect_delay = 1 # reset on successful connect
async for message in ws:
try:
data = decode_spectra_ws_payload(message)
except SpectraPayloadError as exc:
log.warning("Bad Spectra WS frame, skipping: %s", exc)
continue
games = normalize(data)
if not games:
continue
for game in games:
received += 1
await on_game(game)
if stop_after and received >= stop_after:
log.info("Captured %d game(s), done.", received)
return
except Exception as exc:
log.error("WS error: %s", exc)
log.info("Reconnecting in %ds...", reconnect_delay)
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 30)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
async def _handle_game(game: Dict[str, Any]) -> None:
out = _write_game(game)
sid = game["_id"] # hex, normalized by _write_game
log.info("Saved game %s%s", sid, out)
try:
await insert_match(game)
await insert_player_games(game)
await upsert_tss_teams(game)
from BOT.match_logs import build_event_log, build_match_logs, upsert_match_logs
from BOT.storage import TSS_BATTLES_DB_PATH
chat_log, battle_log = build_match_logs(game)
await upsert_match_logs(TSS_BATTLES_DB_PATH, sid, chat_log, battle_log, build_event_log(game))
log.info("Stored game %s in DB", sid)
except Exception as exc:
log.error("DB insert failed for %s: %s", sid, exc)
# Forward the processed game to the relay gateway (tss channel). Best-effort:
# a bridge failure must never break ingest.
try:
await publish_replay_batch([game])
except Exception as exc:
log.warning("[TSS-BRIDGE] Failed to forward replay batch for %s: %s", sid, exc)
# Link this game's replay to its bracket slot from tss.match_id (zero TSS
# calls) and request a debounced schedule refresh; unknown matches fall back
# to a full scan inside maybe_link_battle. Never blocks ingest.
try:
from BOT.tss_tournaments import maybe_link_battle
await maybe_link_battle(game)
except Exception as exc:
log.error("tournament link trigger failed for %s: %s", sid, exc)
# Autolog match/dispatch (no-ops in standalone mode where no bot is set).
try:
await autolog_process_game(game)
except Exception as exc:
log.error("autolog failed for %s: %s", sid, exc)
print(json.dumps(game, indent=2, ensure_ascii=False))
print("-" * 80)
async def main() -> None:
one_shot = "--one" in sys.argv
if one_shot:
log.info("--one mode: will exit after the first game arrives")
await listen(_handle_game, stop_after=1)
else:
log.info("Listening forever (Ctrl-C to stop)")
await listen(_handle_game)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
log.info("Stopped.")