3fb15d6282
- Make .env the single source of truth for runtime config. Remove all
`env:` blocks from SREBOT/ecosystem.config.js and TSSBOT/ecosystem.config.js
so values can't silently shadow .env. Both ecosystem files load .env via
`require('dotenv').config()` and PM2 inherits the resolved environment.
- Rename SREBOT_STORAGE_VOL_PATH → STORAGE_VOL_PATH across all readers
(BOT/utils.py, BOT/receiver_bridge.py, BOT/render_recap.py, server.js,
web/server.js, dateindex.js, scripts/*, srebot.service, tests/, README,
and both .env files). STORAGE is shared between SREBOT and TSSBOT, so the
variable shouldn't carry one bot's prefix.
- Rename per-process PORT env vars to disambiguated names so .env can be
the source of truth without collisions:
PORT (api) → SREBOT_API_PORT (server.js)
PORT (web) → SREBOT_WEB_PORT (web/server.js)
WEBHOOK_PORT → SREBOT_WEBHOOK_PORT (github_webhook_updater.py)
SREBOT_EXTERNAL_HOST/PORT/UPSTREAM_URL were already uniquely named;
they just move from ecosystem env to .env.
- TSSBOT/.env: drop GITHUB_WEBHOOK_SECRET (only srebot-webhook consumes it)
and the stale SREBOT_DEPLOY_PATH. SREBOT/.env: also drop the obsolete
SREBOT_DEPLOY_PATH (ecosystem now hardcodes __dirname).
- ecosystem.config.js no longer references SREBOT_DEPLOY_PATH; deploy path
is always __dirname of the ecosystem file.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1808 lines
70 KiB
Python
1808 lines
70 KiB
Python
"""
|
||
render_recap.py
|
||
|
||
CLI tool that renders a season recap PNG for either a squadron or a single
|
||
player. Invoked by web/server.js on cache miss.
|
||
|
||
Usage (squadron mode — default):
|
||
python render_recap.py \\
|
||
--mode squadron \\
|
||
--clan-id 123456 \\
|
||
--season 2026-II \\
|
||
--season-start 1772348400 \\
|
||
--season-end 1777852799 \\
|
||
--week-boundaries 1772348400,1773039600,... \\
|
||
--out /path/to/output.png
|
||
|
||
Usage (player mode):
|
||
python render_recap.py \\
|
||
--mode player \\
|
||
--uid 987654321 \\
|
||
--season 2026-II \\
|
||
--season-start 1772348400 \\
|
||
--season-end 1777852799 \\
|
||
--week-boundaries 1772348400,1773039600,... \\
|
||
--out /path/to/output.png
|
||
|
||
Exits 0 on success, non-zero with stderr diagnostic on failure.
|
||
"""
|
||
|
||
import argparse
|
||
import gzip
|
||
import json
|
||
import re
|
||
import logging
|
||
import os
|
||
import sqlite3
|
||
import sys
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass
|
||
from datetime import date as _date, datetime, timezone
|
||
from pathlib import Path
|
||
from typing import List, Literal, Optional, Tuple
|
||
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.gridspec import GridSpec
|
||
from matplotlib.offsetbox import AnchoredOffsetbox, HPacker, TextArea
|
||
|
||
plt.rcParams["font.family"] = "sans-serif"
|
||
plt.rcParams["font.sans-serif"] = [
|
||
"DejaVu Sans", "Noto Sans", "Noto Sans Symbols2",
|
||
"Arial", "Liberation Sans", "sans-serif",
|
||
]
|
||
|
||
|
||
def _sanitize_render_text(s: str) -> str:
|
||
"""Strip Unicode code points that render as tofu boxes (tag chars,
|
||
variation selectors)."""
|
||
if not s:
|
||
return s
|
||
return "".join(
|
||
c for c in s
|
||
if not (
|
||
0xE0000 <= ord(c) <= 0xE007F # language tag characters
|
||
or 0xFE00 <= ord(c) <= 0xFE0F # VS1-VS16
|
||
or 0xE0100 <= ord(c) <= 0xE01EF # VS17-VS256
|
||
)
|
||
)
|
||
|
||
|
||
# This script runs as a subprocess entry point (python BOT/render_recap.py …),
|
||
# so BOT/__init__.py is not invoked automatically. Put SREBOT on sys.path,
|
||
# then `import BOT` to trigger the package's SHARED bootstrap once.
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||
import BOT # noqa: F401, E402 — side effect: adds BOTS/SHARED to sys.path
|
||
|
||
from data_parser import apply_vehicle_name_filters # noqa: E402
|
||
|
||
LOCALES_DIR = Path(__file__).resolve().parent.parent / "web" / "locales"
|
||
DEFAULT_LANG = "en"
|
||
|
||
|
||
def load_translations(lang: str) -> dict[str, str]:
|
||
"""Load seasonCard.* strings for the given language, falling back to English for missing keys."""
|
||
def _read(lang_code: str) -> dict:
|
||
path = LOCALES_DIR / f"{lang_code}.json"
|
||
if not path.exists():
|
||
return {}
|
||
try:
|
||
return json.loads(path.read_text(encoding="utf-8")).get("seasonCard", {})
|
||
except Exception:
|
||
return {}
|
||
|
||
en = _read(DEFAULT_LANG)
|
||
translated = _read(lang) if lang != DEFAULT_LANG else {}
|
||
merged = dict(en)
|
||
merged.update(translated)
|
||
return merged
|
||
|
||
RATING_COLOR = "#14b8a6"
|
||
WR_COLOR = "#f59e0b"
|
||
KD_COLOR = "#a78bfa"
|
||
BATTLES_COLOR = "#60a5fa"
|
||
|
||
# Ordered so the first slots avoid clashing with the rolling-curve colors
|
||
# (WR #f59e0b orange, K/D #a78bfa violet, Battles #60a5fa blue).
|
||
_SQUADRON_PALETTE: List[str] = [
|
||
"#14b8a6", # teal (same as RATING_COLOR — single-squadron default)
|
||
"#dc2626", # red
|
||
"#059669", # green
|
||
"#ec4899", # pink
|
||
"#84cc16", # lime
|
||
"#06b6d4", # cyan
|
||
"#db2777", # magenta
|
||
"#65a30d", # olive
|
||
]
|
||
_NO_SQUADRON_COLOR = "#64748b"
|
||
|
||
THEMES: dict[str, dict[str, str]] = {
|
||
"light": {
|
||
"bg": "#fafafa",
|
||
"text": "#0f172a",
|
||
"muted": "#64748b",
|
||
"header_sub": "#475569",
|
||
"grid": "#94a3b8",
|
||
"divider": "#cbd5e1",
|
||
"footer": "#94a3b8",
|
||
},
|
||
"dark": {
|
||
"bg": "#121418",
|
||
"text": "#e5e7eb",
|
||
"muted": "#9ca3af",
|
||
"header_sub": "#d1d5db",
|
||
"grid": "#2a2f36",
|
||
"divider": "#262a30",
|
||
"footer": "#6b7280",
|
||
},
|
||
}
|
||
|
||
|
||
def theme_palette(name: str) -> dict[str, str]:
|
||
return THEMES.get(name, THEMES["light"])
|
||
|
||
|
||
def smooth_series(series: List[Tuple[int, int]], window: int = 12
|
||
) -> List[Tuple[int, float]]:
|
||
"""Centered moving average over (ts, value) pairs. Preserves length."""
|
||
n = len(series)
|
||
if n == 0:
|
||
return []
|
||
if n < window * 2:
|
||
return [(ts, float(v)) for ts, v in series]
|
||
half = window // 2
|
||
out: List[Tuple[int, float]] = []
|
||
for i in range(n):
|
||
lo = max(0, i - half)
|
||
hi = min(n, i + half + 1)
|
||
avg = sum(v for _, v in series[lo:hi]) / (hi - lo)
|
||
out.append((series[i][0], avg))
|
||
return out
|
||
|
||
|
||
def _downsample_daily(series: list) -> list:
|
||
"""Reduce to one point per UTC day, keeping each day's last (ts, value).
|
||
Caller can still apply smoothing; this just removes intra-day clustering
|
||
that otherwise renders as a vertical bar on a multi-month chart."""
|
||
if not series:
|
||
return []
|
||
out: list = []
|
||
prev_day = None
|
||
last_pt = None
|
||
for ts, v in series:
|
||
day = datetime.fromtimestamp(ts, tz=timezone.utc).date()
|
||
if prev_day is not None and day != prev_day and last_pt is not None:
|
||
out.append(last_pt)
|
||
last_pt = (ts, v)
|
||
prev_day = day
|
||
if last_pt is not None:
|
||
out.append(last_pt)
|
||
return out
|
||
|
||
|
||
def _split_on_gaps(series: list, gap_seconds: int) -> list:
|
||
"""Split a time-sorted series into sublists wherever consecutive timestamps
|
||
are more than gap_seconds apart. Prevents matplotlib from interpolating a
|
||
straight line across an inactive period."""
|
||
if not series:
|
||
return []
|
||
segments: list = [[]]
|
||
prev_ts = None
|
||
for item in series:
|
||
ts = item[0]
|
||
if prev_ts is not None and ts - prev_ts > gap_seconds and segments[-1]:
|
||
segments.append([])
|
||
segments[-1].append(item)
|
||
prev_ts = ts
|
||
return [s for s in segments if s]
|
||
|
||
|
||
def _clip_to_ranges(series: list, timeline: list) -> list:
|
||
"""Return a list of sublists — one per timeline range — containing only
|
||
the points whose ts falls inside that range's [first_ts, last_ts].
|
||
Used to make rolling WR/KD/Battles curves respect squadron shading
|
||
boundaries instead of sprawling across unshaded gaps."""
|
||
if not series or not timeline:
|
||
return []
|
||
out: list = []
|
||
for rng in timeline:
|
||
seg = [p for p in series if rng.first_ts <= p[0] <= rng.last_ts]
|
||
if seg:
|
||
out.append(seg)
|
||
return out
|
||
|
||
|
||
def _smooth_rolling_curve(series: list, window: int = 8) -> list:
|
||
"""Centered moving average for already-computed rolling curves.
|
||
Accepts (ts, int) or (ts, float) pairs; returns (ts, float)."""
|
||
n = len(series)
|
||
if n == 0:
|
||
return []
|
||
if n < window * 2:
|
||
return [(t, float(v)) for t, v in series]
|
||
half = window // 2
|
||
out: list = []
|
||
for i in range(n):
|
||
lo = max(0, i - half)
|
||
hi = min(n, i + half + 1)
|
||
avg = sum(v for _, v in series[lo:hi]) / (hi - lo)
|
||
out.append((series[i][0], float(avg)))
|
||
return out
|
||
|
||
|
||
_storage_env = os.environ.get("STORAGE_VOL_PATH", "").strip()
|
||
if not _storage_env:
|
||
raise RuntimeError("STORAGE_VOL_PATH must be set")
|
||
_STORAGE = Path(_storage_env)
|
||
SQUADRONS_DB = _STORAGE / "squadrons.db"
|
||
SQ_BATTLES_DB = _STORAGE / "sq_battles.db"
|
||
SEASONS_TXT = (Path(__file__).resolve().parent.parent
|
||
/ "web" / "constants" / "seasons")
|
||
|
||
_BR_LINE_RE = re.compile(r"<t:(\d+):\w+>.*?max BR\s+(\d+(?:\.\d+)?)",
|
||
re.IGNORECASE)
|
||
|
||
|
||
def _load_br_schedule() -> List[dict]:
|
||
"""Parse web/constants/seasons into a list of
|
||
[{'max_br': float, 'start': unix, 'end': unix}, ...].
|
||
Contains every season's BR tiers. Callers filter by season window."""
|
||
try:
|
||
text = SEASONS_TXT.read_text(encoding="utf-8")
|
||
except OSError:
|
||
return []
|
||
entries: List[dict] = []
|
||
for line in text.splitlines():
|
||
m = _BR_LINE_RE.search(line)
|
||
if not m:
|
||
continue
|
||
entries.append({
|
||
"start": int(m.group(1)),
|
||
"max_br": float(m.group(2)),
|
||
"end": 0,
|
||
})
|
||
# End = next entry's start (minus one sec); last entry gets +7d default.
|
||
for i, e in enumerate(entries):
|
||
nxt = entries[i + 1]["start"] if i + 1 < len(entries) else 0
|
||
e["end"] = (nxt - 1) if nxt > e["start"] else e["start"] + 7 * 86400
|
||
return entries
|
||
|
||
|
||
@dataclass
|
||
class Args:
|
||
mode: Literal["squadron", "player"]
|
||
clan_id: Optional[int]
|
||
uid: Optional[str]
|
||
season: str
|
||
season_start: int
|
||
season_end: int
|
||
week_boundaries: List[int]
|
||
out: Path
|
||
theme: str
|
||
lang: str
|
||
|
||
|
||
def parse_args(argv: Optional[List[str]] = None) -> Args:
|
||
p = argparse.ArgumentParser(description=__doc__)
|
||
p.add_argument("--mode", choices=["squadron", "player"], default="squadron")
|
||
p.add_argument("--clan-id", type=int, default=None)
|
||
p.add_argument("--uid", type=str, default=None)
|
||
p.add_argument("--season", required=True)
|
||
p.add_argument("--season-start", type=int, required=True)
|
||
p.add_argument("--season-end", type=int, required=True)
|
||
p.add_argument("--week-boundaries", default="")
|
||
p.add_argument("--out", type=Path, required=True)
|
||
p.add_argument("--theme", choices=list(THEMES.keys()), default="light")
|
||
p.add_argument("--lang", default=DEFAULT_LANG)
|
||
ns = p.parse_args(argv)
|
||
if ns.mode == "squadron" and ns.clan_id is None:
|
||
p.error("--mode squadron requires --clan-id")
|
||
if ns.mode == "player":
|
||
if not ns.uid:
|
||
p.error("--mode player requires --uid")
|
||
if not ns.uid.isdigit():
|
||
p.error("--uid must be numeric")
|
||
boundaries = [int(x) for x in ns.week_boundaries.split(",") if x.strip()]
|
||
return Args(
|
||
mode=ns.mode,
|
||
clan_id=ns.clan_id,
|
||
uid=ns.uid,
|
||
season=ns.season,
|
||
season_start=ns.season_start,
|
||
season_end=ns.season_end,
|
||
week_boundaries=boundaries,
|
||
out=ns.out,
|
||
theme=ns.theme,
|
||
lang=ns.lang,
|
||
)
|
||
|
||
|
||
def _open_ro(path: Path) -> sqlite3.Connection:
|
||
uri = f"file:{path}?mode=ro"
|
||
return sqlite3.connect(uri, uri=True, timeout=10.0)
|
||
|
||
|
||
@dataclass
|
||
class SquadronIdent:
|
||
clan_id: int
|
||
short_name: str
|
||
long_name: str
|
||
|
||
|
||
def resolve_squadron(conn_sq: sqlite3.Connection, clan_id: int) -> Optional[SquadronIdent]:
|
||
cur = conn_sq.execute(
|
||
"SELECT short_name, long_name FROM squadrons_data WHERE clan_id = ?",
|
||
(clan_id,),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
return None
|
||
return SquadronIdent(clan_id=clan_id, short_name=row[0] or "", long_name=row[1] or "")
|
||
|
||
|
||
@dataclass
|
||
class RatingDerived:
|
||
series: List[Tuple[int, int]] # (unix_time, total_score) ordered
|
||
final: Optional[int]
|
||
peak: Optional[int]
|
||
peak_ts: Optional[int]
|
||
first: Optional[int]
|
||
change: Optional[int]
|
||
|
||
|
||
def gather_squadron_rating(conn_sq: sqlite3.Connection, clan_id: int,
|
||
start: int, end: int) -> RatingDerived:
|
||
cur = conn_sq.execute(
|
||
"SELECT unix_time, total_score FROM squadrons_points "
|
||
"WHERE clan_id = ? AND unix_time BETWEEN ? AND ? "
|
||
"ORDER BY unix_time",
|
||
(clan_id, start, end),
|
||
)
|
||
series: List[Tuple[int, int]] = [(row[0], row[1]) for row in cur.fetchall()]
|
||
if not series:
|
||
return RatingDerived(series=[], final=None, peak=None, peak_ts=None, first=None, change=None)
|
||
first = series[0][1]
|
||
final = series[-1][1]
|
||
peak_ts, peak = max(series, key=lambda t: t[1])
|
||
return RatingDerived(series=series, final=final, peak=peak, peak_ts=peak_ts,
|
||
first=first, change=final - first)
|
||
|
||
|
||
@dataclass
|
||
class MatchRow:
|
||
session_id: str
|
||
endtime_unix: int
|
||
winning_sq: str
|
||
losing_sq: str
|
||
|
||
|
||
@dataclass
|
||
class MatchDerived:
|
||
stream: List[MatchRow]
|
||
total: int
|
||
wins: int
|
||
losses: int
|
||
wr_pct: Optional[float]
|
||
longest_win_streak: int
|
||
top_opponent: Optional[Tuple[str, int, int]] # (short, matches_vs, wins_vs)
|
||
|
||
|
||
def gather_squadron_match_stream(conn_b: sqlite3.Connection, short: str,
|
||
start: int, end: int) -> List[MatchRow]:
|
||
cur = conn_b.execute(
|
||
"SELECT session_id, endtime_unix, winning_sq, losing_sq "
|
||
"FROM match_summary "
|
||
"WHERE (winning_sq = ? OR losing_sq = ?) "
|
||
" AND endtime_unix BETWEEN ? AND ? "
|
||
"ORDER BY endtime_unix",
|
||
(short, short, start, end),
|
||
)
|
||
return [MatchRow(session_id=r[0], endtime_unix=r[1], winning_sq=r[2] or "",
|
||
losing_sq=r[3] or "") for r in cur.fetchall()]
|
||
|
||
|
||
def gather_squadron_per_match_stream(conn_b: sqlite3.Connection, short: str,
|
||
start: int, end: int
|
||
) -> "List[PlayerGameRow]":
|
||
"""One synthetic PlayerGameRow per squadron match (kills/deaths summed
|
||
across all squadron members in that match, win/loss from match_summary).
|
||
Shaped as PlayerGameRow so the shared rolling helpers work unchanged."""
|
||
cur = conn_b.execute(
|
||
"SELECT pgh.session_id, MAX(pgh.endtime_unix) AS ts, "
|
||
" ms.winning_sq, "
|
||
" COALESCE(SUM(pgh.ground_kills), 0) AS g_kills, "
|
||
" COALESCE(SUM(pgh.air_kills), 0) AS a_kills, "
|
||
" COALESCE(SUM(pgh.deaths), 0) AS deaths "
|
||
"FROM player_games_hist pgh "
|
||
"JOIN match_summary ms ON ms.session_id = pgh.session_id "
|
||
"WHERE pgh.squadron_name = ? AND pgh.endtime_unix BETWEEN ? AND ? "
|
||
"GROUP BY pgh.session_id "
|
||
"ORDER BY ts",
|
||
(short, start, end),
|
||
)
|
||
rows: List[PlayerGameRow] = []
|
||
for sid, ts, win_sq, g, a, d in cur.fetchall():
|
||
rows.append(PlayerGameRow(
|
||
session_id=sid, endtime_unix=ts, squadron_name=short,
|
||
vehicle="",
|
||
ground_kills=g or 0, air_kills=a or 0,
|
||
assists=0, captures=0, deaths=d or 0,
|
||
victor_bool="WIN" if (win_sq or "") == short else "LOSS",
|
||
))
|
||
return rows
|
||
|
||
|
||
def compute_squadron_match_derived(stream: List[MatchRow], short: str) -> MatchDerived:
|
||
total = len(stream)
|
||
wins = sum(1 for m in stream if m.winning_sq == short)
|
||
losses = total - wins
|
||
wr_pct = (wins / total * 100.0) if total else None
|
||
|
||
# Longest win streak
|
||
longest = cur_streak = 0
|
||
for m in stream:
|
||
if m.winning_sq == short:
|
||
cur_streak += 1
|
||
if cur_streak > longest:
|
||
longest = cur_streak
|
||
else:
|
||
cur_streak = 0
|
||
|
||
# Top opponent (by match count), with wins-against count
|
||
opp_total: dict[str, int] = defaultdict(int)
|
||
opp_wins: dict[str, int] = defaultdict(int)
|
||
for m in stream:
|
||
other = m.losing_sq if m.winning_sq == short else m.winning_sq
|
||
if not other:
|
||
continue
|
||
opp_total[other] += 1
|
||
if m.winning_sq == short:
|
||
opp_wins[other] += 1
|
||
if opp_total:
|
||
top_name = max(opp_total, key=lambda k: opp_total[k])
|
||
top_opponent: Optional[Tuple[str, int, int]] = (top_name, opp_total[top_name], opp_wins[top_name])
|
||
else:
|
||
top_opponent = None
|
||
|
||
return MatchDerived(stream=stream, total=total, wins=wins, losses=losses,
|
||
wr_pct=wr_pct, longest_win_streak=longest, top_opponent=top_opponent)
|
||
|
||
|
||
@dataclass
|
||
class TimelineRange:
|
||
squadron_name: str # "" = no squadron
|
||
first_ts: int
|
||
last_ts: int
|
||
match_count: int
|
||
|
||
|
||
def gather_player_squadron_timeline(conn_b: sqlite3.Connection, uid: str,
|
||
start: int, end: int) -> List[TimelineRange]:
|
||
"""Walk player games in time order; group consecutive same-squadron rows
|
||
into ranges. Empty string squadron_name is a valid range ("no squadron")."""
|
||
cur = conn_b.execute(
|
||
"SELECT squadron_name, endtime_unix FROM player_games_hist "
|
||
"WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
|
||
"ORDER BY endtime_unix",
|
||
(uid, start, end),
|
||
)
|
||
rows = cur.fetchall()
|
||
if not rows:
|
||
return []
|
||
ranges: List[TimelineRange] = []
|
||
cur_name = rows[0][0] or ""
|
||
cur_first = cur_last = rows[0][1]
|
||
cur_count = 1
|
||
for name, ts in rows[1:]:
|
||
name = name or ""
|
||
if name == cur_name:
|
||
cur_last = ts
|
||
cur_count += 1
|
||
else:
|
||
ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))
|
||
cur_name, cur_first, cur_last, cur_count = name, ts, ts, 1
|
||
ranges.append(TimelineRange(cur_name, cur_first, cur_last, cur_count))
|
||
|
||
# Drop ranges shorter than one day (brief squadron stints aren't worth
|
||
# surfacing in the chain, legend, or shading), then merge any adjacent
|
||
# same-squadron ranges that got left side-by-side after the drop.
|
||
ranges = [r for r in ranges if (r.last_ts - r.first_ts) >= 86400]
|
||
merged: List[TimelineRange] = []
|
||
for r in ranges:
|
||
if merged and merged[-1].squadron_name == r.squadron_name:
|
||
prev = merged[-1]
|
||
merged[-1] = TimelineRange(
|
||
squadron_name=prev.squadron_name,
|
||
first_ts=prev.first_ts,
|
||
last_ts=r.last_ts,
|
||
match_count=prev.match_count + r.match_count,
|
||
)
|
||
else:
|
||
merged.append(r)
|
||
ranges = merged
|
||
|
||
if not ranges:
|
||
return []
|
||
|
||
# The final range covers "still in this squadron" — extend its last_ts
|
||
# through the season window so rating snapshots after the player's final
|
||
# match (decay, continued membership) still appear on the chart.
|
||
last = ranges[-1]
|
||
if last.last_ts < end:
|
||
ranges[-1] = TimelineRange(
|
||
squadron_name=last.squadron_name,
|
||
first_ts=last.first_ts,
|
||
last_ts=end,
|
||
match_count=last.match_count,
|
||
)
|
||
return ranges
|
||
|
||
|
||
@dataclass
|
||
class PlayerGameRow:
|
||
session_id: str
|
||
endtime_unix: int
|
||
squadron_name: str
|
||
vehicle: str
|
||
ground_kills: int
|
||
air_kills: int
|
||
assists: int
|
||
captures: int
|
||
deaths: int
|
||
victor_bool: str # "WIN" | "LOSS" (case varies in source data)
|
||
|
||
|
||
def gather_player_game_stream(conn_b: sqlite3.Connection, uid: str,
|
||
start: int, end: int) -> List[PlayerGameRow]:
|
||
cur = conn_b.execute(
|
||
"SELECT session_id, endtime_unix, squadron_name, vehicle, "
|
||
" ground_kills, air_kills, assists, captures, deaths, victor_bool "
|
||
"FROM player_games_hist "
|
||
"WHERE UID = ? AND endtime_unix BETWEEN ? AND ? "
|
||
"ORDER BY endtime_unix",
|
||
(uid, start, end),
|
||
)
|
||
return [
|
||
PlayerGameRow(
|
||
session_id=r[0], endtime_unix=r[1], squadron_name=r[2] or "",
|
||
vehicle=apply_vehicle_name_filters(r[3] or ""),
|
||
ground_kills=r[4] or 0, air_kills=r[5] or 0,
|
||
assists=r[6] or 0, captures=r[7] or 0, deaths=r[8] or 0,
|
||
victor_bool=r[9] or "",
|
||
)
|
||
for r in cur.fetchall()
|
||
]
|
||
|
||
|
||
@dataclass
|
||
class RatingSegment:
|
||
squadron_name: str # "" if the range had no squadron
|
||
points: List[Tuple[int, int]] # (unix_time, player_points); empty if unresolvable
|
||
|
||
|
||
def _decompress_clan_pts(blob) -> Optional[dict]:
|
||
"""clan_pts is a gzipped JSON of [members_dict, total_score]. Return
|
||
members_dict ({uid_str: {"points": N, ...}, ...}) or None on failure."""
|
||
try:
|
||
if isinstance(blob, (bytes, memoryview)):
|
||
data = json.loads(gzip.decompress(bytes(blob)))
|
||
else:
|
||
data = json.loads(blob)
|
||
except (OSError, ValueError, TypeError):
|
||
return None
|
||
if isinstance(data, list) and data and isinstance(data[0], dict):
|
||
return data[0]
|
||
if isinstance(data, dict):
|
||
return data
|
||
return None
|
||
|
||
|
||
def _resolve_squadron_long_name(conn_sq: sqlite3.Connection,
|
||
short_name: str) -> Optional[str]:
|
||
if not short_name:
|
||
return None
|
||
cur = conn_sq.execute(
|
||
"SELECT long_name FROM squadrons_data WHERE short_name = ? LIMIT 1",
|
||
(short_name,),
|
||
)
|
||
row = cur.fetchone()
|
||
return row[0] if row and row[0] else None
|
||
|
||
|
||
def gather_player_rating_trail(conn_sq: sqlite3.Connection,
|
||
timeline: List[TimelineRange],
|
||
uid: str) -> List[RatingSegment]:
|
||
"""For each timeline range, read the player's personal SQB points (from
|
||
each squadrons_points snapshot's gzipped member roster) during that range.
|
||
Returns per-range (unix_time, player_points) lists."""
|
||
segments: List[RatingSegment] = []
|
||
long_cache: dict[str, Optional[str]] = {}
|
||
for rng in timeline:
|
||
if not rng.squadron_name:
|
||
segments.append(RatingSegment("", []))
|
||
continue
|
||
if rng.squadron_name not in long_cache:
|
||
long_cache[rng.squadron_name] = _resolve_squadron_long_name(
|
||
conn_sq, rng.squadron_name
|
||
)
|
||
long_name = long_cache[rng.squadron_name]
|
||
if not long_name:
|
||
segments.append(RatingSegment(rng.squadron_name, []))
|
||
continue
|
||
cur = conn_sq.execute(
|
||
"SELECT unix_time, clan_pts FROM squadrons_points "
|
||
"WHERE long_name = ? AND unix_time BETWEEN ? AND ? "
|
||
"ORDER BY unix_time",
|
||
(long_name, rng.first_ts, rng.last_ts),
|
||
)
|
||
points: List[Tuple[int, int]] = []
|
||
for ts, blob in cur.fetchall():
|
||
members = _decompress_clan_pts(blob)
|
||
if not members:
|
||
continue
|
||
entry = members.get(uid) or members.get(str(uid))
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
pts = entry.get("points")
|
||
if isinstance(pts, (int, float)):
|
||
points.append((ts, int(pts)))
|
||
segments.append(RatingSegment(rng.squadron_name, points))
|
||
return segments
|
||
|
||
|
||
def gather_player_most_common_opponent(conn_b: sqlite3.Connection,
|
||
game_stream: List[PlayerGameRow]
|
||
) -> Optional[Tuple[str, int]]:
|
||
"""For each session the player was in, look up winning/losing squadron in
|
||
match_summary; opponent is whichever side isn't the player's squadron in
|
||
that game. Returns (opp_short_name, matches_vs) or None."""
|
||
if not game_stream:
|
||
return None
|
||
ids = list({g.session_id for g in game_stream})
|
||
placeholders = ",".join("?" * len(ids))
|
||
cur = conn_b.execute(
|
||
f"SELECT session_id, winning_sq, losing_sq "
|
||
f"FROM match_summary WHERE session_id IN ({placeholders})",
|
||
ids,
|
||
)
|
||
by_session: dict[str, Tuple[str, str]] = {
|
||
r[0]: (r[1] or "", r[2] or "") for r in cur.fetchall()
|
||
}
|
||
opp_counts: dict[str, int] = defaultdict(int)
|
||
for g in game_stream:
|
||
pair = by_session.get(g.session_id)
|
||
if not pair:
|
||
continue
|
||
w, l = pair
|
||
own = g.squadron_name
|
||
opp = l if w == own else (w if l == own else "")
|
||
if opp:
|
||
opp_counts[opp] += 1
|
||
if not opp_counts:
|
||
return None
|
||
name = max(opp_counts, key=lambda k: opp_counts[k])
|
||
return name, opp_counts[name]
|
||
|
||
|
||
def gather_player_frequent_teammate(conn_b: sqlite3.Connection, uid: str,
|
||
start: int, end: int
|
||
) -> Optional[Tuple[str, int]]:
|
||
"""Self-join on session_id — counts UIDs (not the player) that shared the
|
||
most sessions with the player while on the same squadron in that session."""
|
||
cur = conn_b.execute(
|
||
"SELECT p.UID, COUNT(DISTINCT p.session_id) AS shared "
|
||
"FROM player_games_hist p "
|
||
"JOIN player_games_hist me "
|
||
" ON me.session_id = p.session_id "
|
||
" AND me.squadron_name = p.squadron_name "
|
||
"WHERE me.UID = ? "
|
||
" AND p.UID != ? "
|
||
" AND me.endtime_unix BETWEEN ? AND ? "
|
||
"GROUP BY p.UID "
|
||
"ORDER BY shared DESC "
|
||
"LIMIT 1",
|
||
(uid, uid, start, end),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row or not row[1]:
|
||
return None
|
||
teammate_uid, shared = row[0], int(row[1])
|
||
# MIN(nick) would pick byte-sorted smallest — e.g. a 'coop/…' disconnect
|
||
# row beats the real nick. Pick the most frequent non-coop nick instead,
|
||
# tiebreaking by recency so renames surface the latest identity.
|
||
nick_cur = conn_b.execute(
|
||
"SELECT nick FROM player_games_hist "
|
||
"WHERE UID = ? AND nick NOT LIKE 'coop/%' "
|
||
"GROUP BY nick "
|
||
"ORDER BY COUNT(*) DESC, MAX(endtime_unix) DESC "
|
||
"LIMIT 1",
|
||
(teammate_uid,),
|
||
)
|
||
nick_row = nick_cur.fetchone()
|
||
return (nick_row[0] if nick_row and nick_row[0] else "unknown"), shared
|
||
|
||
|
||
@dataclass
|
||
class BestMatchRow:
|
||
session_id: str
|
||
endtime_unix: int
|
||
vehicle: str
|
||
ground_kills: int
|
||
air_kills: int
|
||
assists: int
|
||
captures: int
|
||
deaths: int
|
||
|
||
|
||
@dataclass
|
||
class PlayerDerived:
|
||
total: int
|
||
wins: int
|
||
losses: int
|
||
wr_pct: Optional[float]
|
||
ground_kills: int
|
||
air_kills: int
|
||
total_kills: int
|
||
total_deaths: int
|
||
assists: int
|
||
captures: int
|
||
kd: Optional[float]
|
||
longest_win_streak: int
|
||
top_vehicle: Optional[Tuple[str, int]]
|
||
best_match: Optional[BestMatchRow]
|
||
peak_rating: Optional[Tuple[int, str, int]] # (rating, squadron_name, unix_time)
|
||
|
||
|
||
def compute_player_derived(stream: List[PlayerGameRow],
|
||
timeline: List[TimelineRange],
|
||
trail: List[RatingSegment]) -> PlayerDerived:
|
||
del timeline # reserved for future squadron-weighted metrics; currently unused
|
||
total = len(stream)
|
||
wins = sum(1 for g in stream if g.victor_bool.upper() == "WIN")
|
||
losses = total - wins
|
||
wr = (wins / total * 100.0) if total else None
|
||
|
||
gk = sum(g.ground_kills for g in stream)
|
||
ak = sum(g.air_kills for g in stream)
|
||
dk = sum(g.deaths for g in stream)
|
||
asts = sum(g.assists for g in stream)
|
||
caps = sum(g.captures for g in stream)
|
||
kd = ((gk + ak) / dk) if dk else None
|
||
|
||
longest = cur = 0
|
||
for g in stream:
|
||
if g.victor_bool.upper() == "WIN":
|
||
cur += 1
|
||
longest = max(longest, cur)
|
||
else:
|
||
cur = 0
|
||
|
||
veh_counts: dict[str, int] = defaultdict(int)
|
||
for g in stream:
|
||
if g.vehicle:
|
||
veh_counts[g.vehicle] += 1
|
||
if veh_counts:
|
||
top_name = max(veh_counts, key=lambda k: veh_counts[k])
|
||
top_vehicle: Optional[Tuple[str, int]] = (top_name, veh_counts[top_name])
|
||
else:
|
||
top_vehicle = None
|
||
|
||
best: Optional[BestMatchRow] = None
|
||
best_score = -1
|
||
for g in stream:
|
||
score = g.ground_kills + g.air_kills + g.assists + g.captures
|
||
if score > best_score:
|
||
best_score = score
|
||
best = BestMatchRow(
|
||
session_id=g.session_id, endtime_unix=g.endtime_unix,
|
||
vehicle=g.vehicle, ground_kills=g.ground_kills,
|
||
air_kills=g.air_kills, assists=g.assists, captures=g.captures,
|
||
deaths=g.deaths,
|
||
)
|
||
elif score == best_score and best is not None and g.deaths < best.deaths:
|
||
best = BestMatchRow(
|
||
session_id=g.session_id, endtime_unix=g.endtime_unix,
|
||
vehicle=g.vehicle, ground_kills=g.ground_kills,
|
||
air_kills=g.air_kills, assists=g.assists, captures=g.captures,
|
||
deaths=g.deaths,
|
||
)
|
||
|
||
peak: Optional[Tuple[int, str, int]] = None
|
||
for seg in trail:
|
||
for ts, rating in seg.points:
|
||
if peak is None or rating > peak[0]:
|
||
peak = (rating, seg.squadron_name, ts)
|
||
|
||
return PlayerDerived(
|
||
total=total, wins=wins, losses=losses, wr_pct=wr,
|
||
ground_kills=gk, air_kills=ak, total_kills=gk + ak,
|
||
total_deaths=dk, assists=asts, captures=caps, kd=kd,
|
||
longest_win_streak=longest, top_vehicle=top_vehicle,
|
||
best_match=best, peak_rating=peak,
|
||
)
|
||
|
||
|
||
@dataclass
|
||
class PlayerAggregates:
|
||
ground_kills: int
|
||
air_kills: int
|
||
deaths: int
|
||
assists: int
|
||
captures: int
|
||
total_kills: int
|
||
kd: Optional[float]
|
||
top_vehicle: Optional[Tuple[str, int]] # (vehicle_raw_name, games)
|
||
mvp: Optional[Tuple[str, int, int, int]] # (nick, kills, assists, captures)
|
||
most_active: Optional[Tuple[str, int]] # (nick, matches_played)
|
||
|
||
|
||
def gather_squadron_player_aggregates(conn_b: sqlite3.Connection, squadron_name: str,
|
||
start: int, end: int) -> PlayerAggregates:
|
||
# Totals
|
||
cur = conn_b.execute(
|
||
"SELECT COALESCE(SUM(ground_kills), 0), COALESCE(SUM(air_kills), 0), "
|
||
" COALESCE(SUM(deaths), 0), COALESCE(SUM(assists), 0), "
|
||
" COALESCE(SUM(captures), 0) "
|
||
"FROM player_games_hist "
|
||
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ?",
|
||
(squadron_name, start, end),
|
||
)
|
||
g, a, d, asst, cap = cur.fetchone()
|
||
|
||
# Top vehicle
|
||
cur = conn_b.execute(
|
||
"SELECT vehicle, COUNT(*) AS c FROM player_games_hist "
|
||
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
|
||
" AND vehicle IS NOT NULL AND vehicle != '' "
|
||
"GROUP BY vehicle ORDER BY c DESC LIMIT 1",
|
||
(squadron_name, start, end),
|
||
)
|
||
row = cur.fetchone()
|
||
top_vehicle: Optional[Tuple[str, int]] = (
|
||
(apply_vehicle_name_filters(row[0]), row[1]) if row else None
|
||
)
|
||
|
||
# MVP — ranked by K + A + C (no weighting); report the breakdown
|
||
cur = conn_b.execute(
|
||
"SELECT MIN(nick) AS nick, "
|
||
" SUM(ground_kills + air_kills) AS kills, "
|
||
" SUM(assists) AS assists, "
|
||
" SUM(captures) AS captures "
|
||
"FROM player_games_hist "
|
||
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
|
||
"GROUP BY UID "
|
||
"ORDER BY SUM(ground_kills + air_kills + assists + captures) DESC "
|
||
"LIMIT 1",
|
||
(squadron_name, start, end),
|
||
)
|
||
row = cur.fetchone()
|
||
mvp: Optional[Tuple[str, int, int, int]] = (
|
||
(row[0], row[1] or 0, row[2] or 0, row[3] or 0)
|
||
if row and (row[1] or row[2] or row[3]) else None
|
||
)
|
||
|
||
# Most active — matches played per UID
|
||
cur = conn_b.execute(
|
||
"SELECT MIN(nick) AS nick, COUNT(DISTINCT session_id) AS matches "
|
||
"FROM player_games_hist "
|
||
"WHERE squadron_name = ? AND endtime_unix BETWEEN ? AND ? "
|
||
"GROUP BY UID ORDER BY matches DESC LIMIT 1",
|
||
(squadron_name, start, end),
|
||
)
|
||
row = cur.fetchone()
|
||
most_active: Optional[Tuple[str, int]] = (row[0], row[1]) if row and row[1] else None
|
||
|
||
total_kills = g + a
|
||
kd = (total_kills / d) if d else None
|
||
return PlayerAggregates(ground_kills=g, air_kills=a, deaths=d, assists=asst,
|
||
captures=cap, total_kills=total_kills, kd=kd,
|
||
top_vehicle=top_vehicle, mvp=mvp, most_active=most_active)
|
||
|
||
|
||
def compute_squadron_wr_curve(stream: List[MatchRow], short: str, season_start: int,
|
||
season_end: int) -> Tuple[List[Tuple[int, float]], int]:
|
||
"""
|
||
Returns (points, window_size). Points: list of (unix_time, wr_pct) anchored
|
||
at each match's endtime, computed as rolling WR over the previous `window_size`
|
||
matches (inclusive of current). x-axis domain callers want = [season_start, season_end].
|
||
"""
|
||
total = len(stream)
|
||
if total == 0:
|
||
return [], 0
|
||
window = max(20, total // 20)
|
||
if total < window:
|
||
# Cumulative WR: point per match
|
||
running_wins = 0
|
||
points: List[Tuple[int, float]] = []
|
||
for i, m in enumerate(stream):
|
||
if m.winning_sq == short:
|
||
running_wins += 1
|
||
wr = running_wins / (i + 1) * 100.0
|
||
points.append((m.endtime_unix, wr))
|
||
return points, window
|
||
# Full rolling
|
||
wins_flags = [1 if m.winning_sq == short else 0 for m in stream]
|
||
points = []
|
||
window_sum = sum(wins_flags[:window])
|
||
# First plottable index = window - 1
|
||
points.append((stream[window - 1].endtime_unix, window_sum / window * 100.0))
|
||
for i in range(window, total):
|
||
window_sum += wins_flags[i] - wins_flags[i - window]
|
||
points.append((stream[i].endtime_unix, window_sum / window * 100.0))
|
||
return points, window
|
||
|
||
|
||
def compute_rolling_wr(stream: List[PlayerGameRow],
|
||
window_seconds: int = 7 * 86400
|
||
) -> List[Tuple[int, float]]:
|
||
"""Trailing time-window win rate, one point per match. Stale games drop
|
||
out of the window naturally during play gaps so the curve doesn't mix
|
||
stats across weeks."""
|
||
total = len(stream)
|
||
if total == 0:
|
||
return []
|
||
wins = [1 if g.victor_bool.upper() == "WIN" else 0 for g in stream]
|
||
out: List[Tuple[int, float]] = []
|
||
left = 0
|
||
wins_in_window = 0
|
||
for right in range(total):
|
||
t_r = stream[right].endtime_unix
|
||
wins_in_window += wins[right]
|
||
while stream[left].endtime_unix < t_r - window_seconds:
|
||
wins_in_window -= wins[left]
|
||
left += 1
|
||
n = right - left + 1
|
||
out.append((t_r, wins_in_window / n * 100.0))
|
||
return out
|
||
|
||
|
||
def compute_rolling_kd(stream: List[PlayerGameRow],
|
||
window_seconds: int = 7 * 86400
|
||
) -> List[Tuple[int, float]]:
|
||
"""Trailing time-window K/D, one point per match. deaths=0 is bounded
|
||
via max(deaths, 1) to keep values finite during kill-heavy early games."""
|
||
total = len(stream)
|
||
if total == 0:
|
||
return []
|
||
kills = [g.ground_kills + g.air_kills for g in stream]
|
||
deaths = [g.deaths for g in stream]
|
||
out: List[Tuple[int, float]] = []
|
||
left = 0
|
||
k_sum = d_sum = 0
|
||
for right in range(total):
|
||
t_r = stream[right].endtime_unix
|
||
k_sum += kills[right]
|
||
d_sum += deaths[right]
|
||
while stream[left].endtime_unix < t_r - window_seconds:
|
||
k_sum -= kills[left]
|
||
d_sum -= deaths[left]
|
||
left += 1
|
||
out.append((t_r, k_sum / max(d_sum, 1)))
|
||
return out
|
||
|
||
|
||
def compute_rolling_battles(stream: List[PlayerGameRow],
|
||
window_seconds: int = 7 * 86400
|
||
) -> List[Tuple[int, int]]:
|
||
"""Time-based rolling match count: at each game timestamp t, count of games
|
||
with endtime_unix in [t - window, t]."""
|
||
total = len(stream)
|
||
if total == 0:
|
||
return []
|
||
out: List[Tuple[int, int]] = []
|
||
left = 0
|
||
for right in range(total):
|
||
t_r = stream[right].endtime_unix
|
||
while stream[left].endtime_unix <= t_r - window_seconds:
|
||
left += 1
|
||
out.append((t_r, right - left + 1))
|
||
return out
|
||
|
||
|
||
# SQB timeslots (UTC) — source of truth is BOT/utils.py SQB_SLOTS_POSTED; duplicated
|
||
# here because BOT/utils.py pulls in Discord bot dependencies, too heavy for this CLI.
|
||
_SQB_TIMESLOTS_UTC: List[Tuple[str, int, int]] = [
|
||
("NA", 1 * 3600, 7 * 3600), # 01:00 – 07:00
|
||
("EU", 14 * 3600, 22 * 3600), # 14:00 – 22:00
|
||
]
|
||
|
||
|
||
def _classify_timeslot(ts: int) -> Optional[Tuple[str, _date]]:
|
||
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
|
||
sod = int(dt.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
|
||
offset = ts - sod
|
||
for region, lo, hi in _SQB_TIMESLOTS_UTC:
|
||
if lo <= offset <= hi:
|
||
return region, dt.date()
|
||
return None
|
||
|
||
|
||
def compute_longest_timeslot_session(stream: List[PlayerGameRow]
|
||
) -> Optional[Tuple[str, _date, int]]:
|
||
"""Bucket games into (date_UTC, region) slots based on endtime_unix; return
|
||
(region, date, match_count) for the bucket with the most games. Matches
|
||
outside any posted SQB timeslot are excluded."""
|
||
buckets: dict[Tuple[str, _date], int] = defaultdict(int)
|
||
for g in stream:
|
||
slot = _classify_timeslot(g.endtime_unix)
|
||
if slot is None:
|
||
continue
|
||
buckets[slot] += 1
|
||
if not buckets:
|
||
return None
|
||
(region, d), n = max(buckets.items(), key=lambda kv: kv[1])
|
||
return region, d, n
|
||
|
||
|
||
def compute_most_active_day_utc(stream: List[PlayerGameRow]
|
||
) -> Optional[Tuple[_date, int]]:
|
||
"""Return the UTC date with the most matches (ignores timeslot boundaries)."""
|
||
by_day: dict[_date, int] = defaultdict(int)
|
||
for g in stream:
|
||
d = datetime.fromtimestamp(g.endtime_unix, tz=timezone.utc).date()
|
||
by_day[d] += 1
|
||
if not by_day:
|
||
return None
|
||
d, n = max(by_day.items(), key=lambda kv: kv[1])
|
||
return d, n
|
||
|
||
|
||
def _fmt_int(n: Optional[int]) -> str:
|
||
return f"{n:,}" if n is not None else "—"
|
||
|
||
|
||
def _fmt_float(n: Optional[float], digits: int = 2) -> str:
|
||
return f"{n:.{digits}f}" if n is not None else "—"
|
||
|
||
|
||
def _fmt_date(ts: Optional[int]) -> str:
|
||
if ts is None:
|
||
return "—"
|
||
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
|
||
|
||
|
||
def render_squadron_card(out_path: Path, ident: SquadronIdent, season: str,
|
||
rating: RatingDerived, match: MatchDerived,
|
||
players: PlayerAggregates,
|
||
rolling_wr: List[Tuple[int, float]],
|
||
rolling_kd: List[Tuple[int, float]],
|
||
rolling_battles: List[Tuple[int, int]],
|
||
season_start: int, season_end: int,
|
||
week_boundaries: List[int],
|
||
theme: str = "light",
|
||
lang: str = DEFAULT_LANG) -> None:
|
||
pal = theme_palette(theme)
|
||
tr = load_translations(lang)
|
||
def t(key: str) -> str:
|
||
return tr.get(key, key)
|
||
|
||
fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
|
||
gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.35], hspace=0.35,
|
||
left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)
|
||
|
||
# ── Header / hero row ────────────────────────────────────────────
|
||
ax_head = fig.add_subplot(gs[0])
|
||
ax_head.set_axis_off()
|
||
ax_head.text(0.0, 0.75, f"{ident.short_name} · {ident.long_name}",
|
||
fontsize=22, fontweight="bold", color=pal["text"],
|
||
transform=ax_head.transAxes)
|
||
ax_head.text(1.0, 0.75, f"{season} {t('imgRecapSuffix')}",
|
||
fontsize=18, fontweight="bold", color=pal["header_sub"],
|
||
ha="right", transform=ax_head.transAxes)
|
||
hero = [
|
||
(t("imgHeroFinalRating"), _fmt_int(rating.final)),
|
||
(t("imgHeroMatches"), _fmt_int(match.total)),
|
||
(t("imgHeroWinRate"), (f"{match.wr_pct:.1f}%" if match.wr_pct is not None else "—")),
|
||
(t("imgHeroKD"), _fmt_float(players.kd)),
|
||
]
|
||
for i, (label, value) in enumerate(hero):
|
||
x = 0.04 + i * 0.24
|
||
ax_head.text(x, 0.15, value, fontsize=26, fontweight="bold",
|
||
color=pal["text"], va="center",
|
||
transform=ax_head.transAxes)
|
||
ax_head.text(x, -0.10, label, fontsize=11, color=pal["muted"],
|
||
va="center", transform=ax_head.transAxes)
|
||
|
||
for y in (0.55, -0.32):
|
||
ax_head.plot([0.01, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_head.transAxes, clip_on=False)
|
||
|
||
# ── Main graph (rating + WR dual axis) ───────────────────────────
|
||
ax = fig.add_subplot(gs[1])
|
||
ax.set_facecolor(pal["bg"])
|
||
ax.set_xlim(season_start, season_end)
|
||
|
||
for b in week_boundaries:
|
||
if season_start <= b <= season_end:
|
||
ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1)
|
||
|
||
if rating.series:
|
||
smoothed = smooth_series(rating.series, window=12)
|
||
xs = [ts for ts, _ in smoothed]
|
||
ys = [v for _, v in smoothed]
|
||
ax.plot(xs, ys, color=RATING_COLOR, linewidth=2.2, zorder=3)
|
||
ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
|
||
ax.tick_params(axis="y", colors=RATING_COLOR)
|
||
ax.spines["left"].set_color(RATING_COLOR)
|
||
ax.spines["top"].set_color(pal["grid"])
|
||
ax.spines["bottom"].set_color(pal["grid"])
|
||
ax.tick_params(axis="x", colors=pal["muted"])
|
||
|
||
def _prep(series: list) -> list:
|
||
return [_smooth_rolling_curve(_downsample_daily(part), window=4)
|
||
for part in _split_on_gaps(series, 7 * 86400)]
|
||
|
||
ax_wr = ax.twinx()
|
||
for s in _prep(rolling_wr):
|
||
ax_wr.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=WR_COLOR, linewidth=1.3, alpha=0.8, zorder=2)
|
||
ax_wr.set_ylim(0, 100)
|
||
ax_wr.set_ylabel(t("imgAxisWinRate"), color=WR_COLOR, fontsize=12)
|
||
ax_wr.tick_params(axis="y", colors=WR_COLOR)
|
||
ax_wr.spines["right"].set_color(WR_COLOR)
|
||
ax_wr.spines["left"].set_visible(False)
|
||
|
||
ax_kd = ax.twinx()
|
||
ax_kd.spines["right"].set_position(("axes", 1.08))
|
||
for s in _prep(rolling_kd):
|
||
ax_kd.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=KD_COLOR, linewidth=1.1, alpha=0.7, zorder=2)
|
||
ax_kd.set_ylabel(t("imgAxisKD"), color=KD_COLOR, fontsize=12, labelpad=10)
|
||
ax_kd.tick_params(axis="y", colors=KD_COLOR)
|
||
ax_kd.spines["right"].set_color(KD_COLOR)
|
||
ax_kd.spines["left"].set_visible(False)
|
||
|
||
ax_b = ax.twinx()
|
||
ax_b.spines["left"].set_position(("axes", -0.08))
|
||
ax_b.spines["left"].set_visible(True)
|
||
ax_b.spines["right"].set_visible(False)
|
||
ax_b.yaxis.set_label_position("left")
|
||
ax_b.yaxis.tick_left()
|
||
for s in _prep(rolling_battles):
|
||
ax_b.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=BATTLES_COLOR, linewidth=1.0, alpha=0.6, zorder=2)
|
||
ax_b.set_ylabel(t("imgAxisBattles"), color=BATTLES_COLOR, fontsize=12, labelpad=10)
|
||
ax_b.tick_params(axis="y", colors=BATTLES_COLOR)
|
||
ax_b.spines["left"].set_color(BATTLES_COLOR)
|
||
|
||
boundary_inside = [b for b in week_boundaries if season_start <= b <= season_end]
|
||
if boundary_inside:
|
||
ax.set_xticks(boundary_inside)
|
||
ax.set_xticklabels(
|
||
[datetime.fromtimestamp(b, tz=timezone.utc).strftime("%m-%d")
|
||
for b in boundary_inside],
|
||
rotation=0, fontsize=9
|
||
)
|
||
ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
|
||
|
||
# BR schedule ticks above the chart — centered over each BR segment,
|
||
# with a faint dashed divider at each segment boundary so tiers read
|
||
# as distinct bands.
|
||
for entry in _load_br_schedule():
|
||
s = int(entry["start"])
|
||
e = int(entry["end"])
|
||
if e < season_start or s > season_end:
|
||
continue
|
||
s_c = max(s, season_start)
|
||
e_c = min(e, season_end)
|
||
mid = (s_c + e_c) / 2
|
||
if season_start < s < season_end:
|
||
ax.axvline(s, color=pal["muted"], alpha=0.45, linewidth=0.7,
|
||
linestyle=(0, (3, 3)), zorder=1)
|
||
ax.text(mid, 1.02, f"{float(entry['max_br']):.1f}",
|
||
color=pal["header_sub"], fontsize=10, fontweight="bold",
|
||
ha="center", va="bottom",
|
||
transform=ax.get_xaxis_transform(), clip_on=False)
|
||
|
||
# ── Supporting stats grid ────────────────────────────────────────
|
||
ax_stats = fig.add_subplot(gs[2])
|
||
ax_stats.set_axis_off()
|
||
|
||
def _fmt_opponent(op: Optional[Tuple[str, int, int]]) -> str:
|
||
if not op:
|
||
return "—"
|
||
name, total, wins = op
|
||
losses = total - wins
|
||
return (f"{name} ({total} {t('imgUnitMatches')}, "
|
||
f"{wins} {t('imgUnitWins')} / {losses} {t('imgUnitLosses')})")
|
||
|
||
def _fmt_mvp(m: Optional[Tuple[str, int, int, int]]) -> str:
|
||
if not m:
|
||
return "—"
|
||
name, kills, assists, captures = m
|
||
return (f"{name} — {_fmt_int(kills)} {t('imgUnitKills')}, "
|
||
f"{_fmt_int(assists)} {t('imgUnitAssists')}, "
|
||
f"{_fmt_int(captures)} {t('imgUnitCaptures')}")
|
||
|
||
# Short rows render in 2 columns; wide rows span full width.
|
||
short_rows: List[Tuple[str, str]] = [
|
||
(t("imgStatPeakRating"), f"{_fmt_int(rating.peak)} ({_fmt_date(rating.peak_ts)})"),
|
||
(t("imgStatRatingChange"), f"{'+' if (rating.change or 0) >= 0 else ''}{_fmt_int(rating.change)}"),
|
||
(t("imgStatTotalKills"), f"{_fmt_int(players.total_kills)} ({_fmt_int(players.ground_kills)} {t('imgGroundShort')} / {_fmt_int(players.air_kills)} {t('imgAirShort')})"),
|
||
(t("imgStatTotalDeaths"), _fmt_int(players.deaths)),
|
||
(t("imgStatAssistsCaptures"), f"{_fmt_int(players.assists)} / {_fmt_int(players.captures)}"),
|
||
(t("imgStatMostPlayedVehicle"), f"{players.top_vehicle[0]} ({players.top_vehicle[1]} {t('imgUnitGames')})" if players.top_vehicle else "—"),
|
||
(t("imgStatMostActive"), f"{players.most_active[0]} ({_fmt_int(players.most_active[1])} {t('imgUnitMatches')})" if players.most_active else "—"),
|
||
(t("imgStatLongestWinStreak"), _fmt_int(match.longest_win_streak)),
|
||
]
|
||
wide_rows: List[Tuple[str, str]] = [
|
||
(t("imgStatMVP"), _fmt_mvp(players.mvp)),
|
||
(t("imgStatMostCommonOpponent"), _fmt_opponent(match.top_opponent)),
|
||
]
|
||
|
||
cols = 2
|
||
short_per_col = (len(short_rows) + cols - 1) // cols # 4
|
||
total_bands = short_per_col + len(wide_rows) # 6
|
||
grid_top = 0.98
|
||
grid_bottom = 0.05
|
||
band_h = (grid_top - grid_bottom) / total_bands
|
||
|
||
# Short rows — 2-column layout in the top `short_per_col` bands
|
||
for i, (label, value) in enumerate(short_rows):
|
||
col, row_in_col = divmod(i, short_per_col)
|
||
x = 0.02 + col * 0.52
|
||
y = grid_top - band_h * (row_in_col + 0.5)
|
||
ax_stats.text(x, y, f"{label}:", fontsize=12, color=pal["muted"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
ax_stats.text(x + 0.20, y, value, fontsize=12, color=pal["text"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
|
||
# Wide rows — full-width, below the short rows
|
||
for i, (label, value) in enumerate(wide_rows):
|
||
y = grid_top - band_h * (short_per_col + i + 0.5)
|
||
ax_stats.text(0.02, y, f"{label}:", fontsize=12, color=pal["muted"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
ax_stats.text(0.22, y, value, fontsize=12, color=pal["text"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
|
||
# Horizontal dividers at band boundaries
|
||
for k in range(1, total_bands):
|
||
y = grid_top - band_h * k
|
||
if k < short_per_col:
|
||
# still in the 2-column zone — split divider around the vertical line
|
||
ax_stats.plot([0.01, 0.49], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
ax_stats.plot([0.51, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
else:
|
||
# wide zone — single divider spans full width
|
||
ax_stats.plot([0.01, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
|
||
# Vertical divider between columns — only across the short-row zone
|
||
ax_stats.plot([0.50, 0.50],
|
||
[grid_top - band_h * short_per_col, grid_top],
|
||
color=pal["divider"], alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
|
||
fig.text(0.5, 0.015,
|
||
f"SREBOT · {t('imgFooterGenerated')} {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
|
||
ha="center", fontsize=8, color=pal["footer"])
|
||
|
||
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
|
||
tmp.parent.mkdir(parents=True, exist_ok=True)
|
||
try:
|
||
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], bbox_inches=None, format="png")
|
||
os.replace(tmp, out_path)
|
||
finally:
|
||
plt.close(fig)
|
||
|
||
|
||
def render_squadron_placeholder(out_path: Path, short: str, season: str,
|
||
reason: Optional[str] = None,
|
||
theme: str = "light",
|
||
lang: str = DEFAULT_LANG) -> None:
|
||
pal = theme_palette(theme)
|
||
tr = load_translations(lang)
|
||
if reason is None:
|
||
reason = tr.get("imgPlaceholderNoData", "No data for {short} in {season}").format(
|
||
short=short, season=season
|
||
)
|
||
fig = plt.figure(figsize=(16, 9), dpi=120, facecolor=pal["bg"])
|
||
ax = fig.add_subplot(1, 1, 1)
|
||
ax.set_axis_off()
|
||
ax.text(0.5, 0.62, f"{short} · {season}",
|
||
fontsize=28, fontweight="bold", color=pal["text"], ha="center",
|
||
transform=ax.transAxes)
|
||
ax.text(0.5, 0.48, reason,
|
||
fontsize=16, color=pal["muted"], ha="center",
|
||
transform=ax.transAxes)
|
||
ax.text(0.5, 0.05, "SREBOT",
|
||
fontsize=10, color=pal["footer"], ha="center",
|
||
transform=ax.transAxes)
|
||
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
|
||
tmp.parent.mkdir(parents=True, exist_ok=True)
|
||
try:
|
||
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], format="png")
|
||
os.replace(tmp, out_path)
|
||
finally:
|
||
plt.close(fig)
|
||
|
||
|
||
def _assign_squadron_colors(timeline: List[TimelineRange]) -> dict[str, str]:
|
||
"""Deterministic per-recap palette assignment in timeline order."""
|
||
colors: dict[str, str] = {}
|
||
slot = 0
|
||
for rng in timeline:
|
||
name = rng.squadron_name
|
||
if name == "":
|
||
colors.setdefault("", _NO_SQUADRON_COLOR)
|
||
continue
|
||
if name not in colors:
|
||
colors[name] = _SQUADRON_PALETTE[slot % len(_SQUADRON_PALETTE)]
|
||
slot += 1
|
||
colors.setdefault("", _NO_SQUADRON_COLOR)
|
||
return colors
|
||
|
||
|
||
def render_player_card(out_path: Path, nick: str, uid: str, season: str,
|
||
timeline: List[TimelineRange],
|
||
trail: List[RatingSegment],
|
||
derived: PlayerDerived,
|
||
rolling_wr: List[Tuple[int, float]],
|
||
rolling_kd: List[Tuple[int, float]],
|
||
rolling_battles: List[Tuple[int, int]],
|
||
most_common_opp: Optional[Tuple[str, int]],
|
||
frequent_teammate: Optional[Tuple[str, int]],
|
||
longest_session: Optional[Tuple[str, _date, int]],
|
||
most_active_day: Optional[Tuple[_date, int]],
|
||
season_start: int, season_end: int,
|
||
week_boundaries: List[int],
|
||
theme: str = "light",
|
||
lang: str = DEFAULT_LANG) -> None:
|
||
pal = theme_palette(theme)
|
||
tr = load_translations(lang)
|
||
def t(key: str) -> str:
|
||
return tr.get(key, key)
|
||
|
||
colors = _assign_squadron_colors(timeline)
|
||
unique_sq_keys = {rng.squadron_name for rng in timeline}
|
||
show_squadron_viz = len(unique_sq_keys) > 1
|
||
|
||
nick = _sanitize_render_text(nick)
|
||
|
||
fig = plt.figure(figsize=(16, 10), dpi=120, facecolor=pal["bg"])
|
||
gs = GridSpec(3, 1, height_ratios=[0.55, 2.4, 1.6], hspace=0.35,
|
||
left=0.13, right=0.87, top=0.93, bottom=0.06, figure=fig)
|
||
|
||
# ── Header / hero ───────────────────────────────────────────────
|
||
ax_head = fig.add_subplot(gs[0])
|
||
ax_head.set_axis_off()
|
||
ax_head.text(0.0, 0.75, f"{nick} · {t('imgUIDLabel')} {uid}",
|
||
fontsize=22, fontweight="bold", color=pal["text"],
|
||
transform=ax_head.transAxes)
|
||
ax_head.text(1.0, 0.75, f"{season} {t('imgRecapSuffix')}",
|
||
fontsize=18, fontweight="bold", color=pal["header_sub"],
|
||
ha="right", transform=ax_head.transAxes)
|
||
hero = [
|
||
(t("imgHeroBattles"), _fmt_int(derived.total)),
|
||
(t("imgHeroWinRate"), (f"{derived.wr_pct:.1f}%" if derived.wr_pct is not None else "—")),
|
||
(t("imgHeroKD"), _fmt_float(derived.kd)),
|
||
(t("imgHeroTotalKills"), _fmt_int(derived.total_kills)),
|
||
]
|
||
for i, (label, value) in enumerate(hero):
|
||
x = 0.04 + i * 0.24
|
||
ax_head.text(x, 0.15, value, fontsize=26, fontweight="bold",
|
||
color=pal["text"], va="center",
|
||
transform=ax_head.transAxes)
|
||
ax_head.text(x, -0.10, label, fontsize=11, color=pal["muted"],
|
||
va="center", transform=ax_head.transAxes)
|
||
|
||
for y in (0.55, -0.32):
|
||
ax_head.plot([0.01, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_head.transAxes, clip_on=False)
|
||
|
||
# ── Main chart with 4 axes ──────────────────────────────────────
|
||
ax = fig.add_subplot(gs[1])
|
||
ax.set_facecolor(pal["bg"])
|
||
ax.set_xlim(season_start, season_end)
|
||
|
||
if show_squadron_viz:
|
||
for rng in timeline:
|
||
c = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
|
||
alpha = 0.12 if rng.squadron_name else 0.05
|
||
ax.axvspan(rng.first_ts, rng.last_ts, color=c, alpha=alpha, zorder=0)
|
||
|
||
for b in week_boundaries:
|
||
if season_start <= b <= season_end:
|
||
ax.axvline(b, color=pal["grid"], alpha=0.3, linewidth=1, zorder=1)
|
||
|
||
for seg in trail:
|
||
if not seg.points:
|
||
continue
|
||
smoothed = smooth_series(seg.points, window=12)
|
||
xs = [p[0] for p in smoothed]
|
||
ys = [p[1] for p in smoothed]
|
||
# Rating line is always teal — squadron shading + the Squadrons
|
||
# Represented chain carry the per-squadron color key.
|
||
ax.plot(xs, ys, color=RATING_COLOR, linewidth=2.2, zorder=3)
|
||
ax.set_ylabel(t("imgAxisRating"), color=RATING_COLOR, fontsize=12)
|
||
ax.tick_params(axis="y", colors=RATING_COLOR)
|
||
ax.spines["left"].set_color(RATING_COLOR)
|
||
|
||
def _prep(series: list) -> list:
|
||
# Clip to squadron timeline ranges so rolling curves match the
|
||
# shading, then daily-downsample + smooth inside each segment.
|
||
return [_smooth_rolling_curve(_downsample_daily(part), window=4)
|
||
for part in _clip_to_ranges(series, timeline)]
|
||
|
||
ax_wr = ax.twinx()
|
||
for s in _prep(rolling_wr):
|
||
ax_wr.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=WR_COLOR, linewidth=1.3, alpha=0.8, zorder=2)
|
||
ax_wr.set_ylim(0, 100)
|
||
ax_wr.set_ylabel(t("imgAxisWinRate"), color=WR_COLOR, fontsize=12)
|
||
ax_wr.tick_params(axis="y", colors=WR_COLOR)
|
||
ax_wr.spines["right"].set_color(WR_COLOR)
|
||
ax_wr.spines["left"].set_visible(False) # don't paint over ax's teal left spine
|
||
|
||
ax_kd = ax.twinx()
|
||
ax_kd.spines["right"].set_position(("axes", 1.08))
|
||
for s in _prep(rolling_kd):
|
||
ax_kd.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=KD_COLOR, linewidth=1.1, alpha=0.7, zorder=2)
|
||
ax_kd.set_ylabel(t("imgAxisKD"), color=KD_COLOR, fontsize=12, labelpad=10)
|
||
ax_kd.tick_params(axis="y", colors=KD_COLOR)
|
||
ax_kd.spines["right"].set_color(KD_COLOR)
|
||
ax_kd.spines["left"].set_visible(False)
|
||
|
||
ax_b = ax.twinx()
|
||
ax_b.spines["left"].set_position(("axes", -0.08))
|
||
ax_b.spines["left"].set_visible(True)
|
||
ax_b.spines["right"].set_visible(False)
|
||
ax_b.yaxis.set_label_position("left")
|
||
ax_b.yaxis.tick_left()
|
||
for s in _prep(rolling_battles):
|
||
ax_b.plot([p[0] for p in s], [p[1] for p in s],
|
||
color=BATTLES_COLOR, linewidth=1.0, alpha=0.6, zorder=2)
|
||
ax_b.set_ylabel(t("imgAxisBattles"), color=BATTLES_COLOR, fontsize=12, labelpad=10)
|
||
ax_b.tick_params(axis="y", colors=BATTLES_COLOR)
|
||
ax_b.spines["left"].set_color(BATTLES_COLOR)
|
||
|
||
boundary_inside = [b for b in week_boundaries if season_start <= b <= season_end]
|
||
if boundary_inside:
|
||
ax.set_xticks(boundary_inside)
|
||
ax.set_xticklabels(
|
||
[datetime.fromtimestamp(b, tz=timezone.utc).strftime("%m-%d")
|
||
for b in boundary_inside],
|
||
rotation=0, fontsize=9
|
||
)
|
||
ax.tick_params(axis="x", colors=pal["muted"])
|
||
ax.grid(axis="y", linestyle=":", color=pal["grid"], alpha=0.4)
|
||
|
||
# BR schedule ticks above the chart — centered over each BR segment,
|
||
# with a faint dashed divider at each segment boundary so tiers read
|
||
# as distinct bands.
|
||
for entry in _load_br_schedule():
|
||
s = int(entry["start"])
|
||
e = int(entry["end"])
|
||
if e < season_start or s > season_end:
|
||
continue
|
||
s_c = max(s, season_start)
|
||
e_c = min(e, season_end)
|
||
mid = (s_c + e_c) / 2
|
||
if season_start < s < season_end:
|
||
ax.axvline(s, color=pal["muted"], alpha=0.45, linewidth=0.7,
|
||
linestyle=(0, (3, 3)), zorder=1)
|
||
ax.text(mid, 1.02, f"{float(entry['max_br']):.1f}",
|
||
color=pal["header_sub"], fontsize=10, fontweight="bold",
|
||
ha="center", va="bottom",
|
||
transform=ax.get_xaxis_transform(), clip_on=False)
|
||
|
||
# ── Stats grid (bottom) ─────────────────────────────────────────
|
||
ax_stats = fig.add_subplot(gs[2])
|
||
ax_stats.set_axis_off()
|
||
|
||
kdac_value = (
|
||
f"{_fmt_int(derived.total_kills)} "
|
||
f"({_fmt_int(derived.ground_kills)} {t('imgGroundShort')} / "
|
||
f"{_fmt_int(derived.air_kills)} {t('imgAirShort')}) / "
|
||
f"{_fmt_int(derived.total_deaths)} / "
|
||
f"{_fmt_int(derived.assists)} / "
|
||
f"{_fmt_int(derived.captures)}"
|
||
)
|
||
|
||
short_rows: List[Tuple[str, str]] = [
|
||
(t("imgStatKDAC"), kdac_value),
|
||
(t("imgStatMostPlayedVehicle"),
|
||
f"{derived.top_vehicle[0]} ({derived.top_vehicle[1]} {t('imgUnitGames')})"
|
||
if derived.top_vehicle else "—"),
|
||
(t("imgStatLongestWinStreak"), _fmt_int(derived.longest_win_streak)),
|
||
(t("imgStatLongestSession"),
|
||
f"{longest_session[0]} · {longest_session[1].isoformat()} — "
|
||
f"{longest_session[2]} {t('imgUnitMatches')}"
|
||
if longest_session else "—"),
|
||
(t("imgStatMostActiveDay"),
|
||
f"{most_active_day[0].isoformat()} — {most_active_day[1]} {t('imgUnitMatches')}"
|
||
if most_active_day else "—"),
|
||
(t("imgStatMostCommonOpponent"),
|
||
f"{most_common_opp[0]} ({most_common_opp[1]} {t('imgUnitMatches')})"
|
||
if most_common_opp else "—"),
|
||
]
|
||
|
||
peak_str = (f"{_fmt_int(derived.peak_rating[0])} · {derived.peak_rating[1]} · "
|
||
f"{_fmt_date(derived.peak_rating[2])}") if derived.peak_rating else "—"
|
||
if derived.best_match:
|
||
bm = derived.best_match
|
||
best_match_str = t("imgBestMatchLine").format(
|
||
vehicle=bm.vehicle,
|
||
gk=bm.ground_kills,
|
||
ak=bm.air_kills,
|
||
assists=bm.assists,
|
||
cap=bm.captures,
|
||
deaths=bm.deaths,
|
||
date=_fmt_date(bm.endtime_unix),
|
||
)
|
||
else:
|
||
best_match_str = "—"
|
||
|
||
chain_parts: List[str] = []
|
||
for rng in timeline:
|
||
dot = "●" if rng.squadron_name else "○"
|
||
name = rng.squadron_name if rng.squadron_name else t("imgUnitNoSquadron")
|
||
chain_parts.append(f"{dot} {name} ({rng.match_count})")
|
||
chain_str = " → ".join(chain_parts) if chain_parts else "—"
|
||
|
||
teammate_str = (f"{frequent_teammate[0]} — {frequent_teammate[1]} "
|
||
f"{t('imgUnitTogether')}") if frequent_teammate else "—"
|
||
|
||
wide_rows: List[Tuple[str, str]] = [
|
||
(t("imgStatPeakSquadronRating"), peak_str),
|
||
(t("imgStatBestMatch"), best_match_str),
|
||
(t("imgStatSquadronsRepresented"), chain_str),
|
||
(t("imgStatFrequentTeammate"), teammate_str),
|
||
]
|
||
|
||
cols = 2
|
||
short_per_col = (len(short_rows) + cols - 1) // cols
|
||
total_bands = short_per_col + len(wide_rows)
|
||
grid_top = 0.98
|
||
band_h = (grid_top - 0.05) / total_bands
|
||
|
||
for i, (label, value) in enumerate(short_rows):
|
||
col, row_in_col = divmod(i, short_per_col)
|
||
x = 0.02 + col * 0.52
|
||
y = grid_top - band_h * (row_in_col + 0.5)
|
||
ax_stats.text(x, y, f"{label}:", fontsize=12, color=pal["muted"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
ax_stats.text(x + 0.20, y, value, fontsize=12, color=pal["text"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
|
||
squadrons_label = t("imgStatSquadronsRepresented")
|
||
for i, (label, value) in enumerate(wide_rows):
|
||
y = grid_top - band_h * (short_per_col + i + 0.5)
|
||
ax_stats.text(0.02, y, f"{label}:", fontsize=12, color=pal["muted"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
if label == squadrons_label:
|
||
children: List = []
|
||
for j, rng in enumerate(timeline):
|
||
if j > 0:
|
||
children.append(TextArea(
|
||
" → ",
|
||
textprops=dict(color=pal["muted"], fontsize=12),
|
||
))
|
||
seg_color = colors.get(rng.squadron_name, _NO_SQUADRON_COLOR)
|
||
name = rng.squadron_name if rng.squadron_name else t("imgUnitNoSquadron")
|
||
dot_char = "●" if rng.squadron_name else "○"
|
||
children.append(TextArea(
|
||
f"{dot_char} {name} ({rng.match_count})",
|
||
textprops=dict(color=seg_color, fontsize=12),
|
||
))
|
||
if not children:
|
||
children.append(TextArea(
|
||
"—", textprops=dict(color=pal["text"], fontsize=12)))
|
||
packed = HPacker(children=children, align="center", pad=0, sep=0)
|
||
ax_stats.add_artist(AnchoredOffsetbox(
|
||
loc="center left", child=packed, pad=0, frameon=False,
|
||
bbox_to_anchor=(0.22, y), bbox_transform=ax_stats.transAxes,
|
||
))
|
||
else:
|
||
ax_stats.text(0.22, y, value, fontsize=12, color=pal["text"],
|
||
va="center", transform=ax_stats.transAxes)
|
||
|
||
for k in range(1, total_bands):
|
||
y = grid_top - band_h * k
|
||
if k < short_per_col:
|
||
ax_stats.plot([0.01, 0.49], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
ax_stats.plot([0.51, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
else:
|
||
ax_stats.plot([0.01, 0.99], [y, y], color=pal["divider"],
|
||
alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
ax_stats.plot([0.50, 0.50],
|
||
[grid_top - band_h * short_per_col, grid_top],
|
||
color=pal["divider"], alpha=0.85, linewidth=1.0,
|
||
transform=ax_stats.transAxes, clip_on=False)
|
||
|
||
fig.text(0.5, 0.015,
|
||
f"SREBOT · {t('imgFooterGenerated')} {datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
|
||
ha="center", fontsize=8, color=pal["footer"])
|
||
|
||
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
|
||
tmp.parent.mkdir(parents=True, exist_ok=True)
|
||
try:
|
||
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], bbox_inches=None, format="png")
|
||
os.replace(tmp, out_path)
|
||
finally:
|
||
plt.close(fig)
|
||
|
||
|
||
def render_player_placeholder(out_path: Path, nick: str, season: str,
|
||
reason: Optional[str] = None,
|
||
theme: str = "light",
|
||
lang: str = DEFAULT_LANG) -> None:
|
||
pal = theme_palette(theme)
|
||
tr = load_translations(lang)
|
||
if reason is None:
|
||
reason = tr.get("imgPlaceholderNoDataPlayer",
|
||
"No data for {nick} in {season}").format(nick=nick, season=season)
|
||
fig = plt.figure(figsize=(16, 9), dpi=120, facecolor=pal["bg"])
|
||
ax = fig.add_subplot(1, 1, 1)
|
||
ax.set_axis_off()
|
||
ax.text(0.5, 0.62, f"{nick} · {season}",
|
||
fontsize=28, fontweight="bold", color=pal["text"], ha="center",
|
||
transform=ax.transAxes)
|
||
ax.text(0.5, 0.48, reason,
|
||
fontsize=16, color=pal["muted"], ha="center",
|
||
transform=ax.transAxes)
|
||
ax.text(0.5, 0.05, "SREBOT",
|
||
fontsize=10, color=pal["footer"], ha="center",
|
||
transform=ax.transAxes)
|
||
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
|
||
tmp.parent.mkdir(parents=True, exist_ok=True)
|
||
try:
|
||
fig.savefig(tmp, dpi=120, facecolor=pal["bg"], format="png")
|
||
os.replace(tmp, out_path)
|
||
finally:
|
||
plt.close(fig)
|
||
|
||
|
||
def run_squadron(args: Args) -> int:
|
||
assert args.clan_id is not None
|
||
with _open_ro(SQUADRONS_DB) as conn_sq:
|
||
ident = resolve_squadron(conn_sq, args.clan_id)
|
||
if ident is None:
|
||
logging.error(f"unknown clan_id={args.clan_id}")
|
||
return 2
|
||
logging.info(f"resolved {ident.short_name} / {ident.long_name}")
|
||
rating = gather_squadron_rating(conn_sq, args.clan_id, args.season_start, args.season_end)
|
||
logging.info(f"rating: series={len(rating.series)} final={rating.final} peak={rating.peak} change={rating.change}")
|
||
|
||
with _open_ro(SQ_BATTLES_DB) as conn_b:
|
||
stream = gather_squadron_match_stream(conn_b, ident.short_name, args.season_start, args.season_end)
|
||
players = gather_squadron_player_aggregates(conn_b, ident.short_name, args.season_start, args.season_end)
|
||
squad_stream = gather_squadron_per_match_stream(
|
||
conn_b, ident.short_name, args.season_start, args.season_end
|
||
)
|
||
match = compute_squadron_match_derived(stream, ident.short_name)
|
||
logging.info(f"matches: total={match.total} wins={match.wins} wr={match.wr_pct} streak={match.longest_win_streak} top_opp={match.top_opponent}")
|
||
rolling_wr = compute_rolling_wr(squad_stream)
|
||
rolling_kd = compute_rolling_kd(squad_stream)
|
||
rolling_battles = compute_rolling_battles(squad_stream)
|
||
logging.info(f"rolling: wr_pts={len(rolling_wr)} kd_pts={len(rolling_kd)} battles_pts={len(rolling_battles)}")
|
||
|
||
if rating.final is None and match.total == 0:
|
||
logging.info("no data for squadron in season; writing placeholder")
|
||
render_squadron_placeholder(args.out, ident.short_name, args.season, theme=args.theme, lang=args.lang)
|
||
return 0
|
||
|
||
logging.info(f"players: kills={players.total_kills} deaths={players.deaths} kd={players.kd} top_veh={players.top_vehicle} active={players.most_active}")
|
||
if players.mvp:
|
||
_n, _k, _a, _c = players.mvp
|
||
logging.info(f"mvp: {_n} K={_k} A={_a} C={_c}")
|
||
|
||
render_squadron_card(
|
||
args.out, ident, args.season,
|
||
rating, match, players,
|
||
rolling_wr, rolling_kd, rolling_battles,
|
||
args.season_start, args.season_end,
|
||
args.week_boundaries,
|
||
theme=args.theme,
|
||
lang=args.lang,
|
||
)
|
||
logging.info(f"wrote {args.out}")
|
||
return 0
|
||
|
||
|
||
def _resolve_latest_player_nick(conn_b: sqlite3.Connection, uid: str) -> str:
|
||
cur = conn_b.execute(
|
||
"SELECT nick FROM player_games_hist "
|
||
"WHERE UID = ? AND nick NOT LIKE 'coop/%' "
|
||
"ORDER BY session_id DESC LIMIT 1",
|
||
(uid,),
|
||
)
|
||
row = cur.fetchone()
|
||
return row[0] if row else uid
|
||
|
||
|
||
def run_player(args: Args) -> int:
|
||
assert args.uid is not None
|
||
with _open_ro(SQ_BATTLES_DB) as conn_b, _open_ro(SQUADRONS_DB) as conn_sq:
|
||
nick = _resolve_latest_player_nick(conn_b, args.uid)
|
||
timeline = gather_player_squadron_timeline(
|
||
conn_b, args.uid, args.season_start, args.season_end
|
||
)
|
||
stream = gather_player_game_stream(
|
||
conn_b, args.uid, args.season_start, args.season_end
|
||
)
|
||
trail = gather_player_rating_trail(conn_sq, timeline, args.uid)
|
||
opp = gather_player_most_common_opponent(conn_b, stream)
|
||
teammate = gather_player_frequent_teammate(
|
||
conn_b, args.uid, args.season_start, args.season_end
|
||
)
|
||
|
||
# Membership check: the final timeline range was extended to season_end
|
||
# so we could scan past the player's last game. Clamp it back to the last
|
||
# snapshot where the player actually appeared in the squadron's roster
|
||
# (the last rating-trail point), so shading doesn't extend past when they
|
||
# left the squadron.
|
||
if timeline:
|
||
final_idx = len(timeline) - 1
|
||
final_seg = trail[final_idx] if final_idx < len(trail) else None
|
||
observed_ts = final_seg.points[-1][0] if final_seg and final_seg.points else None
|
||
last_rng = timeline[final_idx]
|
||
clamp_to = observed_ts if observed_ts is not None else min(
|
||
last_rng.last_ts, args.season_end
|
||
)
|
||
if clamp_to < last_rng.last_ts:
|
||
timeline[final_idx] = TimelineRange(
|
||
squadron_name=last_rng.squadron_name,
|
||
first_ts=last_rng.first_ts,
|
||
last_ts=clamp_to,
|
||
match_count=last_rng.match_count,
|
||
)
|
||
|
||
if not stream and all(not seg.points for seg in trail):
|
||
logging.info("no data for player in season; writing placeholder")
|
||
render_player_placeholder(args.out, nick, args.season,
|
||
theme=args.theme, lang=args.lang)
|
||
return 0
|
||
|
||
derived = compute_player_derived(stream, timeline, trail)
|
||
rolling_wr = compute_rolling_wr(stream)
|
||
rolling_kd = compute_rolling_kd(stream)
|
||
rolling_battles = compute_rolling_battles(stream)
|
||
longest = compute_longest_timeslot_session(stream)
|
||
active_day = compute_most_active_day_utc(stream)
|
||
|
||
logging.info(
|
||
f"player nick={nick} games={len(stream)} timeline_ranges={len(timeline)} "
|
||
f"rating_segments={sum(1 for s in trail if s.points)} "
|
||
f"wr={derived.wr_pct} kd={derived.kd} streak={derived.longest_win_streak}"
|
||
)
|
||
|
||
render_player_card(
|
||
args.out, nick, args.uid, args.season,
|
||
timeline, trail, derived,
|
||
rolling_wr, rolling_kd, rolling_battles,
|
||
opp, teammate, longest, active_day,
|
||
args.season_start, args.season_end,
|
||
args.week_boundaries,
|
||
theme=args.theme, lang=args.lang,
|
||
)
|
||
logging.info(f"wrote {args.out}")
|
||
return 0
|
||
|
||
|
||
def main(argv: Optional[List[str]] = None) -> int:
|
||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s", stream=sys.stderr)
|
||
args = parse_args(argv)
|
||
logging.info(f"render start mode={args.mode} season={args.season}")
|
||
if args.mode == "squadron":
|
||
return run_squadron(args)
|
||
return run_player(args)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|