"""Live per-voice-channel SQB tally tracking. A member connected to a voice channel claims an IGN or a squadron via the ``/tally-claim`` command. As SQB games finish involving that target, the bot updates the voice channel's Discord status to a running scoreline such as ``2W-1L: Win against ENEMY``. The count is fresh from the moment of claim and is independent of the cumulative standings in ``wl.py``. State lives in an in-memory registry keyed by ``(guild_id, channel_id)``, mirrored to ``STORAGE_DIR/TALLY.json`` so tallies survive restarts. Tallies are wiped on a 1h idle sweep or when the voice channel empties of human members. """ from __future__ import annotations import json import logging import re import time from dataclasses import dataclass, field, asdict from typing import Optional import discord from .utils import t, STORAGE_DIR, get_bot, guild_lang IDLE_TIMEOUT = 3600 RECENT_SESSIONS_MAX = 50 @dataclass class Tally: guild_id: int channel_id: int mode: str # "player" | "squadron" target: str # IGN (player mode) or squadron short (squadron mode) display_target: str wins: int = 0 losses: int = 0 last_result_kind: Optional[str] = None # "win" | "loss" | "draw" last_opponent: Optional[str] = None last_update_ts: float = 0.0 claimed_by: int = 0 created_ts: float = 0.0 recent_sessions: list[str] = field(default_factory=list) def strip_tag(s: Optional[str]) -> str: """Strip leading/trailing non-alphanumerics (mirrors autologging._strip_tag).""" if not s: return "" return re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', s) or s def team_short(team: dict) -> str: """Bare short tag for a replay team dict.""" return strip_tag(team.get("squadron_short") or team.get("squadron")) def team_identities(team: dict) -> set[str]: """All lowercased, tag-stripped squadron identifiers for a replay team.""" out = set() for key in ("squadron_short", "squadron", "squadron_tagged"): v = strip_tag(team.get(key)) if v: out.add(v.lower()) return out def team_index_for(tally: Tally, teams: 
list[dict]) -> Optional[int]: """Return the index of the team the tracked entity is on, else None.""" for idx, team in enumerate(teams): if not team: continue if tally.mode == "squadron": if strip_tag(tally.target).lower() in team_identities(team): return idx else: # player mode target = tally.target.strip().lower() for p in team.get("players", []): if str(p.get("nick") or "").strip().lower() == target: return idx return None def evaluate( tally: Tally, teams: list[dict], winner_short: Optional[str], is_draw: bool, session_id: str, ) -> Optional[str]: """Apply a finished session to the tally, mutating it in place. Returns the result kind applied ("win"|"loss"|"draw"), or None if the tracked entity was not in this game or the session was already counted. """ if session_id and session_id in tally.recent_sessions: return None idx = team_index_for(tally, teams) if idx is None: return None other = teams[1 - idx] if len(teams) == 2 and idx in (0, 1) else None opponent = team_short(other) if other else None if is_draw: kind = "draw" tally.losses += 1 elif winner_short and strip_tag(winner_short).lower() in team_identities(teams[idx]): kind = "win" tally.wins += 1 else: kind = "loss" tally.losses += 1 tally.last_result_kind = kind tally.last_opponent = opponent if session_id: tally.recent_sessions.append(session_id) if len(tally.recent_sessions) > RECENT_SESSIONS_MAX: tally.recent_sessions = tally.recent_sessions[-RECENT_SESSIONS_MAX:] return kind def format_status(tally: Tally, lang: str) -> str: """Build the voice-channel status string for a tally.""" base = f"{tally.wins}W-{tally.losses}L" if tally.last_result_kind and tally.last_opponent: verb = t(lang, f"commands.tally.result_{tally.last_result_kind}") return t( lang, "commands.tally.status_line", base=base, verb=verb, opponent=tally.last_opponent, ) return base # --------------------------------------------------------------------------- # Registry and persistence # 
# ---------------------------------------------------------------------------

TALLY_PATH = STORAGE_DIR / "TALLY.json"

# Active tallies keyed by (guild_id, channel_id).
_REGISTRY: dict[tuple[int, int], "Tally"] = {}


def _save() -> None:
    """Persist the registry to disk (best-effort).

    Writes to a sibling ``.json.tmp`` file first and then replaces the real
    file, so a failed write does not leave a truncated ``TALLY.json``.
    Failures are logged and swallowed.
    """
    try:
        data = [asdict(tly) for tly in _REGISTRY.values()]
        tmp = TALLY_PATH.with_suffix(".json.tmp")
        tmp.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
        tmp.replace(TALLY_PATH)
    except Exception as e:
        logging.error(f"[TALLY] failed to save state: {e}")


def load_from_disk() -> None:
    """Load persisted tallies into the in-memory registry.

    A missing file is not an error. An unreadable file, or one whose top-level
    JSON value is not a list, is logged and leaves the registry untouched.
    Individual records that cannot be turned into a ``Tally`` are logged and
    skipped.
    """
    try:
        raw = json.loads(TALLY_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return
    except Exception as e:
        logging.error(f"[TALLY] failed to load state: {e}")
        return

    # Validate the payload shape before clearing, so a corrupt file can
    # neither crash the caller nor silently empty the registry.
    if not isinstance(raw, list):
        logging.error(
            f"[TALLY] failed to load state: expected a JSON list, got {type(raw).__name__}"
        )
        return

    _REGISTRY.clear()
    for d in raw:
        try:
            tly = Tally(**d)
            _REGISTRY[(tly.guild_id, tly.channel_id)] = tly
        except Exception as e:
            logging.error(f"[TALLY] skipping bad record {d}: {e}")


def get(guild_id: int, channel_id: int) -> Optional["Tally"]:
    """Return the active tally for a voice channel, or None."""
    return _REGISTRY.get((guild_id, channel_id))


def tallies_for_guild(guild_id: int) -> list["Tally"]:
    """Return every active tally belonging to ``guild_id``."""
    return [tly for (g, _c), tly in _REGISTRY.items() if g == guild_id]


def claim(guild_id: int, channel_id: int, mode: str, target: str,
          display_target: str, user_id: int) -> "Tally":
    """Start a fresh tally for a voice channel, replacing any existing one.

    The new tally starts at zero wins/losses with both timestamps set to now,
    and the registry is persisted immediately.
    """
    now = time.time()
    tly = Tally(
        guild_id=guild_id,
        channel_id=channel_id,
        mode=mode,
        target=target,
        display_target=display_target,
        last_update_ts=now,
        claimed_by=user_id,
        created_ts=now,
    )
    _REGISTRY[(guild_id, channel_id)] = tly
    _save()
    return tly


def transfer(guild_id: int, channel_id: int, target: str,
             display_target: str) -> Optional["Tally"]:
    """Switch an active tally to player mode on a new IGN, carrying the count.

    Returns the updated tally, or None if the channel has no active tally.
    """
    tly = _REGISTRY.get((guild_id, channel_id))
    if tly is None:
        return None
    tly.mode = "player"
    tly.target = target
    tly.display_target = display_target
    tly.last_update_ts = time.time()
    _save()
    return tly


def wipe(guild_id: int, channel_id: int) -> bool:
    """Remove a channel's tally; return True if one existed (and persist)."""
    if _REGISTRY.pop((guild_id, channel_id), None) is None:
        return False
    _save()
    return True


def apply_manual_result(guild_id: int, channel_id: int, kind: str,
                        opponent: str = "DEV") -> Optional["Tally"]:
    """Manually bump a VC's active tally by a win or loss (dev/testing).

    ``kind`` is "win" or "loss". Mirrors what a finished game would do but
    with no real opponent, so ``last_opponent`` is set to a fixed label.
    Any ``kind`` other than "win" is added to the loss counter.
    Returns the updated tally, or None if the VC has no active tally.
    """
    tly = _REGISTRY.get((guild_id, channel_id))
    if tly is None:
        return None
    if kind == "win":
        tly.wins += 1
    else:
        tly.losses += 1
    tly.last_result_kind = kind
    tly.last_opponent = opponent
    tly.last_update_ts = time.time()
    _save()
    return tly


# ---------------------------------------------------------------------------
# Voice-status HTTP helper
# ---------------------------------------------------------------------------

async def set_voice_status(channel_id: int, text: str) -> None:
    """Set a voice channel's Discord status via the raw HTTP endpoint.

    discord.py 2.7.1 has no native voice-status API, so call the route
    directly: PUT /channels/{id}/voice-status {"status": text}.

    Request failures are logged and swallowed (best-effort).
    """
    bot = get_bot()
    route = discord.http.Route(  # type: ignore[attr-defined]
        "PUT", "/channels/{channel_id}/voice-status", channel_id=channel_id
    )
    try:
        await bot.http.request(route, json={"status": text})
    except Exception as e:
        logging.error(f"[TALLY] set_voice_status failed for {channel_id}: {e}")


async def push_status(tly: "Tally") -> None:
    """Render a tally in its guild's language and publish it as VC status."""
    lang = await guild_lang(tly.guild_id)
    await set_voice_status(tly.channel_id, format_status(tly, lang))


async def clear_status(channel_id: int) -> None:
    """Blank out a voice channel's status."""
    await set_voice_status(channel_id, "")


# ---------------------------------------------------------------------------
# Session hook
# ---------------------------------------------------------------------------

async def on_session_processed(guild_id: int, teams: list[dict],
                               winner_short: Optional[str], is_draw: bool,
                               session_id: str) -> None:
    """Update every active tally in a guild against one finished session.

    Each tally is handled independently: an error while evaluating or
    publishing one tally is logged and does not stop the others.
    """
    for tly in tallies_for_guild(guild_id):
        try:
            kind = evaluate(tly, teams, winner_short, is_draw, session_id)
            if kind is None:
                continue
            tly.last_update_ts = time.time()
            # Persist before the network call so the count survives even if
            # publishing the status fails.
            _save()
            await push_status(tly)
        except Exception as e:
            logging.error(f"[TALLY] on_session_processed error (guild={guild_id}): {e}")


# ---------------------------------------------------------------------------
# Idle sweep
# ---------------------------------------------------------------------------

async def sweep_idle() -> None:
    """Wipe tallies that are idle >= IDLE_TIMEOUT or whose channel is gone.

    The sweep is skipped entirely while no bot is available or the bot is not
    ready yet: in that state channel lookups cannot distinguish "deleted"
    from "not cached yet", and wiping would discard tallies that were
    persisted precisely so they survive a restart.
    """
    bot = get_bot()
    if bot is None or not bot.is_ready():
        return

    now = time.time()
    for (g, c), tly in list(_REGISTRY.items()):
        # The loop awaits below, so the snapshot can go stale: skip entries
        # that were wiped or re-claimed in the meantime.
        if _REGISTRY.get((g, c)) is not tly:
            continue
        if bot.get_channel(c) is None:
            wipe(g, c)  # channel gone: drop without an HTTP clear
            continue
        if now - tly.last_update_ts >= IDLE_TIMEOUT:
            wipe(g, c)
            await clear_status(c)