ff420e131f
PR #1223 + fixup moved data_parser into BOTS/SHARED, but five BOT modules (analytics, autologging, botscript, lux_apis, meta_manager) still used `from .data_parser import ...`. That relative form looks inside the BOT package, which no longer contains data_parser, so the bot crashed at startup with ModuleNotFoundError. Add BOT/__init__.py to put BOTS/SHARED on sys.path at package import, then switch all five files to absolute `from data_parser import ...`. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
439 lines
15 KiB
Python
439 lines
15 KiB
Python
"""
|
|
lux_apis.py
|
|
|
|
Async client for the Spectra gaming API. Provides a WebSocket listener for real-time
|
|
replay streaming with auto-reconnect, HTTP functions for fetching squadron leaderboards,
|
|
and data transformation utilities to convert API responses into the local format.
|
|
"""
|
|
|
|
# Standard Library Imports
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
|
|
|
# Third-Party Library Imports
|
|
import aiohttp
|
|
import pygob
|
|
import zstandard as zstd
|
|
from dotenv import load_dotenv
|
|
from websockets.asyncio.client import connect as wsconnect
|
|
|
|
# Local Module Imports
|
|
try:
|
|
from data_parser import LangTableReader
|
|
from .utils import REPLAYS_DIR
|
|
except ImportError:
|
|
LangTableReader = None # Running directly, not as module
|
|
REPLAYS_DIR = None
|
|
|
|
|
|
# Load environment variables
# (must run before the os.getenv() lookups below so values from a .env file are visible)
load_dotenv()

logger = logging.getLogger(__name__)

# Global replay queue for WebSocket messages
# Filled by ws_replay_listener() and drained by _process_replay_queue().
_replay_queue: asyncio.Queue = asyncio.Queue()

# Constants from environment
# Each lookup falls back to an empty string when the variable is unset.
WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "")          # used by ws_replay_listener()
API_KEY = os.getenv("SPECTRA_API_KEY", "")            # sent in the Authorization header
SPECTRA_API_URL = os.getenv("SPECTRA_API_URL", "")    # base URL for the HTTP endpoints below
WS_GOB_URL = os.getenv("SPECTRA_WS_GOB_URL", "")      # used by ws_gob_listener() / test_gob_ws()
LEADERBOARD_PATH = "/v1/game/leaderboard"             # appended to SPECTRA_API_URL
REPLAY_SORT_PATH = "/v1/replays/sort"                 # appended to SPECTRA_API_URL

# Initialize translation reader for vehicle names
# Stays None when the local-module import above fell back to LangTableReader = None.
translate = LangTableReader("English") if LangTableReader else None
|
|
|
|
|
|
def _gob_to_dict(obj: object) -> Any:
    """Recursively convert a decoded pygob value into JSON-friendly types.

    Conversions applied:
      * namedtuple (a tuple exposing ``_fields``) -> dict keyed by field name
      * plain tuple -> tuple with every element converted
      * list -> list with every element converted
      * dict -> dict with bytes keys decoded to str and values converted
      * bytes -> str (UTF-8; undecodable bytes are replaced)

    Any other value is returned unchanged.

    Args:
        obj: Value to convert (typically the result of ``pygob.load``).

    Returns:
        The converted structure.
    """
    if isinstance(obj, tuple):
        fields = getattr(obj, '_fields', None)
        if fields is not None:
            return {f: _gob_to_dict(getattr(obj, f)) for f in fields}
        # Plain tuples used to be returned untouched, which left any
        # namedtuple or bytes nested inside them unconverted.
        return tuple(_gob_to_dict(item) for item in obj)
    if isinstance(obj, list):
        return [_gob_to_dict(item) for item in obj]
    if isinstance(obj, dict):
        return {
            (k.decode('utf-8', errors='replace') if isinstance(k, bytes) else k): _gob_to_dict(v)
            for k, v in obj.items()
        }
    if isinstance(obj, bytes):
        return obj.decode('utf-8', errors='replace')
    return obj
|
|
|
|
|
|
def normalize_ws_message(data: Any) -> Optional[List[Dict[str, Any]]]:
    """
    Normalize a decoded WebSocket message to a list of replay dicts.

    Recognized shapes:
      1. A non-empty list whose first element is a dict -> returned as-is.
      2. A single dict with an ID key ('_id' or 'id') and a 'teams' or
         'players' key -> wrapped in a one-element list.
      3. A dict with a 'completed' key -> the 'completed' list is returned.

    Args:
        data: The JSON-decoded message payload.

    Returns:
        List of replay dicts, or None if the format is unknown or the
        'completed' container does not hold a list.
    """
    # Case 1: Already a list of full replay objects
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data

    if isinstance(data, dict):
        # Case 2: Single replay object (WS sends '_id', not 'id')
        has_id = '_id' in data or 'id' in data
        has_body = 'teams' in data or 'players' in data
        if has_id and has_body:
            return [data]

        # Case 3: Wrapped in container (like fetch_replays response)
        if 'completed' in data:
            completed = data['completed']
            if completed is None:
                return None
            if isinstance(completed, list):
                return completed
            # A non-list value would be iterated as if it were a list of
            # replays downstream (e.g. a dict would yield its keys), so
            # reject it here instead of passing it through.
            logger.warning(f"Unexpected 'completed' payload in WS message: {type(completed)}")
            return None

    logger.warning(f"Unknown WS message format: {type(data)}")
    return None
|
|
|
|
|
|
def get_replay_queue() -> asyncio.Queue:
    """Return the module-level replay queue so other modules can inspect or use it."""
    shared_queue = _replay_queue
    return shared_queue
|
|
|
|
|
|
async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitable[None]]) -> None:
    """
    Maintain a persistent WebSocket connection to the Spectra replay endpoint.

    Each incoming message is JSON-decoded, normalized with
    normalize_ws_message(), and put on the module-level replay queue. A
    background task running _process_replay_queue() hands the queued replays
    to ``callback``. When the connection drops or fails, the listener
    reconnects with exponential backoff (1s doubling up to 30s); the backoff
    resets to 1s after every successful connect.

    This coroutine loops forever; the background processor task is cancelled
    when the coroutine exits (for example, when it is itself cancelled).

    Args:
        callback: Async function to call with normalized replay data
    """
    # NOTE(review): API_KEY is sent as-is here, while the HTTP helpers and the
    # GOB listener in this file add a "Bearer " prefix when it is missing.
    # Left unchanged because this endpoint's expectations are not visible here.
    headers = {'Authorization': API_KEY}
    reconnect_delay = 1

    # Start queue processor as background task
    processor_task = asyncio.create_task(_process_replay_queue(callback))

    async def _connect_and_listen(url: str, label: str) -> None:
        """Open one connection and queue messages until it closes or fails."""
        nonlocal reconnect_delay
        logger.info(f"WS attempting connect → {url}")
        async with wsconnect(url, additional_headers=headers) as ws:
            logger.info(f"WebSocket connected to {label}")
            # A successful connect ends the current failure streak. Without
            # this reset the delay only ever grows, so after a few drops every
            # later reconnect would wait the full 30s.
            reconnect_delay = 1
            async for message in ws:
                try:
                    data = json.loads(message)
                    replays = normalize_ws_message(data)
                    if replays:
                        await _replay_queue.put(replays)
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON from WS: {message[:100]}")
                except Exception as e:
                    # One bad message must not tear down the connection.
                    logger.error(f"Error processing WS message: {e}")

    try:
        while True:
            primary_url = WS_URL
            primary_label = "Spectra"
            try:
                await _connect_and_listen(primary_url, primary_label)
            except Exception as e:
                logger.error(f"WebSocket error ({primary_label}): {e}")

            # Reconnect with exponential backoff
            logger.info(f"Reconnecting in {reconnect_delay}s...")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 30)  # Cap at 30s
    finally:
        processor_task.cancel()
|
|
|
|
|
|
async def _process_replay_queue(callback: Callable[[List[Dict[str, Any]]], Awaitable[None]]) -> None:
    """
    Consume the replay queue forever, delivering replays to ``callback`` in batches.

    Blocks until one queue entry arrives, merges in every further entry that
    is already waiting, and then awaits ``callback`` once with the combined
    list. Exceptions raised by the callback, or anywhere else in an iteration,
    are logged and the loop continues; cancellation ends the loop.
    """
    while True:
        try:
            # Block until the first entry shows up.
            first_entry = await _replay_queue.get()
            pending = list(first_entry)
            _replay_queue.task_done()

            # Pull in whatever else is already queued, without waiting.
            while True:
                if _replay_queue.empty():
                    break
                try:
                    extra_entry = _replay_queue.get_nowait()
                    pending.extend(extra_entry)
                    _replay_queue.task_done()
                except asyncio.QueueEmpty:
                    break

            if not pending:
                continue

            try:
                await callback(pending)
            except Exception as e:
                logger.error(f"Error in replay callback: {e}")

        except asyncio.CancelledError:
            logger.info("Replay queue processor cancelled")
            break
        except Exception as e:
            logger.error(f"Error in queue processor: {e}")
|
|
|
|
|
|
async def fetch_leaderboard(
    count: int = 200,
    start: int = 0,
    short: bool = False
) -> Optional[List[Dict[str, Any]]]:
    """
    Request one page of the squadron leaderboard from the Spectra API.

    Sends a POST to SPECTRA_API_URL + LEADERBOARD_PATH; the paging options are
    passed as HTTP headers rather than as a request body.

    Args:
        count: Number of clans to fetch (default 200)
        start: Start position in leaderboard (default 0)
        short: If True, exclude player data from response (default False)

    Returns:
        The decoded JSON body on HTTP 200; None on any other status or on any
        exception (both are logged).
    """
    # API_KEY may already carry the "Bearer " prefix; add it only when absent.
    if API_KEY.startswith("Bearer "):
        bearer_token = API_KEY
    else:
        bearer_token = f"Bearer {API_KEY}"

    endpoint = f"{SPECTRA_API_URL}{LEADERBOARD_PATH}"
    request_headers = {
        "accept": "application/json",
        "count": str(count),
        "start": str(start),
        "short": str(short).lower(),
        "Authorization": bearer_token,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=request_headers) as response:
                if response.status != 200:
                    logger.error(f"Leaderboard API error: {response.status} - {await response.text()}")
                    return None
                return await response.json()
    except Exception as e:
        logger.error(f"Failed to fetch leaderboard: {e}")

    return None
|
|
|
|
|
|
async def fetch_leaderboard_bulk(
    total_count: int = 1000,
    batch_size: int = 200,
    short: bool = False
) -> List[Dict[str, Any]]:
    """
    Collect leaderboard entries by issuing consecutive fetch_leaderboard() calls.

    Requests are made sequentially. A batch that comes back empty or None is
    skipped; a non-empty batch that is not a list is skipped with a warning.

    Args:
        total_count: Total number of clans to fetch (default 1000)
        batch_size: Clans per request (default 200)
        short: If True, exclude player data (default False)

    Returns:
        Combined list of all clan data from all batches
    """
    # Ceiling division: a trailing partial batch still needs its own request.
    num_batches = (total_count + batch_size - 1) // batch_size
    collected = []

    for batch_index in range(num_batches):
        offset = batch_index * batch_size
        remaining = total_count - offset

        page = await fetch_leaderboard(
            count=min(batch_size, remaining),
            start=offset,
            short=short,
        )

        if not page:
            continue
        if isinstance(page, list):
            collected.extend(page)
        else:
            logger.warning(f"Unexpected response format for batch {batch_index+1}")

    return collected
|
|
|
|
|
|
async def fetch_replay_by_id(
    replay_id: str,
    sort_field: str = "sqb"
) -> Optional[Dict[str, Any]]:
    """
    Look up one replay by ID through the Spectra API.

    Sends a POST to SPECTRA_API_URL + REPLAY_SORT_PATH; the replay ID and sort
    field are passed as HTTP headers rather than as a request body.

    Args:
        replay_id: The replay ID to fetch
        sort_field: Game type sort field (default "sqb")

    Returns:
        The decoded JSON body on HTTP 200; None on any other status or on any
        exception (both are logged).
    """
    # API_KEY may already carry the "Bearer " prefix; add it only when absent.
    if API_KEY.startswith("Bearer "):
        bearer_token = API_KEY
    else:
        bearer_token = f"Bearer {API_KEY}"

    endpoint = f"{SPECTRA_API_URL}{REPLAY_SORT_PATH}"
    request_headers = {
        "accept": "application/json",
        "Authorization": bearer_token,
        "sortField": sort_field,
        "id": replay_id,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=request_headers) as response:
                if response.status != 200:
                    logger.error(f"fetch_replay_by_id error: {response.status} - {await response.text()}")
                    return None
                return await response.json()
    except Exception as e:
        logger.error(f"Failed to fetch replay {replay_id}: {e}")

    return None
|
|
|
|
|
|
async def test_leaderboard():
    """
    Manual test: fetch 100 clans in batches of 25 and dump them to a JSON file.

    Prints the configured API URL, whether an API key is set, and a masked
    form of the key, then writes the fetched data to
    ``leaderboard_test_output.json`` in the current working directory.
    """
    print(f"Fetching leaderboard from {SPECTRA_API_URL}...")
    print(f"API Key configured: {'Yes' if API_KEY else 'No'}")
    if API_KEY:
        # Show whether the "Bearer " prefix is present plus only the first and
        # last 4 characters of the token. The previous mask printed 23
        # characters of any key longer than 25, leaving as few as 3 hidden.
        has_prefix = API_KEY.startswith("Bearer ")
        token = API_KEY[len("Bearer "):] if has_prefix else API_KEY
        masked = f"{token[:4]}...{token[-4:]}" if len(token) >= 24 else "[short key]"
        print(f"API Key format: {'Bearer ' if has_prefix else ''}{masked}")
    print()

    # Fetch 100 clans with player data
    print("=== Fetching 100 clans in batches of 25 ===")
    all_data = await fetch_leaderboard_bulk(total_count=100, batch_size=25, short=False)
    print(f"Total clans fetched: {len(all_data)}")

    # Write to JSON file (explicit encoding so output does not depend on the
    # platform's default locale)
    output_file = "leaderboard_test_output.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_data, f, indent=2, default=str)
    print(f"Data written to {output_file}")
|
|
|
|
|
|
async def test_fetch_replay_by_id():
    """
    Manual test: prompt for a replay ID, fetch it, and dump the result to JSON.

    The response is written to ``replay_<id>_output.json`` in the current
    working directory. Nothing is written when no ID is entered or when the
    fetch returns no data.
    """
    replay_id = input("Enter replay ID to fetch: ").strip()
    if not replay_id:
        print("No ID provided, aborting.")
        return

    print(f"Fetching replay {replay_id} from {SPECTRA_API_URL}...")
    payload = await fetch_replay_by_id(replay_id)
    if not payload:
        print("No data returned.")
        return

    destination = f"replay_{replay_id}_output.json"
    with open(destination, "w") as f:
        json.dump(payload, f, indent=2, default=str)
    print(f"Data written to {destination}")
|
|
|
|
|
|
async def ws_gob_listener(callback: Callable[[bytes, bytes], Awaitable[None]]) -> None:
    """
    Maintain a persistent WebSocket connection to the Spectra SQB .gob endpoint.

    Server pushes raw zstd-compressed .gob binary after each SQB replay is parsed.
    Client does not send messages. Each message is decompressed and passed to
    ``callback``; decompression or callback errors are logged and the
    connection stays open. When the connection drops or fails, the listener
    reconnects with exponential backoff (1s doubling up to 30s); the backoff
    resets to 1s after every successful connect. This coroutine loops forever.

    Args:
        callback: Async function called with (compressed_bytes, decompressed_bytes)
    """
    auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
    headers = {"Authorization": auth_value}
    decompressor = zstd.ZstdDecompressor()
    reconnect_delay = 1

    async def _connect_gob(url: str, label: str) -> None:
        """Open one connection and dispatch messages until it closes or fails."""
        nonlocal reconnect_delay
        logger.info(f"GOB WS attempting connect → {url}")
        async with wsconnect(url, additional_headers=headers) as ws:
            logger.info(f"WebSocket connected to {label}")
            # A successful connect ends the current failure streak. The
            # original assigned an unused local here and relied on the caller
            # to reset the delay, which it never did, so the delay only grew.
            reconnect_delay = 1
            async for message in ws:
                try:
                    # Text frames are encoded so the decompressor always gets bytes.
                    raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode()
                    data = decompressor.decompress(raw)
                    await callback(raw, data)
                except zstd.ZstdError as e:
                    logger.error(f"zstd decompression failed: {e}")
                except Exception as e:
                    # One bad message must not tear down the connection.
                    logger.error(f"Error processing GOB message: {e}")

    while True:
        primary_url = WS_GOB_URL
        primary_label = "Spectra GOB endpoint"

        try:
            await _connect_gob(primary_url, primary_label)
        except Exception as e:
            logger.error(f"GOB WebSocket error ({primary_label}): {e}")

        logger.info(f"GOB WS reconnecting in {reconnect_delay}s...")
        await asyncio.sleep(reconnect_delay)
        reconnect_delay = min(reconnect_delay * 2, 30)
|
|
|
|
|
|
async def test_gob_ws():
    """
    Connect to the SQB GOB WebSocket and dump received messages to files.

    For each message received, inside REPLAYS_DIR:
      * on success, the decoded replay is written as ``<SessionID>.json``
        (the running message counter is used when there is no "SessionID" key);
      * if zstd decompression fails, the raw message bytes are written to
        ``gob_<count>_raw.bin``;
      * if any later step fails, the decompressed bytes are written to
        ``gob_<count>.gob``.

    Runs until the connection closes or the coroutine is interrupted.

    Raises:
        RuntimeError: If REPLAYS_DIR is None (the local-module import failed).
    """
    # NOTE(review): Path is not referenced anywhere in this function.
    from pathlib import Path
    if REPLAYS_DIR is None:
        raise RuntimeError("REPLAYS_DIR is not configured")
    replays_dir = REPLAYS_DIR
    replays_dir.mkdir(parents=True, exist_ok=True)

    # Add the "Bearer " prefix only when API_KEY does not already carry it.
    auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
    print(f"Connecting to {WS_GOB_URL}")
    print(f"API Key configured: {'Yes' if API_KEY else 'No'}")
    print(f"Saving to {replays_dir}")
    print("Waiting for messages (Ctrl+C to stop)...\n")

    decompressor = zstd.ZstdDecompressor()
    count = 0  # running message index, used in log lines and fallback file names

    async with wsconnect(WS_GOB_URL, additional_headers={"Authorization": auth_value}) as ws:
        print("Connected.")
        async for message in ws:
            # Text frames are encoded so the decompressor always gets bytes.
            raw = bytes(message) if isinstance(message, (bytes, bytearray, memoryview)) else message.encode()
            print(f"[{count}] Received {len(raw)} bytes (compressed)")
            # Pre-initialised so the generic handler below can tell whether
            # decompression had already succeeded when the failure occurred.
            data = b""
            try:
                data = decompressor.decompress(raw)
                print(f"[{count}] Decompressed to {len(data)} bytes")
                replay = pygob.load(data)
                d = _gob_to_dict(replay)
                session_id = d.get("SessionID", count)
                out = replays_dir / f"{session_id}.json"
                out.write_text(json.dumps(d, indent=2, default=str), encoding="utf-8")
                print(f"[{count}] Decoded and written to {out}\n")
            except zstd.ZstdError as e:
                # Decompression failed: keep the raw frame for inspection.
                print(f"[{count}] zstd decompression failed: {e}")
                out = replays_dir / f"gob_{count}_raw.bin"
                out.write_bytes(raw)
                print(f"[{count}] Raw bytes written to {out}\n")
            except Exception as e:
                # Any other failure (decode, conversion, or writing the JSON):
                # keep the decompressed blob if there is one.
                print(f"[{count}] gob decode failed: {e}")
                if data:
                    out = replays_dir / f"gob_{count}.gob"
                    out.write_bytes(data)
                    print(f"[{count}] Raw gob written to {out}\n")
            count += 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Setup for direct execution: make the parent directory importable.
    import sys
    from pathlib import Path
    sys.path.insert(0, str(Path(__file__).parent.parent))

    # Re-load env vars since module import was skipped
    from dotenv import load_dotenv
    load_dotenv()
    API_KEY = os.getenv("SPECTRA_API_KEY", "")

    logging.basicConfig(level=logging.INFO)

    # Usage: python lux_apis.py [replay|gob|leaderboard]   (default: replay)
    # "leaderboard" was previously unreachable from the command line, and an
    # unrecognized mode exited silently without running anything.
    modes = {
        "replay": test_fetch_replay_by_id,
        "gob": test_gob_ws,
        "leaderboard": test_leaderboard,
    }
    mode = sys.argv[1] if len(sys.argv) > 1 else "replay"
    runner = modes.get(mode)
    if runner is None:
        sys.exit(f"Unknown mode {mode!r}; expected one of: {', '.join(modes)}")
    asyncio.run(runner())
|