"""
render_recap.py

CLI tool that renders a season recap PNG for either a squadron or a single
player. Invoked by web/server.js on cache miss.

Usage (squadron mode — default):
    python render_recap.py \\
        --mode squadron \\
        --clan-id 123456 \\
        --season 2026-II \\
        --season-start 1772348400 \\
        --season-end 1777852799 \\
        --week-boundaries 1772348400,1773039600,... \\
        --out /path/to/output.png

Usage (player mode):
    python render_recap.py \\
        --mode player \\
        --uid 987654321 \\
        --season 2026-II \\
        --season-start 1772348400 \\
        --season-end 1777852799 \\
        --week-boundaries 1772348400,1773039600,... \\
        --out /path/to/output.png

Exits 0 on success, non-zero with stderr diagnostic on failure.
"""

import argparse
import gzip
import json
import re
import logging
import os
import sqlite3
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import date as _date, datetime, timezone
from pathlib import Path
from typing import List, Literal, Optional, Tuple

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from matplotlib.offsetbox import AnchoredOffsetbox, HPacker, TextArea

plt.rcParams["font.family"] = "sans-serif"
plt.rcParams["font.sans-serif"] = [
    "DejaVu Sans",
    "Noto Sans",
    "Noto Sans Symbols2",
    "Arial",
    "Liberation Sans",
    "sans-serif",
]


def _sanitize_render_text(s: str) -> str:
    """Drop code points that would render as tofu boxes.

    Removes language tag characters and both variation-selector blocks;
    every other character is kept in its original order. Falsy input
    (empty string, None) is returned unchanged.
    """
    if not s:
        return s

    # Inclusive (low, high) code-point ranges to strip.
    blocked = (
        (0xE0000, 0xE007F),  # language tag characters
        (0xFE00, 0xFE0F),    # VS1-VS16
        (0xE0100, 0xE01EF),  # VS17-VS256
    )
    kept = []
    for ch in s:
        code = ord(ch)
        if any(low <= code <= high for low, high in blocked):
            continue
        kept.append(ch)
    return "".join(kept)


# This script runs as a subprocess entry point (python BOT/render_recap.py …),
# so BOT/__init__.py is not invoked automatically. Put SREBOT on sys.path,
# then `import BOT` to trigger the package's SHARED bootstrap once.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import BOT  # noqa: F401, E402 — side effect: adds BOTS/SHARED to sys.path
from data_parser import apply_vehicle_name_filters  # noqa: E402

LOCALES_DIR = Path(__file__).resolve().parent.parent / "web" / "locales"
DEFAULT_LANG = "en"


def load_translations(lang: str) -> dict[str, str]:
    """Load seasonCard.* strings for the given language, falling back to
    English for missing keys.

    A missing/unreadable/invalid locale file, or one whose "seasonCard"
    entry is not an object, contributes no strings (best-effort).
    """
    def _read(lang_code: str) -> dict:
        path = LOCALES_DIR / f"{lang_code}.json"
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing file, unreadable file, bad encoding or bad JSON.
            return {}
        section = data.get("seasonCard") if isinstance(data, dict) else None
        return section if isinstance(section, dict) else {}

    merged = dict(_read(DEFAULT_LANG))
    if lang != DEFAULT_LANG:
        merged.update(_read(lang))
    return merged


RATING_COLOR = "#14b8a6"
WR_COLOR = "#f59e0b"
KD_COLOR = "#a78bfa"
BATTLES_COLOR = "#60a5fa"

# Ordered so the first slots avoid clashing with the rolling-curve colors
# (WR #f59e0b orange, K/D #a78bfa violet, Battles #60a5fa blue).
_SQUADRON_PALETTE: List[str] = [
    "#14b8a6",  # teal (same as RATING_COLOR — single-squadron default)
    "#dc2626",  # red
    "#059669",  # green
    "#ec4899",  # pink
    "#84cc16",  # lime
    "#06b6d4",  # cyan
    "#db2777",  # magenta
    "#65a30d",  # olive
]
_NO_SQUADRON_COLOR = "#64748b"

THEMES: dict[str, dict[str, str]] = {
    "light": {
        "bg": "#fafafa",
        "text": "#0f172a",
        "muted": "#64748b",
        "header_sub": "#475569",
        "grid": "#94a3b8",
        "divider": "#cbd5e1",
        "footer": "#94a3b8",
    },
    "dark": {
        "bg": "#121418",
        "text": "#e5e7eb",
        "muted": "#9ca3af",
        "header_sub": "#d1d5db",
        "grid": "#2a2f36",
        "divider": "#262a30",
        "footer": "#6b7280",
    },
}


def theme_palette(name: str) -> dict[str, str]:
    """Return the color palette for *name*, defaulting to the light theme."""
    return THEMES.get(name, THEMES["light"])


def smooth_series(series: List[Tuple[int, int]],
                  window: int = 12
                  ) -> List[Tuple[int, float]]:
    """Centered moving average over (ts, value) pairs. Preserves length.

    Series shorter than ``2 * window`` are returned unsmoothed (values cast
    to float). Near the edges the averaging window is truncated rather than
    padded.
    """
    n = len(series)
    if n == 0:
        return []
    if n < window * 2:
        return [(ts, float(v)) for ts, v in series]
    half = window // 2
    out: List[Tuple[int, float]] = []
    for i, (ts, _) in enumerate(series):
        lo = max(0, i - half)
        hi = min(n, i + half + 1)
        avg = sum(v for _, v in series[lo:hi]) / (hi - lo)
        out.append((ts, avg))
    return out


def _downsample_daily(series: list) -> list:
    """Reduce to one point per UTC day, keeping each day's last (ts, value).

    Caller can still apply smoothing; this just removes intra-day clustering
    that otherwise renders as a vertical bar on a multi-month chart.
    """
    if not series:
        return []
    out: list = []
    prev_day = None
    last_pt = None
    for ts, v in series:
        day = datetime.fromtimestamp(ts, tz=timezone.utc).date()
        # Day rolled over: flush the previous day's final point.
        if prev_day is not None and day != prev_day and last_pt is not None:
            out.append(last_pt)
        last_pt = (ts, v)
        prev_day = day
    if last_pt is not None:
        out.append(last_pt)
    return out


def _split_on_gaps(series: list, gap_seconds: int) -> list:
    """Split a time-sorted series into sublists wherever consecutive
    timestamps are more than gap_seconds apart.

    Prevents matplotlib from interpolating a straight line across an
    inactive period.
    """
    if not series:
        return []
    segments: list = [[]]
    prev_ts = None
    for item in series:
        ts = item[0]
        if prev_ts is not None and ts - prev_ts > gap_seconds and segments[-1]:
            segments.append([])
        segments[-1].append(item)
        prev_ts = ts
    return [s for s in segments if s]


def _clip_to_ranges(series: list, timeline: list) -> list:
    """Return a list of sublists — one per timeline range — containing only
    the points whose ts falls inside that range's [first_ts, last_ts].

    Used to make rolling WR/KD/Battles curves respect squadron shading
    boundaries instead of sprawling across unshaded gaps. Ranges that
    contain no points are omitted from the result.
    """
    if not series or not timeline:
        return []
    out: list = []
    for rng in timeline:
        seg = [p for p in series if rng.first_ts <= p[0] <= rng.last_ts]
        if seg:
            out.append(seg)
    return out


def _smooth_rolling_curve(series: list, window: int = 8) -> list:
    """Centered moving average for already-computed rolling curves.

    Accepts (ts, int) or (ts, float) pairs; returns (ts, float). Same
    algorithm as smooth_series, only the default window differs.
    """
    return smooth_series(series, window=window)


_storage_env = os.environ.get("STORAGE_VOL_PATH", "").strip()
if not _storage_env:
    raise RuntimeError("STORAGE_VOL_PATH must be set")
_STORAGE = Path(_storage_env)
SQUADRONS_DB = _STORAGE / "squadrons.db"
SQ_BATTLES_DB = _STORAGE / "sq_battles.db"

SEASONS_TXT = (Path(__file__).resolve().parent.parent
               / "web" / "constants" / "seasons")

# Group 1: the tier's unix start time; group 2: its max BR.
# NOTE(review): assumes the first run of digits on a "max BR" line is the
# unix start timestamp — confirm against the format of web/constants/seasons.
_BR_LINE_RE = re.compile(r"(\d+).*?max BR\s+(\d+(?:\.\d+)?)", re.IGNORECASE)


def _load_br_schedule() -> List[dict]:
    """Parse web/constants/seasons into a list of
    [{'max_br': float, 'start': unix, 'end': unix}, ...].

    Contains every season's BR tiers. Callers filter by season window.
    Returns an empty list when the file cannot be read; lines that do not
    match the BR pattern are ignored.
    """
    try:
        text = SEASONS_TXT.read_text(encoding="utf-8")
    except OSError:
        return []
    entries: List[dict] = []
    for line in text.splitlines():
        m = _BR_LINE_RE.search(line)
        if not m:
            continue
        entries.append({
            "start": int(m.group(1)),
            "max_br": float(m.group(2)),
            "end": 0,
        })
    # End = next entry's start (minus one sec); last entry gets +7d default.
    for i, e in enumerate(entries):
        nxt = entries[i + 1]["start"] if i + 1 < len(entries) else 0
        e["end"] = (nxt - 1) if nxt > e["start"] else e["start"] + 7 * 86400
    return entries


@dataclass
class Args:
    mode: Literal["squadron", "player"]
    clan_id: Optional[int]
    uid: Optional[str]
    season: str
    season_start: int
    season_end: int
    week_boundaries: List[int]
    out: Path
    theme: str
    lang: str


def parse_args(argv: Optional[List[str]] = None) -> Args:
    """Parse CLI arguments into an Args record.

    Exits via argparse's error path when the mode-specific identifier is
    missing (--clan-id for squadron mode, numeric --uid for player mode).
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--mode", choices=["squadron", "player"], default="squadron")
    p.add_argument("--clan-id", type=int, default=None)
    p.add_argument("--uid", type=str, default=None)
    p.add_argument("--season", required=True)
    p.add_argument("--season-start", type=int, required=True)
    p.add_argument("--season-end", type=int, required=True)
    p.add_argument("--week-boundaries", default="")
    p.add_argument("--out", type=Path, required=True)
    p.add_argument("--theme", choices=list(THEMES.keys()), default="light")
    p.add_argument("--lang", default=DEFAULT_LANG)
    ns = p.parse_args(argv)

    if ns.mode == "squadron" and ns.clan_id is None:
        p.error("--mode squadron requires --clan-id")
    if ns.mode == "player":
        if not ns.uid:
            p.error("--mode player requires --uid")
        if not ns.uid.isdigit():
            p.error("--uid must be numeric")

    # Empty items (e.g. trailing comma, empty default) are skipped.
    boundaries = [int(x) for x in ns.week_boundaries.split(",") if x.strip()]
    return Args(
        mode=ns.mode,
        clan_id=ns.clan_id,
        uid=ns.uid,
        season=ns.season,
        season_start=ns.season_start,
        season_end=ns.season_end,
        week_boundaries=boundaries,
        out=ns.out,
        theme=ns.theme,
        lang=ns.lang,
    )


def _open_ro(path: Path) -> sqlite3.Connection:
    """Open the SQLite database at *path* read-only (URI mode=ro)."""
    uri = f"file:{path}?mode=ro"
    return sqlite3.connect(uri, uri=True, timeout=10.0)


@dataclass
class SquadronIdent:
    clan_id: int
    short_name: str
    long_name: str


def resolve_squadron(conn_sq: sqlite3.Connection,
                     clan_id: int) -> Optional[SquadronIdent]:
    """Look up a squadron's short/long name by clan_id; None if unknown.

    NULL names are normalized to empty strings.
    """
    cur = conn_sq.execute(
        "SELECT short_name, long_name FROM squadrons_data WHERE clan_id = ?",
        (clan_id,),
    )
    row = cur.fetchone()
    if not row:
        return None
    return SquadronIdent(clan_id=clan_id,
                         short_name=row[0] or "",
                         long_name=row[1] or "")


@dataclass
class RatingDerived:
    series: List[Tuple[int, int]]  # (unix_time, total_score) ordered
    final: Optional[int]
    peak: Optional[int]
    peak_ts: Optional[int]
    first: Optional[int]
    change: Optional[int]


def gather_squadron_rating(conn_sq: sqlite3.Connection,
                           clan_id: int,
                           start: int,
                           end: int) -> RatingDerived:
    """Read the squadron's total_score snapshots in [start, end] and derive
    first/final/peak/change. All derived fields are None when no snapshot
    exists in the window."""
    cur = conn_sq.execute(
        "SELECT unix_time, total_score FROM squadrons_points "
        "WHERE clan_id = ? AND unix_time BETWEEN ? AND ? "
        "ORDER BY unix_time",
        (clan_id, start, end),
    )
    series: List[Tuple[int, int]] = [(row[0], row[1]) for row in cur.fetchall()]
    if not series:
        return RatingDerived(series=[], final=None, peak=None,
                             peak_ts=None, first=None, change=None)
    first = series[0][1]
    final = series[-1][1]
    # max() keeps the earliest snapshot among equal peak scores.
    peak_ts, peak = max(series, key=lambda t: t[1])
    return RatingDerived(series=series, final=final, peak=peak,
                         peak_ts=peak_ts, first=first, change=final - first)


@dataclass
class MatchRow:
    session_id: str
    endtime_unix: int
    winning_sq: str
    losing_sq: str


@dataclass
class MatchDerived:
    stream: List[MatchRow]
    total: int
    wins: int
    losses: int
    wr_pct: Optional[float]
    longest_win_streak: int
    top_opponent: Optional[Tuple[str, int, int]]  # (short, matches_vs, wins_vs)


def gather_squadron_match_stream(conn_b: sqlite3.Connection,
                                 short: str,
                                 start: int,
                                 end: int) -> List[MatchRow]:
    """Return every match_summary row in [start, end] where *short* is the
    winning or losing squadron, ordered by end time."""
    cur = conn_b.execute(
        "SELECT session_id, endtime_unix, winning_sq, losing_sq "
        "FROM match_summary "
        "WHERE (winning_sq = ? OR losing_sq = ?) "
        " AND endtime_unix BETWEEN ? AND ? "
        "ORDER BY endtime_unix",
        (short, short, start, end),
    )
    return [MatchRow(session_id=r[0], endtime_unix=r[1],
                     winning_sq=r[2] or "", losing_sq=r[3] or "")
            for r in cur.fetchall()]


def gather_squadron_per_match_stream(conn_b: sqlite3.Connection,
                                     short: str,
                                     start: int,
                                     end: int
                                     ) -> "List[PlayerGameRow]":
    """One synthetic PlayerGameRow per squadron match (kills/deaths summed
    across all squadron members in that match, win/loss from match_summary).

    Shaped as PlayerGameRow so the shared rolling helpers work unchanged.
    """
    cur = conn_b.execute(
        "SELECT pgh.session_id, MAX(pgh.endtime_unix) AS ts, "
        " ms.winning_sq, "
        " COALESCE(SUM(pgh.ground_kills), 0) AS g_kills, "
        " COALESCE(SUM(pgh.air_kills), 0) AS a_kills, "
        " COALESCE(SUM(pgh.deaths), 0) AS deaths "
        "FROM player_games_hist pgh "
        "JOIN match_summary ms ON ms.session_id = pgh.session_id "
        "WHERE pgh.squadron_name = ? AND pgh.endtime_unix BETWEEN ? AND ? "
        "GROUP BY pgh.session_id "
        "ORDER BY ts",
        (short, start, end),
    )
    rows: List[PlayerGameRow] = []
    for sid, ts, win_sq, g, a, d in cur.fetchall():
        rows.append(PlayerGameRow(
            session_id=sid,
            endtime_unix=ts,
            squadron_name=short,
            vehicle="",
            ground_kills=g or 0,
            air_kills=a or 0,
            assists=0,
            captures=0,
            deaths=d or 0,
            victor_bool="WIN" if (win_sq or "") == short else "LOSS",
        ))
    return rows


def compute_squadron_match_derived(stream: List[MatchRow],
                                   short: str) -> MatchDerived:
    """Derive totals, win rate, longest win streak and the most-faced
    opponent from a time-ordered match stream for squadron *short*."""
    total = len(stream)
    wins = sum(1 for m in stream if m.winning_sq == short)
    losses = total - wins
    wr_pct = (wins / total * 100.0) if total else None

    # Longest win streak
    longest = cur_streak = 0
    for m in stream:
        if m.winning_sq == short:
            cur_streak += 1
            if cur_streak > longest:
                longest = cur_streak
        else:
            cur_streak = 0

    # Top opponent (by match count), with wins-against count
    opp_total: dict[str, int] = defaultdict(int)
    opp_wins: dict[str, int] = defaultdict(int)
    for m in stream:
        won = m.winning_sq == short
        other = m.losing_sq if won else m.winning_sq
        if not other:
            continue
        opp_total[other] += 1
        if won:
            opp_wins[other] += 1
    if opp_total:
        top_name = max(opp_total, key=lambda k: opp_total[k])
        top_opponent: Optional[Tuple[str, int, int]] = (
            top_name, opp_total[top_name], opp_wins[top_name])
    else:
        top_opponent = None

    return MatchDerived(stream=stream, total=total, wins=wins, losses=losses,
                        wr_pct=wr_pct, longest_win_streak=longest,
                        top_opponent=top_opponent)


@dataclass
class TimelineRange:
    squadron_name: str  # "" = no squadron
    first_ts: int
    last_ts: int
    match_count: int


def gather_player_squadron_timeline(conn_b: sqlite3.Connection,
                                    uid: str,
                                    start: int,
                                    end: int) -> List[TimelineRange]:
    """Walk player games in time order; group consecutive same-squadron rows
    into ranges.

    Empty string squadron_name is a valid range ("no squadron").
    """
    cur = conn_b.execute(
        "SELECT squadron_name, endtime_unix FROM player_games_hist "
        "WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
        "ORDER BY endtime_unix",
        (uid, start, end),
    )
    rows = cur.fetchall()
    if not rows:
        return []

    ranges: List[TimelineRange] = []
    cur_name = rows[0][0] or ""
    cur_first = cur_last = rows[0][1]
    cur_count = 1
    for name, ts in rows[1:]:
        name = name or ""
        if name == cur_name:
            cur_last = ts
            cur_count += 1
        else:
            ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))
            cur_name, cur_first, cur_last, cur_count = name, ts, ts, 1
    ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))

    # Drop ranges shorter than one day (brief squadron stints aren't worth
    # surfacing in the chain, legend, or shading), then merge any adjacent
    # same-squadron ranges that got left side-by-side after the drop.
    ranges = [r for r in ranges if (r.last_ts - r.first_ts) >= 86400]
    merged: List[TimelineRange] = []
    for r in ranges:
        if merged and merged[-1].squadron_name == r.squadron_name:
            prev = merged[-1]
            merged[-1] = TimelineRange(
                squadron_name=prev.squadron_name,
                first_ts=prev.first_ts,
                last_ts=r.last_ts,
                match_count=prev.match_count + r.match_count,
            )
        else:
            merged.append(r)
    ranges = merged

    if not ranges:
        return []

    # The final range covers "still in this squadron" — extend its last_ts
    # through the season window so rating snapshots after the player's final
    # match (decay, continued membership) still appear on the chart.
    last = ranges[-1]
    if last.last_ts < end:
        ranges[-1] = TimelineRange(
            squadron_name=last.squadron_name,
            first_ts=last.first_ts,
            last_ts=end,
            match_count=last.match_count,
        )
    return ranges


@dataclass
class PlayerGameRow:
    session_id: str
    endtime_unix: int
    squadron_name: str
    vehicle: str
    ground_kills: int
    air_kills: int
    assists: int
    captures: int
    deaths: int
    victor_bool: str  # "WIN" | "LOSS" (case varies in source data)


def gather_player_game_stream(conn_b: sqlite3.Connection,
                              uid: str,
                              start: int,
                              end: int) -> List[PlayerGameRow]:
    """Return the player's games in [start, end], ordered by end time.

    NULL columns become "" / 0; vehicle names are passed through
    apply_vehicle_name_filters.
    """
    cur = conn_b.execute(
        "SELECT session_id, endtime_unix, squadron_name, vehicle, "
        " ground_kills, air_kills, assists, captures, deaths, victor_bool "
        "FROM player_games_hist "
        "WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
        "ORDER BY endtime_unix",
        (uid, start, end),
    )
    return [
        PlayerGameRow(
            session_id=r[0],
            endtime_unix=r[1],
            squadron_name=r[2] or "",
            vehicle=apply_vehicle_name_filters(r[3] or ""),
            ground_kills=r[4] or 0,
            air_kills=r[5] or 0,
            assists=r[6] or 0,
            captures=r[7] or 0,
            deaths=r[8] or 0,
            victor_bool=r[9] or "",
        )
        for r in cur.fetchall()
    ]


@dataclass
class RatingSegment:
    squadron_name: str  # "" if the range had no squadron
    points: List[Tuple[int, int]]  # (unix_time, player_points); empty if unresolvable


def _decompress_clan_pts(blob) -> Optional[dict]:
    """clan_pts is a gzipped JSON of [members_dict, total_score].

    Return members_dict ({uid_str: {"points": N, ...}, ...}) or None on
    failure. Non-bytes blobs are parsed as plain JSON text; a bare dict
    payload is returned as-is.
    """
    import zlib  # local import: only needed to name zlib.error below

    try:
        if isinstance(blob, (bytes, memoryview)):
            data = json.loads(gzip.decompress(bytes(blob)))
        else:
            data = json.loads(blob)
    except (OSError, EOFError, ValueError, TypeError, zlib.error):
        # gzip.decompress raises BadGzipFile (an OSError) for a bad header,
        # EOFError for a truncated stream and zlib.error for corrupt data.
        return None
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data[0]
    if isinstance(data, dict):
        return data
    return None


def _resolve_squadron_long_name(conn_sq: sqlite3.Connection,
                                short_name: str) -> Optional[str]:
    """Map a squadron short name to its long name; None when the short name
    is empty, unknown, or has no long name stored."""
    if not short_name:
        return None
    cur = conn_sq.execute(
        "SELECT long_name FROM squadrons_data WHERE short_name = ? LIMIT 1",
        (short_name,),
    )
    row = cur.fetchone()
    return row[0] if row and row[0] else None


def gather_player_rating_trail(conn_sq: sqlite3.Connection,
                               timeline: List[TimelineRange],
                               uid: str) -> List[RatingSegment]:
    """For each timeline range, read the player's personal SQB points (from
    each squadrons_points snapshot's gzipped member roster) during that range.

    Returns per-range (unix_time, player_points) lists; one RatingSegment is
    emitted per timeline range, with empty points when the squadron is
    blank or cannot be resolved.
    """
    segments: List[RatingSegment] = []
    long_cache: dict[str, Optional[str]] = {}

    for rng in timeline:
        if not rng.squadron_name:
            segments.append(RatingSegment("", []))
            continue
        if rng.squadron_name not in long_cache:
            long_cache[rng.squadron_name] = _resolve_squadron_long_name(
                conn_sq, rng.squadron_name
            )
        long_name = long_cache[rng.squadron_name]
        if not long_name:
            segments.append(RatingSegment(rng.squadron_name, []))
            continue

        cur = conn_sq.execute(
            "SELECT unix_time, clan_pts FROM squadrons_points "
            "WHERE long_name = ? AND unix_time BETWEEN ? AND ? "
            "ORDER BY unix_time",
            (long_name, rng.first_ts, rng.last_ts),
        )
        points: List[Tuple[int, int]] = []
        for ts, blob in cur.fetchall():
            members = _decompress_clan_pts(blob)
            if not members:
                continue
            entry = members.get(uid) or members.get(str(uid))
            if not isinstance(entry, dict):
                continue
            pts = entry.get("points")
            if isinstance(pts, (int, float)):
                points.append((ts, int(pts)))
        segments.append(RatingSegment(rng.squadron_name, points))
    return segments


# Max bound parameters per IN (...) query; stays below SQLite's default
# variable limit on older builds (999).
_SQL_IN_CHUNK = 500


def gather_player_most_common_opponent(conn_b: sqlite3.Connection,
                                       game_stream: List[PlayerGameRow]
                                       ) -> Optional[Tuple[str, int]]:
    """For each session the player was in, look up winning/losing squadron in
    match_summary; opponent is whichever side isn't the player's squadron in
    that game.

    Returns (opp_short_name, matches_vs) or None. Session ids are looked up
    in chunks so a long season cannot exceed SQLite's bound-variable limit.
    """
    if not game_stream:
        return None
    ids = list({g.session_id for g in game_stream})

    by_session: dict[str, Tuple[str, str]] = {}
    for offset in range(0, len(ids), _SQL_IN_CHUNK):
        chunk = ids[offset:offset + _SQL_IN_CHUNK]
        placeholders = ",".join("?" * len(chunk))
        cur = conn_b.execute(
            f"SELECT session_id, winning_sq, losing_sq "
            f"FROM match_summary WHERE session_id IN ({placeholders})",
            chunk,
        )
        for sid, winner, loser in cur.fetchall():
            by_session[sid] = (winner or "", loser or "")

    opp_counts: dict[str, int] = defaultdict(int)
    for g in game_stream:
        pair = by_session.get(g.session_id)
        if not pair:
            continue
        winner, loser = pair
        own = g.squadron_name
        # Games where the player's squadron is on neither side are skipped.
        opp = loser if winner == own else (winner if loser == own else "")
        if opp:
            opp_counts[opp] += 1
    if not opp_counts:
        return None
    name = max(opp_counts, key=lambda k: opp_counts[k])
    return name, opp_counts[name]


def gather_player_frequent_teammate(conn_b: sqlite3.Connection,
                                    uid: str,
                                    start: int,
                                    end: int
                                    ) -> Optional[Tuple[str, int]]:
    """Self-join on session_id — counts UIDs (not the player) that shared the
    most sessions with the player while on the same squadron in that session.

    Returns (nick, shared_sessions) or None; nick is "unknown" when no
    non-coop nick is stored for the teammate.
    """
    cur = conn_b.execute(
        "SELECT p.UID, COUNT(DISTINCT p.session_id) AS shared "
        "FROM player_games_hist p "
        "JOIN player_games_hist me "
        " ON me.session_id = p.session_id "
        " AND me.squadron_name = p.squadron_name "
        "WHERE me.UID = ? "
        " AND p.UID != ? "
        " AND me.endtime_unix BETWEEN ? AND ? "
        "GROUP BY p.UID "
        "ORDER BY shared DESC "
        "LIMIT 1",
        (uid, uid, start, end),
    )
    row = cur.fetchone()
    if not row or not row[1]:
        return None
    teammate_uid, shared = row[0], int(row[1])
    # MIN(nick) would pick byte-sorted smallest — e.g. a 'coop/…' disconnect
    # row beats the real nick. Pick the most frequent non-coop nick instead,
    # tiebreaking by recency so renames surface the latest identity.
    nick_cur = conn_b.execute(
        "SELECT nick FROM player_games_hist "
        "WHERE UID = ? AND nick NOT LIKE 'coop/%' "
        "GROUP BY nick "
        "ORDER BY COUNT(*) DESC, MAX(endtime_unix) DESC "
        "LIMIT 1",
        (teammate_uid,),
    )
    nick_row = nick_cur.fetchone()
    return (nick_row[0] if nick_row and nick_row[0] else "unknown"), shared


@dataclass
class BestMatchRow:
    session_id: str
    endtime_unix: int
    vehicle: str
    ground_kills: int
    air_kills: int
    assists: int
    captures: int
    deaths: int


@dataclass
class PlayerDerived:
    total: int
    wins: int
    losses: int
    wr_pct: Optional[float]
    ground_kills: int
    air_kills: int
    total_kills: int
    total_deaths: int
    assists: int
    captures: int
    kd: Optional[float]
    longest_win_streak: int
    top_vehicle: Optional[Tuple[str, int]]
    best_match: Optional[BestMatchRow]
    peak_rating: Optional[Tuple[int, str, int]]  # (rating, squadron_name, unix_time)


def compute_player_derived(stream: List[PlayerGameRow],
                           timeline: List[TimelineRange],
                           trail: List[RatingSegment]) -> PlayerDerived:
    """Aggregate a player's season: totals, WR, K/D, longest win streak,
    most-used vehicle, best single match and peak personal rating.

    Best match = highest kills + assists + captures; ties go to the game
    with fewer deaths, then to the earlier game.
    """
    del timeline  # reserved for future squadron-weighted metrics; currently unused
    total = len(stream)
    wins = sum(1 for g in stream if g.victor_bool.upper() == "WIN")
    losses = total - wins
    wr = (wins / total * 100.0) if total else None
    gk = sum(g.ground_kills for g in stream)
    ak = sum(g.air_kills for g in stream)
    dk = sum(g.deaths for g in stream)
    asts = sum(g.assists for g in stream)
    caps = sum(g.captures for g in stream)
    kd = ((gk + ak) / dk) if dk else None

    longest = run = 0
    for g in stream:
        if g.victor_bool.upper() == "WIN":
            run += 1
            longest = max(longest, run)
        else:
            run = 0

    veh_counts: dict[str, int] = defaultdict(int)
    for g in stream:
        if g.vehicle:
            veh_counts[g.vehicle] += 1
    if veh_counts:
        top_name = max(veh_counts, key=lambda k: veh_counts[k])
        top_vehicle: Optional[Tuple[str, int]] = (top_name, veh_counts[top_name])
    else:
        top_vehicle = None

    best: Optional[BestMatchRow] = None
    best_score = -1
    for g in stream:
        score = g.ground_kills + g.air_kills + g.assists + g.captures
        beats = score > best_score
        ties_with_fewer_deaths = (score == best_score and best is not None
                                  and g.deaths < best.deaths)
        if beats or ties_with_fewer_deaths:
            best_score = score
            best = BestMatchRow(
                session_id=g.session_id,
                endtime_unix=g.endtime_unix,
                vehicle=g.vehicle,
                ground_kills=g.ground_kills,
                air_kills=g.air_kills,
                assists=g.assists,
                captures=g.captures,
                deaths=g.deaths,
            )

    peak: Optional[Tuple[int, str, int]] = None
    for seg in trail:
        for ts, rating in seg.points:
            if peak is None or rating > peak[0]:
                peak = (rating, seg.squadron_name, ts)

    return PlayerDerived(
        total=total,
        wins=wins,
        losses=losses,
        wr_pct=wr,
        ground_kills=gk,
        air_kills=ak,
        total_kills=gk + ak,
        total_deaths=dk,
        assists=asts,
        captures=caps,
        kd=kd,
        longest_win_streak=longest,
        top_vehicle=top_vehicle,
        best_match=best,
        peak_rating=peak,
    )


@dataclass
class PlayerAggregates:
    ground_kills: int
    air_kills: int
    deaths: int
    assists: int
    captures: int
    total_kills: int
    kd: Optional[float]
    top_vehicle: Optional[Tuple[str, int]]  # (vehicle_raw_name, games)
    mvp: Optional[Tuple[str, int, int, int]]  # (nick, kills, assists, captures)
    most_active: Optional[Tuple[str, int]]  # (nick, matches_played)


def gather_squadron_player_aggregates(conn_b: sqlite3.Connection,
                                      squadron_name: str,
                                      start: int,
                                      end: int) -> PlayerAggregates:
    """Aggregate all player rows for *squadron_name* in [start, end]:
    kill/death totals, most-used vehicle, MVP and most active member."""
    # Totals
    cur = conn_b.execute(
        "SELECT COALESCE(SUM(ground_kills), 0), COALESCE(SUM(air_kills), 0), "
        " COALESCE(SUM(deaths), 0), COALESCE(SUM(assists), 0), "
        " COALESCE(SUM(captures), 0) "
        "FROM player_games_hist "
        "WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ?",
        (squadron_name, start, end),
    )
    g, a, d, asst, cap = cur.fetchone()

    # Top vehicle
    cur = conn_b.execute(
        "SELECT vehicle, COUNT(*) AS c FROM player_games_hist "
        "WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
        " AND vehicle IS NOT NULL AND vehicle != '' "
        "GROUP BY vehicle ORDER BY c DESC LIMIT 1",
        (squadron_name, start, end),
    )
    row = cur.fetchone()
    top_vehicle: Optional[Tuple[str, int]] = (
        (apply_vehicle_name_filters(row[0]), row[1]) if row else None
    )

    # MVP — ranked by K + A + C (no weighting); report the breakdown
    # NOTE(review): MIN(nick) picks the byte-sorted smallest nick for a UID,
    # which may be a 'coop/…' row (see gather_player_frequent_teammate).
    cur = conn_b.execute(
        "SELECT MIN(nick) AS nick, "
        " SUM(ground_kills + air_kills) AS kills, "
        " SUM(assists) AS assists, "
        " SUM(captures) AS captures "
        "FROM player_games_hist "
        "WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
        "GROUP BY UID "
        "ORDER BY SUM(ground_kills + air_kills + assists + captures) DESC "
        "LIMIT 1",
        (squadron_name, start, end),
    )
    row = cur.fetchone()
    mvp: Optional[Tuple[str, int, int, int]] = (
        (row[0], row[1] or 0, row[2] or 0, row[3] or 0)
        if row and (row[1] or row[2] or row[3]) else None
    )

    # Most active — matches played per UID
    cur = conn_b.execute(
        "SELECT MIN(nick) AS nick, COUNT(DISTINCT session_id) AS matches "
        "FROM player_games_hist "
        "WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
        "GROUP BY UID ORDER BY matches DESC LIMIT 1",
        (squadron_name, start, end),
    )
    row = cur.fetchone()
    most_active: Optional[Tuple[str, int]] = (
        (row[0], row[1]) if row and row[1] else None
    )

    total_kills = g + a
    kd = (total_kills / d) if d else None
    return PlayerAggregates(ground_kills=g, air_kills=a, deaths=d,
                            assists=asst, captures=cap,
                            total_kills=total_kills, kd=kd,
                            top_vehicle=top_vehicle, mvp=mvp,
                            most_active=most_active)


def compute_squadron_wr_curve(stream: List[MatchRow],
                              short: str,
                              season_start: int,
                              season_end: int
                              ) -> Tuple[List[Tuple[int, float]], int]:
    """
    Returns (points, window_size).

    Points: list of (unix_time, wr_pct) anchored at each match's endtime,
    computed as rolling WR over the previous `window_size` matches
    (inclusive of current). With fewer matches than the window, a cumulative
    WR is returned instead (one point per match).

    x-axis domain callers want = [season_start, season_end].
    """
    total = len(stream)
    if total == 0:
        return [], 0
    window = max(20, total // 20)

    if total < window:
        # Cumulative WR: point per match
        running_wins = 0
        points: List[Tuple[int, float]] = []
        for i, m in enumerate(stream):
            if m.winning_sq == short:
                running_wins += 1
            wr = running_wins / (i + 1) * 100.0
            points.append((m.endtime_unix, wr))
        return points, window

    # Full rolling
    wins_flags = [1 if m.winning_sq == short else 0 for m in stream]
    points = []
    window_sum = sum(wins_flags[:window])
    # First plottable index = window - 1
    points.append((stream[window - 1].endtime_unix, window_sum / window * 100.0))
    for i in range(window, total):
        window_sum += wins_flags[i] - wins_flags[i - window]
        points.append((stream[i].endtime_unix, window_sum / window * 100.0))
    return points, window


def compute_rolling_wr(stream: List[PlayerGameRow],
                       window_seconds: int = 7 * 86400
                       ) -> List[Tuple[int, float]]:
    """Trailing time-window win rate, one point per match.

    Stale games drop out of the window naturally during play gaps so the
    curve doesn't mix stats across weeks. The window is the closed interval
    [t - window_seconds, t]; *stream* must be ordered by endtime_unix.
    """
    total = len(stream)
    if total == 0:
        return []
    wins = [1 if g.victor_bool.upper() == "WIN" else 0 for g in stream]
    out: List[Tuple[int, float]] = []
    left = 0
    wins_in_window = 0
    for right in range(total):
        t_r = stream[right].endtime_unix
        wins_in_window += wins[right]
        while stream[left].endtime_unix < t_r - window_seconds:
            wins_in_window -= wins[left]
            left += 1
        n = right - left + 1
        out.append((t_r, wins_in_window / n * 100.0))
    return out


def compute_rolling_kd(stream: List[PlayerGameRow],
                       window_seconds: int = 7 * 86400
                       ) -> List[Tuple[int, float]]:
    """Trailing time-window K/D, one point per match.

    deaths=0 is bounded via max(deaths, 1) to keep values finite during
    kill-heavy early games. The window is the closed interval
    [t - window_seconds, t]; *stream* must be ordered by endtime_unix.
    """
    total = len(stream)
    if total == 0:
        return []
    kills = [g.ground_kills + g.air_kills for g in stream]
    deaths = [g.deaths for g in stream]
    out: List[Tuple[int, float]] = []
    left = 0
    k_sum = d_sum = 0
    for right in range(total):
        t_r = stream[right].endtime_unix
        k_sum += kills[right]
        d_sum += deaths[right]
        while stream[left].endtime_unix < t_r - window_seconds:
            k_sum -= kills[left]
            d_sum -= deaths[left]
            left += 1
        out.append((t_r, k_sum / max(d_sum, 1)))
    return out


def compute_rolling_battles(stream: List[PlayerGameRow],
                            window_seconds: int = 7 * 86400
                            ) -> List[Tuple[int, int]]:
    """Time-based rolling match count: at each game timestamp t, count of
    games with endtime_unix in [t - window, t].

    *stream* must be ordered by endtime_unix.
    """
    total = len(stream)
    if total == 0:
        return []
    out: List[Tuple[int, int]] = []
    left = 0
    for right in range(total):
        t_r = stream[right].endtime_unix
        # Strict '<' keeps a game at exactly t - window inside the closed
        # window, matching compute_rolling_wr / compute_rolling_kd.
        while stream[left].endtime_unix < t_r - window_seconds:
            left += 1
        out.append((t_r, right - left + 1))
    return out


# SQB timeslots (UTC) — source of truth is BOT/utils.py SQB_SLOTS_POSTED; duplicated
# here because BOT/utils.py pulls in Discord bot dependencies, too heavy for this CLI.
# Posted squadron-battle (SQB) windows as (region, start, end), expressed in
# seconds past 00:00 UTC. Both bounds are inclusive when classifying.
_SQB_TIMESLOTS_UTC: List[Tuple[str, int, int]] = [
    ("NA", 1 * 3600, 7 * 3600),    # 01:00 – 07:00
    ("EU", 14 * 3600, 22 * 3600),  # 14:00 – 22:00
]


def _classify_timeslot(ts: int) -> Optional[Tuple[str, _date]]:
    """Map a Unix timestamp to the (region, UTC date) SQB slot containing it.

    Returns None when the UTC time of day lies outside every window listed in
    ``_SQB_TIMESLOTS_UTC``.
    """
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    sod = int(dt.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
    offset = ts - sod  # seconds elapsed since 00:00 UTC of that day
    for region, lo, hi in _SQB_TIMESLOTS_UTC:
        if lo <= offset <= hi:
            return region, dt.date()
    return None


def compute_longest_timeslot_session(stream: List[PlayerGameRow]
                                     ) -> Optional[Tuple[str, _date, int]]:
    """Bucket games into (date_UTC, region) slots based on endtime_unix;
    return (region, date, match_count) for the bucket with the most games.
    Matches outside any posted SQB timeslot are excluded.

    Returns None when no game falls inside a timeslot. On a tie the bucket
    that was populated first (in ``stream`` order) wins.
    """
    buckets: dict[Tuple[str, _date], int] = defaultdict(int)
    for g in stream:
        slot = _classify_timeslot(g.endtime_unix)
        if slot is not None:
            buckets[slot] += 1
    if not buckets:
        return None
    (region, d), n = max(buckets.items(), key=lambda kv: kv[1])
    return region, d, n


def compute_most_active_day_utc(stream: List[PlayerGameRow]
                                ) -> Optional[Tuple[_date, int]]:
    """Return the UTC date with the most matches (ignores timeslot boundaries).

    Returns None for an empty stream. On a tie the date seen first wins.
    """
    by_day: dict[_date, int] = defaultdict(int)
    for g in stream:
        d = datetime.fromtimestamp(g.endtime_unix, tz=timezone.utc).date()
        by_day[d] += 1
    if not by_day:
        return None
    d, n = max(by_day.items(), key=lambda kv: kv[1])
    return d, n


def _fmt_int(n: Optional[int]) -> str:
    """Format an integer with thousands separators; an em dash for None."""
    return f"{n:,}" if n is not None else "—"


def _fmt_float(n: Optional[float], digits: int = 2) -> str:
    """Format a float with ``digits`` decimals; an em dash for None."""
    return f"{n:.{digits}f}" if n is not None else "—"


def _fmt_date(ts: Optional[int]) -> str:
    """Format a Unix timestamp as a UTC ``YYYY-MM-DD`` date; an em dash for None."""
    if ts is None:
        return "—"
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")


# ── Shared drawing helpers for the recap cards ──────────────────────────

def _card_nick(name) -> str:
    """Return ``name`` as text with code points that render as tofu removed."""
    return _sanitize_render_text(str(name))


def _card_save_atomic(fig, out_path: Path, facecolor: str) -> None:
    """Save ``fig`` as a PNG to ``out_path`` through a sibling ``.tmp`` file.

    The image is written to ``<out_path>.tmp`` and moved into place with
    ``os.replace`` so ``out_path`` is only ever replaced by a fully written
    file. If anything fails the temp file is removed (best effort) and the
    error propagates. The figure is always closed.
    """
    tmp = out_path.with_suffix(out_path.suffix + ".tmp")
    try:
        tmp.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(tmp, dpi=120, facecolor=facecolor, bbox_inches=None,
                    format="png")
        os.replace(tmp, out_path)
    except BaseException:
        # Don't leave a partial image behind next to the output file.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
    finally:
        plt.close(fig)


def _card_footer_and_save(fig, out_path: Path, pal: dict, t) -> None:
    """Draw the "generated on" footer (today's UTC date), then save and close."""
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    fig.text(0.5, 0.015, f"SREBOT · {t('imgFooterGenerated')} {today}",
             ha="center", fontsize=8, color=pal["footer"])
    _card_save_atomic(fig, out_path, pal["bg"])


def _card_draw_header(ax_head, pal: dict, title: str, season_label: str,
                      hero: List[Tuple[str, str]]) -> None:
    """Draw the title line, the season label and the row of hero stats.

    ``hero`` is a list of (label, value) pairs laid out left to right; the
    value is drawn large with its label underneath. Two horizontal dividers
    frame the hero row.
    """
    ax_head.set_axis_off()
    ax_head.text(0.0, 0.75, title,
                 fontsize=22, fontweight="bold", color=pal["text"],
                 transform=ax_head.transAxes)
    ax_head.text(1.0, 0.75, season_label,
                 fontsize=18, fontweight="bold", color=pal["header_sub"],
                 ha="right", transform=ax_head.transAxes)
    for i, (label, value) in enumerate(hero):
        x = 0.04 + i * 0.24
        ax_head.text(x, 0.15, value, fontsize=26, fontweight="bold",
                     color=pal["text"], va="center",
                     transform=ax_head.transAxes)
        ax_head.text(x, -0.10, label, fontsize=11, color=pal["muted"],
                     va="center", transform=ax_head.transAxes)
    for y in (0.55, -0.32):
        ax_head.plot([0.01, 0.99], [y, y], color=pal["divider"], alpha=0.85,
                     linewidth=1.0, transform=ax_head.transAxes, clip_on=False)


def _card_draw_rolling_axes(ax, t, prep, rolling_wr, rolling_kd,
                            rolling_battles) -> None:
    """Overlay the win-rate, K/D and battles curves on twin y-axes of ``ax``.

    ``prep`` turns one raw rolling series into a list of drawable segments
    (each a sequence of (ts, value) points); every segment is plotted as its
    own line so gaps stay unconnected. Win rate uses the right spine, K/D a
    second right spine pushed outward, battles a left spine pushed outward.
    """
    ax_wr = ax.twinx()
    for s in prep(rolling_wr):
        ax_wr.plot([p[0] for p in s], [p[1] for p in s],
                   color=WR_COLOR, linewidth=1.3, alpha=0.8, zorder=2)
    ax_wr.set_ylim(0, 100)
    ax_wr.set_ylabel(t("imgAxisWinRate"), color=WR_COLOR, fontsize=12)
    ax_wr.tick_params(axis="y", colors=WR_COLOR)
    ax_wr.spines["right"].set_color(WR_COLOR)
    ax_wr.spines["left"].set_visible(False)  # don't paint over ax's teal left spine

    ax_kd = ax.twinx()
    ax_kd.spines["right"].set_position(("axes", 1.08))
    for s in prep(rolling_kd):
        ax_kd.plot([p[0] for p in s], [p[1] for p in s],
                   color=KD_COLOR, linewidth=1.1, alpha=0.7, zorder=2)
    ax_kd.set_ylabel(t("imgAxisKD"), color=KD_COLOR, fontsize=12, labelpad=10)
    ax_kd.tick_params(axis="y", colors=KD_COLOR)
    ax_kd.spines["right"].set_color(KD_COLOR)
    ax_kd.spines["left"].set_visible(False)

    ax_b = ax.twinx()
    ax_b.spines["left"].set_position(("axes", -0.08))
    ax_b.spines["left"].set_visible(True)
    ax_b.spines["right"].set_visible(False)
    ax_b.yaxis.set_label_position("left")
    ax_b.yaxis.tick_left()
    for s in prep(rolling_battles):
        ax_b.plot([p[0] for p in s], [p[1] for p in s],
                  color=BATTLES_COLOR, linewidth=1.0, alpha=0.6, zorder=2)
    ax_b.set_ylabel(t("imgAxisBattles"), color=BATTLES_COLOR, fontsize=12,
                    labelpad=10)
    ax_b.tick_params(axis="y", colors=BATTLES_COLOR)
    ax_b.spines["left"].set_color(BATTLES_COLOR)


def _card_draw_week_ticks(ax, week_boundaries: List[int],
                          season_start: int, season_end: int) -> None:
    """Put an ``MM-DD`` (UTC) x tick at every week boundary inside the season.

    Leaves the default ticks untouched when no boundary is inside the season.
    """
    inside = [b for b in week_boundaries if season_start <= b <= season_end]
    if not inside:
        return
    ax.set_xticks(inside)
    ax.set_xticklabels(
        [datetime.fromtimestamp(b, tz=timezone.utc).strftime("%m-%d")
         for b in inside],
        rotation=0, fontsize=9,
    )


def _card_draw_br_labels(ax, pal: dict, season_start: int,
                         season_end: int) -> None:
    """Label the BR schedule above the chart.

    BR schedule ticks above the chart — centered over each BR segment
    (clamped to the season), with a faint dashed divider at each segment
    boundary so tiers read as distinct bands. Entries come from
    ``_load_br_schedule()`` and are read via their "start", "end" and
    "max_br" keys.
    """
    for entry in _load_br_schedule():
        s = int(entry["start"])
        e = int(entry["end"])
        if e < season_start or s > season_end:
            continue
        mid = (max(s, season_start) + min(e, season_end)) / 2
        if season_start < s < season_end:
            ax.axvline(s, color=pal["muted"], alpha=0.45, linewidth=0.7,
                       linestyle=(0, (3, 3)), zorder=1)
        ax.text(mid, 1.02, f"{float(entry['max_br']):.1f}",
                color=pal["header_sub"], fontsize=10, fontweight="bold",
                ha="center", va="bottom",
                transform=ax.get_xaxis_transform(), clip_on=False)


def _card_draw_stats_grid(ax_stats, pal: dict,
                          short_rows: List[Tuple[str, str]],
                          wide_rows: List[Tuple[str, Optional[str]]]
                          ) -> List[float]:
    """Draw the bottom stats table and return the y position of each wide row.

    Short rows render in 2 columns (filled column by column); wide rows span
    the full width below them. A wide row whose value is None gets only its
    label drawn, so the caller can place a custom artist at the returned y
    (axes coordinates, value column starts at x = 0.22).
    """
    cols = 2
    short_per_col = (len(short_rows) + cols - 1) // cols
    total_bands = short_per_col + len(wide_rows)
    if total_bands == 0:
        return []
    grid_top = 0.98
    grid_bottom = 0.05
    band_h = (grid_top - grid_bottom) / total_bands

    # Short rows — 2-column layout in the top `short_per_col` bands
    for i, (label, value) in enumerate(short_rows):
        col, row_in_col = divmod(i, short_per_col)
        x = 0.02 + col * 0.52
        y = grid_top - band_h * (row_in_col + 0.5)
        ax_stats.text(x, y, f"{label}:", fontsize=12, color=pal["muted"],
                      va="center", transform=ax_stats.transAxes)
        ax_stats.text(x + 0.20, y, value, fontsize=12, color=pal["text"],
                      va="center", transform=ax_stats.transAxes)

    # Wide rows — full-width, below the short rows
    wide_ys: List[float] = []
    for i, (label, value) in enumerate(wide_rows):
        y = grid_top - band_h * (short_per_col + i + 0.5)
        wide_ys.append(y)
        ax_stats.text(0.02, y, f"{label}:", fontsize=12, color=pal["muted"],
                      va="center", transform=ax_stats.transAxes)
        if value is not None:
            ax_stats.text(0.22, y, value, fontsize=12, color=pal["text"],
                          va="center", transform=ax_stats.transAxes)

    # Horizontal dividers at band boundaries
    line_kw = dict(color=pal["divider"], alpha=0.85, linewidth=1.0,
                   transform=ax_stats.transAxes, clip_on=False)
    for k in range(1, total_bands):
        y = grid_top - band_h * k
        if k < short_per_col:
            # still in the 2-column zone — split divider around the vertical line
            ax_stats.plot([0.01, 0.49], [y, y], **line_kw)
            ax_stats.plot([0.51, 0.99], [y, y], **line_kw)
        else:
            # wide zone — single divider spans full width
            ax_stats.plot([0.01, 0.99], [y, y], **line_kw)

    # Vertical divider between columns — only across the short-row zone
    ax_stats.plot([0.50, 0.50],
                  [grid_top - band_h * short_per_col, grid_top], **line_kw)
    return wide_ys


def _card_render_placeholder(out_path: Path, headline: str, reason: str,
                             pal: dict) -> None:
    """Render a 16:9 "no data" image: headline, reason line and SREBOT mark."""
    fig = plt.figure(figsize=(16, 9), dpi=120, facecolor=pal["bg"])
    ax = fig.add_subplot(1, 1, 1)
    ax.set_axis_off()
    ax.text(0.5, 0.62, headline, fontsize=28, fontweight="bold",
            color=pal["text"], ha="center", transform=ax.transAxes)
    ax.text(0.5, 0.48, reason, fontsize=16, color=pal["muted"],
            ha="center", transform=ax.transAxes)
    ax.text(0.5, 0.05, "SREBOT", fontsize=10, color=pal["footer"],
            ha="center", transform=ax.transAxes)
    _card_save_atomic(fig, out_path, pal["bg"])


# ── Squadron card ───────────────────────────────────────────────────────

def render_squadron_card(out_path: Path, ident: SquadronIdent, season: str,
                         rating: RatingDerived, match: MatchDerived,
                         players: PlayerAggregates,
                         rolling_wr: List[Tuple[int, float]],
                         rolling_kd: List[Tuple[int, float]],
                         rolling_battles: List[Tuple[int, int]],
                         season_start: int, season_end: int,
                         week_boundaries: List[int],
                         theme: str = "light",
                         lang: str = DEFAULT_LANG) -> None:
    """Render the squadron season recap PNG to ``out_path``.

    Layout, top to bottom: header with four hero stats, the main chart
    (smoothed rating plus rolling win rate / K/D / battles on twin axes,
    week-boundary ticks and BR schedule labels), and a stats table.

    Args:
        out_path: Destination PNG; written atomically via a ``.tmp`` sibling.
        ident: Squadron identity; ``short_name`` and ``long_name`` are shown.
        season: Season label shown in the header.
        rating: Rating series and derived peak / final / change values.
        match: Match totals, win rate, longest streak and top opponent.
        players: Aggregated player stats (kills, deaths, MVP, ...).
        rolling_wr, rolling_kd, rolling_battles: (ts, value) rolling series.
        season_start, season_end: Unix-second bounds used as the x-range.
        week_boundaries: Unix timestamps drawn as vertical guides and ticks.
        theme: Palette name understood by ``theme_palette``.
        lang: Locale code for the label translations.
    """
    pal = theme_palette(theme)
    tr = load_translations(lang)

    def t(key: str) -> str:
        return tr.get(key, key)

    fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
    gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.35], hspace=0.35,
                  left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)

    # ── Header / hero row ────────────────────────────────────────────
    hero = [
        (t("imgHeroFinalRating"), _fmt_int(rating.final)),
        (t("imgHeroMatches"), _fmt_int(match.total)),
        (t("imgHeroWinRate"),
         f"{match.wr_pct:.1f}%" if match.wr_pct is not None else "—"),
        (t("imgHeroKD"), _fmt_float(players.kd)),
    ]
    _card_draw_header(
        fig.add_subplot(gs[0]), pal,
        f"{ident.short_name} · {ident.long_name}",
        f"{season} {t('imgRecapSuffix')}",
        hero,
    )

    # ── Main graph (rating + WR dual axis) ───────────────────────────
    ax = fig.add_subplot(gs[1])
    ax.set_facecolor(pal["bg"])
    ax.set_xlim(season_start, season_end)
    for b in week_boundaries:
        if season_start <= b <= season_end:
            ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1)

    if rating.series:
        smoothed = smooth_series(rating.series, window=12)
        ax.plot([ts for ts, _ in smoothed], [v for _, v in smoothed],
                color=RATING_COLOR, linewidth=2.2, zorder=3)
    ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
    ax.tick_params(axis="y", colors=RATING_COLOR)
    ax.spines["left"].set_color(RATING_COLOR)
    ax.spines["top"].set_color(pal["grid"])
    ax.spines["bottom"].set_color(pal["grid"])
    ax.tick_params(axis="x", colors=pal["muted"])

    def _prep(series: list) -> list:
        # Break the curve wherever consecutive points are more than a week
        # apart, then daily-downsample + smooth inside each piece.
        return [_smooth_rolling_curve(_downsample_daily(part), window=4)
                for part in _split_on_gaps(series, 7 * 86400)]

    _card_draw_rolling_axes(ax, t, _prep, rolling_wr, rolling_kd,
                            rolling_battles)
    _card_draw_week_ticks(ax, week_boundaries, season_start, season_end)
    ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
    _card_draw_br_labels(ax, pal, season_start, season_end)

    # ── Supporting stats grid ────────────────────────────────────────
    ax_stats = fig.add_subplot(gs[2])
    ax_stats.set_axis_off()

    def _fmt_opponent(op: Optional[Tuple[str, int, int]]) -> str:
        if not op:
            return "—"
        name, total, wins = op
        losses = total - wins
        return (f"{name} ({total} {t('imgUnitMatches')}, "
                f"{wins} {t('imgUnitWins')} / {losses} {t('imgUnitLosses')})")

    def _fmt_mvp(m: Optional[Tuple[str, int, int, int]]) -> str:
        if not m:
            return "—"
        name, kills, assists, captures = m
        return (f"{_card_nick(name)} — {_fmt_int(kills)} {t('imgUnitKills')}, "
                f"{_fmt_int(assists)} {t('imgUnitAssists')}, "
                f"{_fmt_int(captures)} {t('imgUnitCaptures')}")

    # An unknown change shows a bare dash rather than a signed dash ("+—").
    if rating.change is None:
        change_str = "—"
    else:
        sign = "+" if rating.change >= 0 else ""
        change_str = f"{sign}{_fmt_int(rating.change)}"

    if players.top_vehicle:
        top_vehicle_str = (f"{players.top_vehicle[0]} "
                           f"({players.top_vehicle[1]} {t('imgUnitGames')})")
    else:
        top_vehicle_str = "—"

    if players.most_active:
        most_active_str = (f"{_card_nick(players.most_active[0])} "
                           f"({_fmt_int(players.most_active[1])} "
                           f"{t('imgUnitMatches')})")
    else:
        most_active_str = "—"

    # Short rows render in 2 columns; wide rows span full width.
    short_rows: List[Tuple[str, str]] = [
        (t("imgStatPeakRating"),
         f"{_fmt_int(rating.peak)} ({_fmt_date(rating.peak_ts)})"),
        (t("imgStatRatingChange"), change_str),
        (t("imgStatTotalKills"),
         f"{_fmt_int(players.total_kills)} "
         f"({_fmt_int(players.ground_kills)} {t('imgGroundShort')} / "
         f"{_fmt_int(players.air_kills)} {t('imgAirShort')})"),
        (t("imgStatTotalDeaths"), _fmt_int(players.deaths)),
        (t("imgStatAssistsCaptures"),
         f"{_fmt_int(players.assists)} / {_fmt_int(players.captures)}"),
        (t("imgStatMostPlayedVehicle"), top_vehicle_str),
        (t("imgStatMostActive"), most_active_str),
        (t("imgStatLongestWinStreak"), _fmt_int(match.longest_win_streak)),
    ]
    wide_rows: List[Tuple[str, Optional[str]]] = [
        (t("imgStatMVP"), _fmt_mvp(players.mvp)),
        (t("imgStatMostCommonOpponent"), _fmt_opponent(match.top_opponent)),
    ]
    _card_draw_stats_grid(ax_stats, pal, short_rows, wide_rows)

    _card_footer_and_save(fig, out_path, pal, t)


def render_squadron_placeholder(out_path: Path, short: str, season: str,
                                reason: Optional[str] = None,
                                theme: str = "light",
                                lang: str = DEFAULT_LANG) -> None:
    """Render a "no data" image for a squadron.

    When ``reason`` is None the translated ``imgPlaceholderNoData`` template
    (English fallback built in) is filled with ``short`` and ``season``.
    """
    pal = theme_palette(theme)
    tr = load_translations(lang)
    if reason is None:
        reason = tr.get("imgPlaceholderNoData",
                        "No data for {short} in {season}").format(
            short=short, season=season
        )
    _card_render_placeholder(out_path, f"{short} · {season}", reason, pal)


# ── Player card ─────────────────────────────────────────────────────────

def _assign_squadron_colors(timeline: List[TimelineRange]) -> dict[str, str]:
    """Deterministic per-recap palette assignment in timeline order.

    Each distinct squadron name takes the next ``_SQUADRON_PALETTE`` slot
    (wrapping around when the palette is exhausted). The empty name — a
    stretch without a squadron — always maps to ``_NO_SQUADRON_COLOR`` and is
    present in the result even if the timeline never contains it.
    """
    colors: dict[str, str] = {}
    slot = 0
    for rng in timeline:
        name = rng.squadron_name
        if name == "":
            colors.setdefault("", _NO_SQUADRON_COLOR)
            continue
        if name not in colors:
            colors[name] = _SQUADRON_PALETTE[slot % len(_SQUADRON_PALETTE)]
            slot += 1
    colors.setdefault("", _NO_SQUADRON_COLOR)
    return colors


def render_player_card(out_path: Path, nick: str, uid: str, season: str,
                       timeline: List[TimelineRange],
                       trail: List[RatingSegment],
                       derived: PlayerDerived,
                       rolling_wr: List[Tuple[int, float]],
                       rolling_kd: List[Tuple[int, float]],
                       rolling_battles: List[Tuple[int, int]],
                       most_common_opp: Optional[Tuple[str, int]],
                       frequent_teammate: Optional[Tuple[str, int]],
                       longest_session: Optional[Tuple[str, _date, int]],
                       most_active_day: Optional[Tuple[_date, int]],
                       season_start: int, season_end: int,
                       week_boundaries: List[int],
                       theme: str = "light",
                       lang: str = DEFAULT_LANG) -> None:
    """Render the single-player season recap PNG to ``out_path``.

    Same three-part layout as the squadron card. When the timeline contains
    more than one distinct squadron name, each timeline range is shaded in
    that squadron's color behind the chart; the "squadrons represented" row
    always lists the ranges as a color-keyed chain.

    Args:
        out_path: Destination PNG; written atomically via a ``.tmp`` sibling.
        nick: Player nickname (sanitized before rendering).
        uid: Player UID shown next to the nickname.
        season: Season label shown in the header.
        timeline: Squadron membership ranges, in display order.
        trail: Rating segments; segments without points are skipped.
        derived: Precomputed player totals and highlights.
        rolling_wr, rolling_kd, rolling_battles: (ts, value) rolling series.
        most_common_opp: (name, matches) or None.
        frequent_teammate: (nick, games together) or None.
        longest_session: (region, date, matches) or None.
        most_active_day: (date, matches) or None.
        season_start, season_end: Unix-second bounds used as the x-range.
        week_boundaries: Unix timestamps drawn as vertical guides and ticks.
        theme: Palette name understood by ``theme_palette``.
        lang: Locale code for the label translations.
    """
    pal = theme_palette(theme)
    tr = load_translations(lang)

    def t(key: str) -> str:
        return tr.get(key, key)

    colors = _assign_squadron_colors(timeline)
    # Shading is only informative when there is more than one squadron key.
    show_squadron_viz = len({rng.squadron_name for rng in timeline}) > 1
    nick = _sanitize_render_text(nick)

    fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
    gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.6], hspace=0.35,
                  left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)

    # ── Header / hero ───────────────────────────────────────────────
    hero = [
        (t("imgHeroBattles"), _fmt_int(derived.total)),
        (t("imgHeroWinRate"),
         f"{derived.wr_pct:.1f}%" if derived.wr_pct is not None else "—"),
        (t("imgHeroKD"), _fmt_float(derived.kd)),
        (t("imgHeroTotalKills"), _fmt_int(derived.total_kills)),
    ]
    _card_draw_header(
        fig.add_subplot(gs[0]), pal,
        f"{nick} · {t('imgUIDLabel')} {uid}",
        f"{season} {t('imgRecapSuffix')}",
        hero,
    )

    # ── Main chart with 4 axes ──────────────────────────────────────
    ax = fig.add_subplot(gs[1])
    ax.set_facecolor(pal["bg"])
    ax.set_xlim(season_start, season_end)

    if show_squadron_viz:
        for rng in timeline:
            c = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
            alpha = 0.12 if rng.squadron_name else 0.05
            ax.axvspan(rng.first_ts, rng.last_ts, color=c, alpha=alpha, zorder=0)

    for b in week_boundaries:
        if season_start <= b <= season_end:
            ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1, zorder=1)

    for seg in trail:
        if not seg.points:
            continue
        smoothed = smooth_series(seg.points, window=12)
        # Rating line is always teal — squadron shading + the Squadrons
        # Represented chain carry the per-squadron color key.
        ax.plot([p[0] for p in smoothed], [p[1] for p in smoothed],
                color=RATING_COLOR, linewidth=2.2, zorder=3)
    ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
    ax.tick_params(axis="y", colors=RATING_COLOR)
    ax.spines["left"].set_color(RATING_COLOR)

    def _prep(series: list) -> list:
        # Clip to squadron timeline ranges so rolling curves match the
        # shading, then daily-downsample + smooth inside each segment.
        return [_smooth_rolling_curve(_downsample_daily(part), window=4)
                for part in _clip_to_ranges(series, timeline)]

    _card_draw_rolling_axes(ax, t, _prep, rolling_wr, rolling_kd,
                            rolling_battles)
    _card_draw_week_ticks(ax, week_boundaries, season_start, season_end)
    ax.tick_params(axis="x", colors=pal["muted"])
    ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
    _card_draw_br_labels(ax, pal, season_start, season_end)

    # ── Stats grid (bottom) ─────────────────────────────────────────
    ax_stats = fig.add_subplot(gs[2])
    ax_stats.set_axis_off()

    kdac_value = (
        f"{_fmt_int(derived.total_kills)} "
        f"({_fmt_int(derived.ground_kills)} {t('imgGroundShort')} / "
        f"{_fmt_int(derived.air_kills)} {t('imgAirShort')}) / "
        f"{_fmt_int(derived.total_deaths)} / "
        f"{_fmt_int(derived.assists)} / "
        f"{_fmt_int(derived.captures)}"
    )

    if derived.top_vehicle:
        top_vehicle_str = (f"{derived.top_vehicle[0]} "
                           f"({derived.top_vehicle[1]} {t('imgUnitGames')})")
    else:
        top_vehicle_str = "—"

    if longest_session:
        session_str = (f"{longest_session[0]} · {longest_session[1].isoformat()} — "
                       f"{longest_session[2]} {t('imgUnitMatches')}")
    else:
        session_str = "—"

    if most_active_day:
        active_day_str = (f"{most_active_day[0].isoformat()} — "
                          f"{most_active_day[1]} {t('imgUnitMatches')}")
    else:
        active_day_str = "—"

    if most_common_opp:
        opponent_str = (f"{most_common_opp[0]} "
                        f"({most_common_opp[1]} {t('imgUnitMatches')})")
    else:
        opponent_str = "—"

    short_rows: List[Tuple[str, str]] = [
        (t("imgStatKDAC"), kdac_value),
        (t("imgStatMostPlayedVehicle"), top_vehicle_str),
        (t("imgStatLongestWinStreak"), _fmt_int(derived.longest_win_streak)),
        (t("imgStatLongestSession"), session_str),
        (t("imgStatMostActiveDay"), active_day_str),
        (t("imgStatMostCommonOpponent"), opponent_str),
    ]

    if derived.peak_rating:
        peak_str = (f"{_fmt_int(derived.peak_rating[0])} · "
                    f"{derived.peak_rating[1]} · "
                    f"{_fmt_date(derived.peak_rating[2])}")
    else:
        peak_str = "—"

    if derived.best_match:
        bm = derived.best_match
        best_match_str = t("imgBestMatchLine").format(
            vehicle=bm.vehicle,
            gk=bm.ground_kills, ak=bm.air_kills,
            assists=bm.assists, cap=bm.captures, deaths=bm.deaths,
            date=_fmt_date(bm.endtime_unix),
        )
    else:
        best_match_str = "—"

    if frequent_teammate:
        teammate_str = (f"{_card_nick(frequent_teammate[0])} — "
                        f"{frequent_teammate[1]} {t('imgUnitTogether')}")
    else:
        teammate_str = "—"

    wide_rows: List[Tuple[str, Optional[str]]] = [
        (t("imgStatPeakSquadronRating"), peak_str),
        (t("imgStatBestMatch"), best_match_str),
    ]
    # The chain row has no plain-text value: it is drawn below as a packed
    # row of per-squadron colored text, so only its label goes in the grid.
    chain_idx = len(wide_rows)
    wide_rows.append((t("imgStatSquadronsRepresented"), None))
    wide_rows.append((t("imgStatFrequentTeammate"), teammate_str))

    wide_ys = _card_draw_stats_grid(ax_stats, pal, short_rows, wide_rows)

    children: List = []
    for j, rng in enumerate(timeline):
        if j > 0:
            children.append(TextArea(
                "  →  ",
                textprops=dict(color=pal["muted"], fontsize=12),
            ))
        seg_color = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
        name = rng.squadron_name if rng.squadron_name else t("imgUnitNoSquadron")
        dot_char = "●" if rng.squadron_name else "○"
        children.append(TextArea(
            f"{dot_char} {name} ({rng.match_count})",
            textprops=dict(color=seg_color, fontsize=12),
        ))
    if not children:
        children.append(TextArea(
            "—", textprops=dict(color=pal["text"], fontsize=12)))
    packed = HPacker(children=children, align="center", pad=0, sep=0)
    ax_stats.add_artist(AnchoredOffsetbox(
        loc="center left", child=packed, pad=0, frameon=False,
        bbox_to_anchor=(0.22, wide_ys[chain_idx]),
        bbox_transform=ax_stats.transAxes,
    ))

    _card_footer_and_save(fig, out_path, pal, t)


def render_player_placeholder(out_path: Path, nick: str, season: str,
                              reason: Optional[str] = None,
                              theme: str = "light",
                              lang: str = DEFAULT_LANG) -> None:
    """Render a "no data" image for a player.

    ``nick`` is sanitized the same way the full player card does it. When
    ``reason`` is None the translated ``imgPlaceholderNoDataPlayer`` template
    (English fallback built in) is filled with the nick and ``season``.
    """
    pal = theme_palette(theme)
    tr = load_translations(lang)
    nick = _sanitize_render_text(nick)
    if reason is None:
        reason = tr.get("imgPlaceholderNoDataPlayer",
                        "No data for {nick} in {season}").format(
            nick=nick, season=season
        )
    _card_render_placeholder(out_path, f"{nick} · {season}", reason, pal)


# ── Entry points ────────────────────────────────────────────────────────

def run_squadron(args: Args) -> int:
    """Gather squadron data for the season and render the card or a placeholder.

    Returns 0 on success (including the "no data" placeholder) and 2 when
    ``args.clan_id`` is missing or does not resolve to a squadron.
    """
    if args.clan_id is None:
        logging.error("squadron mode requires a clan id")
        return 2

    with _open_ro(SQUADRONS_DB) as conn_sq:
        ident = resolve_squadron(conn_sq, args.clan_id)
        if ident is None:
            logging.error(f"unknown clan_id={args.clan_id}")
            return 2
        logging.info(f"resolved {ident.short_name} / {ident.long_name}")
        rating = gather_squadron_rating(conn_sq, args.clan_id,
                                        args.season_start, args.season_end)
        logging.info(f"rating: series={len(rating.series)} final={rating.final} peak={rating.peak} change={rating.change}")

    with _open_ro(SQ_BATTLES_DB) as conn_b:
        stream = gather_squadron_match_stream(conn_b, ident.short_name,
                                              args.season_start, args.season_end)
        players = gather_squadron_player_aggregates(conn_b, ident.short_name,
                                                    args.season_start, args.season_end)
        squad_stream = gather_squadron_per_match_stream(
            conn_b, ident.short_name, args.season_start, args.season_end
        )

    match = compute_squadron_match_derived(stream, ident.short_name)
    logging.info(f"matches: total={match.total} wins={match.wins} wr={match.wr_pct} streak={match.longest_win_streak} top_opp={match.top_opponent}")

    rolling_wr = compute_rolling_wr(squad_stream)
    rolling_kd = compute_rolling_kd(squad_stream)
    rolling_battles = compute_rolling_battles(squad_stream)
    logging.info(f"rolling: wr_pts={len(rolling_wr)} kd_pts={len(rolling_kd)} battles_pts={len(rolling_battles)}")

    if rating.final is None and match.total == 0:
        logging.info("no data for squadron in season; writing placeholder")
        render_squadron_placeholder(args.out, ident.short_name, args.season,
                                    theme=args.theme, lang=args.lang)
        return 0

    logging.info(f"players: kills={players.total_kills} deaths={players.deaths} kd={players.kd} top_veh={players.top_vehicle} active={players.most_active}")
    if players.mvp:
        _n, _k, _a, _c = players.mvp
        logging.info(f"mvp: {_n} K={_k} A={_a} C={_c}")

    render_squadron_card(
        args.out, ident, args.season, rating, match, players,
        rolling_wr, rolling_kd, rolling_battles,
        args.season_start, args.season_end, args.week_boundaries,
        theme=args.theme, lang=args.lang,
    )
    logging.info(f"wrote {args.out}")
    return 0


def _resolve_latest_player_nick(conn_b: sqlite3.Connection, uid: str) -> str:
    """Return the nick on the player's highest-``session_id`` row.

    Rows whose nick starts with ``coop/`` are ignored. Falls back to ``uid``
    itself when the player has no matching row.
    """
    cur = conn_b.execute(
        "SELECT nick FROM player_games_hist "
        "WHERE UID = ? AND nick NOT LIKE 'coop/%' "
        "ORDER BY session_id DESC LIMIT 1",
        (uid,),
    )
    row = cur.fetchone()
    return row[0] if row else uid


def run_player(args: Args) -> int:
    """Gather one player's season data and render the card or a placeholder.

    Returns 0 on success (including the "no data" placeholder) and 2 when
    ``args.uid`` is missing.
    """
    if args.uid is None:
        logging.error("player mode requires a uid")
        return 2

    with _open_ro(SQ_BATTLES_DB) as conn_b, _open_ro(SQUADRONS_DB) as conn_sq:
        nick = _resolve_latest_player_nick(conn_b, args.uid)
        timeline = gather_player_squadron_timeline(
            conn_b, args.uid, args.season_start, args.season_end
        )
        stream = gather_player_game_stream(
            conn_b, args.uid, args.season_start, args.season_end
        )
        trail = gather_player_rating_trail(conn_sq, timeline, args.uid)
        opp = gather_player_most_common_opponent(conn_b, stream)
        teammate = gather_player_frequent_teammate(
            conn_b, args.uid, args.season_start, args.season_end
        )

    # Membership check: the final timeline range was extended to season_end
    # so we could scan past the player's last game. Clamp it back to the last
    # snapshot where the player actually appeared in the squadron's roster
    # (the last rating-trail point), so shading doesn't extend past when they
    # left the squadron.
    if timeline:
        final_idx = len(timeline) - 1
        final_seg = trail[final_idx] if final_idx < len(trail) else None
        observed_ts = (final_seg.points[-1][0]
                       if final_seg and final_seg.points else None)
        last_rng = timeline[final_idx]
        if observed_ts is not None:
            clamp_to = observed_ts
        else:
            clamp_to = min(last_rng.last_ts, args.season_end)
        if clamp_to < last_rng.last_ts:
            timeline[final_idx] = TimelineRange(
                squadron_name=last_rng.squadron_name,
                first_ts=last_rng.first_ts,
                last_ts=clamp_to,
                match_count=last_rng.match_count,
            )

    if not stream and all(not seg.points for seg in trail):
        logging.info("no data for player in season; writing placeholder")
        render_player_placeholder(args.out, nick, args.season,
                                  theme=args.theme, lang=args.lang)
        return 0

    derived = compute_player_derived(stream, timeline, trail)
    rolling_wr = compute_rolling_wr(stream)
    rolling_kd = compute_rolling_kd(stream)
    rolling_battles = compute_rolling_battles(stream)
    longest = compute_longest_timeslot_session(stream)
    active_day = compute_most_active_day_utc(stream)

    logging.info(
        f"player nick={nick} games={len(stream)} timeline_ranges={len(timeline)} "
        f"rating_segments={sum(1 for s in trail if s.points)} "
        f"wr={derived.wr_pct} kd={derived.kd} streak={derived.longest_win_streak}"
    )

    render_player_card(
        args.out, nick, args.uid, args.season,
        timeline, trail, derived,
        rolling_wr, rolling_kd, rolling_battles,
        opp, teammate, longest, active_day,
        args.season_start, args.season_end, args.week_boundaries,
        theme=args.theme, lang=args.lang,
    )
    logging.info(f"wrote {args.out}")
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: parse arguments, dispatch on ``args.mode``, return the exit code."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s",
                        stream=sys.stderr)
    args = parse_args(argv)
    logging.info(f"render start mode={args.mode} season={args.season}")
    if args.mode == "squadron":
        return run_squadron(args)
    return run_player(args)


if __name__ == "__main__":
    sys.exit(main())