diff --git a/BOT/autologging.py b/BOT/autologging.py index a476503..db14332 100644 --- a/BOT/autologging.py +++ b/BOT/autologging.py @@ -1,30 +1,36 @@ -"""TSSBOT autolog matcher. +"""TSSBOT autolog matcher + scoreboard dispatch. For each game received from the Spectra TSS feed, match it against every -subscribing guild's ``tss-team`` preference entries and work out which channels -should be notified, deduping per session. +subscribing guild's ``tss-team`` / ``tss-player`` preference entries, render the +scoreboard once per session/color/language, and post it to each subscribed channel. -NOTE: building and sending the scoreboard embed is intentionally a TODO for now -(it needs significant rework). The matching + dedup pipeline below is complete -and logs the targets it *would* post to. Wire the send in at the marked spot. +Matching is per-entity (a team by id/name, a player by uid); sending is deduped per +channel per session so a channel that subscribes both a team and one of its players +only receives one scoreboard for a given game. """ from __future__ import annotations +import asyncio import logging +from pathlib import Path from typing import Any, Optional import discord -from . import preferences +from . import preferences, scoreboard, transform +from .storage import STORAGE_DIR log = logging.getLogger("tssbot.autolog") +REPLAYS_TSS_DIR: Path = STORAGE_DIR / "REPLAYS" / "TSS" + # Registered by start_bot once the client exists; standalone tss_ws leaves it None. _bot: Optional[discord.Client] = None -# session_id -> set of channel_ids already handled (in-memory idempotency, -# mirrors SREBOT's _sent_channels_by_session). +# session_id -> set of channel_ids already handled (in-memory idempotency). _sent_channels_by_session: dict[str, set[int]] = {} +# session_id -> lock guarding the one-time PNG render. +_render_locks: dict[str, asyncio.Lock] = {} def set_bot(bot: discord.Client) -> None: @@ -33,8 +39,12 @@ def set_bot(bot: discord.Client) -> None: _bot = bot +# --------------------------------------------------------------------------- +# Present-entity extraction +# --------------------------------------------------------------------------- + def _present_teams(game: dict[str, Any]) -> tuple[set[str], set[str]]: - """Return stable TSS team IDs and names embedded in a replay.""" + """Return stable TSS team IDs and lowercased names embedded in a replay.""" tss = game.get("tss") or {} team_ids: set[str] = set() team_names: set[str] = set() @@ -50,41 +60,146 @@ def _present_teams(game: dict[str, Any]) -> tuple[set[str], set[str]]: return team_ids, team_names +def _present_players(game: dict[str, Any]) -> set[str]: + """Return the uids of every player in a replay (players dict + tss rosters).""" + uids: set[str] = {str(u) for u in (game.get("players") or {}).keys()} + tss = game.get("tss") or {} + for slot in ("1", "2"): + team = tss.get(slot) + if isinstance(team, dict): + for entry in team.get("players") or []: + if entry.get("uid") is not None: + uids.add(str(entry["uid"])) + return uids + + +def _bar_color(game: dict[str, Any], guild_id: int) -> str: + """Header/separator tint from the guild's own team vs the result.""" + if game.get("draw"): + return "draw" + guild_team = preferences.get_guild_team(guild_id) + if not guild_team or guild_team.get("team_id") is None: + return "not_set" + my_id = str(guild_team["team_id"]) + tss = game.get("tss") or {} + slot_ids = {s: str((tss.get(s) or {}).get("team_id")) for s in ("1", "2")} + winner = str(game.get("winner") or "") + if slot_ids.get(winner) == my_id: + return "win" + if my_id in slot_ids.values(): + return "loss" + return "not_involved" + + +# --------------------------------------------------------------------------- +# Scoreboard view + render/send +# --------------------------------------------------------------------------- + +def build_tss_scoreboard_view(session_id: str) -> discord.ui.View: + """Link buttons under a scoreboard: in-game replay + the TSS website.""" + view = discord.ui.View(timeout=None) + try: + replay_url = f"https://warthunder.com/en/tournament/replay/{int(session_id, 16)}" + view.add_item(discord.ui.Button(label="View Replay", style=discord.ButtonStyle.link, url=replay_url)) + except (ValueError, TypeError): + pass + view.add_item(discord.ui.Button( + label="View on Website", style=discord.ButtonStyle.link, + url=f"https://tss.pawjob.us/games/{session_id}", emoji="🌐", + )) + return view + + +async def _render_scoreboard(game: dict[str, Any], session_id: str, bar_color: str, lang_column: str) -> Optional[Path]: + """Render (once, cached on disk) the scoreboard for a session/color/language.""" + model = transform.build_scoreboard_model(game, lang_column) + if not model: + log.warning("[TSS-AUTOLOG] could not build model for %s", session_id) + return None + lang_tag = lang_column.strip("<>").lower() or "english" + out_dir = REPLAYS_TSS_DIR / session_id + out_path = out_dir / f"scoreboard-{bar_color}-{lang_tag}.png" + lock = _render_locks.setdefault(session_id, asyncio.Lock()) + async with lock: + if not out_path.exists(): + out_dir.mkdir(parents=True, exist_ok=True) + await scoreboard.create_scoreboard(model, str(out_path), bar_color=bar_color) + return out_path if out_path.exists() else None + + +async def _send_scoreboard(game: dict[str, Any], guild_id: int, channel_id: int, session_id: str, sent: set[int]) -> None: + """Render and post the scoreboard for one subscribed channel (deduped).""" + bot = _bot + if bot is None or channel_id in sent: + return + sent.add(channel_id) # claim the channel before any await so concurrent matches can't double-send + + bar_color = _bar_color(game, guild_id) + lang_column = preferences.guild_language_column(guild_id) + out_path = await _render_scoreboard(game, session_id, bar_color, lang_column) + if out_path is None: + return + + channel = bot.get_channel(channel_id) + if channel is None: + try: + channel = await bot.fetch_channel(channel_id) + except (discord.NotFound, discord.Forbidden) as e: + log.warning("[TSS-AUTOLOG] channel %s unavailable (%s)", channel_id, e) + return + if not isinstance(channel, discord.abc.Messageable): + return + + try: + with open(out_path, "rb") as f: + await channel.send( + file=discord.File(f, filename="scoreboard.png"), + view=build_tss_scoreboard_view(session_id), + ) + log.info("[TSS-AUTOLOG] sent %s -> guild=%s channel=%s", session_id, guild_id, channel_id) + except discord.HTTPException as e: + log.error("[TSS-AUTOLOG] send failed %s -> %s: %s", session_id, channel_id, e) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + async def process_game(game: dict[str, Any]) -> None: - """Match one received game against guild preferences and notify channels. + """Match one received game against guild prefs and post scoreboards. Safe to call from the standalone WS listener (no-ops if no bot registered). """ if _bot is None: return - session_id = str(game.get("_id") or "") if not session_id: return present_team_ids, present_team_names = _present_teams(game) - + present_uids = _present_players(game) sent = _sent_channels_by_session.setdefault(session_id, set()) for guild_id, prefs in preferences.iter_guild_preferences(): for entity_id, entry in prefs.items(): - if not isinstance(entry, dict) or entry.get("Type") != "tss-team": + if not isinstance(entry, dict): + continue + type_ = entry.get("Type") + if type_ not in ("tss-team", "tss-player"): continue channel_id, enabled = preferences.parse_channel(entry.get("Logs")) - if not channel_id or not enabled: + if not channel_id or not enabled or channel_id in sent: continue - name = (entry.get("Name") or "").lower() - matched = str(entity_id) in present_team_ids or (name and name in present_team_names) - - if not matched or channel_id in sent: + if type_ == "tss-team": + name = (entry.get("Name") or "").lower() + matched = str(entity_id) in present_team_ids or (bool(name) and name in present_team_names) + else: # tss-player + matched = str(entity_id) in present_uids + if not matched: continue - sent.add(channel_id) - # TODO: build & send the scoreboard embed to `channel_id`. - # The matched target is fully resolved here; the only thing missing - # is the rendered scoreboard (out of scope for now). - log.info( - "autolog match (send TODO): session=%s guild=%s channel=%s team=%s", - session_id, guild_id, channel_id, entity_id, - ) + try: + await _send_scoreboard(game, guild_id, channel_id, session_id, sent) + except Exception as e: # never let one channel break the rest + log.error("[TSS-AUTOLOG] dispatch error guild=%s channel=%s: %s", guild_id, channel_id, e) diff --git a/BOT/commands.py b/BOT/commands.py index 1f59090..88b0019 100644 --- a/BOT/commands.py +++ b/BOT/commands.py @@ -150,6 +150,29 @@ class TssCommands(commands.Cog): f"βœ… Logging **{name}**'s matches to <#{interaction.channel_id}>.", ephemeral=True ) + # ── /log-player ──────────────────────────────────────────────────────── + @app_commands.command(name="log-player", description="Send a player's matches to this channel") + @app_commands.describe(player="Player username") + @app_commands.autocomplete(player=player_autocomplete) + @app_commands.checks.has_permissions(manage_guild=True) + @not_blacklisted() + async def log_player(self, interaction: discord.Interaction, player: str): + await interaction.response.defer(ephemeral=True) + if interaction.guild_id is None or interaction.channel_id is None: + return await interaction.followup.send("This command must be used in a server channel.", ephemeral=True) + candidates = await storage.resolve_players(player) + if not candidates: + return await interaction.followup.send(f"No players found matching **{player}**.", ephemeral=True) + if len(candidates) > 1: + return await interaction.followup.send(_too_many_msg(candidates), ephemeral=True) + uid, nick = str(candidates[0]["uid"]), candidates[0]["nick"] + preferences.upsert_log_entry( + interaction.guild_id, uid, "tss-player", nick, interaction.channel_id + ) + await interaction.followup.send( + f"βœ… Logging **{nick}**'s matches to <#{interaction.channel_id}>.", ephemeral=True + ) + # ── /set-player ──────────────────────────────────────────────────────── @app_commands.command(name="set-player", description="Link your Discord account to a War Thunder player") @app_commands.describe(player="Player username") @@ -290,7 +313,8 @@ class TssCommands(commands.Cog): name="Autologging β€” Manage Server", value=( "`/set-team ` β€” set this server's team\n" - "`/log-team ` β€” post a team's matches to this channel" + "`/log-team ` β€” post a team's matches to this channel\n" + "`/log-player ` β€” post a player's matches to this channel" ), inline=False, ) diff --git a/BOT/preferences.py b/BOT/preferences.py index 889c143..fe08c96 100644 --- a/BOT/preferences.py +++ b/BOT/preferences.py @@ -39,6 +39,7 @@ log = logging.getLogger("tssbot.preferences") PREFERENCES_DIR: Path = STORAGE_DIR / "PREFERENCES" TEAMS_JSON_PATH: Path = STORAGE_DIR / "TEAMS.json" +FEATURES_DIR: Path = STORAGE_DIR / "FEATURES" _CHANNEL_ID_RE = re.compile(r"(\d{17,20})") @@ -182,3 +183,27 @@ def set_guild_team(guild_id: int | str, team_id: int, name: str) -> bool: teams = load_teams() teams[str(guild_id)] = {"TM_Name": name, "team_id": int(team_id)} return _write_json(TEAMS_JSON_PATH, teams) + + +# --------------------------------------------------------------------------- +# FEATURES.json β€” per-guild feature flags (shared with SREBOT; we use Language) +# --------------------------------------------------------------------------- + +def _guild_features_path(guild_id: int | str) -> Path: + return FEATURES_DIR / f"{guild_id}-features.json" + + +def load_features(guild_id: int | str) -> dict[str, Any]: + """Load a guild's feature flags (shared SREBOT/TSSBOT file).""" + data = _read_json(_guild_features_path(guild_id), {}) + return data if isinstance(data, dict) else {} + + +def save_features(guild_id: int | str, features: dict[str, Any]) -> bool: + """Persist a guild's feature flags.""" + return _write_json(_guild_features_path(guild_id), features) + + +def guild_language_column(guild_id: int | str) -> str: + """Return the guild's LangTableReader column, e.g. ```` (the default).""" + return load_features(guild_id).get("Language") or "" diff --git a/BOT/scoreboard.py b/BOT/scoreboard.py new file mode 100644 index 0000000..93a0e09 --- /dev/null +++ b/BOT/scoreboard.py @@ -0,0 +1,381 @@ +"""TSS scoreboard renderer. + +Generates the match-result image posted by autologging. Mirrors SREBOT's visual +language (blurred map background, vignette, two team columns, stat icons) but is +TSS-native: team names instead of squadron tags, the full per-player vehicle lineup +(unused dimmed) instead of a single vehicle, and per-player pvp_ratio instead of +squadron points. + +Input is the flat model from ``transform.build_scoreboard_model`` β€” this module never +touches the raw feed. +""" +from __future__ import annotations + +import asyncio +import logging +import re +from datetime import datetime, timezone +from functools import lru_cache +from pathlib import Path +from typing import Any, Optional + +import numpy as np +from PIL import Image, ImageDraw, ImageFont +from PIL.Image import Resampling + +from . import SHARED_DIR +from .wl import get_tss_standings + +log = logging.getLogger("tssbot.scoreboard") + +MAPS_DIR = SHARED_DIR / "MAPS" +ICON_BASE_DIR = SHARED_DIR / "ICONS" +VEHICLE_ICON_DIR = ICON_BASE_DIR / "VEHICLES" +TEXT_FONT_PATH = SHARED_DIR / "FONTS" / "arial_unicode_ms.otf" + +# Colors +WIN_FILL = (60, 255, 60, 255) +LOSS_FILL = (255, 60, 60, 255) +DRAW_FILL = (255, 255, 0, 255) +NEUTRAL_FILL = (255, 255, 255, 255) +GREY_FILL = (200, 200, 200, 255) +NAME_FILL = (250, 227, 200, 255) +DIM_FILL = (150, 150, 150, 255) + + +# --------------------------------------------------------------------------- +# Asset caching (mirrors SREBOT) +# --------------------------------------------------------------------------- + +_FONTS: dict[str, ImageFont.FreeTypeFont] = {} + + +def load_fonts(base_width: int) -> dict[str, ImageFont.FreeTypeFont]: + global _FONTS + if _FONTS: + return _FONTS + _FONTS = { + "title": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.04)), + "team": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.03)), + "sub": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.019)), + "body": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.020)), + "stat": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.022)), + "info": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.016)), + "small": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.013)), + "veh": ImageFont.truetype(str(TEXT_FONT_PATH), int(base_width * 0.016)), + } + return _FONTS + + +@lru_cache(maxsize=256) +def _load_icon_cached(path_str: str, size: tuple, alpha: int): + img = Image.open(path_str).convert("RGBA") + if size: + img = img.resize(size, Resampling.LANCZOS) + if alpha < 255: + a = img.getchannel("A").point(lambda v: int(v * alpha / 255)) + img.putalpha(a) + return img + + +def load_icon(path: Path, size: Optional[tuple] = None, alpha: int = 255) -> Image.Image: + return _load_icon_cached(str(path), tuple(size) if size else None, alpha) + + +@lru_cache(maxsize=24) +def _load_map_cached(path_str: str, blur_radius: int): + from PIL import ImageFilter + img = Image.open(path_str).convert("RGBA") + return img.filter(ImageFilter.GaussianBlur(blur_radius)) + + +def _make_vignette(width: int, height: int, base_alpha=140, max_alpha=175, power=4) -> Image.Image: + ys = np.linspace(-1, 1, height)[:, None] + xs = np.linspace(-1, 1, width)[None, :] + dist = np.sqrt(xs ** 2 + ys ** 2) / np.sqrt(2) + band = base_alpha + (max_alpha - base_alpha) * (dist ** power) + return Image.fromarray(np.clip(band, 0, 255).astype(np.uint8), mode="L") + + +def _resolve_map_image(map_name: str) -> Optional[str]: + """Match a TSS mission_name against SHARED/MAPS (spacesβ†’_, .jpg), like SRE.""" + clean = re.sub(r"^\s*\[[^]]+\]\s*", "", map_name).replace(" ", "_") + target = f"{clean}.jpg" + try: + candidates = {f.name for f in MAPS_DIR.iterdir() if f.is_file()} + except FileNotFoundError: + return None + match = next((fn for fn in candidates if fn.lower() == target.lower()), None) + return str(MAPS_DIR / match) if match else None + + +# --------------------------------------------------------------------------- +# Render +# --------------------------------------------------------------------------- + +def _bar_fill(bar_color: str) -> tuple: + return {"win": WIN_FILL, "loss": LOSS_FILL, "draw": DRAW_FILL}.get(bar_color, NEUTRAL_FILL) + + +def _strip_platform(name: str) -> str: + return name.replace("@live", "").replace("@psn", "").strip() + + +def _create_scoreboard_sync(model: dict[str, Any], output_path: str, bar_color: str) -> None: + map_name = model.get("map", "") + map_image_path = _resolve_map_image(map_name) + + if map_image_path: + background = _load_map_cached(map_image_path, 2).copy() + else: + log.warning("[TSS-SB] No map image for %r; using flat background", map_name) + background = Image.new("RGBA", (1920, 1080), (25, 28, 32, 255)) + + bg_w, bg_h = background.size + + band = _make_vignette(bg_w, bg_h) + overlay = Image.new("RGBA", (bg_w, bg_h), (0, 0, 0, 0)) + black = Image.new("RGBA", (bg_w, bg_h), (0, 0, 0, 255)) + overlay = Image.composite(black, overlay, band) + draw = ImageDraw.Draw(overlay) + + fonts = load_fonts(bg_w) + BODY = int(bg_w * 0.020) + STAT = int(bg_w * 0.022) + + # ── Top-right metadata: timestamp + session id + padding = 15 + ts_epoch = int(model.get("utc_timestamp") or 0) + ts_text = datetime.fromtimestamp(ts_epoch, tz=timezone.utc).strftime("%H:%M:%S - %Y-%m-%d UTC") + sid_text = model.get("session_id", "") + for i, txt in enumerate((ts_text, sid_text)): + bbox = draw.textbbox((0, 0), txt, font=fonts["info"]) + x = bg_w - (bbox[2] - bbox[0]) - padding - 10 + y = padding + 10 + i * ((bbox[3] - bbox[1]) + 20) + draw.text((x, y), txt, font=fonts["info"], fill=GREY_FILL) + + # ── Top-center titles: map name, then tournament Β· mode + y = 50 + title = map_name or "Unknown Map" + tb = draw.textbbox((0, 0), title, font=fonts["title"]) + draw.text(((bg_w - (tb[2] - tb[0])) // 2, y), title, font=fonts["title"], fill=NEUTRAL_FILL) + # Gap off the title's full glyph box (not just measured height) so the subtitle + # clears descenders instead of clipping into the map name. + y += int(bg_w * 0.04) + 28 + + sub_parts = [p for p in (model.get("tournament_name"), model.get("mission_mode")) if p] + if sub_parts: + sub = " Β· ".join(sub_parts) + sb = draw.textbbox((0, 0), sub, font=fonts["sub"]) + draw.text(((bg_w - (sb[2] - sb[0])) // 2, y), sub, font=fonts["sub"], fill=GREY_FILL) + y += (sb[3] - sb[1]) + 10 + + y_start = y + 150 + + # ── Layout: two team columns + separator + x_start = 55 + col_width = (bg_w - x_start * 2) // 2 + + sep_x = x_start + col_width + draw.line([(sep_x, y_start - 60), (sep_x, bg_h - 50)], fill=_bar_fill(bar_color), width=5) + + teams = model.get("teams", []) + is_draw = model.get("is_draw", False) + winner = model.get("winner") + + def draw_team(idx: int, team: dict, start_x: int, section_width: int): + flipped = idx == 1 # second team mirrored to the right + players = sorted(team.get("players", []), key=lambda p: int(p.get("score") or 0), reverse=True) + + team_name = team.get("team_name", "Team") + if is_draw: + header_fill = DRAW_FILL + elif winner is not None and team_name == winner: + header_fill = WIN_FILL + else: + header_fill = LOSS_FILL + + header_y = y_start - 90 + nb = draw.textbbox((0, 0), team_name, font=fonts["team"]) + name_w = nb[2] - nb[0] + name_x = start_x if not flipped else start_x + section_width - name_w + draw.text((name_x, header_y), team_name, font=fonts["team"], fill=header_fill) + + # W/L slot β€” only drawn once real per-tournament standings exist (stub returns + # None today, so nothing renders under the team name). + standings = get_tss_standings(team.get("team_id")) + if standings: + wl_text = f"{standings['wins']}W - {standings['losses']}L" + wb = draw.textbbox((0, 0), wl_text, font=fonts["sub"]) + wl_x = start_x if not flipped else start_x + section_width - (wb[2] - wb[0]) + draw.text((wl_x, header_y + (nb[3] - nb[1]) + 18), wl_text, font=fonts["sub"], fill=GREY_FILL) + + # Stat columns (rating now lives inline next to the player name). + col_labels = ["Air", "Ground", "Assists", "Deaths", "Caps"] + icon_map = { + "Air": ICON_BASE_DIR / "fighter_icon.png", + "Ground": ICON_BASE_DIR / "tank_icon.png", + "Assists": ICON_BASE_DIR / "assists_icon.png", + "Deaths": ICON_BASE_DIR / "deaths_icon.png", + "Caps": ICON_BASE_DIR / "cap_icon.png", + } + # Column CENTERS β€” a tight stat block packed against the center separator + # (like SRE). Center-anchoring keeps 1- and 4-digit values aligned. Team 2 + # mirrors across its own center axis so the board is symmetric. + n_cols = len(col_labels) + gutter = section_width * 0.035 # small gap off the center separator + stat_area = section_width * 0.30 # tight column block + right_edge = start_x + section_width - gutter + step = stat_area / n_cols + centers = [right_edge - stat_area + step * (i + 0.5) for i in range(n_cols)] + if flipped: + axis = start_x + section_width / 2.0 + centers = [2 * axis - c for c in centers] + col_center = {lab: int(c) for lab, c in zip(col_labels, centers)} + + ICON_SIZE = int(STAT * 1.1) + head_icon_y = header_y + 72 + for label in col_labels: + cx = col_center[label] + ico = icon_map.get(label) + if ico: + try: + img = load_icon(ico, (ICON_SIZE, ICON_SIZE)) + overlay.paste(img, (int(cx - ICON_SIZE // 2), int(head_icon_y)), img) + except Exception: + pass + + # Player rows. Layout per row: + # top line β€” player name + inline (rating Β±delta) at the column edge; stats + # lower line β€” vehicle icons next to their translated names + body = fonts["body"] + small = fonts["veh"] + name_h = body.getbbox("Ag")[3] + small_h = small.getbbox("Ag")[3] + veh_icon = int(BODY * 1.6) + y_off = header_y + ICON_SIZE + 95 + for p in players: + name = _strip_platform(p.get("fake_nick") or p.get("nick") or "") + units = [u for u in p.get("units", []) if u.get("used")] + row_h = name_h + 12 + veh_icon + 12 + line_y = y_off # player-name line (at the column edge) + id_mid = line_y + name_h // 2 + veh_y = line_y + name_h + 12 # vehicle line (icons + names together) + vname_y = veh_y + (veh_icon - small_h) // 2 + name_w = draw.textbbox((0, 0), name, font=body)[2] + rating_segs = _rating_segments(p.get("pvp_ratio"), p.get("pvp_ratio_delta")) + rating_w = _segments_width(draw, rating_segs, body) + + if not flipped: + # name + rating at the left edge + draw.text((start_x, line_y), name, font=body, fill=NAME_FILL) + _draw_segments(draw, start_x + name_w + 12, line_y, rating_segs, body) + # below: each icon directly next to its own vehicle name + _draw_vehicle_pairs(overlay, draw, units, start_x, veh_y, vname_y, veh_icon, small) + else: + # name + rating at the right edge (rating to the left of the name) + name_x = start_x + section_width - name_w + draw.text((name_x, line_y), name, font=body, fill=NAME_FILL) + _draw_segments(draw, name_x - 12 - rating_w, line_y, rating_segs, body) + # below: icon+name pairs, the whole group right-aligned to the edge + total = _vehicle_pairs_width(draw, units, veh_icon, small) + _draw_vehicle_pairs(overlay, draw, units, start_x + section_width - total, + veh_y, vname_y, veh_icon, small) + + # stat values, vertically centered on the identity line + stats = [ + ("Air", p.get("air_kills", 0)), + ("Ground", p.get("ground_kills", 0)), + ("Assists", p.get("assists", 0)), + ("Deaths", p.get("deaths", 0)), + ("Caps", p.get("captures", 0)), + ] + for label, val in stats: + num = int(val) + fill = NEUTRAL_FILL + if label in ("Air", "Ground") and num > 0: + fill = WIN_FILL + elif label == "Deaths" and num > 0: + fill = (255, 20, 20, 255) + elif label == "Caps" and num > 0: + fill = DRAW_FILL + elif label == "Assists" and num > 0: + fill = (230, 150, 90, 255) + draw.text((col_center[label], id_mid), str(num), font=fonts["stat"], fill=fill, anchor="mm") + + y_off += row_h + 14 + + draw_team(0, teams[0], x_start + 10, col_width) + draw_team(1, teams[1], x_start + col_width, col_width) + + # ── Composite + downsample + final = Image.alpha_composite(background, overlay) + scale = 0.5 + final = final.resize((int(bg_w * scale), int(bg_h * scale)), resample=Resampling.LANCZOS).convert("RGB") + final.save(output_path, format="PNG", compress_level=1) + + +RATING_FILL = (170, 200, 255, 255) + + +def _rating_segments(rating, delta): + """Build colored (text, fill) segments for the inline ``(1468 +2)`` rating. + + ``delta`` is a signed change vs the player's previous rating; it's ``None`` until + rating history is tracked (see wl.py), so for now only the current value shows. + """ + rating_str = f"{int(round(rating))}" if isinstance(rating, (int, float)) else "β€”" + if isinstance(delta, (int, float)) and delta != 0: + dcol = WIN_FILL if delta > 0 else LOSS_FILL + return [(f"({rating_str} ", RATING_FILL), (f"{'+' if delta > 0 else ''}{int(delta)}", dcol), (")", RATING_FILL)] + return [(f"({rating_str})", RATING_FILL)] + + +def _segments_width(draw, segs, font): + return sum(draw.textbbox((0, 0), s, font=font)[2] for s, _ in segs) + + +def _draw_segments(draw, x, y, segs, font): + for s, c in segs: + draw.text((x, y), s, font=font, fill=c) + x += draw.textbbox((0, 0), s, font=font)[2] + return x + + +def _paste_vehicle(overlay, unit, xy, size): + """Paste a vehicle icon (unused ones dimmed), falling back to not_found.""" + alpha = 255 if unit["used"] else 90 + for path in (VEHICLE_ICON_DIR / f"{unit['internal'].lower()}.png", ICON_BASE_DIR / "not_found.png"): + try: + img = load_icon(path, (size, size), alpha=alpha) + overlay.paste(img, (int(xy[0]), int(xy[1])), img) + return + except Exception: + continue + + +_VEH_PAIR_GAP = 22 + + +def _vehicle_pairs_width(draw, units, size, font): + """Total pixel width of the icon+name pairs (for right-aligning team 2).""" + w = 0 + for u in units: + w += size + 4 + draw.textbbox((0, 0), u["name"], font=font)[2] + _VEH_PAIR_GAP + return max(0, w - _VEH_PAIR_GAP) + + +def _draw_vehicle_pairs(overlay, draw, units, x, veh_y, vname_y, size, font): + """Draw each vehicle as an icon immediately followed by its translated name.""" + x = int(x) + for u in units: + _paste_vehicle(overlay, u, (x, veh_y), size) + x += size + 4 + draw.text((x, vname_y), u["name"], font=font, fill=NEUTRAL_FILL) + x += draw.textbbox((0, 0), u["name"], font=font)[2] + _VEH_PAIR_GAP + return x + + +async def create_scoreboard(model: dict[str, Any], output_path: str, bar_color: str = "") -> None: + """Render the TSS scoreboard for a game model and write it to ``output_path``.""" + await asyncio.to_thread(_create_scoreboard_sync, model, output_path, bar_color) diff --git a/BOT/transform.py b/BOT/transform.py new file mode 100644 index 0000000..36ff393 --- /dev/null +++ b/BOT/transform.py @@ -0,0 +1,156 @@ +"""Raw TSS game β†’ scoreboard model adapter. + +``process_game`` receives the raw Spectra TSS payload (also persisted to disk by +``tss_ws._write_game``). Its shape is awkward for rendering: players live in a dict +keyed by uid, team identity and pvp_ratio live in a parallel ``tss`` block, and each +player carries a full ``units[]`` lineup rather than a single vehicle. + +``build_scoreboard_model`` flattens all of that into a stable, renderer-friendly dict +so ``scoreboard.py`` never has to understand the raw feed. Pure (no I/O); unit-tested +against sample replays. +""" +from __future__ import annotations + +import logging +from typing import Any, Optional + +log = logging.getLogger("tssbot.transform") + +# Imported lazily/defensively so the module is importable (and unit-testable for the +# non-translation paths) even if SHARED isn't on sys.path yet. +try: + from data_parser import LangTableReader, apply_vehicle_name_filters # type: ignore +except Exception: # pragma: no cover - exercised only when SHARED is missing + LangTableReader = None # type: ignore + + def apply_vehicle_name_filters(name, strip_decorations=True): # type: ignore + return name + + +def _translator(lang_column: str): + """Return a translate(internal)->human callable for the given lang column. + + Falls back to identity (internal name) when data_parser is unavailable. + """ + if LangTableReader is None: + return lambda v: v + try: + reader = LangTableReader(lang_column) + except Exception as exc: # pragma: no cover + log.warning("LangTableReader(%r) failed: %s", lang_column, exc) + return lambda v: v + + def _t(internal: str) -> str: + try: + translated = reader.get_translate(internal) or internal + except Exception: + translated = internal + # Strip country-leak / premium / tree decoration glyphs the PNG font can't + # render (e.g. the leading ␗ on cn_t_34_85_d_5t), matching SREBOT's renderer. + return apply_vehicle_name_filters(translated) + + return _t + + +def _build_units(units: list[dict[str, Any]], translate) -> list[dict[str, Any]]: + """Normalize a player's lineup: icon key (internal), translated name, used flag.""" + out: list[dict[str, Any]] = [] + for u in units or []: + internal = str(u.get("unit") or "").strip() + if not internal: + continue + out.append({ + "internal": internal, + "name": translate(internal) or u.get("unit_normalized") or internal, + "used": bool(u.get("used")), + }) + return out + + +def _pvp_index(tss: dict[str, Any]) -> dict[str, dict[str, Any]]: + """Map uid -> {pvp_ratio, role} from the tss per-team roster blocks.""" + idx: dict[str, dict[str, Any]] = {} + for slot in ("1", "2"): + team = tss.get(slot) + if not isinstance(team, dict): + continue + for entry in team.get("players") or []: + uid = str(entry.get("uid") or "") + if uid: + idx[uid] = { + "pvp_ratio": entry.get("pvp_ratio"), + "role": entry.get("role"), + } + return idx + + +def build_scoreboard_model(game: dict[str, Any], lang_column: str = "") -> Optional[dict[str, Any]]: + """Flatten a raw TSS game into a scoreboard render model. + + Returns ``None`` if the game lacks the minimum structure (two teams w/ players). + """ + tss = game.get("tss") or {} + players_raw = game.get("players") or {} + if not isinstance(players_raw, dict): + return None + + translate = _translator(lang_column) + pvp = _pvp_index(tss) + + teams: dict[str, dict[str, Any]] = {} + for slot in ("1", "2"): + raw_meta = tss.get(slot) + meta: dict[str, Any] = raw_meta if isinstance(raw_meta, dict) else {} + teams[slot] = { + "slot": slot, + "team_id": str(meta.get("team_id")) if meta.get("team_id") is not None else None, + "team_name": str(meta.get("team_name") or f"Team {slot}"), + "players": [], + } + + for uid, p in players_raw.items(): + if not isinstance(p, dict): + continue + slot = str(p.get("team") or "") + if slot not in teams: + continue + info = pvp.get(str(uid), {}) + teams[slot]["players"].append({ + "uid": str(uid), + "nick": str(p.get("name") or ""), + # No streamer-mode field in the TSS feed yet; renderer falls back to nick. + "fake_nick": None, + "tag": str(p.get("tag") or ""), + "country_id": p.get("country_id"), + "air_kills": int(p.get("air_kills") or 0), + "ground_kills": int(p.get("ground_kills") or 0), + "assists": int(p.get("assists") or 0), + "deaths": int(p.get("deaths") or 0), + "captures": int(p.get("captures") or 0), + "score": int(p.get("score") or 0), + "pvp_ratio": info.get("pvp_ratio"), + "role": info.get("role"), + "units": _build_units(p.get("units") or [], translate), + }) + + if not teams["1"]["players"] or not teams["2"]["players"]: + return None + + winner_slot = str(game.get("winner") or "") + is_draw = bool(game.get("draw")) + winner_name = teams[winner_slot]["team_name"] if winner_slot in teams and not is_draw else None + + return { + "session_id": str(game.get("_id") or ""), + "utc_timestamp": int(game.get("end_ts") or 0), + "start_ts": int(game.get("start_ts") or 0), + "duration": game.get("duration"), + "map": str(game.get("mission_name") or ""), + "mission_mode": str(game.get("mission_mode") or ""), + "tournament_name": str(tss.get("tournament_name") or ""), + "bracket": str(tss.get("bracket") or ""), + "is_draw": is_draw, + "winner": winner_name, + "winner_slot": winner_slot, + "teams": [teams["1"], teams["2"]], + } diff --git a/BOT/wl.py b/BOT/wl.py new file mode 100644 index 0000000..be3700a --- /dev/null +++ b/BOT/wl.py @@ -0,0 +1,20 @@ +"""TSS win/loss standings β€” STUB. + +SREBOT tracks squadron W/L globally. TSS needs the same idea but **scoped per +tournament** (a team's record only means something inside its bracket), so the real +implementation is deferred. Until then ``get_tss_standings`` returns ``None`` for +every team and the scoreboard renders a neutral placeholder in the W/L slot. + +TODO(tss-wl): persist per-(tournament_id, team_id) wins/losses and resolve here. +""" +from __future__ import annotations + +from typing import Any, Optional + + +def get_tss_standings(team_id: Optional[str], tournament_id: Optional[Any] = None) -> Optional[dict[str, int]]: + """Return ``{"wins": int, "losses": int}`` for a team, or ``None`` if untracked. + + Stub: always ``None`` until per-tournament W/L tracking lands. + """ + return None diff --git a/fetch_replays.sh b/fetch_replays.sh index 8462207..9fe876a 100644 --- a/fetch_replays.sh +++ b/fetch_replays.sh @@ -1,18 +1,36 @@ #!/usr/bin/env bash -# Pull TSS replay dirs from the server for local study. -# Run once manually: bash fetch_replays.sh +# Pull the latest TSS replay dirs from the server for local study. +# Run once manually: bash fetch_replays.sh [COUNT] (default COUNT=10) set -euo pipefail REMOTE="srebot" REMOTE_PATH="/mnt/HC_Volume_105581488/STORAGE/REPLAYS/TSS/" LOCAL_PATH="./replays_sample" +COUNT="${1:-10}" mkdir -p "$LOCAL_PATH" +# Grab the COUNT most-recently-modified replay dirs (newest first). +mapfile -t DIRS < <(ssh "$REMOTE" "cd '$REMOTE_PATH' && ls -1dt */ 2>/dev/null | head -n $COUNT") + +if [ "${#DIRS[@]}" -eq 0 ]; then + echo "No replay dirs found on ${REMOTE}:${REMOTE_PATH}" >&2 + exit 1 +fi + +echo "Pulling ${#DIRS[@]} latest replay dir(s):" +printf ' %s\n' "${DIRS[@]}" + +# Build include filters: each dir + its replay_data.json.gz, exclude the rest. +INCLUDES=() +for d in "${DIRS[@]}"; do + d="${d%/}" + INCLUDES+=( --include="${d}/" --include="${d}/replay_data.json.gz" ) +done + rsync -avz --progress \ - --include="*/" \ - --include="replay_data.json.gz" \ + "${INCLUDES[@]}" \ --exclude="*" \ "${REMOTE}:${REMOTE_PATH}" \ "$LOCAL_PATH/" diff --git a/replays_local/6b7801f00031a48/replay_data.json.gz b/replays_local/6b7801f00031a48/replay_data.json.gz deleted file mode 100644 index 15a423f..0000000 Binary files a/replays_local/6b7801f00031a48/replay_data.json.gz and /dev/null differ diff --git a/replays_local/6ba5e84001245fa/replay_data.json.gz b/replays_local/6ba5e84001245fa/replay_data.json.gz deleted file mode 100644 index 7a39a24..0000000 Binary files a/replays_local/6ba5e84001245fa/replay_data.json.gz and /dev/null differ diff --git a/scripts/gen_sample_scoreboards.py b/scripts/gen_sample_scoreboards.py new file mode 100644 index 0000000..a0be5b5 --- /dev/null +++ b/scripts/gen_sample_scoreboards.py @@ -0,0 +1,37 @@ +"""Render scoreboards for every replay in TSSBOT/replays_sample/ for visual review. + +Run from the TSSBOT dir: python -m scripts.gen_sample_scoreboards +Outputs /scoreboard.png next to each replay_data.json. +""" +import asyncio +import glob +import json +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from BOT import transform, scoreboard # noqa: E402 + + +async def main(): + files = sorted(glob.glob("replays_sample/*/replay_data.json")) + if not files: + print("No sample replays found under replays_sample/") + return + for f in files: + with open(f, encoding="utf-8") as fh: + game = json.load(fh) + model = transform.build_scoreboard_model(game, "") + if not model: + print(f"SKIP {f}: model build returned None") + continue + out = os.path.join(os.path.dirname(f), "scoreboard.png") + # bar_color "not_set": no guild team context in this offline render. + await scoreboard.create_scoreboard(model, out, bar_color="not_set") + size = os.path.getsize(out) + print(f"OK {model['map']:<22} {model['session_id']} -> {out} ({size//1024} KB)") + + +if __name__ == "__main__": + asyncio.run(main())