Files
SREBOT/test_spectra.py

55 lines
1.8 KiB
Python

"""Quick test to check Spectra host connectivity — WS + HTTP."""
import asyncio
import os
import aiohttp
from dotenv import load_dotenv
from websockets.asyncio.client import connect
load_dotenv()
API_KEY = os.getenv("SPECTRA_API_KEY", "")
BASE_URL = os.getenv("SPECTRA_API_URL", "")
WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "")
async def test_http():
"""Test HTTP API endpoint."""
url = f"{BASE_URL}/v1/replays/sort"
auth = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
headers = {"accept": "application/json", "Authorization": auth, "sortField": "sqb", "id": "463391108263081012"}
print(f"[HTTP] POST {url}")
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
print(f"[HTTP] Status: {resp.status}")
text = await resp.text()
print(f"[HTTP] Body: {text[:500]}")
except Exception as e:
print(f"[HTTP] Failed: {type(e).__name__}: {e}")
async def test_ws_sqb():
"""Test SQB WebSocket connection."""
url = WS_URL
print(f"\n[WS-SQB] Connecting to {url}")
try:
async with connect(url, additional_headers={"Authorization": API_KEY}, open_timeout=15) as ws:
print("[WS-SQB] Connected! Waiting for message (10s)...")
msg = await asyncio.wait_for(ws.recv(), timeout=10)
print(f"[WS-SQB] Got message: {str(msg)[:300]}")
except Exception as e:
print(f"[WS-SQB] Failed: {type(e).__name__}: {e}")
async def main():
print(f"API Key configured: {'Yes' if API_KEY else 'No'}")
print(f"Base URL: {BASE_URL}")
print(f"WS URL: {WS_URL}")
print()
await test_http()
await test_ws_sqb()
if __name__ == "__main__":
asyncio.run(main())