"""Quick test to check Spectra host connectivity — WS + HTTP.""" import asyncio import os import aiohttp from dotenv import load_dotenv from websockets.asyncio.client import connect load_dotenv() API_KEY = os.getenv("SPECTRA_API_KEY", "") BASE_URL = os.getenv("SPECTRA_API_URL", "") WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "") WS_GOB_URL = os.getenv("SPECTRA_WS_GOB_URL", "") async def test_http(): """Test HTTP API endpoint.""" url = f"{BASE_URL}/v1/replays/sort" auth = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}" headers = {"accept": "application/json", "Authorization": auth, "sortField": "sqb", "id": "463391108263081012"} print(f"[HTTP] POST {url}") try: async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: print(f"[HTTP] Status: {resp.status}") text = await resp.text() print(f"[HTTP] Body: {text[:500]}") except Exception as e: print(f"[HTTP] Failed: {type(e).__name__}: {e}") async def test_ws_sqb(): """Test SQB WebSocket connection.""" url = WS_URL print(f"\n[WS-SQB] Connecting to {url}") try: async with connect(url, additional_headers={"Authorization": API_KEY}, open_timeout=15) as ws: print("[WS-SQB] Connected! Waiting for message (10s)...") msg = await asyncio.wait_for(ws.recv(), timeout=10) print(f"[WS-SQB] Got message: {str(msg)[:300]}") except Exception as e: print(f"[WS-SQB] Failed: {type(e).__name__}: {e}") async def test_ws_gob(): """Test GOB WebSocket connection.""" url = WS_GOB_URL print(f"\n[WS-GOB] Connecting to {url}") try: async with connect(url, additional_headers={"Authorization": API_KEY}, open_timeout=15) as ws: print("[WS-GOB] Connected! Waiting for message (10s)...") msg = await asyncio.wait_for(ws.recv(), timeout=10) print(f"[WS-GOB] Got message ({len(msg)} bytes)") except Exception as e: print(f"[WS-GOB] Failed: {type(e).__name__}: {e}") async def main(): print(f"API Key configured: {'Yes' if API_KEY else 'No'}") print(f"Base URL: {BASE_URL}") print(f"WS URL: {WS_URL}") print(f"WS GOB URL: {WS_GOB_URL}\n") await test_http() await test_ws_sqb() await test_ws_gob() if __name__ == "__main__": asyncio.run(main())