Files
TSSBOT/BOT/storage.py
T
2026-05-30 08:45:32 -07:00

620 lines
22 KiB
Python

"""TSSBOT storage layer — SQLite paths, idempotent DB init, and insert helpers.
Two databases live under ``STORAGE_VOL_PATH`` (set in ``TSSBOT/.env``):
* ``tss_teams.db`` — persistent team registry
* ``tss_battles.db`` — per-match summary + per-player/per-vehicle game history
One row is written to ``player_games_hist`` per vehicle *actually used* by each
player. Player-level stats (kills, deaths, etc.) are Spectra player totals and
are duplicated across vehicle rows — aggregation queries that sum stats must
``DISTINCT`` on session_id first to avoid double-counting.
Session IDs are stored as hex strings throughout (Spectra sends decimal integers;
``tss_ws._session_id`` normalises them before anything hits the DB).
"""
from __future__ import annotations
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import aiosqlite
log = logging.getLogger("tssbot.storage")
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
def _require_storage_dir() -> Path:
raw = os.environ.get("STORAGE_VOL_PATH", "").strip()
if not raw:
raise RuntimeError("STORAGE_VOL_PATH must be set in TSSBOT/.env")
p = Path(raw).expanduser().resolve()
p.mkdir(parents=True, exist_ok=True)
return p
STORAGE_DIR: Path = _require_storage_dir()
TSS_BATTLES_DB_PATH: Path = STORAGE_DIR / "tss_battles.db"
TSS_TEAMS_DB_PATH: Path = STORAGE_DIR / "tss_teams.db"
# ---------------------------------------------------------------------------
# tss_battles.db — match_summary + player_games_hist
# ---------------------------------------------------------------------------
_MATCH_SUMMARY_SQL = """
CREATE TABLE IF NOT EXISTS match_summary (
session_id TEXT PRIMARY KEY,
map_name TEXT,
mission_mode TEXT,
difficulty TEXT,
starttime_unix INTEGER,
endtime_unix INTEGER,
duration REAL,
draw INTEGER NOT NULL DEFAULT 0,
winning_slot TEXT,
losing_slot TEXT,
received_unix INTEGER,
tournament_id INTEGER
)
"""
_MATCH_SUMMARY_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_ms_map_name ON match_summary(map_name)",
"CREATE INDEX IF NOT EXISTS idx_ms_endtime ON match_summary(endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_ms_difficulty ON match_summary(difficulty)",
]
_PLAYER_GAMES_SQL = """
CREATE TABLE IF NOT EXISTS player_games_hist (
UID TEXT NOT NULL,
nick TEXT NOT NULL,
team_name TEXT,
team_tag TEXT NOT NULL DEFAULT 'UNKNOWN',
team_slot TEXT,
session_id TEXT NOT NULL,
vehicle TEXT,
vehicle_internal TEXT,
ground_kills INTEGER NOT NULL DEFAULT 0,
air_kills INTEGER NOT NULL DEFAULT 0,
assists INTEGER NOT NULL DEFAULT 0,
captures INTEGER NOT NULL DEFAULT 0,
deaths INTEGER NOT NULL DEFAULT 0,
score INTEGER NOT NULL DEFAULT 0,
missile_evades INTEGER NOT NULL DEFAULT 0,
shell_interceptions INTEGER NOT NULL DEFAULT 0,
team_kills_stat INTEGER NOT NULL DEFAULT 0,
country_id INTEGER,
victor_bool TEXT NOT NULL DEFAULT 'Loss',
endtime_unix INTEGER NOT NULL DEFAULT 0,
team_id INTEGER,
tss_team_uuid TEXT,
tss_role TEXT,
tss_place INTEGER,
pvp_ratio REAL,
UNIQUE (UID, session_id, vehicle_internal)
)
"""
_PLAYER_GAMES_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_pgh_session ON player_games_hist(session_id)",
"CREATE INDEX IF NOT EXISTS idx_pgh_uid_time ON player_games_hist(UID, endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_pgh_team_tag_time ON player_games_hist(team_tag, endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_pgh_team_name_time ON player_games_hist(team_name, endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_pgh_endtime ON player_games_hist(endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_pgh_nick ON player_games_hist(nick COLLATE NOCASE)",
"CREATE INDEX IF NOT EXISTS idx_pgh_session_team_name ON player_games_hist(session_id, team_name)",
"CREATE INDEX IF NOT EXISTS idx_pgh_team_id_time ON player_games_hist(team_id, endtime_unix)",
"CREATE INDEX IF NOT EXISTS idx_pgh_vehicle_internal ON player_games_hist(vehicle_internal)",
]
# ---------------------------------------------------------------------------
# tss_teams.db — teams_data + team_members + team_name_history
# ---------------------------------------------------------------------------
_TEAMS_DATA_SQL = """
CREATE TABLE IF NOT EXISTS teams_data (
team_id INTEGER PRIMARY KEY AUTOINCREMENT,
long_name TEXT NOT NULL UNIQUE,
short_name TEXT UNIQUE,
tag_name TEXT,
description TEXT,
region TEXT,
members INTEGER NOT NULL DEFAULT 0,
members_json TEXT,
captain_uid TEXT,
guild_id TEXT,
clanrating INTEGER,
created_unix INTEGER,
updated_unix INTEGER
)
"""
_TEAMS_DATA_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_teams_data_tag_name ON teams_data(tag_name COLLATE NOCASE)",
"CREATE INDEX IF NOT EXISTS idx_teams_data_guild_id ON teams_data(guild_id)",
]
_TEAM_MEMBERS_SQL = """
CREATE TABLE IF NOT EXISTS team_members (
team_id INTEGER NOT NULL,
uid TEXT NOT NULL,
nick TEXT,
role TEXT NOT NULL DEFAULT 'player',
points INTEGER NOT NULL DEFAULT 0,
joined_unix INTEGER,
PRIMARY KEY (team_id, uid)
)
"""
_TEAM_MEMBERS_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_team_members_team_id ON team_members(team_id)",
"CREATE INDEX IF NOT EXISTS idx_team_members_uid ON team_members(uid)",
]
_TEAM_NAME_HISTORY_SQL = """
CREATE TABLE IF NOT EXISTS team_name_history (
team_id INTEGER NOT NULL,
long_name TEXT NOT NULL,
first_seen INTEGER,
last_seen INTEGER,
PRIMARY KEY (team_id, long_name)
)
"""
_TEAM_NAME_HISTORY_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_team_name_history_name ON team_name_history(long_name COLLATE NOCASE)",
]
_TEAMS_POINTS_SQL = """
CREATE TABLE IF NOT EXISTS teams_points (
team_id INTEGER NOT NULL,
long_name TEXT,
unix_time INTEGER NOT NULL,
total_score INTEGER,
PRIMARY KEY (team_id, unix_time)
)
"""
_TEAMS_POINTS_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_teams_points_long_name ON teams_points(long_name COLLATE NOCASE)",
"CREATE INDEX IF NOT EXISTS idx_teams_points_unix_time ON teams_points(unix_time)",
]
# ---------------------------------------------------------------------------
# Init
# ---------------------------------------------------------------------------
_PRAGMAS = (
"PRAGMA journal_mode=WAL;",
"PRAGMA synchronous=NORMAL;",
"PRAGMA busy_timeout=5000;",
"PRAGMA temp_store=MEMORY;",
)
async def _apply(conn: aiosqlite.Connection, statements: list[str]) -> None:
for sql in statements:
await conn.execute(sql)
async def _existing_columns(conn: aiosqlite.Connection, table: str) -> set[str]:
rows = await conn.execute_fetchall(f"PRAGMA table_info({table})")
return {row[1] for row in rows}
async def _migrate(
conn: aiosqlite.Connection, table: str, additions: dict[str, str]
) -> None:
"""Add any missing columns. `additions` maps column name → full column DDL.
SQLite's ``ALTER TABLE ADD COLUMN`` only accepts a literal DEFAULT, which
is fine for all our additions. Existing rows pick up the DEFAULT value.
"""
cols = await _existing_columns(conn, table)
for col, ddl in additions.items():
if col not in cols:
await conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
log.info("migrated %s: added column %s", table, col)
async def _init_battles_db() -> None:
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
for sql in _PRAGMAS:
await conn.execute(sql)
await conn.execute(_MATCH_SUMMARY_SQL)
await conn.execute(_PLAYER_GAMES_SQL)
await _apply(conn, _MATCH_SUMMARY_INDEXES)
await _apply(conn, _PLAYER_GAMES_INDEXES)
await _migrate(conn, "match_summary", {
"tournament_id": "tournament_id INTEGER",
})
await _migrate(conn, "player_games_hist", {
"tss_team_uuid": "tss_team_uuid TEXT",
"tss_role": "tss_role TEXT",
"tss_place": "tss_place INTEGER",
"pvp_ratio": "pvp_ratio REAL",
})
await conn.commit()
async def _init_teams_db() -> None:
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
for sql in _PRAGMAS:
await conn.execute(sql)
await conn.execute(_TEAMS_DATA_SQL)
await conn.execute(_TEAM_MEMBERS_SQL)
await conn.execute(_TEAM_NAME_HISTORY_SQL)
await conn.execute(_TEAMS_POINTS_SQL)
# Forward-only migrations for DBs created before a column landed.
# Keep CREATE TABLE statements above in sync — these only matter when
# the table already existed.
await _migrate(conn, "teams_data", {
"clanrating": "clanrating INTEGER",
})
await _migrate(conn, "team_members", {
"points": "points INTEGER NOT NULL DEFAULT 0",
})
await _apply(conn, _TEAMS_DATA_INDEXES)
await _apply(conn, _TEAM_MEMBERS_INDEXES)
await _apply(conn, _TEAM_NAME_HISTORY_INDEXES)
await _apply(conn, _TEAMS_POINTS_INDEXES)
await conn.commit()
async def init_tss_dbs() -> None:
"""Create both TSSBOT SQLite databases if they don't already exist.
Idempotent — safe to call on every startup. Sets WAL pragmas, creates
tables, and ensures indexes.
"""
await _init_battles_db()
await _init_teams_db()
log.info(
"TSS DBs initialised: %s, %s",
TSS_BATTLES_DB_PATH,
TSS_TEAMS_DB_PATH,
)
# ---------------------------------------------------------------------------
# Insertion helpers
# ---------------------------------------------------------------------------
async def insert_match(game: Dict[str, Any]) -> None:
"""Insert one row into match_summary from a normalised game dict.
``game["_id"]`` must already be a hex string (normalised by tss_ws).
Safe to call multiple times — INSERT OR IGNORE skips duplicates.
"""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
for sql in _PRAGMAS:
await conn.execute(sql)
await conn.execute(
"""
INSERT OR IGNORE INTO match_summary
(session_id, map_name, mission_mode, difficulty,
starttime_unix, endtime_unix, duration,
draw, winning_slot, losing_slot, received_unix)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
str(game["_id"]),
str(game.get("mission_name") or game.get("level_path") or ""),
str(game.get("mission_mode") or ""),
str(game.get("difficulty") or ""),
int(game.get("start_ts") or 0),
int(game.get("end_ts") or 0),
float(game.get("duration") or 0),
1 if game.get("draw") else 0,
str(game.get("winner") or ""),
str(game.get("loser") or ""),
int(time.time()),
),
)
await conn.commit()
async def insert_player_games(game: Dict[str, Any]) -> None:
"""Insert one row per used vehicle per player into player_games_hist.
victor_bool is set to 'Win' when the player's team slot matches the
winning slot, 'Loss' otherwise.
Safe to call multiple times — INSERT OR IGNORE skips duplicates.
"""
session_id = str(game["_id"])
winner_slot = str(game.get("winner") or "")
end_ts = int(game.get("end_ts") or 0)
players = game.get("players") or {}
rows = []
for uid_str, p in players.items():
victor_bool = "Win" if str(p.get("team", "")) == winner_slot else "Loss"
tag_raw = p.get("tag") or ""
team_tag = tag_raw[1:-1] if len(tag_raw) > 2 else tag_raw
used_units = [u for u in (p.get("units") or []) if u.get("used")]
if not used_units:
continue
for unit in used_units:
rows.append((
str(uid_str),
str(p.get("name") or ""),
"", # team_name — resolved later
team_tag,
str(p.get("team") or ""), # team_slot ("1" or "2")
session_id,
str(unit.get("unit_normalized") or ""),
str(unit.get("unit") or ""),
int(p.get("ground_kills") or 0),
int(p.get("air_kills") or 0),
int(p.get("assists") or 0),
int(p.get("captures") or 0),
int(p.get("deaths") or 0),
int(p.get("score") or 0),
int(p.get("missile_evades") or 0),
int(p.get("shell_interceptions") or 0),
int(p.get("team_kills") or 0),
p.get("country_id"),
victor_bool,
end_ts,
None, # team_id — resolved later
))
if not rows:
return
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
for sql in _PRAGMAS:
await conn.execute(sql)
await conn.executemany(
"""
INSERT OR IGNORE INTO player_games_hist
(UID, nick, team_name, team_tag, team_slot, session_id,
vehicle, vehicle_internal,
ground_kills, air_kills, assists, captures, deaths, score,
missile_evades, shell_interceptions, team_kills_stat,
country_id, victor_bool, endtime_unix, team_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
rows,
)
await conn.commit()
# ---------------------------------------------------------------------------
# Team resolve / lookup (python twin of the Rust backend's find_team)
# ---------------------------------------------------------------------------
async def resolve_team(name_or_id: str) -> Optional[Dict[str, Any]]:
"""Resolve a team by numeric id, or by tag/short/long name (case-insensitive).
Returns ``{team_id, long_name, short_name, tag_name}`` or None.
Tag matches rank above short, then long name.
"""
text = (name_or_id or "").strip()
if not text:
return None
try:
as_id: Optional[int] = int(text)
except ValueError:
as_id = None
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT team_id, long_name, short_name, tag_name
FROM teams_data
WHERE team_id = ?1
OR long_name = ?2 COLLATE NOCASE
OR short_name = ?2 COLLATE NOCASE
OR tag_name = ?2 COLLATE NOCASE
ORDER BY CASE
WHEN tag_name = ?2 COLLATE NOCASE THEN 0
WHEN short_name = ?2 COLLATE NOCASE THEN 1
WHEN long_name = ?2 COLLATE NOCASE THEN 2
ELSE 3
END
LIMIT 1
""",
(as_id, text),
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def resolve_team_id_for_tag(tag: str) -> Optional[int]:
"""Return the team_id whose tag_name matches ``tag`` (case-insensitive)."""
tag = (tag or "").strip()
if not tag:
return None
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
async with conn.execute(
"SELECT team_id FROM teams_data WHERE tag_name = ? COLLATE NOCASE LIMIT 1",
(tag,),
) as cur:
row = await cur.fetchone()
return int(row[0]) if row else None
async def search_teams(query: str, limit: int = 25) -> List[Dict[str, Any]]:
"""Autocomplete-friendly team search. Empty query → top teams by rating."""
q = (query or "").strip()
async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
if not q:
async with conn.execute(
"""
SELECT team_id, long_name, tag_name FROM teams_data
ORDER BY clanrating DESC NULLS LAST, members DESC
LIMIT ?
""",
(limit,),
) as cur:
rows = await cur.fetchall()
else:
like = f"%{q}%"
async with conn.execute(
"""
SELECT team_id, long_name, tag_name FROM teams_data
WHERE long_name LIKE ?1 COLLATE NOCASE
OR tag_name LIKE ?1 COLLATE NOCASE
ORDER BY CASE
WHEN tag_name = ?2 COLLATE NOCASE THEN 0
WHEN long_name = ?2 COLLATE NOCASE THEN 1
ELSE 2
END
LIMIT ?3
""",
(like, q, limit),
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Player resolve / aggregate (derived from player_games_hist)
# ---------------------------------------------------------------------------
async def search_players(query: str, limit: int = 25) -> List[Dict[str, Any]]:
"""Autocomplete-friendly player search by nick. Returns [{uid, nick}]."""
q = (query or "").strip()
if len(q) < 2:
return []
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
await conn.create_function("ulower", 1, str.lower)
async with conn.execute(
"""
SELECT nick, UID FROM (
SELECT nick, UID, MAX(endtime_unix) AS last_seen
FROM player_games_hist
WHERE nick LIKE ? COLLATE NOCASE
GROUP BY UID
ORDER BY
CASE WHEN ulower(nick) = ulower(?) THEN 0
WHEN ulower(nick) LIKE ulower(?) THEN 1
ELSE 2 END,
last_seen DESC
LIMIT ?
)
""",
(f"{q}%", q, f"{q}%", limit),
) as cur:
rows = await cur.fetchall()
return [{"uid": r["UID"], "nick": r["nick"]} for r in rows]
async def resolve_players(name: str) -> List[Dict[str, Any]]:
"""Resolve a nick to candidates (exact match first, else substring).
Returns ``[{uid, nick}]`` grouped by UID.
"""
name = (name or "").strip()
if not name:
return []
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"SELECT UID, MIN(nick) AS nick FROM player_games_hist "
"WHERE nick = ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25",
(name,),
) as cur:
rows = list(await cur.fetchall())
if not rows:
async with conn.execute(
"SELECT UID, MIN(nick) AS nick FROM player_games_hist "
"WHERE nick LIKE ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25",
(f"%{name}%",),
) as cur:
rows = list(await cur.fetchall())
return [{"uid": r["UID"], "nick": r["nick"]} for r in rows]
async def latest_nick_for_uid(uid: str) -> str:
"""Best-effort latest nick for a UID; falls back to the UID string."""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
async with conn.execute(
"SELECT nick FROM player_games_hist WHERE UID = ? ORDER BY endtime_unix DESC LIMIT 1",
(uid,),
) as cur:
row = await cur.fetchone()
return row[0] if row else str(uid)
async def player_career(uid: str) -> Optional[Dict[str, Any]]:
"""Aggregate career stats for a UID.
Player totals (kills/deaths/...) are Spectra per-player totals duplicated
across that player's vehicle rows, so we collapse to one value per session
(MAX) before summing — see the module docstring.
"""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT
COUNT(*) AS battles,
SUM(CASE WHEN UPPER(victor_bool) = 'WIN' THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN UPPER(victor_bool) = 'LOSS' THEN 1 ELSE 0 END) AS losses,
SUM(gk) AS ground_kills, SUM(ak) AS air_kills,
SUM(asi) AS assists, SUM(cap) AS captures, SUM(de) AS deaths
FROM (
SELECT session_id,
MAX(victor_bool) AS victor_bool,
MAX(ground_kills) AS gk, MAX(air_kills) AS ak,
MAX(assists) AS asi, MAX(captures) AS cap, MAX(deaths) AS de
FROM player_games_hist
WHERE UID = ?
GROUP BY session_id
)
""",
(uid,),
) as cur:
row = await cur.fetchone()
if not row or not row["battles"]:
return None
career = dict(row)
career["nick"] = await latest_nick_for_uid(uid)
career["uid"] = str(uid)
return career
async def player_teams(uid: str) -> List[Dict[str, Any]]:
"""Teams a UID has appeared with, most recent first."""
async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(
"""
SELECT team_tag,
MAX(team_name) AS team_name,
team_id,
COUNT(DISTINCT session_id) AS games,
MAX(endtime_unix) AS last_seen
FROM player_games_hist
WHERE UID = ?
GROUP BY team_tag
ORDER BY last_seen DESC
""",
(uid,),
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]