"""Append-only JSONL outbox for envelopes published by the TSS bridge.

Every envelope is serialized as one compact JSON object and appended as a
single line to ``TSS_EXTERNAL_OUTBOX_PATH``.

Importing this module has side effects: it raises ``RuntimeError`` when the
``STORAGE_VOL_PATH`` environment variable is unset or blank, and it creates
the parent directory of the outbox file if that directory is missing.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from pathlib import Path
from typing import Any

import aiofiles

logger = logging.getLogger(__name__)


def _env(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default*), whitespace-stripped."""
    raw_value = os.getenv(name, default)
    return raw_value.strip()


# The storage root is mandatory; a blank value is treated the same as unset
# because ``_env`` strips whitespace before the emptiness check.
_storage_root_raw = _env("STORAGE_VOL_PATH")
if not _storage_root_raw:
    raise RuntimeError("STORAGE_VOL_PATH must be set")
_STORAGE_ROOT = Path(_storage_root_raw)

# The outbox location may be overridden through the environment; otherwise it
# defaults to a file directly under the storage root.
TSS_EXTERNAL_OUTBOX_PATH = Path(
    _env(
        "TSS_EXTERNAL_OUTBOX_PATH",
        str(_STORAGE_ROOT / "tss_bridge_outbox.jsonl"),
    )
)
TSS_EXTERNAL_OUTBOX_PATH.parent.mkdir(parents=True, exist_ok=True)

# Created on first use rather than at import time.
# NOTE(review): presumably so no asyncio primitive is built before an event
# loop exists -- confirm against the supported Python versions.
_LOCK: asyncio.Lock | None = None


def _lock() -> asyncio.Lock:
    """Return the module-wide lock, creating it on the first call."""
    global _LOCK
    if _LOCK is not None:
        return _LOCK
    _LOCK = asyncio.Lock()
    return _LOCK


def _build_envelope(event_type: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Wrap *payload* in the version-1 envelope stamped with the current time."""
    return {
        "type": event_type,
        "version": 1,
        "source": "tss",
        "sent_at": time.time(),
        "payload": payload,
    }


async def _append(envelope: dict[str, Any]) -> None:
    """Serialize *envelope* as compact JSON and append it as one outbox line.

    The write happens while holding the module-wide lock, so coroutines that
    go through this function append one at a time. A log record is emitted
    only after the write has completed.

    Raises:
        TypeError: Propagated from ``json.dumps`` when *envelope* contains a
            value that is not JSON-serializable.
        OSError: Propagated when the outbox file cannot be opened or written.
    """
    serialized = json.dumps(envelope, ensure_ascii=False, separators=(",", ":"))
    record = f"{serialized}\n"
    async with _lock(), aiofiles.open(
        TSS_EXTERNAL_OUTBOX_PATH, "a", encoding="utf-8"
    ) as outbox:
        await outbox.write(record)
    logger.info("TSS bridge envelope queued type=%s", envelope.get("type"))


async def publish_replay_batch(replays: list[dict[str, Any]]) -> None:
    """Queue a ``tss.replay_batch`` envelope carrying *replays*.

    An empty (or otherwise falsy) *replays* queues nothing.
    """
    if replays:
        await _append(_build_envelope("tss.replay_batch", {"replays": replays}))


async def publish_event(event_type: str, payload: dict[str, Any]) -> None:
    """Queue an envelope of type *event_type* carrying *payload*."""
    envelope = _build_envelope(event_type, payload)
    await _append(envelope)