""" receiver_bridge.py Bridge helpers for external SREBOT transfer. This module provides two pieces: 1. A formal SREBOT API client that external consumers can use to query the SREBOT HTTP API. 2. A persistent outbox for replay payloads so the external bridge service can fan them out over websocket. """ from __future__ import annotations import asyncio import json import logging import os from dataclasses import dataclass from pathlib import Path from urllib.parse import quote from typing import Any, Optional import aiofiles import aiohttp logger = logging.getLogger(__name__) def _env(name: str, default: str = "") -> str: value = os.getenv(name, default) return value.strip() _storage_root_raw = _env("STORAGE_VOL_PATH") if not _storage_root_raw: raise RuntimeError("STORAGE_VOL_PATH must be set") _STORAGE_ROOT = Path(_storage_root_raw) SREBOT_API_BASE_URL = _env("SREBOT_API_BASE_URL", _env("SREBOT_HTTP_URL", "http://127.0.0.1:6000")).rstrip("/") SREBOT_API_BEARER_TOKEN = _env("SREBOT_API_BEARER_TOKEN") EXTERNAL_OUTBOX_PATH = Path(_env("SREBOT_EXTERNAL_OUTBOX_PATH", str(_STORAGE_ROOT / "external_bridge_outbox.jsonl"))) EXTERNAL_OUTBOX_PATH.parent.mkdir(parents=True, exist_ok=True) @dataclass(slots=True) class SREBOTApiClient: """Typed HTTP client for the SREBOT read-only API.""" base_url: str = SREBOT_API_BASE_URL bearer_token: str = SREBOT_API_BEARER_TOKEN timeout_seconds: float = 30.0 def _headers(self) -> dict[str, str]: headers = {"Accept": "application/json"} if self.bearer_token: headers["Authorization"] = f"Bearer {self.bearer_token}" return headers async def _request(self, path: str, params: Optional[dict[str, Any]] = None) -> Any: url = f"{self.base_url}/{path.lstrip('/')}" timeout = aiohttp.ClientTimeout(total=self.timeout_seconds) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url, headers=self._headers(), params=params) as response: response.raise_for_status() return await response.json() async def get_info(self) -> Any: return await self._request("/api/info") async def get_player(self, uid: str, **params: Any) -> Any: return await self._request(f"/api/player/{quote(str(uid), safe='')}", params=params or None) async def get_player_games(self, uid: str, **params: Any) -> Any: return await self._request(f"/api/player/{quote(str(uid), safe='')}/games", params=params or None) async def get_player_history(self, uid: str) -> Any: return await self._request(f"/api/player/{quote(str(uid), safe='')}/history") async def search_players(self, nickname: str) -> Any: return await self._request(f"/api/search/{quote(str(nickname), safe='')}") async def get_live(self, **params: Any) -> Any: return await self._request("/api/live", params=params or None) async def get_match(self, session_id: str) -> Any: return await self._request(f"/api/match/{quote(str(session_id), safe='')}") async def get_match_replay(self, session_id: str) -> Any: return await self._request(f"/api/match/{quote(str(session_id), safe='')}/replay") async def search_games(self, **params: Any) -> Any: return await self._request("/api/games/search", params=params or None) async def get_maps(self) -> Any: return await self._request("/api/maps") async def get_squadron(self, squadron_name: str, **params: Any) -> Any: return await self._request(f"/api/squadrons/{quote(str(squadron_name), safe='')}", params=params or None) async def get_leaderboard_players(self, **params: Any) -> Any: return await self._request("/api/leaderboard/players", params=params or None) async def get_leaderboard_squadrons(self, **params: Any) -> Any: return await self._request("/api/leaderboard/squadrons", params=params or None) async def get_leaderboard_vehicles(self, **params: Any) -> Any: return await self._request("/api/leaderboard/vehicles", params=params or None) async def get_leaderboard_stats(self) -> Any: return await self._request("/api/leaderboard/stats") _default_api_client = SREBOTApiClient() async def fetch_api_info() -> Any: return await _default_api_client.get_info() async def fetch_player(uid: str, **params: Any) -> Any: return await _default_api_client.get_player(uid, **params) async def fetch_player_games(uid: str, **params: Any) -> Any: return await _default_api_client.get_player_games(uid, **params) async def fetch_player_history(uid: str) -> Any: return await _default_api_client.get_player_history(uid) async def search_players(nickname: str) -> Any: return await _default_api_client.search_players(nickname) async def fetch_live(**params: Any) -> Any: return await _default_api_client.get_live(**params) async def fetch_match(session_id: str) -> Any: return await _default_api_client.get_match(session_id) async def fetch_match_replay(session_id: str) -> Any: return await _default_api_client.get_match_replay(session_id) async def search_games(**params: Any) -> Any: return await _default_api_client.search_games(**params) async def fetch_maps() -> Any: return await _default_api_client.get_maps() async def fetch_squadron(squadron_name: str, **params: Any) -> Any: return await _default_api_client.get_squadron(squadron_name, **params) async def fetch_leaderboard_players(**params: Any) -> Any: return await _default_api_client.get_leaderboard_players(**params) async def fetch_leaderboard_squadrons(**params: Any) -> Any: return await _default_api_client.get_leaderboard_squadrons(**params) async def fetch_leaderboard_vehicles(**params: Any) -> Any: return await _default_api_client.get_leaderboard_vehicles(**params) async def fetch_leaderboard_stats() -> Any: return await _default_api_client.get_leaderboard_stats() _EXTERNAL_OUTBOX_LOCK: asyncio.Lock | None = None def _get_external_outbox_lock() -> asyncio.Lock: global _EXTERNAL_OUTBOX_LOCK if _EXTERNAL_OUTBOX_LOCK is None: _EXTERNAL_OUTBOX_LOCK = asyncio.Lock() return _EXTERNAL_OUTBOX_LOCK async def _append_external_envelope(envelope: dict[str, Any]) -> None: line = json.dumps(envelope, ensure_ascii=False, separators=(",", ":")) async with _get_external_outbox_lock(): async with aiofiles.open(EXTERNAL_OUTBOX_PATH, "a", encoding="utf-8") as handle: await handle.write(line + "\n") logger.info( "Bridge envelope queued", extra={ "event_type": envelope.get("type"), "outbox_path": str(EXTERNAL_OUTBOX_PATH), }, ) async def publish_replay_batch(replays: list[dict[str, Any]]) -> None: """Queue a replay batch for websocket delivery by the external bridge.""" if not replays: return envelope = { "type": "spectra.replay_batch", "version": 1, "source": "srebot", "payload": {"replays": replays}, } await _append_external_envelope(envelope) async def publish_event(event_type: str, payload: dict[str, Any]) -> None: """Generic queue helper for future bridge events.""" envelope = { "type": event_type, "version": 1, "source": "srebot", "payload": payload, } await _append_external_envelope(envelope)