69 lines
2.1 KiB
Python
69 lines
2.1 KiB
Python
import json
|
|
from pathlib import Path
|
|
|
|
from relay_gateway.keys import KeyStore, hash_token, can_access, CHANNELS_FOR_LEVEL
|
|
|
|
|
|
def _write(path: Path, tokens: dict[str, dict]) -> None:
|
|
body = {hash_token(tok): meta for tok, meta in tokens.items()}
|
|
path.write_text(json.dumps(body), encoding="utf-8")
|
|
|
|
|
|
def test_resolve_returns_grant_with_derived_channels(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"sekret": {"name": "cn-axbot", "level": "sre"}})
|
|
store = KeyStore(kf)
|
|
grant = store.resolve("sekret")
|
|
assert grant is not None
|
|
assert grant.name == "cn-axbot"
|
|
assert grant.level == "sre"
|
|
assert grant.channels == ("sre",)
|
|
|
|
|
|
def test_resolve_unknown_token_is_none(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"sekret": {"name": "x", "level": "all"}})
|
|
store = KeyStore(kf)
|
|
assert store.resolve("nope") is None
|
|
|
|
|
|
def test_all_level_grants_both_channels(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"k": {"name": "internal", "level": "all"}})
|
|
grant = KeyStore(kf).resolve("k")
|
|
assert grant is not None
|
|
assert set(grant.channels) == {"sre", "tss"}
|
|
|
|
|
|
def test_can_access_enforces_channel(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"k": {"name": "t", "level": "tss"}})
|
|
grant = KeyStore(kf).resolve("k")
|
|
assert grant is not None
|
|
assert can_access(grant, "tss") is True
|
|
assert can_access(grant, "sre") is False
|
|
|
|
|
|
def test_hot_reload_on_mtime_change(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"k": {"name": "t", "level": "sre"}})
|
|
store = KeyStore(kf)
|
|
assert store.resolve("k").level == "sre"
|
|
import os, time
|
|
time.sleep(0.01)
|
|
_write(kf, {"k": {"name": "t", "level": "tss"}})
|
|
os.utime(kf, None)
|
|
assert store.resolve("k").level == "tss"
|
|
|
|
|
|
def test_missing_file_resolves_none(tmp_path):
|
|
store = KeyStore(tmp_path / "absent.json")
|
|
assert store.resolve("anything") is None
|
|
|
|
|
|
def test_bad_level_is_skipped(tmp_path):
|
|
kf = tmp_path / "relay_keys.json"
|
|
_write(kf, {"k": {"name": "t", "level": "bogus"}})
|
|
assert KeyStore(kf).resolve("k") is None
|
|
assert "bogus" not in CHANNELS_FOR_LEVEL
|