-am (#1329)
This commit is contained in:
+142
-27
@@ -1,30 +1,36 @@
|
||||
"""TSSBOT autolog matcher.
|
||||
"""TSSBOT autolog matcher + scoreboard dispatch.
|
||||
|
||||
For each game received from the Spectra TSS feed, match it against every
|
||||
subscribing guild's ``tss-team`` preference entries and work out which channels
|
||||
should be notified, deduping per session.
|
||||
subscribing guild's ``tss-team`` / ``tss-player`` preference entries, render the
|
||||
scoreboard once per session/color/language, and post it to each subscribed channel.
|
||||
|
||||
NOTE: building and sending the scoreboard embed is intentionally a TODO for now
|
||||
(it needs significant rework). The matching + dedup pipeline below is complete
|
||||
and logs the targets it *would* post to. Wire the send in at the marked spot.
|
||||
Matching is per-entity (a team by id/name, a player by uid); sending is deduped per
|
||||
channel per session so a channel that subscribes both a team and one of its players
|
||||
only receives one scoreboard for a given game.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import discord
|
||||
|
||||
from . import preferences
|
||||
from . import preferences, scoreboard, transform
|
||||
from .storage import STORAGE_DIR
|
||||
|
||||
log = logging.getLogger("tssbot.autolog")
|
||||
|
||||
REPLAYS_TSS_DIR: Path = STORAGE_DIR / "REPLAYS" / "TSS"
|
||||
|
||||
# Registered by start_bot once the client exists; standalone tss_ws leaves it None.
|
||||
_bot: Optional[discord.Client] = None
|
||||
|
||||
# session_id -> set of channel_ids already handled (in-memory idempotency,
|
||||
# mirrors SREBOT's _sent_channels_by_session).
|
||||
# session_id -> set of channel_ids already handled (in-memory idempotency).
|
||||
_sent_channels_by_session: dict[str, set[int]] = {}
|
||||
# session_id -> lock guarding the one-time PNG render.
|
||||
_render_locks: dict[str, asyncio.Lock] = {}
|
||||
|
||||
|
||||
def set_bot(bot: discord.Client) -> None:
|
||||
@@ -33,8 +39,12 @@ def set_bot(bot: discord.Client) -> None:
|
||||
_bot = bot
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Present-entity extraction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _present_teams(game: dict[str, Any]) -> tuple[set[str], set[str]]:
|
||||
"""Return stable TSS team IDs and names embedded in a replay."""
|
||||
"""Return stable TSS team IDs and lowercased names embedded in a replay."""
|
||||
tss = game.get("tss") or {}
|
||||
team_ids: set[str] = set()
|
||||
team_names: set[str] = set()
|
||||
@@ -50,41 +60,146 @@ def _present_teams(game: dict[str, Any]) -> tuple[set[str], set[str]]:
|
||||
return team_ids, team_names
|
||||
|
||||
|
||||
def _present_players(game: dict[str, Any]) -> set[str]:
|
||||
"""Return the uids of every player in a replay (players dict + tss rosters)."""
|
||||
uids: set[str] = {str(u) for u in (game.get("players") or {}).keys()}
|
||||
tss = game.get("tss") or {}
|
||||
for slot in ("1", "2"):
|
||||
team = tss.get(slot)
|
||||
if isinstance(team, dict):
|
||||
for entry in team.get("players") or []:
|
||||
if entry.get("uid") is not None:
|
||||
uids.add(str(entry["uid"]))
|
||||
return uids
|
||||
|
||||
|
||||
def _bar_color(game: dict[str, Any], guild_id: int) -> str:
|
||||
"""Header/separator tint from the guild's own team vs the result."""
|
||||
if game.get("draw"):
|
||||
return "draw"
|
||||
guild_team = preferences.get_guild_team(guild_id)
|
||||
if not guild_team or guild_team.get("team_id") is None:
|
||||
return "not_set"
|
||||
my_id = str(guild_team["team_id"])
|
||||
tss = game.get("tss") or {}
|
||||
slot_ids = {s: str((tss.get(s) or {}).get("team_id")) for s in ("1", "2")}
|
||||
winner = str(game.get("winner") or "")
|
||||
if slot_ids.get(winner) == my_id:
|
||||
return "win"
|
||||
if my_id in slot_ids.values():
|
||||
return "loss"
|
||||
return "not_involved"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scoreboard view + render/send
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def build_tss_scoreboard_view(session_id: str) -> discord.ui.View:
|
||||
"""Link buttons under a scoreboard: in-game replay + the TSS website."""
|
||||
view = discord.ui.View(timeout=None)
|
||||
try:
|
||||
replay_url = f"https://warthunder.com/en/tournament/replay/{int(session_id, 16)}"
|
||||
view.add_item(discord.ui.Button(label="View Replay", style=discord.ButtonStyle.link, url=replay_url))
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
view.add_item(discord.ui.Button(
|
||||
label="View on Website", style=discord.ButtonStyle.link,
|
||||
url=f"https://tss.pawjob.us/games/{session_id}", emoji="🌐",
|
||||
))
|
||||
return view
|
||||
|
||||
|
||||
async def _render_scoreboard(game: dict[str, Any], session_id: str, bar_color: str, lang_column: str) -> Optional[Path]:
|
||||
"""Render (once, cached on disk) the scoreboard for a session/color/language."""
|
||||
model = transform.build_scoreboard_model(game, lang_column)
|
||||
if not model:
|
||||
log.warning("[TSS-AUTOLOG] could not build model for %s", session_id)
|
||||
return None
|
||||
lang_tag = lang_column.strip("<>").lower() or "english"
|
||||
out_dir = REPLAYS_TSS_DIR / session_id
|
||||
out_path = out_dir / f"scoreboard-{bar_color}-{lang_tag}.png"
|
||||
lock = _render_locks.setdefault(session_id, asyncio.Lock())
|
||||
async with lock:
|
||||
if not out_path.exists():
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
await scoreboard.create_scoreboard(model, str(out_path), bar_color=bar_color)
|
||||
return out_path if out_path.exists() else None
|
||||
|
||||
|
||||
async def _send_scoreboard(game: dict[str, Any], guild_id: int, channel_id: int, session_id: str, sent: set[int]) -> None:
|
||||
"""Render and post the scoreboard for one subscribed channel (deduped)."""
|
||||
bot = _bot
|
||||
if bot is None or channel_id in sent:
|
||||
return
|
||||
sent.add(channel_id) # claim the channel before any await so concurrent matches can't double-send
|
||||
|
||||
bar_color = _bar_color(game, guild_id)
|
||||
lang_column = preferences.guild_language_column(guild_id)
|
||||
out_path = await _render_scoreboard(game, session_id, bar_color, lang_column)
|
||||
if out_path is None:
|
||||
return
|
||||
|
||||
channel = bot.get_channel(channel_id)
|
||||
if channel is None:
|
||||
try:
|
||||
channel = await bot.fetch_channel(channel_id)
|
||||
except (discord.NotFound, discord.Forbidden) as e:
|
||||
log.warning("[TSS-AUTOLOG] channel %s unavailable (%s)", channel_id, e)
|
||||
return
|
||||
if not isinstance(channel, discord.abc.Messageable):
|
||||
return
|
||||
|
||||
try:
|
||||
with open(out_path, "rb") as f:
|
||||
await channel.send(
|
||||
file=discord.File(f, filename="scoreboard.png"),
|
||||
view=build_tss_scoreboard_view(session_id),
|
||||
)
|
||||
log.info("[TSS-AUTOLOG] sent %s -> guild=%s channel=%s", session_id, guild_id, channel_id)
|
||||
except discord.HTTPException as e:
|
||||
log.error("[TSS-AUTOLOG] send failed %s -> %s: %s", session_id, channel_id, e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def process_game(game: dict[str, Any]) -> None:
|
||||
"""Match one received game against guild preferences and notify channels.
|
||||
"""Match one received game against guild prefs and post scoreboards.
|
||||
|
||||
Safe to call from the standalone WS listener (no-ops if no bot registered).
|
||||
"""
|
||||
if _bot is None:
|
||||
return
|
||||
|
||||
session_id = str(game.get("_id") or "")
|
||||
if not session_id:
|
||||
return
|
||||
|
||||
present_team_ids, present_team_names = _present_teams(game)
|
||||
|
||||
present_uids = _present_players(game)
|
||||
sent = _sent_channels_by_session.setdefault(session_id, set())
|
||||
|
||||
for guild_id, prefs in preferences.iter_guild_preferences():
|
||||
for entity_id, entry in prefs.items():
|
||||
if not isinstance(entry, dict) or entry.get("Type") != "tss-team":
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
type_ = entry.get("Type")
|
||||
if type_ not in ("tss-team", "tss-player"):
|
||||
continue
|
||||
channel_id, enabled = preferences.parse_channel(entry.get("Logs"))
|
||||
if not channel_id or not enabled:
|
||||
if not channel_id or not enabled or channel_id in sent:
|
||||
continue
|
||||
|
||||
name = (entry.get("Name") or "").lower()
|
||||
matched = str(entity_id) in present_team_ids or (name and name in present_team_names)
|
||||
|
||||
if not matched or channel_id in sent:
|
||||
if type_ == "tss-team":
|
||||
name = (entry.get("Name") or "").lower()
|
||||
matched = str(entity_id) in present_team_ids or (bool(name) and name in present_team_names)
|
||||
else: # tss-player
|
||||
matched = str(entity_id) in present_uids
|
||||
if not matched:
|
||||
continue
|
||||
sent.add(channel_id)
|
||||
|
||||
# TODO: build & send the scoreboard embed to `channel_id`.
|
||||
# The matched target is fully resolved here; the only thing missing
|
||||
# is the rendered scoreboard (out of scope for now).
|
||||
log.info(
|
||||
"autolog match (send TODO): session=%s guild=%s channel=%s team=%s",
|
||||
session_id, guild_id, channel_id, entity_id,
|
||||
)
|
||||
try:
|
||||
await _send_scoreboard(game, guild_id, channel_id, session_id, sent)
|
||||
except Exception as e: # never let one channel break the rest
|
||||
log.error("[TSS-AUTOLOG] dispatch error guild=%s channel=%s: %s", guild_id, channel_id, e)
|
||||
|
||||
+25
-1
@@ -150,6 +150,29 @@ class TssCommands(commands.Cog):
|
||||
f"✅ Logging **{name}**'s matches to <#{interaction.channel_id}>.", ephemeral=True
|
||||
)
|
||||
|
||||
# ── /log-player ────────────────────────────────────────────────────────
|
||||
@app_commands.command(name="log-player", description="Send a player's matches to this channel")
|
||||
@app_commands.describe(player="Player username")
|
||||
@app_commands.autocomplete(player=player_autocomplete)
|
||||
@app_commands.checks.has_permissions(manage_guild=True)
|
||||
@not_blacklisted()
|
||||
async def log_player(self, interaction: discord.Interaction, player: str):
|
||||
await interaction.response.defer(ephemeral=True)
|
||||
if interaction.guild_id is None or interaction.channel_id is None:
|
||||
return await interaction.followup.send("This command must be used in a server channel.", ephemeral=True)
|
||||
candidates = await storage.resolve_players(player)
|
||||
if not candidates:
|
||||
return await interaction.followup.send(f"No players found matching **{player}**.", ephemeral=True)
|
||||
if len(candidates) > 1:
|
||||
return await interaction.followup.send(_too_many_msg(candidates), ephemeral=True)
|
||||
uid, nick = str(candidates[0]["uid"]), candidates[0]["nick"]
|
||||
preferences.upsert_log_entry(
|
||||
interaction.guild_id, uid, "tss-player", nick, interaction.channel_id
|
||||
)
|
||||
await interaction.followup.send(
|
||||
f"✅ Logging **{nick}**'s matches to <#{interaction.channel_id}>.", ephemeral=True
|
||||
)
|
||||
|
||||
# ── /set-player ────────────────────────────────────────────────────────
|
||||
@app_commands.command(name="set-player", description="Link your Discord account to a War Thunder player")
|
||||
@app_commands.describe(player="Player username")
|
||||
@@ -290,7 +313,8 @@ class TssCommands(commands.Cog):
|
||||
name="Autologging — Manage Server",
|
||||
value=(
|
||||
"`/set-team <team>` — set this server's team\n"
|
||||
"`/log-team <team>` — post a team's matches to this channel"
|
||||
"`/log-team <team>` — post a team's matches to this channel\n"
|
||||
"`/log-player <player>` — post a player's matches to this channel"
|
||||
),
|
||||
inline=False,
|
||||
)
|
||||
|
||||
@@ -39,6 +39,7 @@ log = logging.getLogger("tssbot.preferences")
|
||||
|
||||
PREFERENCES_DIR: Path = STORAGE_DIR / "PREFERENCES"
|
||||
TEAMS_JSON_PATH: Path = STORAGE_DIR / "TEAMS.json"
|
||||
FEATURES_DIR: Path = STORAGE_DIR / "FEATURES"
|
||||
|
||||
_CHANNEL_ID_RE = re.compile(r"(\d{17,20})")
|
||||
|
||||
@@ -182,3 +183,27 @@ def set_guild_team(guild_id: int | str, team_id: int, name: str) -> bool:
|
||||
teams = load_teams()
|
||||
teams[str(guild_id)] = {"TM_Name": name, "team_id": int(team_id)}
|
||||
return _write_json(TEAMS_JSON_PATH, teams)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FEATURES.json — per-guild feature flags (shared with SREBOT; we use Language)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _guild_features_path(guild_id: int | str) -> Path:
|
||||
return FEATURES_DIR / f"{guild_id}-features.json"
|
||||
|
||||
|
||||
def load_features(guild_id: int | str) -> dict[str, Any]:
|
||||
"""Load a guild's feature flags (shared SREBOT/TSSBOT file)."""
|
||||
data = _read_json(_guild_features_path(guild_id), {})
|
||||
return data if isinstance(data, dict) else {}
|
||||
|
||||
|
||||
def save_features(guild_id: int | str, features: dict[str, Any]) -> bool:
|
||||
"""Persist a guild's feature flags."""
|
||||
return _write_json(_guild_features_path(guild_id), features)
|
||||
|
||||
|
||||
def guild_language_column(guild_id: int | str) -> str:
|
||||
"""Return the guild's LangTableReader column, e.g. ``<English>`` (the default)."""
|
||||
return load_features(guild_id).get("Language") or "<English>"
|
||||
|
||||
@@ -0,0 +1,381 @@
|
||||
"""TSS scoreboard renderer.
|
||||
|
||||
Generates the match-result image posted by autologging. Mirrors SREBOT's visual
|
||||
language (blurred map background, vignette, two team columns, stat icons) but is
|
||||
TSS-native: team names instead of squadron tags, the full per-player vehicle lineup
|
||||
(unused dimmed) instead of a single vehicle, and per-player pvp_ratio instead of
|
||||
squadron points.
|
||||
|
||||
Input is the flat model from ``transform.build_scoreboard_model`` — this module never
|
||||
touches the raw feed.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from PIL.Image import Resampling
|
||||
|
||||
from . import SHARED_DIR
|
||||
from .wl import get_tss_standings
|
||||
|
||||
log = logging.getLogger("tssbot.scoreboard")
|
||||
|
||||
MAPS_DIR = SHARED_DIR / "MAPS"
|
||||
ICON_BASE_DIR = SHARED_DIR / "ICONS"
|
||||
VEHICLE_ICON_DIR = ICON_BASE_DIR / "VEHICLES"
|
||||
TEXT_FONT_PATH = SHARED_DIR / "FONTS" / "arial_unicode_ms.otf"
|
||||
|
||||
# Colors
|
||||
WIN_FILL = (60, 255, 60, 255)
|
||||
LOSS_FILL = (255, 60, 60, 255)
|
||||
DRAW_FILL = (255, 255, 0, 255)
|
||||
NEUTRAL_FILL = (255, 255, 255, 255)
|
||||
GREY_FILL = (200, 200, 200, 255)
|
||||
NAME_FILL = (250, 227, 200, 255)
|
||||
DIM_FILL = (150, 150, 150, 255)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Asset caching (mirrors SREBOT)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_FONTS: dict[str, ImageFont.FreeTypeFont] = {}
|
||||
|
||||
|
||||
def load_fonts(base_width: int) -> dict[str, ImageFont.FreeTypeFont]:
|
||||
global _FONTS
|
||||
if _FONTS:
|
||||
return _FONTS
|
||||
_FONTS = {
|
||||
"title": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.04)),
|
||||
"team": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.03)),
|
||||
"sub": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.019)),
|
||||
"body": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.020)),
|
||||
"stat": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.022)),
|
||||
"info": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.016)),
|
||||
"small": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.013)),
|
||||
"veh": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.016)),
|
||||
}
|
||||
return _FONTS
|
||||
|
||||
|
||||
@lru_cache(maxsize=256)
|
||||
def _load_icon_cached(path_str: str, size: tuple, alpha: int):
|
||||
img = Image.open(path_str).convert("RGBA")
|
||||
if size:
|
||||
img = img.resize(size, Resampling.LANCZOS)
|
||||
if alpha < 255:
|
||||
a = img.getchannel("A").point(lambda v: int(v * alpha / 255))
|
||||
img.putalpha(a)
|
||||
return img
|
||||
|
||||
|
||||
def load_icon(path: Path, size: Optional[tuple] = None, alpha: int = 255) -> Image.Image:
|
||||
return _load_icon_cached(str(path), tuple(size) if size else None, alpha)
|
||||
|
||||
|
||||
@lru_cache(maxsize=24)
|
||||
def _load_map_cached(path_str: str, blur_radius: int):
|
||||
from PIL import ImageFilter
|
||||
img = Image.open(path_str).convert("RGBA")
|
||||
return img.filter(ImageFilter.GaussianBlur(blur_radius))
|
||||
|
||||
|
||||
def _make_vignette(width: int, height: int, base_alpha=140, max_alpha=175, power=4) -> Image.Image:
|
||||
ys = np.linspace(-1, 1, height)[:, None]
|
||||
xs = np.linspace(-1, 1, width)[None, :]
|
||||
dist = np.sqrt(xs ** 2 + ys ** 2) / np.sqrt(2)
|
||||
band = base_alpha + (max_alpha - base_alpha) * (dist ** power)
|
||||
return Image.fromarray(np.clip(band, 0, 255).astype(np.uint8), mode="L")
|
||||
|
||||
|
||||
def _resolve_map_image(map_name: str) -> Optional[str]:
|
||||
"""Match a TSS mission_name against SHARED/MAPS (spaces→_, .jpg), like SRE."""
|
||||
clean = re.sub(r"^\s*\[[^]]+\]\s*", "", map_name).replace(" ", "_")
|
||||
target = f"{clean}.jpg"
|
||||
try:
|
||||
candidates = {f.name for f in MAPS_DIR.iterdir() if f.is_file()}
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
match = next((fn for fn in candidates if fn.lower() == target.lower()), None)
|
||||
return str(MAPS_DIR / match) if match else None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Render
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _bar_fill(bar_color: str) -> tuple:
|
||||
return {"win": WIN_FILL, "loss": LOSS_FILL, "draw": DRAW_FILL}.get(bar_color, NEUTRAL_FILL)
|
||||
|
||||
|
||||
def _strip_platform(name: str) -> str:
|
||||
return name.replace("@live", "").replace("@psn", "").strip()
|
||||
|
||||
|
||||
def _create_scoreboard_sync(model: dict[str, Any], output_path: str, bar_color: str) -> None:
|
||||
map_name = model.get("map", "")
|
||||
map_image_path = _resolve_map_image(map_name)
|
||||
|
||||
if map_image_path:
|
||||
background = _load_map_cached(map_image_path, 2).copy()
|
||||
else:
|
||||
log.warning("[TSS-SB] No map image for %r; using flat background", map_name)
|
||||
background = Image.new("RGBA", (1920, 1080), (25, 28, 32, 255))
|
||||
|
||||
bg_w, bg_h = background.size
|
||||
|
||||
band = _make_vignette(bg_w, bg_h)
|
||||
overlay = Image.new("RGBA", (bg_w, bg_h), (0, 0, 0, 0))
|
||||
black = Image.new("RGBA", (bg_w, bg_h), (0, 0, 0, 255))
|
||||
overlay = Image.composite(black, overlay, band)
|
||||
draw = ImageDraw.Draw(overlay)
|
||||
|
||||
fonts = load_fonts(bg_w)
|
||||
BODY = int(bg_w * 0.020)
|
||||
STAT = int(bg_w * 0.022)
|
||||
|
||||
# ── Top-right metadata: timestamp + session id
|
||||
padding = 15
|
||||
ts_epoch = int(model.get("utc_timestamp") or 0)
|
||||
ts_text = datetime.fromtimestamp(ts_epoch, tz=timezone.utc).strftime("%H:%M:%S - %Y-%m-%d UTC")
|
||||
sid_text = model.get("session_id", "")
|
||||
for i, txt in enumerate((ts_text, sid_text)):
|
||||
bbox = draw.textbbox((0, 0), txt, font=fonts["info"])
|
||||
x = bg_w - (bbox[2] - bbox[0]) - padding - 10
|
||||
y = padding + 10 + i * ((bbox[3] - bbox[1]) + 20)
|
||||
draw.text((x, y), txt, font=fonts["info"], fill=GREY_FILL)
|
||||
|
||||
# ── Top-center titles: map name, then tournament · mode
|
||||
y = 50
|
||||
title = map_name or "Unknown Map"
|
||||
tb = draw.textbbox((0, 0), title, font=fonts["title"])
|
||||
draw.text(((bg_w - (tb[2] - tb[0])) // 2, y), title, font=fonts["title"], fill=NEUTRAL_FILL)
|
||||
# Gap off the title's full glyph box (not just measured height) so the subtitle
|
||||
# clears descenders instead of clipping into the map name.
|
||||
y += int(bg_w * 0.04) + 28
|
||||
|
||||
sub_parts = [p for p in (model.get("tournament_name"), model.get("mission_mode")) if p]
|
||||
if sub_parts:
|
||||
sub = " · ".join(sub_parts)
|
||||
sb = draw.textbbox((0, 0), sub, font=fonts["sub"])
|
||||
draw.text(((bg_w - (sb[2] - sb[0])) // 2, y), sub, font=fonts["sub"], fill=GREY_FILL)
|
||||
y += (sb[3] - sb[1]) + 10
|
||||
|
||||
y_start = y + 150
|
||||
|
||||
# ── Layout: two team columns + separator
|
||||
x_start = 55
|
||||
col_width = (bg_w - x_start * 2) // 2
|
||||
|
||||
sep_x = x_start + col_width
|
||||
draw.line([(sep_x, y_start - 60), (sep_x, bg_h - 50)], fill=_bar_fill(bar_color), width=5)
|
||||
|
||||
teams = model.get("teams", [])
|
||||
is_draw = model.get("is_draw", False)
|
||||
winner = model.get("winner")
|
||||
|
||||
def draw_team(idx: int, team: dict, start_x: int, section_width: int):
|
||||
flipped = idx == 1 # second team mirrored to the right
|
||||
players = sorted(team.get("players", []), key=lambda p: int(p.get("score") or 0), reverse=True)
|
||||
|
||||
team_name = team.get("team_name", "Team")
|
||||
if is_draw:
|
||||
header_fill = DRAW_FILL
|
||||
elif winner is not None and team_name == winner:
|
||||
header_fill = WIN_FILL
|
||||
else:
|
||||
header_fill = LOSS_FILL
|
||||
|
||||
header_y = y_start - 90
|
||||
nb = draw.textbbox((0, 0), team_name, font=fonts["team"])
|
||||
name_w = nb[2] - nb[0]
|
||||
name_x = start_x if not flipped else start_x + section_width - name_w
|
||||
draw.text((name_x, header_y), team_name, font=fonts["team"], fill=header_fill)
|
||||
|
||||
# W/L slot — only drawn once real per-tournament standings exist (stub returns
|
||||
# None today, so nothing renders under the team name).
|
||||
standings = get_tss_standings(team.get("team_id"))
|
||||
if standings:
|
||||
wl_text = f"{standings['wins']}W - {standings['losses']}L"
|
||||
wb = draw.textbbox((0, 0), wl_text, font=fonts["sub"])
|
||||
wl_x = start_x if not flipped else start_x + section_width - (wb[2] - wb[0])
|
||||
draw.text((wl_x, header_y + (nb[3] - nb[1]) + 18), wl_text, font=fonts["sub"], fill=GREY_FILL)
|
||||
|
||||
# Stat columns (rating now lives inline next to the player name).
|
||||
col_labels = ["Air", "Ground", "Assists", "Deaths", "Caps"]
|
||||
icon_map = {
|
||||
"Air": ICON_BASE_DIR / "fighter_icon.png",
|
||||
"Ground": ICON_BASE_DIR / "tank_icon.png",
|
||||
"Assists": ICON_BASE_DIR / "assists_icon.png",
|
||||
"Deaths": ICON_BASE_DIR / "deaths_icon.png",
|
||||
"Caps": ICON_BASE_DIR / "cap_icon.png",
|
||||
}
|
||||
# Column CENTERS — a tight stat block packed against the center separator
|
||||
# (like SRE). Center-anchoring keeps 1- and 4-digit values aligned. Team 2
|
||||
# mirrors across its own center axis so the board is symmetric.
|
||||
n_cols = len(col_labels)
|
||||
gutter = section_width * 0.035 # small gap off the center separator
|
||||
stat_area = section_width * 0.30 # tight column block
|
||||
right_edge = start_x + section_width - gutter
|
||||
step = stat_area / n_cols
|
||||
centers = [right_edge - stat_area + step * (i + 0.5) for i in range(n_cols)]
|
||||
if flipped:
|
||||
axis = start_x + section_width / 2.0
|
||||
centers = [2 * axis - c for c in centers]
|
||||
col_center = {lab: int(c) for lab, c in zip(col_labels, centers)}
|
||||
|
||||
ICON_SIZE = int(STAT * 1.1)
|
||||
head_icon_y = header_y + 72
|
||||
for label in col_labels:
|
||||
cx = col_center[label]
|
||||
ico = icon_map.get(label)
|
||||
if ico:
|
||||
try:
|
||||
img = load_icon(ico, (ICON_SIZE, ICON_SIZE))
|
||||
overlay.paste(img, (int(cx - ICON_SIZE // 2), int(head_icon_y)), img)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Player rows. Layout per row:
|
||||
# top line — player name + inline (rating ±delta) at the column edge; stats
|
||||
# lower line — vehicle icons next to their translated names
|
||||
body = fonts["body"]
|
||||
small = fonts["veh"]
|
||||
name_h = body.getbbox("Ag")[3]
|
||||
small_h = small.getbbox("Ag")[3]
|
||||
veh_icon = int(BODY * 1.6)
|
||||
y_off = header_y + ICON_SIZE + 95
|
||||
for p in players:
|
||||
name = _strip_platform(p.get("fake_nick") or p.get("nick") or "")
|
||||
units = [u for u in p.get("units", []) if u.get("used")]
|
||||
row_h = name_h + 12 + veh_icon + 12
|
||||
line_y = y_off # player-name line (at the column edge)
|
||||
id_mid = line_y + name_h // 2
|
||||
veh_y = line_y + name_h + 12 # vehicle line (icons + names together)
|
||||
vname_y = veh_y + (veh_icon - small_h) // 2
|
||||
name_w = draw.textbbox((0, 0), name, font=body)[2]
|
||||
rating_segs = _rating_segments(p.get("pvp_ratio"), p.get("pvp_ratio_delta"))
|
||||
rating_w = _segments_width(draw, rating_segs, body)
|
||||
|
||||
if not flipped:
|
||||
# name + rating at the left edge
|
||||
draw.text((start_x, line_y), name, font=body, fill=NAME_FILL)
|
||||
_draw_segments(draw, start_x + name_w + 12, line_y, rating_segs, body)
|
||||
# below: each icon directly next to its own vehicle name
|
||||
_draw_vehicle_pairs(overlay, draw, units, start_x, veh_y, vname_y, veh_icon, small)
|
||||
else:
|
||||
# name + rating at the right edge (rating to the left of the name)
|
||||
name_x = start_x + section_width - name_w
|
||||
draw.text((name_x, line_y), name, font=body, fill=NAME_FILL)
|
||||
_draw_segments(draw, name_x - 12 - rating_w, line_y, rating_segs, body)
|
||||
# below: icon+name pairs, the whole group right-aligned to the edge
|
||||
total = _vehicle_pairs_width(draw, units, veh_icon, small)
|
||||
_draw_vehicle_pairs(overlay, draw, units, start_x + section_width - total,
|
||||
veh_y, vname_y, veh_icon, small)
|
||||
|
||||
# stat values, vertically centered on the identity line
|
||||
stats = [
|
||||
("Air", p.get("air_kills", 0)),
|
||||
("Ground", p.get("ground_kills", 0)),
|
||||
("Assists", p.get("assists", 0)),
|
||||
("Deaths", p.get("deaths", 0)),
|
||||
("Caps", p.get("captures", 0)),
|
||||
]
|
||||
for label, val in stats:
|
||||
num = int(val)
|
||||
fill = NEUTRAL_FILL
|
||||
if label in ("Air", "Ground") and num > 0:
|
||||
fill = WIN_FILL
|
||||
elif label == "Deaths" and num > 0:
|
||||
fill = (255, 20, 20, 255)
|
||||
elif label == "Caps" and num > 0:
|
||||
fill = DRAW_FILL
|
||||
elif label == "Assists" and num > 0:
|
||||
fill = (230, 150, 90, 255)
|
||||
draw.text((col_center[label], id_mid), str(num), font=fonts["stat"], fill=fill, anchor="mm")
|
||||
|
||||
y_off += row_h + 14
|
||||
|
||||
draw_team(0, teams[0], x_start + 10, col_width)
|
||||
draw_team(1, teams[1], x_start + col_width, col_width)
|
||||
|
||||
# ── Composite + downsample
|
||||
final = Image.alpha_composite(background, overlay)
|
||||
scale = 0.5
|
||||
final = final.resize((int(bg_w * scale), int(bg_h * scale)), resample=Resampling.LANCZOS).convert("RGB")
|
||||
final.save(output_path, format="PNG", compress_level=1)
|
||||
|
||||
|
||||
RATING_FILL = (170, 200, 255, 255)
|
||||
|
||||
|
||||
def _rating_segments(rating, delta):
|
||||
"""Build colored (text, fill) segments for the inline ``(1468 +2)`` rating.
|
||||
|
||||
``delta`` is a signed change vs the player's previous rating; it's ``None`` until
|
||||
rating history is tracked (see wl.py), so for now only the current value shows.
|
||||
"""
|
||||
rating_str = f"{int(round(rating))}" if isinstance(rating, (int, float)) else "—"
|
||||
if isinstance(delta, (int, float)) and delta != 0:
|
||||
dcol = WIN_FILL if delta > 0 else LOSS_FILL
|
||||
return [(f"({rating_str} ", RATING_FILL), (f"{'+' if delta > 0 else ''}{int(delta)}", dcol), (")", RATING_FILL)]
|
||||
return [(f"({rating_str})", RATING_FILL)]
|
||||
|
||||
|
||||
def _segments_width(draw, segs, font):
|
||||
return sum(draw.textbbox((0, 0), s, font=font)[2] for s, _ in segs)
|
||||
|
||||
|
||||
def _draw_segments(draw, x, y, segs, font):
|
||||
for s, c in segs:
|
||||
draw.text((x, y), s, font=font, fill=c)
|
||||
x += draw.textbbox((0, 0), s, font=font)[2]
|
||||
return x
|
||||
|
||||
|
||||
def _paste_vehicle(overlay, unit, xy, size):
|
||||
"""Paste a vehicle icon (unused ones dimmed), falling back to not_found."""
|
||||
alpha = 255 if unit["used"] else 90
|
||||
for path in (VEHICLE_ICON_DIR / f"{unit['internal'].lower()}.png", ICON_BASE_DIR / "not_found.png"):
|
||||
try:
|
||||
img = load_icon(path, (size, size), alpha=alpha)
|
||||
overlay.paste(img, (int(xy[0]), int(xy[1])), img)
|
||||
return
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
_VEH_PAIR_GAP = 22
|
||||
|
||||
|
||||
def _vehicle_pairs_width(draw, units, size, font):
|
||||
"""Total pixel width of the icon+name pairs (for right-aligning team 2)."""
|
||||
w = 0
|
||||
for u in units:
|
||||
w += size + 4 + draw.textbbox((0, 0), u["name"], font=font)[2] + _VEH_PAIR_GAP
|
||||
return max(0, w - _VEH_PAIR_GAP)
|
||||
|
||||
|
||||
def _draw_vehicle_pairs(overlay, draw, units, x, veh_y, vname_y, size, font):
|
||||
"""Draw each vehicle as an icon immediately followed by its translated name."""
|
||||
x = int(x)
|
||||
for u in units:
|
||||
_paste_vehicle(overlay, u, (x, veh_y), size)
|
||||
x += size + 4
|
||||
draw.text((x, vname_y), u["name"], font=font, fill=NEUTRAL_FILL)
|
||||
x += draw.textbbox((0, 0), u["name"], font=font)[2] + _VEH_PAIR_GAP
|
||||
return x
|
||||
|
||||
|
||||
async def create_scoreboard(model: dict[str, Any], output_path: str, bar_color: str = "") -> None:
|
||||
"""Render the TSS scoreboard for a game model and write it to ``output_path``."""
|
||||
await asyncio.to_thread(_create_scoreboard_sync, model, output_path, bar_color)
|
||||
@@ -0,0 +1,156 @@
|
||||
"""Raw TSS game → scoreboard model adapter.
|
||||
|
||||
``process_game`` receives the raw Spectra TSS payload (also persisted to disk by
|
||||
``tss_ws._write_game``). Its shape is awkward for rendering: players live in a dict
|
||||
keyed by uid, team identity and pvp_ratio live in a parallel ``tss`` block, and each
|
||||
player carries a full ``units[]`` lineup rather than a single vehicle.
|
||||
|
||||
``build_scoreboard_model`` flattens all of that into a stable, renderer-friendly dict
|
||||
so ``scoreboard.py`` never has to understand the raw feed. Pure (no I/O); unit-tested
|
||||
against sample replays.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
log = logging.getLogger("tssbot.transform")
|
||||
|
||||
# Imported lazily/defensively so the module is importable (and unit-testable for the
|
||||
# non-translation paths) even if SHARED isn't on sys.path yet.
|
||||
try:
|
||||
from data_parser import LangTableReader, apply_vehicle_name_filters # type: ignore
|
||||
except Exception: # pragma: no cover - exercised only when SHARED is missing
|
||||
LangTableReader = None # type: ignore
|
||||
|
||||
def apply_vehicle_name_filters(name, strip_decorations=True): # type: ignore
|
||||
return name
|
||||
|
||||
|
||||
def _translator(lang_column: str):
|
||||
"""Return a translate(internal)->human callable for the given lang column.
|
||||
|
||||
Falls back to identity (internal name) when data_parser is unavailable.
|
||||
"""
|
||||
if LangTableReader is None:
|
||||
return lambda v: v
|
||||
try:
|
||||
reader = LangTableReader(lang_column)
|
||||
except Exception as exc: # pragma: no cover
|
||||
log.warning("LangTableReader(%r) failed: %s", lang_column, exc)
|
||||
return lambda v: v
|
||||
|
||||
def _t(internal: str) -> str:
|
||||
try:
|
||||
translated = reader.get_translate(internal) or internal
|
||||
except Exception:
|
||||
translated = internal
|
||||
# Strip country-leak / premium / tree decoration glyphs the PNG font can't
|
||||
# render (e.g. the leading ␗ on cn_t_34_85_d_5t), matching SREBOT's renderer.
|
||||
return apply_vehicle_name_filters(translated)
|
||||
|
||||
return _t
|
||||
|
||||
|
||||
def _build_units(units: list[dict[str, Any]], translate) -> list[dict[str, Any]]:
|
||||
"""Normalize a player's lineup: icon key (internal), translated name, used flag."""
|
||||
out: list[dict[str, Any]] = []
|
||||
for u in units or []:
|
||||
internal = str(u.get("unit") or "").strip()
|
||||
if not internal:
|
||||
continue
|
||||
out.append({
|
||||
"internal": internal,
|
||||
"name": translate(internal) or u.get("unit_normalized") or internal,
|
||||
"used": bool(u.get("used")),
|
||||
})
|
||||
return out
|
||||
|
||||
|
||||
def _pvp_index(tss: dict[str, Any]) -> dict[str, dict[str, Any]]:
|
||||
"""Map uid -> {pvp_ratio, role} from the tss per-team roster blocks."""
|
||||
idx: dict[str, dict[str, Any]] = {}
|
||||
for slot in ("1", "2"):
|
||||
team = tss.get(slot)
|
||||
if not isinstance(team, dict):
|
||||
continue
|
||||
for entry in team.get("players") or []:
|
||||
uid = str(entry.get("uid") or "")
|
||||
if uid:
|
||||
idx[uid] = {
|
||||
"pvp_ratio": entry.get("pvp_ratio"),
|
||||
"role": entry.get("role"),
|
||||
}
|
||||
return idx
|
||||
|
||||
|
||||
def build_scoreboard_model(game: dict[str, Any], lang_column: str = "<English>") -> Optional[dict[str, Any]]:
|
||||
"""Flatten a raw TSS game into a scoreboard render model.
|
||||
|
||||
Returns ``None`` if the game lacks the minimum structure (two teams w/ players).
|
||||
"""
|
||||
tss = game.get("tss") or {}
|
||||
players_raw = game.get("players") or {}
|
||||
if not isinstance(players_raw, dict):
|
||||
return None
|
||||
|
||||
translate = _translator(lang_column)
|
||||
pvp = _pvp_index(tss)
|
||||
|
||||
teams: dict[str, dict[str, Any]] = {}
|
||||
for slot in ("1", "2"):
|
||||
raw_meta = tss.get(slot)
|
||||
meta: dict[str, Any] = raw_meta if isinstance(raw_meta, dict) else {}
|
||||
teams[slot] = {
|
||||
"slot": slot,
|
||||
"team_id": str(meta.get("team_id")) if meta.get("team_id") is not None else None,
|
||||
"team_name": str(meta.get("team_name") or f"Team {slot}"),
|
||||
"players": [],
|
||||
}
|
||||
|
||||
for uid, p in players_raw.items():
|
||||
if not isinstance(p, dict):
|
||||
continue
|
||||
slot = str(p.get("team") or "")
|
||||
if slot not in teams:
|
||||
continue
|
||||
info = pvp.get(str(uid), {})
|
||||
teams[slot]["players"].append({
|
||||
"uid": str(uid),
|
||||
"nick": str(p.get("name") or ""),
|
||||
# No streamer-mode field in the TSS feed yet; renderer falls back to nick.
|
||||
"fake_nick": None,
|
||||
"tag": str(p.get("tag") or ""),
|
||||
"country_id": p.get("country_id"),
|
||||
"air_kills": int(p.get("air_kills") or 0),
|
||||
"ground_kills": int(p.get("ground_kills") or 0),
|
||||
"assists": int(p.get("assists") or 0),
|
||||
"deaths": int(p.get("deaths") or 0),
|
||||
"captures": int(p.get("captures") or 0),
|
||||
"score": int(p.get("score") or 0),
|
||||
"pvp_ratio": info.get("pvp_ratio"),
|
||||
"role": info.get("role"),
|
||||
"units": _build_units(p.get("units") or [], translate),
|
||||
})
|
||||
|
||||
if not teams["1"]["players"] or not teams["2"]["players"]:
|
||||
return None
|
||||
|
||||
winner_slot = str(game.get("winner") or "")
|
||||
is_draw = bool(game.get("draw"))
|
||||
winner_name = teams[winner_slot]["team_name"] if winner_slot in teams and not is_draw else None
|
||||
|
||||
return {
|
||||
"session_id": str(game.get("_id") or ""),
|
||||
"utc_timestamp": int(game.get("end_ts") or 0),
|
||||
"start_ts": int(game.get("start_ts") or 0),
|
||||
"duration": game.get("duration"),
|
||||
"map": str(game.get("mission_name") or ""),
|
||||
"mission_mode": str(game.get("mission_mode") or ""),
|
||||
"tournament_name": str(tss.get("tournament_name") or ""),
|
||||
"bracket": str(tss.get("bracket") or ""),
|
||||
"is_draw": is_draw,
|
||||
"winner": winner_name,
|
||||
"winner_slot": winner_slot,
|
||||
"teams": [teams["1"], teams["2"]],
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
"""TSS win/loss standings — STUB.
|
||||
|
||||
SREBOT tracks squadron W/L globally. TSS needs the same idea but **scoped per
|
||||
tournament** (a team's record only means something inside its bracket), so the real
|
||||
implementation is deferred. Until then ``get_tss_standings`` returns ``None`` for
|
||||
every team and the scoreboard renders a neutral placeholder in the W/L slot.
|
||||
|
||||
TODO(tss-wl): persist per-(tournament_id, team_id) wins/losses and resolve here.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
def get_tss_standings(team_id: Optional[str], tournament_id: Optional[Any] = None) -> Optional[dict[str, int]]:
|
||||
"""Return ``{"wins": int, "losses": int}`` for a team, or ``None`` if untracked.
|
||||
|
||||
Stub: always ``None`` until per-tournament W/L tracking lands.
|
||||
"""
|
||||
return None
|
||||
+22
-4
@@ -1,18 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
# Pull TSS replay dirs from the server for local study.
|
||||
# Run once manually: bash fetch_replays.sh
|
||||
# Pull the latest TSS replay dirs from the server for local study.
|
||||
# Run once manually: bash fetch_replays.sh [COUNT] (default COUNT=10)
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REMOTE="srebot"
|
||||
REMOTE_PATH="/mnt/HC_Volume_105581488/STORAGE/REPLAYS/TSS/"
|
||||
LOCAL_PATH="./replays_sample"
|
||||
COUNT="${1:-10}"
|
||||
|
||||
mkdir -p "$LOCAL_PATH"
|
||||
|
||||
# Grab the COUNT most-recently-modified replay dirs (newest first).
|
||||
mapfile -t DIRS < <(ssh "$REMOTE" "cd '$REMOTE_PATH' && ls -1dt */ 2>/dev/null | head -n $COUNT")
|
||||
|
||||
if [ "${#DIRS[@]}" -eq 0 ]; then
|
||||
echo "No replay dirs found on ${REMOTE}:${REMOTE_PATH}" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Pulling ${#DIRS[@]} latest replay dir(s):"
|
||||
printf ' %s\n' "${DIRS[@]}"
|
||||
|
||||
# Build include filters: each dir + its replay_data.json.gz, exclude the rest.
|
||||
INCLUDES=()
|
||||
for d in "${DIRS[@]}"; do
|
||||
d="${d%/}"
|
||||
INCLUDES+=( --include="${d}/" --include="${d}/replay_data.json.gz" )
|
||||
done
|
||||
|
||||
rsync -avz --progress \
|
||||
--include="*/" \
|
||||
--include="replay_data.json.gz" \
|
||||
"${INCLUDES[@]}" \
|
||||
--exclude="*" \
|
||||
"${REMOTE}:${REMOTE_PATH}" \
|
||||
"$LOCAL_PATH/"
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,37 @@
|
||||
"""Render scoreboards for every replay in TSSBOT/replays_sample/ for visual review.
|
||||
|
||||
Run from the TSSBOT dir: python -m scripts.gen_sample_scoreboards
|
||||
Outputs <session>/scoreboard.png next to each replay_data.json.
|
||||
"""
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from BOT import transform, scoreboard # noqa: E402
|
||||
|
||||
|
||||
async def main():
|
||||
files = sorted(glob.glob("replays_sample/*/replay_data.json"))
|
||||
if not files:
|
||||
print("No sample replays found under replays_sample/")
|
||||
return
|
||||
for f in files:
|
||||
with open(f, encoding="utf-8") as fh:
|
||||
game = json.load(fh)
|
||||
model = transform.build_scoreboard_model(game, "<English>")
|
||||
if not model:
|
||||
print(f"SKIP {f}: model build returned None")
|
||||
continue
|
||||
out = os.path.join(os.path.dirname(f), "scoreboard.png")
|
||||
# bar_color "not_set": no guild team context in this offline render.
|
||||
await scoreboard.create_scoreboard(model, out, bar_color="not_set")
|
||||
size = os.path.getsize(out)
|
||||
print(f"OK {model['map']:<22} {model['session_id']} -> {out} ({size//1024} KB)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user