Files

77 lines
2.2 KiB
Python

from __future__ import annotations
import hashlib
import json
import logging
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger("relay-gateway.keys")

# Level names a key may carry; these mirror the keys of CHANNELS_FOR_LEVEL.
LEVELS = ("all", "sre", "sqb", "tss")

# Channels granted to a key of each level. KeyStore rejects any key whose
# level is not present here.
CHANNELS_FOR_LEVEL: dict[str, tuple[str, ...]] = {
    "all": ("sre", "tss"),
    "sre": ("sre",),
    "sqb": ("sre",),  # legacy alias
    "tss": ("tss",),
}
def hash_token(token: str) -> str:
    """Return the hex SHA-256 digest of *token*.

    Leading and trailing whitespace is removed and the remainder is encoded
    as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(token.strip().encode("utf-8"))
    return hasher.hexdigest()
@dataclass(frozen=True, slots=True)
class Grant:
name: str
level: str
channels: tuple[str, ...]
def can_access(grant: Grant, channel: str) -> bool:
    """Report whether *channel* is one of the channels held by *grant*."""
    permitted = grant.channels
    return channel in permitted
class KeyStore:
    """Loads relay_keys.json (hashed tokens) and reloads on mtime change.

    The file is read as a JSON object whose keys are token hashes and whose
    values are objects carrying ``level`` and, optionally, ``name``. Entries
    that are not objects, or whose level is not a key of
    ``CHANNELS_FOR_LEVEL``, are skipped with a warning.
    """

    def __init__(self, path: Path):
        """Bind the store to *path* and perform the initial load."""
        self._path = Path(path)
        # mtime of the file at the last successful load; None until then.
        self._mtime: float | None = None
        self._by_hash: dict[str, Grant] = {}
        self._load()

    def _load(self) -> None:
        """Re-read the key file if its mtime differs from the last load.

        A missing file empties the store. A file that cannot be read,
        decoded or parsed, or whose top-level value is a non-empty
        non-object, is logged and the previously loaded grants are kept;
        the recorded mtime is left alone so the next call tries again.
        """
        try:
            stat = self._path.stat()
        except FileNotFoundError:
            self._by_hash = {}
            self._mtime = None
            return
        if self._mtime is not None and stat.st_mtime == self._mtime:
            return

        try:
            raw = json.loads(self._path.read_text(encoding="utf-8"))
        except (ValueError, OSError) as exc:
            # ValueError covers json.JSONDecodeError as well as the
            # UnicodeDecodeError raised for a file that is not valid UTF-8.
            logger.warning("Failed to read key store: %s", exc)
            return

        if not raw:
            # A null or empty document is treated as "no keys".
            raw = {}
        elif not isinstance(raw, dict):
            # A list/string/number has no .items(); report it instead of
            # letting AttributeError escape into resolve().
            logger.warning(
                "Failed to read key store: expected a JSON object, got %s",
                type(raw).__name__,
            )
            return

        parsed: dict[str, Grant] = {}
        for token_hash, meta in raw.items():
            if not isinstance(meta, dict):
                logger.warning(
                    "Skipping key entry: expected an object, got %s",
                    type(meta).__name__,
                )
                continue
            level = str(meta.get("level", "")).strip()
            if level not in CHANNELS_FOR_LEVEL:
                logger.warning("Skipping key %s: bad level %r", meta.get("name"), level)
                continue
            parsed[token_hash] = Grant(
                name=str(meta.get("name", "unnamed")),
                level=level,
                channels=CHANNELS_FOR_LEVEL[level],
            )

        # Swap in the new table in one assignment, then record the mtime.
        self._by_hash = parsed
        self._mtime = stat.st_mtime

    def resolve(self, token: str) -> Grant | None:
        """Return the grant whose stored hash matches *token*, or ``None``.

        The key file is checked for changes on every call. Empty and
        whitespace-only tokens never resolve: ``hash_token`` strips
        whitespace, so both would otherwise hash as the empty string.
        """
        self._load()
        if not token or not token.strip():
            return None
        return self._by_hash.get(hash_token(token))