from __future__ import annotations import hashlib import json import logging from dataclasses import dataclass from pathlib import Path logger = logging.getLogger("relay-gateway.keys") LEVELS = ("all", "sre", "sqb", "tss") CHANNELS_FOR_LEVEL: dict[str, tuple[str, ...]] = { "all": ("sre", "tss"), "sre": ("sre",), "sqb": ("sre",), # legacy alias "tss": ("tss",), } def hash_token(token: str) -> str: return hashlib.sha256(token.strip().encode("utf-8")).hexdigest() @dataclass(frozen=True, slots=True) class Grant: name: str level: str channels: tuple[str, ...] def can_access(grant: Grant, channel: str) -> bool: return channel in grant.channels class KeyStore: """Loads relay_keys.json (hashed tokens) and reloads on mtime change.""" def __init__(self, path: Path): self._path = Path(path) self._mtime: float | None = None self._by_hash: dict[str, Grant] = {} self._load() def _load(self) -> None: try: stat = self._path.stat() except FileNotFoundError: self._by_hash = {} self._mtime = None return if self._mtime is not None and stat.st_mtime == self._mtime: return try: raw = json.loads(self._path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError) as exc: logger.warning("Failed to read key store: %s", exc) return parsed: dict[str, Grant] = {} for token_hash, meta in (raw or {}).items(): level = str(meta.get("level", "")).strip() if level not in CHANNELS_FOR_LEVEL: logger.warning("Skipping key %s: bad level %r", meta.get("name"), level) continue parsed[token_hash] = Grant( name=str(meta.get("name", "unnamed")), level=level, channels=CHANNELS_FOR_LEVEL[level], ) self._by_hash = parsed self._mtime = stat.st_mtime def resolve(self, token: str) -> Grant | None: self._load() if not token: return None return self._by_hash.get(hash_token(token))