Files
SREBOT/BOT/gob.py
T
NotSoToothless b36886425b fix again (#1263)
2026-05-19 18:06:30 -07:00

3327 lines
133 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
gob.py
Handles GOB replay files: renders MP4 videos and exports slim JSON for the
web canvas replay viewer. Output mode is picked from the output file extension.
Usage:
python -m BOT.gob <replay.gob|.json> <out.mp4> # render video
python -m BOT.gob <replay.gob|.json> <out.json> # export json
Public API:
render_gob(d, out_path, fps, speed, n_workers, progress_cb)
load_gob_file(gob_path)
export_replay_json(gob_path)
"""
# Standard Library Imports
import json
import math
import os
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any, Callable, Optional
# Third-Party Library Imports
import numpy as np
import pygob
import zstandard as zstd
from . import SHARED_DIR
from .utils import REPLAYS_DIR
from PIL import Image, ImageDraw, ImageFilter, ImageFont
# Optional localisation layer: when the project's data_parser package (and its
# language tables) can be loaded, internal vehicle/weapon identifiers are
# translated to English display names.  Any failure while importing or
# constructing the readers falls back to identity functions.
try:
    from data_parser import (
        LangTableReader as _LangTableReader,
        WeaponTableReader as _WeaponTableReader,
        apply_vehicle_name_filters as _apply_filters,
    )

    _lang = _LangTableReader("English")
    _weapons = _WeaponTableReader("English")

    def _translate_vehicle(internal: str) -> str:
        """Return the filtered display name for *internal*, or *internal* if untranslated."""
        translated = _lang.get_translate(internal)
        if not translated:
            return internal
        return _apply_filters(translated)

    def _translate_weapon(internal: str) -> str:
        """Return the display name for weapon *internal*, or *internal* if untranslated."""
        translated = _weapons.get_translate(internal)
        return translated if translated else internal

except Exception:
    # Deliberate best-effort: the renderer still works with raw identifiers.
    def _translate_vehicle(internal: str) -> str:  # type: ignore[misc]
        """Fallback: return the internal vehicle identifier unchanged."""
        return internal

    def _translate_weapon(internal: str) -> str:  # type: ignore[misc]
        """Fallback: return the internal weapon identifier unchanged."""
        return internal
# Verbose category suffixes stripped from translated weapon names.
# Only the first entry that matches the end of the name is removed.
_WEAPON_PRUNE = [
    " air-to-ground missiles",
    " air-to-ground missile",
    " air-to-air missiles",
    " air-to-air missile",
    " anti-radiation missile",
    " anti-tank guided missile",
    " anti-tank guided missiles",
    " guided bomb",
    " guided bombs",
]

# Keep a handle on whichever translator was defined above before shadowing it.
_orig_translate_weapon = _translate_weapon


def _translate_weapon(internal: str) -> str:  # type: ignore[misc]  # noqa: F811
    """Translate a weapon identifier and drop one trailing category suffix.

    Wraps the previously defined ``_translate_weapon`` and removes the first
    entry of ``_WEAPON_PRUNE`` that the translated name ends with (if any).
    """
    name = _orig_translate_weapon(internal)
    matched = next((sfx for sfx in _WEAPON_PRUNE if name.endswith(sfx)), None)
    if matched is None:
        return name
    return name[:-len(matched)]
# Load .env from repo root.
# Each usable line has the form KEY=VALUE.  Existing environment variables
# win (setdefault), so the file only supplies defaults.
_env_path = Path(__file__).parent.parent / ".env"
if _env_path.exists():
    # Explicit encoding so the result does not depend on the host locale.
    for _line in _env_path.read_text(encoding="utf-8").splitlines():
        _line = _line.strip()
        # Skip blanks, comments (including indented ones) and lines without '='.
        if not _line or _line.startswith("#") or "=" not in _line:
            continue
        _k, _, _v = _line.partition("=")
        _k = _k.strip()
        # An empty key ("=value") cannot be stored in the environment; ignore it
        # instead of failing the whole module import.
        if _k:
            os.environ.setdefault(_k, _v.strip())
# ── Config ─────────────────────────────────────────────────────────────────────
# Rendering / timing defaults. Several of these (DOT_R, DRONE_R, AIR_R, CANVAS_MIN)
# are also used as field defaults of the VideoCtx dataclass further down.
CANVAS_MIN = 1024 # Minimum canvas resolution (px)
CANVAS_MAX = 4096 # Maximum canvas resolution (px)
MIN_OUTPUT = 1024 # Target minimum crop output size (px)
FPS = 22 # Video frames per second
SPEED = 4 # Playback speed multiplier (4× = 1min sim → 15s video)
MAP_PAD_PX = 100 # Expand resolved map bounds by this many pixels per side
TRAIL_MS = 18_000 # Ground trail length (ms, video time)
AIR_TRAIL_MS = 2_500 # Aircraft trail length (ms, video time)
DRONE_TRAIL_MS = 1_000 # Drone trail length (ms, video time)
DOT_R = 5 # Player dot radius (px)
DRONE_R = 3 # Drone dot radius (px)
AIR_R = 4 # Aircraft triangle half-size (px)
TANK_ICON_SIZE = 15 # Ground vehicle icon size (px, at 1024px reference)
AIR_ICON_SIZE = 20 # Aircraft icon size (px, at 1024px reference)
ICON_HIGHLIGHT_PAD = 3 # Highlight border padding around icon (px, at 1024px reference)
ICON_HIGHLIGHT_COLOR = (200, 200, 200, 120) # RGBA highlight backdrop color
KILL_TTL = 4_000 # Kill marker display duration (ms, video time)
GHOST_TTL = 3_000 # Death fade-to-black duration (ms, video time)
WIPE_GRACE = 90 # Extra frames rendered after a team wipe
# Capped at 8; falls back to 4 when os.cpu_count() returns None.
N_WORKERS = min(8, os.cpu_count() or 4) # Thread pool size for parallel frame rendering
WIN_COLOR = (0, 200, 0) # green
LOSE_COLOR = (220, 30, 30) # red
# Asset directories, all rooted at the package's SHARED_DIR.
MINIMAPS_DIR = SHARED_DIR / "MAPS" / "MINIMAPS"
LEVELS_DIR = SHARED_DIR / "MAPS" / "LEVELS"
ICONS_DIR = SHARED_DIR / "ICONS"
# Parent of SHARED_DIR; the War-Thunder-Datamine checkout is looked up beside it.
BOTS_DIR = SHARED_DIR.parent
WT_LEVELS_DIR = BOTS_DIR / "War-Thunder-Datamine" / "aces.vromfs.bin_u" / "levels"
WT_MISSIONS_DIR = BOTS_DIR / "War-Thunder-Datamine" / "mis.vromfs.bin_u"
# Local mission bundle; _mission_path_candidates() tries it before WT_MISSIONS_DIR.
LOCAL_MISSIONS_DIR = LEVELS_DIR / "MISSIONS"
# Thread-local storage; its users are not visible in this part of the file.
_tl = threading.local()
# ── Vehicle-type icon system ──────────────────────────────────────────────────
# Map raw unittags tags → icon key (checked in order, first match wins)
# get_icon_key() iterates this dict in insertion order, so more specific tags
# (e.g. "type_jet_bomber") must stay ahead of broader ones (e.g. "type_bomber").
_TAG_TO_ICON = {
"type_spaa": "spaa",
"type_light_tank": "light_tank",
"type_tank_destroyer": "tank_destroyer",
"type_heavy_tank": "heavy_tank",
"type_medium_tank": "medium_tank",
"type_missile_tank": "tank_destroyer",
"type_jet_bomber": "jet_bomber",
"type_bomber": "bomber",
"type_strike_aircraft": "fighter",
"type_jet_fighter": "jet",
"type_fighter": "fighter",
"type_strike_ucav": "drone",
"type_helicopter": "helicopter",
}
# Icon key → PNG filename (relative to ICONS_DIR)
# _load_icon_rgba() falls back to the "unknown" entry for keys missing here.
_ICON_FILES = {
"light_tank": "light.png",
"medium_tank": "medium.png",
"heavy_tank": "heavy.png",
"tank_destroyer": "tank_destroyer.png",
"spaa": "spaa.png",
"fighter": "FALLBACKS/fighter_icon.png",
"jet": "FALLBACKS/jet_icon.png",
"jet_bomber": "FALLBACKS/jet_bomber_icon.png",
"bomber": "FALLBACKS/bomber_icon.png",
"drone": "drone.png",
"helicopter": "FALLBACKS/helicopter_icon.png",
"unknown": "tank_icon.png",
}
# ── Premultiplied sprite ───────────────────────────────────────────────────────
# Container built by make_sprite(): blit()/blit_alpha()/blit_batch() read pm16 and ia,
# while _sprite_to_rgba() reads pm and ia to rebuild an RGBA image.
# NOTE: the docstring's attribute list does not mention pm16; it is the uint16 copy of
# the premultiplied RGB described on the field below.
@dataclass
class Sprite:
"""Premultiplied RGBA sprite for fast alpha compositing onto RGB buffers.
Attributes:
pm: Premultiplied RGB array, shape (h, w, 3), uint8.
ia: Inverse alpha array, shape (h, w, 1), uint16.
h: Sprite height in pixels.
w: Sprite width in pixels.
"""
# Field order is part of the constructor signature; do not reorder.
pm: np.ndarray # (h, w, 3) uint8 premultiplied RGB
pm16: np.ndarray # (h, w, 3) uint16 premultiplied RGB (cached for blending math)
# make_sprite() stores 255 - alpha here.
ia: np.ndarray # (h, w, 1) uint16 inverse alpha, pre-cast
h: int
w: int
def make_sprite(rgba: np.ndarray) -> Sprite:
    """Convert an (h, w, 4) RGBA array into a premultiplied ``Sprite``.

    RGB is multiplied by alpha and shifted right by 8 (a fast divide by 256),
    and the inverse alpha ``255 - alpha`` is stored as uint16 for blending.
    The input is expected to be uint8 RGBA, as produced by the callers in
    this module.
    """
    height, width = rgba.shape[0], rgba.shape[1]
    alpha_plane = rgba[..., 3:4]
    premult16 = (rgba[..., :3].astype(np.uint16) * alpha_plane.astype(np.uint16)) >> 8
    inverse_alpha = (255 - alpha_plane).astype(np.uint16)
    return Sprite(
        pm=premult16.astype(np.uint8),
        pm16=premult16,
        ia=inverse_alpha,
        h=height,
        w=width,
    )
def make_black_sprite(spr: Sprite) -> Sprite:
    """Return a copy of *spr* whose colour is black but whose alpha is unchanged.

    Both premultiplied RGB planes are zero-filled; the inverse alpha is copied
    so the new sprite does not share that buffer with the source.
    """
    return Sprite(
        pm=np.zeros_like(spr.pm),
        pm16=np.zeros_like(spr.pm16),
        ia=spr.ia.copy(),
        h=spr.h,
        w=spr.w,
    )
def make_bare_black_sprite(icon_key: str, size: int, highlight_pad: int) -> Sprite:
    """Build a plain black silhouette of an icon, without any highlight glow.

    The icon is scaled so its longer side is ``size`` pixels.  When
    ``highlight_pad`` is positive the silhouette is centred in a transparent
    canvas enlarged by that padding on every side, so it has the same
    dimensions as the highlighted sprite and can replace it directly.
    """
    source = _load_icon_rgba(icon_key)
    src_h, src_w = source.shape[:2]
    ratio = size / max(src_h, src_w)
    dst_w = max(1, int(src_w * ratio + 0.5))
    dst_h = max(1, int(src_h * ratio + 0.5))

    resized = Image.fromarray(source).resize((dst_w, dst_h), Image.Resampling.LANCZOS)
    silhouette = np.asarray(resized).copy()
    # Black out the colour channels; alpha alone defines the shape.
    silhouette[..., :3] = 0

    if highlight_pad <= 0:
        return make_sprite(silhouette)

    padded = np.zeros(
        (dst_h + highlight_pad * 2, dst_w + highlight_pad * 2, 4), dtype=np.uint8
    )
    padded[
        highlight_pad:highlight_pad + dst_h,
        highlight_pad:highlight_pad + dst_w,
    ] = silhouette
    return make_sprite(padded)
def _get_unit_tags(internal_name: str) -> list[str] | None:
    """Return the raw UnitTags tags for a vehicle, or None when unavailable.

    The lookup is delegated to ``data_parser.UnitTags``.  The rest of this
    module treats ``data_parser`` as optional (the translation helpers fall
    back to identity functions), so a missing package is reported here as
    "no tags" instead of raising; callers then use the "unknown" icon.
    """
    try:
        from data_parser import UnitTags
    except ImportError:
        return None
    return UnitTags.get()._get_tags(internal_name)
# Per-vehicle mini icons are stored here as '<internal>_ico.png'
# (looked up by get_icon_key() and loaded by _load_icon_rgba()).
MINIS_DIR = SHARED_DIR / "ICONS" / "MINIS"
# A vehicle carrying any of these tags is eligible for a per-vehicle mini icon.
_AIRCRAFT_TAGS = {"air", "aircraft", "helicopter"}
def get_icon_key(model_name: str) -> str:
    """Resolve a ModelName (e.g. 'tankModels/ussr_t_34') to an icon key.

    The part after the last '/' is the internal vehicle name.  Vehicles whose
    tags intersect ``_AIRCRAFT_TAGS`` get ``'mini:<internal>'`` when a matching
    '<internal>_ico.png' exists in MINIS/.  Otherwise the first entry of
    ``_TAG_TO_ICON`` whose tag the vehicle carries decides the key.  Vehicles
    with no tags or no matching tag map to ``'unknown'``.
    """
    internal = model_name.rsplit("/", 1)[-1]
    tags = _get_unit_tags(internal)
    if not tags:
        return "unknown"

    tag_set = set(tags)
    # Only aircraft/helicopters are eligible for a per-vehicle mini icon.
    if tag_set & _AIRCRAFT_TAGS and (MINIS_DIR / f"{internal}_ico.png").exists():
        return f"mini:{internal}"

    # Tag-based icon: dict order defines priority.
    return next(
        (icon for tag, icon in _TAG_TO_ICON.items() if tag in tag_set),
        "unknown",
    )
@lru_cache(maxsize=512)
def _load_icon_rgba(icon_key: str) -> np.ndarray:
    """Load an icon PNG as an RGBA array cropped to its non-transparent bounds.

    Keys of the form ``'mini:<internal>'`` resolve to
    ``MINIS_DIR/<internal>_ico.png``; any other key is looked up in
    ``_ICON_FILES`` (falling back to the ``'unknown'`` entry) under
    ``ICONS_DIR``.  A fully transparent image is returned uncropped.

    Results are cached per key, so every caller receives the same array
    object and must not modify it in place.
    """
    if icon_key.startswith("mini:"):
        path = MINIS_DIR / f"{icon_key[5:]}_ico.png"
    else:
        path = ICONS_DIR / _ICON_FILES.get(icon_key, _ICON_FILES["unknown"])

    # Context manager releases the file handle deterministically.
    with Image.open(path) as img:
        arr = np.asarray(img.convert("RGBA")).copy()

    # Crop to bounding box of non-transparent pixels
    alpha = arr[..., 3]
    rows = np.any(alpha > 0, axis=1)
    cols = np.any(alpha > 0, axis=0)
    if rows.any() and cols.any():
        y0, y1 = np.where(rows)[0][[0, -1]]
        x0, x1 = np.where(cols)[0][[0, -1]]
        arr = arr[y0:y1 + 1, x0:x1 + 1].copy()
    return arr
def make_tinted_icon_sprite(icon_key: str, color: tuple[int, int, int],
                            size: int, highlight_pad: int = 0,
                            highlight_color: tuple[int, int, int, int] = (0, 0, 0, 0),
                            ) -> Sprite:
    """Load an icon, resize it, tint it and optionally add an outline highlight.

    Args:
        icon_key: Key understood by ``_load_icon_rgba``.
        color: RGB tint; each icon channel is multiplied by ``color / 255``.
        size: Target length in pixels of the icon's longer side.
        highlight_pad: Outline thickness in pixels; ``<= 0`` disables it.
        highlight_color: RGBA outline colour; alpha 0 disables the outline.

    Returns:
        A premultiplied ``Sprite``.  With an outline the sprite is
        ``2 * highlight_pad`` pixels larger in both dimensions.
    """
    rgba = _load_icon_rgba(icon_key)
    src_h, src_w = rgba.shape[:2]
    # Scale so the longer side equals `size`, preserving aspect ratio.
    scale_f = size / max(src_h, src_w)
    new_w = max(1, int(src_w * scale_f + 0.5))
    new_h = max(1, int(src_h * scale_f + 0.5))
    img = Image.fromarray(rgba).resize((new_w, new_h), Image.Resampling.LANCZOS)
    arr = np.asarray(img).copy().astype(np.float32)
    # Tint: multiply RGB by color/255, preserving alpha
    arr[..., 0] *= color[0] / 255.0
    arr[..., 1] *= color[1] / 255.0
    arr[..., 2] *= color[2] / 255.0
    tinted = np.clip(arr, 0, 255).astype(np.uint8)
    if highlight_pad <= 0 or highlight_color[3] == 0:
        return make_sprite(tinted)

    # Build the outline by stamping the icon at every offset inside a disc of
    # radius `pad` around its centred position.
    pad = highlight_pad
    out_h, out_w = new_h + pad * 2, new_w + pad * 2
    tinted_img = Image.fromarray(tinted)
    hl_layer = Image.new("RGBA", (out_w, out_h), (0, 0, 0, 0))
    for dy in range(-pad, pad + 1):
        for dx in range(-pad, pad + 1):
            if dx * dx + dy * dy > pad * pad:
                continue
            hl_layer.paste(tinted_img, (pad + dx, pad + dy), tinted_img)

    # Replace RGB with the highlight colour and cap the merged alpha.
    hl_arr = np.asarray(hl_layer).copy()
    mask = hl_arr[..., 3] > 0
    hl_arr[mask, 0] = highlight_color[0]
    hl_arr[mask, 1] = highlight_color[1]
    hl_arr[mask, 2] = highlight_color[2]
    hl_arr[..., 3] = np.minimum(hl_arr[..., 3], highlight_color[3])

    # Composite tinted icon on top of highlight
    bg = Image.fromarray(hl_arr)
    bg.paste(tinted_img, (pad, pad), tinted_img)
    return make_sprite(np.asarray(bg).copy())
def load_target_sprite(filename: str, size: int) -> Sprite:
    """Load a target icon from ``ICONS_DIR`` and return it as a ``Sprite``.

    The image is converted to RGBA and resized to ``size`` x ``size`` pixels
    (aspect ratio is not preserved).
    """
    path = ICONS_DIR / filename
    # Context manager releases the file handle deterministically.
    with Image.open(path) as img:
        resized = img.convert("RGBA").resize((size, size), Image.Resampling.LANCZOS)
        rgba = np.asarray(resized).copy()
    return make_sprite(rgba)
# ── Rotated sprite cache for aircraft ─────────────────────────────────────────
# Number of pre-rotated variants make_rotation_cache() builds per sprite;
# heading_to_rot_index() maps a heading in degrees onto this many buckets.
ROTATION_STEPS = 72 # one sprite every 5°
def _sprite_to_rgba(spr: Sprite) -> np.ndarray:
    """Rebuild an (h, w, 4) uint8 RGBA array from a premultiplied ``Sprite``.

    Alpha is recovered as ``255 - ia``.  Each colour channel is
    ``pm * 255 / alpha``, with alpha floored at 1 to avoid dividing by zero,
    clipped to 0..255 and truncated to uint8.
    """
    alpha = (255 - spr.ia[..., 0]).astype(np.uint8)
    divisor = np.maximum(alpha, 1).astype(np.float32)[..., np.newaxis]
    # All three colour channels are un-premultiplied in one broadcast step.
    colour = np.clip(spr.pm[..., :3].astype(np.float32) * 255.0 / divisor, 0, 255)

    rgba = np.zeros((spr.h, spr.w, 4), dtype=np.uint8)
    rgba[..., :3] = colour.astype(np.uint8)
    rgba[..., 3] = alpha
    return rgba
def make_rotation_cache(spr: Sprite) -> list[Sprite]:
    """Return ``ROTATION_STEPS`` copies of *spr* rotated in equal angular steps.

    Entry ``i`` is the sprite turned clockwise by ``i * 360 / ROTATION_STEPS``
    degrees (entry 0 is 0°).  Rotation expands the canvas, so entries may
    differ in size.
    """
    base_img = Image.fromarray(_sprite_to_rgba(spr))
    step_deg = 360.0 / ROTATION_STEPS
    # PIL rotates counter-clockwise, hence the negated angle for a clockwise heading.
    return [
        make_sprite(
            np.asarray(
                base_img.rotate(
                    -(step * step_deg),
                    resample=Image.Resampling.BILINEAR,
                    expand=True,
                )
            ).copy()
        )
        for step in range(ROTATION_STEPS)
    ]
def precompute_headings(px: np.ndarray, py: np.ndarray) -> np.ndarray:
    """Derive a per-frame heading (degrees, 0 = up, clockwise) for each entity.

    ``px`` and ``py`` are ``(n_entities, n_frames)`` position arrays in which a
    negative x marks an absent sample.  For every frame after the first, the
    heading is taken from the displacement since the previous frame.  When
    either sample is absent, or the move is shorter than 0.5 on both axes,
    the last computed heading (initially 0) is carried forward.  Frame 0
    copies frame 1.

    Returns an ``(n_entities, n_frames)`` float32 array; the initial fill of
    -1 only survives when there is a single frame.
    """
    n_ents, n_frames = px.shape
    headings = np.full((n_ents, n_frames), -1.0, dtype=np.float32)
    for ent in range(n_ents):
        xs, ys, row = px[ent], py[ent], headings[ent]
        current = 0.0
        for frame in range(1, n_frames):
            prev = frame - 1
            if not (xs[frame] < 0 or xs[prev] < 0):
                dx = float(xs[frame] - xs[prev])
                dy = float(ys[frame] - ys[prev])
                if not (abs(dx) < 0.5 and abs(dy) < 0.5):
                    # atan2(dx, -dy): 0=up, 90=right, 180=down, 270=left
                    current = math.degrees(math.atan2(dx, -dy)) % 360
            row[frame] = current
        # Frame 0 has no predecessor; reuse frame 1's heading.
        if n_frames > 1:
            row[0] = row[1]
    return headings
def heading_to_rot_index(deg: float) -> int:
    """Map a heading in degrees to the nearest rotation-cache index.

    The heading is divided by the angular step ``360 / ROTATION_STEPS``,
    rounded, and wrapped into ``0 .. ROTATION_STEPS - 1``.
    """
    step_deg = 360.0 / ROTATION_STEPS
    nearest = int(round(deg / step_deg))
    return nearest % ROTATION_STEPS
def blit(buf: np.ndarray, spr: Sprite, x: int, y: int) -> None:
    """Alpha-composite *spr* onto *buf* in place with its top-left corner at (x, y).

    The sprite is clipped to the buffer; nothing happens when it lies entirely
    outside.  Each covered pixel becomes ``pm16 + ((buf * ia) >> 8)``, stored
    back as uint8.
    """
    buf_h, buf_w = buf.shape[:2]
    left, top = max(x, 0), max(y, 0)
    right, bottom = min(x + spr.w, buf_w), min(y + spr.h, buf_h)
    if right <= left or bottom <= top:
        return

    # Matching window inside the sprite.
    src_rows = slice(top - y, bottom - y)
    src_cols = slice(left - x, right - x)

    dest = buf[top:bottom, left:right]
    blended = spr.pm16[src_rows, src_cols] + ((dest * spr.ia[src_rows, src_cols]) >> 8)
    buf[top:bottom, left:right] = blended.astype(np.uint8)
def blit_alpha(buf: np.ndarray, spr: Sprite, x: int, y: int, alpha: float) -> None:
    """Like blit() but with an additional [0,1] alpha multiplier.

    Args:
        buf: Destination image, modified in place.
        spr: Premultiplied sprite (``pm16`` and ``ia`` are read).
        x, y: Buffer coordinates of the sprite's top-left corner.
        alpha: Extra opacity factor.  Values ``<= 0`` draw nothing.  Values
            above 1 are clamped to 1, because a larger factor would push the
            uint16 intermediate products past their range and wrap around.
    """
    if alpha <= 0:
        return
    alpha = min(alpha, 1.0)
    x1, y1 = max(x, 0), max(y, 0)
    H, W = buf.shape[0], buf.shape[1]
    x2, y2 = min(x + spr.w, W), min(y + spr.h, H)
    if x1 >= x2 or y1 >= y2:
        return
    sy1, sx1 = y1 - y, x1 - x
    sy2, sx2 = sy1 + (y2 - y1), sx1 + (x2 - x1)
    # 8.8 fixed-point opacity: 256 means fully opaque.
    a16 = int(alpha * 256)
    pm = (spr.pm16[sy1:sy2, sx1:sx2] * a16) >> 8
    # Scale the sprite's own alpha (255 - ia), then convert back to inverse alpha.
    ia = 255 - (((255 - spr.ia[sy1:sy2, sx1:sx2]) * a16) >> 8)
    buf[y1:y2, x1:x2] = (pm + ((buf[y1:y2, x1:x2] * ia) >> 8)).astype(np.uint8)
def blit_batch(buf: np.ndarray, items: list[tuple[Sprite, int, int]]) -> None:
    """Composite several ``(sprite, x, y)`` entries onto *buf* in list order.

    Every entry is clipped to the buffer and blended exactly as in ``blit``:
    ``pm16 + ((buf * ia) >> 8)`` cast to uint8.  Entries entirely outside the
    buffer are skipped.
    """
    buf_h, buf_w = buf.shape[0], buf.shape[1]
    for sprite, left, top in items:
        dst_x0 = max(left, 0)
        dst_y0 = max(top, 0)
        dst_x1 = min(left + sprite.w, buf_w)
        dst_y1 = min(top + sprite.h, buf_h)
        if dst_x1 <= dst_x0 or dst_y1 <= dst_y0:
            continue

        src_rows = slice(dst_y0 - top, dst_y1 - top)
        src_cols = slice(dst_x0 - left, dst_x1 - left)
        region = buf[dst_y0:dst_y1, dst_x0:dst_x1]
        mixed = sprite.pm16[src_rows, src_cols] + (
            (region * sprite.ia[src_rows, src_cols]) >> 8
        )
        buf[dst_y0:dst_y1, dst_x0:dst_x1] = mixed.astype(np.uint8)
# ── VideoCtx — all pre-computed data for one output video ─────────────────────
@dataclass
class VideoCtx:
"""Pre-computed rendering context for one GOB replay video.
Holds all interpolated positions, colors, death states, kill/damage events,
label sprites, and the baked background for every frame so that
render_one_ctx can draw each frame without re-computing anything.
"""
# --- Ground players (required fields; order is part of the constructor signature) ---
n_players: int
px_all: np.ndarray # (n_players, n_frames) int16, -1=absent
py_all: np.ndarray
colors_arr: np.ndarray # (n_players, 3) uint8
trail_colors_arr: np.ndarray # (n_players, 3) uint8
is_dead: np.ndarray # (n_players, n_frames) bool
death_frame: np.ndarray # (n_players,) int32 — frame of death, or n_frames
death_fade: np.ndarray # (n_players, n_frames) float32 — 1→0 over ghost ttl
last_dead_px: np.ndarray # (n_players,) int16 — last known x at death
last_dead_py: np.ndarray # (n_players,) int16 — last known y at death
kills_by_frame: list # per-frame kill events
damages_by_frame: list # per-frame damage events
label_sprites: list # list[Sprite] combined name+vehicle per player
bg_arr: np.ndarray # pre-baked background RGB
end_frame: int
# --- Drones: required fields first; dataclass rules force the defaulted ones after them ---
n_drones: int
px_drone: np.ndarray | None
py_drone: np.ndarray | None
drone_colors_arr: np.ndarray | None
drone_trail_colors: np.ndarray | None
drone_sprites: list # list[Sprite]
drone_is_hit: np.ndarray | None = None # True from kill onward (for fade)
drone_hit_fade: np.ndarray | None = None # 1→0 from kill to crash
drone_is_crashed: np.ndarray | None = None # True after path ends
drone_crash_frame: np.ndarray | None = None
drone_last_dead_px: np.ndarray | None = None
drone_last_dead_py: np.ndarray | None = None
drone_alt: np.ndarray | None = None
drone_alt_sprites: dict = field(default_factory=dict)
# --- Aircraft: all optional, mirroring the drone fields above ---
n_aircraft: int = 0
px_air: np.ndarray | None = None
py_air: np.ndarray | None = None
air_colors_arr: np.ndarray | None = None
air_trail_colors: np.ndarray | None = None
air_sprites: list = field(default_factory=list)
air_is_hit: np.ndarray | None = None # True from kill onward
air_hit_fade: np.ndarray | None = None # 1→0 from kill to crash
air_is_crashed: np.ndarray | None = None # True after path ends
air_crash_frame: np.ndarray | None = None
air_last_dead_px: np.ndarray | None = None
air_last_dead_py: np.ndarray | None = None
air_alt: np.ndarray | None = None
air_alt_sprites: dict = field(default_factory=dict)
# NOTE(review): presumably trail lengths expressed in frames — confirm against the renderer.
air_trail_f: int = 1
drone_trail_f: int = 1
# Scaled circle masks
# DOT_DY/DOT_DX/SHADOW_*/DRONE_*/AIR_* are module-level names defined outside this
# part of the file; the lambdas defer the lookup until a VideoCtx is constructed.
dot_dy: np.ndarray = field(default_factory=lambda: DOT_DY)
dot_dx: np.ndarray = field(default_factory=lambda: DOT_DX)
shadow_dy: np.ndarray = field(default_factory=lambda: SHADOW_DY)
shadow_dx: np.ndarray = field(default_factory=lambda: SHADOW_DX)
drone_dy: np.ndarray = field(default_factory=lambda: DRONE_DY)
drone_dx: np.ndarray = field(default_factory=lambda: DRONE_DX)
air_dy: np.ndarray = field(default_factory=lambda: AIR_DY)
air_dx: np.ndarray = field(default_factory=lambda: AIR_DX)
# Radii and canvas size default to the module-level config constants.
dot_r: int = DOT_R
drone_r: int = DRONE_R
air_r: int = AIR_R
canvas: int = CANVAS_MIN
# Per-player vehicle icon sprites (tinted to player color)
player_icon_sprites: list = field(default_factory=list) # list[Sprite], len=n_players
player_dead_sprites: list = field(default_factory=list) # list[Sprite], black versions
air_icon_sprites: list = field(default_factory=list) # list[Sprite], len=n_aircraft
air_dead_sprites: list = field(default_factory=list) # list[Sprite], black versions
# Per-aircraft rotation caches and heading angles
air_rot_caches: list = field(default_factory=list) # list[list[Sprite]], per-aircraft
air_dead_rot_caches: list = field(default_factory=list) # list[list[Sprite]], black versions
air_headings: np.ndarray | None = None # (n_aircraft, n_frames) float32
# Per-drone icon sprites + rotation
drone_icon_sprites: list = field(default_factory=list)
drone_dead_sprites: list = field(default_factory=list)
drone_rot_caches: list = field(default_factory=list)
drone_dead_rot_caches: list = field(default_factory=list)
drone_headings: np.ndarray | None = None
# Kill/damage target icons
kill_target_spr: Sprite | None = None
dmg_target_spr: Sprite | None = None
# ── Map / LevelDef load ────────────────────────────────────────────────────────
def _clean_map_key(raw: str) -> str:
    """Reduce a map reference to its bare key.

    Takes the final path component, trims surrounding whitespace, drops any
    trailing ``*`` characters and removes one trailing ``.png`` extension.
    """
    return Path(raw).name.strip().rstrip("*").removesuffix(".png")
def _load_json_file(path: Path) -> dict | None:
    """Parse the JSON file at *path*; return None (after logging) on failure.

    The file is read as UTF-8 regardless of the host locale.  Read errors and
    malformed JSON are reported with ``print`` and turned into ``None`` so the
    caller can try the next candidate file.  The parsed value is returned
    as-is, so callers that need a dict must check the type themselves.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:
        # Deliberate best-effort: a bad file must not abort the whole render.
        print(f" Failed to parse {path.name}: {e}")
        return None
def _normalize_mission_relpath(path: str) -> str:
    """Normalise a mission path into a lower-case, forward-slash relative path.

    Falsy input yields ``""``.  Otherwise the text is trimmed, backslashes
    become ``/``, leading slashes are removed and the result is lower-cased.
    Lower-casing the whole string already covers the ``gamedata/`` prefix, so
    no separate handling of that prefix is needed.
    """
    text = str(path) if path else ""
    return text.strip().replace("\\", "/").lstrip("/").lower()
def _mission_path_candidates(path: str) -> list[Path]:
    """List the files that may hold the mission referenced by *path*, best first.

    The normalised relative path is tried under ``LOCAL_MISSIONS_DIR`` first
    and ``WT_MISSIONS_DIR`` second.  Under each root the extension variants
    are ordered as:

    * ``.blk``  -> ``.blkx`` then the path itself
    * ``.blkx`` -> the path itself then ``.blk``
    * other     -> the path itself, then ``.blkx``, then ``.blk``

    Duplicates are dropped, keeping the first occurrence.  An empty
    normalised path yields an empty list.
    """
    rel = _normalize_mission_relpath(path)
    if not rel:
        return []

    ordered: list[Path] = []
    # Local bundle takes precedence over the full datamine clone.
    for root in (LOCAL_MISSIONS_DIR, WT_MISSIONS_DIR):
        raw = root / rel
        ext = raw.suffix.lower()
        if ext == ".blk":
            variants = [raw.with_suffix(".blkx"), raw]
        elif ext == ".blkx":
            variants = [raw, raw.with_suffix(".blk")]
        else:
            variants = [raw, raw.with_suffix(".blkx"), raw.with_suffix(".blk")]
        for candidate in variants:
            if candidate not in ordered:
                ordered.append(candidate)
    return ordered
def _load_mission_def_from_path(path: str) -> tuple[dict | None, Path | None]:
    """Load the first existing candidate file for *path* that parses to a dict.

    Returns ``(definition, file_path)`` for the first hit, or ``(None, None)``
    when no candidate exists or none of them contains a JSON object.
    """
    for candidate in _mission_path_candidates(path):
        if candidate.exists():
            loaded = _load_json_file(candidate)
            if isinstance(loaded, dict):
                return loaded, candidate
    return None, None
# Public entry point: a thin wrapper that passes level_settings_path unchanged to
# _load_mission_def_from_path and returns its (definition, file_path) result,
# which is (None, None) when no candidate file could be loaded.
def load_mission_def(level_settings_path: str) -> tuple[dict | None, Path | None]:
"""Load mission settings .blkx referenced by replay Mission.LevelSettings."""
return _load_mission_def_from_path(level_settings_path)
def _mission_use_alt_map_coord(mission_def: dict | None) -> bool:
    """Tell whether the mission asks for the alternative tank map coordinates.

    Reads ``mission_settings.mission.useAlternativeMapCoord``.  A boolean
    there is returned directly.  Otherwise the nested ``mission`` list is
    scanned and True is returned if any dict entry sets the flag to exactly
    ``True``.  Anything malformed or missing gives False.
    """
    settings = mission_def.get("mission_settings") if isinstance(mission_def, dict) else None
    cfg = settings.get("mission") if isinstance(settings, dict) else None
    if not isinstance(cfg, dict):
        return False

    flag = cfg.get("useAlternativeMapCoord")
    if isinstance(flag, bool):
        return flag

    overrides = cfg.get("mission")
    if not isinstance(overrides, list):
        return False
    return any(
        isinstance(entry, dict) and entry.get("useAlternativeMapCoord") is True
        for entry in overrides
    )
def _mission_level_override(mission_def: dict | None) -> str:
    """Return ``mission_settings.mission.level`` when it is a string, else ``""``."""
    settings = mission_def.get("mission_settings") if isinstance(mission_def, dict) else None
    cfg = settings.get("mission") if isinstance(settings, dict) else None
    if not isinstance(cfg, dict):
        return ""
    level = cfg.get("level", "")
    if isinstance(level, str):
        return level
    return ""
def _import_paths(mission_def: dict) -> list[str]:
    """Collect the ``file`` strings of a mission's ``imports.import_record`` entries.

    ``import_record`` may be a single record or a list of records.  Entries
    that are not dicts, or whose ``file`` is not a non-blank string, are
    ignored.  Paths are returned untrimmed, in their original order.
    """
    imports = mission_def.get("imports")
    if not isinstance(imports, dict):
        return []

    raw = imports.get("import_record")
    records = raw if isinstance(raw, list) else [raw]
    return [
        rec["file"]
        for rec in records
        if isinstance(rec, dict)
        and isinstance(rec.get("file"), str)
        and rec["file"].strip()
    ]
def _collect_mission_areas(root_def: dict | None, root_path: Path | None) -> dict[str, dict]:
    """Gather the ``areas`` of a mission and of every mission file it imports.

    Walks the import graph breadth-first starting at *root_def*.  The first
    definition seen for an area name wins.  Imported files are identified by
    their resolved path so each one is visited once; *root_path*, when given,
    is pre-registered so an import cycle back to the root is ignored.
    """
    if not isinstance(root_def, dict):
        return {}

    merged: dict[str, dict] = {}
    visited: set[Path] = set()
    if root_path:
        visited.add(root_path.resolve())

    pending: list[tuple[dict, Path | None]] = [(root_def, root_path)]
    cursor = 0
    while cursor < len(pending):
        current = pending[cursor][0]
        cursor += 1

        own_areas = current.get("areas")
        if isinstance(own_areas, dict):
            for name, area in own_areas.items():
                if isinstance(name, str) and isinstance(area, dict):
                    merged.setdefault(name, area)

        for rel in _import_paths(current):
            child_def, child_path = _load_mission_def_from_path(rel)
            if not isinstance(child_def, dict):
                continue
            resolved = child_path.resolve() if child_path else None
            if resolved:
                if resolved in visited:
                    continue
                visited.add(resolved)
            pending.append((child_def, child_path))
    return merged
def _mission_battle_area_targets(mission_def: dict | None) -> list[str]:
    """Return the distinct ``battleArea.target`` names found in the mission settings.

    ``mission_settings`` is searched depth-first through nested dicts and
    lists.  For every ``battleArea`` key holding a dict, its string ``target``
    is collected.  Empty targets are dropped and duplicates keep their first
    position.
    """
    if not isinstance(mission_def, dict):
        return []
    settings = mission_def.get("mission_settings")
    if not isinstance(settings, dict):
        return []

    def iter_targets(node):
        """Yield every battleArea target below *node* in traversal order."""
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "battleArea" and isinstance(value, dict):
                    target = value.get("target")
                    if isinstance(target, str):
                        yield target
                yield from iter_targets(value)
        elif isinstance(node, list):
            for item in node:
                yield from iter_targets(item)

    # dict.fromkeys removes duplicates while preserving first-seen order.
    return [name for name in dict.fromkeys(iter_targets(settings)) if name]
def _battle_type_candidates(battle_type: str) -> list[str]:
    """Suggest battle-area names for a battle type, most specific first.

    The cleaned, lower-cased battle type contributes, for its last
    ``_``-separated token ``<mode>``: ``<mode>_battle_area_realistic``,
    ``_arcade``, ``_hardcore`` and plain ``<mode>_battle_area``.  If the type
    contains ``dom`` or ``conq``, that family's three difficulty variants are
    appended too.  Duplicates are removed, keeping first occurrence.
    """
    bt = _clean_map_key(battle_type).lower()
    if not bt:
        return []

    difficulties = ("realistic", "arcade", "hardcore")
    names: list[str] = []

    mode = bt.split("_")[-1]
    if mode:
        names.extend(f"{mode}_battle_area_{diff}" for diff in difficulties)
        names.append(f"{mode}_battle_area")

    for family in ("dom", "conq"):
        if family in bt:
            names.extend(f"{family}_battle_area_{diff}" for diff in difficulties)

    return list(dict.fromkeys(names))
def _battle_area_variant_priority(name: str) -> int:
    """Rank a battle-area name by variant; lower is preferred.

    Case-insensitive substring checks, first match wins:
    realistic -> 0, hardcore -> 1, arcade -> 2, briefing / brief_ -> 3,
    anything else -> 4.
    """
    lowered = name.lower()
    ranking = (
        (("realistic",), 0),
        (("hardcore",), 1),
        (("arcade",), 2),
        (("briefing", "brief_"), 3),
    )
    for needles, rank in ranking:
        if any(needle in lowered for needle in needles):
            return rank
    return 4
def _select_battle_area_name(mission_def: dict | None,
                             areas: dict[str, dict],
                             battle_type: str) -> str:
    """Pick the name of the battle area to use, or ``""`` if none applies.

    Targets named explicitly by the mission settings are preferred.  The
    names derived from the battle type are consulted only when none of those
    targets exists in *areas*.
    """
    if not areas:
        return ""

    explicit = next(
        (target for target in _mission_battle_area_targets(mission_def) if target in areas),
        None,
    )
    if explicit is not None:
        return explicit

    return next(
        (name for name in _battle_type_candidates(battle_type) if name in areas),
        "",
    )
def _box_bounds_from_tm(area: dict) -> tuple[list[float], list[float]] | None:
    """Compute the axis-aligned X/Z extent of a ``Box`` area from its ``tm`` matrix.

    ``tm`` is read as three axis vectors followed by a centre point.  The
    box's eight corners are ``centre ± 0.5 * axis`` for each axis; the result
    is the min/max of their X (index 0) and Z (index 2) coordinates.

    Returns:
        ``([x_min, z_min], [x_max, z_max])``, or ``None`` when the area is not
        a Box, ``tm`` is missing or malformed, or the extent is degenerate.
    """
    if area.get("type") != "Box":
        return None
    tm = area.get("tm")
    if not isinstance(tm, list) or len(tm) < 4:
        return None
    try:
        axes = np.array(tm[:3], dtype=np.float64)
        center = np.array(tm[3], dtype=np.float64)
    except Exception:
        return None
    # The centre must be a flat vector with at least three components; a scalar
    # or nested centre is rejected here instead of failing further down.
    if axes.shape != (3, 3) or center.ndim != 1 or center.shape[0] < 3:
        return None
    # Extra trailing components are ignored so the centre lines up with the axes.
    center = center[:3]

    half = (-0.5, 0.5)
    corners = [
        center + sx * axes[0] + sy * axes[1] + sz * axes[2]
        for sx in half
        for sy in half
        for sz in half
    ]
    xs = [float(pt[0]) for pt in corners]
    zs = [float(pt[2]) for pt in corners]
    x0, x1 = min(xs), max(xs)
    z0, z1 = min(zs), max(zs)
    if x1 <= x0 or z1 <= z0:
        return None
    return [x0, z0], [x1, z1]
def resolve_world_bounds(level_def: dict,
                         use_alt_map_coord: bool,
                         mission_def: dict | None,
                         mission_def_path: Path | None,
                         battle_type: str,
                         ) -> tuple[list[float], list[float], str]:
    """Choose the world-space bounds for the map, preferring the mission's battle area.

    The level's tank coordinates (from ``select_tank_coords``) are the
    fallback.  If the mission yields a battle area whose ``tm`` describes a
    valid Box, that box's X/Z extent is returned instead, labelled
    ``"battleArea:<name>"``.

    Returns:
        ``(corner0, corner1, source_label)``.
    """
    tc0, tc1, coord_src = select_tank_coords(level_def, use_alt_map_coord)
    fallback = (tc0, tc1, coord_src)

    areas = _collect_mission_areas(mission_def, mission_def_path)
    battle_name = _select_battle_area_name(mission_def, areas, battle_type)
    area = areas.get(battle_name) if battle_name else None
    bounds = _box_bounds_from_tm(area) if isinstance(area, dict) else None
    if bounds is None:
        return fallback

    c0, c1 = bounds
    return c0, c1, f"battleArea:{battle_name}"
def _ground_points_for_bounds_fit(ground_entities: list[dict]) -> list[tuple[float, float]]:
    """Flatten every entity's ``Path`` samples into a list of ``(X, Z)`` points.

    Samples whose ``X`` or ``Z`` is missing or not convertible to float are
    skipped.  An entity whose ``Path`` is absent or ``None`` contributes no
    points instead of aborting the whole collection.
    """
    pts: list[tuple[float, float]] = []
    for ent in ground_entities:
        # "Path" may be present but null in the replay data.
        for sample in ent.get("Path") or []:
            try:
                pts.append((float(sample["X"]), float(sample["Z"])))
            except (KeyError, IndexError, TypeError, ValueError, OverflowError):
                # Malformed sample: ignore it and keep the rest of the path.
                continue
    return pts
def _bounds_coverage(points: list[tuple[float, float]], c0: list[float], c1: list[float]) -> float:
    """Return the fraction of *points* lying inside the rectangle spanned by c0 and c1.

    The corners may be given in any order; the rectangle's edges count as
    inside.  An empty point list gives 0.0.
    """
    if not points:
        return 0.0

    xa, xb = float(c0[0]), float(c1[0])
    za, zb = float(c0[1]), float(c1[1])
    x_lo, x_hi = min(xa, xb), max(xa, xb)
    z_lo, z_hi = min(za, zb), max(za, zb)

    hits = sum(1 for x, z in points if x_lo <= x <= x_hi and z_lo <= z <= z_hi)
    return hits / float(len(points))
def _fit_world_bounds_to_ground_activity(c0: list[float], c1: list[float], coord_src: str,
                                         mission_def: dict | None, mission_def_path: Path | None,
                                         battle_type: str, ground_entities: list[dict],
                                         ) -> tuple[list[float], list[float], str]:
    """Swap the current bounds for a battle area that covers ground movement better.

    Every battle area named by the mission settings or derived from the
    battle type is scored by the fraction of ground path points it contains.
    The best candidate is chosen by highest coverage, then variant priority,
    then smallest area.  It replaces the current bounds only when its
    coverage beats the current one by more than 0.02.  Otherwise the inputs
    are returned unchanged.
    """
    unchanged = (c0, c1, coord_src)

    pts = _ground_points_for_bounds_fit(ground_entities)
    if not pts:
        return unchanged
    areas = _collect_mission_areas(mission_def, mission_def_path)
    if not areas:
        return unchanged

    # Explicit mission targets first, then battle-type guesses; no duplicates.
    ordered = [
        *_mission_battle_area_targets(mission_def),
        *_battle_type_candidates(battle_type),
    ]
    names = [name for name in dict.fromkeys(ordered) if name in areas]
    if not names:
        return unchanged

    scored: list[tuple[str, list[float], list[float], float, float]] = []
    for name in names:
        area = areas.get(name)
        if not isinstance(area, dict):
            continue
        bounds = _box_bounds_from_tm(area)
        if bounds is None:
            continue
        bc0, bc1 = bounds
        coverage = _bounds_coverage(pts, bc0, bc1)
        width = float(bc1[0]) - float(bc0[0])
        depth = float(bc1[1]) - float(bc0[1])
        scored.append((name, bc0, bc1, coverage, abs(width * depth)))
    if not scored:
        return unchanged

    current_cov = _bounds_coverage(pts, c0, c1)
    best_name, best_c0, best_c1, best_cov, _ = min(
        scored,
        key=lambda t: (-t[3], _battle_area_variant_priority(t[0]), t[4]),
    )
    # Switch when current bounds miss noticeable movement.
    if best_cov > current_cov + 0.02:
        return best_c0, best_c1, f"battleArea:{best_name}|fit={best_cov:.3f}"
    return unchanged
def _capture_mode_prefixes(battle_type: str) -> list[str]:
    """Return the capture-area name prefixes that belong to a battle type.

    The cleaned, lower-cased battle type is checked for ``conq``, ``dom`` and
    ``bttl`` in that order; the first match decides.  Unknown types give an
    empty list.
    """
    bt = _clean_map_key(battle_type).lower()
    by_mode = (
        ("conq", ["conq_capture_area_"]),
        ("dom", ["dom_capture_area_"]),
        ("bttl", ["bttl_t1_capture_area_", "bttl_t2_capture_area_"]),
    )
    for marker, prefixes in by_mode:
        if marker in bt:
            return prefixes
    return []
def _capture_sort_key(name: str) -> tuple[int, str]:
    """Sort key ordering capture areas by the number formed from all digits in *name*.

    All digit characters of the name are concatenated and read as one integer
    (999 when there are none); the name itself breaks ties.
    """
    digits = "".join(filter(str.isdigit, name))
    order = int(digits) if digits else 999
    return order, name
def _capture_radius_from_tm(tm: list) -> float:
    """Estimate a capture radius from the first and third rows of ``tm``.

    For each of those rows the length of its (index 0, index 2) components is
    computed; the larger of the two is returned.  Any failure while reading or
    converting the rows yields 0.0.
    """
    try:
        first = np.array(tm[0], dtype=np.float64)
        third = np.array(tm[2], dtype=np.float64)
        lengths = [float(np.hypot(row[0], row[2])) for row in (first, third)]
        return max(lengths[0], lengths[1])
    except Exception:
        return 0.0
def _capture_tm_vectors(tm: list) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | None:
    """Split ``tm`` into its three axis vectors and centre as float64 arrays.

    Returns ``(a0, a1, a2, center)`` when the first four rows of ``tm`` each
    convert to a flat vector with at least three components, otherwise
    ``None``.  Scalar or nested rows are rejected rather than raising.
    """
    try:
        vectors = tuple(np.array(tm[i], dtype=np.float64) for i in range(4))
    except Exception:
        return None
    # ndim is checked first: a scalar row has an empty shape tuple.
    if any(vec.ndim != 1 or vec.shape[0] < 3 for vec in vectors):
        return None
    a0, a1, a2, c = vectors
    return a0, a1, a2, c
# Returns one dict per chosen capture area with the keys
# "name", "type", "x", "z", "radius" and "tm" (a dict of a0/a2/center lists, or None).
def resolve_capture_areas(mission_def: dict | None, mission_def_path: Path | None,
battle_type: str) -> list[dict]:
"""Resolve gameplay capture areas from mission imports (mode-specific)."""
areas = _collect_mission_areas(mission_def, mission_def_path)
if not areas:
return []
# Step 1: select area names containing "capture_area", excluding briefing_* areas and,
# when the battle type maps to known prefixes, anything not starting with one of them.
prefixes = _capture_mode_prefixes(battle_type)
capture_names: list[str] = []
for name, area in areas.items():
if not isinstance(area, dict):
continue
lname = name.lower()
if "capture_area" not in lname or lname.startswith("briefing_"):
continue
if prefixes and not any(lname.startswith(p) for p in prefixes):
continue
capture_names.append(name)
if not capture_names:
return []
# Step 2: group difficulty variants by base name (lower-cased name minus the suffix).
# Keep one variant per area base (prefer arcade, then realistic, hardcore).
suffix_prio = ["_arcade", "_realistic", "_hardcore", "_simulator", ""]
groups: dict[str, dict[str, str]] = {}
for name in capture_names:
lname = name.lower()
suffix = ""
base = lname
# The trailing "" entry is skipped here: every name would match it.
for sfx in suffix_prio[:-1]:
if lname.endswith(sfx):
suffix = sfx
base = lname[:-len(sfx)]
break
groups.setdefault(base, {})[suffix] = name
# Step 3: per base, pick the highest-priority variant; bases are ordered by _capture_sort_key.
chosen: list[str] = []
for base in sorted(groups.keys(), key=_capture_sort_key):
variants = groups[base]
pick = ""
for sfx in suffix_prio:
if sfx in variants:
pick = variants[sfx]
break
if pick:
chosen.append(pick)
# Step 4: build the output records; areas without a usable tm centre are dropped.
out: list[dict] = []
for name in chosen:
area = areas.get(name)
if not isinstance(area, dict):
continue
tm = area.get("tm")
if not isinstance(tm, list) or len(tm) < 4 or not isinstance(tm[3], list):
continue
try:
# Centre is taken from tm row 3: index 0 is X, index 2 is Z.
cx = float(tm[3][0])
cz = float(tm[3][2])
except Exception:
continue
tm_vecs = _capture_tm_vectors(tm)
if tm_vecs is not None:
# Only a0, a2 and the centre are exported; a1 is not used.
a0, _a1, a2, center = tm_vecs
tm_data = {
"a0": [float(a0[0]), float(a0[1]), float(a0[2])],
"a2": [float(a2[0]), float(a2[1]), float(a2[2])],
"center": [float(center[0]), float(center[1]), float(center[2])],
}
else:
tm_data = None
out.append({
"name": name,
"type": str(area.get("type", "")),
"x": cx,
"z": cz,
"radius": _capture_radius_from_tm(tm),
"tm": tm_data,
})
# Step 5: outlier correction. The "median" is the upper median of the positive radii
# and includes the outlier itself.
# If exactly one cap is a strong outlier (>2x median sibling radius),
# halve only that cap (including tm X/Z basis so rendered outline matches).
if len(out) >= 3:
radii = sorted(float(c["radius"]) for c in out if float(c["radius"]) > 0.0)
if len(radii) >= 3:
median_r = radii[len(radii) // 2]
if median_r > 0.0:
outlier = [
i for i, c in enumerate(out)
if float(c.get("radius", 0.0)) > (2.0 * median_r)
]
if len(outlier) == 1:
oi = outlier[0]
out[oi]["radius"] = float(out[oi]["radius"]) * 0.5
tm = out[oi].get("tm")
if isinstance(tm, dict):
# All three components of a0 and a2 are halved, not just X/Z.
for key in ("a0", "a2"):
vec = tm.get(key)
if isinstance(vec, list) and len(vec) >= 3:
tm[key] = [float(vec[0]) * 0.5, float(vec[1]) * 0.5, float(vec[2]) * 0.5]
return out
def _world_to_map_px(x: float, z: float, x0: float, z0: float, xr: float, zr: float, canvas: int) -> tuple[int, int]:
px = int(round((x - x0) / xr * canvas))
py = int(round((z0 + zr - z) / zr * canvas))
return px, py
def _cap_outline_points_px(cap: dict, x0: float, z0: float, xr: float, zr: float, canvas: int) -> list[tuple[int, int]]:
tm = cap.get("tm")
if not isinstance(tm, dict):
return []
a0 = tm.get("a0")
a2 = tm.get("a2")
center = tm.get("center")
if not (isinstance(a0, list) and isinstance(a2, list) and isinstance(center, list)):
return []
try:
ax0, az0 = float(a0[0]), float(a0[2])
ax2, az2 = float(a2[0]), float(a2[2])
cx, cz = float(center[0]), float(center[2])
except Exception:
return []
cap_type = str(cap.get("type", "")).lower()
points: list[tuple[int, int]] = []
if cap_type in {"sphere", "cylinder"}:
# Project the transformed local unit circle in XZ using tm basis vectors.
steps = 64
for i in range(steps):
t = (2.0 * math.pi * i) / steps
wx = cx + math.cos(t) * ax0 + math.sin(t) * ax2
wz = cz + math.cos(t) * az0 + math.sin(t) * az2
points.append(_world_to_map_px(wx, wz, x0, z0, xr, zr, canvas))
return points
if cap_type == "box":
# Top-down rectangle from transformed local XZ square corners.
corners = [(-0.5, -0.5), (0.5, -0.5), (0.5, 0.5), (-0.5, 0.5)]
for sx, sz in corners:
wx = cx + sx * ax0 + sz * ax2
wz = cz + sx * az0 + sz * az2
points.append(_world_to_map_px(wx, wz, x0, z0, xr, zr, canvas))
return points
return []
def _map_name_candidates(level_path: str, level_def: dict | None,
battle_type: str = "", level_settings_path: str = "") -> list[str]:
stem = Path(level_path).stem
out: list[str] = []
seen: set[str] = set()
def add(name: str) -> None:
if not name or name in seen:
return
seen.add(name)
out.append(name)
if isinstance(level_def, dict):
tank_custom = level_def.get("customLevelTankMap", "")
if isinstance(tank_custom, str) and tank_custom:
add(_clean_map_key(tank_custom))
full_custom = level_def.get("customLevelMap", "")
if isinstance(full_custom, str) and full_custom:
full_key = _clean_map_key(full_custom)
add(full_key)
if full_key.endswith("_map"):
add(full_key[:-4] + "_tankmap")
# Mission-specific identifiers from replay fields (mode/scenario variants).
if battle_type:
bkey = _clean_map_key(battle_type)
add(f"{bkey}_tankmap")
add(f"{bkey}_map")
add(bkey)
if level_settings_path:
mkey = _clean_map_key(Path(level_settings_path).stem)
add(f"{mkey}_tankmap")
add(f"{mkey}_map")
add(mkey)
add(f"{stem}_tankmap")
add(stem)
return out
def _bounds_almost_equal(c0: list[float], c1: list[float],
r0: list[float], r1: list[float],
eps: float = 1e-6) -> bool:
return (
abs(float(c0[0]) - float(r0[0])) <= eps
and abs(float(c0[1]) - float(r0[1])) <= eps
and abs(float(c1[0]) - float(r1[0])) <= eps
and abs(float(c1[1]) - float(r1[1])) <= eps
)
def _expand_bounds_by_pixels(c0: list[float], c1: list[float],
canvas: int, pad_px: int) -> tuple[list[float], list[float]]:
if canvas <= 0 or pad_px <= 0:
return [float(c0[0]), float(c0[1])], [float(c1[0]), float(c1[1])]
x_lo = min(float(c0[0]), float(c1[0]))
x_hi = max(float(c0[0]), float(c1[0]))
z_lo = min(float(c0[1]), float(c1[1]))
z_hi = max(float(c0[1]), float(c1[1]))
xr = x_hi - x_lo
zr = z_hi - z_lo
if xr <= 0.0 or zr <= 0.0:
return [float(c0[0]), float(c0[1])], [float(c1[0]), float(c1[1])]
pad_x = xr * (float(pad_px) / float(canvas))
pad_z = zr * (float(pad_px) / float(canvas))
return [x_lo - pad_x, z_lo - pad_z], [x_hi + pad_x, z_hi + pad_z]
def _clamp_bounds_to_base(c0: list[float], c1: list[float],
base_c0: list[float], base_c1: list[float]) -> tuple[list[float], list[float]]:
bx_lo = min(float(base_c0[0]), float(base_c1[0]))
bx_hi = max(float(base_c0[0]), float(base_c1[0]))
bz_lo = min(float(base_c0[1]), float(base_c1[1]))
bz_hi = max(float(base_c0[1]), float(base_c1[1]))
x_lo = min(float(c0[0]), float(c1[0]))
x_hi = max(float(c0[0]), float(c1[0]))
z_lo = min(float(c0[1]), float(c1[1]))
z_hi = max(float(c0[1]), float(c1[1]))
x_lo = max(bx_lo, min(x_lo, bx_hi))
x_hi = max(bx_lo, min(x_hi, bx_hi))
z_lo = max(bz_lo, min(z_lo, bz_hi))
z_hi = max(bz_lo, min(z_hi, bz_hi))
if x_hi <= x_lo:
x_lo, x_hi = bx_lo, bx_hi
if z_hi <= z_lo:
z_lo, z_hi = bz_lo, bz_hi
return [x_lo, z_lo], [x_hi, z_hi]
def _crop_map_to_world_bounds(img: Image.Image,
                              base_c0: list[float], base_c1: list[float],
                              render_c0: list[float], render_c1: list[float],
                              ) -> Image.Image:
    """Crop a map image to the sub-rectangle covering the render bounds.

    Args:
        img: Map image whose full extent corresponds to the base bounds.
        base_c0: World ``[x, z]`` of the base bounds' first corner.
        base_c1: World ``[x, z]`` of the base bounds' second corner.
        render_c0: One corner of the world rectangle to keep.
        render_c1: The opposite corner of the world rectangle to keep.

    Returns:
        The cropped image (at least 1x1 pixel), or ``img`` itself when the
        base bounds have zero extent on an axis.
    """
    width, height = img.size
    bx0, bz0 = float(base_c0[0]), float(base_c0[1])
    bx1, bz1 = float(base_c1[0]), float(base_c1[1])
    rx0, rz0 = float(render_c0[0]), float(render_c0[1])
    rx1, rz1 = float(render_c1[0]), float(render_c1[1])

    span_x = bx1 - bx0
    span_z = bz1 - bz0
    if span_x == 0 or span_z == 0:
        return img

    # Render bounds expressed as fractions of the base extent.
    u_a = (min(rx0, rx1) - bx0) / span_x
    u_b = (max(rx0, rx1) - bx0) / span_x
    v_a = (min(rz0, rz1) - bz0) / span_z
    v_b = (max(rz0, rz1) - bz0) / span_z
    u_lo, u_hi = min(u_a, u_b), max(u_a, u_b)
    v_lo, v_hi = min(v_a, v_b), max(v_a, v_b)

    left = int(np.floor(np.clip(u_lo, 0.0, 1.0) * width))
    right = int(np.ceil(np.clip(u_hi, 0.0, 1.0) * width))
    # Image rows grow downward while world Z grows upward, hence 1 - v.
    top = int(np.floor((1.0 - np.clip(v_hi, 0.0, 1.0)) * height))
    bottom = int(np.ceil((1.0 - np.clip(v_lo, 0.0, 1.0)) * height))

    # Keep the box inside the image and at least one pixel wide/tall.
    left = max(0, min(left, width - 1))
    right = max(left + 1, min(right, width))
    top = max(0, min(top, height - 1))
    bottom = max(top + 1, min(bottom, height))
    return img.crop((left, top, right, bottom))
@lru_cache(maxsize=8)
def _load_capture_icon_rgba(label: str) -> Image.Image | None:
    """Load the capture-point icon for a label as an RGBA image (cached).

    A single-letter label ``a``-``z`` (case-insensitive, surrounding
    whitespace ignored) is looked up as ``capture_<letter>.png`` first; the
    generic ``cap_icon.png`` is always tried last.

    Args:
        label: Capture-point label.

    Returns:
        The first icon that exists and can be opened, or None.
    """
    letter = (label or "").strip().lower()
    search: list[Path] = [ICONS_DIR / "cap_icon.png"]
    if len(letter) == 1 and "a" <= letter <= "z":
        search.insert(0, ICONS_DIR / f"capture_{letter}.png")

    for icon_path in search:
        if icon_path.exists():
            try:
                return Image.open(icon_path).convert("RGBA")
            except Exception:
                # Unreadable icon: fall through to the next candidate.
                continue
    return None
def _draw_capture_icon(out: Image.Image, label: str, px: int, py: int,
                       size: int, alpha: int = 120) -> None:
    """Paste the capture icon for ``label`` centred on ``(px, py)``.

    Args:
        out: Image to draw onto (modified in place).
        label: Capture-point label used to pick the icon.
        px: Pixel X of the icon centre.
        py: Pixel Y of the icon centre.
        size: Requested icon side length; values below 8 are raised to 8.
        alpha: Opacity 0-255 applied on top of the icon's own alpha channel.
    """
    source = _load_capture_icon_rgba(label)
    if source is None:
        return

    side = max(8, int(size))
    icon = source.resize((side, side), Image.Resampling.LANCZOS)
    if alpha < 255:
        # Scale the existing alpha channel rather than replacing it, so the
        # icon's transparent regions stay transparent.
        channel = np.asarray(icon.getchannel("A"), dtype=np.float32)
        scaled = np.clip(np.round(channel * (alpha / 255.0)), 0.0, 255.0).astype(np.uint8)
        icon.putalpha(Image.fromarray(scaled, mode="L"))

    half = side // 2
    out.paste(icon, (px - half, py - half), icon)
def _pick_contrast_outline_color(img: Image.Image, sample_points: list[tuple[int, int]]) -> tuple[int, int, int]:
    """Choose white or black, whichever contrasts with the sampled pixels.

    Args:
        img: Image to sample.
        sample_points: ``(x, y)`` pixel positions; out-of-image points are
            ignored.

    Returns:
        ``(255, 255, 255)`` when the mean luma of the samples is below 128 or
        nothing could be sampled, otherwise ``(0, 0, 0)``.
    """
    white, black = (255, 255, 255), (0, 0, 0)
    width, height = img.size
    if not sample_points:
        return white
    pixels = img.load()
    if pixels is None:
        return white

    lumas: list[float] = []
    for x, y in sample_points:
        if not (0 <= x < width and 0 <= y < height):
            continue
        raw = pixels[x, y]
        if isinstance(raw, int):
            r = g = b = raw
        elif isinstance(raw, tuple) and len(raw) >= 3:
            r, g, b = int(raw[0]), int(raw[1]), int(raw[2])
        elif isinstance(raw, tuple) and len(raw) == 1:
            r = g = b = int(raw[0])
        else:
            # Any other pixel representation is skipped.
            continue
        lumas.append(0.2126 * r + 0.7152 * g + 0.0722 * b)

    if not lumas:
        return white
    mean_luma = sum(lumas) / len(lumas)
    return white if mean_luma < 128.0 else black
def _draw_capture_areas(img: Image.Image, capture_areas: list[dict],
                        c0: list[float], c1: list[float]) -> Image.Image:
    """Draw capture-area outlines and lettered icons onto a copy of the map.

    Args:
        img: Map image; its width is used as the canvas size for both axes.
        capture_areas: Capture-area dicts with ``x``, ``z``, ``radius``,
            ``type`` and optional ``tm`` entries.
        c0: World ``[x, z]`` of the map origin corner.
        c1: World ``[x, z]`` of the opposite map corner.

    Returns:
        A new image with the overlays, or ``img`` unchanged when there is
        nothing to draw or the bounds have zero extent.
    """
    if not capture_areas:
        return img
    x0, z0 = float(c0[0]), float(c0[1])
    x1, z1 = float(c1[0]), float(c1[1])
    xr = x1 - x0
    zr = z1 - z0
    if xr == 0 or zr == 0:
        return img

    out = img.copy()
    draw = ImageDraw.Draw(out)
    canvas = out.size[0]
    labels = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    stroke_w = 3
    outline = (255, 255, 255)

    for idx, cap in enumerate(capture_areas):
        cx = float(cap.get("x", 0.0))
        cz = float(cap.get("z", 0.0))
        px, py = _world_to_map_px(cx, cz, x0, z0, xr, zr, canvas)
        outline_points = _cap_outline_points_px(cap, x0, z0, xr, zr, canvas)
        # Letters A-Z first, then fall back to a 1-based number.
        label = labels[idx] if idx < len(labels) else str(idx + 1)
        icon_size = 18

        if len(outline_points) >= 3:
            draw.line(outline_points + [outline_points[0]], fill=outline, width=stroke_w)
            # Keep icon area at ~1/4 of cap area (diamond area ~= s^2/2).
            # Polygon area via the shoelace formula on the pixel outline.
            area2 = 0.0
            n_pts = len(outline_points)
            for i in range(n_pts):
                xa, ya = outline_points[i]
                xb, yb = outline_points[(i + 1) % n_pts]
                area2 += float(xa * yb - xb * ya)
            cap_area = abs(area2) * 0.5
            if cap_area > 0.0:
                icon_size = max(10, min(canvas, int(round(math.sqrt(cap_area / 2.0)))))
        else:
            # No usable transform: draw a plain circle from the stored radius.
            rr = max(0.0, float(cap.get("radius", 0.0)))
            rp = max(8, int(round(rr * ((canvas / xr + canvas / zr) * 0.5))))
            # Skip circles that lie completely outside the canvas.
            if px < -rp or py < -rp or px > canvas + rp or py > canvas + rp:
                continue
            draw.ellipse((px - rp, py - rp, px + rp, py + rp), outline=outline, width=stroke_w)
            cap_area = math.pi * float(rp * rp)
            icon_size = max(10, min(canvas, int(round(math.sqrt(cap_area / 2.0)))))

        _draw_capture_icon(out, label, px, py, icon_size, alpha=90)
    return out
def load_map_image(level_path: str, level_def: dict | None,
                   battle_type: str = "", level_settings_path: str = "",
                   base_coords: tuple[list[float], list[float]] | None = None,
                   render_coords: tuple[list[float], list[float]] | None = None,
                   canvas: int = CANVAS_MIN) -> Image.Image | None:
    """Load a local minimap PNG using level-def hints for tankmap naming.

    The first candidate name that exists under the minimaps directory is
    used.  When both coordinate pairs are given and differ, the image is
    cropped to the render bounds before being resized.

    Args:
        level_path: Level file path (its stem is a fallback map name).
        level_def: Optional level definition with custom map names.
        battle_type: Optional mission identifier from the replay.
        level_settings_path: Optional mission settings path.
        base_coords: World bounds covered by the full minimap image.
        render_coords: World bounds that should end up on the canvas.
        canvas: Output side length in pixels.

    Returns:
        A ``canvas`` x ``canvas`` RGB image, or None when no minimap exists.
    """
    for map_key in _map_name_candidates(level_path, level_def, battle_type, level_settings_path):
        path = MINIMAPS_DIR / f"{map_key}.png"
        if not path.exists():
            continue

        print(f" Map image : {path.name}")
        img = Image.open(path).convert("RGB")

        wants_crop = base_coords is not None and render_coords is not None
        if wants_crop and not _bounds_almost_equal(
            base_coords[0], base_coords[1],
            render_coords[0], render_coords[1],
        ):
            img = _crop_map_to_world_bounds(
                img,
                base_coords[0], base_coords[1],
                render_coords[0], render_coords[1],
            )
            print(f" Map crop : world X=[{render_coords[0][0]}, {render_coords[1][0]}] "
                  f"Z=[{render_coords[0][1]}, {render_coords[1][1]}]")

        return img.resize((canvas, canvas), Image.Resampling.LANCZOS)
    return None
def load_level_coords(level_path: str, session_id: int = 0) -> dict | None:
    """Load tankMapCoord0/1 from local .blkx files (including datamine clone).

    Args:
        level_path: Level file path; only its stem is used to find the .blkx.
        session_id: Unused; kept so existing call sites keep working.

    Returns:
        The first parsed level definition that contains both
        ``tankMapCoord0`` and ``tankMapCoord1``, or None if none qualifies.
    """
    del session_id  # kept in signature for call-site compatibility
    filename = f"{Path(level_path).stem}.blkx"
    search_dirs = (LEVELS_DIR, LEVELS_DIR / "DATAMINE", WT_LEVELS_DIR)

    for folder in search_dirs:
        blkx = folder / filename
        if not blkx.exists():
            continue
        data = _load_json_file(blkx)
        if not isinstance(data, dict):
            continue
        has_coords = "tankMapCoord0" in data and "tankMapCoord1" in data
        if not has_coords:
            print(f" LevelDef missing tank coords: {blkx.name}")
            continue
        print(f" LevelDef : {blkx}")
        return data
    return None
def select_tank_coords(level_def: dict, use_alt_map_coord: bool
                       ) -> tuple[list[float], list[float], str]:
    """Pick tank coordinate bounds according to mission mode flags.

    Args:
        level_def: Parsed level definition.
        use_alt_map_coord: When True, prefer ``aiTanksMapCoord0/1`` and fall
            back to ``tankMapCoord0/1`` if they are missing or malformed.

    Returns:
        ``(coord0, coord1, source)`` where ``source`` names the key family
        used (``"aiTanksMapCoord"`` or ``"tankMapCoord"``).

    Raises:
        ValueError: If no coordinate pair of two lists with at least two
            entries each is available.
    """
    sources: list[tuple[str, str, str]] = []
    if use_alt_map_coord:
        sources.append(("aiTanksMapCoord0", "aiTanksMapCoord1", "aiTanksMapCoord"))
    sources.append(("tankMapCoord0", "tankMapCoord1", "tankMapCoord"))

    for key_lo, key_hi, tag in sources:
        lo = level_def.get(key_lo)
        hi = level_def.get(key_hi)
        if isinstance(lo, list) and len(lo) >= 2 and isinstance(hi, list) and len(hi) >= 2:
            return lo, hi, tag
    raise ValueError("LevelDef missing usable tank map coordinates")
# ── Coordinate transform ───────────────────────────────────────────────────────
class CoordTransform:
    """Transforms world coordinates (X, Z) to pixel coordinates on the canvas.

    Args:
        x0: World X origin (left edge of the map).
        z0: World Z origin (bottom edge of the map).
        x1: World X extent (right edge of the map).
        z1: World Z extent (top edge of the map).
        canvas: Canvas size in pixels (square).
    """

    def __init__(self, x0: float = 0, z0: float = 0, x1: float = 4096, z1: float = 4096,
                 canvas: int = CANVAS_MIN):
        self.x0 = x0
        self.z0 = z0
        self.x_range = x1 - x0
        self.z_range = z1 - z0
        self.canvas = canvas

    @staticmethod
    def _to_int16(values):
        """Truncate float pixel values to int16, saturating instead of wrapping.

        Casting an out-of-range float to int16 is undefined and in practice
        wraps around, which can move a far off-map position back inside the
        canvas.  Values are therefore clamped to the int16 range first, and
        NaN becomes -1 so it is treated like any other off-canvas value.
        """
        limits = np.iinfo(np.int16)
        finite = np.nan_to_num(values, nan=-1.0)
        return np.clip(finite, limits.min, limits.max).astype(np.int16)

    def world_to_px(self, x: np.ndarray, z: np.ndarray):
        """Convert world X/Z arrays to pixel coordinates on the canvas.

        Args:
            x: World X positions as a numpy array.
            z: World Z positions as a numpy array.

        Returns:
            Tuple of (px, py) int16 numpy arrays in pixel space.  Results are
            not limited to the canvas; positions beyond the int16 range
            saturate at its limits.
        """
        px = self._to_int16((x - self.x0) / self.x_range * self.canvas)
        # Z is flipped: larger world Z maps to a smaller pixel row.
        py = self._to_int16((self.z0 + self.z_range - z) / self.z_range * self.canvas)
        return px, py

    def point(self, x: float, z: float) -> tuple[int, int]:
        """Convert one world position to a pixel clamped onto the canvas.

        Args:
            x: World X position.
            z: World Z position.

        Returns:
            ``(px, py)`` as Python ints, each within ``0 .. canvas - 1``.
        """
        xi, zi = self.world_to_px(np.array([x]), np.array([z]))
        last = self.canvas - 1
        return int(np.clip(xi[0], 0, last)), int(np.clip(zi[0], 0, last))
# ── Pre-computation ────────────────────────────────────────────────────────────
def precompute_positions(players: list[dict], xfm: CoordTransform,
                         frame_times: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Interpolate player world positions onto the frame time grid and convert to pixels.

    Args:
        players: List of player dicts, each containing a '_samples' key with
            time-stamped X/Z path data.
        xfm: Coordinate transform for world-to-pixel conversion.
        frame_times: 1-D array of frame timestamps in milliseconds.

    Returns:
        Tuple of (px_all, py_all) int16 arrays, shape (n_players, n_frames).
        -1 indicates the player is absent at that frame: either the frame is
        outside the player's sample time range, the position falls outside
        the canvas, or the player has no samples at all.
    """
    n_p, n_f = len(players), len(frame_times)
    px_all = np.full((n_p, n_f), -1, dtype=np.int16)
    py_all = np.full((n_p, n_f), -1, dtype=np.int16)

    for i, p in enumerate(players):
        s = p["_samples"]
        if len(s) == 0:
            # No path data: leave the whole row at the "absent" sentinel
            # instead of failing on t[0] below.
            continue
        t = np.array([x["Time"] for x in s], dtype=np.float64)
        xa = np.array([x["X"] for x in s], dtype=np.float64)
        za = np.array([x["Z"] for x in s], dtype=np.float64)

        # Only frames between the first and last sample are interpolated.
        mask = (frame_times >= t[0]) & (frame_times <= t[-1])
        if not mask.any():
            continue
        covered = frame_times[mask]
        pxi, pyi = xfm.world_to_px(np.interp(covered, t, xa), np.interp(covered, t, za))

        in_bounds = (pxi >= 0) & (pxi < xfm.canvas) & (pyi >= 0) & (pyi < xfm.canvas)
        frames = np.flatnonzero(mask)[in_bounds]
        px_all[i, frames] = pxi[in_bounds]
        py_all[i, frames] = pyi[in_bounds]
    return px_all, py_all
def precompute_altitudes(players: list[dict], frame_times: np.ndarray) -> np.ndarray:
    """Return (n_players, n_frames) int16 array of altitude in metres. -1 = absent.

    Args:
        players: Player dicts, each with a '_samples' list of dicts holding
            'Time' and 'Y'.
        frame_times: 1-D array of frame timestamps.

    Returns:
        Interpolated 'Y' per frame, limited to 0..32767 and truncated to
        int16.  Frames outside a player's sample time range, and every frame
        of a player without samples, hold -1.
    """
    n_p, n_f = len(players), len(frame_times)
    alt_all = np.full((n_p, n_f), -1, dtype=np.int16)

    for i, p in enumerate(players):
        s = p["_samples"]
        if len(s) == 0:
            # No path data: keep the row at -1 rather than failing on t[0].
            continue
        t = np.array([x["Time"] for x in s], dtype=np.float64)
        ya = np.array([x["Y"] for x in s], dtype=np.float64)

        mask = (frame_times >= t[0]) & (frame_times <= t[-1])
        if not mask.any():
            continue
        yi = np.interp(frame_times[mask], t, ya)
        # Limit before the cast so large values cannot overflow int16.
        alt_all[i, mask] = np.clip(yi, 0, 32767).astype(np.int16)
    return alt_all
def precompute_kills(kills: list[dict], xfm: CoordTransform,
                     t_start: float, ms_per_frame: float,
                     n_frames: int, fps: int,
                     font: ImageFont.FreeTypeFont | ImageFont.ImageFont,
                     offset_x: int = 0, offset_y: int = 0,
                     pid_pos: dict[int, tuple[np.ndarray, np.ndarray]] | None = None,
                     ) -> list[list[tuple]]:
    """Expand kill events into per-frame kill-line entries.

    Each entry is ``(vx, vy, kx, ky, age_frac, label_sprite | None)``.  A kill
    contributes entries for ``ceil(KILL_TTL * fps / 1000)`` frames starting at
    its own frame; ``age_frac`` runs from 0 towards 1 over that span.

    Args:
        kills: Kill event dicts; events without 'VictimPosition' or outside
            the frame range are ignored.
        xfm: World-to-pixel transform for the event's snapshot positions.
        t_start: Replay start time in ms.
        ms_per_frame: Milliseconds per video frame.
        n_frames: Total number of video frames.
        fps: Frames per second.
        font: Font used for the kill label sprite.
        offset_x: Crop origin X, subtracted from snapshot positions.
        offset_y: Crop origin Y, subtracted from snapshot positions.
        pid_pos: Optional {PlayerID: (px_arr, py_arr)}.  When an entity has a
            non-negative tracked position at a frame it is used as-is (no
            offset is applied here); otherwise the snapshot position is used.

    Returns:
        One list of entries per frame.
    """
    per_frame: list[list[tuple]] = [[] for _ in range(n_frames)]
    ttl_frames = int(math.ceil(KILL_TTL * fps / 1000.0))

    def _track_for(player_id):
        return pid_pos.get(player_id) if pid_pos and player_id else None

    def _position(track, frame: int, fallback: tuple[int, int]) -> tuple[int, int]:
        # Use tracked position if available and valid at this frame.
        if track is not None and int(track[0][frame]) >= 0:
            return int(track[0][frame]), int(track[1][frame])
        return fallback

    for kill in kills:
        victim_pos = kill.get("VictimPosition")
        if not victim_pos:
            continue
        first = int((kill["Time"] - t_start) / ms_per_frame)
        if first < 0 or first >= n_frames:
            continue

        # Static fallback positions from the kill event, shifted into crop-space.
        raw_vx, raw_vy = xfm.point(victim_pos["X"], victim_pos["Z"])
        victim_static = (raw_vx - offset_x, raw_vy - offset_y)
        killer_pos = kill.get("KillerPosition")
        if killer_pos:
            raw_kx, raw_ky = xfm.point(killer_pos["X"], killer_pos["Z"])
            killer_static = (raw_kx - offset_x, raw_ky - offset_y)
        else:
            killer_static = (-1, -1)

        killer_track = _track_for(kill.get("KillerID", 0))
        victim_track = _track_for(kill.get("VictimID", 0))

        killer_model = kill.get("KillerModel", "")
        weapon = kill.get("Weapon", "")
        label = make_kill_label(killer_model, weapon, font) if killer_model else None

        for f in range(first, min(first + ttl_frames, n_frames)):
            vx, vy = _position(victim_track, f, victim_static)
            kx, ky = _position(killer_track, f, killer_static)
            per_frame[f].append((vx, vy, kx, ky, (f - first) / ttl_frames, label))
    return per_frame
DMG_TTL = 2_000  # Damage line display duration (ms, video time)


def precompute_damages(damages: list[dict], active: list[dict],
                       px_all: np.ndarray, py_all: np.ndarray,
                       t_start: float, ms_per_frame: float,
                       n_frames: int, fps: int,
                       ) -> list[list[tuple]]:
    """Build per-frame damage line events from raw damage reports.

    Each tuple is (ox, oy, vx, vy, age_frac).  Both positions are read from
    px_all/py_all at the frame of the damage report and stay fixed for the
    ``ceil(DMG_TTL * fps / 1000)`` frames the line is shown.

    Args:
        damages: Raw damage report dicts from the GOB replay.
        active: Active player dicts (used for PlayerID-to-index mapping).
        px_all: Precomputed pixel X positions, shape (n_players, n_frames).
        py_all: Precomputed pixel Y positions, shape (n_players, n_frames).
        t_start: Replay start time in milliseconds.
        ms_per_frame: Milliseconds per video frame.
        n_frames: Total number of video frames.
        fps: Frames per second.

    Returns:
        List of lists, one per frame, containing damage line tuples.  Reports
        with an unknown offender/victim, a frame outside the clip, or a
        negative position for either party are dropped.
    """
    row_of = {p["PlayerID"]: i for i, p in enumerate(active)}
    per_frame: list[list[tuple]] = [[] for _ in range(n_frames)]
    ttl_frames = int(math.ceil(DMG_TTL * fps / 1000.0))

    for report in damages:
        offender = row_of.get(report.get("OffenderID", 0))
        victim = row_of.get(report.get("OffendedID", 0))
        if offender is None or victim is None:
            continue
        first = int((report["Time"] - t_start) / ms_per_frame)
        if first < 0 or first >= n_frames:
            continue

        ox, oy = int(px_all[offender, first]), int(py_all[offender, first])
        vx, vy = int(px_all[victim, first]), int(py_all[victim, first])
        if min(ox, oy, vx, vy) < 0:
            continue

        for f in range(first, min(first + ttl_frames, n_frames)):
            per_frame[f].append((ox, oy, vx, vy, (f - first) / ttl_frames))
    return per_frame
def precompute_deaths(active: list[dict], kills: list[dict],
                      t_start: float, ms_per_frame: float,
                      n_frames: int, fps: int,
                      px_all: np.ndarray, py_all: np.ndarray,
                      ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Compute per-player death state, fade-to-black curve, and last known position.

    Args:
        active: Active ground player dicts.
        kills: Kill event dicts from the replay.
        t_start: Replay start time in ms.
        ms_per_frame: Milliseconds per video frame.
        n_frames: Total video frames.
        fps: Frames per second.
        px_all: Precomputed pixel X positions.
        py_all: Precomputed pixel Y positions.

    Returns:
        Tuple of (is_dead, death_frame, death_fade, last_dead_px, last_dead_py).
        ``is_dead`` is True on every frame after the kill frame,
        ``death_frame`` holds the kill frame (``n_frames`` if never killed),
        and ``death_fade`` ramps from just under 1.0 down to 0.0 over the
        frames following the kill and is 0.0 elsewhere.
    """
    row_of = {p["PlayerID"]: i for i, p in enumerate(active)}
    n_players = len(active)
    ghost_frames = max(1, int(GHOST_TTL * fps / 1000.0))

    is_dead = np.zeros((n_players, n_frames), dtype=bool)
    death_frame = np.full(n_players, n_frames, dtype=np.int32)
    death_fade = np.zeros((n_players, n_frames), dtype=np.float32)
    last_dead_px = np.full(n_players, -1, dtype=np.int16)
    last_dead_py = np.full(n_players, -1, dtype=np.int16)

    for kill in kills:
        row = row_of.get(kill.get("VictimID", 0))
        if row is None:
            continue
        kf = int((kill["Time"] - t_start) / ms_per_frame)
        if kf < 0 or kf >= n_frames:
            continue

        is_dead[row, kf + 1:] = True
        death_frame[row] = kf

        # Find last valid position at or before death.
        seen = np.flatnonzero(px_all[row, :kf + 1] >= 0)
        if seen.size:
            last = seen[-1]
            last_dead_px[row] = px_all[row, last]
            last_dead_py[row] = py_all[row, last]

        # Fade from 1.0 (full color) to 0.0 (black) over ghost_frames video
        # frames; frames after the ramp keep the initial 0.0 (black).
        fade_len = min(ghost_frames, n_frames - kf - 1)
        ramp = 1.0 - np.arange(1, fade_len + 1) / ghost_frames
        death_fade[row, kf + 1:kf + 1 + fade_len] = np.maximum(0.0, ramp)

    return is_dead, death_frame, death_fade, last_dead_px, last_dead_py
def precompute_air_deaths(active: list[dict], kills: list[dict],
                          t_start: float, ms_per_frame: float,
                          n_frames: int, fps: int,
                          px_all: np.ndarray, py_all: np.ndarray,
                          ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Death state for aircraft/drones: they keep moving after kill until path ends (crash).

    Args:
        active: Aircraft/drone dicts; 'PlayerID' and 'EntityIndex' are read
            (a value of 0 or a missing key means "not set").
        kills: Kill event dicts; matched on 'VictimID', then on
            'VictimEntityIndex'.
        t_start: Replay start time in ms.
        ms_per_frame: Milliseconds per video frame.
        n_frames: Total video frames.
        fps: Frames per second.
        px_all: Precomputed pixel X positions, shape (n, n_frames); negative
            values mark frames without a position.
        py_all: Precomputed pixel Y positions, same shape.

    Returns:
        is_hit:      (n, n_frames) bool — True from kill onward (for color fade)
        hit_fade:    (n, n_frames) float32 — 1→0 from kill to crash, 0 after crash
        is_crashed:  (n, n_frames) bool — True only after path data ends
        crash_frame: (n,) int32 — last frame with a valid position
                     (n_frames if the entity never has one)
        last_px/py:  (n,) int16 — position at crash (last valid path point)
    """
    pid_to_idx: dict[int, int] = {}
    eidx_to_idx: dict[int, int] = {}
    for i, p in enumerate(active):
        pid = p.get("PlayerID", 0)
        if pid:
            pid_to_idx[pid] = i
        eidx = p.get("EntityIndex", 0)
        if eidx:
            eidx_to_idx[eidx] = i

    n_p = len(active)
    is_hit = np.zeros((n_p, n_frames), dtype=bool)
    hit_fade = np.ones((n_p, n_frames), dtype=np.float32)
    is_crashed = np.zeros((n_p, n_frames), dtype=bool)
    crash_frame = np.full(n_p, n_frames, dtype=np.int32)
    last_px = np.full(n_p, -1, dtype=np.int16)
    last_py = np.full(n_p, -1, dtype=np.int16)
    ghost_f = max(1, int(GHOST_TTL * fps / 1000.0))

    def _fade_until_crash(row: int, start: int, crash: int) -> None:
        """Ramp hit_fade[row] linearly 1→0 over start..crash; 0 afterwards."""
        span = crash - start
        if span > 0:
            n_slots = min(span + 1, n_frames - start)
            hit_fade[row, start:start + n_slots] = np.maximum(
                0.0, 1.0 - np.arange(n_slots, dtype=np.float32) / span
            )
        # Frames after the crash are always black, even when there was no
        # room for a ramp (kill on or after the last path frame).
        hit_fade[row, crash + 1:] = 0.0

    # Find last valid position frame for each entity.
    for i in range(n_p):
        seen = np.flatnonzero(px_all[i, :n_frames] >= 0)
        if seen.size:
            last = int(seen[-1])
            crash_frame[i] = last
            last_px[i] = px_all[i, last]
            last_py[i] = py_all[i, last]
        # Mark crashed after last valid frame.
        cf = int(crash_frame[i])
        if cf < n_frames - 1:
            is_crashed[i, cf + 1:] = True

    hit_rows: set[int] = set()
    for k in kills:
        # Match by PlayerID first, then by EntityIndex (for drones with PlayerID=0).
        idx = pid_to_idx.get(k.get("VictimID", 0))
        if idx is None:
            idx = eidx_to_idx.get(k.get("VictimEntityIndex", 0))
        if idx is None:
            continue
        kf = int((k["Time"] - t_start) / ms_per_frame)
        if not (0 <= kf < n_frames):
            continue
        hit_rows.add(idx)
        is_hit[idx, kf:] = True
        # Fade from 1.0 → 0.0 between kill frame and crash frame.
        _fade_until_crash(idx, kf, int(crash_frame[idx]))

    # For entities not matched by any kill (e.g. drones with PlayerID=0),
    # fade to black over GHOST_TTL before their path ends (crash).
    for i in range(n_p):
        if i in hit_rows:
            continue
        cf = int(crash_frame[i])
        if cf >= n_frames - 1:
            # Path never ends in this clip (still valid on the final frame,
            # or no valid frame at all): the entity did not crash, so it
            # must not be faded out.
            continue
        fade_start = max(0, cf - ghost_f)
        if cf > fade_start:
            is_hit[i, fade_start:] = True
        _fade_until_crash(i, fade_start, cf)

    return is_hit, hit_fade, is_crashed, crash_frame, last_px, last_py
def find_team_wipe_frame(active: list[dict], is_dead: np.ndarray, n_frames: int) -> int:
    """Find the first frame where all players on any one team are dead.

    Args:
        active: Active player dicts (must contain a 'Team' key).
        is_dead: Boolean array, shape (n_players, n_frames).
        n_frames: Total video frames.

    Returns:
        Frame index of the first team wipe, or n_frames if none occurs.
    """
    rows_by_team: dict[int, list[int]] = {}
    for row, player in enumerate(active):
        rows_by_team.setdefault(player["Team"], []).append(row)

    dead = np.asarray(is_dead)
    earliest = n_frames
    for rows in rows_by_team.values():
        # Frames on which every member of this team is dead.
        wiped = np.flatnonzero(dead[rows, :n_frames].all(axis=0))
        if wiped.size:
            earliest = min(earliest, int(wiped[0]))
    return earliest
def build_drone_list(d: dict, ground_active: list[dict], xfm: CoordTransform,
                     px_ground: np.ndarray, py_ground: np.ndarray,
                     frame_times: np.ndarray, t_start: float,
                     ms_per_frame: float, colors_arr: np.ndarray) -> list[dict]:
    """Collect drone entities and colour each after the nearest ground player.

    An entity counts as a drone when its 'ModelName' contains "ucav"
    (case-insensitive) and it has a non-empty 'Path'.  The ground player
    closest to the drone's first path point, at the drone's spawn frame,
    supplies the colour, dimmed to 75%.

    Args:
        d: Replay dict with an 'Entities' list.
        ground_active: Active ground player dicts (only the count is used).
        xfm: World-to-pixel transform.
        px_ground: Ground pixel X positions, shape (n_players, n_frames).
        py_ground: Ground pixel Y positions, same shape.
        frame_times: Frame timestamps (only the length is used).
        t_start: Replay start time in ms.
        ms_per_frame: Milliseconds per video frame.
        colors_arr: Per-ground-player colours, indexed like ``ground_active``.

    Returns:
        One dict per drone with 'entity', 'color', '_samples' and
        'EntityIndex'.
    """
    n_frames = len(frame_times)
    drones = []
    for entity in d["Entities"]:
        if "ucav" not in entity["ModelName"].lower():
            continue
        path = entity["Path"]
        if not path:
            continue

        spawn = path[0]
        spawn_f = max(0, min(int((spawn["Time"] - t_start) / ms_per_frame), n_frames - 1))
        spx, spy = xfm.point(spawn["X"], spawn["Z"])

        # (squared distance, index) for every ground player visible at spawn;
        # min() then picks the closest, lowest index first on ties.
        ranked = [
            ((int(px_ground[gi, spawn_f]) - spx) ** 2
             + (int(py_ground[gi, spawn_f]) - spy) ** 2, gi)
            for gi in range(len(ground_active))
            if int(px_ground[gi, spawn_f]) >= 0
        ]
        best_idx = min(ranked)[1] if ranked else 0

        color = (colors_arr[best_idx].astype(np.float32) * 0.75).astype(np.uint8)
        drones.append({"entity": entity, "color": color,
                       "_samples": path, "EntityIndex": entity["EntityIndex"]})
    return drones
# ── Circle masks ───────────────────────────────────────────────────────────────
def make_circle_masks(r: int) -> tuple[np.ndarray, np.ndarray]:
    """Return the pixel offsets of a filled disc of radius ``r``.

    Args:
        r: Disc radius in pixels.

    Returns:
        ``(dy, dx)`` int32 arrays listing, in row-major order, every offset
        with ``dx*dx + dy*dy <= r*r``.
    """
    span = np.arange(-r, r + 1)
    inside = span[:, None] ** 2 + span[None, :] ** 2 <= r * r
    rows, cols = np.nonzero(inside)
    return (rows - r).astype(np.int32), (cols - r).astype(np.int32)
# Disc offset tables built once at import time; the draw_*_np helpers use
# them as default stamps.  The shadow disc has a radius one pixel larger than
# the player dot.
SHADOW_DY, SHADOW_DX = make_circle_masks(DOT_R + 1)
DOT_DY, DOT_DX = make_circle_masks(DOT_R)
DRONE_DY, DRONE_DX = make_circle_masks(DRONE_R)
def make_triangle_masks(r: int) -> tuple[np.ndarray, np.ndarray]:
    """Downward-pointing triangle of half-size r. Returns (dy, dx) offsets.

    Args:
        r: Half-size in pixels; rows run from ``-r`` (top) to ``+r`` (bottom).

    Returns:
        ``(dy, dx)`` int32 arrays.  The row at ``dy`` spans
        ``-(r - dy) // 2 .. (r - dy) // 2``, so the width narrows linearly
        from full at the top to a single pixel at the bottom.  A single
        ``(0, 0)`` offset is returned when no rows exist (negative ``r``).
    """
    offsets = [
        (y, x)
        for y in range(-r, r + 1)
        # Half-width shrinks by one every two rows going down.
        for x in range(-((r - y) // 2), (r - y) // 2 + 1)
    ]
    if not offsets:
        offsets = [(0, 0)]
    table = np.array(offsets, dtype=np.int32)
    return table[:, 0], table[:, 1]
# Triangle offset table used as the default aircraft stamp in draw_aircraft_np.
AIR_DY, AIR_DX = make_triangle_masks(AIR_R)
# Offsets of the 2x2 pixel block stamped for every trail sample
# (see draw_all_trails_np).
_TRAIL_DY = np.array([0, 0, 1, 1], dtype=np.int32)
_TRAIL_DX = np.array([0, 1, 0, 1], dtype=np.int32)
# ── Sprite factories ───────────────────────────────────────────────────────────
def _render_rgba(w: float, h: float, draw_fn) -> np.ndarray:
    """Run ``draw_fn`` on a blank transparent image and return its pixels.

    Args:
        w: Image width (truncated to int).
        h: Image height (truncated to int).
        draw_fn: Callable receiving an ``ImageDraw.Draw`` for the new image.

    Returns:
        A writable copy of the RGBA pixel data as a numpy array.
    """
    canvas = Image.new("RGBA", (int(w), int(h)), (0, 0, 0, 0))
    painter = ImageDraw.Draw(canvas)
    draw_fn(painter)
    return np.array(canvas)
def make_name_sprites(names: list[str],
                      font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> list[Sprite]:
    """Render a list of text strings into premultiplied Sprite objects.

    Args:
        names: Text labels to render.
        font: PIL font used for rendering.

    Returns:
        List of Sprite objects, one per name, in input order.
    """
    probe = ImageDraw.Draw(Image.new("RGBA", (1, 1)))
    pad = 3  # padding each side (covers stroke_width=1 + margin)

    def _sprite_for(text: str) -> Sprite:
        left, top, right, bottom = probe.textbbox((0, 0), text, font=font)

        def _paint(d):
            # Shift by the bbox origin so the glyphs start at the padding.
            d.text((pad - left, pad - top), text, font=font,
                   fill=(255, 255, 255, 220),
                   stroke_width=1, stroke_fill=(0, 0, 0, 200))

        width = right - left + pad * 2
        height = bottom - top + pad * 2
        return make_sprite(_render_rgba(width, height, _paint))

    return [_sprite_for(name) for name in names]
def merge_sprites(top: Sprite, bottom: Sprite, gap: int = -2) -> Sprite:
    """Stack two sprites vertically with a gap, return a single combined sprite.

    Args:
        top: Sprite placed at the top-left of the result.
        bottom: Sprite placed below it, left-aligned.
        gap: Rows between the two; a negative gap makes the bottom sprite
            overwrite the last rows of the top one.

    Returns:
        A new Sprite as wide as the wider input.  Cells covered by neither
        input keep zero colour and ``ia`` of 255.
    """
    width = max(top.w, bottom.w)
    height = top.h + gap + bottom.h
    pm = np.zeros((height, width, 3), dtype=np.uint8)
    pm16 = np.zeros((height, width, 3), dtype=np.uint16)
    ia = np.full((height, width, 1), 255, dtype=np.uint16)

    # Written in order, so the bottom sprite wins where the two overlap.
    for sprite, y_off in ((top, 0), (bottom, top.h + gap)):
        rows = slice(y_off, y_off + sprite.h)
        cols = slice(0, sprite.w)
        pm[rows, cols] = sprite.pm
        pm16[rows, cols] = sprite.pm16
        ia[rows, cols] = sprite.ia

    return Sprite(pm=pm, pm16=pm16, ia=ia, h=height, w=width)
def _short_model(model_name: str) -> str:
    """Return human-readable vehicle name, falling back to internal ID.

    Args:
        model_name: Model path; only the part after the last ``/`` is used.

    Returns:
        Whatever ``_translate_vehicle`` yields for that last path segment.
    """
    return _translate_vehicle(model_name.rsplit("/", 1)[-1])
def make_kill_label(killer_model: str, weapon: str,
                    font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> Sprite:
    """Render a kill event label showing the killer's vehicle and weapon.

    Args:
        killer_model: Internal model path of the killer vehicle.
        weapon: Internal weapon identifier.
        font: PIL font used for rendering.

    Returns:
        A Sprite containing the text ``"<vehicle> [<weapon>]"``.
    """
    pad = 3
    text = f"{_short_model(killer_model)} [{_translate_weapon(weapon)}]"
    probe = ImageDraw.Draw(Image.new("RGBA", (1, 1)))
    left, top, right, bottom = probe.textbbox((0, 0), text, font=font)

    def _paint(d):
        # Shift by the bbox origin so the glyphs start at the padding.
        d.text((pad - left, pad - top), text, font=font,
               fill=(255, 230, 100, 230),
               stroke_width=1, stroke_fill=(0, 0, 0, 200))

    width = right - left + pad * 2
    height = bottom - top + pad * 2
    return make_sprite(_render_rgba(width, height, _paint))
def make_hud_sprite(team_won: int,
                    font: ImageFont.FreeTypeFont | ImageFont.ImageFont) -> Sprite:
    """Render a HUD overlay sprite displaying which team won.

    Args:
        team_won: Winning team index.
        font: PIL font used for rendering.

    Returns:
        A Sprite containing the "Team N wins" text.
    """
    pad = 3
    text = f"Team {team_won} wins"
    probe = ImageDraw.Draw(Image.new("RGBA", (1, 1)))
    left, top, right, bottom = probe.textbbox((0, 0), text, font=font)

    def _paint(d):
        # Shift by the bbox origin so the glyphs start at the padding.
        d.text((pad - left, pad - top), text, font=font,
               fill=(160, 255, 120, 230),
               stroke_width=1, stroke_fill=(0, 0, 0, 180))

    width = right - left + pad * 2
    height = bottom - top + pad * 2
    return make_sprite(_render_rgba(width, height, _paint))
def make_time_sprites(max_secs: int, speed: float,
                      font: ImageFont.FreeTypeFont | ImageFont.ImageFont,
                      ) -> dict[int, Sprite]:
    """Pre-render one clock sprite per second from 0 to ``max_secs`` inclusive.

    Args:
        max_secs: Largest second value to render.
        speed: Playback speed shown after the clock, formatted without
            decimals.
        font: PIL font used for rendering.

    Returns:
        Mapping of second -> Sprite showing ``MM:SS`` followed by the speed.
    """
    probe = ImageDraw.Draw(Image.new("RGBA", (1, 1)))

    def _sprite_for(sec: int) -> Sprite:
        m, s = divmod(sec, 60)
        text = f"{m:02d}:{s:02d} ×{speed:.0f}"
        left, top, right, bottom = probe.textbbox((0, 0), text, font=font)

        def _paint(d):
            d.text((2, 2), text, font=font, fill=(210, 210, 210, 230))

        return make_sprite(_render_rgba(right - left + 4, bottom - top + 4, _paint))

    return {sec: _sprite_for(sec) for sec in range(max_secs + 1)}
# ── Drawing helpers ────────────────────────────────────────────────────────────
def draw_all_trails_np(buf: np.ndarray, fi: int,
                       px_all: np.ndarray, py_all: np.ndarray,
                       trail_f: int, trail_colors_arr: np.ndarray,
                       is_dead: np.ndarray, death_frame: np.ndarray,
                       canvas: int = CANVAS_MIN) -> None:
    """Stamp every player's recent path into ``buf`` as 2x2 pixel blocks.

    Args:
        buf: Frame buffer indexed as ``buf[y, x]``; modified in place.
        fi: Current frame index.
        px_all: Pixel X positions, shape (n_players, n_frames).
        py_all: Pixel Y positions, same shape.
        trail_f: Trail length in frames.
        trail_colors_arr: Per-player trail colours.
        is_dead: Per-player, per-frame dead flags.
        death_frame: Per-player death frame.
        canvas: Canvas side length used to clamp pixel indices.
    """
    xs_parts, ys_parts, color_parts = [], [], []
    for pi in range(px_all.shape[0]):
        # Dead players: anchor trail at death frame; alive: current frame.
        end = int(death_frame[pi]) if is_dead[pi, fi] else fi
        begin = max(0, end - trail_f)
        seg_x = px_all[pi, begin:end + 1]
        seg_y = py_all[pi, begin:end + 1]
        keep = np.flatnonzero((seg_x >= 0) & (seg_y >= 0))
        if keep.size == 0:
            continue

        # Older samples are dimmer; the newest one is at full brightness.
        window = end + 1 - begin
        bright = ((keep + 1).astype(np.float32) / window) ** 0.3
        xs_parts.append(seg_x[keep].astype(np.int32))
        ys_parts.append(seg_y[keep].astype(np.int32))
        color_parts.append((trail_colors_arr[pi] * bright[:, None]).astype(np.uint8))

    if not xs_parts:
        return

    vx = np.concatenate(xs_parts)
    vy = np.concatenate(ys_parts)
    cols = np.concatenate(color_parts)
    limit = canvas - 1
    rows = np.clip(vy[:, None] + _TRAIL_DY, 0, limit).ravel()
    columns = np.clip(vx[:, None] + _TRAIL_DX, 0, limit).ravel()
    # Each sample covers four pixels, so repeat its colour four times.
    buf[rows, columns] = np.repeat(cols, 4, axis=0)
def draw_all_dots_np(buf: np.ndarray, fi: int,
                     px_all: np.ndarray, py_all: np.ndarray,
                     colors_arr: np.ndarray, shadow_color: np.ndarray,
                     is_dead: np.ndarray | None = None,
                     shadow_dy: np.ndarray = SHADOW_DY, shadow_dx: np.ndarray = SHADOW_DX,
                     dot_dy: np.ndarray = DOT_DY, dot_dx: np.ndarray = DOT_DX,
                     canvas: int = CANVAS_MIN) -> None:
    """Draw a shadowed dot for every player present at frame ``fi``.

    Args:
        buf: Frame buffer indexed as ``buf[y, x]``; modified in place.
        fi: Current frame index.
        px_all: Pixel X positions, shape (n_players, n_frames).
        py_all: Pixel Y positions, same shape.
        colors_arr: Per-player dot colours.
        shadow_color: Colour written for the shadow stamp.
        is_dead: Optional dead flags; players dead at ``fi`` are skipped.
        shadow_dy: Row offsets of the shadow stamp.
        shadow_dx: Column offsets of the shadow stamp.
        dot_dy: Row offsets of the dot stamp.
        dot_dx: Column offsets of the dot stamp.
        canvas: Canvas side length used to clamp pixel indices.
    """
    col_x = px_all[:, fi].astype(np.int32)
    col_y = py_all[:, fi].astype(np.int32)
    shown = (col_x >= 0) & (col_y >= 0)
    if is_dead is not None:
        shown = shown & ~is_dead[:, fi]
    if not shown.any():
        return

    cx, cy = col_x[shown], col_y[shown]
    limit = canvas - 1

    # Shadow goes down first so the dot drawn afterwards overwrites its centre.
    shadow_rows = np.clip(cy[:, None] + shadow_dy, 0, limit).ravel()
    shadow_cols = np.clip(cx[:, None] + shadow_dx, 0, limit).ravel()
    buf[shadow_rows, shadow_cols] = shadow_color

    dot_rows = np.clip(cy[:, None] + dot_dy, 0, limit).ravel()
    dot_cols = np.clip(cx[:, None] + dot_dx, 0, limit).ravel()
    buf[dot_rows, dot_cols] = np.repeat(colors_arr[shown], len(dot_dy), axis=0)
def draw_dead_dots_np(buf: np.ndarray, fi: int,
                      colors_arr: np.ndarray,
                      is_dead: np.ndarray, death_fade: np.ndarray,
                      last_dead_px: np.ndarray, last_dead_py: np.ndarray,
                      dot_dy: np.ndarray = DOT_DY, dot_dx: np.ndarray = DOT_DX,
                      canvas: int = CANVAS_MIN,
                      ) -> None:
    """Draw dead player dots: fade from color to black, then stay black.

    Args:
        buf: Frame buffer indexed as ``buf[y, x]``; modified in place.
        fi: Current frame index.
        colors_arr: Per-player dot colours.
        is_dead: Per-player, per-frame dead flags.
        death_fade: Per-player, per-frame colour multiplier.
        last_dead_px: Per-player pixel X of the death marker (negative = none).
        last_dead_py: Per-player pixel Y of the death marker (negative = none).
        dot_dy: Row offsets of the dot stamp.
        dot_dx: Column offsets of the dot stamp.
        canvas: Canvas side length used to clamp pixel indices.
    """
    dead_now = is_dead[:, fi]
    if not dead_now.any():
        return

    marker_x = last_dead_px[dead_now].astype(np.int32)
    marker_y = last_dead_py[dead_now].astype(np.int32)
    placed = (marker_x >= 0) & (marker_y >= 0)
    if not placed.any():
        return

    cx, cy = marker_x[placed], marker_y[placed]
    fade = death_fade[dead_now, fi][placed]
    tint = (colors_arr[dead_now][placed].astype(np.float32) * fade[:, None]).astype(np.uint8)

    limit = canvas - 1
    rows = np.clip(cy[:, None] + dot_dy, 0, limit).ravel()
    cols = np.clip(cx[:, None] + dot_dx, 0, limit).ravel()
    buf[rows, cols] = np.repeat(tint, len(dot_dy), axis=0)
def draw_drone_dots_np(buf: np.ndarray, fi: int,
                       px_drone: np.ndarray, py_drone: np.ndarray,
                       drone_colors: np.ndarray,
                       is_crashed: np.ndarray | None = None,
                       hit_fade: np.ndarray | None = None,
                       drone_dy: np.ndarray = DRONE_DY, drone_dx: np.ndarray = DRONE_DX,
                       canvas: int = CANVAS_MIN) -> None:
    """Draw a mask-shaped dot for every drone that is on the map at frame ``fi``.

    Drones with a negative coordinate, or flagged in ``is_crashed`` at ``fi``,
    are skipped. When ``hit_fade`` is given, each color is multiplied by the
    drone's fade value at ``fi`` and truncated to uint8.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        fi: Frame index.
        px_drone, py_drone: Per-drone, per-frame pixel coordinates.
        drone_colors: One color row per drone.
        is_crashed: Optional per-drone, per-frame flags that suppress drawing.
        hit_fade: Optional per-drone, per-frame color multiplier.
        drone_dy, drone_dx: Row/column offsets of the drone mask.
        canvas: Canvas edge length; offsets are clipped to ``[0, canvas - 1]``.
    """
    all_x = px_drone[:, fi].astype(np.int32)
    all_y = py_drone[:, fi].astype(np.int32)
    shown = (all_x >= 0) & (all_y >= 0)
    if is_crashed is not None:
        shown &= ~is_crashed[:, fi]
    if not shown.any():
        return

    centre_x = all_x[shown]
    centre_y = all_y[shown]
    rgb = drone_colors[shown]
    if hit_fade is not None:
        multiplier = hit_fade[shown, fi]
        rgb = (rgb.astype(np.float32) * multiplier[:, None]).astype(np.uint8)

    edge = canvas - 1
    mask_size = len(drone_dy)
    rows = np.clip(centre_y[:, None] + drone_dy, 0, edge)
    cols_idx = np.clip(centre_x[:, None] + drone_dx, 0, edge)
    buf[rows.ravel(), cols_idx.ravel()] = np.repeat(rgb, mask_size, axis=0)
def draw_aircraft_np(buf: np.ndarray, fi: int,
                     px_air: np.ndarray, py_air: np.ndarray,
                     air_colors: np.ndarray,
                     is_crashed: np.ndarray | None = None,
                     hit_fade: np.ndarray | None = None,
                     air_dy: np.ndarray = AIR_DY, air_dx: np.ndarray = AIR_DX,
                     canvas: int = CANVAS_MIN) -> None:
    """Draw a mask-shaped marker for every aircraft on the map at frame ``fi``.

    Aircraft with a negative coordinate, or flagged in ``is_crashed`` at
    ``fi``, are skipped. When ``hit_fade`` is given, each color is multiplied
    by the aircraft's fade value at ``fi`` (fade toward black) and truncated
    to uint8.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        fi: Frame index.
        px_air, py_air: Per-aircraft, per-frame pixel coordinates.
        air_colors: One color row per aircraft.
        is_crashed: Optional per-aircraft, per-frame flags that suppress drawing.
        hit_fade: Optional per-aircraft, per-frame color multiplier.
        air_dy, air_dx: Row/column offsets of the aircraft mask.
        canvas: Canvas edge length; offsets are clipped to ``[0, canvas - 1]``.
    """
    all_x = px_air[:, fi].astype(np.int32)
    all_y = py_air[:, fi].astype(np.int32)
    airborne = (all_x >= 0) & (all_y >= 0)
    if is_crashed is not None:
        airborne &= ~is_crashed[:, fi]
    if not airborne.any():
        return

    centre_x = all_x[airborne]
    centre_y = all_y[airborne]
    rgb = air_colors[airborne]
    if hit_fade is not None:
        multiplier = hit_fade[airborne, fi]
        rgb = (rgb.astype(np.float32) * multiplier[:, None]).astype(np.uint8)

    edge = canvas - 1
    mask_size = len(air_dy)
    rows = np.clip(centre_y[:, None] + air_dy, 0, edge)
    cols_idx = np.clip(centre_x[:, None] + air_dx, 0, edge)
    buf[rows.ravel(), cols_idx.ravel()] = np.repeat(rgb, mask_size, axis=0)
def draw_dead_entities_np(buf: np.ndarray, fi: int,
                          colors_arr: np.ndarray,
                          is_dead: np.ndarray, death_fade: np.ndarray,
                          last_dead_px: np.ndarray, last_dead_py: np.ndarray,
                          mask_dy: np.ndarray, mask_dx: np.ndarray,
                          canvas: int = CANVAS_MIN) -> None:
    """Draw a faded marker for every dead entity at its stored last position.

    The marker shape comes from the caller-supplied mask offsets. Each color
    is the entity's color multiplied by ``death_fade`` at frame ``fi`` and
    truncated to uint8.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        fi: Frame index into ``is_dead`` and ``death_fade``.
        colors_arr: One color row per entity.
        is_dead: Per-entity, per-frame dead flags.
        death_fade: Per-entity, per-frame color multiplier.
        last_dead_px, last_dead_py: Per-entity stored position; negative means none.
        mask_dy, mask_dx: Row/column offsets of the marker mask.
        canvas: Canvas edge length; offsets are clipped to ``[0, canvas - 1]``.
    """
    dead_now = is_dead[:, fi]
    if not dead_now.any():
        return

    pos_x = last_dead_px[dead_now].astype(np.int32)
    pos_y = last_dead_py[dead_now].astype(np.int32)
    placed = (pos_x >= 0) & (pos_y >= 0)
    if not placed.any():
        return

    centre_x = pos_x[placed]
    centre_y = pos_y[placed]
    multiplier = death_fade[dead_now, fi][placed]
    base_rgb = colors_arr[dead_now][placed].astype(np.float32)
    tinted = (base_rgb * multiplier[:, None]).astype(np.uint8)

    edge = canvas - 1
    mask_size = len(mask_dy)
    rows = np.clip(centre_y[:, None] + mask_dy, 0, edge)
    cols_idx = np.clip(centre_x[:, None] + mask_dx, 0, edge)
    buf[rows.ravel(), cols_idx.ravel()] = np.repeat(tinted, mask_size, axis=0)
def draw_icon_sprites(buf: np.ndarray, fi: int,
px_all: np.ndarray, py_all: np.ndarray,
icon_sprites: list,
is_dead: np.ndarray | None = None,
is_crashed: np.ndarray | None = None,
hit_fade: np.ndarray | None = None,
rot_caches: list | None = None,
headings: np.ndarray | None = None) -> None:
"""Draw per-entity icon sprites instead of circles/triangles."""
n = px_all.shape[0]
for i in range(n):
px_i, py_i = int(px_all[i, fi]), int(py_all[i, fi])
if px_i < 0 or py_i < 0:
continue
if is_dead is not None and is_dead[i, fi]:
continue
if is_crashed is not None and is_crashed[i, fi]:
continue
if rot_caches and headings is not None and headings[i, fi] >= 0:
spr = rot_caches[i][heading_to_rot_index(headings[i, fi])]
else:
spr = icon_sprites[i]
alpha = 1.0
if hit_fade is not None:
alpha = float(hit_fade[i, fi])
if alpha <= 0:
continue
x, y = px_i - spr.w // 2, py_i - spr.h // 2
if alpha < 1.0:
blit_alpha(buf, spr, x, y, alpha)
else:
blit(buf, spr, x, y)
def draw_dead_icon_sprites(buf: np.ndarray, fi: int,
icon_sprites: list,
dead_sprites: list,
is_dead: np.ndarray, death_fade: np.ndarray | None,
last_dead_px: np.ndarray,
last_dead_py: np.ndarray,
rot_caches: list | None = None,
dead_rot_caches: list | None = None,
headings: np.ndarray | None = None) -> None:
"""Draw dead entity icons fading from color to black at their last known position."""
n = len(icon_sprites)
for i in range(n):
if not is_dead[i, fi]:
continue
px_i, py_i = int(last_dead_px[i]), int(last_dead_py[i])
if px_i < 0 or py_i < 0:
continue
# Pick rotated sprite if available (use last valid heading)
if rot_caches and dead_rot_caches and headings is not None and headings[i, fi] >= 0:
ri = heading_to_rot_index(headings[i, fi])
spr = rot_caches[i][ri]
dspr = dead_rot_caches[i][ri]
else:
spr = icon_sprites[i]
dspr = dead_sprites[i]
x, y = px_i - spr.w // 2, py_i - spr.h // 2
blit(buf, dspr, x, y)
if death_fade is not None:
fade = float(death_fade[i, fi])
else:
fade = 0.0
if fade > 0.0:
blit_alpha(buf, spr, x, y, fade)
def draw_air_trails_np(buf: np.ndarray, fi: int,
                       px_all: np.ndarray, py_all: np.ndarray,
                       trail_f: int, trail_colors: np.ndarray,
                       is_crashed: np.ndarray | None, crash_frame: np.ndarray | None,
                       canvas: int = CANVAS_MIN) -> None:
    """Draw short, line-interpolated trails behind aircraft or drones.

    For each entity the last ``trail_f`` frames up to the end frame are
    collected; gaps between consecutive valid samples are filled with up to
    64 interpolated points. Brightness rises with the sample's slot in the
    window and is shaped by a 0.3 power. Once an entity is flagged crashed at
    ``fi`` its trail is anchored at ``crash_frame`` instead of ``fi``.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        fi: Frame index being rendered.
        px_all, py_all: Per-entity, per-frame pixel coordinates; negative means absent.
        trail_f: Number of frames of history to draw.
        trail_colors: One color row per entity.
        is_crashed: Optional per-entity, per-frame crashed flags.
        crash_frame: Optional per-entity frame index at which the path ended.
        canvas: Canvas edge length; coordinates are clipped to ``[0, canvas - 1]``.
    """
    max_px = canvas - 1
    can_anchor = is_crashed is not None and crash_frame is not None
    stack_x, stack_y, stack_rgb = [], [], []

    for ent in range(px_all.shape[0]):
        if can_anchor and is_crashed[ent, fi]:
            end_f = int(crash_frame[ent])
        else:
            end_f = fi
        first_f = max(0, end_f - trail_f)

        win_x = px_all[ent, first_f:end_f + 1]
        win_y = py_all[ent, first_f:end_f + 1]
        present = (win_x >= 0) & (win_y >= 0)
        if not present.any():
            continue

        xs = win_x[present].astype(np.int32)
        ys = win_y[present].astype(np.int32)
        window = end_f + 1 - first_f
        slot = np.where(present)[0]

        if len(xs) < 2:
            # A single sample: nothing to interpolate.
            line_x, line_y = xs, ys
            glow = np.array([(slot[0] + 1) / window], dtype=np.float32)
        else:
            part_x, part_y, part_glow = [], [], []
            for k in range(len(xs) - 1):
                ax, ay, bx, by = xs[k], ys[k], xs[k + 1], ys[k + 1]
                gap = max(abs(bx - ax), abs(by - ay))
                if gap <= 1:
                    part_x.append(ax)
                    part_y.append(ay)
                    part_glow.append((slot[k] + 1) / window)
                    continue
                n_fill = min(gap, 64)  # cap to avoid huge arrays
                frac = np.arange(n_fill, dtype=np.float32) / n_fill
                part_x.append(np.clip((ax + (bx - ax) * frac).astype(np.int32), 0, max_px))
                part_y.append(np.clip((ay + (by - ay) * frac).astype(np.int32), 0, max_px))
                glow_a = (slot[k] + 1) / window
                glow_b = (slot[k + 1] + 1) / window
                part_glow.append(glow_a + (glow_b - glow_a) * frac)
            # The loop emits each segment's start only; close with the final sample.
            part_x.append(xs[-1])
            part_y.append(ys[-1])
            part_glow.append((slot[-1] + 1) / window)
            line_x = np.concatenate([np.atleast_1d(p) for p in part_x])
            line_y = np.concatenate([np.atleast_1d(p) for p in part_y])
            glow = np.concatenate([np.atleast_1d(p) for p in part_glow])

        glow = glow ** 0.3
        stack_x.append(line_x)
        stack_y.append(line_y)
        stack_rgb.append((trail_colors[ent] * glow[:, None]).astype(np.uint8))

    if not stack_x:
        return

    flat_x = np.concatenate(stack_x)
    flat_y = np.concatenate(stack_y)
    flat_rgb = np.concatenate(stack_rgb)
    rows = np.clip(flat_y[:, None] + _TRAIL_DY, 0, canvas - 1).ravel()
    cols_idx = np.clip(flat_x[:, None] + _TRAIL_DX, 0, canvas - 1).ravel()
    # Each trail point is stamped with the 4-offset trail mask.
    buf[rows, cols_idx] = np.repeat(flat_rgb, 4, axis=0)
def draw_kill_events(buf: np.ndarray, events: list[tuple],
                     kill_target_spr: Sprite | None = None,
                     canvas: int = CANVAS_MIN) -> None:
    """Overlay kill events: killer-to-victim line, target icon and label.

    Each event is ``(vx, vy, kx, ky, age_frac, label)``. The line and icon are
    blended with opacity ``1 - age_frac``; the label stays fully opaque until
    ``age_frac`` reaches 0.8 and then fades linearly to zero.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        events: Kill event tuples for the current frame.
        kill_target_spr: Optional sprite centred on the victim position.
        canvas: Canvas edge length; line pixels are clipped to ``[0, canvas - 1]``.
    """
    max_px = canvas - 1
    line_rgb = np.array([255, 30, 30], dtype=np.float32)
    # Centre line plus four 1-pixel shifted copies for thickness.
    thickness_offsets = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]

    for vx, vy, kx, ky, age_frac, label in events:
        opacity = 1.0 - age_frac

        # The line is drawn only when the killer position is known.
        if kx >= 0 and ky >= 0:
            n_steps = max(abs(vx - kx), abs(vy - ky)) + 1
            if n_steps >= 2:
                frac = np.arange(n_steps, dtype=np.float32) / (n_steps - 1)
                path_x = (kx + (vx - kx) * frac).astype(np.int32)
                path_y = (ky + (vy - ky) * frac).astype(np.int32)
                for shift_x, shift_y in thickness_offsets:
                    cols_idx = np.clip(path_x + shift_x, 0, max_px)
                    rows = np.clip(path_y + shift_y, 0, max_px)
                    under = buf[rows, cols_idx].astype(np.float32)
                    buf[rows, cols_idx] = (under + (line_rgb - under) * opacity).astype(np.uint8)

        if kill_target_spr is not None:
            blit_alpha(buf, kill_target_spr,
                       vx - kill_target_spr.w // 2, vy - kill_target_spr.h // 2,
                       opacity)

        if label is not None:
            if age_frac < 0.8:
                label_opacity = 1.0
            else:
                label_opacity = max(0.0, 1.0 - (age_frac - 0.8) / 0.2)
            blit_alpha(buf, label, vx - label.w // 2, vy - label.h - 14, label_opacity)
def draw_damage_events(buf: np.ndarray, events: list[tuple],
                       dmg_target_spr: Sprite | None = None,
                       canvas: int = CANVAS_MIN) -> None:
    """Overlay damage events: a one-pixel line and a target icon.

    Each event is ``(ox, oy, vx, vy, age_frac)``; the line runs from
    ``(ox, oy)`` to ``(vx, vy)`` and both line and icon are blended with
    opacity ``1 - age_frac``.

    Args:
        buf: Image buffer written in place, indexed as ``buf[row, col]``.
        events: Damage event tuples for the current frame.
        dmg_target_spr: Optional sprite centred on ``(vx, vy)``.
        canvas: Canvas edge length; line pixels are clipped to ``[0, canvas - 1]``.
    """
    max_px = canvas - 1
    line_rgb = np.array([255, 255, 80], dtype=np.float32)

    for ox, oy, vx, vy, age_frac in events:
        opacity = 1.0 - age_frac
        n_steps = max(abs(vx - ox), abs(vy - oy)) + 1
        if n_steps >= 2:
            frac = np.arange(n_steps, dtype=np.float32) / (n_steps - 1)
            cols_idx = np.clip((ox + (vx - ox) * frac).astype(np.int32), 0, max_px)
            rows = np.clip((oy + (vy - oy) * frac).astype(np.int32), 0, max_px)
            under = buf[rows, cols_idx].astype(np.float32)
            buf[rows, cols_idx] = (under + (line_rgb - under) * opacity).astype(np.uint8)

        if dmg_target_spr is None:
            continue
        blit_alpha(buf, dmg_target_spr,
                   vx - dmg_target_spr.w // 2, vy - dmg_target_spr.h // 2,
                   opacity)
# ── Per-frame render for one VideoCtx ─────────────────────────────────────────
def render_one_ctx(fi: int, buf: np.ndarray, ctx: VideoCtx,
                   shadow_color: np.ndarray, trail_f: int) -> None:
    """Render a single video frame into the provided buffer.

    Draws background, trails, damage/kill events, dots (alive + dead),
    drones, aircraft, and player name labels in compositing order.
    Later layers overwrite earlier ones, so the call order below is the
    z-order of the frame.

    Args:
        fi: Frame index to render.
        buf: Mutable RGB numpy array, shape (canvas, canvas, 3).
        ctx: Pre-computed VideoCtx with all per-frame data.
        shadow_color: RGB color for dot shadows, shape (3,).
        trail_f: Number of trailing frames to draw behind each player.
    """
    # Start from a fresh copy of the background; buf is reused between frames.
    np.copyto(buf, ctx.bg_arr)
    cv = ctx.canvas
    # Ground trails
    draw_all_trails_np(buf, fi, ctx.px_all, ctx.py_all, trail_f, ctx.trail_colors_arr,
                       ctx.is_dead, ctx.death_frame, cv)
    # Drone trails (very short, line-interpolated)
    if ctx.n_drones and ctx.drone_trail_colors is not None:
        assert ctx.px_drone is not None and ctx.py_drone is not None
        draw_air_trails_np(buf, fi, ctx.px_drone, ctx.py_drone, ctx.drone_trail_f,
                           ctx.drone_trail_colors, ctx.drone_is_crashed, ctx.drone_crash_frame, cv)
    # Aircraft trails (short, line-interpolated)
    if ctx.n_aircraft and ctx.air_trail_colors is not None:
        assert ctx.px_air is not None and ctx.py_air is not None
        draw_air_trails_np(buf, fi, ctx.px_air, ctx.py_air, ctx.air_trail_f,
                           ctx.air_trail_colors, ctx.air_is_crashed, ctx.air_crash_frame, cv)
    # Event overlays: damage first, kills on top.
    if ctx.damages_by_frame[fi]:
        draw_damage_events(buf, ctx.damages_by_frame[fi], ctx.dmg_target_spr, cv)
    if ctx.kills_by_frame[fi]:
        draw_kill_events(buf, ctx.kills_by_frame[fi], ctx.kill_target_spr, cv)
    # Ground: dead icons + alive icons
    if ctx.player_icon_sprites:
        # death_fade is passed as None, so only the dead sprite is drawn
        # (no fading live sprite on top).
        draw_dead_icon_sprites(buf, fi, ctx.player_icon_sprites,
                               ctx.player_dead_sprites,
                               ctx.is_dead, None,
                               ctx.last_dead_px, ctx.last_dead_py)
        draw_icon_sprites(buf, fi, ctx.px_all, ctx.py_all,
                          ctx.player_icon_sprites, is_dead=ctx.is_dead)
    else:
        # No icon sprites available: fall back to plain dots.
        draw_dead_dots_np(buf, fi, ctx.colors_arr, ctx.is_dead, ctx.death_fade,
                          ctx.last_dead_px, ctx.last_dead_py,
                          ctx.dot_dy, ctx.dot_dx, cv)
        draw_all_dots_np(buf, fi, ctx.px_all, ctx.py_all,
                         ctx.colors_arr, shadow_color, ctx.is_dead,
                         ctx.shadow_dy, ctx.shadow_dx, ctx.dot_dy, ctx.dot_dx, cv)
    # Drones: freeze at hit position, fade to transparent over ~2-3s
    if ctx.n_drones:
        assert ctx.px_drone is not None and ctx.py_drone is not None
        assert ctx.drone_colors_arr is not None
        if ctx.drone_icon_sprites:
            n_dr = ctx.px_drone.shape[0]
            for di in range(n_dr):
                is_hit = ctx.drone_is_hit is not None and ctx.drone_is_hit[di, fi]
                if is_hit:
                    # Frozen at hit position, fading out
                    if ctx.drone_last_dead_px is None or ctx.drone_last_dead_py is None or ctx.drone_hit_fade is None:
                        continue
                    dpx, dpy = int(ctx.drone_last_dead_px[di]), int(ctx.drone_last_dead_py[di])
                    fade = float(ctx.drone_hit_fade[di, fi])
                    # Fully faded or never positioned: nothing to draw.
                    if fade <= 0 or dpx < 0 or dpy < 0:
                        continue
                else:
                    # Alive — draw at current position
                    dpx, dpy = int(ctx.px_drone[di, fi]), int(ctx.py_drone[di, fi])
                    fade = 1.0
                    if dpx < 0 or dpy < 0:
                        continue
                # Use the rotated sprite when a non-negative heading exists for this frame.
                if ctx.drone_rot_caches and ctx.drone_headings is not None and ctx.drone_headings[di, fi] >= 0:
                    spr = ctx.drone_rot_caches[di][heading_to_rot_index(ctx.drone_headings[di, fi])]
                else:
                    spr = ctx.drone_icon_sprites[di]
                # Centre the sprite on the drone position.
                x, y = dpx - spr.w // 2, dpy - spr.h // 2
                if fade < 1.0:
                    blit_alpha(buf, spr, x, y, fade)
                else:
                    blit(buf, spr, x, y)
        else:
            # Fallback to dots if no icon sprites
            if ctx.drone_is_crashed is not None:
                draw_dead_entities_np(buf, fi, ctx.drone_colors_arr,
                                      ctx.drone_is_crashed, ctx.drone_hit_fade,  # type: ignore[arg-type]
                                      ctx.drone_last_dead_px, ctx.drone_last_dead_py,  # type: ignore[arg-type]
                                      ctx.drone_dy, ctx.drone_dx, cv)
            draw_drone_dots_np(buf, fi, ctx.px_drone, ctx.py_drone, ctx.drone_colors_arr,
                               ctx.drone_is_crashed, ctx.drone_hit_fade,
                               ctx.drone_dy, ctx.drone_dx, cv)
    # Aircraft: dead icons + alive icons
    if ctx.n_aircraft:
        assert ctx.px_air is not None and ctx.py_air is not None
        assert ctx.air_colors_arr is not None
        if ctx.air_icon_sprites:
            if ctx.air_is_crashed is not None:
                # death_fade is None here too: crashed aircraft show only the dead sprite.
                draw_dead_icon_sprites(buf, fi, ctx.air_icon_sprites,
                                       ctx.air_dead_sprites,
                                       ctx.air_is_crashed, None,
                                       ctx.air_last_dead_px, ctx.air_last_dead_py,  # type: ignore[arg-type]
                                       rot_caches=ctx.air_rot_caches,
                                       dead_rot_caches=ctx.air_dead_rot_caches,
                                       headings=ctx.air_headings)
            draw_icon_sprites(buf, fi, ctx.px_air, ctx.py_air,
                              ctx.air_icon_sprites,
                              is_crashed=ctx.air_is_crashed,
                              hit_fade=ctx.air_hit_fade,
                              rot_caches=ctx.air_rot_caches,
                              headings=ctx.air_headings)
        else:
            # No aircraft icon sprites: fall back to mask-shaped markers.
            if ctx.air_is_crashed is not None:
                draw_dead_entities_np(buf, fi, ctx.air_colors_arr,
                                      ctx.air_is_crashed, ctx.air_hit_fade,  # type: ignore[arg-type]
                                      ctx.air_last_dead_px, ctx.air_last_dead_py,  # type: ignore[arg-type]
                                      ctx.air_dy, ctx.air_dx, cv)
            draw_aircraft_np(buf, fi, ctx.px_air, ctx.py_air, ctx.air_colors_arr,
                             ctx.air_is_crashed, ctx.air_hit_fade,
                             ctx.air_dy, ctx.air_dx, cv)
    # Text labels are collected as (sprite, x, y) and drawn in one batch at the end,
    # so they sit on top of every marker.
    items: list[tuple[Sprite, int, int]] = []
    for i in range(ctx.n_players):
        px_i, py_i = int(ctx.px_all[i, fi]), int(ctx.py_all[i, fi])
        if px_i >= 0 and py_i >= 0:
            ls = ctx.label_sprites[i]
            # Label sits 3px to the right of the marker, vertically centred.
            icon_w = ctx.player_icon_sprites[i].w if ctx.player_icon_sprites else ctx.dot_r * 2
            items.append((ls, px_i + icon_w // 2 + 3, py_i - ls.h // 2))
    if ctx.n_drones:
        assert ctx.px_drone is not None and ctx.py_drone is not None
        for i in range(ctx.n_drones):
            px_i, py_i = int(ctx.px_drone[i, fi]), int(ctx.py_drone[i, fi])
            if px_i >= 0 and py_i >= 0:
                # Crashed drones keep no label.
                if ctx.drone_is_crashed is not None and ctx.drone_is_crashed[i, fi]:
                    continue
                ds = ctx.drone_sprites[i]
                items.append((ds, px_i + ctx.drone_r + 2, py_i - ds.h // 2))
    if ctx.n_aircraft:
        assert ctx.px_air is not None and ctx.py_air is not None
        assert ctx.air_alt is not None
        for i in range(ctx.n_aircraft):
            px_i, py_i = int(ctx.px_air[i, fi]), int(ctx.py_air[i, fi])
            if px_i >= 0 and py_i >= 0:
                if ctx.air_is_crashed is not None and ctx.air_is_crashed[i, fi]:
                    continue
                # Name+model label to the right
                ls = ctx.air_sprites[i]
                air_icon_w = ctx.air_icon_sprites[i].w if ctx.air_icon_sprites else ctx.air_r * 2
                items.append((ls, px_i + air_icon_w // 2 + 3, py_i - ls.h // 2))
                # Altitude label to the left
                alt_m = int(ctx.air_alt[i, fi])
                if alt_m >= 0:
                    # Altitude sprites are keyed by the value rounded down to a multiple of 10.
                    alt_key = alt_m // 10 * 10
                    alt_spr = ctx.air_alt_sprites.get(alt_key)
                    if alt_spr is not None:
                        items.append((alt_spr, px_i - air_icon_w // 2 - 3 - alt_spr.w, py_i - alt_spr.h // 2))
    if items:
        blit_batch(buf, items)
def _get_thread_buf(tmpl: np.ndarray) -> np.ndarray:
    """Return this thread's reusable render buffer, shaped like ``tmpl``.

    The buffer is stored on the module's thread-local object ``_tl`` and
    reused across calls. It is (re)allocated when the thread has no buffer yet
    or when the cached one no longer matches ``tmpl`` in shape or dtype, so a
    stale buffer from an earlier render with a different template is never
    handed back.

    Args:
        tmpl: Array whose shape and dtype the buffer must match.

    Returns:
        An uninitialised-or-reused array with ``tmpl``'s shape and dtype.
    """
    buf = getattr(_tl, "buf", None)
    if buf is None or buf.shape != tmpl.shape or buf.dtype != tmpl.dtype:
        buf = np.empty_like(tmpl)
        _tl.buf = buf
    return buf
# ── FFmpeg / font ──────────────────────────────────────────────────────────────
def open_ffmpeg(output_path: Path, fps: int, w: int = CANVAS_MIN, h: int = CANVAS_MIN) -> subprocess.Popen:
    """Start an FFmpeg process that encodes raw RGB24 frames from stdin to H.264.

    FFmpeg's stdout and stderr are discarded; an existing output file is
    overwritten (``-y``).

    Args:
        output_path: Destination MP4 file path.
        fps: Frames per second for the output video.
        w: Frame width in pixels.
        h: Frame height in pixels.

    Returns:
        A Popen handle whose stdin accepts raw RGB24 frame bytes.
    """
    raw_input_args = [
        "-f", "rawvideo", "-vcodec", "rawvideo",
        "-s", f"{w}x{h}", "-pix_fmt", "rgb24",
        "-r", str(fps), "-i", "pipe:0",
    ]
    encode_args = [
        "-vcodec", "libx264", "-preset", "ultrafast",
        "-crf", "34", "-pix_fmt", "yuv420p", "-threads", "4",
        "-movflags", "+faststart",
    ]
    command = ["ffmpeg", "-y", *raw_input_args, *encode_args, str(output_path)]
    return subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def load_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Load DejaVu Sans at ``size`` from the first known path that exists.

    Falls back to PIL's default font when none of the candidate paths exist.
    """
    candidates = (
        "/usr/share/fonts/TTF/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/dejavu/DejaVuSans.ttf",
    )
    found = next((c for c in candidates if Path(c).exists()), None)
    if found is None:
        return ImageFont.load_default()
    return ImageFont.truetype(found, size)
# ── Helpers ────────────────────────────────────────────────────────────────────
def assign_colors(active: list[dict], team_won: int
                  ) -> tuple[np.ndarray, np.ndarray]:
    """Assign per-player RGB colors based on team membership (winner vs loser palette).

    Args:
        active: Active player dicts, each with a 'Team' key.
        team_won: The winning team index.

    Returns:
        Tuple of (colors_arr, trail_colors_arr), both uint8 arrays of shape
        (n_players, 3). Trail colors are dimmed to 60% brightness. An empty
        ``active`` list yields two arrays of shape (0, 3).
    """
    colors_list = [WIN_COLOR if p["Team"] == team_won else LOSE_COLOR
                   for p in active]
    # reshape keeps the documented (n_players, 3) shape even for an empty
    # player list, where np.array([]) alone would produce shape (0,).
    colors_arr = np.array(colors_list, dtype=np.uint8).reshape(-1, 3)
    return colors_arr, (colors_arr * 0.6).astype(np.uint8)
def make_bg(map_pil: Image.Image) -> np.ndarray:
    """Return a private, writable array copy of the map image's pixels."""
    pixels = np.asarray(map_pil)
    return pixels.copy()
# ── Public render function ─────────────────────────────────────────────────────
def render_gob(
    d: dict,
    out_path: Path,
    fps: int = FPS,
    speed: float = SPEED,
    n_workers: int = N_WORKERS,
    progress_cb: Optional[Callable[[int], None]] = None,
) -> None:
    """
    Render a GOB replay dict to an MP4 file.

    The function pre-computes every per-frame array (positions, deaths,
    event overlays, sprites), bundles them in a VideoCtx, renders frames on a
    thread pool in batches, and streams the raw RGB frames to an FFmpeg
    subprocess in frame order.

    Args:
        d: Parsed GOB replay dict (from _gob_to_dict or json.load)
        out_path: Output MP4 path
        fps: Frames per second
        speed: Playback speed multiplier
        n_workers: Thread pool size for parallel frame render
        progress_cb: Optional callback called with int 0-100 during render

    Raises:
        ValueError: If the replay has no ground entities or no ground path
            samples, or if the local LevelDef or tank map image is missing.
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    players_by_id = {p["PlayerID"]: p for p in d["Players"]}
    team_won = d["TeamWon"]

    def build_active(entities: list[dict]) -> list[dict]:
        """Copy each owning player's dict and attach the entity's path and model."""
        out = []
        for e in entities:
            if e.get("PlayerID", 0) == 0:
                continue
            p = dict(players_by_id[e["PlayerID"]])
            p["_samples"] = e["Path"]
            p["_model"] = e["ModelName"]
            out.append(p)
        return out

    ground_ents = [e for e in d["Entities"] if e.get("PlayerID", 0) != 0
                   and e["ModelName"].startswith("tankModels/")]
    active = build_active(ground_ents)
    if not active:
        raise ValueError("No ground unit entities found in replay.")
    print(f"Ground : {len(active)}")

    # Time grid
    all_t = [s["Time"] for e in ground_ents for s in e["Path"]]
    if not all_t:
        # min()/max() on an empty list would raise an opaque ValueError.
        raise ValueError("Ground unit entities have no path samples.")
    t_start = float(min(all_t))
    t_end = float(max(all_t))
    ms_per_frame = (1000.0 / fps) * speed
    n_frames = int(math.ceil((t_end - t_start) / ms_per_frame))
    frame_times = t_start + np.arange(n_frames, dtype=np.float64) * ms_per_frame
    trail_f = max(1, int(TRAIL_MS * fps / 1000.0))
    duration_s = (t_end - t_start) / 1000.0
    print(f"Duration : {duration_s:.1f}s → ~{n_frames/fps:.0f}s video ({n_frames} frames)")

    # Load mode/scenario info from replay mission block
    mission = d.get("Mission", {})
    level = mission.get("Level", "")
    level_settings_path = mission.get("LevelSettings", "")
    battle_type = mission.get("BattleType", "")
    mission_def, mission_def_path = load_mission_def(level_settings_path)
    use_alt_map_coord = _mission_use_alt_map_coord(mission_def)
    level_override = _mission_level_override(mission_def)
    if level_override:
        level = level_override
    if mission_def_path:
        print(f"MissionDef : {mission_def_path}")
    if level_settings_path:
        print(f"BattleType : {battle_type or '(n/a)'}")
        print(f"AltMapCoord : {'on' if use_alt_map_coord else 'off'}")

    # Load map coordinate bounds from local level defs
    session_id = d.get("SessionID", 0)
    print("Loading LevelDef …", end=" ", flush=True)
    level_def = load_level_coords(level, session_id)
    if not level_def:
        raise ValueError(f"Missing local LevelDef for level '{level}'")
    canvas = CANVAS_MIN
    base_tc0, base_tc1, _ = select_tank_coords(level_def, use_alt_map_coord)
    tc0, tc1, coord_src = resolve_world_bounds(
        level_def,
        use_alt_map_coord,
        mission_def,
        mission_def_path,
        battle_type,
    )
    tc0, tc1, coord_src = _fit_world_bounds_to_ground_activity(
        tc0, tc1, coord_src,
        mission_def, mission_def_path, battle_type,
        ground_ents,
    )
    tc0, tc1 = _expand_bounds_by_pixels(tc0, tc1, canvas=canvas, pad_px=MAP_PAD_PX)
    tc0, tc1 = _clamp_bounds_to_base(tc0, tc1, base_tc0, base_tc1)
    print(f"ok ({coord_src}) X=[{tc0[0]}, {tc1[0]}] Z=[{tc0[1]}, {tc1[1]}]")
    capture_areas = resolve_capture_areas(mission_def, mission_def_path, battle_type)
    if capture_areas:
        print("Capture : " + ", ".join(
            f"{c['name']}({c['type']})" for c in capture_areas
        ))
    xfm = CoordTransform(tc0[0], tc0[1], tc1[0], tc1[1], canvas=canvas)
    print(f"Canvas : {canvas}px (full tank map)")
    print("Pre-computing positions …", end=" ", flush=True)
    px, py = precompute_positions(active, xfm, frame_times)
    print("done")

    # Only events inside the rendered time window are kept.
    kills = [k for k in d.get("Kills", []) if t_start <= k["Time"] <= t_end]
    damages = [dr for dr in d.get("DamageReports", []) if t_start <= dr["Time"] <= t_end]
    print(f"Events : {len(kills)} kills, {len(damages)} damage reports")
    is_dead, death_frame, death_fade, last_dead_px, last_dead_py = precompute_deaths(
        active, kills, t_start, ms_per_frame, n_frames, fps, px, py)
    wipe_f = find_team_wipe_frame(active, is_dead, n_frames)
    # The video ends WIPE_GRACE frames after the wipe frame, capped at n_frames.
    end_frame = min(wipe_f + WIPE_GRACE, n_frames)
    if wipe_f < n_frames:
        print(f"Team wipe : frame {wipe_f} (~{wipe_f/fps:.1f}s video)")
    colors, trail_colors = assign_colors(active, team_won)
    shadow_color = np.array([0, 0, 0], dtype=np.uint8)

    drones = build_drone_list(d, active, xfm, px, py,
                              frame_times, t_start, ms_per_frame, colors)
    n_drones = len(drones)
    if n_drones:
        px_dr, py_dr = precompute_positions(drones, xfm, frame_times)
        drone_alt = precompute_altitudes(drones, frame_times)
        drone_cols = np.array([dr["color"] for dr in drones], dtype=np.uint8)
        drone_trail_cols = (drone_cols * 0.6).astype(np.uint8)
        # Match drone kills by EntityIndex (since PlayerID=0)
        drone_is_hit, drone_hit_fade, drone_is_crashed, drone_crash_frame, \
            drone_last_dead_px, drone_last_dead_py = \
            precompute_air_deaths(drones, kills, t_start, ms_per_frame, n_frames, fps, px_dr, py_dr)
        # Override: drones freeze at kill/path-end and fade to transparent over ~2.5s
        drone_fade_frames = max(1, int(2.5 * fps))
        for i in range(len(drones)):
            hit_frames = np.where(drone_is_hit[i])[0]
            if len(hit_frames):
                # Kill-matched: freeze at kill frame
                hf = hit_frames[0]
            else:
                # Unmatched: freeze at path end
                cf = int(drone_crash_frame[i])
                if cf >= n_frames:
                    continue  # path extends beyond video, drone stays alive
                hf = cf
            # Apply 2.5s fade from freeze point
            drone_is_hit[i, :] = False
            drone_is_hit[i, hf:] = True
            n_slots = min(drone_fade_frames + 1, n_frames - hf)
            drone_hit_fade[i, :] = 1.0
            drone_hit_fade[i, hf:hf + n_slots] = np.maximum(
                0.0, 1.0 - np.arange(n_slots, dtype=np.float32) / drone_fade_frames)
            drone_hit_fade[i, hf + n_slots:] = 0.0
            # Freeze position at hit/end frame
            drone_last_dead_px[i] = px_dr[i, min(hf, px_dr.shape[1] - 1)]
            drone_last_dead_py[i] = py_dr[i, min(hf, py_dr.shape[1] - 1)]
            # Also stop the trail at this frame
            drone_crash_frame[i] = hf
            drone_is_crashed[i, :] = False
            if hf < n_frames - 1:
                drone_is_crashed[i, hf + 1:] = True
        print(f"Drones : {n_drones}")
    else:
        px_dr = py_dr = drone_cols = drone_trail_cols = drone_alt = None
        drone_is_hit = drone_hit_fade = drone_is_crashed = drone_crash_frame = None
        drone_last_dead_px = drone_last_dead_py = None

    # Aircraft — non-tank, non-drone entities with path data
    air_ents = [e for e in d["Entities"]
                if e.get("PlayerID", 0) != 0
                and not e["ModelName"].startswith("tankModels/")
                and "ucav" not in e["ModelName"].lower()
                and e.get("Path")]
    air_active = build_active(air_ents)
    n_aircraft = len(air_active)
    if n_aircraft:
        px_air, py_air = precompute_positions(air_active, xfm, frame_times)
        air_alt = precompute_altitudes(air_active, frame_times)
        # Air death: keep flying after hit, fade to black, crash at end of path
        air_is_hit, air_hit_fade, air_is_crashed, air_crash_frame, \
            air_last_dead_px, air_last_dead_py = \
            precompute_air_deaths(air_active, kills, t_start, ms_per_frame, n_frames, fps, px_air, py_air)
        # Assign colors: same win/lose scheme (and 60% trail dimming) as ground units
        air_cols, air_trail_cols = assign_colors(air_active, team_won)
        print(f"Air : {n_aircraft}")
    else:
        px_air = py_air = air_cols = air_trail_cols = air_alt = None
        air_is_hit = air_hit_fade = air_is_crashed = air_crash_frame = None
        air_last_dead_px = air_last_dead_py = None

    print("Loading tank map …", end=" ", flush=True)
    map_img = load_map_image(
        level,
        level_def,
        battle_type=battle_type,
        level_settings_path=level_settings_path,
        base_coords=(base_tc0, base_tc1),
        render_coords=(tc0, tc1),
        canvas=canvas,
    )
    if not map_img:
        print("not found")
        raise ValueError(f"No local tankmap image for level '{level}'")
    map_img = _draw_capture_areas(map_img, capture_areas, tc0, tc1)
    print("ok")
    bg = make_bg(map_img)

    # Scale icons and text — reference size is 1024px (original design target)
    scale = canvas / 1024.0
    s_dot_r = max(2, int(DOT_R * scale + 0.5))
    s_drone_r = max(1, int(DRONE_R * scale + 0.5))
    s_font_sz = max(8, int(11 * scale + 0.5))
    s_air_r = max(2, int(AIR_R * scale + 0.5))
    s_shadow_dy, s_shadow_dx = make_circle_masks(s_dot_r + 1)
    s_dot_dy, s_dot_dx = make_circle_masks(s_dot_r)
    s_drone_dy, s_drone_dx = make_circle_masks(s_drone_r)
    s_air_dy, s_air_dx = make_triangle_masks(s_air_r)
    font = load_font(s_font_sz)
    kbf = precompute_kills(kills, xfm, t_start, ms_per_frame, n_frames, fps, font,
                           offset_x=0, offset_y=0)
    dbf = precompute_damages(damages, active, px, py, t_start, ms_per_frame, n_frames, fps)
    name_sprs = make_name_sprites([p["Name"] for p in active], font)
    model_sprs = make_name_sprites([_short_model(p["_model"]) for p in active], font)
    label_sprs = [merge_sprites(ns, ms) for ns, ms in zip(name_sprs, model_sprs)]
    drone_spr = make_name_sprites(["(Drone)"] * n_drones, font)

    # Build altitude sprite cache — shared between drones and aircraft
    all_alt_vals: set[int] = set()
    if drone_alt is not None:
        all_alt_vals.update(int(v) // 10 * 10 for v in drone_alt[drone_alt >= 0])
    if air_alt is not None:
        all_alt_vals.update(int(v) // 10 * 10 for v in air_alt[air_alt >= 0])
    alt_sprites: dict[int, Sprite] = {}
    for a in all_alt_vals:
        sprs = make_name_sprites([f"(Alt: {a}m)"], font)
        alt_sprites[a] = sprs[0]

    # Build per-player vehicle icon sprites (tinted to player color)
    icon_size = max(12, int(TANK_ICON_SIZE * scale + 0.5))
    hl_pad = max(1, int(ICON_HIGHLIGHT_PAD * scale + 0.5))
    player_icon_sprs: list[Sprite] = []
    player_icon_keys: list[str] = []
    for i, p in enumerate(active):
        ik = get_icon_key(p["_model"])
        player_icon_keys.append(ik)
        c = (int(colors[i][0]), int(colors[i][1]), int(colors[i][2]))
        player_icon_sprs.append(make_tinted_icon_sprite(
            ik, c, icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR))
    # Dead sprites: bare black silhouette (no highlight glow)
    player_dead_sprs = [make_bare_black_sprite(ik, icon_size, hl_pad)
                        for ik in player_icon_keys]
    print(f"Icons : {len(player_icon_sprs)} ground @ {icon_size}px (+{hl_pad}px highlight)")

    # Build per-aircraft icon sprites
    air_icon_sprs: list[Sprite] = []
    if n_aircraft:
        assert air_cols is not None
        air_name_sprs = make_name_sprites([p["Name"] for p in air_active], font)
        air_model_sprs = make_name_sprites([_short_model(p["_model"]) for p in air_active], font)
        air_label_sprs = [merge_sprites(ns, ms) for ns, ms in zip(air_name_sprs, air_model_sprs)]
        air_icon_size = max(10, int(AIR_ICON_SIZE * scale + 0.5))
        for i, p in enumerate(air_active):
            ik = get_icon_key(p["_model"])
            c = (int(air_cols[i][0]), int(air_cols[i][1]), int(air_cols[i][2]))
            air_icon_sprs.append(make_tinted_icon_sprite(
                ik, c, air_icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR))
        air_dead_sprs = [make_black_sprite(s) for s in air_icon_sprs]
        air_rot_caches = [make_rotation_cache(s) for s in air_icon_sprs]
        air_dead_rot_caches = [make_rotation_cache(s) for s in air_dead_sprs]
        assert px_air is not None and py_air is not None
        air_headings = precompute_headings(px_air, py_air)
        print(f"Icons : {n_aircraft} aircraft @ {air_icon_size}px (+{hl_pad}px highlight, {ROTATION_STEPS} rotations)")
    else:
        air_label_sprs = []
        air_dead_sprs = []
        air_rot_caches = []
        air_dead_rot_caches = []
        air_headings = None

    # Build per-drone icon sprites with rotation (like aircraft)
    drone_icon_sprs: list[Sprite] = []
    drone_dead_sprs_list: list[Sprite] = []
    drone_rot_caches: list = []
    drone_dead_rot_caches: list = []
    drone_headings_arr: np.ndarray | None = None
    drone_icon_keys: list[str] = []
    if n_drones:
        drone_icon_size = max(10, int(AIR_ICON_SIZE * scale + 0.5))
        for dr in drones:
            ik = get_icon_key(dr["entity"]["ModelName"])
            drone_icon_keys.append(ik)
            c = (int(dr["color"][0]), int(dr["color"][1]), int(dr["color"][2]))
            drone_icon_sprs.append(make_tinted_icon_sprite(
                ik, c, drone_icon_size, highlight_pad=hl_pad, highlight_color=ICON_HIGHLIGHT_COLOR))
        # Drones: bare black silhouette (no glow) when dead
        drone_dead_sprs_list = [make_bare_black_sprite(ik, drone_icon_size, hl_pad)
                                for ik in drone_icon_keys]
        drone_rot_caches = [make_rotation_cache(s) for s in drone_icon_sprs]
        drone_dead_rot_caches = [make_rotation_cache(s) for s in drone_dead_sprs_list]
        if px_dr is not None and py_dr is not None:
            drone_headings_arr = precompute_headings(px_dr, py_dr)
        print(f"Icons : {n_drones} drones @ {drone_icon_size}px (+{hl_pad}px highlight, {ROTATION_STEPS} rotations)")

    # Load target icons for kill/damage events
    target_size = max(12, int(20 * scale + 0.5))
    kill_target = load_target_sprite("target_red.png", target_size)
    dmg_target = load_target_sprite("target_yellow.png", target_size)
    air_trail_f = max(1, int(AIR_TRAIL_MS * fps / 1000.0))
    drone_trail_f = max(1, int(DRONE_TRAIL_MS * fps / 1000.0))

    ctx = VideoCtx(
        n_players=len(active), px_all=px, py_all=py,
        colors_arr=colors, trail_colors_arr=trail_colors,
        is_dead=is_dead, death_frame=death_frame, death_fade=death_fade,
        last_dead_px=last_dead_px, last_dead_py=last_dead_py,
        kills_by_frame=kbf, damages_by_frame=dbf, label_sprites=label_sprs,
        bg_arr=bg, end_frame=end_frame,
        n_drones=n_drones, px_drone=px_dr, py_drone=py_dr,
        drone_colors_arr=drone_cols, drone_trail_colors=drone_trail_cols,
        drone_sprites=drone_spr,
        drone_icon_sprites=drone_icon_sprs, drone_dead_sprites=drone_dead_sprs_list,
        drone_rot_caches=drone_rot_caches, drone_dead_rot_caches=drone_dead_rot_caches,
        drone_headings=drone_headings_arr,
        drone_is_hit=drone_is_hit, drone_hit_fade=drone_hit_fade,
        drone_is_crashed=drone_is_crashed, drone_crash_frame=drone_crash_frame,
        drone_last_dead_px=drone_last_dead_px, drone_last_dead_py=drone_last_dead_py,
        drone_alt=drone_alt, drone_alt_sprites=alt_sprites,
        n_aircraft=n_aircraft, px_air=px_air, py_air=py_air,
        air_colors_arr=air_cols, air_trail_colors=air_trail_cols,
        air_sprites=air_label_sprs,
        air_is_hit=air_is_hit, air_hit_fade=air_hit_fade,
        air_is_crashed=air_is_crashed, air_crash_frame=air_crash_frame,
        air_last_dead_px=air_last_dead_px, air_last_dead_py=air_last_dead_py,
        air_alt=air_alt, air_alt_sprites=alt_sprites,
        air_trail_f=air_trail_f, drone_trail_f=drone_trail_f,
        dot_dy=s_dot_dy, dot_dx=s_dot_dx,
        shadow_dy=s_shadow_dy, shadow_dx=s_shadow_dx,
        drone_dy=s_drone_dy, drone_dx=s_drone_dx,
        air_dy=s_air_dy, air_dx=s_air_dx,
        dot_r=s_dot_r, drone_r=s_drone_r, air_r=s_air_r,
        canvas=canvas,
        player_icon_sprites=player_icon_sprs,
        player_dead_sprites=player_dead_sprs,
        air_icon_sprites=air_icon_sprs,
        air_dead_sprites=air_dead_sprs,
        air_rot_caches=air_rot_caches,
        air_dead_rot_caches=air_dead_rot_caches,
        air_headings=air_headings,
        kill_target_spr=kill_target,
        dmg_target_spr=dmg_target,
    )
    shared = (shadow_color, trail_f)

    def render_frame(fi: int) -> bytes:
        """Render frame ``fi`` into this worker's buffer and return its bytes."""
        buf = _get_thread_buf(ctx.bg_arr)
        render_one_ctx(fi, buf, ctx, *shared)
        # tobytes() copies, so the thread buffer can be reused immediately.
        return buf.tobytes()

    ff = open_ffmpeg(out_path, fps, canvas, canvas)
    assert ff.stdin is not None
    BATCH = n_workers * 8
    try:
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            for chunk_start in range(0, end_frame, BATCH):
                chunk_end = min(chunk_start + BATCH, end_frame)
                futs = [pool.submit(render_frame, fi)
                        for fi in range(chunk_start, chunk_end)]
                # Futures are consumed in submission order so frames reach
                # FFmpeg in sequence even if workers finish out of order.
                for fut in futs:
                    # The buffered writer writes the whole frame; a bare
                    # os.write() may return after a partial write.
                    ff.stdin.write(fut.result())
                if progress_cb is not None:
                    pct = int(chunk_end / end_frame * 100)
                    progress_cb(pct)
    finally:
        # Always release the encoder, even when rendering raised, so the
        # subprocess is not left running with an open pipe.
        try:
            ff.stdin.close()
        except OSError:
            # FFmpeg already exited; its exit status is checked below.
            pass
        ff.wait()
    if ff.returncode != 0:
        raise RuntimeError(
            f"ffmpeg exited with status {ff.returncode} while encoding {out_path}")
    sz = out_path.stat().st_size / 1_048_576
    print(f"\nDone → {out_path} ({sz:.1f} MB)")
# ── GOB loading helpers ───────────────────────────────────────────────────────
def _decode_gob_bytes(raw: bytes) -> str:
"""Decode replay byte strings while trimming fixed-width padding bytes."""
core = raw.split(b"\x00", 1)[0]
text = core.decode("utf-8", errors="replace")
return text.rstrip("".join(chr(i) for i in range(0x00, 0x20)) + "\x7f")
def _gob_to_dict(obj: object) -> Any:
"""Recursively convert pygob namedtuples to plain dicts."""
if isinstance(obj, tuple) and hasattr(obj, '_fields'): # type: ignore[union-attr]
return {f: _gob_to_dict(getattr(obj, f)) for f in obj._fields} # type: ignore[union-attr]
elif isinstance(obj, list):
return [_gob_to_dict(i) for i in obj]
elif isinstance(obj, dict):
return {
(_decode_gob_bytes(k) if isinstance(k, bytes) else k): _gob_to_dict(v)
for k, v in obj.items()
}
elif isinstance(obj, bytes):
return _decode_gob_bytes(obj)
return obj
def load_gob_file(gob_path: Path) -> dict[str, Any]:
    """Load a replay file and return it as a plain dict.

    Args:
        gob_path: Path to the replay. A ``.json`` suffix (matched
            case-insensitively, consistent with how ``main`` inspects the
            output suffix) is parsed as JSON; anything else is treated as a
            zstd-compressed gob binary.

    Returns:
        The parsed JSON document, or the pygob-decoded replay converted to
        plain dicts/lists/strings by ``_gob_to_dict``.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if a ``.json`` file is not valid JSON.
        Errors from ``zstandard``/``pygob`` propagate unchanged for binary
        replays.
    """
    gob_path = Path(gob_path)  # tolerate plain string paths from callers
    raw = gob_path.read_bytes()
    if gob_path.suffix.lower() == ".json":
        return json.loads(raw)

    # zstd-compressed gob binary; max_output_size (200 MiB) is handed to the
    # decompressor as the upper bound for the decompressed payload.
    decompressor = zstd.ZstdDecompressor()
    data = decompressor.decompress(raw, max_output_size=200 * 1024 * 1024)
    replay = pygob.load(data)
    return _gob_to_dict(replay)  # type: ignore[return-value]
# ── JSON export (slim dict for the web canvas replay viewer) ──────────────────
def _entity_type(model_name: str) -> str:
if model_name.startswith("tankModels/"):
return "ground"
if "ucav" in model_name.lower():
return "drone"
return "aircraft"
# (unit tag, icon key) pairs consulted by _vehicle_icon_key.
# Order is significant: the list is scanned top to bottom and the first entry
# whose tag is present on the unit decides the icon, so a unit carrying
# several of these tags resolves to the earliest one listed here.
_TAG_TO_ICON_KEY = [
    ("type_spaa", "spaa"),
    ("type_light_tank", "light"),
    ("type_tank_destroyer", "tank_destroyer"),
    ("type_heavy_tank", "heavy"),
    ("type_medium_tank", "medium"),
    ("type_missile_tank", "tank_destroyer"),
    ("type_bomber", "bomber_icon"),
    ("type_strike_aircraft", "fighter_icon"),
    ("type_jet_fighter", "jet_icon"),
    ("type_fighter", "fighter_icon"),
    ("type_strike_ucav", "drone"),
    ("type_helicopter", "helicopter_icon"),
]
def _vehicle_icon_key(model_name: str) -> str:
"""Return an icon key for ground vehicles: light, medium, heavy, spaa, tank_destroyer, drone.
Aircraft return tag-based fallback keys (fighter_icon, bomber_icon, etc.)."""
internal = model_name.split("/")[-1]
if "ucav" in model_name.lower():
return "drone"
tags = _get_unit_tags(internal)
if tags:
tag_set = set(tags)
for tag, icon in _TAG_TO_ICON_KEY:
if tag in tag_set:
return icon
return "medium"
def _vehicle_mini_icon(model_name: str) -> str | None:
"""Return the mini icon filename for aircraft (e.g. 'spitfire_ix_ico'), or None."""
internal = model_name.split("/")[-1]
mini_path = Path(__file__).resolve().parent / "ICONS" / "MINIS" / f"{internal}_ico.png"
if mini_path.exists():
return f"mini:{internal}_ico"
return None
def _subsample_path(path: list[dict], threshold: float = 1.0) -> list[dict]:
"""Keep first, last, and points that moved >= threshold world units."""
if len(path) <= 2:
return path
out = [path[0]]
lx, lz = path[0]["X"], path[0]["Z"]
for pt in path[1:-1]:
dx = pt["X"] - lx
dz = pt["Z"] - lz
if dx * dx + dz * dz >= threshold * threshold:
out.append(pt)
lx, lz = pt["X"], pt["Z"]
out.append(path[-1])
return out
def _resolve_drone_team(drone_entity: dict, ground_entities: list[dict],
players_by_id: dict) -> int:
"""Return team index for a drone by finding nearest ground player at spawn."""
if not drone_entity.get("Path") or not ground_entities:
return 0
spawn_t = drone_entity["Path"][0]["Time"]
spawn_x = drone_entity["Path"][0]["X"]
spawn_z = drone_entity["Path"][0]["Z"]
best_team = 0
best_dist = float("inf")
for ge in ground_entities:
pid = ge.get("PlayerID", 0)
if pid == 0 or pid not in players_by_id:
continue
closest_pt = None
closest_dt = float("inf")
for pt in ge.get("Path", []):
dt = abs(pt["Time"] - spawn_t)
if dt < closest_dt:
closest_dt = dt
closest_pt = pt
if closest_pt is None:
continue
dx = closest_pt["X"] - spawn_x
dz = closest_pt["Z"] - spawn_z
dist = dx * dx + dz * dz
if dist < best_dist:
best_dist = dist
best_team = players_by_id[pid].get("Team", 0)
return best_team
def _replay_json_position(pos: dict) -> dict:
    """Return a rounded ``{"x", "z"}`` dict for a kill position record."""
    return {"x": round(pos.get("X", 0), 1), "z": round(pos.get("Z", 0), 1)}


def _replay_json_players(players: list[dict]) -> list[dict]:
    """Build the slim player list; ``clan`` is only emitted when non-empty."""
    players_out = []
    for p in players:
        player = {
            "id": p["PlayerID"],
            "name": p.get("Name", ""),
            "team": p.get("Team", 0),
        }
        clan = p.get("ClanTag", "")
        if clan:
            player["clan"] = clan
        players_out.append(player)
    return players_out


def _replay_json_entities(entities: list[dict], ground_entities: list[dict],
                          players_by_id: dict) -> list[dict]:
    """Build the slim entity list (type, icon, translated name, thinned path).

    Entities without a path or without a model name are skipped, as are
    player-less (``PlayerID`` 0) entities that are not drones.
    """
    entities_out = []
    for e in entities:
        path = e.get("Path", [])
        if not path:
            continue
        model_name = e.get("ModelName", "")
        if not model_name:
            # Nothing to classify or translate without a model name.
            continue
        pid = e.get("PlayerID", 0)
        etype = _entity_type(model_name)
        if pid == 0 and etype != "drone":
            continue

        path_out = [{"t": round(pt["Time"]), "x": round(pt["X"], 1),
                     "z": round(pt["Z"], 1), "y": round(pt.get("Y", 0), 1)}
                    for pt in _subsample_path(path)]
        ent = {
            "playerId": pid,
            "entityIndex": e.get("EntityIndex", 0),
            "type": etype,
            "iconKey": _vehicle_icon_key(model_name),
            "vehicleName": _translate_vehicle(model_name.split("/")[-1]),
            "path": path_out,
        }
        if etype == "aircraft":
            mini = _vehicle_mini_icon(model_name)
            if mini:
                ent["miniIcon"] = mini
        elif etype == "drone":
            # Drone team is inferred from the nearest ground player at spawn.
            ent["droneTeam"] = _resolve_drone_team(e, ground_entities, players_by_id)
        entities_out.append(ent)
    return entities_out


def _replay_json_kills(kills: list[dict]) -> list[dict]:
    """Build the slim kill list; positions and vehicles are optional keys."""
    kills_out = []
    for k in kills:
        kill = {
            "time": round(k.get("Time", 0)),
            "victimId": k.get("VictimID", 0),
            "killerId": k.get("KillerID", 0),
            "victimEntityIndex": k.get("VictimEntityIndex", 0),
            "weapon": _translate_weapon(k.get("Weapon", "")),
        }
        for src_key, dst_key in (("VictimPosition", "victimPos"),
                                 ("KillerPosition", "killerPos")):
            pos = k.get(src_key)
            if pos:
                kill[dst_key] = _replay_json_position(pos)
        for src_key, dst_key in (("KillerModel", "killerVehicle"),
                                 ("VictimModel", "victimVehicle")):
            model = k.get(src_key, "")
            if model:
                kill[dst_key] = _translate_vehicle(model.split("/")[-1])
        kills_out.append(kill)
    return kills_out


def _replay_json_full_map(level_data: dict | None,
                          level_path: str) -> tuple[dict | None, str | None]:
    """Return ``(map_coords, full_map_level)`` from the level's mapCoord0/1.

    Both values are ``None`` when the level data is missing or lacks either
    ``mapCoord0`` or ``mapCoord1``. The map name is ``customLevelMap`` with
    trailing ``*`` removed when set, otherwise ``<level stem>_map``.
    """
    if not (level_data and "mapCoord0" in level_data and "mapCoord1" in level_data):
        return None, None
    mc0 = level_data["mapCoord0"]
    mc1 = level_data["mapCoord1"]
    # Either coordinate may be wrapped in an outer list; use its first element.
    if isinstance(mc0[0], list):
        mc0 = mc0[0]
    if isinstance(mc1[0], list):
        mc1 = mc1[0]
    map_coords = {"x0": mc0[0], "z0": mc0[1], "x1": mc1[0], "z1": mc1[1]}

    custom = level_data.get("customLevelMap", "")
    if custom:
        return map_coords, custom.rstrip("*")
    stem = Path(level_path).stem if level_path else ""
    return map_coords, (stem + "_map" if stem else None)


def export_replay_json(gob_path: Path) -> dict:
    """Load a GOB file and produce a slim dict for the web viewer.

    Args:
        gob_path: Path to a ``.gob`` or ``.json`` replay (see ``load_gob_file``).

    Returns:
        A JSON-serialisable dict with the keys ``teamWon``, ``mission``,
        ``levelCoords``, ``captureAreas``, ``players``, ``entities``,
        ``kills`` and ``damages``. ``tankMapCoords`` is added when level data
        is available, and ``mapCoords``/``fullMapLevel`` when the level
        provides full-map coordinates and a map name.

    Top-level collections that are missing or ``None`` in the replay are
    treated as empty.
    """
    d = load_gob_file(gob_path)
    # `or []` / `or {}` also covers keys that are present but None
    # (e.g. JSON null), which a .get() default alone would not.
    players = d.get("Players") or []
    entities = d.get("Entities") or []
    players_by_id = {p["PlayerID"]: p for p in players}

    mission = d.get("Mission") or {}
    level_path = mission.get("Level", "")
    level_settings_path = mission.get("LevelSettings", "")
    battle_type = mission.get("BattleType", "")

    mission_def, mission_def_path = load_mission_def(level_settings_path)
    use_alt_map_coord = _mission_use_alt_map_coord(mission_def)
    level_override = _mission_level_override(mission_def)
    if level_override:
        level_path = level_override

    level_data = load_level_coords(level_path, session_id=d.get("SessionID", 0))

    # Player-owned ground vehicles with a path. Computed once and shared by
    # the bounds fitting and the drone team resolution.
    ground_entities = [e for e in entities
                       if e.get("PlayerID", 0) != 0
                       and e.get("ModelName", "").startswith("tankModels/")
                       and e.get("Path")]

    capture_areas = resolve_capture_areas(mission_def, mission_def_path, battle_type)

    tank_map_coords = None
    if level_data:
        base_c0, base_c1, _ = select_tank_coords(level_data, use_alt_map_coord)
        tank_map_coords = {"x0": base_c0[0], "z0": base_c0[1],
                           "x1": base_c1[0], "z1": base_c1[1]}
        c0, c1, _ = resolve_world_bounds(
            level_data,
            use_alt_map_coord,
            mission_def,
            mission_def_path,
            battle_type,
        )
        c0, c1, _ = _fit_world_bounds_to_ground_activity(
            c0, c1, "json",
            mission_def, mission_def_path, battle_type,
            ground_entities,
        )
        c0, c1 = _expand_bounds_by_pixels(c0, c1, canvas=CANVAS_MIN, pad_px=MAP_PAD_PX)
        c0, c1 = _clamp_bounds_to_base(c0, c1, base_c0, base_c1)
        level_coords = {"x0": c0[0], "z0": c0[1], "x1": c1[0], "z1": c1[1]}
    else:
        # No level data: fall back to a fixed 4096 x 4096 area.
        level_coords = {"x0": 0, "z0": 0, "x1": 4096, "z1": 4096}

    map_coords, full_map_level = _replay_json_full_map(level_data, level_path)

    damages_out = [
        {
            "time": round(dm.get("Time", 0)),
            "offenderId": dm.get("OffenderID", 0),
            "offendedId": dm.get("OffendedID", 0),
        }
        for dm in d.get("DamageReports") or []
    ]

    out = {
        "teamWon": d.get("TeamWon", 0),
        "mission": {
            "level": Path(level_path).stem if level_path else "",
            "battleType": battle_type or "",
            "levelSettings": level_settings_path or "",
            "useAlternativeMapCoord": use_alt_map_coord,
        },
        "levelCoords": level_coords,
        "captureAreas": capture_areas,
        "players": _replay_json_players(players),
        "entities": _replay_json_entities(entities, ground_entities, players_by_id),
        "kills": _replay_json_kills(d.get("Kills") or []),
        "damages": damages_out,
    }
    if tank_map_coords:
        out["tankMapCoords"] = tank_map_coords
    if map_coords and full_map_level:
        out["mapCoords"] = map_coords
        out["fullMapLevel"] = full_map_level
    return out
# ── Main (CLI wrapper) ─────────────────────────────────────────────────────────
def main():
    """CLI entry point: render a GOB replay to MP4, or export a slim JSON for the web viewer.

    Output mode is selected by the output file extension: `.json` → json export,
    anything else → mp4 render. When no input is given, the first
    `*/replay.gob` (or, failing that, `*.json`) under REPLAYS_DIR is used; when
    no output is given, `replay_video.mp4` next to the input is written.
    Supports --profile for cProfile hotspot analysis (mp4 render only; a
    `.json` export returns before profiling is considered).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Render a GOB replay to MP4, or export slim JSON for the web viewer")
    parser.add_argument("gob", nargs="?", help="Path to .gob or .json replay")
    parser.add_argument("out", nargs="?",
                        help="Output path: .json exports JSON, anything else renders MP4")
    parser.add_argument("--fps", type=int, default=FPS)
    parser.add_argument("--speed", type=float, default=SPEED)
    parser.add_argument("--workers", type=int, default=N_WORKERS)
    parser.add_argument("--profile", action="store_true",
                        help="Run with cProfile and print top 40 hotspots")
    args = parser.parse_args()

    if args.gob:
        gob_path = Path(args.gob)
        if not gob_path.is_file():
            # Fail with a readable message instead of a traceback.
            sys.exit(f"Replay file not found: {gob_path}")
    else:
        candidates = sorted(REPLAYS_DIR.glob("*/replay.gob"))
        if not candidates:
            candidates = sorted(REPLAYS_DIR.glob("*.json"))
        if not candidates:
            sys.exit(f"No .gob or .json files in {REPLAYS_DIR}")
        gob_path = candidates[0]

    out_path = Path(args.out) if args.out else gob_path.parent / "replay_video.mp4"

    if out_path.suffix.lower() == ".json":
        data = export_replay_json(gob_path)
        raw = json.dumps(data, separators=(",", ":"))
        out_path.write_text(raw, encoding="utf-8")
        print(f"Exported {len(raw):,} bytes to {out_path}")
        return

    print(f"Input    : {gob_path}")
    print(f"Output   : {out_path}")
    # :g keeps fractional speeds visible (1.5 prints as 1.5, not 2).
    print(f"Settings : {args.fps}fps  {args.speed:g}×  {args.workers} threads")
    d = load_gob_file(gob_path)

    if not args.profile:
        render_gob(d, out_path, fps=args.fps, speed=args.speed, n_workers=args.workers)
        return

    import cProfile
    import io
    import pstats
    import time

    # The whole render (preparation and frame loop) is profiled in one run.
    print("\n=== PROFILING ===\n")
    t0 = time.perf_counter()
    pr = cProfile.Profile()
    pr.enable()
    render_gob(d, out_path, fps=args.fps, speed=args.speed, n_workers=args.workers)
    pr.disable()
    wall = time.perf_counter() - t0
    print(f"\n{'='*70}")
    print(f"Total wall time: {wall:.2f}s")
    print(f"{'='*70}\n")

    # Two reports of the top 40 entries: by cumulative time, then by own time.
    reports = (
        ("cumulative", None),
        ("tottime", "\n--- Sorted by total time ---\n"),
    )
    for sort_key, header in reports:
        stream = io.StringIO()
        stats = pstats.Stats(pr, stream=stream)
        stats.strip_dirs().sort_stats(sort_key)
        stats.print_stats(40)
        if header is not None:
            print(header)
        print(stream.getvalue())
# Run the CLI only when this module is executed directly
# (e.g. `python -m BOT.gob ...`), not when it is imported.
if __name__ == "__main__":
    main()