"""Compatibility normalizers for Spectra replay payload schema changes.""" from __future__ import annotations from copy import deepcopy from typing import Any POSITION_SCALE = 16.0 MODEL_PREFIXES = ("tankModels/", "airModels/", "aircrafts/") def strip_model_prefix(unit: Any) -> str: """Return a bare unit CDK from either ``foo`` or ``tankModels/foo``. Spectra is inconsistent about prefix casing across replays (observed both ``tankModels/`` and ``tankmodels/`` on the wire), so the match is case-insensitive. """ value = str(unit or "").strip() lowered = value.lower() for prefix in MODEL_PREFIXES: if lowered.startswith(prefix.lower()): return value[len(prefix):] return value def _model_path(unit: Any) -> str: value = str(unit or "").strip() lowered = value.lower() return value if any(lowered.startswith(prefix.lower()) for prefix in MODEL_PREFIXES) else "" def _as_number(value: Any) -> float | int | None: if isinstance(value, bool): return None if isinstance(value, (int, float)): return value try: return float(value) except (TypeError, ValueError): return None def _cumulative(values: Any, *, scale: float = 1.0) -> list[float | int]: if not isinstance(values, list): return [] total: float | int = 0 out: list[float | int] = [] for value in values: num = _as_number(value) if num is None: continue total += num out.append(total / scale if scale != 1.0 else total) return out def _entity_path_from_columns(entity: dict[str, Any]) -> list[list[float | int]]: ts = _cumulative(entity.get("t")) xs = _cumulative(entity.get("x"), scale=POSITION_SCALE) ys = _cumulative(entity.get("y"), scale=POSITION_SCALE) zs = _cumulative(entity.get("z"), scale=POSITION_SCALE) count = min(len(ts), len(xs), len(ys), len(zs)) return [[ts[i], xs[i], ys[i], zs[i]] for i in range(count)] def _normalize_entities(entities: Any) -> Any: if not isinstance(entities, list): return entities out: list[Any] = [] for entity in entities: if not isinstance(entity, dict): out.append(entity) continue normalized = dict(entity) model = _model_path(normalized.get("unit")) if model: normalized["model_path"] = model normalized["unit"] = strip_model_prefix(model) if not isinstance(normalized.get("path"), list): path = _entity_path_from_columns(normalized) if path: normalized["path"] = path out.append(normalized) return out def _looks_like_ticket_delta_columns(series: Any) -> bool: if not ( isinstance(series, list) and len(series) == 2 and isinstance(series[0], list) and isinstance(series[1], list) and len(series[0]) == len(series[1]) ): return False # Avoid transposing an ambiguous two-row legacy timeline. return len(series[0]) > 2 def _normalize_ticket_series(series: Any) -> Any: if not _looks_like_ticket_delta_columns(series): return series ts = _cumulative(series[0]) vs = _cumulative(series[1]) return [[ts[i], vs[i]] for i in range(min(len(ts), len(vs)))] def _normalize_tickets(tickets: Any) -> Any: if not isinstance(tickets, dict): return tickets return {str(slot): _normalize_ticket_series(series) for slot, series in tickets.items()} def _normalize_event_units(events: Any) -> Any: if not isinstance(events, dict): return events normalized = dict(events) for bucket in ("kills", "damage"): rows = normalized.get(bucket) if not isinstance(rows, list): continue out_rows: list[Any] = [] for event in rows: if not isinstance(event, dict): out_rows.append(event) continue item = dict(event) for field in ("offender_unit", "offended_unit"): model = _model_path(item.get(field)) if model: item[f"{field}_model_path"] = model item[field] = strip_model_prefix(model) for field in ("offender_pos", "offended_pos"): pos = item.get(field) if isinstance(pos, list) and len(pos) >= 3: nums = [_as_number(v) for v in pos[:3]] if all(v is not None for v in nums): item[field] = [v / POSITION_SCALE for v in nums] # type: ignore[operator] out_rows.append(item) normalized[bucket] = out_rows return normalized def normalize_spectra_replay(replay: dict[str, Any]) -> dict[str, Any]: """Return a replay compatible with legacy bot consumers. Spectra v3 sends entity paths and tickets as delta columns. The bots still consume absolute row timelines, so this expands the compact representation while leaving already-legacy payloads unchanged. """ normalized = dict(replay) if "entities" in normalized: normalized["entities"] = _normalize_entities(normalized.get("entities")) if "tickets" in normalized: normalized["tickets"] = _normalize_tickets(normalized.get("tickets")) if isinstance(normalized.get("events"), dict): normalized["events"] = _normalize_event_units(normalized.get("events")) if isinstance(normalized.get("players"), dict): players = deepcopy(normalized["players"]) for player in players.values(): if not isinstance(player, dict): continue for unit in player.get("units") or []: if isinstance(unit, dict) and unit.get("unit"): unit["unit"] = strip_model_prefix(unit.get("unit")) normalized["players"] = players return normalized