Auto merge dev → main (#1353)

* feat(gateway): hashed key store with grant + hot reload

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(gateway): channel registry + aiohttp app (keyed auth, whoami, per-channel ws/proxy)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(gateway): manage_keys CLI (add/list/revoke)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(gateway): retire srebot_external, run relay-gateway under PM2

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(gateway): point ecosystem + README at relay-gateway

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss): replay outbox producer for relay gateway

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss): forward processed games to relay outbox

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss-api): db helpers, app skeleton, info endpoint, fixtures

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss-api): player, games, history, search endpoints

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss-api): live, match, scoreboard, matches-search, maps

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss-api): filter-required leaderboards (players/vehicles/stats)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tss-api): tournament list/detail/standings/matches

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: wire tss upstream through gateway + tssbot-api PM2 app

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
NotSoToothless
2026-06-28 03:38:20 -07:00
committed by GitHub
parent ea5494bce0
commit cbb532a711
3 changed files with 60 additions and 395 deletions
-366
View File
@@ -1,366 +0,0 @@
#!/usr/bin/env python3
"""External SREBOT bridge service.
This PM2-managed process does two things:
1. Proxies read-only SREBOT queries on the external port.
2. Broadcasts SREBOT replay envelopes over websocket to any connected
client.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import aiohttp
import zstandard as zstd
from aiohttp import web
from dotenv import load_dotenv
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
load_dotenv()
from BOT.receiver_bridge import EXTERNAL_OUTBOX_PATH # noqa: E402
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [srebot-external] %(message)s",
)
logger = logging.getLogger("srebot-external")
def _env(name: str, default: str = "") -> str:
return os.getenv(name, default).strip()
@dataclass(slots=True)
class ExternalSettings:
host: str = _env("SREBOT_EXTERNAL_HOST", "0.0.0.0")
port: int = int(_env("SREBOT_EXTERNAL_PORT", "18081"))
bearer_token: str = _env("SREBOT_EXTERNAL_BEARER_TOKEN", _env("SREBOT_API_BEARER_TOKEN"))
upstream_url: str = _env("SREBOT_EXTERNAL_UPSTREAM_URL", "http://127.0.0.1:6000").rstrip("/")
upstream_bearer_token: str = _env("SREBOT_EXTERNAL_UPSTREAM_BEARER_TOKEN", _env("SREBOT_API_BEARER_TOKEN"))
outbox_path: Path = Path(_env("SREBOT_EXTERNAL_OUTBOX_PATH", str(EXTERNAL_OUTBOX_PATH)))
offset_path: Path = Path(_env("SREBOT_EXTERNAL_OFFSET_PATH", str(Path(str(EXTERNAL_OUTBOX_PATH)).with_suffix(".offset"))))
poll_interval_seconds: float = float(_env("SREBOT_EXTERNAL_POLL_INTERVAL", "0.5"))
reconnect_delay_seconds: float = float(_env("SREBOT_EXTERNAL_RECONNECT_DELAY", "1.0"))
SETTINGS = ExternalSettings()
SETTINGS.outbox_path.parent.mkdir(parents=True, exist_ok=True)
SETTINGS.offset_path.parent.mkdir(parents=True, exist_ok=True)
HOP_BY_HOP_HEADERS = {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailers",
"transfer-encoding",
"upgrade",
}
CONNECTED_WEBSOCKETS: set[web.WebSocketResponse] = set()
CONNECTED_LOCK = asyncio.Lock()
_compressor = zstd.ZstdCompressor(level=3)
def _auth_ok(request: web.Request) -> bool:
if not SETTINGS.bearer_token:
return True
return request.headers.get("Authorization", "") == f"Bearer {SETTINGS.bearer_token}"
@web.middleware
async def auth_middleware(request: web.Request, handler):
if request.path in {"/health", "/"} or request.path.startswith("/ws/"):
return await handler(request)
if not _auth_ok(request):
logger.warning("Unauthorized request", extra={"path": request.rel_url.path_qs})
return web.json_response({"error": "Unauthorized"}, status=401)
return await handler(request)
def _upstream_headers() -> dict[str, str]:
headers = {"Accept": "application/json"}
if SETTINGS.upstream_bearer_token:
headers["Authorization"] = f"Bearer {SETTINGS.upstream_bearer_token}"
return headers
def _read_offset() -> int:
try:
return int(SETTINGS.offset_path.read_text(encoding="utf-8").strip())
except Exception:
return 0
def _write_offset(offset: int) -> None:
SETTINGS.offset_path.write_text(str(offset), encoding="utf-8")
async def health(_: web.Request) -> web.Response:
return web.json_response(
{
"status": "ok",
"service": "srebot-external",
"http": SETTINGS.upstream_url,
"websocket": "/ws/srebot",
}
)
async def proxy_api(request: web.Request) -> web.StreamResponse:
target = f"{SETTINGS.upstream_url}{request.rel_url.path_qs}"
request_start = time.monotonic()
logger.info(
"AXBot query in",
extra={
"method": request.method,
"path": request.rel_url.path_qs,
},
)
body = await request.read() if request.can_read_body else b""
async with request.app["http_session"].request(
request.method,
target,
headers=_upstream_headers(),
data=body if body else None,
) as upstream:
payload = await upstream.read()
duration_ms = round((time.monotonic() - request_start) * 1000, 1)
logger.info(
"AXBot query out",
extra={
"method": request.method,
"path": request.rel_url.path_qs,
"status": upstream.status,
"bytes": len(payload),
"duration_ms": duration_ms,
},
)
headers = {
key: value
for key, value in upstream.headers.items()
if key.lower() not in HOP_BY_HOP_HEADERS
and key.lower() not in {"content-length", "content-encoding"}
}
return web.Response(body=payload, status=upstream.status, headers=headers)
async def root(_: web.Request) -> web.Response:
return web.json_response(
{
"service": "srebot-external",
"message": "Use /api/* for queries and /ws/srebot for replay events.",
}
)
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
if not _auth_ok(request):
logger.warning("Unauthorized websocket", extra={"path": request.rel_url.path_qs})
ws = web.WebSocketResponse()
await ws.prepare(request)
await ws.close(code=1008, message=b"Unauthorized")
return ws
ws = web.WebSocketResponse(heartbeat=20)
await ws.prepare(request)
async with CONNECTED_LOCK:
CONNECTED_WEBSOCKETS.add(ws)
logger.info("Websocket connected", extra={"clients": len(CONNECTED_WEBSOCKETS)})
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
logger.info("Websocket recv", extra={"bytes": len(msg.data)})
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning("Websocket error", extra={"error": str(ws.exception())})
finally:
async with CONNECTED_LOCK:
CONNECTED_WEBSOCKETS.discard(ws)
logger.info("Websocket disconnected", extra={"clients": len(CONNECTED_WEBSOCKETS)})
return ws
async def _broadcast(envelope: dict[str, Any]) -> None:
raw = json.dumps(envelope, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
payload = _compressor.compress(raw)
async with CONNECTED_LOCK:
targets = list(CONNECTED_WEBSOCKETS)
if not targets:
logger.info(
"No websocket clients connected",
extra={"event_type": envelope.get("type")},
)
return
dead: list[web.WebSocketResponse] = []
for ws in targets:
try:
await ws.send_bytes(payload)
except Exception as exc:
logger.warning(
"Failed to send websocket envelope",
extra={"event_type": envelope.get("type"), "error": str(exc)},
)
dead.append(ws)
if dead:
async with CONNECTED_LOCK:
for ws in dead:
CONNECTED_WEBSOCKETS.discard(ws)
logger.info(
"Websocket broadcast",
extra={
"event_type": envelope.get("type"),
"clients": len(targets) - len(dead),
"raw_bytes": len(raw),
"compressed_bytes": len(payload),
"payload_keys": list((envelope.get("payload") or {}).keys())[:8],
},
)
# Truncate the outbox once we've consumed past this many bytes. The file is
# append-only and previously grew unbounded — we observed it at 1.9 GB on disk
# with all data already relayed and offset matching size. Truncating when
# fully caught up keeps disk usage flat without cooperating with the writer.
# Race: a writer in the BOT process may append between the size check and
# the truncate call. Those envelopes would be lost, but envelopes here are
# best-effort match-replay events; rare loss during a 100 MB-scale rotation
# is acceptable.
_OUTBOX_TRUNCATE_THRESHOLD_BYTES = int(_env("SREBOT_EXTERNAL_TRUNCATE_BYTES", str(100 * 1024 * 1024)))
def _maybe_truncate_outbox(position: int) -> int:
try:
current_size = SETTINGS.outbox_path.stat().st_size
if (
position >= _OUTBOX_TRUNCATE_THRESHOLD_BYTES
and position == current_size
):
with SETTINGS.outbox_path.open("r+b") as handle:
handle.truncate(0)
_write_offset(0)
logger.info(
"Outbox caught up; truncated",
extra={"reclaimed_bytes": position},
)
return 0
except FileNotFoundError:
pass
except Exception as exc:
logger.warning("Outbox truncate failed", extra={"error": str(exc)})
return position
async def relay_outbox_loop(app: web.Application) -> None:
reconnect_delay = SETTINGS.reconnect_delay_seconds
position = _read_offset()
while True:
try:
if not SETTINGS.outbox_path.exists():
await asyncio.sleep(1.0)
continue
current_size = SETTINGS.outbox_path.stat().st_size
if position > current_size:
logger.info(
"Outbox truncated; resetting offset",
extra={"old_offset": position, "current_size": current_size},
)
position = 0
_write_offset(position)
with SETTINGS.outbox_path.open("r", encoding="utf-8") as handle:
handle.seek(position)
line = handle.readline()
if not line:
position = _maybe_truncate_outbox(position)
await asyncio.sleep(SETTINGS.poll_interval_seconds)
continue
try:
envelope = json.loads(line)
except json.JSONDecodeError:
position = handle.tell()
_write_offset(position)
logger.warning("Skipping malformed outbox line", extra={"offset": position})
continue
position = handle.tell()
_write_offset(position)
await _broadcast(envelope)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.warning(
"Bridge loop error",
extra={"error": str(exc), "retry_in_seconds": reconnect_delay},
)
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 30.0)
async def create_http_session(app: web.Application):
app["http_session"] = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
try:
yield
finally:
await app["http_session"].close()
async def start_relay_task(app: web.Application):
task = asyncio.create_task(relay_outbox_loop(app))
app["relay_task"] = task
try:
yield
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
def create_app() -> web.Application:
app = web.Application(middlewares=[auth_middleware])
app.router.add_get("/", root)
app.router.add_get("/health", health)
app.router.add_get("/ws/srebot", websocket_handler)
app.router.add_route("*", "/api/{tail:.*}", proxy_api)
app.cleanup_ctx.append(create_http_session)
app.cleanup_ctx.append(start_relay_task)
return app
def main() -> None:
web.run_app(create_app(), host=SETTINGS.host, port=SETTINGS.port)
if __name__ == "__main__":
main()
+26 -19
View File
@@ -40,6 +40,9 @@
SREBOT_EXTERNAL_PORT=18081 SREBOT_EXTERNAL_PORT=18081
SREBOT_EXTERNAL_BEARER_TOKEN=your_external_bridge_token # Optional, protects the bridge API and websocket SREBOT_EXTERNAL_BEARER_TOKEN=your_external_bridge_token # Optional, protects the bridge API and websocket
SREBOT_EXTERNAL_UPSTREAM_URL=http://127.0.0.1:6000 SREBOT_EXTERNAL_UPSTREAM_URL=http://127.0.0.1:6000
TSS_EXTERNAL_UPSTREAM_URL=http://127.0.0.1:6100 # Enables /api/tss/* proxy (omit -> 501)
TSS_API_HOST=127.0.0.1 # tssbot-api bind host
TSS_API_PORT=6100 # tssbot-api bind port
SREBOT_TTL_ALERT_WEBHOOK_URL=https://discord.com/api/webhooks/... # Optional, maintains one Discord status message for TTL degradation/recovery SREBOT_TTL_ALERT_WEBHOOK_URL=https://discord.com/api/webhooks/... # Optional, maintains one Discord status message for TTL degradation/recovery
NODE_ENV=production NODE_ENV=production
PYTHONUNBUFFERED=1 PYTHONUNBUFFERED=1
@@ -50,30 +53,34 @@
python BotScript.py python BotScript.py
``` ```
### AXBot bridge process ### Relay gateway
`ecosystem.config.js` now includes a dedicated PM2 app named `srebot-axbot`. `ecosystem.config.js` includes a unified PM2 app named `relay-gateway`
It proxies read-only SREBOT queries and broadcasts replay/GOB envelopes over (code in `BOTS/SHARED/relay_gateway/`). It fronts **both** bots:
websocket on the same external port.
Its outbox/state files live under the shared storage volume configured in - proxies read-only queries: `/api/sqb/*` → SREBOT's internal API (`:6000`);
`.env` via `STORAGE_VOL_PATH`. `/api/tss/*` → the TSS HTTP API (`:6100`) or `501` until it is deployed
- streams replay envelopes over `/ws/sqb` and `/ws/tss`
- authenticates every request/socket against per-person keys at three levels
(`all`/`sqb`/`tss`) stored in `$STORAGE_VOL_PATH/relay_keys.json`
(SHA-256-hashed tokens, hot-reloaded on change)
Outbox/state files live under the shared storage volume (`STORAGE_VOL_PATH`):
`external_bridge_outbox.jsonl` (sqb) and `tss_bridge_outbox.jsonl` (tss).
Useful commands: Useful commands:
```bash ```bash
pm2 start ecosystem.config.js --only srebot-api pm2 start ecosystem.config.js --only srebot-api
pm2 start ecosystem.config.js --only srebot-axbot pm2 start ecosystem.config.js --only relay-gateway
pm2 logs srebot-axbot pm2 logs relay-gateway
# manage downstream keys (run from BOTS/SHARED with the shared venv):
python -m relay_gateway.manage_keys --file "$STORAGE_VOL_PATH/relay_keys.json" add --name cn-axbot --level sqb
python -m relay_gateway.manage_keys --file "$STORAGE_VOL_PATH/relay_keys.json" list
python -m relay_gateway.manage_keys --file "$STORAGE_VOL_PATH/relay_keys.json" revoke --name cn-axbot
``` ```
Clients should point their query client at: Downstream consumers (e.g. BOT-RELAY) discover their channels from
`GET /api/whoami` and connect to `/ws/<channel>` + `/api/<channel>/*` using their
```env bearer token. Point them at `http://<srebot-host>:18081`.
SREBOT_API_BASE_URL=http://<srebot-host>:18081
```
The bridge app logs both sides of the transfer:
- incoming client HTTP requests
- outgoing proxy responses
- websocket envelopes broadcast to connected clients
# test
+34 -10
View File
@@ -50,22 +50,46 @@ module.exports = {
restart_delay: 2000 restart_delay: 2000
}, },
// External bridge for AXBot traffic: // TSS read-only HTTP API (loopback). Proxied by relay-gateway as /api/tss/*.
// - Proxies read-only API queries to the internal SREBOT API // Reads TSS_API_HOST/PORT from .env (default 127.0.0.1:6100).
// - Streams bridge envelopes to AXBot over websocket
// Reads SREBOT_EXTERNAL_HOST/PORT/UPSTREAM_URL from .env.
{ {
name: 'srebot-axbot', name: 'tssbot-api',
script: 'BOT/srebot_external.py', script: '-m',
args: 'web.main',
interpreter: PY_INTERPRETER, interpreter: PY_INTERPRETER,
cwd: DEPLOY_PATH, cwd: `${DEPLOY_PATH}/../TSSBOT`,
instances: 1, instances: 1,
autorestart: true, autorestart: true,
watch: false, watch: false,
max_memory_restart: '1G', max_memory_restart: '1G',
log_file: './logs/axbot_combined.log', log_file: `${DEPLOY_PATH}/logs/tssbot_api_combined.log`,
out_file: './logs/axbot_out.log', out_file: `${DEPLOY_PATH}/logs/tssbot_api_out.log`,
error_file: './logs/axbot_error.log', error_file: `${DEPLOY_PATH}/logs/tssbot_api_error.log`,
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
merge_logs: true,
kill_timeout: 5000,
restart_delay: 2000
},
// Unified relay gateway fronting SREBOT (sqb) and TSSBOT (tss):
// - Proxies read-only API queries (/api/sqb/* -> SREBOT :6000; /api/tss/* -> TSS API or 501)
// - Streams replay envelopes over /ws/sqb and /ws/tss
// - Per-key auth (all/sqb/tss) via $STORAGE_VOL_PATH/relay_keys.json
// Lives in BOTS/SHARED/relay_gateway; loads SREBOT/.env for shared config.
// Reads SREBOT_EXTERNAL_HOST/PORT/UPSTREAM_URL + STORAGE_VOL_PATH from .env.
{
name: 'relay-gateway',
script: '-m',
args: 'relay_gateway.gateway',
interpreter: PY_INTERPRETER,
cwd: `${DEPLOY_PATH}/../SHARED`,
instances: 1,
autorestart: true,
watch: false,
max_memory_restart: '1G',
log_file: `${DEPLOY_PATH}/logs/relay_gateway_combined.log`,
out_file: `${DEPLOY_PATH}/logs/relay_gateway_out.log`,
error_file: `${DEPLOY_PATH}/logs/relay_gateway_error.log`,
log_date_format: 'YYYY-MM-DD HH:mm:ss Z', log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
merge_logs: true, merge_logs: true,
kill_timeout: 5000, kill_timeout: 5000,