Files
SREBOT/BOT/lux_apis.py
T
2026-06-29 11:05:51 -07:00

321 lines
11 KiB
Python

"""
lux_apis.py
Async client for the Spectra gaming API. Provides a WebSocket listener for real-time
replay streaming with auto-reconnect, HTTP functions for fetching squadron leaderboards
and individual replays by ID, and data transformation utilities to convert API responses
into the local format.
"""
# Standard Library Imports
import asyncio
import json
import logging
import os
from typing import Any, Awaitable, Callable, Dict, List, Optional
# Third-Party Library Imports
import aiohttp
from dotenv import load_dotenv
from websockets.asyncio.client import connect as wsconnect
from spectra_ws_payload import SpectraPayloadError, decode_spectra_ws_payload
# Load variables from a .env file (if present) into the process environment.
# This must run before the os.getenv() lookups below.
load_dotenv()
logger = logging.getLogger(__name__)
# Module-wide queue: ws_replay_listener() puts normalized replay lists on it and
# _process_replay_queue() drains it.
_replay_queue: asyncio.Queue = asyncio.Queue()
# Connection settings read from the environment; each falls back to "" when unset.
WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "")
API_KEY = os.getenv("SPECTRA_API_KEY", "")
SPECTRA_API_URL = os.getenv("SPECTRA_API_URL", "")
# Endpoint paths appended to SPECTRA_API_URL by the HTTP helpers in this module.
LEADERBOARD_PATH = "/v1/game/leaderboard"
REPLAY_SORT_PATH = "/v1/replays/sort"
def normalize_ws_message(data: Any) -> Optional[List[Dict[str, Any]]]:
    """
    Normalize a decoded WebSocket message to a list of replay dicts.

    Recognized shapes, checked in this order:
        1. A non-empty list whose first element is a dict (returned as-is).
        2. A single replay dict: has '_id' or 'id', and 'teams' or 'players'.
        3. A container dict whose 'completed' value is a list.
        4. A container dict whose 'data' value is a dict (wrapped in a list)
           or a list (returned as-is).

    Args:
        data: Decoded message payload.

    Returns:
        List of replay dicts, or None if the format is unknown.
    """
    # Case 1: already a list of replay objects (only the first element is checked)
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data

    if isinstance(data, dict):
        # Case 2: single replay object (WS sends '_id', not 'id')
        has_id = '_id' in data or 'id' in data
        has_body = 'teams' in data or 'players' in data
        if has_id and has_body:
            return [data]

        # Case 3: wrapped in a container (like fetch_replays response).
        # Only accept 'completed' when it really is a list: a dict or string
        # here would otherwise be queued and later iterated element-wise,
        # handing keys/characters to the callback instead of replays.
        completed = data.get('completed')
        if isinstance(completed, list):
            return completed

        payload = data.get('data')
        if isinstance(payload, dict):
            return [payload]
        if isinstance(payload, list):
            return payload

    logger.warning("Unknown WS message format: %s", type(data))
    return None
def get_replay_queue() -> asyncio.Queue:
    """
    Return the module-level replay queue.

    Returns:
        The same asyncio.Queue instance the WebSocket listener writes to.
    """
    shared_queue = _replay_queue
    return shared_queue
async def ws_replay_listener(callback: Callable[[List[Dict[str, Any]]], Awaitable[None]]) -> None:
    """
    Maintain a persistent WebSocket connection to the Spectra replay endpoint.

    Each incoming frame is decoded, normalized to a list of replay dicts and put
    on the global replay queue; a background task drains that queue and invokes
    the callback. When the connection drops or fails, the listener reconnects
    with exponential backoff (1s, doubling up to a 30s cap). The backoff is
    reset to 1s every time a connection is successfully established, so a single
    outage does not slow down all later reconnects.

    The loop only ends when this coroutine is cancelled or a non-Exception
    error propagates; the background queue processor is cancelled on the way out.

    Args:
        callback: Async function to call with normalized replay data.
    """
    auth_value = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
    headers = {"Authorization": auth_value}
    initial_delay = 1
    max_delay = 30  # Cap at 30s
    reconnect_delay = initial_delay

    # Start queue processor as background task
    processor_task = asyncio.create_task(_process_replay_queue(callback))

    async def _connect_and_listen(url: str, label: str) -> None:
        """Open one connection and queue replays until it closes or errors."""
        nonlocal reconnect_delay
        logger.info(f"WS attempting connect → {url}")
        async with wsconnect(url, additional_headers=headers, max_size=32 * 1024 * 1024) as ws:
            logger.info(f"WebSocket connected to {label}")
            # The connection is healthy again: start the next backoff sequence
            # from the beginning instead of staying at the previous delay.
            reconnect_delay = initial_delay
            async for message in ws:
                # A bad frame must not tear down the connection, so errors are
                # logged per message and the loop carries on.
                try:
                    data = decode_spectra_ws_payload(message)
                    replays = normalize_ws_message(data)
                    if replays:
                        await _replay_queue.put(replays)
                except SpectraPayloadError as e:
                    logger.warning(f"Invalid Spectra WS frame: {e}")
                except Exception as e:
                    logger.error(f"Error processing WS message: {e}")

    try:
        while True:
            primary_url = WS_URL
            primary_label = "Spectra"
            try:
                await _connect_and_listen(primary_url, primary_label)
            except Exception as e:
                logger.error(f"WebSocket error ({primary_label}): {e}")
            # Reconnect with exponential backoff
            logger.info(f"Reconnecting in {reconnect_delay}s...")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, max_delay)
    finally:
        processor_task.cancel()
async def _process_replay_queue(callback: Callable[[List[Dict[str, Any]]], Awaitable[None]]) -> None:
    """
    Consume the global replay queue and hand replays to the callback in batches.

    Blocks until one queued item is available, then collects every item that is
    already queued (without waiting) and passes the combined list to the callback
    in a single call. Callback errors are logged and do not stop the loop;
    cancellation is logged and ends the loop.

    Args:
        callback: Async function invoked with each combined batch of replays.
    """
    while True:
        try:
            # Block until there is something to process
            first_item = await _replay_queue.get()
            pending = list(first_item)
            _replay_queue.task_done()

            # Pull in everything else that is already waiting
            while True:
                if _replay_queue.empty():
                    break
                try:
                    extra = _replay_queue.get_nowait()
                    pending.extend(extra)
                    _replay_queue.task_done()
                except asyncio.QueueEmpty:
                    break

            if not pending:
                continue

            try:
                await callback(pending)
            except Exception as e:
                logger.error(f"Error in replay callback: {e}")
        except asyncio.CancelledError:
            logger.info("Replay queue processor cancelled")
            break
        except Exception as e:
            logger.error(f"Error in queue processor: {e}")
async def fetch_leaderboard(
    count: int = 200,
    start: int = 0,
    short: bool = False
) -> Optional[List[Dict[str, Any]]]:
    """
    Request one page of the squadron leaderboard from the Spectra API.

    The request is a POST to SPECTRA_API_URL + LEADERBOARD_PATH; the paging
    options are sent as HTTP headers, not as a body or query string.

    Args:
        count: Number of clans to fetch (default 200)
        start: Start position in leaderboard (default 0)
        short: If True, exclude player data from response (default False)

    Returns:
        The decoded JSON response on HTTP 200, or None on any other status
        or on any exception (both are logged).
    """
    endpoint = f"{SPECTRA_API_URL}{LEADERBOARD_PATH}"

    # The configured key may already carry the "Bearer " prefix; add it only if missing
    if API_KEY.startswith("Bearer "):
        bearer = API_KEY
    else:
        bearer = f"Bearer {API_KEY}"

    request_headers = {
        "accept": "application/json",
        "count": str(count),
        "start": str(start),
        "short": str(short).lower(),
        "Authorization": bearer,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=request_headers) as response:
                if response.status != 200:
                    logger.error(f"Leaderboard API error: {response.status} - {await response.text()}")
                    return None
                return await response.json()
    except Exception as e:
        logger.error(f"Failed to fetch leaderboard: {e}")
    return None
async def fetch_leaderboard_bulk(
    total_count: int = 1000,
    batch_size: int = 200,
    short: bool = False
) -> List[Dict[str, Any]]:
    """
    Fetch multiple batches of leaderboard data, one request after another.

    Batches that fail (fetch_leaderboard returns None) or come back empty are
    skipped, so the result may contain fewer than total_count entries.

    Args:
        total_count: Total number of clans to fetch (default 1000). Values
            of 0 or less result in no requests and an empty list.
        batch_size: Clans per request (default 200). Must be at least 1.
        short: If True, exclude player data (default False)

    Returns:
        Combined list of all clan data from all batches

    Raises:
        ValueError: If batch_size is less than 1.
    """
    # Fail loudly on a bad batch size instead of a bare ZeroDivisionError (0)
    # or a silently empty result (negative values).
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    all_clans: List[Dict[str, Any]] = []
    for batch_number, start in enumerate(range(0, total_count, batch_size), start=1):
        # The last batch may be smaller than batch_size
        count = min(batch_size, total_count - start)
        data = await fetch_leaderboard(count=count, start=start, short=short)
        if isinstance(data, list):
            all_clans.extend(data)
        elif data:
            logger.warning(f"Unexpected response format for batch {batch_number}")
    return all_clans
async def fetch_replay_by_id(
    replay_id: str,
    sort_field: str = "sqb"
) -> Optional[Dict[str, Any]]:
    """
    Look up a single replay by its ID via the Spectra API.

    The request is a POST to SPECTRA_API_URL + REPLAY_SORT_PATH; the replay ID
    and sort field are sent as HTTP headers.

    Args:
        replay_id: The replay ID to fetch
        sort_field: Game type sort field (default "sqb")

    Returns:
        The decoded JSON response on HTTP 200, or None on any other status
        or on any exception (both are logged).
    """
    endpoint = f"{SPECTRA_API_URL}{REPLAY_SORT_PATH}"

    # The configured key may already carry the "Bearer " prefix; add it only if missing
    if API_KEY.startswith("Bearer "):
        bearer = API_KEY
    else:
        bearer = f"Bearer {API_KEY}"

    request_headers = {
        "accept": "application/json",
        "Authorization": bearer,
        "sortField": sort_field,
        "id": replay_id,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=request_headers) as response:
                if response.status != 200:
                    logger.error(f"fetch_replay_by_id error: {response.status} - {await response.text()}")
                    return None
                return await response.json()
    except Exception as e:
        logger.error(f"Failed to fetch replay {replay_id}: {e}")
    return None
async def test_leaderboard():
    """
    Manual smoke test: fetch 100 clans in batches of 25 and dump them to JSON.

    Prints progress to stdout and writes the combined result to
    leaderboard_test_output.json in the current working directory.
    """
    print(f"Fetching leaderboard from {SPECTRA_API_URL}...")
    configured = 'Yes' if API_KEY else 'No'
    print(f"API Key configured: {configured}")

    if API_KEY:
        # Only the first 15 and last 8 characters are printed, and only for keys
        # longer than 25 characters; shorter keys are not shown at all.
        if len(API_KEY) > 25:
            masked = f"{API_KEY[:15]}...{API_KEY[-8:]}"
        else:
            masked = "[short key]"
        print(f"API Key format: {masked}")
    print()

    # Fetch 100 clans with player data
    print("=== Fetching 100 clans in batches of 25 ===")
    clans = await fetch_leaderboard_bulk(total_count=100, batch_size=25, short=False)
    print(f"Total clans fetched: {len(clans)}")

    # Write to JSON file (non-serializable values are stringified via default=str)
    output_file = "leaderboard_test_output.json"
    with open(output_file, "w") as out:
        json.dump(clans, out, indent=2, default=str)
    print(f"Data written to {output_file}")
async def test_fetch_replay_by_id():
    """
    Manual smoke test: prompt for a replay ID, fetch it and dump it to JSON.

    Writes the response to replay_<id>_output.json in the current working
    directory, or prints a message when nothing was returned.
    """
    test_id = input("Enter replay ID to fetch: ").strip()
    if not test_id:
        print("No ID provided, aborting.")
        return

    print(f"Fetching replay {test_id} from {SPECTRA_API_URL}...")
    replay = await fetch_replay_by_id(test_id)
    if not replay:
        print("No data returned.")
        return

    output_file = f"replay_{test_id}_output.json"
    with open(output_file, "w") as out:
        json.dump(replay, out, indent=2, default=str)
    print(f"Data written to {output_file}")
if __name__ == "__main__":
    # Direct-execution setup: make the directory two levels above this file
    # importable by putting it at the front of sys.path.
    import sys
    from pathlib import Path

    sys.path.insert(0, str(Path(__file__).parent.parent))

    # Read the .env file and the API key again for the script run.
    # NOTE(review): the module-level load_dotenv()/os.getenv() above has already
    # executed by this point, so this looks redundant - confirm before removing.
    from dotenv import load_dotenv

    load_dotenv()
    API_KEY = os.getenv("SPECTRA_API_KEY", "")

    logging.basicConfig(level=logging.INFO)

    # First CLI argument selects the test to run; defaults to "replay"
    mode = "replay"
    if len(sys.argv) > 1:
        mode = sys.argv[1]

    if mode == "replay":
        asyncio.run(test_fetch_replay_by_id())