Files
SREBOT/BOT/tally.py
T
NotSoToothless 9222f7c53f Auto merge dev → main (#1339)
* feat(tally): /tally-claim, /tally-transfer, /tally-wipe commands

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tally): idle sweep, startup load, and empty-VC expiry

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* style(tally): parenthesize voice-state guard for clarity

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tally): update live tallies when sessions finish

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(tally): robust winner matching + cleanup of deleted-VC tallies

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(tally): /dev-tally to manually attribute a win/loss in your VC

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 01:19:19 -07:00

307 lines
10 KiB
Python

"""Live per-voice-channel SQB tally tracking.
A member connected to a voice channel claims an IGN or a squadron via the
``/tally-claim`` command. As SQB games finish involving that target, the bot
updates the voice channel's Discord status to a running scoreline such as
``2W-1L: Win against ENEMY``. The count is fresh from the moment of claim and
is independent of the cumulative standings in ``wl.py``.
State lives in an in-memory registry keyed by ``(guild_id, channel_id)``,
mirrored to ``STORAGE_DIR/TALLY.json`` so tallies survive restarts. Tallies are
wiped on a 1h idle sweep or when the voice channel empties of human members.
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field, asdict
from typing import Optional
import discord
from .utils import t, STORAGE_DIR, get_bot, guild_lang
IDLE_TIMEOUT = 3600
RECENT_SESSIONS_MAX = 50
@dataclass
class Tally:
guild_id: int
channel_id: int
mode: str # "player" | "squadron"
target: str # IGN (player mode) or squadron short (squadron mode)
display_target: str
wins: int = 0
losses: int = 0
last_result_kind: Optional[str] = None # "win" | "loss" | "draw"
last_opponent: Optional[str] = None
last_update_ts: float = 0.0
claimed_by: int = 0
created_ts: float = 0.0
recent_sessions: list[str] = field(default_factory=list)
def strip_tag(s: Optional[str]) -> str:
"""Strip leading/trailing non-alphanumerics (mirrors autologging._strip_tag)."""
if not s:
return ""
return re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', s) or s
def team_short(team: dict) -> str:
"""Bare short tag for a replay team dict."""
return strip_tag(team.get("squadron_short") or team.get("squadron"))
def team_identities(team: dict) -> set[str]:
"""All lowercased, tag-stripped squadron identifiers for a replay team."""
out = set()
for key in ("squadron_short", "squadron", "squadron_tagged"):
v = strip_tag(team.get(key))
if v:
out.add(v.lower())
return out
def team_index_for(tally: Tally, teams: list[dict]) -> Optional[int]:
"""Return the index of the team the tracked entity is on, else None."""
for idx, team in enumerate(teams):
if not team:
continue
if tally.mode == "squadron":
if strip_tag(tally.target).lower() in team_identities(team):
return idx
else: # player mode
target = tally.target.strip().lower()
for p in team.get("players", []):
if str(p.get("nick") or "").strip().lower() == target:
return idx
return None
def evaluate(
tally: Tally,
teams: list[dict],
winner_short: Optional[str],
is_draw: bool,
session_id: str,
) -> Optional[str]:
"""Apply a finished session to the tally, mutating it in place.
Returns the result kind applied ("win"|"loss"|"draw"), or None if the
tracked entity was not in this game or the session was already counted.
"""
if session_id and session_id in tally.recent_sessions:
return None
idx = team_index_for(tally, teams)
if idx is None:
return None
other = teams[1 - idx] if len(teams) == 2 and idx in (0, 1) else None
opponent = team_short(other) if other else None
if is_draw:
kind = "draw"
tally.losses += 1
elif winner_short and strip_tag(winner_short).lower() in team_identities(teams[idx]):
kind = "win"
tally.wins += 1
else:
kind = "loss"
tally.losses += 1
tally.last_result_kind = kind
tally.last_opponent = opponent
if session_id:
tally.recent_sessions.append(session_id)
if len(tally.recent_sessions) > RECENT_SESSIONS_MAX:
tally.recent_sessions = tally.recent_sessions[-RECENT_SESSIONS_MAX:]
return kind
def format_status(tally: Tally, lang: str) -> str:
"""Build the voice-channel status string for a tally."""
base = f"{tally.wins}W-{tally.losses}L"
if tally.last_result_kind and tally.last_opponent:
verb = t(lang, f"commands.tally.result_{tally.last_result_kind}")
return t(
lang,
"commands.tally.status_line",
base=base,
verb=verb,
opponent=tally.last_opponent,
)
return base
# ---------------------------------------------------------------------------
# Registry and persistence
# ---------------------------------------------------------------------------
TALLY_PATH = STORAGE_DIR / "TALLY.json"
_REGISTRY: dict[tuple[int, int], "Tally"] = {}
def _save() -> None:
"""Persist the registry to disk (best-effort)."""
try:
data = [asdict(tly) for tly in _REGISTRY.values()]
tmp = TALLY_PATH.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
tmp.replace(TALLY_PATH)
except Exception as e:
logging.error(f"[TALLY] failed to save state: {e}")
def load_from_disk() -> None:
"""Load persisted tallies into the in-memory registry."""
try:
raw = json.loads(TALLY_PATH.read_text(encoding="utf-8"))
except FileNotFoundError:
return
except Exception as e:
logging.error(f"[TALLY] failed to load state: {e}")
return
_REGISTRY.clear()
for d in raw:
try:
tly = Tally(**d)
_REGISTRY[(tly.guild_id, tly.channel_id)] = tly
except Exception as e:
logging.error(f"[TALLY] skipping bad record {d}: {e}")
def get(guild_id: int, channel_id: int) -> Optional["Tally"]:
return _REGISTRY.get((guild_id, channel_id))
def tallies_for_guild(guild_id: int) -> list["Tally"]:
return [tly for (g, _c), tly in _REGISTRY.items() if g == guild_id]
def claim(guild_id: int, channel_id: int, mode: str, target: str,
display_target: str, user_id: int) -> "Tally":
now = time.time()
tly = Tally(
guild_id=guild_id, channel_id=channel_id, mode=mode, target=target,
display_target=display_target, last_update_ts=now, claimed_by=user_id,
created_ts=now,
)
_REGISTRY[(guild_id, channel_id)] = tly
_save()
return tly
def transfer(guild_id: int, channel_id: int, target: str,
display_target: str) -> Optional["Tally"]:
"""Switch an active tally to player mode on a new IGN, carrying the count."""
tly = _REGISTRY.get((guild_id, channel_id))
if tly is None:
return None
tly.mode = "player"
tly.target = target
tly.display_target = display_target
tly.last_update_ts = time.time()
_save()
return tly
def wipe(guild_id: int, channel_id: int) -> bool:
if _REGISTRY.pop((guild_id, channel_id), None) is None:
return False
_save()
return True
def apply_manual_result(guild_id: int, channel_id: int, kind: str,
opponent: str = "DEV") -> Optional["Tally"]:
"""Manually bump a VC's active tally by a win or loss (dev/testing).
``kind`` is "win" or "loss". Mirrors what a finished game would do but with
no real opponent, so ``last_opponent`` is set to a fixed label. Returns the
updated tally, or None if the VC has no active tally.
"""
tly = _REGISTRY.get((guild_id, channel_id))
if tly is None:
return None
if kind == "win":
tly.wins += 1
else:
tly.losses += 1
tly.last_result_kind = kind
tly.last_opponent = opponent
tly.last_update_ts = time.time()
_save()
return tly
# ---------------------------------------------------------------------------
# Voice-status HTTP helper
# ---------------------------------------------------------------------------
async def set_voice_status(channel_id: int, text: str) -> None:
"""Set a voice channel's Discord status via the raw HTTP endpoint.
discord.py 2.7.1 has no native voice-status API, so call the route
directly: PUT /channels/{id}/voice-status {"status": text}.
"""
bot = get_bot()
route = discord.http.Route( # type: ignore[attr-defined]
"PUT", "/channels/{channel_id}/voice-status", channel_id=channel_id
)
try:
await bot.http.request(route, json={"status": text})
except Exception as e:
logging.error(f"[TALLY] set_voice_status failed for {channel_id}: {e}")
async def push_status(tly: "Tally") -> None:
lang = await guild_lang(tly.guild_id)
await set_voice_status(tly.channel_id, format_status(tly, lang))
async def clear_status(channel_id: int) -> None:
await set_voice_status(channel_id, "")
# ---------------------------------------------------------------------------
# Session hook
# ---------------------------------------------------------------------------
async def on_session_processed(guild_id: int, teams: list[dict],
winner_short: Optional[str], is_draw: bool,
session_id: str) -> None:
"""Update every active tally in a guild against one finished session."""
for tly in tallies_for_guild(guild_id):
try:
kind = evaluate(tly, teams, winner_short, is_draw, session_id)
if kind is None:
continue
tly.last_update_ts = time.time()
_save()
await push_status(tly)
except Exception as e:
logging.error(f"[TALLY] on_session_processed error (guild={guild_id}): {e}")
# ---------------------------------------------------------------------------
# Idle sweep
# ---------------------------------------------------------------------------
async def sweep_idle() -> None:
"""Wipe tallies that are idle >= IDLE_TIMEOUT or whose channel is gone."""
now = time.time()
bot = get_bot()
for (g, c), tly in list(_REGISTRY.items()):
channel_gone = bot is None or bot.get_channel(c) is None
if channel_gone:
wipe(g, c) # channel gone: drop without an HTTP clear
continue
if now - tly.last_update_ts >= IDLE_TIMEOUT:
wipe(g, c)
await clear_status(c)