55 lines
1.8 KiB
Python
55 lines
1.8 KiB
Python
"""Quick test to check Spectra host connectivity — WS + HTTP."""
|
|
import asyncio
|
|
import os
|
|
import aiohttp
|
|
from dotenv import load_dotenv
|
|
from websockets.asyncio.client import connect
|
|
|
|
load_dotenv()
|
|
|
|
API_KEY = os.getenv("SPECTRA_API_KEY", "")
|
|
BASE_URL = os.getenv("SPECTRA_API_URL", "")
|
|
WS_URL = os.getenv("SPECTRA_WS_SQB_URL", "")
|
|
|
|
|
|
async def test_http():
|
|
"""Test HTTP API endpoint."""
|
|
url = f"{BASE_URL}/v1/replays/sort"
|
|
auth = API_KEY if API_KEY.startswith("Bearer ") else f"Bearer {API_KEY}"
|
|
headers = {"accept": "application/json", "Authorization": auth, "sortField": "sqb", "id": "463391108263081012"}
|
|
print(f"[HTTP] POST {url}")
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
|
|
print(f"[HTTP] Status: {resp.status}")
|
|
text = await resp.text()
|
|
print(f"[HTTP] Body: {text[:500]}")
|
|
except Exception as e:
|
|
print(f"[HTTP] Failed: {type(e).__name__}: {e}")
|
|
|
|
|
|
async def test_ws_sqb():
|
|
"""Test SQB WebSocket connection."""
|
|
url = WS_URL
|
|
print(f"\n[WS-SQB] Connecting to {url}")
|
|
try:
|
|
async with connect(url, additional_headers={"Authorization": API_KEY}, open_timeout=15) as ws:
|
|
print("[WS-SQB] Connected! Waiting for message (10s)...")
|
|
msg = await asyncio.wait_for(ws.recv(), timeout=10)
|
|
print(f"[WS-SQB] Got message: {str(msg)[:300]}")
|
|
except Exception as e:
|
|
print(f"[WS-SQB] Failed: {type(e).__name__}: {e}")
|
|
|
|
|
|
async def main():
|
|
print(f"API Key configured: {'Yes' if API_KEY else 'No'}")
|
|
print(f"Base URL: {BASE_URL}")
|
|
print(f"WS URL: {WS_URL}")
|
|
print()
|
|
await test_http()
|
|
await test_ws_sqb()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|