""" render_replay.py Handles replay JSON files: renders MP4 videos and exports slim JSON for the web canvas replay viewer. Output mode is picked from the output file extension. Usage: python -m BOT.render_replay # render video python -m BOT.render_replay # export json Public API: render_gob(d, out_path, fps, speed, n_workers, progress_cb) load_gob_file(gob_path) export_replay_json(gob_path) """ # Standard Library Imports import json import math import os import subprocess import sys import threading from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from functools import lru_cache from pathlib import Path from typing import Any, Callable, Optional # Third-Party Library Imports import numpy as np from . import SHARED_DIR from .utils import REPLAYS_DIR from PIL import Image, ImageChops, ImageDraw, ImageFilter, ImageFont try: from data_parser import ( LangTableReader as _LangTableReader, WeaponTableReader as _WeaponTableReader, apply_vehicle_name_filters as _apply_filters, ) _lang = _LangTableReader("English") _weapons = _WeaponTableReader("English") def _translate_vehicle(internal: str) -> str: name = _lang.get_translate(internal) return _apply_filters(name) if name else internal def _translate_weapon(internal: str) -> str: return _weapons.get_translate(internal) or internal except Exception: def _translate_vehicle(internal: str) -> str: # type: ignore[misc] return internal def _translate_weapon(internal: str) -> str: # type: ignore[misc] return internal # Prune verbose suffixes from translated weapon names _WEAPON_PRUNE = [ " air-to-ground missiles", " air-to-ground missile", " air-to-air missiles", " air-to-air missile", " anti-radiation missile", " anti-tank guided missile", " anti-tank guided missiles", " guided bomb", " guided bombs", ] _orig_translate_weapon = _translate_weapon def _translate_weapon(internal: str) -> str: # type: ignore[misc] # noqa: F811 name = _orig_translate_weapon(internal) for suffix in _WEAPON_PRUNE: if name.endswith(suffix): name = name[:-len(suffix)] break return name # Load .env from repo root _env_path = Path(__file__).parent.parent / ".env" if _env_path.exists(): for _line in _env_path.read_text().splitlines(): if "=" in _line and not _line.startswith("#"): _k, _, _v = _line.partition("=") os.environ.setdefault(_k.strip(), _v.strip()) # ── Config ───────────────────────────────────────────────────────────────────── CANVAS_MIN = 1024 # Minimum canvas resolution (px) CANVAS_MAX = 4096 # Maximum canvas resolution (px) MIN_OUTPUT = 1024 # Target minimum crop output size (px) FPS = 22 # Video frames per second SPEED = 4 # Playback speed multiplier (4× = 1min sim → 15s video) MAP_PAD_PX = 100 # Expand resolved map bounds by this many pixels per side TRAIL_MS = 18_000 # Ground trail length (ms, video time) AIR_TRAIL_MS = 2_500 # Aircraft trail length (ms, video time) DRONE_TRAIL_MS = 1_000 # Drone trail length (ms, video time) DOT_R = 5 # Player dot radius (px) DRONE_R = 3 # Drone dot radius (px) AIR_R = 4 # Aircraft triangle half-size (px) TANK_ICON_SIZE = 15 # Ground vehicle icon size (px, at 1024px reference) AIR_ICON_SIZE = 20 # Aircraft icon size (px, at 1024px reference) ICON_HIGHLIGHT_PAD = 3 # Highlight border padding around icon (px, at 1024px reference) ICON_HIGHLIGHT_COLOR = (200, 200, 200, 120) # RGBA highlight backdrop color KILL_TTL = 4_000 # Kill marker display duration (ms, video time) GHOST_TTL = 3_000 # Death fade-to-black duration (ms, video time) WIPE_GRACE = 90 # Extra frames rendered after a team wipe N_WORKERS = min(8, os.cpu_count() or 4) # Thread pool size for parallel frame rendering WIN_COLOR = (0, 200, 0) # green LOSE_COLOR = (220, 30, 30) # red MINIMAPS_DIR = SHARED_DIR / "MAPS" / "MINIMAPS" LEVELS_DIR = SHARED_DIR / "MAPS" / "LEVELS" ICONS_DIR = SHARED_DIR / "ICONS" BOTS_DIR = SHARED_DIR.parent WT_LEVELS_DIR = BOTS_DIR / "War-Thunder-Datamine" / "aces.vromfs.bin_u" / "levels" WT_MISSIONS_DIR = BOTS_DIR / "War-Thunder-Datamine" / "mis.vromfs.bin_u" LOCAL_MISSIONS_DIR = LEVELS_DIR / "MISSIONS" _tl = threading.local() # ── Vehicle-type icon system ────────────────────────────────────────────────── # Map raw unittags tags → icon key (checked in order, first match wins) _TAG_TO_ICON = { "type_spaa": "spaa", "type_light_tank": "light_tank", "type_tank_destroyer": "tank_destroyer", "type_heavy_tank": "heavy_tank", "type_medium_tank": "medium_tank", "type_missile_tank": "tank_destroyer", "type_jet_bomber": "jet_bomber", "type_bomber": "bomber", "type_strike_aircraft": "fighter", "type_jet_fighter": "jet", "type_fighter": "fighter", "type_strike_ucav": "drone", "type_helicopter": "helicopter", } # Icon key → PNG filename (relative to ICONS_DIR) _ICON_FILES = { "light_tank": "light.png", "medium_tank": "medium.png", "heavy_tank": "heavy.png", "tank_destroyer": "tank_destroyer.png", "spaa": "spaa.png", "fighter": "FALLBACKS/fighter_icon.png", "jet": "FALLBACKS/jet_icon.png", "jet_bomber": "FALLBACKS/jet_bomber_icon.png", "bomber": "FALLBACKS/bomber_icon.png", "drone": "drone.png", "helicopter": "FALLBACKS/helicopter_icon.png", "unknown": "tank_icon.png", } # ── Premultiplied sprite ─────────────────────────────────────────────────────── @dataclass class Sprite: """Premultiplied RGBA sprite for fast alpha compositing onto RGB buffers. Attributes: pm: Premultiplied RGB array, shape (h, w, 3), uint8. ia: Inverse alpha array, shape (h, w, 1), uint16. h: Sprite height in pixels. w: Sprite width in pixels. """ pm: np.ndarray # (h, w, 3) uint8 premultiplied RGB pm16: np.ndarray # (h, w, 3) uint16 premultiplied RGB (cached for blending math) ia: np.ndarray # (h, w, 1) uint16 inverse alpha, pre-cast h: int w: int def make_sprite(rgba: np.ndarray) -> Sprite: a = rgba[..., 3:4].astype(np.uint16) pm16 = (rgba[..., :3].astype(np.uint16) * a) >> 8 pm = pm16.astype(np.uint8) ia = (255 - rgba[..., 3:4]).astype(np.uint16) return Sprite(pm=pm, pm16=pm16, ia=ia, h=rgba.shape[0], w=rgba.shape[1]) def make_black_sprite(spr: Sprite) -> Sprite: """Create a black version of a sprite (RGB=0, same alpha).""" pm = np.zeros_like(spr.pm) pm16 = np.zeros_like(spr.pm16) ia = spr.ia.copy() return Sprite(pm=pm, pm16=pm16, ia=ia, h=spr.h, w=spr.w) def make_bare_black_sprite(icon_key: str, size: int, highlight_pad: int) -> Sprite: """Create a black icon sprite with no highlight glow — just the bare silhouette. The sprite is sized to match the highlighted version (with padding) so it can be used as a drop-in replacement when the highlight fades away. """ rgba = _load_icon_rgba(icon_key) src_h, src_w = rgba.shape[:2] scale_f = size / max(src_h, src_w) new_w = max(1, int(src_w * scale_f + 0.5)) new_h = max(1, int(src_h * scale_f + 0.5)) img = Image.fromarray(rgba).resize((new_w, new_h), Image.Resampling.LANCZOS) arr = np.asarray(img).copy() # Zero out RGB, keep alpha — black silhouette arr[..., :3] = 0 if highlight_pad > 0: # Embed in padded canvas to match highlighted sprite dimensions out_h, out_w = new_h + highlight_pad * 2, new_w + highlight_pad * 2 canvas = np.zeros((out_h, out_w, 4), dtype=np.uint8) canvas[highlight_pad:highlight_pad + new_h, highlight_pad:highlight_pad + new_w] = arr arr = canvas return make_sprite(arr) def _get_unit_tags(internal_name: str) -> list[str] | None: """Get raw tags for a vehicle from UnitTags, returns None if not found.""" from data_parser import UnitTags return UnitTags.get()._get_tags(internal_name) MINIS_DIR = SHARED_DIR / "ICONS" / "MINIS" _AIRCRAFT_TAGS = {"air", "aircraft", "helicopter"} def get_icon_key(model_name: str) -> str: """Map a ModelName (e.g. 'tankModels/ussr_t_34') to an icon key. For aircraft/helicopters, tries a per-vehicle mini icon first ('{internal}_ico.png' in MINIS/), then falls back to tag-based. Ground vehicles always use tag-based icons. """ internal = model_name.split("/")[-1] tags = _get_unit_tags(internal) if tags: tag_set = set(tags) # Only aircraft/helis get per-vehicle mini icons if tag_set & _AIRCRAFT_TAGS: mini_path = MINIS_DIR / f"{internal}_ico.png" if mini_path.exists(): return f"mini:{internal}" # Tag-based fallback (used for all ground vehicles, and aircraft without a mini) for tag, icon in _TAG_TO_ICON.items(): if tag in tag_set: return icon return "unknown" @lru_cache(maxsize=512) def _load_icon_rgba(icon_key: str) -> np.ndarray: """Load an icon PNG as RGBA numpy array, cropped to content bounds. Cached.""" if icon_key.startswith("mini:"): internal = icon_key[5:] path = MINIS_DIR / f"{internal}_ico.png" else: filename = _ICON_FILES.get(icon_key, _ICON_FILES["unknown"]) path = ICONS_DIR / filename arr = np.asarray(Image.open(path).convert("RGBA")).copy() # Crop to bounding box of non-transparent pixels alpha = arr[..., 3] rows = np.any(alpha > 0, axis=1) cols = np.any(alpha > 0, axis=0) if rows.any() and cols.any(): y0, y1 = np.where(rows)[0][[0, -1]] x0, x1 = np.where(cols)[0][[0, -1]] arr = arr[y0:y1 + 1, x0:x1 + 1].copy() return arr def make_tinted_icon_sprite(icon_key: str, color: tuple[int, int, int], size: int, highlight_pad: int = 0, highlight_color: tuple[int, int, int, int] = (0, 0, 0, 0), ) -> Sprite: """Load an icon, resize preserving aspect ratio, tint, add outline highlight.""" rgba = _load_icon_rgba(icon_key) src_h, src_w = rgba.shape[:2] # Resize to fit within `size` height, preserving aspect ratio scale_f = size / max(src_h, src_w) new_w = max(1, int(src_w * scale_f + 0.5)) new_h = max(1, int(src_h * scale_f + 0.5)) img = Image.fromarray(rgba).resize((new_w, new_h), Image.Resampling.LANCZOS) arr = np.asarray(img).copy().astype(np.float32) # Tint: multiply RGB by color/255, preserving alpha arr[..., 0] *= color[0] / 255.0 arr[..., 1] *= color[1] / 255.0 arr[..., 2] *= color[2] / 255.0 tinted = np.clip(arr, 0, 255).astype(np.uint8) if highlight_pad <= 0 or highlight_color[3] == 0: return make_sprite(tinted) # Build outline highlight by painting the highlight color at offsets around the icon pad = highlight_pad out_h, out_w = new_h + pad * 2, new_w + pad * 2 canvas = np.zeros((out_h, out_w, 4), dtype=np.uint8) tinted_img = Image.fromarray(tinted) hl_layer = Image.new("RGBA", (out_w, out_h), (0, 0, 0, 0)) # Stamp the icon alpha at each offset around the center to create an outline for dy in range(-pad, pad + 1): for dx in range(-pad, pad + 1): if dx * dx + dy * dy > pad * pad: continue hl_layer.paste(tinted_img, (pad + dx, pad + dy), tinted_img) # Replace RGB with highlight color, keep the merged alpha capped hl_arr = np.asarray(hl_layer).copy() mask = hl_arr[..., 3] > 0 hl_arr[mask, 0] = highlight_color[0] hl_arr[mask, 1] = highlight_color[1] hl_arr[mask, 2] = highlight_color[2] hl_arr[..., 3] = np.minimum(hl_arr[..., 3], highlight_color[3]) # Composite tinted icon on top of highlight bg = Image.fromarray(hl_arr) bg.paste(tinted_img, (pad, pad), tinted_img) return make_sprite(np.asarray(bg).copy()) def load_target_sprite(filename: str, size: int) -> Sprite: """Load a target icon PNG, resize, return as Sprite.""" path = ICONS_DIR / filename rgba = np.asarray(Image.open(path).convert("RGBA").resize( (size, size), Image.Resampling.LANCZOS )).copy() return make_sprite(rgba) # ── Rotated sprite cache for aircraft ───────────────────────────────────────── ROTATION_STEPS = 72 # one sprite every 5° def _sprite_to_rgba(spr: Sprite) -> np.ndarray: """Reconstruct RGBA array from a premultiplied Sprite.""" alpha = (255 - spr.ia[..., 0]).astype(np.uint8) rgba = np.zeros((spr.h, spr.w, 4), dtype=np.uint8) safe_a = np.maximum(alpha, 1).astype(np.float32) rgba[..., 0] = np.clip(spr.pm[..., 0].astype(np.float32) * 255.0 / safe_a, 0, 255).astype(np.uint8) rgba[..., 1] = np.clip(spr.pm[..., 1].astype(np.float32) * 255.0 / safe_a, 0, 255).astype(np.uint8) rgba[..., 2] = np.clip(spr.pm[..., 2].astype(np.float32) * 255.0 / safe_a, 0, 255).astype(np.uint8) rgba[..., 3] = alpha return rgba def make_rotation_cache(spr: Sprite) -> list[Sprite]: """Pre-compute rotated sprites at ROTATION_STEPS even angles. Index 0 = 0°, etc.""" rgba = _sprite_to_rgba(spr) img = Image.fromarray(rgba) cache: list[Sprite] = [] for i in range(ROTATION_STEPS): deg = i * (360.0 / ROTATION_STEPS) # PIL rotates CCW; we want CW rotation for heading, so negate rot = img.rotate(-deg, resample=Image.Resampling.BILINEAR, expand=True) cache.append(make_sprite(np.asarray(rot).copy())) return cache def precompute_headings(px: np.ndarray, py: np.ndarray) -> np.ndarray: """Compute heading angle (degrees, 0=up/north, CW) per entity per frame. Returns (n_entities, n_frames) float32 array. -1 where invalid. """ n_ents, n_frames = px.shape headings = np.full((n_ents, n_frames), -1.0, dtype=np.float32) for i in range(n_ents): last_heading = 0.0 for f in range(1, n_frames): if px[i, f] < 0 or px[i, f - 1] < 0: headings[i, f] = last_heading continue dx = float(px[i, f] - px[i, f - 1]) dy = float(py[i, f] - py[i, f - 1]) if abs(dx) < 0.5 and abs(dy) < 0.5: headings[i, f] = last_heading continue # atan2(dx, -dy): 0=up, 90=right, 180=down, 270=left deg = math.degrees(math.atan2(dx, -dy)) % 360 last_heading = deg headings[i, f] = deg # Fill frame 0 with frame 1's heading if n_frames > 1: headings[i, 0] = headings[i, 1] return headings def heading_to_rot_index(deg: float) -> int: """Convert a heading in degrees to a rotation cache index.""" return int(round(deg / (360.0 / ROTATION_STEPS))) % ROTATION_STEPS def blit(buf: np.ndarray, spr: Sprite, x: int, y: int) -> None: x1, y1 = max(x, 0), max(y, 0) H, W = buf.shape[0], buf.shape[1] x2, y2 = min(x + spr.w, W), min(y + spr.h, H) if x1 >= x2 or y1 >= y2: return sy1, sx1 = y1 - y, x1 - x sy2, sx2 = sy1 + (y2 - y1), sx1 + (x2 - x1) pm = spr.pm16[sy1:sy2, sx1:sx2] buf[y1:y2, x1:x2] = (pm + ((buf[y1:y2, x1:x2] * spr.ia[sy1:sy2, sx1:sx2]) >> 8) ).astype(np.uint8) def blit_alpha(buf: np.ndarray, spr: Sprite, x: int, y: int, alpha: float) -> None: """Like blit() but with an additional [0,1] alpha multiplier.""" if alpha <= 0: return x1, y1 = max(x, 0), max(y, 0) H, W = buf.shape[0], buf.shape[1] x2, y2 = min(x + spr.w, W), min(y + spr.h, H) if x1 >= x2 or y1 >= y2: return sy1, sx1 = y1 - y, x1 - x sy2, sx2 = sy1 + (y2 - y1), sx1 + (x2 - x1) a16 = int(alpha * 256) pm = (spr.pm16[sy1:sy2, sx1:sx2] * a16) >> 8 ia = 255 - (((255 - spr.ia[sy1:sy2, sx1:sx2]) * a16) >> 8) buf[y1:y2, x1:x2] = (pm + ((buf[y1:y2, x1:x2] * ia) >> 8)).astype(np.uint8) def blit_batch(buf: np.ndarray, items: list[tuple[Sprite, int, int]]) -> None: """Blit multiple sprites in one call — avoids per-call Python/numpy overhead.""" H = buf.shape[0] W = buf.shape[1] for spr, x, y in items: x1 = max(x, 0); y1 = max(y, 0) x2 = min(x + spr.w, W); y2 = min(y + spr.h, H) if x1 >= x2 or y1 >= y2: continue sy1 = y1 - y; sx1 = x1 - x sy2 = sy1 + (y2 - y1); sx2 = sx1 + (x2 - x1) pm = spr.pm16[sy1:sy2, sx1:sx2] buf[y1:y2, x1:x2] = (pm + ((buf[y1:y2, x1:x2] * spr.ia[sy1:sy2, sx1:sx2]) >> 8) ).astype(np.uint8) # ── VideoCtx — all pre-computed data for one output video ───────────────────── @dataclass class VideoCtx: """Pre-computed rendering context for one replay video. Holds all interpolated positions, colors, death states, kill/damage events, label sprites, and the baked background for every frame so that render_one_ctx can draw each frame without re-computing anything. """ n_players: int px_all: np.ndarray # (n_players, n_frames) int16, -1=absent py_all: np.ndarray colors_arr: np.ndarray # (n_players, 3) uint8 trail_colors_arr: np.ndarray # (n_players, 3) uint8 is_dead: np.ndarray # (n_players, n_frames) bool death_frame: np.ndarray # (n_players,) int32 — frame of death, or n_frames death_fade: np.ndarray # (n_players, n_frames) float32 — 1→0 over ghost ttl last_dead_px: np.ndarray # (n_players,) int16 — last known x at death last_dead_py: np.ndarray # (n_players,) int16 — last known y at death kills_by_frame: list # per-frame kill events damages_by_frame: list # per-frame damage events label_sprites: list # list[Sprite] combined name+vehicle per player bg_arr: np.ndarray # pre-baked background RGB end_frame: int n_drones: int px_drone: np.ndarray | None py_drone: np.ndarray | None drone_colors_arr: np.ndarray | None drone_trail_colors: np.ndarray | None drone_sprites: list # list[Sprite] drone_is_hit: np.ndarray | None = None # True from kill onward (for fade) drone_hit_fade: np.ndarray | None = None # 1→0 from kill to crash drone_is_crashed: np.ndarray | None = None # True after path ends drone_crash_frame: np.ndarray | None = None drone_last_dead_px: np.ndarray | None = None drone_last_dead_py: np.ndarray | None = None drone_alt: np.ndarray | None = None drone_alt_sprites: dict = field(default_factory=dict) n_aircraft: int = 0 px_air: np.ndarray | None = None py_air: np.ndarray | None = None air_colors_arr: np.ndarray | None = None air_trail_colors: np.ndarray | None = None air_sprites: list = field(default_factory=list) air_is_hit: np.ndarray | None = None # True from kill onward air_hit_fade: np.ndarray | None = None # 1→0 from kill to crash air_is_crashed: np.ndarray | None = None # True after path ends air_crash_frame: np.ndarray | None = None air_last_dead_px: np.ndarray | None = None air_last_dead_py: np.ndarray | None = None air_alt: np.ndarray | None = None air_alt_sprites: dict = field(default_factory=dict) air_trail_f: int = 1 drone_trail_f: int = 1 # Scaled circle masks dot_dy: np.ndarray = field(default_factory=lambda: DOT_DY) dot_dx: np.ndarray = field(default_factory=lambda: DOT_DX) shadow_dy: np.ndarray = field(default_factory=lambda: SHADOW_DY) shadow_dx: np.ndarray = field(default_factory=lambda: SHADOW_DX) drone_dy: np.ndarray = field(default_factory=lambda: DRONE_DY) drone_dx: np.ndarray = field(default_factory=lambda: DRONE_DX) air_dy: np.ndarray = field(default_factory=lambda: AIR_DY) air_dx: np.ndarray = field(default_factory=lambda: AIR_DX) dot_r: int = DOT_R drone_r: int = DRONE_R air_r: int = AIR_R canvas: int = CANVAS_MIN # Per-player vehicle icon sprites (tinted to player color) player_icon_sprites: list = field(default_factory=list) # list[Sprite], len=n_players player_dead_sprites: list = field(default_factory=list) # list[Sprite], black versions air_icon_sprites: list = field(default_factory=list) # list[Sprite], len=n_aircraft air_dead_sprites: list = field(default_factory=list) # list[Sprite], black versions # Per-aircraft rotation caches and heading angles air_rot_caches: list = field(default_factory=list) # list[list[Sprite]], per-aircraft air_dead_rot_caches: list = field(default_factory=list) # list[list[Sprite]], black versions air_headings: np.ndarray | None = None # (n_aircraft, n_frames) float32 # Per-drone icon sprites + rotation drone_icon_sprites: list = field(default_factory=list) drone_dead_sprites: list = field(default_factory=list) drone_rot_caches: list = field(default_factory=list) drone_dead_rot_caches: list = field(default_factory=list) drone_headings: np.ndarray | None = None # Kill/damage target icons kill_target_spr: Sprite | None = None dmg_target_spr: Sprite | None = None # Dynamic capture-state fills (per-cap, per-frame); empty for pre-v2 replays cap_overlays: list = field(default_factory=list) # ── Map / LevelDef load ──────────────────────────────────────────────────────── def _clean_map_key(raw: str) -> str: key = Path(raw).name.strip().rstrip("*") if key.endswith(".png"): key = key[:-4] return key def _load_json_file(path: Path) -> dict | None: try: import gzip as _gzip raw = path.read_bytes() if path.suffix == ".gz": raw = _gzip.decompress(raw) return json.loads(raw) except Exception as e: print(f" Failed to parse {path.name}: {e}") return None def _normalize_mission_relpath(path: str) -> str: rel = str(path or "").strip().replace("\\", "/").lstrip("/") if rel[:9].lower() == "gamedata/": rel = "gamedata/" + rel[9:] return rel.lower() def _mission_path_candidates(path: str) -> list[Path]: rel = _normalize_mission_relpath(path) if not rel: return [] out: list[Path] = [] def add(p: Path) -> None: if p not in out: out.append(p) def add_variants(root: Path) -> None: raw = root / rel suffix = raw.suffix.lower() if suffix == ".blk": add(raw.with_suffix(".blkx")) add(raw) elif suffix == ".blkx": add(raw) add(raw.with_suffix(".blk")) else: add(raw) add(raw.with_suffix(".blkx")) add(raw.with_suffix(".blk")) # Prefer local mission bundle in SHARED/MAPS/LEVELS/MISSIONS. add_variants(LOCAL_MISSIONS_DIR) # Fallback to full datamine clone for anything not yet copied locally. add_variants(WT_MISSIONS_DIR) return out def _load_mission_def_from_path(path: str) -> tuple[dict | None, Path | None]: for p in _mission_path_candidates(path): if not p.exists(): continue data = _load_json_file(p) if isinstance(data, dict): return data, p return None, None def load_mission_def(level_settings_path: str) -> tuple[dict | None, Path | None]: """Load mission settings .blkx referenced by replay Mission.LevelSettings.""" return _load_mission_def_from_path(level_settings_path) def _mission_use_alt_map_coord(mission_def: dict | None) -> bool: """Return whether mission settings request alternative tank map coords.""" if not isinstance(mission_def, dict): return False ms = mission_def.get("mission_settings") if not isinstance(ms, dict): return False mission_cfg = ms.get("mission") if not isinstance(mission_cfg, dict): return False val = mission_cfg.get("useAlternativeMapCoord") if isinstance(val, bool): return val per_diff = mission_cfg.get("mission") if isinstance(per_diff, list): for item in per_diff: if isinstance(item, dict) and item.get("useAlternativeMapCoord") is True: return True return False def _mission_level_override(mission_def: dict | None) -> str: if not isinstance(mission_def, dict): return "" ms = mission_def.get("mission_settings") if not isinstance(ms, dict): return "" mission_cfg = ms.get("mission") if not isinstance(mission_cfg, dict): return "" lvl = mission_cfg.get("level", "") return lvl if isinstance(lvl, str) else "" def _import_paths(mission_def: dict) -> list[str]: imports = mission_def.get("imports") if not isinstance(imports, dict): return [] import_record = imports.get("import_record") records = import_record if isinstance(import_record, list) else [import_record] out: list[str] = [] for rec in records: if not isinstance(rec, dict): continue f = rec.get("file") if isinstance(f, str) and f.strip(): out.append(f) return out def _collect_mission_areas(root_def: dict | None, root_path: Path | None) -> dict[str, dict]: if not isinstance(root_def, dict): return {} areas: dict[str, dict] = {} queue: list[tuple[dict, Path | None]] = [(root_def, root_path)] seen: set[Path] = set() if root_path: seen.add(root_path.resolve()) while queue: cur_def, _ = queue.pop(0) cur_areas = cur_def.get("areas") if isinstance(cur_areas, dict): for name, area in cur_areas.items(): if isinstance(name, str) and isinstance(area, dict) and name not in areas: areas[name] = area for rel in _import_paths(cur_def): child_def, child_path = _load_mission_def_from_path(rel) if not isinstance(child_def, dict): continue resolved = child_path.resolve() if child_path else None if resolved and resolved in seen: continue if resolved: seen.add(resolved) queue.append((child_def, child_path)) return areas def _mission_battle_area_targets(mission_def: dict | None) -> list[str]: if not isinstance(mission_def, dict): return [] ms = mission_def.get("mission_settings") if not isinstance(ms, dict): return [] out: list[str] = [] seen: set[str] = set() def add(target: str) -> None: if target and target not in seen: seen.add(target) out.append(target) def walk(node: Any) -> None: if isinstance(node, dict): for key, value in node.items(): if key == "battleArea" and isinstance(value, dict): target = value.get("target") if isinstance(target, str): add(target) walk(value) elif isinstance(node, list): for item in node: walk(item) walk(ms) return out def _battle_type_candidates(battle_type: str) -> list[str]: bt = _clean_map_key(battle_type).lower() if not bt: return [] mode = bt.split("_")[-1] out: list[str] = [] if mode: out.append(f"{mode}_battle_area_realistic") out.append(f"{mode}_battle_area_arcade") out.append(f"{mode}_battle_area_hardcore") out.append(f"{mode}_battle_area") if "dom" in bt: out.append("dom_battle_area_realistic") out.append("dom_battle_area_arcade") out.append("dom_battle_area_hardcore") if "conq" in bt: out.append("conq_battle_area_realistic") out.append("conq_battle_area_arcade") out.append("conq_battle_area_hardcore") dedup: list[str] = [] seen: set[str] = set() for name in out: if name not in seen: seen.add(name) dedup.append(name) return dedup def _battle_area_variant_priority(name: str) -> int: ln = name.lower() if "realistic" in ln: return 0 if "hardcore" in ln: return 1 if "arcade" in ln: return 2 if "briefing" in ln or "brief_" in ln: return 3 return 4 def _select_battle_area_name(mission_def: dict | None, areas: dict[str, dict], battle_type: str) -> str: if not areas: return "" for target in _mission_battle_area_targets(mission_def): if target in areas: return target for name in _battle_type_candidates(battle_type): if name in areas: return name return "" def _box_bounds_from_tm(area: dict) -> tuple[list[float], list[float]] | None: if area.get("type") != "Box": return None tm = area.get("tm") if not isinstance(tm, list) or len(tm) < 4: return None try: axes = np.array(tm[:3], dtype=np.float64) center = np.array(tm[3], dtype=np.float64) except Exception: return None if axes.shape != (3, 3) or center.shape[0] < 3: return None xs: list[float] = [] zs: list[float] = [] for sx in (-0.5, 0.5): for sy in (-0.5, 0.5): for sz in (-0.5, 0.5): pt = center + sx * axes[0] + sy * axes[1] + sz * axes[2] xs.append(float(pt[0])) zs.append(float(pt[2])) x0, x1 = min(xs), max(xs) z0, z1 = min(zs), max(zs) if x1 <= x0 or z1 <= z0: return None return [x0, z0], [x1, z1] def resolve_world_bounds(level_def: dict, use_alt_map_coord: bool, mission_def: dict | None, mission_def_path: Path | None, battle_type: str, ) -> tuple[list[float], list[float], str]: tc0, tc1, coord_src = select_tank_coords(level_def, use_alt_map_coord) areas = _collect_mission_areas(mission_def, mission_def_path) battle_name = _select_battle_area_name(mission_def, areas, battle_type) if not battle_name: return tc0, tc1, coord_src area = areas.get(battle_name) if not isinstance(area, dict): return tc0, tc1, coord_src bounds = _box_bounds_from_tm(area) if bounds is None: return tc0, tc1, coord_src c0, c1 = bounds return c0, c1, f"battleArea:{battle_name}" def _ground_points_for_bounds_fit(ground_entities: list[dict]) -> list[tuple[float, float]]: pts: list[tuple[float, float]] = [] for ent in ground_entities: for sample in ent.get("Path", []): try: pts.append((float(sample["X"]), float(sample["Z"]))) except Exception: continue return pts def _bounds_coverage(points: list[tuple[float, float]], c0: list[float], c1: list[float]) -> float: if not points: return 0.0 x_lo = min(float(c0[0]), float(c1[0])) x_hi = max(float(c0[0]), float(c1[0])) z_lo = min(float(c0[1]), float(c1[1])) z_hi = max(float(c0[1]), float(c1[1])) inside = 0 for x, z in points: if x_lo <= x <= x_hi and z_lo <= z <= z_hi: inside += 1 return inside / float(len(points)) def _fit_world_bounds_to_ground_activity(c0: list[float], c1: list[float], coord_src: str, mission_def: dict | None, mission_def_path: Path | None, battle_type: str, ground_entities: list[dict], ) -> tuple[list[float], list[float], str]: pts = _ground_points_for_bounds_fit(ground_entities) if not pts: return c0, c1, coord_src areas = _collect_mission_areas(mission_def, mission_def_path) if not areas: return c0, c1, coord_src names: list[str] = [] seen: set[str] = set() for name in _mission_battle_area_targets(mission_def): if name in areas and name not in seen: seen.add(name) names.append(name) for name in _battle_type_candidates(battle_type): if name in areas and name not in seen: seen.add(name) names.append(name) if not names: return c0, c1, coord_src candidates: list[tuple[str, list[float], list[float], float, float]] = [] for name in names: area = areas.get(name) if not isinstance(area, dict): continue bounds = _box_bounds_from_tm(area) if bounds is None: continue bc0, bc1 = bounds cov = _bounds_coverage(pts, bc0, bc1) area_sz = abs((float(bc1[0]) - float(bc0[0])) * (float(bc1[1]) - float(bc0[1]))) candidates.append((name, bc0, bc1, cov, area_sz)) if not candidates: return c0, c1, coord_src cur_cov = _bounds_coverage(pts, c0, c1) best_name, best_c0, best_c1, best_cov, _ = sorted( candidates, key=lambda t: (-t[3], _battle_area_variant_priority(t[0]), t[4]) )[0] # Switch when current bounds miss noticeable movement. if best_cov > cur_cov + 0.02: return best_c0, best_c1, f"battleArea:{best_name}|fit={best_cov:.3f}" return c0, c1, coord_src def _capture_mode_prefixes(battle_type: str) -> list[str]: bt = _clean_map_key(battle_type).lower() if "conq" in bt: return ["conq_capture_area_"] if "dom" in bt: return ["dom_capture_area_"] if "bttl" in bt: return ["bttl_t1_capture_area_", "bttl_t2_capture_area_"] return [] def _capture_sort_key(name: str) -> tuple[int, str]: digits = "".join(ch for ch in name if ch.isdigit()) return (int(digits) if digits else 999, name) def _capture_radius_from_tm(tm: list) -> float: try: a = np.array(tm[0], dtype=np.float64) b = np.array(tm[2], dtype=np.float64) ra = float(np.hypot(a[0], a[2])) rb = float(np.hypot(b[0], b[2])) return max(ra, rb) except Exception: return 0.0 def _capture_tm_vectors(tm: list) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | None: try: a0 = np.array(tm[0], dtype=np.float64) a1 = np.array(tm[1], dtype=np.float64) a2 = np.array(tm[2], dtype=np.float64) c = np.array(tm[3], dtype=np.float64) except Exception: return None if a0.shape[0] < 3 or a1.shape[0] < 3 or a2.shape[0] < 3 or c.shape[0] < 3: return None return a0, a1, a2, c def resolve_capture_areas(mission_def: dict | None, mission_def_path: Path | None, battle_type: str) -> list[dict]: """Resolve gameplay capture areas from mission imports (mode-specific).""" areas = _collect_mission_areas(mission_def, mission_def_path) if not areas: return [] prefixes = _capture_mode_prefixes(battle_type) capture_names: list[str] = [] for name, area in areas.items(): if not isinstance(area, dict): continue lname = name.lower() if "capture_area" not in lname or lname.startswith("briefing_"): continue if prefixes and not any(lname.startswith(p) for p in prefixes): continue capture_names.append(name) if not capture_names: return [] # Keep one variant per area base (prefer arcade, then realistic, hardcore). suffix_prio = ["_arcade", "_realistic", "_hardcore", "_simulator", ""] groups: dict[str, dict[str, str]] = {} for name in capture_names: lname = name.lower() suffix = "" base = lname for sfx in suffix_prio[:-1]: if lname.endswith(sfx): suffix = sfx base = lname[:-len(sfx)] break groups.setdefault(base, {})[suffix] = name chosen: list[str] = [] for base in sorted(groups.keys(), key=_capture_sort_key): variants = groups[base] pick = "" for sfx in suffix_prio: if sfx in variants: pick = variants[sfx] break if pick: chosen.append(pick) out: list[dict] = [] for name in chosen: area = areas.get(name) if not isinstance(area, dict): continue tm = area.get("tm") if not isinstance(tm, list) or len(tm) < 4 or not isinstance(tm[3], list): continue try: cx = float(tm[3][0]) cz = float(tm[3][2]) except Exception: continue tm_vecs = _capture_tm_vectors(tm) if tm_vecs is not None: a0, _a1, a2, center = tm_vecs tm_data = { "a0": [float(a0[0]), float(a0[1]), float(a0[2])], "a2": [float(a2[0]), float(a2[1]), float(a2[2])], "center": [float(center[0]), float(center[1]), float(center[2])], } else: tm_data = None out.append({ "name": name, "type": str(area.get("type", "")), "x": cx, "z": cz, "radius": _capture_radius_from_tm(tm), "tm": tm_data, }) # If exactly one cap is a strong outlier (>2x median sibling radius), # halve only that cap (including tm X/Z basis so rendered outline matches). if len(out) >= 3: radii = sorted(float(c["radius"]) for c in out if float(c["radius"]) > 0.0) if len(radii) >= 3: median_r = radii[len(radii) // 2] if median_r > 0.0: outlier = [ i for i, c in enumerate(out) if float(c.get("radius", 0.0)) > (2.0 * median_r) ] if len(outlier) == 1: oi = outlier[0] out[oi]["radius"] = float(out[oi]["radius"]) * 0.5 tm = out[oi].get("tm") if isinstance(tm, dict): for key in ("a0", "a2"): vec = tm.get(key) if isinstance(vec, list) and len(vec) >= 3: tm[key] = [float(vec[0]) * 0.5, float(vec[1]) * 0.5, float(vec[2]) * 0.5] return out def _world_to_map_px(x: float, z: float, x0: float, z0: float, xr: float, zr: float, canvas: int) -> tuple[int, int]: px = int(round((x - x0) / xr * canvas)) py = int(round((z0 + zr - z) / zr * canvas)) return px, py def _cap_outline_points_px(cap: dict, x0: float, z0: float, xr: float, zr: float, canvas: int) -> list[tuple[int, int]]: tm = cap.get("tm") if not isinstance(tm, dict): return [] a0 = tm.get("a0") a2 = tm.get("a2") center = tm.get("center") if not (isinstance(a0, list) and isinstance(a2, list) and isinstance(center, list)): return [] try: ax0, az0 = float(a0[0]), float(a0[2]) ax2, az2 = float(a2[0]), float(a2[2]) cx, cz = float(center[0]), float(center[2]) except Exception: return [] cap_type = str(cap.get("type", "")).lower() points: list[tuple[int, int]] = [] if cap_type in {"sphere", "cylinder"}: # Project the transformed local unit circle in XZ using tm basis vectors. steps = 64 for i in range(steps): t = (2.0 * math.pi * i) / steps wx = cx + math.cos(t) * ax0 + math.sin(t) * ax2 wz = cz + math.cos(t) * az0 + math.sin(t) * az2 points.append(_world_to_map_px(wx, wz, x0, z0, xr, zr, canvas)) return points if cap_type == "box": # Top-down rectangle from transformed local XZ square corners. corners = [(-0.5, -0.5), (0.5, -0.5), (0.5, 0.5), (-0.5, 0.5)] for sx, sz in corners: wx = cx + sx * ax0 + sz * ax2 wz = cz + sx * az0 + sz * az2 points.append(_world_to_map_px(wx, wz, x0, z0, xr, zr, canvas)) return points return [] def _map_name_candidates(level_path: str, level_def: dict | None, battle_type: str = "", level_settings_path: str = "") -> list[str]: stem = Path(level_path).stem out: list[str] = [] seen: set[str] = set() def add(name: str) -> None: if not name or name in seen: return seen.add(name) out.append(name) # The canonical ground map is "_tankmap" — this is exactly what the # web viewer requests (/api/match/minimap/), so try it first to stay in # sync with the canvas. customLevelMap points at the full/thumbnail map and # must not win over the real tankmap for a ground render. add(f"{stem}_tankmap") if isinstance(level_def, dict): tank_custom = level_def.get("customLevelTankMap", "") if isinstance(tank_custom, str) and tank_custom: add(_clean_map_key(tank_custom)) full_custom = level_def.get("customLevelMap", "") if isinstance(full_custom, str) and full_custom: full_key = _clean_map_key(full_custom) add(full_key) if full_key.endswith("_map"): add(full_key[:-4] + "_tankmap") # Mission-specific identifiers from replay fields (mode/scenario variants). if battle_type: bkey = _clean_map_key(battle_type) add(f"{bkey}_tankmap") add(f"{bkey}_map") add(bkey) if level_settings_path: mkey = _clean_map_key(Path(level_settings_path).stem) add(f"{mkey}_tankmap") add(f"{mkey}_map") add(mkey) add(f"{stem}_tankmap") add(stem) return out def _bounds_almost_equal(c0: list[float], c1: list[float], r0: list[float], r1: list[float], eps: float = 1e-6) -> bool: return ( abs(float(c0[0]) - float(r0[0])) <= eps and abs(float(c0[1]) - float(r0[1])) <= eps and abs(float(c1[0]) - float(r1[0])) <= eps and abs(float(c1[1]) - float(r1[1])) <= eps ) def _expand_bounds_by_pixels(c0: list[float], c1: list[float], canvas: int, pad_px: int) -> tuple[list[float], list[float]]: if canvas <= 0 or pad_px <= 0: return [float(c0[0]), float(c0[1])], [float(c1[0]), float(c1[1])] x_lo = min(float(c0[0]), float(c1[0])) x_hi = max(float(c0[0]), float(c1[0])) z_lo = min(float(c0[1]), float(c1[1])) z_hi = max(float(c0[1]), float(c1[1])) xr = x_hi - x_lo zr = z_hi - z_lo if xr <= 0.0 or zr <= 0.0: return [float(c0[0]), float(c0[1])], [float(c1[0]), float(c1[1])] pad_x = xr * (float(pad_px) / float(canvas)) pad_z = zr * (float(pad_px) / float(canvas)) return [x_lo - pad_x, z_lo - pad_z], [x_hi + pad_x, z_hi + pad_z] def _clamp_bounds_to_base(c0: list[float], c1: list[float], base_c0: list[float], base_c1: list[float]) -> tuple[list[float], list[float]]: bx_lo = min(float(base_c0[0]), float(base_c1[0])) bx_hi = max(float(base_c0[0]), float(base_c1[0])) bz_lo = min(float(base_c0[1]), float(base_c1[1])) bz_hi = max(float(base_c0[1]), float(base_c1[1])) x_lo = min(float(c0[0]), float(c1[0])) x_hi = max(float(c0[0]), float(c1[0])) z_lo = min(float(c0[1]), float(c1[1])) z_hi = max(float(c0[1]), float(c1[1])) x_lo = max(bx_lo, min(x_lo, bx_hi)) x_hi = max(bx_lo, min(x_hi, bx_hi)) z_lo = max(bz_lo, min(z_lo, bz_hi)) z_hi = max(bz_lo, min(z_hi, bz_hi)) if x_hi <= x_lo: x_lo, x_hi = bx_lo, bx_hi if z_hi <= z_lo: z_lo, z_hi = bz_lo, bz_hi return [x_lo, z_lo], [x_hi, z_hi] def _crop_map_to_world_bounds(img: Image.Image, base_c0: list[float], base_c1: list[float], render_c0: list[float], render_c1: list[float], ) -> Image.Image: w, h = img.size bx0, bz0 = float(base_c0[0]), float(base_c0[1]) bx1, bz1 = float(base_c1[0]), float(base_c1[1]) rx0, rz0 = float(render_c0[0]), float(render_c0[1]) rx1, rz1 = float(render_c1[0]), float(render_c1[1]) dx = bx1 - bx0 dz = bz1 - bz0 if dx == 0 or dz == 0: return img x_lo = min(rx0, rx1) x_hi = max(rx0, rx1) z_lo = min(rz0, rz1) z_hi = max(rz0, rz1) u0 = (x_lo - bx0) / dx u1 = (x_hi - bx0) / dx v0 = (z_lo - bz0) / dz v1 = (z_hi - bz0) / dz left = int(np.floor(np.clip(min(u0, u1), 0.0, 1.0) * w)) right = int(np.ceil(np.clip(max(u0, u1), 0.0, 1.0) * w)) top = int(np.floor((1.0 - np.clip(max(v0, v1), 0.0, 1.0)) * h)) bottom = int(np.ceil((1.0 - np.clip(min(v0, v1), 0.0, 1.0)) * h)) left = max(0, min(left, w - 1)) right = max(left + 1, min(right, w)) top = max(0, min(top, h - 1)) bottom = max(top + 1, min(bottom, h)) return img.crop((left, top, right, bottom)) @lru_cache(maxsize=8) def _load_capture_icon_rgba(label: str) -> Image.Image | None: letter = (label or "").strip().lower() candidates: list[Path] = [] if len(letter) == 1 and "a" <= letter <= "z": candidates.append(ICONS_DIR / f"capture_{letter}.png") candidates.append(ICONS_DIR / "cap_icon.png") for path in candidates: if not path.exists(): continue try: return Image.open(path).convert("RGBA") except Exception: continue return None def _draw_capture_icon(out: Image.Image, label: str, px: int, py: int, size: int, alpha: int = 120) -> None: base = _load_capture_icon_rgba(label) if base is None: return s = max(8, int(size)) icon = base.resize((s, s), Image.Resampling.LANCZOS) if alpha < 255: scale = alpha / 255.0 a_arr = np.asarray(icon.getchannel("A"), dtype=np.float32) a_arr = np.clip(np.round(a_arr * scale), 0.0, 255.0).astype(np.uint8) icon.putalpha(Image.fromarray(a_arr, mode="L")) out.paste(icon, (px - s // 2, py - s // 2), icon) def _pick_contrast_outline_color(img: Image.Image, sample_points: list[tuple[int, int]]) -> tuple[int, int, int]: w, h = img.size if not sample_points: return (255, 255, 255) pixels = img.load() if pixels is None: return (255, 255, 255) lumas: list[float] = [] for x, y in sample_points: if x < 0 or y < 0 or x >= w or y >= h: continue raw = pixels[x, y] if isinstance(raw, int): r = g = b = raw elif isinstance(raw, tuple): if len(raw) >= 3: r, g, b = int(raw[0]), int(raw[1]), int(raw[2]) elif len(raw) == 1: r = g = b = int(raw[0]) else: continue else: continue lumas.append(0.2126 * r + 0.7152 * g + 0.0722 * b) if not lumas: return (255, 255, 255) avg = sum(lumas) / len(lumas) return (255, 255, 255) if avg < 128.0 else (0, 0, 0) def _draw_capture_areas(img: Image.Image, capture_areas: list[dict], c0: list[float], c1: list[float]) -> Image.Image: if not capture_areas: return img x0, z0 = float(c0[0]), float(c0[1]) x1, z1 = float(c1[0]), float(c1[1]) xr = x1 - x0 zr = z1 - z0 if xr == 0 or zr == 0: return img out = img.copy() draw = ImageDraw.Draw(out) canvas = out.size[0] labels = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" stroke_w = 3 for idx, cap in enumerate(capture_areas): cx = float(cap.get("x", 0.0)) cz = float(cap.get("z", 0.0)) px, py = _world_to_map_px(cx, cz, x0, z0, xr, zr, canvas) outline_points = _cap_outline_points_px(cap, x0, z0, xr, zr, canvas) label = labels[idx] if idx < len(labels) else str(idx + 1) icon_size = 18 rp = 8 if len(outline_points) >= 3: outline = (255, 255, 255) draw.line(outline_points + [outline_points[0]], fill=outline, width=stroke_w) # Keep icon area at ~1/4 of cap area (diamond area ~= s^2/2). area2 = 0.0 n_pts = len(outline_points) for i in range(n_pts): x0p, y0p = outline_points[i] x1p, y1p = outline_points[(i + 1) % n_pts] area2 += float(x0p * y1p - x1p * y0p) cap_area = abs(area2) * 0.5 if cap_area > 0.0: icon_size = max(10, min(canvas, int(round(math.sqrt(cap_area / 2.0))))) else: rr = max(0.0, float(cap.get("radius", 0.0))) rp = int(round(rr * ((canvas / xr + canvas / zr) * 0.5))) rp = max(8, rp) if px < -rp or py < -rp or px > canvas + rp or py > canvas + rp: continue ring_samples = [ (px + rp, py), (px - rp, py), (px, py + rp), (px, py - rp), (px + int(rp * 0.707), py + int(rp * 0.707)), (px - int(rp * 0.707), py + int(rp * 0.707)), (px + int(rp * 0.707), py - int(rp * 0.707)), (px - int(rp * 0.707), py - int(rp * 0.707)), ] outline = (255, 255, 255) draw.ellipse((px - rp, py - rp, px + rp, py + rp), outline=outline, width=stroke_w) cap_area = math.pi * float(rp * rp) icon_size = max(10, min(canvas, int(round(math.sqrt(cap_area / 2.0))))) _draw_capture_icon(out, label, px, py, icon_size, alpha=90) return out # ── Capture-state fill (dynamic, per-frame) ──────────────────────────────────── # Colors match the web canvas (RC.WIN / RC.LOSE). CAP_FILL_WIN = (0, 200, 0) CAP_FILL_LOSE = (220, 30, 30) CAP_FILL_ALPHA = 0.5 def _precompute_cap_overlays(capture_areas: list[dict], zones: dict, winner_slot: int, frame_times: np.ndarray, c0: list[float], c1: list[float], canvas: int) -> list[dict]: """Build per-cap, per-frame fill data from the Spectra zone `cap` timelines. `cap` sign is the owning team slot (positive == slot 2, negative == slot 1); a cap is the winner's when that slot == ``winner_slot``. capture_areas is index-mapped to zone letters A/B/C…; each overlay carries the diamond outline (for clipping), center, and per-frame fill fraction + winner/loser flag. """ overlays: list[dict] = [] if not capture_areas or not isinstance(zones, dict) or not zones: return overlays x0, z0 = float(c0[0]), float(c0[1]) x1, z1 = float(c1[0]), float(c1[1]) xr, zr = x1 - x0, z1 - z0 if xr == 0 or zr == 0: return overlays labels = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" for idx, cap in enumerate(capture_areas): letter = labels[idx] if idx < len(labels) else str(idx + 1) series = zones.get(letter) if not series: continue ts = np.array([float(p[0]) for p in series], dtype=np.float64) vs = np.array([float(p[1]) for p in series], dtype=np.float64) order = np.argsort(ts) ts, vs = ts[order], vs[order] # Zero-order hold: a cap holds its last recorded value until the next # change. Samples are only emitted when the value changes, so linear # interpolation across the long initial [0,0]→first-change gap would show # a false early fill before any player has reached the point. idx = np.clip(np.searchsorted(ts, frame_times, side="right") - 1, 0, len(vs) - 1) val = vs[idx] frac = np.clip(np.abs(val) / 100.0, 0.0, 1.0).astype(np.float32) # cap sign is the owning team slot: negative == slot 1, positive == slot 2 # (verified against unit occupancy in the zone). owner_slot = np.where(val > 0, 2, 1) is_winner = (owner_slot == winner_slot) cx = float(cap.get("x", 0.0)) cz = float(cap.get("z", 0.0)) px, py = _world_to_map_px(cx, cz, x0, z0, xr, zr, canvas) outline = _cap_outline_points_px(cap, x0, z0, xr, zr, canvas) rp = 0 if len(outline) < 3: outline = [] rr = max(0.0, float(cap.get("radius", 0.0))) rp = max(8, int(round(rr * ((canvas / xr + canvas / zr) * 0.5)))) overlays.append({ "cx": int(px), "cy": int(py), "rp": int(rp), "outline": outline, "frac": frac, "is_winner": is_winner, }) return overlays def _draw_cap_fill_np(buf: np.ndarray, overlay: dict, fi: int) -> None: """Alpha-composite a single cap's radial (pie) fill into the frame buffer.""" frac = float(overlay["frac"][fi]) if frac <= 0.01: return rgb = CAP_FILL_WIN if bool(overlay["is_winner"][fi]) else CAP_FILL_LOSE cx, cy = overlay["cx"], overlay["cy"] outline = overlay["outline"] if outline: xs = [p[0] for p in outline] ys = [p[1] for p in outline] minx, maxx, miny, maxy = min(xs), max(xs), min(ys), max(ys) else: rp = overlay["rp"] minx, maxx, miny, maxy = cx - rp, cx + rp, cy - rp, cy + rp minx = max(0, minx) miny = max(0, miny) maxx = min(buf.shape[1] - 1, maxx) maxy = min(buf.shape[0] - 1, maxy) w, h = maxx - minx, maxy - miny if w <= 0 or h <= 0: return R = max(w, h) pie = Image.new("L", (w, h), 0) end = -90.0 + 360.0 * frac ImageDraw.Draw(pie).pieslice( [cx - minx - R, cy - miny - R, cx - minx + R, cy - miny + R], -90.0, end, fill=255) if outline: poly = Image.new("L", (w, h), 0) ImageDraw.Draw(poly).polygon([(px - minx, py - miny) for px, py in outline], fill=255) pie = ImageChops.multiply(pie, poly) mask = (np.asarray(pie, dtype=np.float32) / 255.0) * CAP_FILL_ALPHA if not mask.any(): return region = buf[miny:maxy, minx:maxx].astype(np.float32) color = np.array(rgb, dtype=np.float32) m = mask[:, :, None] buf[miny:maxy, minx:maxx] = (region * (1.0 - m) + color * m).astype(np.uint8) def load_map_image(level_path: str, level_def: dict | None, battle_type: str = "", level_settings_path: str = "", base_coords: tuple[list[float], list[float]] | None = None, render_coords: tuple[list[float], list[float]] | None = None, canvas: int = CANVAS_MIN) -> Image.Image | None: """Load a local minimap PNG using level-def hints for tankmap naming.""" for map_key in _map_name_candidates(level_path, level_def, battle_type, level_settings_path): path = MINIMAPS_DIR / f"{map_key}.png" if path.exists(): print(f" Map image : {path.name}") img = Image.open(path).convert("RGB") if ( base_coords is not None and render_coords is not None and not _bounds_almost_equal( base_coords[0], base_coords[1], render_coords[0], render_coords[1], ) ): img = _crop_map_to_world_bounds( img, base_coords[0], base_coords[1], render_coords[0], render_coords[1], ) print(f" Map crop : world X=[{render_coords[0][0]}, {render_coords[1][0]}] " f"Z=[{render_coords[0][1]}, {render_coords[1][1]}]") return img.resize((canvas, canvas), Image.Resampling.LANCZOS) return None def load_level_coords(level_path: str, session_id: int = 0) -> dict | None: """Load tankMapCoord0/1 from local .blkx files (including datamine clone).""" del session_id # kept in signature for call-site compatibility stem = Path(level_path).stem candidates = [ LEVELS_DIR / f"{stem}.blkx", LEVELS_DIR / "DATAMINE" / f"{stem}.blkx", WT_LEVELS_DIR / f"{stem}.blkx", ] for blkx in candidates: if not blkx.exists(): continue data = _load_json_file(blkx) if not isinstance(data, dict): continue if "tankMapCoord0" in data and "tankMapCoord1" in data: print(f" LevelDef : {blkx}") return data print(f" LevelDef missing tank coords: {blkx.name}") return None def select_tank_coords(level_def: dict, use_alt_map_coord: bool ) -> tuple[list[float], list[float], str]: """Pick tank coordinate bounds according to mission mode flags.""" if use_alt_map_coord: ac0 = level_def.get("aiTanksMapCoord0") ac1 = level_def.get("aiTanksMapCoord1") if ( isinstance(ac0, list) and len(ac0) >= 2 and isinstance(ac1, list) and len(ac1) >= 2 ): return ac0, ac1, "aiTanksMapCoord" tc0 = level_def.get("tankMapCoord0") tc1 = level_def.get("tankMapCoord1") if ( isinstance(tc0, list) and len(tc0) >= 2 and isinstance(tc1, list) and len(tc1) >= 2 ): return tc0, tc1, "tankMapCoord" raise ValueError("LevelDef missing usable tank map coordinates") # ── Coordinate transform ─────────────────────────────────────────────────────── class CoordTransform: """Transforms world coordinates (X, Z) to pixel coordinates on the canvas. Args: x0: World X origin (left edge of the map). z0: World Z origin (bottom edge of the map). x1: World X extent (right edge of the map). z1: World Z extent (top edge of the map). canvas: Canvas size in pixels (square). """ def __init__(self, x0: float = 0, z0: float = 0, x1: float = 4096, z1: float = 4096, canvas: int = CANVAS_MIN): self.x0 = x0 self.z0 = z0 self.x_range = x1 - x0 self.z_range = z1 - z0 self.canvas = canvas def world_to_px(self, x: np.ndarray, z: np.ndarray): """Convert world X/Z arrays to pixel coordinates on the canvas. Args: x: World X positions as a numpy array. z: World Z positions as a numpy array. Returns: Tuple of (px, py) int16 numpy arrays in pixel space. """ px = ((x - self.x0) / self.x_range * self.canvas).astype(np.int16) py = ((self.z0 + self.z_range - z) / self.z_range * self.canvas).astype(np.int16) return px, py def point(self, x: float, z: float) -> tuple[int, int]: xi, zi = self.world_to_px(np.array([x]), np.array([z])) return int(np.clip(xi[0], 0, self.canvas - 1)), int(np.clip(zi[0], 0, self.canvas - 1)) # ── Pre-computation ──────────────────────────────────────────────────────────── def precompute_positions(players: list[dict], xfm: CoordTransform, frame_times: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Interpolate player world positions onto the frame time grid and convert to pixels. Args: players: List of player dicts, each containing a '_samples' key with time-stamped X/Z path data. xfm: Coordinate transform for world-to-pixel conversion. frame_times: 1-D array of frame timestamps in milliseconds. Returns: Tuple of (px_all, py_all) int16 arrays, shape (n_players, n_frames). -1 indicates the player is absent at that frame. """ n_p, n_f = len(players), len(frame_times) px_all = np.full((n_p, n_f), -1, dtype=np.int16) py_all = np.full((n_p, n_f), -1, dtype=np.int16) for i, p in enumerate(players): s = p["_samples"] t = np.array([x["Time"] for x in s], dtype=np.float64) xa = np.array([x["X"] for x in s], dtype=np.float64) za = np.array([x["Z"] for x in s], dtype=np.float64) mask = (frame_times >= t[0]) & (frame_times <= t[-1]) if not mask.any(): continue xi, zi = np.interp(frame_times[mask], t, xa), np.interp(frame_times[mask], t, za) pxi, pyi = xfm.world_to_px(xi, zi) in_bounds = (pxi >= 0) & (pxi < xfm.canvas) & (pyi >= 0) & (pyi < xfm.canvas) full_mask = np.where(mask)[0][in_bounds] px_all[i, full_mask] = pxi[in_bounds] py_all[i, full_mask] = pyi[in_bounds] return px_all, py_all def precompute_altitudes(players: list[dict], frame_times: np.ndarray) -> np.ndarray: """Return (n_players, n_frames) int16 array of altitude in metres. -1 = absent.""" n_p, n_f = len(players), len(frame_times) alt_all = np.full((n_p, n_f), -1, dtype=np.int16) for i, p in enumerate(players): s = p["_samples"] t = np.array([x["Time"] for x in s], dtype=np.float64) ya = np.array([x["Y"] for x in s], dtype=np.float64) mask = (frame_times >= t[0]) & (frame_times <= t[-1]) if not mask.any(): continue yi = np.interp(frame_times[mask], t, ya) alt_all[i, mask] = np.clip(yi, 0, 32767).astype(np.int16) return alt_all def precompute_kills(kills: list[dict], xfm: CoordTransform, t_start: float, ms_per_frame: float, n_frames: int, fps: int, font: ImageFont.FreeTypeFont | ImageFont.ImageFont, offset_x: int = 0, offset_y: int = 0, pid_pos: dict[int, tuple[np.ndarray, np.ndarray]] | None = None, ) -> list[list[tuple]]: """ Returns per-frame list of tuples: (vx, vy, kx, ky, age_frac, label_sprite | None) label_sprite is shown for the first half of the kill TTL then fades out. offset_x/y: crop origin to shift coordinates into crop-space. pid_pos: optional {PlayerID: (px_arr, py_arr)} for tracking moving entities. When provided, kill lines follow the entity's interpolated position each frame instead of using the static kill snapshot position. """ out: list[list[tuple]] = [[] for _ in range(n_frames)] kill_f = int(math.ceil(KILL_TTL * fps / 1000.0)) for k in kills: vp = k.get("VictimPosition") if not vp: continue kf = int((k["Time"] - t_start) / ms_per_frame) if not (0 <= kf < n_frames): continue # Static fallback positions from the kill event svx, svy = xfm.point(vp["X"], vp["Z"]) svx -= offset_x; svy -= offset_y kp = k.get("KillerPosition") if kp: skx, sky = xfm.point(kp["X"], kp["Z"]) skx -= offset_x; sky -= offset_y else: skx, sky = -1, -1 # Look up tracked position arrays for killer/victim kid = k.get("KillerID", 0) vid = k.get("VictimID", 0) k_tracked = pid_pos.get(kid) if pid_pos and kid else None v_tracked = pid_pos.get(vid) if pid_pos and vid else None killer_model = k.get("KillerModel", "") weapon = k.get("Weapon", "") label = make_kill_label(killer_model, weapon, font) if killer_model else None for df in range(kill_f): f = kf + df if f >= n_frames: break # Use tracked position if available and valid at this frame if v_tracked is not None and int(v_tracked[0][f]) >= 0: vx, vy = int(v_tracked[0][f]), int(v_tracked[1][f]) else: vx, vy = svx, svy if k_tracked is not None and int(k_tracked[0][f]) >= 0: kx, ky = int(k_tracked[0][f]), int(k_tracked[1][f]) else: kx, ky = skx, sky out[f].append((vx, vy, kx, ky, df / kill_f, label)) return out DMG_TTL = 2_000 # Damage line display duration (ms, video time) def precompute_damages(damages: list[dict], active: list[dict], px_all: np.ndarray, py_all: np.ndarray, t_start: float, ms_per_frame: float, n_frames: int, fps: int, ) -> list[list[tuple]]: """Build per-frame damage line events from raw damage reports. Each tuple is (ox, oy, vx, vy, age_frac) where positions are looked up from px_all/py_all at the damage time. Args: damages: Raw damage report dicts from the replay. active: Active player dicts (used for PlayerID-to-index mapping). px_all: Precomputed pixel X positions, shape (n_players, n_frames). py_all: Precomputed pixel Y positions, shape (n_players, n_frames). t_start: Replay start time in milliseconds. ms_per_frame: Milliseconds per video frame. n_frames: Total number of video frames. fps: Frames per second. Returns: List of lists, one per frame, containing damage line tuples. """ pid_to_idx = {p["PlayerID"]: i for i, p in enumerate(active)} out: list[list[tuple]] = [[] for _ in range(n_frames)] dmg_f = int(math.ceil(DMG_TTL * fps / 1000.0)) for dr in damages: off_id = dr.get("OffenderID", 0) vic_id = dr.get("OffendedID", 0) if off_id not in pid_to_idx or vic_id not in pid_to_idx: continue oi = pid_to_idx[off_id] vi = pid_to_idx[vic_id] kf = int((dr["Time"] - t_start) / ms_per_frame) if not (0 <= kf < n_frames): continue ox, oy = int(px_all[oi, kf]), int(py_all[oi, kf]) vx, vy = int(px_all[vi, kf]), int(py_all[vi, kf]) if ox < 0 or oy < 0 or vx < 0 or vy < 0: continue for df in range(dmg_f): f = kf + df if f >= n_frames: break out[f].append((ox, oy, vx, vy, df / dmg_f)) return out def precompute_deaths(active: list[dict], kills: list[dict], t_start: float, ms_per_frame: float, n_frames: int, fps: int, px_all: np.ndarray, py_all: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Compute per-player death state, fade-to-black curve, and last known position. Args: active: Active ground player dicts. kills: Kill event dicts from the replay. t_start: Replay start time in ms. ms_per_frame: Milliseconds per video frame. n_frames: Total video frames. fps: Frames per second. px_all: Precomputed pixel X positions. py_all: Precomputed pixel Y positions. Returns: Tuple of (is_dead, death_frame, death_fade, last_dead_px, last_dead_py). """ pid_to_idx = {p["PlayerID"]: i for i, p in enumerate(active)} n_p = len(active) ghost_f = max(1, int(GHOST_TTL * fps / 1000.0)) is_dead = np.zeros((n_p, n_frames), dtype=bool) death_frame = np.full(n_p, n_frames, dtype=np.int32) death_fade = np.zeros((n_p, n_frames), dtype=np.float32) last_dead_px = np.full(n_p, -1, dtype=np.int16) last_dead_py = np.full(n_p, -1, dtype=np.int16) for k in kills: vid = k.get("VictimID", 0) if vid not in pid_to_idx: continue idx = pid_to_idx[vid] kf = int((k["Time"] - t_start) / ms_per_frame) if not (0 <= kf < n_frames): continue is_dead[idx, kf + 1:] = True death_frame[idx] = kf # Find last valid position at or before death for f in range(kf, -1, -1): if px_all[idx, f] >= 0: last_dead_px[idx] = px_all[idx, f] last_dead_py[idx] = py_all[idx, f] break # Fade from 1.0 (full color) to 0.0 (black) over ghost_f video frames fade_len = min(ghost_f, n_frames - kf - 1) death_fade[idx, kf + 1:kf + 1 + fade_len] = np.maximum( 0.0, 1.0 - np.arange(1, fade_len + 1) / ghost_f ) # After fade completes, stays 0.0 (black) return is_dead, death_frame, death_fade, last_dead_px, last_dead_py def precompute_air_deaths(active: list[dict], kills: list[dict], t_start: float, ms_per_frame: float, n_frames: int, fps: int, px_all: np.ndarray, py_all: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Death state for aircraft/drones: they keep moving after kill until path ends (crash). Returns: is_hit: (n, n_frames) bool — True from kill onward (for color fade) hit_fade: (n, n_frames) float32 — 1→0 from kill to crash is_crashed: (n, n_frames) bool — True only after path data ends crash_frame: (n,) int32 — frame when path ends last_px/py: (n,) int16 — position at crash (last valid path point) """ pid_to_idx: dict[int, int] = {} eidx_to_idx: dict[int, int] = {} for i, p in enumerate(active): pid = p.get("PlayerID", 0) if pid: pid_to_idx[pid] = i eidx = p.get("EntityIndex", 0) if eidx: eidx_to_idx[eidx] = i n_p = len(active) is_hit = np.zeros((n_p, n_frames), dtype=bool) hit_fade = np.ones((n_p, n_frames), dtype=np.float32) is_crashed = np.zeros((n_p, n_frames), dtype=bool) crash_frame = np.full(n_p, n_frames, dtype=np.int32) last_px = np.full(n_p, -1, dtype=np.int16) last_py = np.full(n_p, -1, dtype=np.int16) # Find last valid position frame for each entity for i in range(n_p): for f in range(n_frames - 1, -1, -1): if px_all[i, f] >= 0: crash_frame[i] = f last_px[i] = px_all[i, f] last_py[i] = py_all[i, f] break # Mark crashed after last valid frame cf = int(crash_frame[i]) if cf < n_frames - 1: is_crashed[i, cf + 1:] = True ghost_f = max(1, int(GHOST_TTL * fps / 1000.0)) hit_set: set[int] = set() for k in kills: # Match by PlayerID first, then by EntityIndex (for drones with PlayerID=0) vid = k.get("VictimID", 0) idx = pid_to_idx.get(vid) if idx is None: veidx = k.get("VictimEntityIndex", 0) idx = eidx_to_idx.get(veidx) if idx is None: continue kf = int((k["Time"] - t_start) / ms_per_frame) if not (0 <= kf < n_frames): continue hit_set.add(idx) is_hit[idx, kf:] = True # Fade from 1.0 → 0.0 between kill frame and crash frame cf = int(crash_frame[idx]) fade_len = cf - kf if fade_len > 0: n_slots = min(fade_len + 1, n_frames - kf) hit_fade[idx, kf:kf + n_slots] = np.maximum( 0.0, 1.0 - np.arange(n_slots, dtype=np.float32) / fade_len ) hit_fade[idx, cf + 1:] = 0.0 # For entities not matched by any kill (e.g. drones with PlayerID=0), # fade to black over GHOST_TTL before their path ends (crash) for i in range(n_p): if i in hit_set: continue cf = int(crash_frame[i]) if cf >= n_frames: continue # path never ends in this clip fade_start = max(0, cf - ghost_f) fade_len = cf - fade_start if fade_len > 0: is_hit[i, fade_start:] = True n_slots = min(fade_len + 1, n_frames - fade_start) hit_fade[i, fade_start:fade_start + n_slots] = np.maximum( 0.0, 1.0 - np.arange(n_slots, dtype=np.float32) / fade_len ) hit_fade[i, cf + 1:] = 0.0 return is_hit, hit_fade, is_crashed, crash_frame, last_px, last_py def find_team_wipe_frame(active: list[dict], is_dead: np.ndarray, n_frames: int) -> int: """Find the first frame where all players on any one team are dead. Args: active: Active player dicts (must contain a 'Team' key). is_dead: Boolean array, shape (n_players, n_frames). n_frames: Total video frames. Returns: Frame index of the first team wipe, or n_frames if none occurs. """ team_groups: dict[int, list[int]] = {} for i, p in enumerate(active): team_groups.setdefault(p["Team"], []).append(i) for f in range(n_frames): for idxs in team_groups.values(): if idxs and all(is_dead[idx, f] for idx in idxs): return f return n_frames def build_drone_list(d: dict, ground_active: list[dict], xfm: CoordTransform, px_ground: np.ndarray, py_ground: np.ndarray, frame_times: np.ndarray, t_start: float, ms_per_frame: float, colors_arr: np.ndarray) -> list[dict]: n_frames = len(frame_times) drones = [] for e in d["Entities"]: if "ucav" not in e["ModelName"].lower() or not e["Path"]: continue spawn_t = e["Path"][0]["Time"] spawn_f = max(0, min(int((spawn_t - t_start) / ms_per_frame), n_frames - 1)) spx, spy = xfm.point(e["Path"][0]["X"], e["Path"][0]["Z"]) best_idx, best_dist = 0, float("inf") for i in range(len(ground_active)): pxi, pyi = int(px_ground[i, spawn_f]), int(py_ground[i, spawn_f]) if pxi < 0: continue dist = (pxi - spx) ** 2 + (pyi - spy) ** 2 if dist < best_dist: best_dist, best_idx = dist, i color = (colors_arr[best_idx].astype(np.float32) * 0.75).astype(np.uint8) drones.append({"entity": e, "color": color, "_samples": e["Path"], "EntityIndex": e["EntityIndex"]}) return drones # ── Circle masks ─────────────────────────────────────────────────────────────── def make_circle_masks(r: int) -> tuple[np.ndarray, np.ndarray]: y, x = np.ogrid[-r:r+1, -r:r+1] mask = x*x + y*y <= r*r ys, xs = np.where(mask) return (ys - r).astype(np.int32), (xs - r).astype(np.int32) SHADOW_DY, SHADOW_DX = make_circle_masks(DOT_R + 1) DOT_DY, DOT_DX = make_circle_masks(DOT_R) DRONE_DY, DRONE_DX = make_circle_masks(DRONE_R) def make_triangle_masks(r: int) -> tuple[np.ndarray, np.ndarray]: """Downward-pointing triangle of half-size r. Returns (dy, dx) offsets.""" pts = [] for y in range(-r, r + 1): # width narrows linearly from full at top (-r) to point at bottom (+r) half_w = int(r * (r - y) / (2 * r)) if r else 0 for x in range(-half_w, half_w + 1): pts.append((y, x)) if not pts: pts = [(0, 0)] arr = np.array(pts, dtype=np.int32) return arr[:, 0], arr[:, 1] AIR_DY, AIR_DX = make_triangle_masks(AIR_R) _TRAIL_DY = np.array([0, 0, 1, 1], dtype=np.int32) _TRAIL_DX = np.array([0, 1, 0, 1], dtype=np.int32) # ── Sprite factories ─────────────────────────────────────────────────────────── def _render_rgba(w: float, h: float, draw_fn) -> np.ndarray: img = Image.new("RGBA", (int(w), int(h)), (0, 0, 0, 0)) draw_fn(ImageDraw.Draw(img)) return np.asarray(img).copy() def make_name_sprites(names: list[str], font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> list[Sprite]: """Render a list of text strings into premultiplied Sprite objects. Args: names: Text labels to render. font: PIL font used for rendering. Returns: List of Sprite objects, one per name. """ dummy = ImageDraw.Draw(Image.new("RGBA", (1, 1))) PAD = 3 # padding each side (covers stroke_width=1 + margin) out = [] for name in names: bb = dummy.textbbox((0, 0), name, font=font) w, h = bb[2] - bb[0] + PAD * 2, bb[3] - bb[1] + PAD * 2 def _draw(d, _n=name, _x=bb[0], _y=bb[1]): d.text((PAD - _x, PAD - _y), _n, font=font, fill=(255, 255, 255, 220), stroke_width=1, stroke_fill=(0, 0, 0, 200)) out.append(make_sprite(_render_rgba(w, h, _draw))) return out def merge_sprites(top: Sprite, bottom: Sprite, gap: int = -2) -> Sprite: """Stack two sprites vertically with a gap, return a single combined sprite.""" w = max(top.w, bottom.w) h = top.h + gap + bottom.h pm = np.zeros((h, w, 3), dtype=np.uint8) pm16 = np.zeros((h, w, 3), dtype=np.uint16) ia = np.full((h, w, 1), 255, dtype=np.uint16) # top sprite pm[:top.h, :top.w] = top.pm pm16[:top.h, :top.w] = top.pm16 ia[:top.h, :top.w] = top.ia # bottom sprite y_off = top.h + gap pm[y_off:y_off + bottom.h, :bottom.w] = bottom.pm pm16[y_off:y_off + bottom.h, :bottom.w] = bottom.pm16 ia[y_off:y_off + bottom.h, :bottom.w] = bottom.ia return Sprite(pm=pm, pm16=pm16, ia=ia, h=h, w=w) def _short_model(model_name: str) -> str: """Return human-readable vehicle name, falling back to internal ID.""" internal = model_name.split("/")[-1] return _translate_vehicle(internal) def make_kill_label(killer_model: str, weapon: str, font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> Sprite: """Render a kill event label showing the killer's vehicle and weapon. Args: killer_model: Internal model path of the killer vehicle. weapon: Internal weapon identifier. font: PIL font used for rendering. Returns: A Sprite containing the rendered kill label text. """ text = f"{_short_model(killer_model)} [{_translate_weapon(weapon)}]" dummy = ImageDraw.Draw(Image.new("RGBA", (1, 1))) bb = dummy.textbbox((0, 0), text, font=font) PAD = 3 w, h = bb[2] - bb[0] + PAD * 2, bb[3] - bb[1] + PAD * 2 def _draw(d, _x=bb[0], _y=bb[1]): d.text((PAD - _x, PAD - _y), text, font=font, fill=(255, 230, 100, 230), stroke_width=1, stroke_fill=(0, 0, 0, 200)) return make_sprite(_render_rgba(w, h, _draw)) def make_hud_sprite(team_won: int, font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> Sprite: """Render a HUD overlay sprite displaying which team won. Args: team_won: Winning team index. font: PIL font used for rendering. Returns: A Sprite containing the "Team N wins" text. """ text = f"Team {team_won} wins" dummy = ImageDraw.Draw(Image.new("RGBA", (1, 1))) bb = dummy.textbbox((0, 0), text, font=font) PAD = 3 w, h = bb[2] - bb[0] + PAD * 2, bb[3] - bb[1] + PAD * 2 def _draw(d, _x=bb[0], _y=bb[1]): d.text((PAD - _x, PAD - _y), text, font=font, fill=(160, 255, 120, 230), stroke_width=1, stroke_fill=(0, 0, 0, 180)) return make_sprite(_render_rgba(w, h, _draw)) def make_time_sprites(max_secs: int, speed: float, font: ImageFont.FreeTypeFont | ImageFont.ImageFont, ) -> dict[int, Sprite]: dummy = ImageDraw.Draw(Image.new("RGBA", (1, 1))) out: dict[int, Sprite] = {} for sec in range(max_secs + 1): m, s = divmod(sec, 60) text = f"{m:02d}:{s:02d} ×{speed:.0f}" bb = dummy.textbbox((0, 0), text, font=font) w, h = bb[2] - bb[0] + 4, bb[3] - bb[1] + 4 def _draw(d, _t=text): d.text((2, 2), _t, font=font, fill=(210, 210, 210, 230)) out[sec] = make_sprite(_render_rgba(w, h, _draw)) return out # ── Drawing helpers ──────────────────────────────────────────────────────────── def draw_all_trails_np(buf: np.ndarray, fi: int, px_all: np.ndarray, py_all: np.ndarray, trail_f: int, trail_colors_arr: np.ndarray, is_dead: np.ndarray, death_frame: np.ndarray, canvas: int = CANVAS_MIN) -> None: n_players = px_all.shape[0] all_vx, all_vy, all_cols = [], [], [] for pi in range(n_players): # Dead players: anchor trail at death frame; alive: current frame if is_dead[pi, fi]: ef = int(death_frame[pi]) else: ef = fi start = max(0, ef - trail_f) pxs = px_all[pi, start:ef + 1] pys = py_all[pi, start:ef + 1] valid = (pxs >= 0) & (pys >= 0) if not valid.any(): continue vx = pxs[valid].astype(np.int32) vy = pys[valid].astype(np.int32) trail_len = ef + 1 - start ti = np.where(valid)[0] bright = ((ti + 1).astype(np.float32) / trail_len) ** 0.3 faded = (trail_colors_arr[pi] * bright[:, None]).astype(np.uint8) all_vx.append(vx) all_vy.append(vy) all_cols.append(faded) if not all_vx: return vx = np.concatenate(all_vx) vy = np.concatenate(all_vy) cols = np.concatenate(all_cols) all_ys = np.clip(vy[:, None] + _TRAIL_DY, 0, canvas - 1).ravel() all_xs = np.clip(vx[:, None] + _TRAIL_DX, 0, canvas - 1).ravel() buf[all_ys, all_xs] = np.repeat(cols, 4, axis=0) def draw_all_dots_np(buf: np.ndarray, fi: int, px_all: np.ndarray, py_all: np.ndarray, colors_arr: np.ndarray, shadow_color: np.ndarray, is_dead: np.ndarray | None = None, shadow_dy: np.ndarray = SHADOW_DY, shadow_dx: np.ndarray = SHADOW_DX, dot_dy: np.ndarray = DOT_DY, dot_dx: np.ndarray = DOT_DX, canvas: int = CANVAS_MIN) -> None: cxs = px_all[:, fi].astype(np.int32) cys = py_all[:, fi].astype(np.int32) valid = (cxs >= 0) & (cys >= 0) if is_dead is not None: valid &= ~is_dead[:, fi] if not valid.any(): return cx = cxs[valid]; cy = cys[valid] sy = np.clip(cy[:, None] + shadow_dy, 0, canvas - 1) sx = np.clip(cx[:, None] + shadow_dx, 0, canvas - 1) buf[sy.ravel(), sx.ravel()] = shadow_color nd = len(dot_dy) dy = np.clip(cy[:, None] + dot_dy, 0, canvas - 1) dx = np.clip(cx[:, None] + dot_dx, 0, canvas - 1) buf[dy.ravel(), dx.ravel()] = np.repeat(colors_arr[valid], nd, axis=0) def draw_dead_dots_np(buf: np.ndarray, fi: int, colors_arr: np.ndarray, is_dead: np.ndarray, death_fade: np.ndarray, last_dead_px: np.ndarray, last_dead_py: np.ndarray, dot_dy: np.ndarray = DOT_DY, dot_dx: np.ndarray = DOT_DX, canvas: int = CANVAS_MIN, ) -> None: """Draw dead player dots: fade from color to black, then stay black.""" dead = is_dead[:, fi] if not dead.any(): return cxs = last_dead_px[dead].astype(np.int32) cys = last_dead_py[dead].astype(np.int32) valid = (cxs >= 0) & (cys >= 0) if not valid.any(): return cx, cy = cxs[valid], cys[valid] fade = death_fade[dead, fi][valid] cols = (colors_arr[dead][valid].astype(np.float32) * fade[:, None]).astype(np.uint8) nd = len(dot_dy) dy = np.clip(cy[:, None] + dot_dy, 0, canvas - 1) dx = np.clip(cx[:, None] + dot_dx, 0, canvas - 1) buf[dy.ravel(), dx.ravel()] = np.repeat(cols, nd, axis=0) def draw_drone_dots_np(buf: np.ndarray, fi: int, px_drone: np.ndarray, py_drone: np.ndarray, drone_colors: np.ndarray, is_crashed: np.ndarray | None = None, hit_fade: np.ndarray | None = None, drone_dy: np.ndarray = DRONE_DY, drone_dx: np.ndarray = DRONE_DX, canvas: int = CANVAS_MIN) -> None: cxs = px_drone[:, fi].astype(np.int32) cys = py_drone[:, fi].astype(np.int32) valid = (cxs >= 0) & (cys >= 0) if is_crashed is not None: valid &= ~is_crashed[:, fi] if not valid.any(): return cx = cxs[valid]; cy = cys[valid] cols = drone_colors[valid] if hit_fade is not None: fade = hit_fade[valid, fi] cols = (cols.astype(np.float32) * fade[:, None]).astype(np.uint8) nd = len(drone_dy) dy = np.clip(cy[:, None] + drone_dy, 0, canvas - 1) dx = np.clip(cx[:, None] + drone_dx, 0, canvas - 1) buf[dy.ravel(), dx.ravel()] = np.repeat(cols, nd, axis=0) def draw_aircraft_np(buf: np.ndarray, fi: int, px_air: np.ndarray, py_air: np.ndarray, air_colors: np.ndarray, is_crashed: np.ndarray | None = None, hit_fade: np.ndarray | None = None, air_dy: np.ndarray = AIR_DY, air_dx: np.ndarray = AIR_DX, canvas: int = CANVAS_MIN) -> None: """Draw aircraft as small triangles, fading to black after hit.""" cxs = px_air[:, fi].astype(np.int32) cys = py_air[:, fi].astype(np.int32) valid = (cxs >= 0) & (cys >= 0) if is_crashed is not None: valid &= ~is_crashed[:, fi] if not valid.any(): return cx = cxs[valid]; cy = cys[valid] cols = air_colors[valid] if hit_fade is not None: fade = hit_fade[valid, fi] cols = (cols.astype(np.float32) * fade[:, None]).astype(np.uint8) nd = len(air_dy) dy = np.clip(cy[:, None] + air_dy, 0, canvas - 1) dx = np.clip(cx[:, None] + air_dx, 0, canvas - 1) buf[dy.ravel(), dx.ravel()] = np.repeat(cols, nd, axis=0) def draw_dead_entities_np(buf: np.ndarray, fi: int, colors_arr: np.ndarray, is_dead: np.ndarray, death_fade: np.ndarray, last_dead_px: np.ndarray, last_dead_py: np.ndarray, mask_dy: np.ndarray, mask_dx: np.ndarray, canvas: int = CANVAS_MIN) -> None: """Draw dead entity markers (circle or triangle) fading from color to black.""" dead = is_dead[:, fi] if not dead.any(): return cxs = last_dead_px[dead].astype(np.int32) cys = last_dead_py[dead].astype(np.int32) valid = (cxs >= 0) & (cys >= 0) if not valid.any(): return cx, cy = cxs[valid], cys[valid] fade = death_fade[dead, fi][valid] cols = (colors_arr[dead][valid].astype(np.float32) * fade[:, None]).astype(np.uint8) nd = len(mask_dy) dy = np.clip(cy[:, None] + mask_dy, 0, canvas - 1) dx = np.clip(cx[:, None] + mask_dx, 0, canvas - 1) buf[dy.ravel(), dx.ravel()] = np.repeat(cols, nd, axis=0) def draw_icon_sprites(buf: np.ndarray, fi: int, px_all: np.ndarray, py_all: np.ndarray, icon_sprites: list, is_dead: np.ndarray | None = None, is_crashed: np.ndarray | None = None, hit_fade: np.ndarray | None = None, rot_caches: list | None = None, headings: np.ndarray | None = None) -> None: """Draw per-entity icon sprites instead of circles/triangles.""" n = px_all.shape[0] for i in range(n): px_i, py_i = int(px_all[i, fi]), int(py_all[i, fi]) if px_i < 0 or py_i < 0: continue if is_dead is not None and is_dead[i, fi]: continue if is_crashed is not None and is_crashed[i, fi]: continue if rot_caches and headings is not None and headings[i, fi] >= 0: spr = rot_caches[i][heading_to_rot_index(headings[i, fi])] else: spr = icon_sprites[i] alpha = 1.0 if hit_fade is not None: alpha = float(hit_fade[i, fi]) if alpha <= 0: continue x, y = px_i - spr.w // 2, py_i - spr.h // 2 if alpha < 1.0: blit_alpha(buf, spr, x, y, alpha) else: blit(buf, spr, x, y) def draw_dead_icon_sprites(buf: np.ndarray, fi: int, icon_sprites: list, dead_sprites: list, is_dead: np.ndarray, death_fade: np.ndarray | None, last_dead_px: np.ndarray, last_dead_py: np.ndarray, rot_caches: list | None = None, dead_rot_caches: list | None = None, headings: np.ndarray | None = None) -> None: """Draw dead entity icons fading from color to black at their last known position.""" n = len(icon_sprites) for i in range(n): if not is_dead[i, fi]: continue px_i, py_i = int(last_dead_px[i]), int(last_dead_py[i]) if px_i < 0 or py_i < 0: continue # Pick rotated sprite if available (use last valid heading) if rot_caches and dead_rot_caches and headings is not None and headings[i, fi] >= 0: ri = heading_to_rot_index(headings[i, fi]) spr = rot_caches[i][ri] dspr = dead_rot_caches[i][ri] else: spr = icon_sprites[i] dspr = dead_sprites[i] x, y = px_i - spr.w // 2, py_i - spr.h // 2 blit(buf, dspr, x, y) if death_fade is not None: fade = float(death_fade[i, fi]) else: fade = 0.0 if fade > 0.0: blit_alpha(buf, spr, x, y, fade) def draw_air_trails_np(buf: np.ndarray, fi: int, px_all: np.ndarray, py_all: np.ndarray, trail_f: int, trail_colors: np.ndarray, is_crashed: np.ndarray | None, crash_frame: np.ndarray | None, canvas: int = CANVAS_MIN) -> None: """Short trails for aircraft/drones with line interpolation between frames. Anchors at crash_frame once crashed (path ended).""" n_ents = px_all.shape[0] all_vx, all_vy, all_cols = [], [], [] CM = canvas - 1 for ei in range(n_ents): if is_crashed is not None and crash_frame is not None and is_crashed[ei, fi]: ef = int(crash_frame[ei]) else: ef = fi start = max(0, ef - trail_f) pxs = px_all[ei, start:ef + 1] pys = py_all[ei, start:ef + 1] valid = (pxs >= 0) & (pys >= 0) if not valid.any(): continue vx = pxs[valid].astype(np.int32) vy = pys[valid].astype(np.int32) trail_len = ef + 1 - start ti = np.where(valid)[0] # Interpolate lines between consecutive valid points to fill gaps if len(vx) >= 2: seg_vx, seg_vy, seg_bright = [], [], [] for si in range(len(vx) - 1): x0, y0, x1, y1 = vx[si], vy[si], vx[si + 1], vy[si + 1] dist = max(abs(x1 - x0), abs(y1 - y0)) if dist <= 1: seg_vx.append(x0) seg_vy.append(y0) seg_bright.append((ti[si] + 1) / trail_len) else: n_pts = min(dist, 64) # cap to avoid huge arrays t = np.arange(n_pts, dtype=np.float32) / n_pts seg_vx.append(np.clip((x0 + (x1 - x0) * t).astype(np.int32), 0, CM)) seg_vy.append(np.clip((y0 + (y1 - y0) * t).astype(np.int32), 0, CM)) b0 = (ti[si] + 1) / trail_len b1 = (ti[si + 1] + 1) / trail_len seg_bright.append(b0 + (b1 - b0) * t) # Last point seg_vx.append(vx[-1]) seg_vy.append(vy[-1]) seg_bright.append((ti[-1] + 1) / trail_len) vx_interp = np.concatenate([np.atleast_1d(s) for s in seg_vx]) vy_interp = np.concatenate([np.atleast_1d(s) for s in seg_vy]) bright = np.concatenate([np.atleast_1d(s) for s in seg_bright]) else: vx_interp, vy_interp = vx, vy bright = np.array([(ti[0] + 1) / trail_len], dtype=np.float32) bright = bright ** 0.3 faded = (trail_colors[ei] * bright[:, None]).astype(np.uint8) all_vx.append(vx_interp) all_vy.append(vy_interp) all_cols.append(faded) if not all_vx: return vx = np.concatenate(all_vx) vy = np.concatenate(all_vy) cols = np.concatenate(all_cols) all_ys = np.clip(vy[:, None] + _TRAIL_DY, 0, canvas - 1).ravel() all_xs = np.clip(vx[:, None] + _TRAIL_DX, 0, canvas - 1).ravel() buf[all_ys, all_xs] = np.repeat(cols, 4, axis=0) def draw_kill_events(buf: np.ndarray, events: list[tuple], kill_target_spr: Sprite | None = None, canvas: int = CANVAS_MIN) -> None: CM = canvas - 1 full_line = np.array([255, 30, 30], dtype=np.float32) _LINE_OFFSETS = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)] for (vx, vy, kx, ky, age_frac, label) in events: alpha = 1.0 - age_frac # Kill line from killer to victim if kx >= 0 and ky >= 0: pts = max(abs(vx - kx), abs(vy - ky)) + 1 if pts >= 2: t = np.arange(pts, dtype=np.float32) / (pts - 1) cx = (kx + (vx - kx) * t).astype(np.int32) cy = (ky + (vy - ky) * t).astype(np.int32) for odx, ody in _LINE_OFFSETS: xs = np.clip(cx + odx, 0, CM) ys = np.clip(cy + ody, 0, CM) bg = buf[ys, xs].astype(np.float32) buf[ys, xs] = (bg + (full_line - bg) * alpha).astype(np.uint8) # Target icon at victim position (replaces starburst) if kill_target_spr is not None: blit_alpha(buf, kill_target_spr, vx - kill_target_spr.w // 2, vy - kill_target_spr.h // 2, alpha) # Label if label is not None: label_alpha = 1.0 if age_frac < 0.8 else max(0.0, 1.0 - (age_frac - 0.8) / 0.2) blit_alpha(buf, label, vx - label.w // 2, vy - label.h - 14, label_alpha) def draw_damage_events(buf: np.ndarray, events: list[tuple], dmg_target_spr: Sprite | None = None, canvas: int = CANVAS_MIN) -> None: CM = canvas - 1 full_line = np.array([255, 255, 80], dtype=np.float32) for (ox, oy, vx, vy, age_frac) in events: alpha = 1.0 - age_frac pts = max(abs(vx - ox), abs(vy - oy)) + 1 if pts >= 2: t = np.arange(pts, dtype=np.float32) / (pts - 1) xs = np.clip((ox + (vx - ox) * t).astype(np.int32), 0, CM) ys = np.clip((oy + (vy - oy) * t).astype(np.int32), 0, CM) bg = buf[ys, xs].astype(np.float32) buf[ys, xs] = (bg + (full_line - bg) * alpha).astype(np.uint8) # Target icon at victim position if dmg_target_spr is not None: blit_alpha(buf, dmg_target_spr, vx - dmg_target_spr.w // 2, vy - dmg_target_spr.h // 2, alpha) # ── Per-frame render for one VideoCtx ───────────────────────────────────────── def render_one_ctx(fi: int, buf: np.ndarray, ctx: VideoCtx, shadow_color: np.ndarray, trail_f: int) -> None: """Render a single video frame into the provided buffer. Draws background, trails, damage/kill events, dots (alive + dead), drones, aircraft, and player name labels in compositing order. Args: fi: Frame index to render. buf: Mutable RGB numpy array, shape (canvas, canvas, 3). ctx: Pre-computed VideoCtx with all per-frame data. shadow_color: RGB color for dot shadows, shape (3,). trail_f: Number of trailing frames to draw behind each player. """ np.copyto(buf, ctx.bg_arr) # Dynamic capture-state fills (drawn on the map, beneath players) if ctx.cap_overlays: for ov in ctx.cap_overlays: _draw_cap_fill_np(buf, ov, fi) cv = ctx.canvas # Ground trails draw_all_trails_np(buf, fi, ctx.px_all, ctx.py_all, trail_f, ctx.trail_colors_arr, ctx.is_dead, ctx.death_frame, cv) # Drone trails (very short, line-interpolated) if ctx.n_drones and ctx.drone_trail_colors is not None: assert ctx.px_drone is not None and ctx.py_drone is not None draw_air_trails_np(buf, fi, ctx.px_drone, ctx.py_drone, ctx.drone_trail_f, ctx.drone_trail_colors, ctx.drone_is_crashed, ctx.drone_crash_frame, cv) # Aircraft trails (short, line-interpolated) if ctx.n_aircraft and ctx.air_trail_colors is not None: assert ctx.px_air is not None and ctx.py_air is not None draw_air_trails_np(buf, fi, ctx.px_air, ctx.py_air, ctx.air_trail_f, ctx.air_trail_colors, ctx.air_is_crashed, ctx.air_crash_frame, cv) if ctx.damages_by_frame[fi]: draw_damage_events(buf, ctx.damages_by_frame[fi], ctx.dmg_target_spr, cv) if ctx.kills_by_frame[fi]: draw_kill_events(buf, ctx.kills_by_frame[fi], ctx.kill_target_spr, cv) # Ground: dead icons + alive icons if ctx.player_icon_sprites: draw_dead_icon_sprites(buf, fi, ctx.player_icon_sprites, ctx.player_dead_sprites, ctx.is_dead, None, ctx.last_dead_px, ctx.last_dead_py) draw_icon_sprites(buf, fi, ctx.px_all, ctx.py_all, ctx.player_icon_sprites, is_dead=ctx.is_dead) else: draw_dead_dots_np(buf, fi, ctx.colors_arr, ctx.is_dead, ctx.death_fade, ctx.last_dead_px, ctx.last_dead_py, ctx.dot_dy, ctx.dot_dx, cv) draw_all_dots_np(buf, fi, ctx.px_all, ctx.py_all, ctx.colors_arr, shadow_color, ctx.is_dead, ctx.shadow_dy, ctx.shadow_dx, ctx.dot_dy, ctx.dot_dx, cv) # Drones: freeze at hit position, fade to transparent over ~2-3s if ctx.n_drones: assert ctx.px_drone is not None and ctx.py_drone is not None assert ctx.drone_colors_arr is not None if ctx.drone_icon_sprites: n_dr = ctx.px_drone.shape[0] for di in range(n_dr): is_hit = ctx.drone_is_hit is not None and ctx.drone_is_hit[di, fi] if is_hit: # Frozen at hit position, fading out if ctx.drone_last_dead_px is None or ctx.drone_last_dead_py is None or ctx.drone_hit_fade is None: continue dpx, dpy = int(ctx.drone_last_dead_px[di]), int(ctx.drone_last_dead_py[di]) fade = float(ctx.drone_hit_fade[di, fi]) if fade <= 0 or dpx < 0 or dpy < 0: continue else: # Alive — draw at current position dpx, dpy = int(ctx.px_drone[di, fi]), int(ctx.py_drone[di, fi]) fade = 1.0 if dpx < 0 or dpy < 0: continue if ctx.drone_rot_caches and ctx.drone_headings is not None and ctx.drone_headings[di, fi] >= 0: spr = ctx.drone_rot_caches[di][heading_to_rot_index(ctx.drone_headings[di, fi])] else: spr = ctx.drone_icon_sprites[di] x, y = dpx - spr.w // 2, dpy - spr.h // 2 if fade < 1.0: blit_alpha(buf, spr, x, y, fade) else: blit(buf, spr, x, y) else: # Fallback to dots if no icon sprites if ctx.drone_is_crashed is not None: draw_dead_entities_np(buf, fi, ctx.drone_colors_arr, ctx.drone_is_crashed, ctx.drone_hit_fade, # type: ignore[arg-type] ctx.drone_last_dead_px, ctx.drone_last_dead_py, # type: ignore[arg-type] ctx.drone_dy, ctx.drone_dx, cv) draw_drone_dots_np(buf, fi, ctx.px_drone, ctx.py_drone, ctx.drone_colors_arr, ctx.drone_is_crashed, ctx.drone_hit_fade, ctx.drone_dy, ctx.drone_dx, cv) # Aircraft: dead icons + alive icons if ctx.n_aircraft: assert ctx.px_air is not None and ctx.py_air is not None assert ctx.air_colors_arr is not None if ctx.air_icon_sprites: if ctx.air_is_crashed is not None: draw_dead_icon_sprites(buf, fi, ctx.air_icon_sprites, ctx.air_dead_sprites, ctx.air_is_crashed, None, ctx.air_last_dead_px, ctx.air_last_dead_py, # type: ignore[arg-type] rot_caches=ctx.air_rot_caches, dead_rot_caches=ctx.air_dead_rot_caches, headings=ctx.air_headings) draw_icon_sprites(buf, fi, ctx.px_air, ctx.py_air, ctx.air_icon_sprites, is_crashed=ctx.air_is_crashed, hit_fade=ctx.air_hit_fade, rot_caches=ctx.air_rot_caches, headings=ctx.air_headings) else: if ctx.air_is_crashed is not None: draw_dead_entities_np(buf, fi, ctx.air_colors_arr, ctx.air_is_crashed, ctx.air_hit_fade, # type: ignore[arg-type] ctx.air_last_dead_px, ctx.air_last_dead_py, # type: ignore[arg-type] ctx.air_dy, ctx.air_dx, cv) draw_aircraft_np(buf, fi, ctx.px_air, ctx.py_air, ctx.air_colors_arr, ctx.air_is_crashed, ctx.air_hit_fade, ctx.air_dy, ctx.air_dx, cv) items: list[tuple[Sprite, int, int]] = [] for i in range(ctx.n_players): px_i, py_i = int(ctx.px_all[i, fi]), int(ctx.py_all[i, fi]) if px_i >= 0 and py_i >= 0: ls = ctx.label_sprites[i] icon_w = ctx.player_icon_sprites[i].w if ctx.player_icon_sprites else ctx.dot_r * 2 items.append((ls, px_i + icon_w // 2 + 3, py_i - ls.h // 2)) if ctx.n_drones: assert ctx.px_drone is not None and ctx.py_drone is not None for i in range(ctx.n_drones): px_i, py_i = int(ctx.px_drone[i, fi]), int(ctx.py_drone[i, fi]) if px_i >= 0 and py_i >= 0: if ctx.drone_is_crashed is not None and ctx.drone_is_crashed[i, fi]: continue ds = ctx.drone_sprites[i] items.append((ds, px_i + ctx.drone_r + 2, py_i - ds.h // 2)) if ctx.n_aircraft: assert ctx.px_air is not None and ctx.py_air is not None assert ctx.air_alt is not None for i in range(ctx.n_aircraft): px_i, py_i = int(ctx.px_air[i, fi]), int(ctx.py_air[i, fi]) if px_i >= 0 and py_i >= 0: if ctx.air_is_crashed is not None and ctx.air_is_crashed[i, fi]: continue # Name+model label to the right ls = ctx.air_sprites[i] air_icon_w = ctx.air_icon_sprites[i].w if ctx.air_icon_sprites else ctx.air_r * 2 items.append((ls, px_i + air_icon_w // 2 + 3, py_i - ls.h // 2)) # Altitude label to the left alt_m = int(ctx.air_alt[i, fi]) if alt_m >= 0: alt_key = alt_m // 10 * 10 alt_spr = ctx.air_alt_sprites.get(alt_key) if alt_spr is not None: items.append((alt_spr, px_i - air_icon_w // 2 - 3 - alt_spr.w, py_i - alt_spr.h // 2)) if items: blit_batch(buf, items) def _get_thread_buf(tmpl: np.ndarray) -> np.ndarray: """Thread-local render buffer.""" if not hasattr(_tl, "buf"): _tl.buf = np.empty_like(tmpl) return _tl.buf # ── FFmpeg / font ────────────────────────────────────────────────────────────── def open_ffmpeg(output_path: Path, fps: int, w: int = CANVAS_MIN, h: int = CANVAS_MIN) -> subprocess.Popen: """Open an FFmpeg subprocess that reads raw RGB24 frames from stdin and encodes to H.264 MP4. Args: output_path: Destination MP4 file path. fps: Frames per second for the output video. w: Frame width in pixels. h: Frame height in pixels. Returns: A Popen handle whose stdin accepts raw RGB24 frame bytes. """ return subprocess.Popen([ "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo", "-s", f"{w}x{h}", "-pix_fmt", "rgb24", "-r", str(fps), "-i", "pipe:0", "-vcodec", "libx264", "-preset", "ultrafast", "-crf", "34", "-pix_fmt", "yuv420p", "-threads", "4", "-movflags", "+faststart", str(output_path), ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def load_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont: for p in ["/usr/share/fonts/TTF/DejaVuSans.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/dejavu/DejaVuSans.ttf"]: if Path(p).exists(): return ImageFont.truetype(p, size) return ImageFont.load_default() # ── Helpers ──────────────────────────────────────────────────────────────────── def assign_colors(active: list[dict], team_won: int ) -> tuple[np.ndarray, np.ndarray]: """Assign per-player RGB colors based on team membership (winner vs loser palette). Args: active: Active player dicts, each with a 'Team' key. team_won: The winning team index. Returns: Tuple of (colors_arr, trail_colors_arr), both uint8 arrays of shape (n_players, 3). Trail colors are dimmed to 60% brightness. """ colors_list = [] for p in active: if p["Team"] == team_won: colors_list.append(WIN_COLOR) else: colors_list.append(LOSE_COLOR) colors_arr = np.array(colors_list, dtype=np.uint8) return colors_arr, (colors_arr * 0.6).astype(np.uint8) def make_bg(map_pil: Image.Image) -> np.ndarray: return np.asarray(map_pil).copy() # ── Public render function ───────────────────────────────────────────────────── def render_gob( d: dict, out_path: Path, fps: int = FPS, speed: float = SPEED, n_workers: int = N_WORKERS, progress_cb: Optional[Callable[[int], None]] = None, ) -> None: """ Render a replay dict to an MP4 file. Args: d: Parsed replay dict (normalized by load_gob_file) out_path: Output MP4 path fps: Frames per second speed: Playback speed multiplier n_workers: Thread pool size for parallel frame render progress_cb: Optional callback called with int 0-100 during render """ players_by_id = {p["PlayerID"]: p for p in d["Players"]} team_won = d["TeamWon"] def build_active(entities: list[dict]) -> list[dict]: out = [] for e in entities: if e.get("PlayerID", 0) == 0: continue p = dict(players_by_id[e["PlayerID"]]) p["_samples"] = e["Path"] p["_model"] = e["ModelName"] out.append(p) return out ground_ents = [e for e in d["Entities"] if e.get("PlayerID", 0) != 0 and e["ModelName"].startswith("tankModels/")] active = build_active(ground_ents) if not active: raise ValueError("No ground unit entities found in replay.") print(f"Ground : {len(active)}") # Time grid all_t = [s["Time"] for e in ground_ents for s in e["Path"]] t_start = float(min(all_t)) t_end = float(max(all_t)) ms_per_frame = (1000.0 / fps) * speed n_frames = int(math.ceil((t_end - t_start) / ms_per_frame)) frame_times = t_start + np.arange(n_frames, dtype=np.float64) * ms_per_frame trail_f = max(1, int(TRAIL_MS * fps / 1000.0)) duration_s = (t_end - t_start) / 1000.0 print(f"Duration : {duration_s:.1f}s → ~{n_frames/fps:.0f}s video ({n_frames} frames)") # Load mode/scenario info from replay mission block mission = d.get("Mission", {}) level = mission.get("Level", "") level_settings_path = mission.get("LevelSettings", "") battle_type = mission.get("BattleType", "") mission_def, mission_def_path = load_mission_def(level_settings_path) use_alt_map_coord = _mission_use_alt_map_coord(mission_def) level_override = _mission_level_override(mission_def) if level_override: level = level_override if mission_def_path: print(f"MissionDef : {mission_def_path}") if level_settings_path: print(f"BattleType : {battle_type or '(n/a)'}") print(f"AltMapCoord : {'on' if use_alt_map_coord else 'off'}") # Load map coordinate bounds from local level defs session_id = d.get("SessionID", 0) print("Loading LevelDef …", end=" ", flush=True) level_def = load_level_coords(level, session_id) if not level_def: raise ValueError(f"Missing local LevelDef for level '{level}'") canvas = CANVAS_MIN base_tc0, base_tc1, _ = select_tank_coords(level_def, use_alt_map_coord) tc0, tc1, coord_src = resolve_world_bounds( level_def, use_alt_map_coord, mission_def, mission_def_path, battle_type, ) tc0, tc1, coord_src = _fit_world_bounds_to_ground_activity( tc0, tc1, coord_src, mission_def, mission_def_path, battle_type, ground_ents, ) tc0, tc1 = _expand_bounds_by_pixels(tc0, tc1, canvas=canvas, pad_px=MAP_PAD_PX) tc0, tc1 = _clamp_bounds_to_base(tc0, tc1, base_tc0, base_tc1) print(f"ok ({coord_src}) X=[{tc0[0]}, {tc1[0]}] Z=[{tc0[1]}, {tc1[1]}]") capture_areas = resolve_capture_areas(mission_def, mission_def_path, battle_type) if capture_areas: print("Capture : " + ", ".join( f"{c['name']}({c['type']})" for c in capture_areas )) xfm = CoordTransform(tc0[0], tc0[1], tc1[0], tc1[1], canvas=canvas) print(f"Canvas : {canvas}px (full tank map)") print("Pre-computing positions …", end=" ", flush=True) px, py = precompute_positions(active, xfm, frame_times) print("done") kills = [k for k in d.get("Kills", []) if t_start <= k["Time"] <= t_end] damages = [dr for dr in d.get("DamageReports", []) if t_start <= dr["Time"] <= t_end] print(f"Events : {len(kills)} kills, {len(damages)} damage reports") is_dead, death_frame, death_fade, last_dead_px, last_dead_py = precompute_deaths( active, kills, t_start, ms_per_frame, n_frames, fps, px, py) wipe_f = find_team_wipe_frame(active, is_dead, n_frames) end_frame = min(wipe_f + WIPE_GRACE, n_frames) if wipe_f < n_frames: print(f"Team wipe : frame {wipe_f} (~{wipe_f/fps:.1f}s video)") colors, trail_colors = assign_colors(active, team_won) shadow_color = np.array([0, 0, 0], dtype=np.uint8) drones = build_drone_list(d, active, xfm, px, py, frame_times, t_start, ms_per_frame, colors) n_drones = len(drones) if n_drones: px_dr, py_dr = precompute_positions(drones, xfm, frame_times) drone_alt = precompute_altitudes(drones, frame_times) drone_cols = np.array([dr["color"] for dr in drones], dtype=np.uint8) drone_trail_cols = (drone_cols * 0.6).astype(np.uint8) # Match drone kills by EntityIndex (since PlayerID=0) drone_is_hit, drone_hit_fade, drone_is_crashed, drone_crash_frame, \ drone_last_dead_px, drone_last_dead_py = \ precompute_air_deaths(drones, kills, t_start, ms_per_frame, n_frames, fps, px_dr, py_dr) # Override: drones freeze at kill/path-end and fade to transparent over ~2.5s drone_fade_frames = max(1, int(2.5 * fps)) for i in range(len(drones)): hit_frames = np.where(drone_is_hit[i])[0] if len(hit_frames): # Kill-matched: freeze at kill frame hf = hit_frames[0] else: # Unmatched: freeze at path end cf = int(drone_crash_frame[i]) if cf >= n_frames: continue # path extends beyond video, drone stays alive hf = cf # Apply 2.5s fade from freeze point drone_is_hit[i, :] = False drone_is_hit[i, hf:] = True n_slots = min(drone_fade_frames + 1, n_frames - hf) drone_hit_fade[i, :] = 1.0 drone_hit_fade[i, hf:hf + n_slots] = np.maximum( 0.0, 1.0 - np.arange(n_slots, dtype=np.float32) / drone_fade_frames) drone_hit_fade[i, hf + n_slots:] = 0.0 # Freeze position at hit/end frame drone_last_dead_px[i] = px_dr[i, min(hf, px_dr.shape[1] - 1)] drone_last_dead_py[i] = py_dr[i, min(hf, py_dr.shape[1] - 1)] # Also stop the trail at this frame drone_crash_frame[i] = hf drone_is_crashed[i, :] = False if hf < n_frames - 1: drone_is_crashed[i, hf + 1:] = True print(f"Drones : {n_drones}") else: px_dr = py_dr = drone_cols = drone_trail_cols = drone_alt = None drone_is_hit = drone_hit_fade = drone_is_crashed = drone_crash_frame = None drone_last_dead_px = drone_last_dead_py = None # Aircraft — non-tank, non-drone entities with path data air_ents = [e for e in d["Entities"] if e.get("PlayerID", 0) != 0 and not e["ModelName"].startswith("tankModels/") and "ucav" not in e["ModelName"].lower() and e.get("Path")] air_active = build_active(air_ents) n_aircraft = len(air_active) if n_aircraft: px_air, py_air = precompute_positions(air_active, xfm, frame_times) air_alt = precompute_altitudes(air_active, frame_times) # Air death: keep flying after hit, fade to black, crash at end of path air_is_hit, air_hit_fade, air_is_crashed, air_crash_frame, \ air_last_dead_px, air_last_dead_py = \ precompute_air_deaths(air_active, kills, t_start, ms_per_frame, n_frames, fps, px_air, py_air) # Assign colors: match team using same win/lose scheme air_cols_list = [] for p in air_active: if p["Team"] == team_won: air_cols_list.append(WIN_COLOR) else: air_cols_list.append(LOSE_COLOR) air_cols = np.array(air_cols_list, dtype=np.uint8) air_trail_cols = (air_cols * 0.6).astype(np.uint8) print(f"Air : {n_aircraft}") else: px_air = py_air = air_cols = air_trail_cols = air_alt = None air_is_hit = air_hit_fade = air_is_crashed = air_crash_frame = None air_last_dead_px = air_last_dead_py = None print("Loading tank map …", end=" ", flush=True) map_img = load_map_image( level, level_def, battle_type=battle_type, level_settings_path=level_settings_path, base_coords=(base_tc0, base_tc1), render_coords=(tc0, tc1), canvas=canvas, ) if not map_img: print("not found") raise ValueError(f"No local tankmap image for level '{level}'") map_img = _draw_capture_areas(map_img, capture_areas, tc0, tc1) cap_overlays = _precompute_cap_overlays( capture_areas, d.get("Zones", {}), _to_int(d.get("WinnerSlot"), 0), frame_times, tc0, tc1, canvas) if cap_overlays: print(f"Cap state : {len(cap_overlays)} zones with live ownership") print("ok") bg = make_bg(map_img) # Scale icons and text — reference size is 1024px (original design target) scale = canvas / 1024.0 s_dot_r = max(2, int(DOT_R * scale + 0.5)) s_drone_r = max(1, int(DRONE_R * scale + 0.5)) s_font_sz = max(8, int(11 * scale + 0.5)) s_air_r = max(2, int(AIR_R * scale + 0.5)) s_shadow_dy, s_shadow_dx = make_circle_masks(s_dot_r + 1) s_dot_dy, s_dot_dx = make_circle_masks(s_dot_r) s_drone_dy, s_drone_dx = make_circle_masks(s_drone_r) s_air_dy, s_air_dx = make_triangle_masks(s_air_r) font = load_font(s_font_sz) kbf = precompute_kills(kills, xfm, t_start, ms_per_frame, n_frames, fps, font, offset_x=0, offset_y=0) dbf = precompute_damages(damages, active, px, py, t_start, ms_per_frame, n_frames, fps) name_sprs = make_name_sprites([p["Name"] for p in active], font) model_sprs = make_name_sprites([_short_model(p["_model"]) for p in active], font) label_sprs = [merge_sprites(ns, ms) for ns, ms in zip(name_sprs, model_sprs)] drone_spr = make_name_sprites(["(Drone)"] * n_drones, font) # Build altitude sprite cache — shared between drones and aircraft all_alt_vals: set[int] = set() if drone_alt is not None: all_alt_vals.update(int(v) // 10 * 10 for v in drone_alt[drone_alt >= 0]) if air_alt is not None: all_alt_vals.update(int(v) // 10 * 10 for v in air_alt[air_alt >= 0]) alt_sprites: dict[int, Sprite] = {} for a in all_alt_vals: sprs = make_name_sprites([f"(Alt: {a}m)"], font) alt_sprites[a] = sprs[0] # Build per-player vehicle icon sprites (tinted to player color) icon_size = max(12, int(TANK_ICON_SIZE * scale + 0.5)) hl_pad = max(1, int(ICON_HIGHLIGHT_PAD * scale + 0.5)) player_icon_sprs: list[Sprite] = [] player_icon_keys: list[str] = [] for i, p in enumerate(active): ik = get_icon_key(p["_model"]) player_icon_keys.append(ik) c = (int(colors[i][0]), int(colors[i][1]), int(colors[i][2])) player_icon_sprs.append(make_tinted_icon_sprite( ik, c, icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR)) # Dead sprites: bare black silhouette (no highlight glow) player_dead_sprs = [make_bare_black_sprite(ik, icon_size, hl_pad) for ik in player_icon_keys] print(f"Icons : {len(player_icon_sprs)} ground @ {icon_size}px (+{hl_pad}px highlight)") # Build per-aircraft icon sprites air_icon_sprs: list[Sprite] = [] if n_aircraft: assert air_cols is not None air_name_sprs = make_name_sprites([p["Name"] for p in air_active], font) air_model_sprs = make_name_sprites([_short_model(p["_model"]) for p in air_active], font) air_label_sprs = [merge_sprites(ns, ms) for ns, ms in zip(air_name_sprs, air_model_sprs)] air_icon_size = max(10, int(AIR_ICON_SIZE * scale + 0.5)) for i, p in enumerate(air_active): ik = get_icon_key(p["_model"]) c = (int(air_cols[i][0]), int(air_cols[i][1]), int(air_cols[i][2])) air_icon_sprs.append(make_tinted_icon_sprite( ik, c, air_icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR)) air_dead_sprs = [make_black_sprite(s) for s in air_icon_sprs] air_rot_caches = [make_rotation_cache(s) for s in air_icon_sprs] air_dead_rot_caches = [make_rotation_cache(s) for s in air_dead_sprs] assert px_air is not None and py_air is not None air_headings = precompute_headings(px_air, py_air) print(f"Icons : {n_aircraft} aircraft @ {air_icon_size}px (+{hl_pad}px highlight, {ROTATION_STEPS} rotations)") else: air_label_sprs = [] air_dead_sprs = [] air_rot_caches = [] air_dead_rot_caches = [] air_headings = None # Build per-drone icon sprites with rotation (like aircraft) drone_icon_sprs: list[Sprite] = [] drone_dead_sprs_list: list[Sprite] = [] drone_rot_caches: list = [] drone_dead_rot_caches: list = [] drone_headings_arr: np.ndarray | None = None drone_icon_keys: list[str] = [] if n_drones: drone_icon_size = max(10, int(AIR_ICON_SIZE * scale + 0.5)) for dr in drones: ik = get_icon_key(dr["entity"]["ModelName"]) drone_icon_keys.append(ik) c = (int(dr["color"][0]), int(dr["color"][1]), int(dr["color"][2])) drone_icon_sprs.append(make_tinted_icon_sprite( ik, c, drone_icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR)) # Drones: bare black silhouette (no glow) when dead drone_dead_sprs_list = [make_bare_black_sprite(ik, drone_icon_size, hl_pad) for ik in drone_icon_keys] drone_rot_caches = [make_rotation_cache(s) for s in drone_icon_sprs] drone_dead_rot_caches = [make_rotation_cache(s) for s in drone_dead_sprs_list] if px_dr is not None and py_dr is not None: drone_headings_arr = precompute_headings(px_dr, py_dr) print(f"Icons : {n_drones} drones @ {drone_icon_size}px (+{hl_pad}px highlight, {ROTATION_STEPS} rotations)") # Load target icons for kill/damage events target_size = max(12, int(20 * scale + 0.5)) kill_target = load_target_sprite("target_red.png", target_size) dmg_target = load_target_sprite("target_yellow.png", target_size) air_trail_f = max(1, int(AIR_TRAIL_MS * fps / 1000.0)) drone_trail_f = max(1, int(DRONE_TRAIL_MS * fps / 1000.0)) ctx = VideoCtx( n_players=len(active), px_all=px, py_all=py, colors_arr=colors, trail_colors_arr=trail_colors, is_dead=is_dead, death_frame=death_frame, death_fade=death_fade, last_dead_px=last_dead_px, last_dead_py=last_dead_py, kills_by_frame=kbf, damages_by_frame=dbf, label_sprites=label_sprs, bg_arr=bg, end_frame=end_frame, n_drones=n_drones, px_drone=px_dr, py_drone=py_dr, drone_colors_arr=drone_cols, drone_trail_colors=drone_trail_cols, drone_sprites=drone_spr, drone_icon_sprites=drone_icon_sprs, drone_dead_sprites=drone_dead_sprs_list, drone_rot_caches=drone_rot_caches, drone_dead_rot_caches=drone_dead_rot_caches, drone_headings=drone_headings_arr, drone_is_hit=drone_is_hit, drone_hit_fade=drone_hit_fade, drone_is_crashed=drone_is_crashed, drone_crash_frame=drone_crash_frame, drone_last_dead_px=drone_last_dead_px, drone_last_dead_py=drone_last_dead_py, drone_alt=drone_alt, drone_alt_sprites=alt_sprites, n_aircraft=n_aircraft, px_air=px_air, py_air=py_air, air_colors_arr=air_cols, air_trail_colors=air_trail_cols, air_sprites=air_label_sprs, air_is_hit=air_is_hit, air_hit_fade=air_hit_fade, air_is_crashed=air_is_crashed, air_crash_frame=air_crash_frame, air_last_dead_px=air_last_dead_px, air_last_dead_py=air_last_dead_py, air_alt=air_alt, air_alt_sprites=alt_sprites, air_trail_f=air_trail_f, drone_trail_f=drone_trail_f, dot_dy=s_dot_dy, dot_dx=s_dot_dx, shadow_dy=s_shadow_dy, shadow_dx=s_shadow_dx, drone_dy=s_drone_dy, drone_dx=s_drone_dx, air_dy=s_air_dy, air_dx=s_air_dx, dot_r=s_dot_r, drone_r=s_drone_r, air_r=s_air_r, canvas=canvas, player_icon_sprites=player_icon_sprs, player_dead_sprites=player_dead_sprs, air_icon_sprites=air_icon_sprs, air_dead_sprites=air_dead_sprs, air_rot_caches=air_rot_caches, air_dead_rot_caches=air_dead_rot_caches, air_headings=air_headings, kill_target_spr=kill_target, dmg_target_spr=dmg_target, cap_overlays=cap_overlays, ) shared = (shadow_color, trail_f) def render_frame(fi: int) -> bytes: buf = _get_thread_buf(ctx.bg_arr) render_one_ctx(fi, buf, ctx, *shared) return buf.tobytes() ff = open_ffmpeg(out_path, fps, canvas, canvas) assert ff.stdin is not None fd = ff.stdin.fileno() BATCH = n_workers * 8 with ThreadPoolExecutor(max_workers=n_workers) as pool: for chunk_start in range(0, end_frame, BATCH): chunk_end = min(chunk_start + BATCH, end_frame) futs = [pool.submit(render_frame, fi) for fi in range(chunk_start, chunk_end)] for fut in futs: os.write(fd, fut.result()) if progress_cb is not None: pct = int(chunk_end / end_frame * 100) progress_cb(pct) ff.stdin.close() ff.wait() sz = out_path.stat().st_size / 1_048_576 print(f"\nDone → {out_path} ({sz:.1f} MB)") # ── Replay loading helpers ──────────────────────────────────────────────────── def _to_int(value: Any, default: int = 0) -> int: try: return int(value) except (TypeError, ValueError): return default def _unit_to_model_name(unit_name: str) -> str: internal = (unit_name or "").strip() if not internal: return "tankModels/unknown" tags = _get_unit_tags(internal) or [] tag_set = set(tags) if "type_strike_ucav" in tag_set or "ucav" in internal.lower(): return f"airModels/{internal}_ucav" if tag_set & { "air", "aircraft", "helicopter", "type_jet_bomber", "type_bomber", "type_strike_aircraft", "type_jet_fighter", "type_fighter", "type_helicopter", }: return f"airModels/{internal}" return f"tankModels/{internal}" def _path_sample_to_dict(sample: Any) -> dict[str, float] | None: if isinstance(sample, dict): if {"Time", "X", "Z"}.issubset(sample.keys()): return { "Time": float(sample.get("Time", 0.0)), "X": float(sample.get("X", 0.0)), "Y": float(sample.get("Y", 0.0)), "Z": float(sample.get("Z", 0.0)), } if {"t", "x", "z"}.issubset(sample.keys()): return { "Time": float(sample.get("t", 0.0)), "X": float(sample.get("x", 0.0)), "Y": float(sample.get("y", 0.0)), "Z": float(sample.get("z", 0.0)), } return None if isinstance(sample, (list, tuple)) and len(sample) >= 4: return { "Time": float(sample[0]), "X": float(sample[1]), "Y": float(sample[2]), "Z": float(sample[3]), } return None def _position_at_time(path: list[dict[str, float]], time_ms: float) -> dict[str, float] | None: if not path: return None if time_ms <= path[0]["Time"]: return path[0] prev = path[0] for pt in path[1:]: if pt["Time"] >= time_ms: return pt prev = pt return prev def _convert_ws_replay_to_render_dict(replay: dict[str, Any]) -> dict[str, Any]: players_src = replay.get("players") or {} if not isinstance(players_src, dict): players_src = {} players_out: list[dict[str, Any]] = [] winner_tag = str(replay.get("winner") or "") loser_tag = str(replay.get("loser") or "") winner_team = 0 loser_team = 0 for uid_str, pdata in players_src.items(): if not isinstance(pdata, dict): continue pid = _to_int(pdata.get("uid") or uid_str, 0) team = _to_int(pdata.get("team"), 0) tag = str(pdata.get("tag") or "") if winner_team == 0 and tag == winner_tag: winner_team = team if loser_team == 0 and tag == loser_tag: loser_team = team players_out.append({ "PlayerID": pid, "Name": str(pdata.get("name") or f"Player#{pid}"), "Team": team, "Clan": tag, }) if winner_team == 0: winner_team = 1 if loser_team != 1 else 2 entities_src = replay.get("entities") or [] if not isinstance(entities_src, list): entities_src = [] entities_out: list[dict[str, Any]] = [] uid_to_entity_index: dict[int, int] = {} for idx, ent in enumerate(entities_src, start=1): if not isinstance(ent, dict): continue uid = _to_int(ent.get("uid"), 0) unit = str(ent.get("unit") or "") path_raw = ent.get("path") or [] if not isinstance(path_raw, list): continue path: list[dict[str, float]] = [] for sample in path_raw: parsed = _path_sample_to_dict(sample) if parsed is not None: path.append(parsed) if not path: continue entity_index = _to_int(ent.get("entity_index") or ent.get("entityIndex"), idx) uid_to_entity_index.setdefault(uid, entity_index) entities_out.append({ "EntityIndex": entity_index, "PlayerID": uid, "ModelName": _unit_to_model_name(unit), "Path": path, }) entity_paths_by_uid: dict[int, list[dict[str, float]]] = { e["PlayerID"]: e["Path"] for e in entities_out if e.get("PlayerID") } events = replay.get("events") or {} if isinstance(events, str): try: events = json.loads(events) except json.JSONDecodeError: events = {} if not isinstance(events, dict): events = {} kills_out: list[dict[str, Any]] = [] for kill in (events.get("kills") or []): if not isinstance(kill, dict): continue victim_id = _to_int(kill.get("offended_uid"), 0) killer_id = _to_int(kill.get("offender_uid"), 0) kill_time = float(kill.get("time") or 0.0) victim_path = entity_paths_by_uid.get(victim_id, []) killer_path = entity_paths_by_uid.get(killer_id, []) victim_pos = _position_at_time(victim_path, kill_time) killer_pos = _position_at_time(killer_path, kill_time) payload: dict[str, Any] = { "Time": kill_time, "VictimID": victim_id, "KillerID": killer_id, "VictimEntityIndex": uid_to_entity_index.get(victim_id, 0), "Weapon": str(kill.get("used_weapon") or kill.get("weapon") or ""), "VictimModel": _unit_to_model_name(str(kill.get("offended_unit") or "")), "KillerModel": _unit_to_model_name(str(kill.get("offender_unit") or "")), "crashed": bool(kill.get("crashed", False)), } if victim_pos: payload["VictimPosition"] = { "X": float(victim_pos["X"]), "Y": float(victim_pos["Y"]), "Z": float(victim_pos["Z"]), } if killer_pos: payload["KillerPosition"] = { "X": float(killer_pos["X"]), "Y": float(killer_pos["Y"]), "Z": float(killer_pos["Z"]), } kills_out.append(payload) damages_out: list[dict[str, Any]] = [] for dmg in (events.get("damage") or []): if not isinstance(dmg, dict): continue damages_out.append({ "Time": float(dmg.get("time") or 0.0), "OffenderID": _to_int(dmg.get("offender_uid"), 0), "OffendedID": _to_int(dmg.get("offended_uid"), 0), "OffenderModel": _unit_to_model_name(str(dmg.get("offender_unit") or "")), "OffendedModel": _unit_to_model_name(str(dmg.get("offended_unit") or "")), "Afire": bool(dmg.get("afire", False)), }) mission_mode = str(replay.get("mission_mode") or "") difficulty = str(replay.get("difficulty") or "") battle_type = mission_mode or difficulty # Zones (positive cap == team slot 1) and tickets (keyed by slot) are kept in # their original slot space; readers compare the owning slot to WinnerSlot. # center/radius are dropped — geometry comes from the mission .blk. zones_src = replay.get("zones") or {} tickets_src = replay.get("tickets") or {} zones_out = { letter: zdata.get("cap", []) for letter, zdata in zones_src.items() if isinstance(zdata, dict) } if isinstance(zones_src, dict) else {} tickets_out = tickets_src if isinstance(tickets_src, dict) else {} team_slot_names = {} if winner_team: team_slot_names[str(winner_team)] = winner_tag if loser_team: team_slot_names[str(loser_team)] = loser_tag return { "SessionID": _to_int(replay.get("_id") or replay.get("id"), 0), "TeamWon": winner_team, "Mission": { "Level": str(replay.get("level_path") or ""), "LevelSettings": str(replay.get("mission_path") or ""), "BattleType": battle_type, }, "Players": players_out, "Entities": entities_out, "Kills": kills_out, "DamageReports": damages_out, "Zones": zones_out, "Tickets": tickets_out, "WinnerSlot": winner_team, "TeamSlotNames": team_slot_names, } def _convert_local_replay_to_render_dict(replay: dict[str, Any]) -> dict[str, Any]: teams_src = replay.get("teams") or [] if not isinstance(teams_src, list): teams_src = [] players_out: list[dict[str, Any]] = [] winner_sq = str(replay.get("winning_team_squadron") or "") winner_team = 1 for idx, team in enumerate(teams_src[:2], start=1): if not isinstance(team, dict): continue team_sq = str(team.get("squadron") or "") if team_sq and team_sq == winner_sq: winner_team = idx for p in (team.get("players") or []): if not isinstance(p, dict): continue pid = _to_int(p.get("uid"), 0) if pid <= 0: continue players_out.append({ "PlayerID": pid, "Name": str(p.get("nick") or f"Player#{pid}"), "Team": idx, "Clan": str(team.get("squadron_tagged") or team_sq), }) entities_src = replay.get("entities") or [] if not isinstance(entities_src, list): entities_src = [] entities_out: list[dict[str, Any]] = [] uid_to_entity_index: dict[int, int] = {} for idx, ent in enumerate(entities_src, start=1): if not isinstance(ent, dict): continue uid = _to_int(ent.get("uid"), 0) unit = str(ent.get("unit") or "") path_raw = ent.get("path") or [] if not isinstance(path_raw, list): continue path: list[dict[str, float]] = [] for sample in path_raw: parsed = _path_sample_to_dict(sample) if parsed is not None: path.append(parsed) if not path: continue entity_index = _to_int(ent.get("entity_index") or ent.get("entityIndex"), idx) uid_to_entity_index.setdefault(uid, entity_index) entities_out.append({ "EntityIndex": entity_index, "PlayerID": uid, "ModelName": _unit_to_model_name(unit), "Path": path, }) entity_paths_by_uid: dict[int, list[dict[str, float]]] = { e["PlayerID"]: e["Path"] for e in entities_out if e.get("PlayerID") } events = replay.get("events") or {} if isinstance(events, str): try: events = json.loads(events) except json.JSONDecodeError: events = {} if not isinstance(events, dict): events = {} kills_out: list[dict[str, Any]] = [] for kill in (events.get("kills") or []): if not isinstance(kill, dict): continue victim_id = _to_int(kill.get("offended_uid"), 0) killer_id = _to_int(kill.get("offender_uid"), 0) kill_time = float(kill.get("time") or 0.0) victim_path = entity_paths_by_uid.get(victim_id, []) killer_path = entity_paths_by_uid.get(killer_id, []) victim_pos = _position_at_time(victim_path, kill_time) killer_pos = _position_at_time(killer_path, kill_time) payload: dict[str, Any] = { "Time": kill_time, "VictimID": victim_id, "KillerID": killer_id, "VictimEntityIndex": uid_to_entity_index.get(victim_id, 0), "Weapon": str(kill.get("used_weapon") or kill.get("weapon") or ""), "VictimModel": _unit_to_model_name(str(kill.get("offended_unit") or "")), "KillerModel": _unit_to_model_name(str(kill.get("offender_unit") or "")), "crashed": bool(kill.get("crashed", False)), } if victim_pos: payload["VictimPosition"] = { "X": float(victim_pos["X"]), "Y": float(victim_pos["Y"]), "Z": float(victim_pos["Z"]), } if killer_pos: payload["KillerPosition"] = { "X": float(killer_pos["X"]), "Y": float(killer_pos["Y"]), "Z": float(killer_pos["Z"]), } kills_out.append(payload) damages_out: list[dict[str, Any]] = [] for dmg in (events.get("damage") or []): if not isinstance(dmg, dict): continue damages_out.append({ "Time": float(dmg.get("time") or 0.0), "OffenderID": _to_int(dmg.get("offender_uid"), 0), "OffendedID": _to_int(dmg.get("offended_uid"), 0), "OffenderModel": _unit_to_model_name(str(dmg.get("offender_unit") or "")), "OffendedModel": _unit_to_model_name(str(dmg.get("offended_unit") or "")), "Afire": bool(dmg.get("afire", False)), }) # Zones (letter -> cap timeline, positive == slot 1), tickets (slot -> timeline), # the winning slot, and slot -> name all stay in slot space for the readers. zones_out = replay.get("zones") if isinstance(replay.get("zones"), dict) else {} tickets_out = replay.get("tickets") if isinstance(replay.get("tickets"), dict) else {} team_slot_names = replay.get("team_slot_names") if isinstance(replay.get("team_slot_names"), dict) else {} return { "SessionID": _to_int(replay.get("session_id_dec") or replay.get("session_id_hex"), 0), "TeamWon": winner_team, "Mission": { "Level": str(replay.get("level_path") or ""), "LevelSettings": str(replay.get("mission_path") or ""), "BattleType": str(replay.get("mode") or replay.get("difficulty") or ""), }, "Players": players_out, "Entities": entities_out, "Kills": kills_out, "DamageReports": damages_out, "Zones": zones_out, "Tickets": tickets_out, "WinnerSlot": _to_int(replay.get("winner_team_slot"), 0), "TeamSlotNames": team_slot_names, } def load_gob_file(replay_path: Path) -> dict[str, Any]: """Load a replay .json or .json.gz file and normalize it for render/export routines.""" import gzip as _gzip raw = replay_path.read_bytes() if replay_path.suffix == ".gz": raw = _gzip.decompress(raw) data = json.loads(raw.decode("utf-8")) if isinstance(data, dict) and {"Players", "Entities", "Mission"}.issubset(data.keys()): return data if isinstance(data, dict): if {"teams", "events", "entities"}.issubset(data.keys()): return _convert_local_replay_to_render_dict(data) return _convert_ws_replay_to_render_dict(data) raise ValueError(f"Unsupported replay payload in {replay_path}") # ── JSON export (slim dict for the web canvas replay viewer) ────────────────── def _entity_type(model_name: str) -> str: if model_name.startswith("tankModels/"): return "ground" if "ucav" in model_name.lower(): return "drone" return "aircraft" _TAG_TO_ICON_KEY = [ ("type_spaa", "spaa"), ("type_light_tank", "light"), ("type_tank_destroyer", "tank_destroyer"), ("type_heavy_tank", "heavy"), ("type_medium_tank", "medium"), ("type_missile_tank", "tank_destroyer"), ("type_bomber", "bomber_icon"), ("type_strike_aircraft", "fighter_icon"), ("type_jet_fighter", "jet_icon"), ("type_fighter", "fighter_icon"), ("type_strike_ucav", "drone"), ("type_helicopter", "helicopter_icon"), ] def _vehicle_icon_key(model_name: str) -> str: """Return an icon key for ground vehicles: light, medium, heavy, spaa, tank_destroyer, drone. Aircraft return tag-based fallback keys (fighter_icon, bomber_icon, etc.).""" internal = model_name.split("/")[-1] if "ucav" in model_name.lower(): return "drone" tags = _get_unit_tags(internal) if tags: tag_set = set(tags) for tag, icon in _TAG_TO_ICON_KEY: if tag in tag_set: return icon return "medium" def _vehicle_mini_icon(model_name: str) -> str | None: """Return the mini icon filename for aircraft (e.g. 'spitfire_ix_ico'), or None.""" internal = model_name.split("/")[-1] mini_path = Path(__file__).resolve().parent / "ICONS" / "MINIS" / f"{internal}_ico.png" if mini_path.exists(): return f"mini:{internal}_ico" return None def _subsample_path(path: list[dict], threshold: float = 1.0) -> list[dict]: """Keep first, last, and points that moved >= threshold world units.""" if len(path) <= 2: return path out = [path[0]] lx, lz = path[0]["X"], path[0]["Z"] for pt in path[1:-1]: dx = pt["X"] - lx dz = pt["Z"] - lz if dx * dx + dz * dz >= threshold * threshold: out.append(pt) lx, lz = pt["X"], pt["Z"] out.append(path[-1]) return out def _resolve_drone_team(drone_entity: dict, ground_entities: list[dict], players_by_id: dict) -> int: """Return team index for a drone by finding nearest ground player at spawn.""" if not drone_entity.get("Path") or not ground_entities: return 0 spawn_t = drone_entity["Path"][0]["Time"] spawn_x = drone_entity["Path"][0]["X"] spawn_z = drone_entity["Path"][0]["Z"] best_team = 0 best_dist = float("inf") for ge in ground_entities: pid = ge.get("PlayerID", 0) if pid == 0 or pid not in players_by_id: continue closest_pt = None closest_dt = float("inf") for pt in ge.get("Path", []): dt = abs(pt["Time"] - spawn_t) if dt < closest_dt: closest_dt = dt closest_pt = pt if closest_pt is None: continue dx = closest_pt["X"] - spawn_x dz = closest_pt["Z"] - spawn_z dist = dx * dx + dz * dz if dist < best_dist: best_dist = dist best_team = players_by_id[pid].get("Team", 0) return best_team def export_replay_json(replay_path: Path) -> dict: """Load a replay file and produce a slim dict for the web viewer.""" d = load_gob_file(replay_path) players_by_id = {p["PlayerID"]: p for p in d.get("Players", [])} team_won = d.get("TeamWon", 0) mission = d.get("Mission", {}) level_path = mission.get("Level", "") level_settings_path = mission.get("LevelSettings", "") battle_type = mission.get("BattleType", "") mission_def, mission_def_path = load_mission_def(level_settings_path) use_alt_map_coord = _mission_use_alt_map_coord(mission_def) level_override = _mission_level_override(mission_def) if level_override: level_path = level_override session_id = d.get("SessionID", 0) level_data = load_level_coords(level_path, session_id=session_id) tank_map_coords = None ground_entities_for_bounds = [e for e in d.get("Entities", []) if e.get("PlayerID", 0) != 0 and e.get("ModelName", "").startswith("tankModels/") and e.get("Path")] capture_areas = resolve_capture_areas(mission_def, mission_def_path, battle_type) if level_data: base_c0, base_c1, _ = select_tank_coords(level_data, use_alt_map_coord) tank_map_coords = {"x0": base_c0[0], "z0": base_c0[1], "x1": base_c1[0], "z1": base_c1[1]} c0, c1, _ = resolve_world_bounds( level_data, use_alt_map_coord, mission_def, mission_def_path, battle_type, ) c0, c1, _ = _fit_world_bounds_to_ground_activity( c0, c1, "json", mission_def, mission_def_path, battle_type, ground_entities_for_bounds, ) c0, c1 = _expand_bounds_by_pixels(c0, c1, canvas=CANVAS_MIN, pad_px=MAP_PAD_PX) c0, c1 = _clamp_bounds_to_base(c0, c1, base_c0, base_c1) level_coords = {"x0": c0[0], "z0": c0[1], "x1": c1[0], "z1": c1[1]} else: level_coords = {"x0": 0, "z0": 0, "x1": 4096, "z1": 4096} map_coords = None full_map_level = None if level_data and "mapCoord0" in level_data and "mapCoord1" in level_data: mc0 = level_data["mapCoord0"] mc1 = level_data["mapCoord1"] if isinstance(mc0[0], list): mc0 = mc0[0] if isinstance(mc1[0], list): mc1 = mc1[0] map_coords = {"x0": mc0[0], "z0": mc0[1], "x1": mc1[0], "z1": mc1[1]} stem = Path(level_path).stem if level_path else "" custom = level_data.get("customLevelMap", "") if custom: full_map_level = custom.rstrip("*") else: full_map_level = stem + "_map" if stem else None players_out = [] for p in d.get("Players", []): player = { "id": p["PlayerID"], "name": p.get("Name", ""), "team": p.get("Team", 0), } clan = p.get("ClanTag") or p.get("Clan") or "" if clan: player["clan"] = clan players_out.append(player) ground_entities = [e for e in d.get("Entities", []) if e.get("PlayerID", 0) != 0 and e["ModelName"].startswith("tankModels/") and e.get("Path")] entities_out = [] for e in d.get("Entities", []): path = e.get("Path", []) if not path: continue pid = e.get("PlayerID", 0) etype = _entity_type(e["ModelName"]) if pid == 0 and etype != "drone": continue internal = e["ModelName"].split("/")[-1] vehicle_name = _translate_vehicle(internal) if etype == "drone": drone_team = _resolve_drone_team(e, ground_entities, players_by_id) else: drone_team = None sampled = _subsample_path(path) path_out = [{"t": round(pt["Time"]), "x": round(pt["X"], 1), "z": round(pt["Z"], 1), "y": round(pt.get("Y", 0), 1)} for pt in sampled] icon_key = _vehicle_icon_key(e["ModelName"]) ent = { "playerId": pid, "entityIndex": e.get("EntityIndex", 0), "type": etype, "iconKey": icon_key, "vehicleName": vehicle_name, "path": path_out, } if etype == "aircraft": mini = _vehicle_mini_icon(e["ModelName"]) if mini: ent["miniIcon"] = mini if drone_team is not None: ent["droneTeam"] = drone_team entities_out.append(ent) kills_out = [] for k in d.get("Kills", []): kill = { "time": round(k.get("Time", 0)), "victimId": k.get("VictimID", 0), "killerId": k.get("KillerID", 0), "victimEntityIndex": k.get("VictimEntityIndex", 0), "weapon": _translate_weapon(k.get("Weapon", "")), } vp = k.get("VictimPosition") if vp: kill["victimPos"] = {"x": round(vp.get("X", 0), 1), "z": round(vp.get("Z", 0), 1)} kp = k.get("KillerPosition") if kp: kill["killerPos"] = {"x": round(kp.get("X", 0), 1), "z": round(kp.get("Z", 0), 1)} km = k.get("KillerModel", "") if km: kill["killerVehicle"] = _translate_vehicle(km.split("/")[-1]) vm = k.get("VictimModel", "") if vm: kill["victimVehicle"] = _translate_vehicle(vm.split("/")[-1]) kills_out.append(kill) damages_out = [] for dm in d.get("DamageReports", []): damages_out.append({ "time": round(dm.get("Time", 0)), "offenderId": dm.get("OffenderID", 0), "offendedId": dm.get("OffendedID", 0), }) # Capture-zone ownership + ticket timelines (Spectra v2+); empty for older # replays. Kept in slot space: cap sign / ticket keys / team names are all # keyed by team slot, and `winnerSlot` says which slot won. capture_state = d.get("Zones") if isinstance(d.get("Zones"), dict) else {} tickets_series = d.get("Tickets") if isinstance(d.get("Tickets"), dict) else {} team_names = d.get("TeamSlotNames") if isinstance(d.get("TeamSlotNames"), dict) else {} winner_slot = d.get("WinnerSlot", 0) out = { "teamWon": team_won, "mission": { "level": Path(level_path).stem if level_path else "", "battleType": battle_type or "", "levelSettings": level_settings_path or "", "useAlternativeMapCoord": use_alt_map_coord, }, "levelCoords": level_coords, "captureAreas": capture_areas, "captureState": capture_state, "tickets": tickets_series, "teamNames": team_names, "winnerSlot": winner_slot, "players": players_out, "entities": entities_out, "kills": kills_out, "damages": damages_out, } if tank_map_coords: out["tankMapCoords"] = tank_map_coords if map_coords and full_map_level: out["mapCoords"] = map_coords out["fullMapLevel"] = full_map_level return out # ── Main (CLI wrapper) ───────────────────────────────────────────────────────── def main(): """CLI entry point: render a replay JSON to MP4, or export a slim viewer JSON. Output mode is selected by the output file extension: `.json` → json export, anything else → mp4 render. Supports --profile for cProfile hotspot analysis. """ import argparse parser = argparse.ArgumentParser(description="Render replay_data JSON to MP4") parser.add_argument("replay", nargs="?", help="Path to replay_data .json") parser.add_argument("out", nargs="?", help="Output .mp4 path") parser.add_argument("--fps", type=int, default=FPS) parser.add_argument("--speed", type=float, default=SPEED) parser.add_argument("--workers", type=int, default=N_WORKERS) parser.add_argument("--profile", action="store_true", help="Run with cProfile and print top 40 hotspots") args = parser.parse_args() if args.replay: replay_path = Path(args.replay) else: candidates = sorted(REPLAYS_DIR.glob("*/replay_data.json.gz")) if not candidates: candidates = sorted(REPLAYS_DIR.glob("*.json")) if not candidates: sys.exit(f"No replay .json files in {REPLAYS_DIR}") replay_path = candidates[0] out_path = Path(args.out) if args.out else replay_path.parent / "replay_video.mp4" if out_path.suffix.lower() == ".json": data = export_replay_json(replay_path) raw = json.dumps(data, separators=(",", ":")) out_path.write_text(raw, encoding="utf-8") print(f"Exported {len(raw):,} bytes to {out_path}") return print(f"Input : {replay_path}") print(f"Output : {out_path}") print(f"Settings : {args.fps}fps {args.speed:.0f}× {args.workers} threads") d = load_gob_file(replay_path) if args.profile: import cProfile import pstats import io import time # Phase 1: profile just the prep (everything before frame loop) # We do this by profiling render_gob with 0 workers trick — not feasible, # so profile the whole thing and the stats will show us. print("\n=== PROFILING ===\n") t0 = time.perf_counter() pr = cProfile.Profile() pr.enable() render_gob(d, out_path, fps=args.fps, speed=args.speed, n_workers=args.workers) pr.disable() wall = time.perf_counter() - t0 print(f"\n{'='*70}") print(f"Total wall time: {wall:.2f}s") print(f"{'='*70}\n") s = io.StringIO() ps = pstats.Stats(pr, stream=s) ps.strip_dirs().sort_stats("cumulative") ps.print_stats(40) print(s.getvalue()) s2 = io.StringIO() ps2 = pstats.Stats(pr, stream=s2) ps2.strip_dirs().sort_stats("tottime") ps2.print_stats(40) print("\n--- Sorted by total time ---\n") print(s2.getvalue()) else: render_gob(d, out_path, fps=args.fps, speed=args.speed, n_workers=args.workers) if __name__ == "__main__": main()