# Files
# SHARED/spectra_ws_payload.py
#
# 79 lines
# 2.5 KiB
# Python

"""Decode Spectra websocket replay payloads.
Spectra historically sent zstd-compressed JSON bytes. Newer frames may be
zstd-compressed msgpack bytes. Keep the format check on the decompressed bytes
so both rollout states are accepted without guessing from websocket frame type.
"""
from __future__ import annotations
import json
from typing import Any
import msgpack
import zstandard as zstd
# Module-level decompressor instance reused by _decompress_if_needed for every frame.
_ZSTD_DECOMPRESSOR = zstd.ZstdDecompressor()
# 64 MiB; passed as ``max_output_size`` to the zstd ``decompress`` call.
_MAX_OUTPUT_SIZE = 64 * 1024 * 1024
class SpectraPayloadError(ValueError):
    """Signal that a Spectra websocket payload could not be decoded.

    Derives from ``ValueError``, so it can also be caught as one.
    """
def _frame_bytes(message: str | bytes | bytearray | memoryview) -> bytes:
if isinstance(message, str):
return message.encode("utf-8")
return bytes(message)
def _decompress_if_needed(raw: bytes) -> bytes:
    """Return the zstd-decompressed payload, or ``raw`` if it is not zstd.

    Args:
        raw: Frame bytes as received from the websocket.

    Returns:
        The decompressed bytes when ``raw`` decompresses as zstd; otherwise
        ``raw`` unchanged, so uncompressed frames are still accepted.

    Raises:
        SpectraPayloadError: If ``raw`` starts with the zstd frame magic but
            cannot be decompressed (corrupt or truncated), or declares a
            decompressed size larger than ``_MAX_OUTPUT_SIZE``.
    """
    # Zstandard frame magic number 0xFD2FB528, stored little-endian.
    is_zstd_frame = raw.startswith(b"\x28\xb5\x2f\xfd")
    try:
        # ``decompress`` uses ``max_output_size`` only when the frame header
        # omits the content size, so a declared size is checked explicitly.
        # ``frame_content_size`` returns -1 when the size is unknown.
        if is_zstd_frame and zstd.frame_content_size(raw) > _MAX_OUTPUT_SIZE:
            raise SpectraPayloadError(
                "Spectra WS payload exceeds the maximum decompressed size"
            )
        return _ZSTD_DECOMPRESSOR.decompress(raw, max_output_size=_MAX_OUTPUT_SIZE)
    except zstd.ZstdError as exc:
        if is_zstd_frame:
            # The input was definitely meant to be zstd; report the real
            # failure instead of passing compressed bytes on as a payload.
            raise SpectraPayloadError(
                "Spectra WS payload was a zstd frame but could not be decompressed"
            ) from exc
        # Not a zstd frame: treat it as an uncompressed payload.
        return raw
def _first_non_ws_byte(payload: bytes) -> int | None:
for byte in payload:
if byte not in b" \t\r\n":
return byte
return None
def _looks_like_json(payload: bytes) -> bool:
    """Report whether ``payload`` starts, after leading whitespace, with ``{`` or ``[``."""
    lead = _first_non_ws_byte(payload)
    if lead is None:
        return False
    return lead == ord("{") or lead == ord("[")
def _looks_like_msgpack_container(payload: bytes) -> bool:
first = _first_non_ws_byte(payload)
if first is None:
return False
return (
0x80 <= first <= 0x9F # fixmap/fixarray
or first in (0xDC, 0xDD, 0xDE, 0xDF) # array16/32, map16/32
)
def decode_spectra_ws_payload(message: str | bytes | bytearray | memoryview) -> Any:
    """Decode a Spectra WS frame as zstd-json, zstd-msgpack, or plain JSON.

    The frame is converted to bytes and zstd-decompressed when possible; the
    format is then chosen by inspecting the start of the resulting payload,
    with the JSON check applied before the msgpack check.

    Args:
        message: The websocket frame, as text or as a bytes-like object.

    Returns:
        The decoded Python object.

    Raises:
        SpectraPayloadError: If the payload looks like JSON or msgpack but
            fails to parse (including JSON nested too deeply for the parser),
            or if it is neither a JSON object/array nor a msgpack map/array.
    """
    payload = _decompress_if_needed(_frame_bytes(message))

    if _looks_like_json(payload):
        try:
            return json.loads(payload.decode("utf-8"))
        # RecursionError: deeply nested JSON must not escape as a non-payload
        # error; the msgpack branch already maps its equivalent (StackError).
        except (UnicodeDecodeError, json.JSONDecodeError, RecursionError) as exc:
            raise SpectraPayloadError("Spectra WS payload looked like JSON but was invalid") from exc

    if _looks_like_msgpack_container(payload):
        try:
            return msgpack.unpackb(payload, raw=False)
        except (msgpack.ExtraData, msgpack.FormatError, msgpack.StackError, ValueError) as exc:
            raise SpectraPayloadError("Spectra WS payload looked like msgpack but was invalid") from exc

    raise SpectraPayloadError("Spectra WS payload was neither JSON nor msgpack")