"""TSSBOT storage layer — SQLite paths, idempotent DB init, and insert helpers. Two databases live under ``STORAGE_VOL_PATH`` (set in ``TSSBOT/.env``): * ``tss_teams.db`` — replay-sourced TSS team registry * ``tss_battles.db`` — per-match summary + per-player/per-vehicle game history One row is written to ``player_games_hist`` per vehicle *actually used* by each player. Player-level stats (kills, deaths, etc.) are Spectra player totals and are duplicated across vehicle rows — aggregation queries that sum stats must ``DISTINCT`` on session_id first to avoid double-counting. Session IDs are stored as hex strings throughout (Spectra sends decimal integers; ``tss_ws._session_id`` normalises them before anything hits the DB). """ from __future__ import annotations import logging import os import time from pathlib import Path from typing import Any, Dict, List, Optional import aiosqlite log = logging.getLogger("tssbot.storage") # --------------------------------------------------------------------------- # Paths # --------------------------------------------------------------------------- def _require_storage_dir() -> Path: raw = os.environ.get("STORAGE_VOL_PATH", "").strip() if not raw: raise RuntimeError("STORAGE_VOL_PATH must be set in TSSBOT/.env") p = Path(raw).expanduser().resolve() p.mkdir(parents=True, exist_ok=True) return p STORAGE_DIR: Path = _require_storage_dir() TSS_BATTLES_DB_PATH: Path = STORAGE_DIR / "tss_battles.db" TSS_TEAMS_DB_PATH: Path = STORAGE_DIR / "tss_teams.db" # --------------------------------------------------------------------------- # tss_battles.db — match_summary + player_games_hist # --------------------------------------------------------------------------- _MATCH_SUMMARY_SQL = """ CREATE TABLE IF NOT EXISTS match_summary ( session_id TEXT PRIMARY KEY, mission_mode TEXT, mission_name TEXT, level_path TEXT, mission_path TEXT, difficulty TEXT, starttime_unix INTEGER, endtime_unix INTEGER, duration REAL, draw INTEGER NOT NULL DEFAULT 0, winning_slot TEXT, losing_slot TEXT, received_unix INTEGER, tournament_id INTEGER, tournament_name TEXT, match_id TEXT, bracket TEXT ) """ _MATCH_SUMMARY_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_ms_mission_name ON match_summary(mission_name)", "CREATE INDEX IF NOT EXISTS idx_ms_level_path ON match_summary(level_path)", "CREATE INDEX IF NOT EXISTS idx_ms_mission_path ON match_summary(mission_path)", "CREATE INDEX IF NOT EXISTS idx_ms_endtime ON match_summary(endtime_unix)", "CREATE INDEX IF NOT EXISTS idx_ms_difficulty ON match_summary(difficulty)", ] _PLAYER_GAMES_SQL = """ CREATE TABLE IF NOT EXISTS player_games_hist ( UID TEXT NOT NULL, nick TEXT NOT NULL, team_name TEXT, team_slot TEXT, session_id TEXT NOT NULL, vehicle TEXT, vehicle_internal TEXT, ground_kills INTEGER NOT NULL DEFAULT 0, air_kills INTEGER NOT NULL DEFAULT 0, assists INTEGER NOT NULL DEFAULT 0, captures INTEGER NOT NULL DEFAULT 0, deaths INTEGER NOT NULL DEFAULT 0, score INTEGER NOT NULL DEFAULT 0, missile_evades INTEGER NOT NULL DEFAULT 0, shell_interceptions INTEGER NOT NULL DEFAULT 0, team_kills_stat INTEGER NOT NULL DEFAULT 0, country_id INTEGER, victor_bool TEXT NOT NULL DEFAULT 'Loss', endtime_unix INTEGER NOT NULL DEFAULT 0, team_id INTEGER, tss_role TEXT, pvp_ratio REAL, UNIQUE (UID, session_id, vehicle_internal) ) """ _PLAYER_GAMES_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_pgh_session ON player_games_hist(session_id)", "CREATE INDEX IF NOT EXISTS idx_pgh_uid_time ON player_games_hist(UID, endtime_unix)", "CREATE INDEX IF NOT EXISTS idx_pgh_team_name_time ON player_games_hist(team_name, endtime_unix)", "CREATE INDEX IF NOT EXISTS idx_pgh_endtime ON player_games_hist(endtime_unix)", "CREATE INDEX IF NOT EXISTS idx_pgh_nick ON player_games_hist(nick COLLATE NOCASE)", "CREATE INDEX IF NOT EXISTS idx_pgh_session_team_name ON player_games_hist(session_id, team_name)", "CREATE INDEX IF NOT EXISTS idx_pgh_team_id_time ON player_games_hist(team_id, endtime_unix)", "CREATE INDEX IF NOT EXISTS idx_pgh_vehicle_internal ON player_games_hist(vehicle_internal)", ] # --------------------------------------------------------------------------- # tss_teams.db — teams_data + team_members + team_name_history # --------------------------------------------------------------------------- _TEAMS_DATA_SQL = """ CREATE TABLE IF NOT EXISTS teams_data ( team_id INTEGER PRIMARY KEY, name TEXT NOT NULL, members INTEGER NOT NULL DEFAULT 0, captain_uid TEXT ) """ _TEAMS_DATA_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_teams_data_name ON teams_data(name COLLATE NOCASE)", ] _TEAM_MEMBERS_SQL = """ CREATE TABLE IF NOT EXISTS team_members ( team_id INTEGER NOT NULL, uid TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'player', PRIMARY KEY (team_id, uid) ) """ _TEAM_MEMBERS_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_team_members_team_id ON team_members(team_id)", "CREATE INDEX IF NOT EXISTS idx_team_members_uid ON team_members(uid)", ] _TEAM_NAME_HISTORY_SQL = """ CREATE TABLE IF NOT EXISTS team_name_history ( team_id INTEGER NOT NULL, name TEXT NOT NULL, first_seen INTEGER, last_seen INTEGER, PRIMARY KEY (team_id, name) ) """ _TEAM_NAME_HISTORY_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_team_name_history_name ON team_name_history(name COLLATE NOCASE)", ] # --------------------------------------------------------------------------- # Init # --------------------------------------------------------------------------- _PRAGMAS = ( "PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;", "PRAGMA busy_timeout=5000;", "PRAGMA temp_store=MEMORY;", ) async def _apply(conn: aiosqlite.Connection, statements: list[str]) -> None: for sql in statements: await conn.execute(sql) async def _existing_columns(conn: aiosqlite.Connection, table: str) -> set[str]: rows = await conn.execute_fetchall(f"PRAGMA table_info({table})") return {row[1] for row in rows} async def _rebuild_table( conn: aiosqlite.Connection, table: str, create_sql: str, desired_columns: list[str], ) -> None: """Rebuild a table when its schema contains obsolete or missing columns.""" existing = await _existing_columns(conn, table) desired = set(desired_columns) if existing == desired: return old_table = f"{table}_old_schema" await conn.execute(f"DROP TABLE IF EXISTS {old_table}") await conn.execute(f"ALTER TABLE {table} RENAME TO {old_table}") await conn.execute(create_sql) common = [column for column in desired_columns if column in existing] if common: columns = ", ".join(common) await conn.execute( f"INSERT OR IGNORE INTO {table} ({columns}) " f"SELECT {columns} FROM {old_table}" ) await conn.execute(f"DROP TABLE {old_table}") log.info("migrated %s: rebuilt replay-native schema", table) async def _init_battles_db() -> None: async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: for sql in _PRAGMAS: await conn.execute(sql) await conn.execute(_MATCH_SUMMARY_SQL) await conn.execute(_PLAYER_GAMES_SQL) await _rebuild_table(conn, "match_summary", _MATCH_SUMMARY_SQL, [ "session_id", "mission_mode", "mission_name", "level_path", "mission_path", "difficulty", "starttime_unix", "endtime_unix", "duration", "draw", "winning_slot", "losing_slot", "received_unix", "tournament_id", "tournament_name", "match_id", "bracket", ]) await _rebuild_table(conn, "player_games_hist", _PLAYER_GAMES_SQL, [ "UID", "nick", "team_name", "team_slot", "session_id", "vehicle", "vehicle_internal", "ground_kills", "air_kills", "assists", "captures", "deaths", "score", "missile_evades", "shell_interceptions", "team_kills_stat", "country_id", "victor_bool", "endtime_unix", "team_id", "tss_role", "pvp_ratio", ]) await _apply(conn, _MATCH_SUMMARY_INDEXES) await _apply(conn, _PLAYER_GAMES_INDEXES) await conn.commit() async def _init_teams_db() -> None: async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn: for sql in _PRAGMAS: await conn.execute(sql) await conn.execute(_TEAMS_DATA_SQL) await conn.execute(_TEAM_MEMBERS_SQL) await conn.execute(_TEAM_NAME_HISTORY_SQL) await conn.execute("DROP TABLE IF EXISTS teams_points") await _rebuild_table(conn, "teams_data", _TEAMS_DATA_SQL, [ "team_id", "name", "members", "captain_uid", ]) await _rebuild_table(conn, "team_members", _TEAM_MEMBERS_SQL, [ "team_id", "uid", "role", ]) await _apply(conn, _TEAMS_DATA_INDEXES) await _apply(conn, _TEAM_MEMBERS_INDEXES) await _apply(conn, _TEAM_NAME_HISTORY_INDEXES) await conn.commit() async def init_tss_dbs() -> None: """Create both TSSBOT SQLite databases if they don't already exist. Idempotent — safe to call on every startup. Sets WAL pragmas, creates tables, and ensures indexes. """ await _init_battles_db() await _init_teams_db() log.info( "TSS DBs initialised: %s, %s", TSS_BATTLES_DB_PATH, TSS_TEAMS_DB_PATH, ) # --------------------------------------------------------------------------- # Insertion helpers # --------------------------------------------------------------------------- async def insert_match(game: Dict[str, Any]) -> None: """Insert one row into match_summary from a normalised game dict. ``game["_id"]`` must already be a hex string (normalised by tss_ws). Safe to call multiple times; newer replay metadata refreshes the row. """ async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: for sql in _PRAGMAS: await conn.execute(sql) tss = game.get("tss") or {} await conn.execute( """ INSERT INTO match_summary (session_id, mission_mode, mission_name, level_path, mission_path, difficulty, starttime_unix, endtime_unix, duration, draw, winning_slot, losing_slot, received_unix, tournament_id, tournament_name, match_id, bracket) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(session_id) DO UPDATE SET mission_mode = excluded.mission_mode, mission_name = excluded.mission_name, level_path = excluded.level_path, mission_path = excluded.mission_path, difficulty = excluded.difficulty, starttime_unix = excluded.starttime_unix, endtime_unix = excluded.endtime_unix, duration = excluded.duration, draw = excluded.draw, winning_slot = excluded.winning_slot, losing_slot = excluded.losing_slot, tournament_id = excluded.tournament_id, tournament_name = excluded.tournament_name, match_id = excluded.match_id, bracket = excluded.bracket """, ( str(game["_id"]), str(game.get("mission_mode") or ""), str(game.get("mission_name") or ""), str(game.get("level_path") or ""), str(game.get("mission_path") or ""), str(game.get("difficulty") or ""), int(game.get("start_ts") or 0), int(game.get("end_ts") or 0), float(game.get("duration") or 0), 1 if game.get("draw") else 0, str(game.get("winner") or ""), str(game.get("loser") or ""), int(time.time()), tss.get("tournament_id"), str(tss.get("tournament_name") or ""), str(tss.get("match_id") or ""), str(tss.get("bracket") or ""), ), ) await conn.commit() async def insert_player_games(game: Dict[str, Any]) -> None: """Insert one row per used vehicle per player into player_games_hist. victor_bool is set to 'Win' when the player's team slot matches the winning slot, 'Loss' otherwise. Safe to call multiple times; newer replay metadata refreshes existing rows. """ session_id = str(game["_id"]) winner_slot = str(game.get("winner") or "") end_ts = int(game.get("end_ts") or 0) players = game.get("players") or {} tss = game.get("tss") or {} tss_players: dict[str, dict[str, Any]] = {} tss_teams: dict[str, dict[str, Any]] = {} for slot in ("1", "2"): team = tss.get(slot) if not isinstance(team, dict): continue tss_teams[slot] = team for member in team.get("players") or []: if isinstance(member, dict) and member.get("uid") is not None: tss_players[str(member["uid"])] = member rows = [] for uid_str, p in players.items(): victor_bool = "Win" if str(p.get("team", "")) == winner_slot else "Loss" tss_team = tss_teams.get(str(p.get("team") or ""), {}) tss_player = tss_players.get(str(uid_str), {}) used_units = [u for u in (p.get("units") or []) if u.get("used")] if not used_units: continue for unit in used_units: rows.append(( str(uid_str), str(p.get("name") or ""), str(tss_team.get("team_name") or tss_team.get("name") or ""), str(p.get("team") or ""), # team_slot ("1" or "2") session_id, str(unit.get("unit_normalized") or ""), str(unit.get("unit") or ""), int(p.get("ground_kills") or 0), int(p.get("air_kills") or 0), int(p.get("assists") or 0), int(p.get("captures") or 0), int(p.get("deaths") or 0), int(p.get("score") or 0), int(p.get("missile_evades") or 0), int(p.get("shell_interceptions") or 0), int(p.get("team_kills") or 0), p.get("country_id"), victor_bool, end_ts, tss_team.get("team_id"), str(tss_player.get("role") or ""), tss_player.get("pvp_ratio"), )) if not rows: return async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: for sql in _PRAGMAS: await conn.execute(sql) await conn.executemany( """ INSERT INTO player_games_hist (UID, nick, team_name, team_slot, session_id, vehicle, vehicle_internal, ground_kills, air_kills, assists, captures, deaths, score, missile_evades, shell_interceptions, team_kills_stat, country_id, victor_bool, endtime_unix, team_id, tss_role, pvp_ratio) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(UID, session_id, vehicle_internal) DO UPDATE SET nick = excluded.nick, team_name = excluded.team_name, team_slot = excluded.team_slot, vehicle = excluded.vehicle, ground_kills = excluded.ground_kills, air_kills = excluded.air_kills, assists = excluded.assists, captures = excluded.captures, deaths = excluded.deaths, score = excluded.score, missile_evades = excluded.missile_evades, shell_interceptions = excluded.shell_interceptions, team_kills_stat = excluded.team_kills_stat, country_id = excluded.country_id, victor_bool = excluded.victor_bool, endtime_unix = excluded.endtime_unix, team_id = excluded.team_id, tss_role = excluded.tss_role, pvp_ratio = excluded.pvp_ratio """, rows, ) await conn.commit() async def upsert_tss_teams(game: Dict[str, Any]) -> None: """Upsert replay-sourced team identity, roster, and name history.""" tss = game.get("tss") or {} seen_unix = int(game.get("end_ts") or time.time()) async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn: for sql in _PRAGMAS: await conn.execute(sql) for slot in ("1", "2"): team = tss.get(slot) if not isinstance(team, dict) or team.get("team_id") is None: continue team_id = int(team["team_id"]) team_name = str(team.get("team_name") or team.get("name") or "") if not team_name: continue members = [ member for member in (team.get("players") or []) if isinstance(member, dict) and member.get("uid") is not None ] await conn.execute( """ INSERT INTO teams_data (team_id, name, members, captain_uid) VALUES (?, ?, ?, ?) ON CONFLICT(team_id) DO UPDATE SET name = excluded.name, members = excluded.members, captain_uid = excluded.captain_uid """, (team_id, team_name, len(members), str(team.get("captain_id") or "")), ) await conn.execute( """ INSERT INTO team_name_history (team_id, name, first_seen, last_seen) VALUES (?, ?, ?, ?) ON CONFLICT(team_id, name) DO UPDATE SET last_seen = excluded.last_seen """, (team_id, team_name, seen_unix, seen_unix), ) if "players" in team: await conn.execute("DELETE FROM team_members WHERE team_id = ?", (team_id,)) for member in members: await conn.execute( """ INSERT INTO team_members (team_id, uid, role) VALUES (?, ?, ?) ON CONFLICT(team_id, uid) DO UPDATE SET role = excluded.role """, (team_id, str(member["uid"]), str(member.get("role") or "player")), ) await conn.commit() # --------------------------------------------------------------------------- # Team resolve / lookup (python twin of the Rust backend's find_team) # --------------------------------------------------------------------------- async def resolve_team(name_or_id: str) -> Optional[Dict[str, Any]]: """Resolve a team by numeric ID or full TSS team name.""" text = (name_or_id or "").strip() if not text: return None try: as_id: Optional[int] = int(text) except ValueError: as_id = None async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """ SELECT team_id, name FROM teams_data WHERE team_id = ?1 OR name = ?2 COLLATE NOCASE LIMIT 1 """, (as_id, text), ) as cur: row = await cur.fetchone() return dict(row) if row else None async def search_teams(query: str, limit: int = 25) -> List[Dict[str, Any]]: """Autocomplete-friendly TSS team-name search.""" q = (query or "").strip() async with aiosqlite.connect(TSS_TEAMS_DB_PATH) as conn: conn.row_factory = aiosqlite.Row if not q: async with conn.execute( """ SELECT team_id, name FROM teams_data ORDER BY members DESC, name COLLATE NOCASE LIMIT ? """, (limit,), ) as cur: rows = await cur.fetchall() else: like = f"%{q}%" async with conn.execute( """ SELECT team_id, name FROM teams_data WHERE name LIKE ?1 COLLATE NOCASE ORDER BY CASE WHEN name = ?2 COLLATE NOCASE THEN 0 ELSE 1 END LIMIT ?3 """, (like, q, limit), ) as cur: rows = await cur.fetchall() return [dict(r) for r in rows] # --------------------------------------------------------------------------- # Player resolve / aggregate (derived from player_games_hist) # --------------------------------------------------------------------------- async def search_players(query: str, limit: int = 25) -> List[Dict[str, Any]]: """Autocomplete-friendly player search by nick. Returns [{uid, nick}].""" q = (query or "").strip() if len(q) < 2: return [] async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: conn.row_factory = aiosqlite.Row await conn.create_function("ulower", 1, str.lower) async with conn.execute( """ SELECT nick, UID FROM ( SELECT nick, UID, MAX(endtime_unix) AS last_seen FROM player_games_hist WHERE nick LIKE ? COLLATE NOCASE GROUP BY UID ORDER BY CASE WHEN ulower(nick) = ulower(?) THEN 0 WHEN ulower(nick) LIKE ulower(?) THEN 1 ELSE 2 END, last_seen DESC LIMIT ? ) """, (f"{q}%", q, f"{q}%", limit), ) as cur: rows = await cur.fetchall() return [{"uid": r["UID"], "nick": r["nick"]} for r in rows] async def resolve_players(name: str) -> List[Dict[str, Any]]: """Resolve a nick to candidates (exact match first, else substring). Returns ``[{uid, nick}]`` grouped by UID. """ name = (name or "").strip() if not name: return [] async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( "SELECT UID, MIN(nick) AS nick FROM player_games_hist " "WHERE nick = ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25", (name,), ) as cur: rows = list(await cur.fetchall()) if not rows: async with conn.execute( "SELECT UID, MIN(nick) AS nick FROM player_games_hist " "WHERE nick LIKE ? COLLATE NOCASE GROUP BY UID ORDER BY nick LIMIT 25", (f"%{name}%",), ) as cur: rows = list(await cur.fetchall()) return [{"uid": r["UID"], "nick": r["nick"]} for r in rows] async def latest_nick_for_uid(uid: str) -> str: """Best-effort latest nick for a UID; falls back to the UID string.""" async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: async with conn.execute( "SELECT nick FROM player_games_hist WHERE UID = ? ORDER BY endtime_unix DESC LIMIT 1", (uid,), ) as cur: row = await cur.fetchone() return row[0] if row else str(uid) async def player_career(uid: str) -> Optional[Dict[str, Any]]: """Aggregate career stats for a UID. Player totals (kills/deaths/...) are Spectra per-player totals duplicated across that player's vehicle rows, so we collapse to one value per session (MAX) before summing — see the module docstring. """ async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """ SELECT COUNT(*) AS battles, SUM(CASE WHEN UPPER(victor_bool) = 'WIN' THEN 1 ELSE 0 END) AS wins, SUM(CASE WHEN UPPER(victor_bool) = 'LOSS' THEN 1 ELSE 0 END) AS losses, SUM(gk) AS ground_kills, SUM(ak) AS air_kills, SUM(asi) AS assists, SUM(cap) AS captures, SUM(de) AS deaths FROM ( SELECT session_id, MAX(victor_bool) AS victor_bool, MAX(ground_kills) AS gk, MAX(air_kills) AS ak, MAX(assists) AS asi, MAX(captures) AS cap, MAX(deaths) AS de FROM player_games_hist WHERE UID = ? GROUP BY session_id ) """, (uid,), ) as cur: row = await cur.fetchone() if not row or not row["battles"]: return None career = dict(row) career["nick"] = await latest_nick_for_uid(uid) career["uid"] = str(uid) return career async def player_teams(uid: str) -> List[Dict[str, Any]]: """Teams a UID has appeared with, most recent first.""" async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """ SELECT team_id, MAX(team_name) AS team_name, COUNT(DISTINCT session_id) AS games, MAX(endtime_unix) AS last_seen FROM player_games_hist WHERE UID = ? GROUP BY team_id, team_name ORDER BY last_seen DESC """, (uid,), ) as cur: rows = await cur.fetchall() return [dict(r) for r in rows]