79 lines
2.5 KiB
Python
79 lines
2.5 KiB
Python
"""Decode Spectra websocket replay payloads.
|
|
|
|
Spectra historically sent zstd-compressed JSON bytes. Newer frames may be
|
|
zstd-compressed msgpack bytes. Keep the format check on the decompressed bytes
|
|
so both rollout states are accepted without guessing from websocket frame type.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
import msgpack
|
|
import zstandard as zstd
|
|
|
|
# One module-wide decompressor, created at import time and reused for every frame.
# NOTE(review): python-zstandard asks callers not to operate on a single
# decompressor from several threads at the same time -- confirm frames are
# decoded serially, or switch to one decompressor per thread.
_ZSTD_DECOMPRESSOR = zstd.ZstdDecompressor()

# Value passed as ``max_output_size`` when decompressing a frame: 64 MiB.
_MAX_OUTPUT_SIZE = 64 * 2**20
|
|
|
|
|
|
class SpectraPayloadError(ValueError):
    """Signal that a Spectra websocket payload could not be decoded.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    keep working.
    """
|
|
|
|
|
|
def _frame_bytes(message: str | bytes | bytearray | memoryview) -> bytes:
|
|
if isinstance(message, str):
|
|
return message.encode("utf-8")
|
|
return bytes(message)
|
|
|
|
|
|
def _decompress_if_needed(raw: bytes) -> bytes:
    """Return *raw* zstd-decompressed, or *raw* unchanged when it is not zstd.

    ``ZstdDecompressor.decompress`` applies ``max_output_size`` only to frames
    whose header omits the content size. When the header declares a size, a
    buffer of that declared size is allocated regardless of the limit. The
    declared size is therefore checked against ``_MAX_OUTPUT_SIZE`` before
    decompressing, so a tiny frame cannot request an arbitrarily large
    allocation.

    Args:
        raw: The frame bytes, possibly a zstd frame.

    Returns:
        The decompressed bytes, or *raw* itself when zstd rejects the input
        (for example because it is plain, uncompressed data).

    Raises:
        SpectraPayloadError: If the zstd frame header declares more than
            ``_MAX_OUTPUT_SIZE`` bytes of decompressed content.
    """
    try:
        # frame_content_size returns -1 when the header carries no size and
        # raises ZstdError when the bytes are not a parseable zstd frame.
        declared_size = zstd.frame_content_size(raw)
        if declared_size <= _MAX_OUTPUT_SIZE:
            return _ZSTD_DECOMPRESSOR.decompress(raw, max_output_size=_MAX_OUTPUT_SIZE)
    except zstd.ZstdError:
        # Best effort by design: anything zstd cannot decode is treated as an
        # uncompressed payload and left for the format sniffing to judge.
        return raw
    raise SpectraPayloadError(
        f"Spectra WS payload declares {declared_size} decompressed bytes, "
        f"above the {_MAX_OUTPUT_SIZE}-byte limit"
    )
|
|
|
|
|
|
def _first_non_ws_byte(payload: bytes) -> int | None:
|
|
for byte in payload:
|
|
if byte not in b" \t\r\n":
|
|
return byte
|
|
return None
|
|
|
|
|
|
def _looks_like_json(payload: bytes) -> bool:
    """Report whether *payload* opens with a JSON object or array.

    Leading space, tab, CR and LF bytes are skipped; the next byte must be
    ``{`` or ``[``.
    """
    lead = _first_non_ws_byte(payload)
    if lead is None:
        # Empty or whitespace-only payload.
        return False
    return lead == ord("{") or lead == ord("[")
|
|
|
|
|
|
def _looks_like_msgpack_container(payload: bytes) -> bool:
|
|
first = _first_non_ws_byte(payload)
|
|
if first is None:
|
|
return False
|
|
return (
|
|
0x80 <= first <= 0x9F # fixmap/fixarray
|
|
or first in (0xDC, 0xDD, 0xDE, 0xDF) # array16/32, map16/32
|
|
)
|
|
|
|
|
|
def decode_spectra_ws_payload(message: str | bytes | bytearray | memoryview) -> Any:
    """Decode one Spectra websocket frame into a Python object.

    The frame is converted to bytes and zstd-decompressed when zstd accepts
    it; otherwise the bytes are used as they are. The resulting payload is
    then parsed as JSON or msgpack, chosen from its leading byte. In effect
    zstd-JSON, zstd-msgpack, and their uncompressed forms are all accepted.

    Args:
        message: The websocket frame. ``str`` frames are encoded as UTF-8
            before any further processing.

    Returns:
        The decoded Python object.

    Raises:
        SpectraPayloadError: If the payload is neither a JSON object/array
            nor a msgpack map/array, or if it looks like one of them but
            cannot be parsed (including nesting too deep to decode).
    """
    payload = _decompress_if_needed(_frame_bytes(message))

    if _looks_like_json(payload):
        try:
            return json.loads(payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError, RecursionError) as exc:
            # RecursionError: the json decoder recurses once per nesting
            # level, so a deeply nested document must be reported as a bad
            # payload, mirroring msgpack.StackError in the branch below.
            raise SpectraPayloadError("Spectra WS payload looked like JSON but was invalid") from exc

    if _looks_like_msgpack_container(payload):
        try:
            # raw=False decodes msgpack str values to Python ``str``.
            return msgpack.unpackb(payload, raw=False)
        except (msgpack.ExtraData, msgpack.FormatError, msgpack.StackError, ValueError) as exc:
            raise SpectraPayloadError("Spectra WS payload looked like msgpack but was invalid") from exc

    raise SpectraPayloadError("Spectra WS payload was neither JSON nor msgpack")
|