lets get this party starteddddd (#1287)

This commit is contained in:
NotSoToothless
2026-05-30 08:45:32 -07:00
committed by GitHub
parent bd3871ef20
commit 7edc0202f4
8 changed files with 892 additions and 2 deletions
+7
View File
@@ -0,0 +1,7 @@
[
{
"title": "Welcome to TSSBOT!",
"body": "TSSBOT is just getting started. Use `/help` for more information on how to use it.",
"expires": 1784073600
}
]
+15
View File
@@ -0,0 +1,15 @@
"""TSSBOT BOT package bootstrap.
Puts the sibling ``BOTS/SHARED`` directory on ``sys.path`` (mirroring
SREBOT/BOT/__init__.py) so shared modules like ``shared_store`` and
``data_parser`` import by bare name from anywhere that imports ``BOT.*``.
``SHARED_DIR`` is re-exported for asset-path use.
"""
import sys
from pathlib import Path
SHARED_DIR: Path = Path(__file__).resolve().parents[2] / "SHARED"
if str(SHARED_DIR) not in sys.path:
sys.path.insert(0, str(SHARED_DIR))
__all__ = ["SHARED_DIR"]
+110
View File
@@ -0,0 +1,110 @@
"""TSSBOT autolog matcher.
For each game received from the Spectra TSS feed, match it against every
subscribing guild's preferences (``tss-team`` / ``tss-player`` entries) and
work out which channels should be notified, deduping per session.
NOTE: building and sending the scoreboard embed is intentionally a TODO for now
(it needs significant rework). The matching + dedup pipeline below is complete
and logs the targets it *would* post to. Wire the send in at the marked spot.
"""
from __future__ import annotations
import logging
from typing import Any, Optional
import discord
from . import preferences, storage
log = logging.getLogger("tssbot.autolog")
# Registered by start_bot once the client exists; standalone tss_ws leaves it None.
_bot: Optional[discord.Client] = None
# session_id -> set of channel_ids already handled (in-memory idempotency,
# mirrors SREBOT's _sent_channels_by_session).
_sent_channels_by_session: dict[str, set[int]] = {}
def set_bot(bot: discord.Client) -> None:
"""Register the Discord client so the matcher can post (and run at all)."""
global _bot
_bot = bot
def _present_entities(game: dict[str, Any]) -> tuple[set[str], set[str]]:
"""Return (player_uids, team_tags) present in a game dict."""
players = game.get("players") or {}
uids = {str(k) for k in players}
tags: set[str] = set()
for p in players.values():
if not isinstance(p, dict):
continue
tag_raw = p.get("tag") or ""
tag = tag_raw[1:-1] if len(tag_raw) > 2 else tag_raw
if tag:
tags.add(tag)
return uids, tags
async def process_game(game: dict[str, Any]) -> None:
"""Match one received game against guild preferences and notify channels.
Safe to call from the standalone WS listener (no-ops if no bot registered).
"""
if _bot is None:
return
session_id = str(game.get("_id") or "")
if not session_id:
return
present_uids, present_tags = _present_entities(game)
tags_lower = {t.lower() for t in present_tags}
# Resolve present tags → team_ids so team subscriptions (keyed by team_id) match.
present_team_ids: set[str] = set()
for tag in present_tags:
try:
tid = await storage.resolve_team_id_for_tag(tag)
except Exception as exc:
log.error("tag resolve failed for %s: %s", tag, exc)
tid = None
if tid is not None:
present_team_ids.add(str(tid))
sent = _sent_channels_by_session.setdefault(session_id, set())
for guild_id, prefs in preferences.iter_guild_preferences():
for entity_id, entry in prefs.items():
if not isinstance(entry, dict):
continue
channel_id, enabled = preferences.parse_channel(entry.get("Logs"))
if not channel_id or not enabled:
continue
etype = entry.get("Type")
matched = False
if etype == "tss-team":
if str(entity_id) in present_team_ids:
matched = True
else:
name = (entry.get("Name") or "").lower()
if name and name in tags_lower:
matched = True
elif etype == "tss-player":
if str(entity_id) in present_uids:
matched = True
if not matched or channel_id in sent:
continue
sent.add(channel_id)
# TODO: build & send the scoreboard embed to `channel_id`.
# The matched target is fully resolved here; the only thing missing
# is the rendered scoreboard (out of scope for now).
log.info(
"autolog match (send TODO): session=%s guild=%s channel=%s entity=%s type=%s",
session_id, guild_id, channel_id, entity_id, etype,
)
+330
View File
@@ -0,0 +1,330 @@
"""TSSBOT slash commands.
TSSBOT's first slash commands, mirroring SREBOT's resolve + autocomplete UX:
* ``/set-team`` — set this server's team (writes TEAMS.json)
* ``/log-team`` — subscribe a team's matches to this channel (autolog)
* ``/log-player`` — subscribe a player's matches to this channel (autolog)
* ``/set-player`` — link your Discord account to a WT player (shared PLAYERS.json)
* ``/player-stats`` — TSS career stats for a player (defaults to your linked account)
Config commands (set/log) require Manage Server. All commands honor the shared
discord-user blacklist.
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import List
import discord
from discord import app_commands
from discord.ext import commands
from . import preferences, storage
from shared_store import check_user_blacklist, get_linked_uid, save_player_link
log = logging.getLogger("tssbot.commands")
FOOTER = "ᓚᘏᗢ"
_NEWS_JSON = Path(__file__).resolve().parent / "NEWS.json"
# ---------------------------------------------------------------------------
# Checks
# ---------------------------------------------------------------------------
def not_blacklisted():
"""App-command check rejecting blacklisted Discord users (shared BLACKLIST.json)."""
async def predicate(interaction: discord.Interaction) -> bool:
blocked, reason = check_user_blacklist(interaction.user.id)
if blocked:
raise app_commands.CheckFailure(reason or "You are blacklisted from using this bot.")
return True
return app_commands.check(predicate)
# ---------------------------------------------------------------------------
# Autocomplete
# ---------------------------------------------------------------------------
async def team_autocomplete(interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
try:
rows = await storage.search_teams(current, 25)
except Exception:
return []
choices = []
for r in rows:
label = (r.get("tag_name") or r.get("long_name") or str(r["team_id"]))[:100]
choices.append(app_commands.Choice(name=label, value=str(r["team_id"])))
return choices
async def player_autocomplete(interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
try:
rows = await storage.search_players(current, 25)
except Exception:
return []
return [app_commands.Choice(name=r["nick"][:100], value=r["nick"][:100]) for r in rows]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _too_many_msg(candidates: List[dict]) -> str:
preview = ", ".join(f"`{c['nick']}`" for c in candidates[:10])
return (
"Multiple players match — pick a more specific name (use the autocomplete):\n"
f"{preview}"
)
# ---------------------------------------------------------------------------
# Cog
# ---------------------------------------------------------------------------
class TssCommands(commands.Cog):
"""Container for TSSBOT's application commands."""
def __init__(self, bot: commands.Bot):
self.bot = bot
async def cog_app_command_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError):
if isinstance(error, app_commands.CheckFailure):
msg = str(error) or "You can't use this command."
if interaction.response.is_done():
await interaction.followup.send(msg, ephemeral=True)
else:
await interaction.response.send_message(msg, ephemeral=True)
return
log.error("app command error: %s", error)
try:
if interaction.response.is_done():
await interaction.followup.send(f"Something went wrong: {error}", ephemeral=True)
else:
await interaction.response.send_message(f"Something went wrong: {error}", ephemeral=True)
except discord.HTTPException:
pass
# ── /set-team ──────────────────────────────────────────────────────────
@app_commands.command(name="set-team", description="Set this server's team")
@app_commands.describe(team="Team name, tag, or ID")
@app_commands.autocomplete(team=team_autocomplete)
@app_commands.checks.has_permissions(manage_guild=True)
@not_blacklisted()
async def set_team(self, interaction: discord.Interaction, team: str):
await interaction.response.defer(ephemeral=True)
if interaction.guild_id is None:
return await interaction.followup.send("This command must be used in a server.", ephemeral=True)
resolved = await storage.resolve_team(team)
if not resolved:
return await interaction.followup.send(f"Could not find a team matching **{team}**.", ephemeral=True)
name = resolved.get("tag_name") or resolved.get("long_name") or str(resolved["team_id"])
preferences.set_guild_team(interaction.guild_id, int(resolved["team_id"]), name)
await interaction.followup.send(
f"✅ This server's team is now **{name}** (id `{resolved['team_id']}`).", ephemeral=True
)
# ── /log-team ──────────────────────────────────────────────────────────
@app_commands.command(name="log-team", description="Send a team's matches to this channel")
@app_commands.describe(team="Team name, tag, or ID")
@app_commands.autocomplete(team=team_autocomplete)
@app_commands.checks.has_permissions(manage_guild=True)
@not_blacklisted()
async def log_team(self, interaction: discord.Interaction, team: str):
await interaction.response.defer(ephemeral=True)
if interaction.guild_id is None or interaction.channel_id is None:
return await interaction.followup.send("This command must be used in a server channel.", ephemeral=True)
resolved = await storage.resolve_team(team)
if not resolved:
return await interaction.followup.send(f"Could not find a team matching **{team}**.", ephemeral=True)
name = resolved.get("tag_name") or resolved.get("long_name") or str(resolved["team_id"])
preferences.upsert_log_entry(
interaction.guild_id, int(resolved["team_id"]), "tss-team", name, interaction.channel_id
)
await interaction.followup.send(
f"✅ Logging **{name}**'s matches to <#{interaction.channel_id}>.", ephemeral=True
)
# ── /log-player ────────────────────────────────────────────────────────
@app_commands.command(name="log-player", description="Send a player's matches to this channel")
@app_commands.describe(player="Player username")
@app_commands.autocomplete(player=player_autocomplete)
@app_commands.checks.has_permissions(manage_guild=True)
@not_blacklisted()
async def log_player(self, interaction: discord.Interaction, player: str):
await interaction.response.defer(ephemeral=True)
if interaction.guild_id is None or interaction.channel_id is None:
return await interaction.followup.send("This command must be used in a server channel.", ephemeral=True)
candidates = await storage.resolve_players(player)
if not candidates:
return await interaction.followup.send(f"No players found matching **{player}**.", ephemeral=True)
if len(candidates) > 1:
return await interaction.followup.send(_too_many_msg(candidates), ephemeral=True)
uid, nick = str(candidates[0]["uid"]), candidates[0]["nick"]
preferences.upsert_log_entry(interaction.guild_id, uid, "tss-player", nick, interaction.channel_id)
await interaction.followup.send(
f"✅ Logging **{nick}**'s matches to <#{interaction.channel_id}>.", ephemeral=True
)
# ── /set-player ────────────────────────────────────────────────────────
@app_commands.command(name="set-player", description="Link your Discord account to a War Thunder player")
@app_commands.describe(player="Player username")
@app_commands.autocomplete(player=player_autocomplete)
@not_blacklisted()
async def set_player(self, interaction: discord.Interaction, player: str):
await interaction.response.defer(ephemeral=True)
candidates = await storage.resolve_players(player)
if not candidates:
return await interaction.followup.send(f"No players found matching **{player}**.", ephemeral=True)
if len(candidates) > 1:
return await interaction.followup.send(_too_many_msg(candidates), ephemeral=True)
uid, nick = str(candidates[0]["uid"]), candidates[0]["nick"]
save_player_link(interaction.user.id, uid)
await interaction.followup.send(
f"✅ Linked your Discord account to **{nick}** (UID `{uid}`).\n"
"`/player-stats` with no argument will now default to this account.",
ephemeral=True,
)
# ── /player-stats ──────────────────────────────────────────────────────
@app_commands.command(name="player-stats", description="View TSS career stats for a player")
@app_commands.describe(player="Player username (defaults to your linked account)")
@app_commands.autocomplete(player=player_autocomplete)
@not_blacklisted()
async def player_stats(self, interaction: discord.Interaction, player: str = ""):
await interaction.response.defer()
if player:
candidates = await storage.resolve_players(player)
if not candidates:
return await interaction.followup.send(f"No players found matching **{player}**.", ephemeral=True)
if len(candidates) > 1:
return await interaction.followup.send(_too_many_msg(candidates), ephemeral=True)
uid = str(candidates[0]["uid"])
else:
uid = get_linked_uid(interaction.user.id) or ""
if not uid:
return await interaction.followup.send(
"Provide a player, or link your account first with `/set-player`.", ephemeral=True
)
career = await storage.player_career(uid)
if not career:
return await interaction.followup.send("No TSS games on record for that player.", ephemeral=True)
teams = await storage.player_teams(uid)
battles = career.get("battles") or 0
wins = career.get("wins") or 0
losses = career.get("losses") or 0
win_rate = f"{(wins / battles * 100):.1f}%" if battles else "0.0%"
deaths = career.get("deaths") or 0
kills = (career.get("ground_kills") or 0) + (career.get("air_kills") or 0)
kd = f"{(kills / deaths):.2f}" if deaths else f"{kills}"
embed = discord.Embed(
title=f"{career['nick']} — TSS career",
color=discord.Color.orange(),
)
embed.add_field(name="Battles", value=str(battles), inline=True)
embed.add_field(name="Win rate", value=f"{win_rate} ({wins}W / {losses}L)", inline=True)
embed.add_field(name="K/D", value=f"{kd} ({kills} kills / {deaths} deaths)", inline=True)
embed.add_field(
name="Ground / Air kills",
value=f"{career.get('ground_kills') or 0} / {career.get('air_kills') or 0}",
inline=True,
)
embed.add_field(name="Assists", value=str(career.get("assists") or 0), inline=True)
embed.add_field(name="Captures", value=str(career.get("captures") or 0), inline=True)
if teams:
top = teams[:5]
embed.add_field(
name="Teams seen with",
value="\n".join(
f"`{(tm.get('team_tag') or '?')}` — {tm.get('games') or 0} games" for tm in top
),
inline=False,
)
embed.set_footer(text=FOOTER)
await interaction.followup.send(embed=embed)
# ── /news ──────────────────────────────────────────────────────────────
@app_commands.command(name="news", description="View the latest TSSBOT news and announcements")
@not_blacklisted()
async def news(self, interaction: discord.Interaction):
await interaction.response.defer()
all_entries = []
if _NEWS_JSON.exists():
try:
with open(_NEWS_JSON, "r", encoding="utf-8") as f:
all_entries = json.load(f)
except (json.JSONDecodeError, OSError):
all_entries = []
now_ts = int(time.time())
entries = [
e for e in all_entries
if isinstance(e, dict) and now_ts < e.get("expires", float("inf"))
]
if not entries:
await interaction.followup.send(
embed=discord.Embed(
title="No news",
description="There are no announcements right now.",
color=discord.Color.orange(),
),
ephemeral=True,
)
return
embeds = [
discord.Embed(
title=item.get("title", "Announcement"),
description=item.get("body", ""),
color=discord.Color.orange(),
)
for item in entries[:10]
]
embeds[-1].set_footer(text=FOOTER)
await interaction.followup.send(embeds=embeds)
# ── /help ──────────────────────────────────────────────────────────────
@app_commands.command(name="help", description="List TSSBOT commands and what they do")
@not_blacklisted()
async def help_cmd(self, interaction: discord.Interaction):
embed = discord.Embed(
title="TSSBOT commands",
description="Live TSS team & player stats, plus match autologging.",
color=discord.Color.orange(),
)
embed.add_field(
name="Stats",
value=(
"`/player-stats [player]` — TSS career stats for a player "
"(defaults to your linked account)\n"
"`/set-player <player>` — link your Discord account to a WT player"
),
inline=False,
)
embed.add_field(
name="Autologging — Manage Server",
value=(
"`/set-team <team>` — set this server's team\n"
"`/log-team <team>` — post a team's matches to this channel\n"
"`/log-player <player>` — post a player's matches to this channel"
),
inline=False,
)
embed.add_field(
name="Info",
value="`/news` — latest announcements\n`/help` — this message",
inline=False,
)
embed.set_footer(text=FOOTER)
await interaction.response.send_message(embed=embed)
async def setup_commands(bot: commands.Bot) -> None:
"""Register the TSS command cog on the bot."""
await bot.add_cog(TssCommands(bot))
+184
View File
@@ -0,0 +1,184 @@
"""TSSBOT per-guild autolog preferences + guild team context (TEAMS.json).
Mirrors SREBOT's PREFERENCES model, adapted for TSS.
Per-guild file: ``STORAGE_VOL_PATH/PREFERENCES/<guild_id>-preferences.json`` —
**shared with SREBOT** (both bots write to the same file; entries are told apart
by ``Type``). A flat dict keyed by the watched entity id (a ``team_id`` for team
subs, a player ``uid`` for player subs)::
{
"<entity_id>": {
"Type": "tss-team" | "tss-player",
"Name": "STaYA",
"Logs": "<#channelid>"
}
}
The ``Type`` value's ``tss``/``sre`` prefix marks the data source and the
``team``/``player`` suffix marks the entry kind. Channel values use the SREBOT
encoding ``"<#ID>"`` (enabled) / ``"<#DISABLED-ID>"`` (disabled).
TEAMS.json (set by ``/set-team``) records each guild's own team, mirroring
SREBOT's SQUADRONS.json::
{ "<guild_id>": { "TM_Name": "STaYA", "team_id": 12 } }
"""
from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Any, Iterator, Optional
from .storage import STORAGE_DIR
log = logging.getLogger("tssbot.preferences")
PREFERENCES_DIR: Path = STORAGE_DIR / "PREFERENCES"
TEAMS_JSON_PATH: Path = STORAGE_DIR / "TEAMS.json"
_CHANNEL_ID_RE = re.compile(r"(\d{17,20})")
# ---------------------------------------------------------------------------
# Low-level JSON IO
# ---------------------------------------------------------------------------
def _read_json(path: Path, default: Any) -> Any:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return default
except (json.JSONDecodeError, OSError) as e:
log.error("failed reading %s: %s", path, e)
return default
def _write_json(path: Path, data: Any) -> bool:
try:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
os.replace(tmp, path)
return True
except OSError as e:
log.error("failed writing %s: %s", path, e)
return False
# ---------------------------------------------------------------------------
# Channel encoding
# ---------------------------------------------------------------------------
def channel_mention(channel_id: int | str) -> str:
"""Return the enabled channel-route encoding for a channel id."""
return f"<#{channel_id}>"
def parse_channel(value: Any) -> tuple[Optional[int], bool]:
"""Return (channel_id, enabled) from a stored route value.
Handles ``"<#ID>"``, ``"<#DISABLED-ID>"``, and bare digit strings.
Returns ``(None, False)`` for anything unparseable.
"""
if value is None:
return None, False
text = str(value)
match = _CHANNEL_ID_RE.search(text)
if not match:
return None, False
enabled = "DISABLED" not in text.upper()
return int(match.group(1)), enabled
# ---------------------------------------------------------------------------
# Per-guild preferences
# ---------------------------------------------------------------------------
_PREF_SUFFIX = "-preferences.json"
def _guild_pref_path(guild_id: int | str) -> Path:
return PREFERENCES_DIR / f"{guild_id}{_PREF_SUFFIX}"
def load_guild_preferences(guild_id: int | str) -> dict[str, Any]:
"""Load one guild's preferences dict (empty dict if none)."""
data = _read_json(_guild_pref_path(guild_id), {})
return data if isinstance(data, dict) else {}
def save_guild_preferences(guild_id: int | str, prefs: dict[str, Any]) -> bool:
"""Persist one guild's preferences dict."""
return _write_json(_guild_pref_path(guild_id), prefs)
def upsert_log_entry(
guild_id: int | str,
entity_id: int | str,
type_: str,
name: str,
channel_id: int | str,
) -> bool:
"""Add/replace a ``tss-team``/``tss-player`` route for a guild."""
prefs = load_guild_preferences(guild_id)
entry = prefs.setdefault(str(entity_id), {})
if not isinstance(entry, dict):
entry = {}
prefs[str(entity_id)] = entry
entry["Type"] = type_
entry["Name"] = name
entry["Logs"] = channel_mention(channel_id)
return save_guild_preferences(guild_id, prefs)
def remove_entry(guild_id: int | str, entity_id: int | str) -> bool:
"""Remove a watched entity from a guild's preferences. True if removed."""
prefs = load_guild_preferences(guild_id)
if str(entity_id) in prefs:
del prefs[str(entity_id)]
return save_guild_preferences(guild_id, prefs)
return False
def iter_guild_preferences() -> Iterator[tuple[int, dict[str, Any]]]:
"""Yield (guild_id, prefs) for every guild with a preferences file.
Reads the shared ``<guild_id>-preferences.json`` files; SREBOT entries in
them are ignored by callers (the autolog matcher acts only on tss-* Types).
"""
if not PREFERENCES_DIR.is_dir():
return
for path in PREFERENCES_DIR.glob(f"*{_PREF_SUFFIX}"):
try:
guild_id = int(path.name[: -len(_PREF_SUFFIX)])
except ValueError:
continue
data = _read_json(path, {})
if isinstance(data, dict) and data:
yield guild_id, data
# ---------------------------------------------------------------------------
# TEAMS.json — each guild's own team
# ---------------------------------------------------------------------------
def load_teams() -> dict[str, Any]:
data = _read_json(TEAMS_JSON_PATH, {})
return data if isinstance(data, dict) else {}
def get_guild_team(guild_id: int | str) -> Optional[dict[str, Any]]:
entry = load_teams().get(str(guild_id))
return entry if isinstance(entry, dict) else None
def set_guild_team(guild_id: int | str, team_id: int, name: str) -> bool:
teams = load_teams()
teams[str(guild_id)] = {"TM_Name": name, "team_id": int(team_id)}
return _write_json(TEAMS_JSON_PATH, teams)
+222 -1
View File
@@ -19,7 +19,7 @@ import logging
import os
import time
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, List, Optional
import aiosqlite
@@ -396,3 +396,224 @@ async def insert_player_games(game: Dict[str, Any]) -> None:
rows,
)
await conn.commit()
# ---------------------------------------------------------------------------
# Team resolve / lookup (python twin of the Rust backend's find_team)
# ---------------------------------------------------------------------------
async def resolve_team(name_or_id: str) -> Optional[Dict[str, Any]]:
"""Resolve a team by numeric id, or by tag/short/long name (case-insensitive).
Returns ``{team_id, long_name, short_name, tag_name}`` or None.
Tag matches rank above short, then long name.
"""
text = (name_or_id or "").strip()
if not text:
return None
try:
as_id: Optional[int] = int(text)
except ValueError:
as_id = None
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT team_id, long_name, short_name, tag_name
FROM teams_data
WHERE team_id = ?1
OR long_name = ?2 COLLATE NOCASE
OR short_name = ?2 COLLATE NOCASE
OR tag_name = ?2 COLLATE NOCASE
ORDER BY CASE
WHEN tag_name = ?2 COLLATE NOCASE THEN 0
WHEN short_name = ?2 COLLATE NOCASE THEN 1
WHEN long_name = ?2 COLLATE NOCASE THEN 2
ELSE 3
END
LIMIT 1
""",
(as_id, text),
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def resolve_team_id_for_tag(tag: str) -> Optional[int]:
"""Return the team_id whose tag_name matches ``tag`` (case-insensitive)."""
tag = (tag or "").strip()
if not tag:
return None
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
async with conn.execute(
"SELECT team_id FROM teams_data WHERE tag_name = ? COLLATE NOCASE LIMIT 1",
(tag,),
) as cur:
row = await cur.fetchone()
return int(row[0]) if row else None
async def search_teams(query: str, limit: int = 25) -> List[Dict[str, Any]]:
"""Autocomplete-friendly team search. Empty query → top teams by rating."""
q = (query or "").strip()
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
if not q:
async with conn.execute(
"""
SELECT team_id, long_name, tag_name FROM teams_data
ORDER BY clanrating DESC NULLS LAST, members DESC
LIMIT ?
""",
(limit,),
) as cur:
rows = await cur.fetchall()
else:
like = f"%{q}%"
async with conn.execute(
"""
SELECT team_id, long_name, tag_name FROM teams_data
WHERE long_name LIKE ?1 COLLATE NOCASE
OR tag_name LIKE ?1 COLLATE NOCASE
ORDER BY CASE
WHEN tag_name = ?2 COLLATE NOCASE THEN 0
WHEN long_name = ?2 COLLATE NOCASE THEN 1
ELSE 2
END
LIMIT ?3
""",
(like, q, limit),
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Player resolve / aggregate (derived from player_games_hist)
# ---------------------------------------------------------------------------
async def search_players(query: str, limit: int = 25) -> List[Dict[str, Any]]:
"""Autocomplete-friendly player search by nick. Returns [{uid, nick}]."""
q = (query or "").strip()
if len(q) < 2:
return []
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
await conn.create_function("ulower", 1, str.lower)
async with conn.execute(
"""
SELECT nick, UID FROM (
SELECT nick, UID, MAX(endtime_unix) AS last_seen
FROM player_games_hist
WHERE nick LIKE ? COLLATE NOCASE
GROUP BY UID
ORDER BY
CASE WHEN ulower(nick) = ulower(?) THEN 0
WHEN ulower(nick) LIKE ulower(?) THEN 1
ELSE 2 END,
last_seen DESC
LIMIT ?
)
""",
(f"{q}%", q, f"{q}%", limit),
) as cur:
rows = await cur.fetchall()
return [{"uid": r["UID"], "nick": r["nick"]} for r in rows]
async def resolve_players(name: str) -> List[Dict[str, Any]]:
"""Resolve a nick to candidates (exact match first, else substring).
Returns ``[{uid, nick}]`` grouped by UID.
"""
name = (name or "").strip()
if not name:
return []
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"SELECT UID, MIN(nick) AS nick FROM player_games_hist "
"WHERE nick = ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25",
(name,),
) as cur:
rows = list(await cur.fetchall())
if not rows:
async with conn.execute(
"SELECT UID, MIN(nick) AS nick FROM player_games_hist "
"WHERE nick LIKE ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25",
(f"%{name}%",),
) as cur:
rows = list(await cur.fetchall())
return [{"uid": r["UID"], "nick": r["nick"]} for r in rows]
async def latest_nick_for_uid(uid: str) -> str:
"""Best-effort latest nick for a UID; falls back to the UID string."""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
async with conn.execute(
"SELECT nick FROM player_games_hist WHERE UID = ? ORDER BY endtime_unix DESC LIMIT 1",
(uid,),
) as cur:
row = await cur.fetchone()
return row[0] if row else str(uid)
async def player_career(uid: str) -> Optional[Dict[str, Any]]:
"""Aggregate career stats for a UID.
Player totals (kills/deaths/...) are Spectra per-player totals duplicated
across that player's vehicle rows, so we collapse to one value per session
(MAX) before summing — see the module docstring.
"""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT
COUNT(*) AS battles,
SUM(CASE WHEN UPPER(victor_bool) = 'WIN' THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN UPPER(victor_bool) = 'LOSS' THEN 1 ELSE 0 END) AS losses,
SUM(gk) AS ground_kills, SUM(ak) AS air_kills,
SUM(asi) AS assists, SUM(cap) AS captures, SUM(de) AS deaths
FROM (
SELECT session_id,
MAX(victor_bool) AS victor_bool,
MAX(ground_kills) AS gk, MAX(air_kills) AS ak,
MAX(assists) AS asi, MAX(captures) AS cap, MAX(deaths) AS de
FROM player_games_hist
WHERE UID = ?
GROUP BY session_id
)
""",
(uid,),
) as cur:
row = await cur.fetchone()
if not row or not row["battles"]:
return None
career = dict(row)
career["nick"] = await latest_nick_for_uid(uid)
career["uid"] = str(uid)
return career
async def player_teams(uid: str) -> List[Dict[str, Any]]:
"""Teams a UID has appeared with, most recent first."""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT team_tag,
MAX(team_name) AS team_name,
team_id,
COUNT(DISTINCT session_id) AS games,
MAX(endtime_unix) AS last_seen
FROM player_games_hist
WHERE UID = ?
GROUP BY team_tag
ORDER BY last_seen DESC
""",
(uid,),
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
+18 -1
View File
@@ -20,6 +20,8 @@ load_dotenv(dotenv_path=_HERE / ".env")
# Imported after load_dotenv so env vars are available.
from BOT.storage import init_tss_dbs # noqa: E402
from BOT.autologging import set_bot as set_autolog_bot # noqa: E402
from BOT.commands import setup_commands # noqa: E402
from tss_ws import listen, _handle_game # noqa: E402
logging.basicConfig(
@@ -41,7 +43,22 @@ intents = discord.Intents.default()
intents.message_content = False
intents.reactions = True
intents.messages = True
bot = commands.Bot(command_prefix="!", intents=intents)
class TSSBot(commands.Bot):
async def setup_hook(self) -> None:
# Register the client so the autolog matcher can post, wire up the
# slash commands, and push them to Discord.
set_autolog_bot(self)
await setup_commands(self)
try:
synced = await self.tree.sync()
log.info("synced %d slash command(s)", len(synced))
except Exception as exc:
log.error("slash command sync failed: %s", exc)
bot = TSSBot(command_prefix="!", intents=intents)
async def _ws_task_loop() -> None:
+6
View File
@@ -30,6 +30,7 @@ from dotenv import load_dotenv
from websockets.asyncio.client import connect as wsconnect
from BOT.storage import insert_match, insert_player_games
from BOT.autologging import process_game as autolog_process_game
_HERE = Path(__file__).resolve().parent
load_dotenv(dotenv_path=_HERE / ".env")
@@ -172,6 +173,11 @@ async def _handle_game(game: Dict[str, Any]) -> None:
log.info("Stored game %s in DB", sid)
except Exception as exc:
log.error("DB insert failed for %s: %s", sid, exc)
# Autolog match/dispatch (no-ops in standalone mode where no bot is set).
try:
await autolog_process_game(game)
except Exception as exc:
log.error("autolog failed for %s: %s", sid, exc)
print(json.dumps(game, indent=2, ensure_ascii=False))
print("-" * 80)