Files
SREBOT/BOT/render_recap.py
T

1849 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
render_recap.py
CLI tool that renders a season recap PNG for either a squadron or a single
player. Invoked by web/server.js on cache miss.
Usage (squadron mode — default):
python render_recap.py \\
--mode squadron \\
--clan-id 123456 \\
--season 2026-II \\
--season-start 1772348400 \\
--season-end 1777852799 \\
--week-boundaries 1772348400,1773039600,... \\
--out /path/to/output.png
Usage (player mode):
python render_recap.py \\
--mode player \\
--uid 987654321 \\
--season 2026-II \\
--season-start 1772348400 \\
--season-end 1777852799 \\
--week-boundaries 1772348400,1773039600,... \\
--out /path/to/output.png
Exits 0 on success, non-zero with stderr diagnostic on failure.
"""
import argparse
import gzip
import json
import re
import logging
import os
import sqlite3
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import date as _date, datetime, timezone
from pathlib import Path
from typing import List, Literal, Optional, Tuple
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from matplotlib.offsetbox import AnchoredOffsetbox, HPacker, TextArea
plt.rcParams["font.family"] = "sans-serif"
plt.rcParams["font.sans-serif"] = [
"DejaVu Sans", "Noto Sans", "Noto Sans Symbols2",
"Arial", "Liberation Sans", "sans-serif",
]
def _sanitize_render_text(s: str) -> str:
"""Strip Unicode code points that render as tofu boxes (tag chars,
variation selectors)."""
if not s:
return s
return "".join(
c for c in s
if not (
0xE0000 <= ord(c) <= 0xE007F # language tag characters
or 0xFE00 <= ord(c) <= 0xFE0F # VS1-VS16
or 0xE0100 <= ord(c) <= 0xE01EF # VS17-VS256
)
)
# This script runs as a subprocess entry point (python BOT/render_recap.py …),
# so BOT/__init__.py is not invoked automatically. Put SREBOT on sys.path,
# then `import BOT` to trigger the package's SHARED bootstrap once.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import BOT # noqa: F401, E402 — side effect: adds BOTS/SHARED to sys.path
from data_parser import apply_vehicle_name_filters # noqa: E402
LOCALES_DIR = Path(__file__).resolve().parent.parent / "locales"
DEFAULT_LANG = "en"
def load_translations(lang: str) -> dict[str, str]:
"""Load seasonCard.* strings for the given language, falling back to English for missing keys."""
def _read(lang_code: str) -> dict:
path = LOCALES_DIR / f"{lang_code}.json"
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8")).get("seasonCard", {})
except Exception:
return {}
en = _read(DEFAULT_LANG)
translated = _read(lang) if lang != DEFAULT_LANG else {}
merged = dict(en)
merged.update(translated)
return merged
RATING_COLOR = "#14b8a6"
WR_COLOR = "#f59e0b"
KD_COLOR = "#a78bfa"
BATTLES_COLOR = "#60a5fa"
# Ordered so the first slots avoid clashing with the rolling-curve colors
# (WR #f59e0b orange, K/D #a78bfa violet, Battles #60a5fa blue).
_SQUADRON_PALETTE: List[str] = [
"#14b8a6", # teal (same as RATING_COLOR — single-squadron default)
"#dc2626", # red
"#059669", # green
"#ec4899", # pink
"#84cc16", # lime
"#06b6d4", # cyan
"#db2777", # magenta
"#65a30d", # olive
]
_NO_SQUADRON_COLOR = "#64748b"
THEMES: dict[str, dict[str, str]] = {
"light": {
"bg": "#fafafa",
"text": "#0f172a",
"muted": "#64748b",
"header_sub": "#475569",
"grid": "#94a3b8",
"divider": "#cbd5e1",
"footer": "#94a3b8",
},
"dark": {
"bg": "#121418",
"text": "#e5e7eb",
"muted": "#9ca3af",
"header_sub": "#d1d5db",
"grid": "#2a2f36",
"divider": "#262a30",
"footer": "#6b7280",
},
}
def theme_palette(name: str) -> dict[str, str]:
return THEMES.get(name, THEMES["light"])
def smooth_series(series: List[Tuple[int, int]], window: int = 12
) -> List[Tuple[int, float]]:
"""Centered moving average over (ts, value) pairs. Preserves length."""
n = len(series)
if n == 0:
return []
if n < window * 2:
return [(ts, float(v)) for ts, v in series]
half = window // 2
out: List[Tuple[int, float]] = []
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
avg = sum(v for _, v in series[lo:hi]) / (hi - lo)
out.append((series[i][0], avg))
return out
def _downsample_daily(series: list) -> list:
"""Reduce to one point per UTC day, keeping each day's last (ts, value).
Caller can still apply smoothing; this just removes intra-day clustering
that otherwise renders as a vertical bar on a multi-month chart."""
if not series:
return []
out: list = []
prev_day = None
last_pt = None
for ts, v in series:
day = datetime.fromtimestamp(ts, tz=timezone.utc).date()
if prev_day is not None and day != prev_day and last_pt is not None:
out.append(last_pt)
last_pt = (ts, v)
prev_day = day
if last_pt is not None:
out.append(last_pt)
return out
def _split_on_gaps(series: list, gap_seconds: int) -> list:
"""Split a time-sorted series into sublists wherever consecutive timestamps
are more than gap_seconds apart. Prevents matplotlib from interpolating a
straight line across an inactive period."""
if not series:
return []
segments: list = [[]]
prev_ts = None
for item in series:
ts = item[0]
if prev_ts is not None and ts - prev_ts > gap_seconds and segments[-1]:
segments.append([])
segments[-1].append(item)
prev_ts = ts
return [s for s in segments if s]
def _clip_to_ranges(series: list, timeline: list) -> list:
"""Return a list of sublists — one per timeline range — containing only
the points whose ts falls inside that range's [first_ts, last_ts].
Used to make rolling WR/KD/Battles curves respect squadron shading
boundaries instead of sprawling across unshaded gaps."""
if not series or not timeline:
return []
out: list = []
for rng in timeline:
seg = [p for p in series if rng.first_ts <= p[0] <= rng.last_ts]
if seg:
out.append(seg)
return out
def _smooth_rolling_curve(series: list, window: int = 8) -> list:
"""Centered moving average for already-computed rolling curves.
Accepts (ts, int) or (ts, float) pairs; returns (ts, float)."""
n = len(series)
if n == 0:
return []
if n < window * 2:
return [(t, float(v)) for t, v in series]
half = window // 2
out: list = []
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
avg = sum(v for _, v in series[lo:hi]) / (hi - lo)
out.append((series[i][0], float(avg)))
return out
_storage_env = os.environ.get("STORAGE_VOL_PATH", "").strip()
if not _storage_env:
raise RuntimeError("STORAGE_VOL_PATH must be set")
_STORAGE = Path(_storage_env)
SQUADRONS_DB = _STORAGE / "squadrons.db"
SQ_BATTLES_DB = _STORAGE / "sq_battles.db"
SEASONS_TXT = (Path(__file__).resolve().parent.parent
/ "constants" / "seasons")
_BR_LINE_RE = re.compile(r"<t:(\d+):\w+>.*?max BR\s+(\d+(?:\.\d+)?)",
re.IGNORECASE)
def _load_br_schedule() -> List[dict]:
"""Parse web/constants/seasons into a list of
[{'max_br': float, 'start': unix, 'end': unix}, ...].
Contains every season's BR tiers. Callers filter by season window."""
try:
text = SEASONS_TXT.read_text(encoding="utf-8")
except OSError:
return []
entries: List[dict] = []
for line in text.splitlines():
m = _BR_LINE_RE.search(line)
if not m:
continue
entries.append({
"start": int(m.group(1)),
"max_br": float(m.group(2)),
"end": 0,
})
# End = next entry's start (minus one sec); last entry gets +7d default.
for i, e in enumerate(entries):
nxt = entries[i + 1]["start"] if i + 1 < len(entries) else 0
e["end"] = (nxt - 1) if nxt > e["start"] else e["start"] + 7 * 86400
return entries
@dataclass
class Args:
mode: Literal["squadron", "player"]
clan_id: Optional[int]
uid: Optional[str]
season: str
season_start: int
season_end: int
week_boundaries: List[int]
out: Path
theme: str
lang: str
def parse_args(argv: Optional[List[str]] = None) -> Args:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--mode", choices=["squadron", "player"], default="squadron")
p.add_argument("--clan-id", type=int, default=None)
p.add_argument("--uid", type=str, default=None)
p.add_argument("--season", required=True)
p.add_argument("--season-start", type=int, required=True)
p.add_argument("--season-end", type=int, required=True)
p.add_argument("--week-boundaries", default="")
p.add_argument("--out", type=Path, required=True)
p.add_argument("--theme", choices=list(THEMES.keys()), default="light")
p.add_argument("--lang", default=DEFAULT_LANG)
ns = p.parse_args(argv)
if ns.mode == "squadron" and ns.clan_id is None:
p.error("--mode squadron requires --clan-id")
if ns.mode == "player":
if not ns.uid:
p.error("--mode player requires --uid")
if not ns.uid.isdigit():
p.error("--uid must be numeric")
boundaries = [int(x) for x in ns.week_boundaries.split(",") if x.strip()]
return Args(
mode=ns.mode,
clan_id=ns.clan_id,
uid=ns.uid,
season=ns.season,
season_start=ns.season_start,
season_end=ns.season_end,
week_boundaries=boundaries,
out=ns.out,
theme=ns.theme,
lang=ns.lang,
)
def _open_ro(path: Path) -> sqlite3.Connection:
uri = f"file:{path}?mode=ro"
return sqlite3.connect(uri, uri=True, timeout=10.0)
@dataclass
class SquadronIdent:
clan_id: int
short_name: str
long_name: str
def resolve_squadron(conn_sq: sqlite3.Connection, clan_id: int) -> Optional[SquadronIdent]:
cur = conn_sq.execute(
"SELECT short_name, long_name FROM squadrons_data WHERE clan_id = ?",
(clan_id,),
)
row = cur.fetchone()
if not row:
return None
return SquadronIdent(clan_id=clan_id, short_name=row[0] or "", long_name=row[1] or "")
@dataclass
class RatingDerived:
series: List[Tuple[int, int]] # (unix_time, total_score) ordered
final: Optional[int]
peak: Optional[int]
peak_ts: Optional[int]
first: Optional[int]
change: Optional[int]
def gather_squadron_rating(conn_sq: sqlite3.Connection, clan_id: int,
start: int, end: int) -> RatingDerived:
cur = conn_sq.execute(
"SELECT unix_time, total_score FROM squadrons_points "
"WHERE clan_id = ? AND unix_time BETWEEN ? AND ? "
"ORDER BY unix_time",
(clan_id, start, end),
)
series: List[Tuple[int, int]] = [(row[0], row[1]) for row in cur.fetchall()]
if not series:
return RatingDerived(series=[], final=None, peak=None, peak_ts=None, first=None, change=None)
first = series[0][1]
final = series[-1][1]
peak_ts, peak = max(series, key=lambda t: t[1])
return RatingDerived(series=series, final=final, peak=peak, peak_ts=peak_ts,
first=first, change=final - first)
def gather_squadron_place(conn_sq: sqlite3.Connection, clan_id: int,
season_start: int, season_end: int
) -> Optional[Tuple[int, int]]:
"""Finishing leaderboard place for the season.
We don't store a per-season position, but ``total_score`` in
squadrons_points IS the squadron rating that the in-game leaderboard ranks
by. So rank clans by their final score and read off this clan's rank.
Only clans with at least one snapshot *within the season window* are ranked
— a clan whose last data predates the season never appeared on that season's
ladder (dead/inactive clans drop off in-game), so counting them would inflate
the denominator. Each ranked clan uses its last in-season snapshot.
Returns (place, total_ranked) or None if this clan wasn't active in-season.
Works for in-progress seasons too (season_end in the future → last snapshot
so far = current standing).
"""
cur = conn_sq.execute(
"SELECT p.clan_id, p.total_score "
"FROM squadrons_points p "
"JOIN (SELECT clan_id, MAX(unix_time) AS mt FROM squadrons_points "
" WHERE unix_time BETWEEN ? AND ? GROUP BY clan_id) m "
" ON m.clan_id = p.clan_id AND m.mt = p.unix_time",
(season_start, season_end),
)
board = [(cid, sc) for cid, sc in cur.fetchall() if sc is not None]
if not board:
return None
board.sort(key=lambda r: r[1], reverse=True)
total = len(board)
for i, (cid, _sc) in enumerate(board):
if cid == clan_id:
return i + 1, total
return None
@dataclass
class MatchRow:
session_id: str
endtime_unix: int
winning_sq: str
losing_sq: str
@dataclass
class MatchDerived:
stream: List[MatchRow]
total: int
wins: int
losses: int
wr_pct: Optional[float]
longest_win_streak: int
top_opponent: Optional[Tuple[str, int, int]] # (short, matches_vs, wins_vs)
def gather_squadron_match_stream(conn_b: sqlite3.Connection, short: str,
start: int, end: int) -> List[MatchRow]:
cur = conn_b.execute(
"SELECT session_id, endtime_unix, winning_sq, losing_sq "
"FROM match_summary "
"WHERE (winning_sq = ? OR losing_sq = ?) "
" AND endtime_unix BETWEEN ? AND ? "
"ORDER BY endtime_unix",
(short, short, start, end),
)
return [MatchRow(session_id=r[0], endtime_unix=r[1], winning_sq=r[2] or "",
losing_sq=r[3] or "") for r in cur.fetchall()]
def gather_squadron_per_match_stream(conn_b: sqlite3.Connection, short: str,
start: int, end: int
) -> "List[PlayerGameRow]":
"""One synthetic PlayerGameRow per squadron match (kills/deaths summed
across all squadron members in that match, win/loss from match_summary).
Shaped as PlayerGameRow so the shared rolling helpers work unchanged."""
cur = conn_b.execute(
"SELECT pgh.session_id, MAX(pgh.endtime_unix) AS ts, "
" ms.winning_sq, "
" COALESCE(SUM(pgh.ground_kills), 0) AS g_kills, "
" COALESCE(SUM(pgh.air_kills), 0) AS a_kills, "
" COALESCE(SUM(pgh.deaths), 0) AS deaths "
"FROM player_games_hist pgh "
"JOIN match_summary ms ON ms.session_id = pgh.session_id "
"WHERE pgh.squadron_name = ? AND pgh.endtime_unix BETWEEN ? AND ? "
"GROUP BY pgh.session_id "
"ORDER BY ts",
(short, start, end),
)
rows: List[PlayerGameRow] = []
for sid, ts, win_sq, g, a, d in cur.fetchall():
rows.append(PlayerGameRow(
session_id=sid, endtime_unix=ts, squadron_name=short,
vehicle="",
ground_kills=g or 0, air_kills=a or 0,
assists=0, captures=0, deaths=d or 0,
victor_bool="WIN" if (win_sq or "") == short else "LOSS",
))
return rows
def compute_squadron_match_derived(stream: List[MatchRow], short: str) -> MatchDerived:
total = len(stream)
wins = sum(1 for m in stream if m.winning_sq == short)
losses = total - wins
wr_pct = (wins / total * 100.0) if total else None
# Longest win streak
longest = cur_streak = 0
for m in stream:
if m.winning_sq == short:
cur_streak += 1
if cur_streak > longest:
longest = cur_streak
else:
cur_streak = 0
# Top opponent (by match count), with wins-against count
opp_total: dict[str, int] = defaultdict(int)
opp_wins: dict[str, int] = defaultdict(int)
for m in stream:
other = m.losing_sq if m.winning_sq == short else m.winning_sq
if not other:
continue
opp_total[other] += 1
if m.winning_sq == short:
opp_wins[other] += 1
if opp_total:
top_name = max(opp_total, key=lambda k: opp_total[k])
top_opponent: Optional[Tuple[str, int, int]] = (top_name, opp_total[top_name], opp_wins[top_name])
else:
top_opponent = None
return MatchDerived(stream=stream, total=total, wins=wins, losses=losses,
wr_pct=wr_pct, longest_win_streak=longest, top_opponent=top_opponent)
@dataclass
class TimelineRange:
squadron_name: str # "" = no squadron
first_ts: int
last_ts: int
match_count: int
def gather_player_squadron_timeline(conn_b: sqlite3.Connection, uid: str,
start: int, end: int) -> List[TimelineRange]:
"""Walk player games in time order; group consecutive same-squadron rows
into ranges. Empty string squadron_name is a valid range ("no squadron")."""
cur = conn_b.execute(
"SELECT squadron_name, endtime_unix FROM player_games_hist "
"WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
"ORDER BY endtime_unix",
(uid, start, end),
)
rows = cur.fetchall()
if not rows:
return []
ranges: List[TimelineRange] = []
cur_name = rows[0][0] or ""
cur_first = cur_last = rows[0][1]
cur_count = 1
for name, ts in rows[1:]:
name = name or ""
if name == cur_name:
cur_last = ts
cur_count += 1
else:
ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))
cur_name, cur_first, cur_last, cur_count = name, ts, ts, 1
ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))
# Drop ranges shorter than one day (brief squadron stints aren't worth
# surfacing in the chain, legend, or shading), then merge any adjacent
# same-squadron ranges that got left side-by-side after the drop.
ranges = [r for r in ranges if (r.last_ts - r.first_ts) >= 86400]
merged: List[TimelineRange] = []
for r in ranges:
if merged and merged[-1].squadron_name == r.squadron_name:
prev = merged[-1]
merged[-1] = TimelineRange(
squadron_name=prev.squadron_name,
first_ts=prev.first_ts,
last_ts=r.last_ts,
match_count=prev.match_count + r.match_count,
)
else:
merged.append(r)
ranges = merged
if not ranges:
return []
# The final range covers "still in this squadron" — extend its last_ts
# through the season window so rating snapshots after the player's final
# match (decay, continued membership) still appear on the chart.
last = ranges[-1]
if last.last_ts < end:
ranges[-1] = TimelineRange(
squadron_name=last.squadron_name,
first_ts=last.first_ts,
last_ts=end,
match_count=last.match_count,
)
return ranges
@dataclass
class PlayerGameRow:
session_id: str
endtime_unix: int
squadron_name: str
vehicle: str
ground_kills: int
air_kills: int
assists: int
captures: int
deaths: int
victor_bool: str # "WIN" | "LOSS" (case varies in source data)
def gather_player_game_stream(conn_b: sqlite3.Connection, uid: str,
start: int, end: int) -> List[PlayerGameRow]:
cur = conn_b.execute(
"SELECT session_id, endtime_unix, squadron_name, vehicle, "
" ground_kills, air_kills, assists, captures, deaths, victor_bool "
"FROM player_games_hist "
"WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
"ORDER BY endtime_unix",
(uid, start, end),
)
return [
PlayerGameRow(
session_id=r[0], endtime_unix=r[1], squadron_name=r[2] or "",
vehicle=apply_vehicle_name_filters(r[3] or ""),
ground_kills=r[4] or 0, air_kills=r[5] or 0,
assists=r[6] or 0, captures=r[7] or 0, deaths=r[8] or 0,
victor_bool=r[9] or "",
)
for r in cur.fetchall()
]
@dataclass
class RatingSegment:
squadron_name: str # "" if the range had no squadron
points: List[Tuple[int, int]] # (unix_time, player_points); empty if unresolvable
def _decompress_clan_pts(blob) -> Optional[dict]:
"""clan_pts is a gzipped JSON of [members_dict, total_score]. Return
members_dict ({uid_str: {"points": N, ...}, ...}) or None on failure."""
try:
if isinstance(blob, (bytes, memoryview)):
data = json.loads(gzip.decompress(bytes(blob)))
else:
data = json.loads(blob)
except (OSError, ValueError, TypeError):
return None
if isinstance(data, list) and data and isinstance(data[0], dict):
return data[0]
if isinstance(data, dict):
return data
return None
def _resolve_squadron_long_name(conn_sq: sqlite3.Connection,
short_name: str) -> Optional[str]:
if not short_name:
return None
cur = conn_sq.execute(
"SELECT long_name FROM squadrons_data WHERE short_name = ? LIMIT 1",
(short_name,),
)
row = cur.fetchone()
return row[0] if row and row[0] else None
def gather_player_rating_trail(conn_sq: sqlite3.Connection,
timeline: List[TimelineRange],
uid: str) -> List[RatingSegment]:
"""For each timeline range, read the player's personal SQB points (from
each squadrons_points snapshot's gzipped member roster) during that range.
Returns per-range (unix_time, player_points) lists."""
segments: List[RatingSegment] = []
long_cache: dict[str, Optional[str]] = {}
for rng in timeline:
if not rng.squadron_name:
segments.append(RatingSegment("", []))
continue
if rng.squadron_name not in long_cache:
long_cache[rng.squadron_name] = _resolve_squadron_long_name(
conn_sq, rng.squadron_name
)
long_name = long_cache[rng.squadron_name]
if not long_name:
segments.append(RatingSegment(rng.squadron_name, []))
continue
cur = conn_sq.execute(
"SELECT unix_time, clan_pts FROM squadrons_points "
"WHERE long_name = ? AND unix_time BETWEEN ? AND ? "
"ORDER BY unix_time",
(long_name, rng.first_ts, rng.last_ts),
)
points: List[Tuple[int, int]] = []
for ts, blob in cur.fetchall():
members = _decompress_clan_pts(blob)
if not members:
continue
entry = members.get(uid) or members.get(str(uid))
if not isinstance(entry, dict):
continue
pts = entry.get("points")
if isinstance(pts, (int, float)):
points.append((ts, int(pts)))
segments.append(RatingSegment(rng.squadron_name, points))
return segments
def gather_player_most_common_opponent(conn_b: sqlite3.Connection,
game_stream: List[PlayerGameRow]
) -> Optional[Tuple[str, int]]:
"""For each session the player was in, look up winning/losing squadron in
match_summary; opponent is whichever side isn't the player's squadron in
that game. Returns (opp_short_name, matches_vs) or None."""
if not game_stream:
return None
ids = list({g.session_id for g in game_stream})
placeholders = ",".join("?" * len(ids))
cur = conn_b.execute(
f"SELECT session_id, winning_sq, losing_sq "
f"FROM match_summary WHERE session_id IN ({placeholders})",
ids,
)
by_session: dict[str, Tuple[str, str]] = {
r[0]: (r[1] or "", r[2] or "") for r in cur.fetchall()
}
opp_counts: dict[str, int] = defaultdict(int)
for g in game_stream:
pair = by_session.get(g.session_id)
if not pair:
continue
w, l = pair
own = g.squadron_name
opp = l if w == own else (w if l == own else "")
if opp:
opp_counts[opp] += 1
if not opp_counts:
return None
name = max(opp_counts, key=lambda k: opp_counts[k])
return name, opp_counts[name]
def gather_player_frequent_teammate(conn_b: sqlite3.Connection, uid: str,
start: int, end: int
) -> Optional[Tuple[str, int]]:
"""Self-join on session_id — counts UIDs (not the player) that shared the
most sessions with the player while on the same squadron in that session."""
cur = conn_b.execute(
"SELECT p.UID, COUNT(DISTINCT p.session_id) AS shared "
"FROM player_games_hist p "
"JOIN player_games_hist me "
" ON me.session_id = p.session_id "
" AND me.squadron_name = p.squadron_name "
"WHERE me.UID = ? "
" AND p.UID != ? "
" AND me.endtime_unix BETWEEN ? AND ? "
"GROUP BY p.UID "
"ORDER BY shared DESC "
"LIMIT 1",
(uid, uid, start, end),
)
row = cur.fetchone()
if not row or not row[1]:
return None
teammate_uid, shared = row[0], int(row[1])
# MIN(nick) would pick byte-sorted smallest — e.g. a 'coop/…' disconnect
# row beats the real nick. Pick the most frequent non-coop nick instead,
# tiebreaking by recency so renames surface the latest identity.
nick_cur = conn_b.execute(
"SELECT nick FROM player_games_hist "
"WHERE UID = ? AND nick NOT LIKE 'coop/%' "
"GROUP BY nick "
"ORDER BY COUNT(*) DESC, MAX(endtime_unix) DESC "
"LIMIT 1",
(teammate_uid,),
)
nick_row = nick_cur.fetchone()
return (nick_row[0] if nick_row and nick_row[0] else "unknown"), shared
@dataclass
class BestMatchRow:
session_id: str
endtime_unix: int
vehicle: str
ground_kills: int
air_kills: int
assists: int
captures: int
deaths: int
@dataclass
class PlayerDerived:
total: int
wins: int
losses: int
wr_pct: Optional[float]
ground_kills: int
air_kills: int
total_kills: int
total_deaths: int
assists: int
captures: int
kd: Optional[float]
longest_win_streak: int
top_vehicle: Optional[Tuple[str, int]]
best_match: Optional[BestMatchRow]
peak_rating: Optional[Tuple[int, str, int]] # (rating, squadron_name, unix_time)
def compute_player_derived(stream: List[PlayerGameRow],
timeline: List[TimelineRange],
trail: List[RatingSegment]) -> PlayerDerived:
del timeline # reserved for future squadron-weighted metrics; currently unused
total = len(stream)
wins = sum(1 for g in stream if g.victor_bool.upper() == "WIN")
losses = total - wins
wr = (wins / total * 100.0) if total else None
gk = sum(g.ground_kills for g in stream)
ak = sum(g.air_kills for g in stream)
dk = sum(g.deaths for g in stream)
asts = sum(g.assists for g in stream)
caps = sum(g.captures for g in stream)
kd = ((gk + ak) / dk) if dk else None
longest = cur = 0
for g in stream:
if g.victor_bool.upper() == "WIN":
cur += 1
longest = max(longest, cur)
else:
cur = 0
veh_counts: dict[str, int] = defaultdict(int)
for g in stream:
if g.vehicle:
veh_counts[g.vehicle] += 1
if veh_counts:
top_name = max(veh_counts, key=lambda k: veh_counts[k])
top_vehicle: Optional[Tuple[str, int]] = (top_name, veh_counts[top_name])
else:
top_vehicle = None
best: Optional[BestMatchRow] = None
best_score = -1
for g in stream:
score = g.ground_kills + g.air_kills + g.assists + g.captures
if score > best_score:
best_score = score
best = BestMatchRow(
session_id=g.session_id, endtime_unix=g.endtime_unix,
vehicle=g.vehicle, ground_kills=g.ground_kills,
air_kills=g.air_kills, assists=g.assists, captures=g.captures,
deaths=g.deaths,
)
elif score == best_score and best is not None and g.deaths < best.deaths:
best = BestMatchRow(
session_id=g.session_id, endtime_unix=g.endtime_unix,
vehicle=g.vehicle, ground_kills=g.ground_kills,
air_kills=g.air_kills, assists=g.assists, captures=g.captures,
deaths=g.deaths,
)
peak: Optional[Tuple[int, str, int]] = None
for seg in trail:
for ts, rating in seg.points:
if peak is None or rating > peak[0]:
peak = (rating, seg.squadron_name, ts)
return PlayerDerived(
total=total, wins=wins, losses=losses, wr_pct=wr,
ground_kills=gk, air_kills=ak, total_kills=gk + ak,
total_deaths=dk, assists=asts, captures=caps, kd=kd,
longest_win_streak=longest, top_vehicle=top_vehicle,
best_match=best, peak_rating=peak,
)
@dataclass
class PlayerAggregates:
ground_kills: int
air_kills: int
deaths: int
assists: int
captures: int
total_kills: int
kd: Optional[float]
top_vehicle: Optional[Tuple[str, int]] # (vehicle_raw_name, games)
mvp: Optional[Tuple[str, int, int, int]] # (nick, kills, assists, captures)
most_active: Optional[Tuple[str, int]] # (nick, matches_played)
def gather_squadron_player_aggregates(conn_b: sqlite3.Connection, squadron_name: str,
start: int, end: int) -> PlayerAggregates:
# Totals
cur = conn_b.execute(
"SELECT COALESCE(SUM(ground_kills), 0), COALESCE(SUM(air_kills), 0), "
" COALESCE(SUM(deaths), 0), COALESCE(SUM(assists), 0), "
" COALESCE(SUM(captures), 0) "
"FROM player_games_hist "
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ?",
(squadron_name, start, end),
)
g, a, d, asst, cap = cur.fetchone()
# Top vehicle
cur = conn_b.execute(
"SELECT vehicle, COUNT(*) AS c FROM player_games_hist "
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
" AND vehicle IS NOT NULL AND vehicle != '' "
"GROUP BY vehicle ORDER BY c DESC LIMIT 1",
(squadron_name, start, end),
)
row = cur.fetchone()
top_vehicle: Optional[Tuple[str, int]] = (
(apply_vehicle_name_filters(row[0]), row[1]) if row else None
)
# MVP — ranked by K + A + C (no weighting); report the breakdown
cur = conn_b.execute(
"SELECT MIN(nick) AS nick, "
" SUM(ground_kills + air_kills) AS kills, "
" SUM(assists) AS assists, "
" SUM(captures) AS captures "
"FROM player_games_hist "
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
"GROUP BY UID "
"ORDER BY SUM(ground_kills + air_kills + assists + captures) DESC "
"LIMIT 1",
(squadron_name, start, end),
)
row = cur.fetchone()
mvp: Optional[Tuple[str, int, int, int]] = (
(row[0], row[1] or 0, row[2] or 0, row[3] or 0)
if row and (row[1] or row[2] or row[3]) else None
)
# Most active — matches played per UID
cur = conn_b.execute(
"SELECT MIN(nick) AS nick, COUNT(DISTINCT session_id) AS matches "
"FROM player_games_hist "
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
"GROUP BY UID ORDER BY matches DESC LIMIT 1",
(squadron_name, start, end),
)
row = cur.fetchone()
most_active: Optional[Tuple[str, int]] = (row[0], row[1]) if row and row[1] else None
total_kills = g + a
kd = (total_kills / d) if d else None
return PlayerAggregates(ground_kills=g, air_kills=a, deaths=d, assists=asst,
captures=cap, total_kills=total_kills, kd=kd,
top_vehicle=top_vehicle, mvp=mvp, most_active=most_active)
def compute_squadron_wr_curve(stream: List[MatchRow], short: str, season_start: int,
season_end: int) -> Tuple[List[Tuple[int, float]], int]:
"""
Returns (points, window_size). Points: list of (unix_time, wr_pct) anchored
at each match's endtime, computed as rolling WR over the previous `window_size`
matches (inclusive of current). x-axis domain callers want = [season_start, season_end].
"""
total = len(stream)
if total == 0:
return [], 0
window = max(20, total // 20)
if total < window:
# Cumulative WR: point per match
running_wins = 0
points: List[Tuple[int, float]] = []
for i, m in enumerate(stream):
if m.winning_sq == short:
running_wins += 1
wr = running_wins / (i + 1) * 100.0
points.append((m.endtime_unix, wr))
return points, window
# Full rolling
wins_flags = [1 if m.winning_sq == short else 0 for m in stream]
points = []
window_sum = sum(wins_flags[:window])
# First plottable index = window - 1
points.append((stream[window - 1].endtime_unix, window_sum / window * 100.0))
for i in range(window, total):
window_sum += wins_flags[i] - wins_flags[i - window]
points.append((stream[i].endtime_unix, window_sum / window * 100.0))
return points, window
def compute_rolling_wr(stream: List[PlayerGameRow],
window_seconds: int = 7 * 86400
) -> List[Tuple[int, float]]:
"""Trailing time-window win rate, one point per match. Stale games drop
out of the window naturally during play gaps so the curve doesn't mix
stats across weeks."""
total = len(stream)
if total == 0:
return []
wins = [1 if g.victor_bool.upper() == "WIN" else 0 for g in stream]
out: List[Tuple[int, float]] = []
left = 0
wins_in_window = 0
for right in range(total):
t_r = stream[right].endtime_unix
wins_in_window += wins[right]
while stream[left].endtime_unix < t_r - window_seconds:
wins_in_window -= wins[left]
left += 1
n = right - left + 1
out.append((t_r, wins_in_window / n * 100.0))
return out
def compute_rolling_kd(stream: List[PlayerGameRow],
window_seconds: int = 7 * 86400
) -> List[Tuple[int, float]]:
"""Trailing time-window K/D, one point per match. deaths=0 is bounded
via max(deaths, 1) to keep values finite during kill-heavy early games."""
total = len(stream)
if total == 0:
return []
kills = [g.ground_kills + g.air_kills for g in stream]
deaths = [g.deaths for g in stream]
out: List[Tuple[int, float]] = []
left = 0
k_sum = d_sum = 0
for right in range(total):
t_r = stream[right].endtime_unix
k_sum += kills[right]
d_sum += deaths[right]
while stream[left].endtime_unix < t_r - window_seconds:
k_sum -= kills[left]
d_sum -= deaths[left]
left += 1
out.append((t_r, k_sum / max(d_sum, 1)))
return out
def compute_rolling_battles(stream: List[PlayerGameRow],
window_seconds: int = 7 * 86400
) -> List[Tuple[int, int]]:
"""Time-based rolling match count: at each game timestamp t, count of games
with endtime_unix in [t - window, t]."""
total = len(stream)
if total == 0:
return []
out: List[Tuple[int, int]] = []
left = 0
for right in range(total):
t_r = stream[right].endtime_unix
while stream[left].endtime_unix <= t_r - window_seconds:
left += 1
out.append((t_r, right - left + 1))
return out
# SQB timeslots (UTC) — source of truth is BOT/utils.py SQB_SLOTS_POSTED; duplicated
# here because BOT/utils.py pulls in Discord bot dependencies, too heavy for this CLI.
_SQB_TIMESLOTS_UTC: List[Tuple[str, int, int]] = [
("NA", 1 * 3600, 7 * 3600), # 01:00 07:00
("EU", 14 * 3600, 22 * 3600), # 14:00 22:00
]
def _classify_timeslot(ts: int) -> Optional[Tuple[str, _date]]:
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
sod = int(dt.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
offset = ts - sod
for region, lo, hi in _SQB_TIMESLOTS_UTC:
if lo <= offset <= hi:
return region, dt.date()
return None
def compute_longest_timeslot_session(stream: List[PlayerGameRow]
) -> Optional[Tuple[str, _date, int]]:
"""Bucket games into (date_UTC, region) slots based on endtime_unix; return
(region, date, match_count) for the bucket with the most games. Matches
outside any posted SQB timeslot are excluded."""
buckets: dict[Tuple[str, _date], int] = defaultdict(int)
for g in stream:
slot = _classify_timeslot(g.endtime_unix)
if slot is None:
continue
buckets[slot] += 1
if not buckets:
return None
(region, d), n = max(buckets.items(), key=lambda kv: kv[1])
return region, d, n
def compute_most_active_day_utc(stream: List[PlayerGameRow]
) -> Optional[Tuple[_date, int]]:
"""Return the UTC date with the most matches (ignores timeslot boundaries)."""
by_day: dict[_date, int] = defaultdict(int)
for g in stream:
d = datetime.fromtimestamp(g.endtime_unix, tz=timezone.utc).date()
by_day[d] += 1
if not by_day:
return None
d, n = max(by_day.items(), key=lambda kv: kv[1])
return d, n
def _fmt_int(n: Optional[int]) -> str:
return f"{n:,}" if n is not None else "—"
def _fmt_float(n: Optional[float], digits: int = 2) -> str:
return f"{n:.{digits}f}" if n is not None else "—"
def _fmt_date(ts: Optional[int]) -> str:
if ts is None:
return "—"
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
def render_squadron_card(out_path: Path, ident: SquadronIdent, season: str,
rating: RatingDerived, match: MatchDerived,
players: PlayerAggregates,
rolling_wr: List[Tuple[int, float]],
rolling_kd: List[Tuple[int, float]],
rolling_battles: List[Tuple[int, int]],
season_start: int, season_end: int,
week_boundaries: List[int],
place: Optional[Tuple[int, int]] = None,
theme: str = "light",
lang: str = DEFAULT_LANG) -> None:
pal = theme_palette(theme)
tr = load_translations(lang)
def t(key: str) -> str:
return tr.get(key, key)
fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.35], hspace=0.35,
left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)
# ── Header / hero row ────────────────────────────────────────────
ax_head = fig.add_subplot(gs[0])
ax_head.set_axis_off()
ax_head.text(0.0, 0.75, f"{ident.short_name} · {ident.long_name}",
fontsize=22, fontweight="bold", color=pal["text"],
transform=ax_head.transAxes)
ax_head.text(1.0, 0.75, f"{season} {t('imgRecapSuffix')}",
fontsize=18, fontweight="bold", color=pal["header_sub"],
ha="right", transform=ax_head.transAxes)
hero = [
(t("imgHeroFinalRating"), _fmt_int(rating.final)),
(t("imgHeroMatches"), _fmt_int(match.total)),
(t("imgHeroWinRate"), (f"{match.wr_pct:.1f}%" if match.wr_pct is not None else "—")),
(t("imgHeroKD"), _fmt_float(players.kd)),
]
for i, (label, value) in enumerate(hero):
x = 0.04 + i * 0.24
ax_head.text(x, 0.15, value, fontsize=26, fontweight="bold",
color=pal["text"], va="center",
transform=ax_head.transAxes)
ax_head.text(x, -0.10, label, fontsize=11, color=pal["muted"],
va="center", transform=ax_head.transAxes)
for y in (0.55, -0.32):
ax_head.plot([0.01, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_head.transAxes, clip_on=False)
# ── Main graph (rating + WR dual axis) ───────────────────────────
ax = fig.add_subplot(gs[1])
ax.set_facecolor(pal["bg"])
ax.set_xlim(season_start, season_end)
for b in week_boundaries:
if season_start <= b <= season_end:
ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1)
if rating.series:
smoothed = smooth_series(rating.series, window=12)
xs = [ts for ts, _ in smoothed]
ys = [v for _, v in smoothed]
ax.plot(xs, ys, color=RATING_COLOR, linewidth=2.2, zorder=3)
ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
ax.tick_params(axis="y", colors=RATING_COLOR)
ax.spines["left"].set_color(RATING_COLOR)
ax.spines["top"].set_color(pal["grid"])
ax.spines["bottom"].set_color(pal["grid"])
ax.tick_params(axis="x", colors=pal["muted"])
def _prep(series: list) -> list:
return [_smooth_rolling_curve(_downsample_daily(part), window=4)
for part in _split_on_gaps(series, 7 * 86400)]
ax_wr = ax.twinx()
for s in _prep(rolling_wr):
ax_wr.plot([p[0] for p in s], [p[1] for p in s],
color=WR_COLOR, linewidth=1.3, alpha=0.8, zorder=2)
ax_wr.set_ylim(0, 100)
ax_wr.set_ylabel(t("imgAxisWinRate"), color=WR_COLOR, fontsize=12)
ax_wr.tick_params(axis="y", colors=WR_COLOR)
ax_wr.spines["right"].set_color(WR_COLOR)
ax_wr.spines["left"].set_visible(False)
ax_kd = ax.twinx()
ax_kd.spines["right"].set_position(("axes", 1.08))
for s in _prep(rolling_kd):
ax_kd.plot([p[0] for p in s], [p[1] for p in s],
color=KD_COLOR, linewidth=1.1, alpha=0.7, zorder=2)
ax_kd.set_ylabel(t("imgAxisKD"), color=KD_COLOR, fontsize=12, labelpad=10)
ax_kd.tick_params(axis="y", colors=KD_COLOR)
ax_kd.spines["right"].set_color(KD_COLOR)
ax_kd.spines["left"].set_visible(False)
ax_b = ax.twinx()
ax_b.spines["left"].set_position(("axes", -0.08))
ax_b.spines["left"].set_visible(True)
ax_b.spines["right"].set_visible(False)
ax_b.yaxis.set_label_position("left")
ax_b.yaxis.tick_left()
for s in _prep(rolling_battles):
ax_b.plot([p[0] for p in s], [p[1] for p in s],
color=BATTLES_COLOR, linewidth=1.0, alpha=0.6, zorder=2)
ax_b.set_ylabel(t("imgAxisBattles"), color=BATTLES_COLOR, fontsize=12, labelpad=10)
ax_b.tick_params(axis="y", colors=BATTLES_COLOR)
ax_b.spines["left"].set_color(BATTLES_COLOR)
boundary_inside = [b for b in week_boundaries if season_start <= b <= season_end]
if boundary_inside:
ax.set_xticks(boundary_inside)
ax.set_xticklabels(
[datetime.fromtimestamp(b, tz=timezone.utc).strftime("%m-%d")
for b in boundary_inside],
rotation=0, fontsize=9
)
ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
# BR schedule ticks above the chart — centered over each BR segment,
# with a faint dashed divider at each segment boundary so tiers read
# as distinct bands.
for entry in _load_br_schedule():
s = int(entry["start"])
e = int(entry["end"])
if e < season_start or s > season_end:
continue
s_c = max(s, season_start)
e_c = min(e, season_end)
mid = (s_c + e_c) / 2
if season_start < s < season_end:
ax.axvline(s, color=pal["muted"], alpha=0.45, linewidth=0.7,
linestyle=(0, (3, 3)), zorder=1)
ax.text(mid, 1.02, f"{float(entry['max_br']):.1f}",
color=pal["header_sub"], fontsize=10, fontweight="bold",
ha="center", va="bottom",
transform=ax.get_xaxis_transform(), clip_on=False)
# ── Supporting stats grid ────────────────────────────────────────
ax_stats = fig.add_subplot(gs[2])
ax_stats.set_axis_off()
def _fmt_opponent(op: Optional[Tuple[str, int, int]]) -> str:
if not op:
return "—"
name, total, wins = op
losses = total - wins
return (f"{name} ({total} {t('imgUnitMatches')}, "
f"{wins} {t('imgUnitWins')} / {losses} {t('imgUnitLosses')})")
def _fmt_mvp(m: Optional[Tuple[str, int, int, int]]) -> str:
if not m:
return "—"
name, kills, assists, captures = m
return (f"{name}{_fmt_int(kills)} {t('imgUnitKills')}, "
f"{_fmt_int(assists)} {t('imgUnitAssists')}, "
f"{_fmt_int(captures)} {t('imgUnitCaptures')}")
# Short rows render in 2 columns; wide rows span full width.
short_rows: List[Tuple[str, str]] = [
(t("imgStatPeakRating"), f"{_fmt_int(rating.peak)} ({_fmt_date(rating.peak_ts)})"),
(t("imgStatPlaceFinished"), f"#{_fmt_int(place[0])} / {_fmt_int(place[1])}" if place else "—"),
(t("imgStatTotalKills"), f"{_fmt_int(players.total_kills)} ({_fmt_int(players.ground_kills)} {t('imgGroundShort')} / {_fmt_int(players.air_kills)} {t('imgAirShort')})"),
(t("imgStatTotalDeaths"), _fmt_int(players.deaths)),
(t("imgStatAssistsCaptures"), f"{_fmt_int(players.assists)} / {_fmt_int(players.captures)}"),
(t("imgStatMostPlayedVehicle"), f"{players.top_vehicle[0]} ({players.top_vehicle[1]} {t('imgUnitGames')})" if players.top_vehicle else "—"),
(t("imgStatMostActive"), f"{players.most_active[0]} ({_fmt_int(players.most_active[1])} {t('imgUnitMatches')})" if players.most_active else "—"),
(t("imgStatLongestWinStreak"), _fmt_int(match.longest_win_streak)),
]
wide_rows: List[Tuple[str, str]] = [
(t("imgStatMVP"), _fmt_mvp(players.mvp)),
(t("imgStatMostCommonOpponent"), _fmt_opponent(match.top_opponent)),
]
cols = 2
short_per_col = (len(short_rows) + cols - 1) // cols # 4
total_bands = short_per_col + len(wide_rows) # 6
grid_top = 0.98
grid_bottom = 0.05
band_h = (grid_top - grid_bottom) / total_bands
# Short rows — 2-column layout in the top `short_per_col` bands
for i, (label, value) in enumerate(short_rows):
col, row_in_col = divmod(i, short_per_col)
x = 0.02 + col * 0.52
y = grid_top - band_h * (row_in_col + 0.5)
ax_stats.text(x, y, f"{label}:", fontsize=12, color=pal["muted"],
va="center", transform=ax_stats.transAxes)
ax_stats.text(x + 0.20, y, value, fontsize=12, color=pal["text"],
va="center", transform=ax_stats.transAxes)
# Wide rows — full-width, below the short rows
for i, (label, value) in enumerate(wide_rows):
y = grid_top - band_h * (short_per_col + i + 0.5)
ax_stats.text(0.02, y, f"{label}:", fontsize=12, color=pal["muted"],
va="center", transform=ax_stats.transAxes)
ax_stats.text(0.22, y, value, fontsize=12, color=pal["text"],
va="center", transform=ax_stats.transAxes)
# Horizontal dividers at band boundaries
for k in range(1, total_bands):
y = grid_top - band_h * k
if k < short_per_col:
# still in the 2-column zone — split divider around the vertical line
ax_stats.plot([0.01, 0.49], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
ax_stats.plot([0.51, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
else:
# wide zone — single divider spans full width
ax_stats.plot([0.01, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
# Vertical divider between columns — only across the short-row zone
ax_stats.plot([0.50, 0.50],
[grid_top - band_h * short_per_col, grid_top],
color=pal["divider"], alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
fig.text(0.5, 0.015,
f"SREBOT · {t('imgFooterGenerated')} {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
ha="center", fontsize=8, color=pal["footer"])
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
tmp.parent.mkdir(parents=True, exist_ok=True)
try:
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], bbox_inches=None, format="png")
os.replace(tmp, out_path)
finally:
plt.close(fig)
def render_squadron_placeholder(out_path: Path, short: str, season: str,
reason: Optional[str] = None,
theme: str = "light",
lang: str = DEFAULT_LANG) -> None:
pal = theme_palette(theme)
tr = load_translations(lang)
if reason is None:
reason = tr.get("imgPlaceholderNoData", "No data for {short} in {season}").format(
short=short, season=season
)
fig = plt.figure(figsize=(16, 9), dpi=120, facecolor=pal["bg"])
ax = fig.add_subplot(1, 1, 1)
ax.set_axis_off()
ax.text(0.5, 0.62, f"{short} · {season}",
fontsize=28, fontweight="bold", color=pal["text"], ha="center",
transform=ax.transAxes)
ax.text(0.5, 0.48, reason,
fontsize=16, color=pal["muted"], ha="center",
transform=ax.transAxes)
ax.text(0.5, 0.05, "SREBOT",
fontsize=10, color=pal["footer"], ha="center",
transform=ax.transAxes)
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
tmp.parent.mkdir(parents=True, exist_ok=True)
try:
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], format="png")
os.replace(tmp, out_path)
finally:
plt.close(fig)
def _assign_squadron_colors(timeline: List[TimelineRange]) -> dict[str, str]:
"""Deterministic per-recap palette assignment in timeline order."""
colors: dict[str, str] = {}
slot = 0
for rng in timeline:
name = rng.squadron_name
if name == "":
colors.setdefault("", _NO_SQUADRON_COLOR)
continue
if name not in colors:
colors[name] = _SQUADRON_PALETTE[slot % len(_SQUADRON_PALETTE)]
slot += 1
colors.setdefault("", _NO_SQUADRON_COLOR)
return colors
def render_player_card(out_path: Path, nick: str, uid: str, season: str,
timeline: List[TimelineRange],
trail: List[RatingSegment],
derived: PlayerDerived,
rolling_wr: List[Tuple[int, float]],
rolling_kd: List[Tuple[int, float]],
rolling_battles: List[Tuple[int, int]],
most_common_opp: Optional[Tuple[str, int]],
frequent_teammate: Optional[Tuple[str, int]],
longest_session: Optional[Tuple[str, _date, int]],
most_active_day: Optional[Tuple[_date, int]],
season_start: int, season_end: int,
week_boundaries: List[int],
theme: str = "light",
lang: str = DEFAULT_LANG) -> None:
pal = theme_palette(theme)
tr = load_translations(lang)
def t(key: str) -> str:
return tr.get(key, key)
colors = _assign_squadron_colors(timeline)
unique_sq_keys = {rng.squadron_name for rng in timeline}
show_squadron_viz = len(unique_sq_keys) > 1
nick = _sanitize_render_text(nick)
fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.6], hspace=0.35,
left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)
# ── Header / hero ───────────────────────────────────────────────
ax_head = fig.add_subplot(gs[0])
ax_head.set_axis_off()
ax_head.text(0.0, 0.75, f"{nick} · {t('imgUIDLabel')} {uid}",
fontsize=22, fontweight="bold", color=pal["text"],
transform=ax_head.transAxes)
ax_head.text(1.0, 0.75, f"{season} {t('imgRecapSuffix')}",
fontsize=18, fontweight="bold", color=pal["header_sub"],
ha="right", transform=ax_head.transAxes)
hero = [
(t("imgHeroBattles"), _fmt_int(derived.total)),
(t("imgHeroWinRate"), (f"{derived.wr_pct:.1f}%" if derived.wr_pct is not None else "—")),
(t("imgHeroKD"), _fmt_float(derived.kd)),
(t("imgHeroTotalKills"), _fmt_int(derived.total_kills)),
]
for i, (label, value) in enumerate(hero):
x = 0.04 + i * 0.24
ax_head.text(x, 0.15, value, fontsize=26, fontweight="bold",
color=pal["text"], va="center",
transform=ax_head.transAxes)
ax_head.text(x, -0.10, label, fontsize=11, color=pal["muted"],
va="center", transform=ax_head.transAxes)
for y in (0.55, -0.32):
ax_head.plot([0.01, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_head.transAxes, clip_on=False)
# ── Main chart with 4 axes ──────────────────────────────────────
ax = fig.add_subplot(gs[1])
ax.set_facecolor(pal["bg"])
ax.set_xlim(season_start, season_end)
if show_squadron_viz:
for rng in timeline:
c = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
alpha = 0.12 if rng.squadron_name else 0.05
ax.axvspan(rng.first_ts, rng.last_ts, color=c, alpha=alpha, zorder=0)
for b in week_boundaries:
if season_start <= b <= season_end:
ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1, zorder=1)
for seg in trail:
if not seg.points:
continue
smoothed = smooth_series(seg.points, window=12)
xs = [p[0] for p in smoothed]
ys = [p[1] for p in smoothed]
# Rating line is always teal — squadron shading + the Squadrons
# Represented chain carry the per-squadron color key.
ax.plot(xs, ys, color=RATING_COLOR, linewidth=2.2, zorder=3)
ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
ax.tick_params(axis="y", colors=RATING_COLOR)
ax.spines["left"].set_color(RATING_COLOR)
def _prep(series: list) -> list:
# Clip to squadron timeline ranges so rolling curves match the
# shading, then daily-downsample + smooth inside each segment.
return [_smooth_rolling_curve(_downsample_daily(part), window=4)
for part in _clip_to_ranges(series, timeline)]
ax_wr = ax.twinx()
for s in _prep(rolling_wr):
ax_wr.plot([p[0] for p in s], [p[1] for p in s],
color=WR_COLOR, linewidth=1.3, alpha=0.8, zorder=2)
ax_wr.set_ylim(0, 100)
ax_wr.set_ylabel(t("imgAxisWinRate"), color=WR_COLOR, fontsize=12)
ax_wr.tick_params(axis="y", colors=WR_COLOR)
ax_wr.spines["right"].set_color(WR_COLOR)
ax_wr.spines["left"].set_visible(False) # don't paint over ax's teal left spine
ax_kd = ax.twinx()
ax_kd.spines["right"].set_position(("axes", 1.08))
for s in _prep(rolling_kd):
ax_kd.plot([p[0] for p in s], [p[1] for p in s],
color=KD_COLOR, linewidth=1.1, alpha=0.7, zorder=2)
ax_kd.set_ylabel(t("imgAxisKD"), color=KD_COLOR, fontsize=12, labelpad=10)
ax_kd.tick_params(axis="y", colors=KD_COLOR)
ax_kd.spines["right"].set_color(KD_COLOR)
ax_kd.spines["left"].set_visible(False)
ax_b = ax.twinx()
ax_b.spines["left"].set_position(("axes", -0.08))
ax_b.spines["left"].set_visible(True)
ax_b.spines["right"].set_visible(False)
ax_b.yaxis.set_label_position("left")
ax_b.yaxis.tick_left()
for s in _prep(rolling_battles):
ax_b.plot([p[0] for p in s], [p[1] for p in s],
color=BATTLES_COLOR, linewidth=1.0, alpha=0.6, zorder=2)
ax_b.set_ylabel(t("imgAxisBattles"), color=BATTLES_COLOR, fontsize=12, labelpad=10)
ax_b.tick_params(axis="y", colors=BATTLES_COLOR)
ax_b.spines["left"].set_color(BATTLES_COLOR)
boundary_inside = [b for b in week_boundaries if season_start <= b <= season_end]
if boundary_inside:
ax.set_xticks(boundary_inside)
ax.set_xticklabels(
[datetime.fromtimestamp(b, tz=timezone.utc).strftime("%m-%d")
for b in boundary_inside],
rotation=0, fontsize=9
)
ax.tick_params(axis="x", colors=pal["muted"])
ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
# BR schedule ticks above the chart — centered over each BR segment,
# with a faint dashed divider at each segment boundary so tiers read
# as distinct bands.
for entry in _load_br_schedule():
s = int(entry["start"])
e = int(entry["end"])
if e < season_start or s > season_end:
continue
s_c = max(s, season_start)
e_c = min(e, season_end)
mid = (s_c + e_c) / 2
if season_start < s < season_end:
ax.axvline(s, color=pal["muted"], alpha=0.45, linewidth=0.7,
linestyle=(0, (3, 3)), zorder=1)
ax.text(mid, 1.02, f"{float(entry['max_br']):.1f}",
color=pal["header_sub"], fontsize=10, fontweight="bold",
ha="center", va="bottom",
transform=ax.get_xaxis_transform(), clip_on=False)
# ── Stats grid (bottom) ─────────────────────────────────────────
ax_stats = fig.add_subplot(gs[2])
ax_stats.set_axis_off()
kdac_value = (
f"{_fmt_int(derived.total_kills)} "
f"({_fmt_int(derived.ground_kills)} {t('imgGroundShort')} / "
f"{_fmt_int(derived.air_kills)} {t('imgAirShort')}) / "
f"{_fmt_int(derived.total_deaths)} / "
f"{_fmt_int(derived.assists)} / "
f"{_fmt_int(derived.captures)}"
)
short_rows: List[Tuple[str, str]] = [
(t("imgStatKDAC"), kdac_value),
(t("imgStatMostPlayedVehicle"),
f"{derived.top_vehicle[0]} ({derived.top_vehicle[1]} {t('imgUnitGames')})"
if derived.top_vehicle else "—"),
(t("imgStatLongestWinStreak"), _fmt_int(derived.longest_win_streak)),
(t("imgStatLongestSession"),
f"{longest_session[0]} · {longest_session[1].isoformat()} — "
f"{longest_session[2]} {t('imgUnitMatches')}"
if longest_session else "—"),
(t("imgStatMostActiveDay"),
f"{most_active_day[0].isoformat()}{most_active_day[1]} {t('imgUnitMatches')}"
if most_active_day else "—"),
(t("imgStatMostCommonOpponent"),
f"{most_common_opp[0]} ({most_common_opp[1]} {t('imgUnitMatches')})"
if most_common_opp else "—"),
]
peak_str = (f"{_fmt_int(derived.peak_rating[0])} · {derived.peak_rating[1]} · "
f"{_fmt_date(derived.peak_rating[2])}") if derived.peak_rating else "—"
if derived.best_match:
bm = derived.best_match
best_match_str = t("imgBestMatchLine").format(
vehicle=bm.vehicle,
gk=bm.ground_kills,
ak=bm.air_kills,
assists=bm.assists,
cap=bm.captures,
deaths=bm.deaths,
date=_fmt_date(bm.endtime_unix),
)
else:
best_match_str = "—"
chain_parts: List[str] = []
for rng in timeline:
dot = "●" if rng.squadron_name else "○"
name = rng.squadron_name if rng.squadron_name else t("imgUnitNoSquadron")
chain_parts.append(f"{dot} {name} ({rng.match_count})")
chain_str = " → ".join(chain_parts) if chain_parts else "—"
teammate_str = (f"{frequent_teammate[0]}{frequent_teammate[1]} "
f"{t('imgUnitTogether')}") if frequent_teammate else "—"
wide_rows: List[Tuple[str, str]] = [
(t("imgStatPeakSquadronRating"), peak_str),
(t("imgStatBestMatch"), best_match_str),
(t("imgStatSquadronsRepresented"), chain_str),
(t("imgStatFrequentTeammate"), teammate_str),
]
cols = 2
short_per_col = (len(short_rows) + cols - 1) // cols
total_bands = short_per_col + len(wide_rows)
grid_top = 0.98
band_h = (grid_top - 0.05) / total_bands
for i, (label, value) in enumerate(short_rows):
col, row_in_col = divmod(i, short_per_col)
x = 0.02 + col * 0.52
y = grid_top - band_h * (row_in_col + 0.5)
ax_stats.text(x, y, f"{label}:", fontsize=12, color=pal["muted"],
va="center", transform=ax_stats.transAxes)
ax_stats.text(x + 0.20, y, value, fontsize=12, color=pal["text"],
va="center", transform=ax_stats.transAxes)
squadrons_label = t("imgStatSquadronsRepresented")
for i, (label, value) in enumerate(wide_rows):
y = grid_top - band_h * (short_per_col + i + 0.5)
ax_stats.text(0.02, y, f"{label}:", fontsize=12, color=pal["muted"],
va="center", transform=ax_stats.transAxes)
if label == squadrons_label:
children: List = []
for j, rng in enumerate(timeline):
if j > 0:
children.append(TextArea(
" → ",
textprops=dict(color=pal["muted"], fontsize=12),
))
seg_color = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
name = rng.squadron_name if rng.squadron_name else t("imgUnitNoSquadron")
dot_char = "●" if rng.squadron_name else "○"
children.append(TextArea(
f"{dot_char} {name} ({rng.match_count})",
textprops=dict(color=seg_color, fontsize=12),
))
if not children:
children.append(TextArea(
"—", textprops=dict(color=pal["text"], fontsize=12)))
packed = HPacker(children=children, align="center", pad=0, sep=0)
ax_stats.add_artist(AnchoredOffsetbox(
loc="center left", child=packed, pad=0, frameon=False,
bbox_to_anchor=(0.22, y), bbox_transform=ax_stats.transAxes,
))
else:
ax_stats.text(0.22, y, value, fontsize=12, color=pal["text"],
va="center", transform=ax_stats.transAxes)
for k in range(1, total_bands):
y = grid_top - band_h * k
if k < short_per_col:
ax_stats.plot([0.01, 0.49], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
ax_stats.plot([0.51, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
else:
ax_stats.plot([0.01, 0.99], [y, y], color=pal["divider"],
alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
ax_stats.plot([0.50, 0.50],
[grid_top - band_h * short_per_col, grid_top],
color=pal["divider"], alpha=0.85, linewidth=1.0,
transform=ax_stats.transAxes, clip_on=False)
fig.text(0.5, 0.015,
f"SREBOT · {t('imgFooterGenerated')} {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
ha="center", fontsize=8, color=pal["footer"])
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
tmp.parent.mkdir(parents=True, exist_ok=True)
try:
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], bbox_inches=None, format="png")
os.replace(tmp, out_path)
finally:
plt.close(fig)
def render_player_placeholder(out_path: Path, nick: str, season: str,
reason: Optional[str] = None,
theme: str = "light",
lang: str = DEFAULT_LANG) -> None:
pal = theme_palette(theme)
tr = load_translations(lang)
if reason is None:
reason = tr.get("imgPlaceholderNoDataPlayer",
"No data for {nick} in {season}").format(nick=nick, season=season)
fig = plt.figure(figsize=(16, 9), dpi=120, facecolor=pal["bg"])
ax = fig.add_subplot(1, 1, 1)
ax.set_axis_off()
ax.text(0.5, 0.62, f"{nick} · {season}",
fontsize=28, fontweight="bold", color=pal["text"], ha="center",
transform=ax.transAxes)
ax.text(0.5, 0.48, reason,
fontsize=16, color=pal["muted"], ha="center",
transform=ax.transAxes)
ax.text(0.5, 0.05, "SREBOT",
fontsize=10, color=pal["footer"], ha="center",
transform=ax.transAxes)
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
tmp.parent.mkdir(parents=True, exist_ok=True)
try:
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], format="png")
os.replace(tmp, out_path)
finally:
plt.close(fig)
def run_squadron(args: Args) -> int:
assert args.clan_id is not None
with _open_ro(SQUADRONS_DB) as conn_sq:
ident = resolve_squadron(conn_sq, args.clan_id)
if ident is None:
logging.error(f"unknown clan_id={args.clan_id}")
return 2
logging.info(f"resolved {ident.short_name} / {ident.long_name}")
rating = gather_squadron_rating(conn_sq, args.clan_id, args.season_start, args.season_end)
logging.info(f"rating: series={len(rating.series)} final={rating.final} peak={rating.peak} change={rating.change}")
place = gather_squadron_place(conn_sq, args.clan_id, args.season_start, args.season_end)
logging.info(f"place: {place}")
with _open_ro(SQ_BATTLES_DB) as conn_b:
stream = gather_squadron_match_stream(conn_b, ident.short_name, args.season_start, args.season_end)
players = gather_squadron_player_aggregates(conn_b, ident.short_name, args.season_start, args.season_end)
squad_stream = gather_squadron_per_match_stream(
conn_b, ident.short_name, args.season_start, args.season_end
)
match = compute_squadron_match_derived(stream, ident.short_name)
logging.info(f"matches: total={match.total} wins={match.wins} wr={match.wr_pct} streak={match.longest_win_streak} top_opp={match.top_opponent}")
rolling_wr = compute_rolling_wr(squad_stream)
rolling_kd = compute_rolling_kd(squad_stream)
rolling_battles = compute_rolling_battles(squad_stream)
logging.info(f"rolling: wr_pts={len(rolling_wr)} kd_pts={len(rolling_kd)} battles_pts={len(rolling_battles)}")
if rating.final is None and match.total == 0:
logging.info("no data for squadron in season; writing placeholder")
render_squadron_placeholder(args.out, ident.short_name, args.season, theme=args.theme, lang=args.lang)
return 0
logging.info(f"players: kills={players.total_kills} deaths={players.deaths} kd={players.kd} top_veh={players.top_vehicle} active={players.most_active}")
if players.mvp:
_n, _k, _a, _c = players.mvp
logging.info(f"mvp: {_n} K={_k} A={_a} C={_c}")
render_squadron_card(
args.out, ident, args.season,
rating, match, players,
rolling_wr, rolling_kd, rolling_battles,
args.season_start, args.season_end,
args.week_boundaries,
place=place,
theme=args.theme,
lang=args.lang,
)
logging.info(f"wrote {args.out}")
return 0
def _resolve_latest_player_nick(conn_b: sqlite3.Connection, uid: str) -> str:
cur = conn_b.execute(
"SELECT nick FROM player_games_hist "
"WHERE UID = ? AND nick NOT LIKE 'coop/%' "
"ORDER BY session_id DESC LIMIT 1",
(uid,),
)
row = cur.fetchone()
return row[0] if row else uid
def run_player(args: Args) -> int:
assert args.uid is not None
with _open_ro(SQ_BATTLES_DB) as conn_b, _open_ro(SQUADRONS_DB) as conn_sq:
nick = _resolve_latest_player_nick(conn_b, args.uid)
timeline = gather_player_squadron_timeline(
conn_b, args.uid, args.season_start, args.season_end
)
stream = gather_player_game_stream(
conn_b, args.uid, args.season_start, args.season_end
)
trail = gather_player_rating_trail(conn_sq, timeline, args.uid)
opp = gather_player_most_common_opponent(conn_b, stream)
teammate = gather_player_frequent_teammate(
conn_b, args.uid, args.season_start, args.season_end
)
# Membership check: the final timeline range was extended to season_end
# so we could scan past the player's last game. Clamp it back to the last
# snapshot where the player actually appeared in the squadron's roster
# (the last rating-trail point), so shading doesn't extend past when they
# left the squadron.
if timeline:
final_idx = len(timeline) - 1
final_seg = trail[final_idx] if final_idx < len(trail) else None
observed_ts = final_seg.points[-1][0] if final_seg and final_seg.points else None
last_rng = timeline[final_idx]
clamp_to = observed_ts if observed_ts is not None else min(
last_rng.last_ts, args.season_end
)
if clamp_to < last_rng.last_ts:
timeline[final_idx] = TimelineRange(
squadron_name=last_rng.squadron_name,
first_ts=last_rng.first_ts,
last_ts=clamp_to,
match_count=last_rng.match_count,
)
if not stream and all(not seg.points for seg in trail):
logging.info("no data for player in season; writing placeholder")
render_player_placeholder(args.out, nick, args.season,
theme=args.theme, lang=args.lang)
return 0
derived = compute_player_derived(stream, timeline, trail)
rolling_wr = compute_rolling_wr(stream)
rolling_kd = compute_rolling_kd(stream)
rolling_battles = compute_rolling_battles(stream)
longest = compute_longest_timeslot_session(stream)
active_day = compute_most_active_day_utc(stream)
logging.info(
f"player nick={nick} games={len(stream)} timeline_ranges={len(timeline)} "
f"rating_segments={sum(1 for s in trail if s.points)} "
f"wr={derived.wr_pct} kd={derived.kd} streak={derived.longest_win_streak}"
)
render_player_card(
args.out, nick, args.uid, args.season,
timeline, trail, derived,
rolling_wr, rolling_kd, rolling_battles,
opp, teammate, longest, active_day,
args.season_start, args.season_end,
args.week_boundaries,
theme=args.theme, lang=args.lang,
)
logging.info(f"wrote {args.out}")
return 0
def main(argv: Optional[List[str]] = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s", stream=sys.stderr)
args = parse_args(argv)
logging.info(f"render start mode={args.mode} season={args.season}")
if args.mode == "squadron":
return run_squadron(args)
return run_player(args)
if __name__ == "__main__":
sys.exit(main())