28a635438d
- Move tally hook from process_session (per-guild, gated by Logs subs) to process_ws_replays (once per game, all guilds) via on_game_finished - Add set_voice_channel_status permission check at /tally-claim time so failures are immediate and visible rather than silent on every game - Remove entitlement gate from tally_claim and tally_transfer - Add VC tally permission section to /diagnose-perms when run in a VC - Add 5 new locale keys to en.json for the permission messages Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
293 lines
9.7 KiB
Python
293 lines
9.7 KiB
Python
"""Live per-voice-channel SQB tally tracking.
|
|
|
|
A member connected to a voice channel claims an IGN or a squadron via the
|
|
``/tally-claim`` command. As SQB games finish involving that target, the bot
|
|
updates the voice channel's Discord status to a running scoreline such as
|
|
``2W-1L: Win against ENEMY``. The count is fresh from the moment of claim and
|
|
is independent of the cumulative standings in ``wl.py``.
|
|
|
|
State lives in an in-memory registry keyed by ``(guild_id, channel_id)``,
|
|
mirrored to ``STORAGE_DIR/TALLY.json`` so tallies survive restarts. Tallies are
|
|
wiped on a 1h idle sweep or when the voice channel empties of human members.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass, field, asdict
|
|
from typing import Optional
|
|
|
|
import discord
|
|
|
|
from .utils import t, STORAGE_DIR, get_bot, guild_lang
|
|
|
|
# Seconds since a tally's last update after which sweep_idle() wipes it
# (compared against differences of time.time() values).
IDLE_TIMEOUT = 3600
# Maximum number of session ids evaluate() keeps per tally for de-duplication.
RECENT_SESSIONS_MAX = 50
|
|
|
|
|
|
@dataclass
class Tally:
    """Running win/loss count attached to one voice channel.

    Instances are stored in the module registry under
    ``(guild_id, channel_id)`` and serialised with ``dataclasses.asdict``.
    """

    # Registry key parts: the guild and the voice channel the tally belongs to.
    guild_id: int
    channel_id: int
    mode: str  # "player" | "squadron"
    target: str  # IGN (player mode) or squadron short (squadron mode)
    # Caller-supplied display form of the target; stored but not read in this module.
    display_target: str
    # Counters mutated by evaluate(); a draw is added to ``losses`` there.
    wins: int = 0
    losses: int = 0
    # Outcome and opposing team tag of the most recently applied session;
    # format_status() appends them to the scoreline when both are set.
    last_result_kind: Optional[str] = None  # "win" | "loss" | "draw"
    last_opponent: Optional[str] = None
    # time.time() of the last claim/transfer/result; sweep_idle() compares it
    # against IDLE_TIMEOUT.
    last_update_ts: float = 0.0
    # The user_id passed to claim().
    claimed_by: int = 0
    # time.time() at which claim() created the tally.
    created_ts: float = 0.0
    # Session ids already counted, capped at RECENT_SESSIONS_MAX by evaluate().
    recent_sessions: list[str] = field(default_factory=list)
|
|
|
|
|
|
def strip_tag(s: Optional[str]) -> str:
    """Trim non-alphanumeric decoration from both ends of ``s``.

    Mirrors ``autologging._strip_tag``. Falsy input yields ``""``. If trimming
    would leave nothing (the input has no ASCII letters or digits at all), the
    input is returned unchanged instead.
    """
    if not s:
        return ""
    trimmed = re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', s)
    if trimmed:
        return trimmed
    # Nothing alphanumeric survived; fall back to the raw value.
    return s
|
|
|
|
|
|
def team_short(team: dict) -> str:
    """Return the tag-stripped short name of a replay team dict.

    Uses ``team["squadron_short"]`` when it is truthy, otherwise falls back to
    ``team["squadron"]``; the chosen value is passed through ``strip_tag``.
    """
    raw = team.get("squadron_short")
    if not raw:
        raw = team.get("squadron")
    return strip_tag(raw)
|
|
|
|
|
|
def team_identities(team: dict) -> set[str]:
    """Collect every lowercased, tag-stripped squadron identifier of a team.

    Looks at the ``squadron_short``, ``squadron`` and ``squadron_tagged`` keys
    and drops any that are missing or empty after stripping.
    """
    stripped = (
        strip_tag(team.get(key))
        for key in ("squadron_short", "squadron", "squadron_tagged")
    )
    return {ident.lower() for ident in stripped if ident}
|
|
|
|
|
|
def team_index_for(tally: Tally, teams: list[dict]) -> Optional[int]:
    """Return the index of the team the tracked entity is on, else None.

    In ``"squadron"`` mode the tally target (tag-stripped, lowercased) is
    looked up in each team's ``team_identities``. In any other mode the target
    is treated as a player nick and compared, trimmed and case-insensitively,
    against the ``nick`` of every entry in the team's ``players`` list.

    Falsy team entries are skipped. A team whose ``players`` value is missing
    or null, and player entries that are not dicts, are ignored rather than
    raising.
    """
    if tally.mode == "squadron":
        # Normalise the target once instead of once per team.
        wanted = strip_tag(tally.target).lower()
        for idx, team in enumerate(teams):
            if team and wanted in team_identities(team):
                return idx
        return None

    # player mode
    wanted = tally.target.strip().lower()
    for idx, team in enumerate(teams):
        if not team:
            continue
        # ``.get("players", [])`` would still return None for an explicit
        # null, so fall back with ``or`` instead.
        for p in team.get("players") or []:
            if not isinstance(p, dict):
                continue
            if str(p.get("nick") or "").strip().lower() == wanted:
                return idx
    return None
|
|
|
|
|
|
def evaluate(
    tally: Tally,
    teams: list[dict],
    winner_short: Optional[str],
    is_draw: bool,
    session_id: str,
) -> Optional[str]:
    """Fold one finished session into ``tally`` (mutated in place).

    Returns the outcome that was recorded ("win"|"loss"|"draw"). Returns None,
    leaving the tally untouched, when ``session_id`` was already counted or
    the tracked entity is on neither team.
    """
    already_counted = bool(session_id) and session_id in tally.recent_sessions
    if already_counted:
        return None

    own_idx = team_index_for(tally, teams)
    if own_idx is None:
        return None

    # An opponent is only identified for a two-team game.
    opponent = None
    if len(teams) == 2 and own_idx in (0, 1):
        rival = teams[1 - own_idx]
        if rival:
            opponent = team_short(rival)

    if is_draw:
        kind = "draw"
    elif winner_short and strip_tag(winner_short).lower() in team_identities(teams[own_idx]):
        kind = "win"
    else:
        kind = "loss"

    # NOTE(review): a draw is added to the loss counter; presumably
    # intentional since the scoreline has no draw column -- confirm.
    if kind == "win":
        tally.wins += 1
    else:
        tally.losses += 1

    tally.last_result_kind = kind
    tally.last_opponent = opponent

    if session_id:
        tally.recent_sessions.append(session_id)
        overflow = len(tally.recent_sessions) - RECENT_SESSIONS_MAX
        if overflow > 0:
            # Keep only the newest RECENT_SESSIONS_MAX ids.
            tally.recent_sessions = tally.recent_sessions[-RECENT_SESSIONS_MAX:]
    return kind
|
|
|
|
|
|
def format_status(tally: Tally, lang: str) -> str:
    """Render the voice-channel status text for ``tally``.

    The bare scoreline ``"<wins>W-<losses>L"`` is returned unless both a last
    result and a last opponent are recorded; in that case the localised
    ``commands.tally.status_line`` template is filled in with the scoreline,
    the localised result verb and the opponent.
    """
    score = f"{tally.wins}W-{tally.losses}L"
    has_last_result = tally.last_result_kind and tally.last_opponent
    if not has_last_result:
        return score
    verb = t(lang, f"commands.tally.result_{tally.last_result_kind}")
    return t(
        lang,
        "commands.tally.status_line",
        base=score,
        verb=verb,
        opponent=tally.last_opponent,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registry and persistence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# On-disk mirror of the registry: written by _save(), read by load_from_disk().
TALLY_PATH = STORAGE_DIR / "TALLY.json"

# Active tallies keyed by (guild_id, channel_id).
_REGISTRY: dict[tuple[int, int], "Tally"] = {}
|
|
|
|
|
|
def _save() -> None:
    """Write the whole registry to ``TALLY_PATH`` (best-effort).

    The JSON array is written to a sibling ``.json.tmp`` file first and then
    moved over the real path. Any failure is logged and swallowed.
    """
    try:
        records = list(map(asdict, _REGISTRY.values()))
        payload = json.dumps(records, ensure_ascii=False)
        scratch = TALLY_PATH.with_suffix(".json.tmp")
        scratch.write_text(payload, encoding="utf-8")
        scratch.replace(TALLY_PATH)
    except Exception as exc:
        logging.error(f"[TALLY] failed to save state: {exc}")
|
|
|
|
|
|
def load_from_disk() -> None:
    """Load persisted tallies into the in-memory registry.

    A missing state file means there is nothing to load. A file that cannot
    be read or parsed, or whose top-level JSON value is not a list, is logged
    and leaves the current registry untouched. Otherwise the registry is
    cleared and repopulated; records that cannot be turned into a ``Tally``
    are logged and skipped.
    """
    try:
        raw = json.loads(TALLY_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return
    except Exception as e:
        logging.error(f"[TALLY] failed to load state: {e}")
        return

    # _save() always writes a JSON array. Any other top-level value (null, a
    # number, an object) would either raise while iterating or be walked key
    # by key, so reject it before the registry is cleared.
    if not isinstance(raw, list):
        logging.error(
            f"[TALLY] failed to load state: expected a list, got {type(raw).__name__}"
        )
        return

    _REGISTRY.clear()
    for d in raw:
        try:
            tly = Tally(**d)
            _REGISTRY[(tly.guild_id, tly.channel_id)] = tly
        except Exception as e:
            logging.error(f"[TALLY] skipping bad record {d}: {e}")
|
|
|
|
|
|
def get(guild_id: int, channel_id: int) -> Optional["Tally"]:
    """Return the tally registered for this guild/channel pair, or None."""
    key = (guild_id, channel_id)
    return _REGISTRY.get(key)
|
|
|
|
|
|
def tallies_for_guild(guild_id: int) -> list["Tally"]:
    """Return a new list of every registered tally whose key has this guild id."""
    matches = []
    for key, tly in _REGISTRY.items():
        if key[0] == guild_id:
            matches.append(tly)
    return matches
|
|
|
|
|
|
def claim(guild_id: int, channel_id: int, mode: str, target: str,
          display_target: str, user_id: int) -> "Tally":
    """Register a fresh tally for a channel, replacing any existing one.

    The new tally starts with zeroed counters and both timestamps set to the
    current time. The registry is persisted before the tally is returned.
    """
    stamp = time.time()
    fresh = Tally(
        guild_id=guild_id,
        channel_id=channel_id,
        mode=mode,
        target=target,
        display_target=display_target,
        claimed_by=user_id,
        created_ts=stamp,
        last_update_ts=stamp,
    )
    _REGISTRY[(guild_id, channel_id)] = fresh
    _save()
    return fresh
|
|
|
|
|
|
def transfer(guild_id: int, channel_id: int, target: str,
             display_target: str) -> Optional["Tally"]:
    """Re-point an active tally at a new IGN in player mode, keeping its count.

    Returns the updated tally, or None when the channel has no tally. The
    registry is persisted only when a tally was updated.
    """
    existing = _REGISTRY.get((guild_id, channel_id))
    if existing is not None:
        existing.mode = "player"
        existing.target = target
        existing.display_target = display_target
        existing.last_update_ts = time.time()
        _save()
    return existing
|
|
|
|
|
|
def wipe(guild_id: int, channel_id: int) -> bool:
    """Remove a channel's tally and persist the registry.

    Returns True when a tally was removed, False (without saving) when the
    channel had none.
    """
    removed = _REGISTRY.pop((guild_id, channel_id), None)
    if removed is None:
        return False
    _save()
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Voice-status HTTP helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def set_voice_status(channel_id: int, text: str) -> None:
    """Set a voice channel's Discord status through the raw HTTP route.

    discord.py 2.7.1 has no native voice-status API, so the request is issued
    directly: PUT /channels/{id}/voice-status {"status": text}. Request
    failures are logged and swallowed.
    """
    client = get_bot()
    endpoint = discord.http.Route(  # type: ignore[attr-defined]
        "PUT", "/channels/{channel_id}/voice-status", channel_id=channel_id
    )
    payload = {"status": text}
    try:
        await client.http.request(endpoint, json=payload)
    except Exception as exc:
        logging.error(f"[TALLY] set_voice_status failed for {channel_id}: {exc}")
|
|
|
|
|
|
async def push_status(tly: "Tally") -> None:
    """Publish the tally's formatted scoreline as its channel's voice status."""
    guild_language = await guild_lang(tly.guild_id)
    status_text = format_status(tly, guild_language)
    await set_voice_status(tly.channel_id, status_text)
|
|
|
|
|
|
async def clear_status(channel_id: int) -> None:
    """Reset a voice channel's status by setting it to the empty string."""
    empty_status = ""
    await set_voice_status(channel_id, empty_status)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session hook
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def on_session_processed(guild_id: int, teams: list[dict],
                               winner_short: Optional[str], is_draw: bool,
                               session_id: str) -> None:
    """Apply one finished session to every active tally of a guild.

    Each tally that the session affects gets a fresh ``last_update_ts``, the
    registry is saved, and the channel status is pushed. An exception while
    handling one tally is logged and does not stop the remaining tallies.
    """
    for tracked in tallies_for_guild(guild_id):
        try:
            outcome = evaluate(tracked, teams, winner_short, is_draw, session_id)
            if outcome is not None:
                tracked.last_update_ts = time.time()
                _save()
                await push_status(tracked)
        except Exception as exc:
            logging.error(f"[TALLY] on_session_processed error (guild={guild_id}): {exc}")
|
|
|
|
|
|
async def on_game_finished(teams: list[dict], winner_short: Optional[str],
                           is_draw: bool, session_id: str) -> None:
    """Apply one finished session to the active tallies of every guild.

    The set of guild ids is collected from the registry keys up front, then
    each guild is processed in turn via ``on_session_processed``.
    """
    affected_guilds = set()
    for gid, _channel in _REGISTRY:
        affected_guilds.add(gid)
    for gid in affected_guilds:
        await on_session_processed(gid, teams, winner_short, is_draw, session_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Idle sweep
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def sweep_idle() -> None:
    """Drop tallies whose channel is gone or that are idle >= IDLE_TIMEOUT.

    A tally whose channel cannot be resolved is wiped without touching the
    channel status. An idle tally is wiped and its channel status cleared.
    """
    now = time.time()
    bot = get_bot()
    # Iterate over a snapshot: wipe() removes entries from the registry.
    for key, tly in list(_REGISTRY.items()):
        guild_id, channel_id = key
        # NOTE(review): with no bot available every tally is treated as
        # "channel gone" and wiped -- confirm this is intended.
        if bot is None or bot.get_channel(channel_id) is None:
            wipe(guild_id, channel_id)  # channel gone: drop without an HTTP clear
            continue
        idle_for = now - tly.last_update_ts
        if idle_for < IDLE_TIMEOUT:
            continue
        wipe(guild_id, channel_id)
        await clear_status(channel_id)
|