diff --git a/BOT/storage.py b/BOT/storage.py index 1e84401..77d6327 100644 --- a/BOT/storage.py +++ b/BOT/storage.py @@ -1,28 +1,25 @@ -"""TSSBOT storage layer — SQLite paths + idempotent DB init. +"""TSSBOT storage layer — SQLite paths, idempotent DB init, and insert helpers. Two databases live under ``STORAGE_VOL_PATH`` (set in ``TSSBOT/.env``): -* ``tss_teams.db`` — persistent team registry (analogue of SREBOT's squadrons.db) -* ``tss_battles.db`` — per-match summary + per-player game history - (analogue of SREBOT's sq_battles.db) +* ``tss_teams.db`` — persistent team registry +* ``tss_battles.db`` — per-match summary + per-player/per-vehicle game history -Schemas mirror SREBOT shape so query patterns transfer directly. The -TSS-specific differences are: +One row is written to ``player_games_hist`` per vehicle *actually used* by each +player. Player-level stats (kills, deaths, etc.) are Spectra player totals and +are duplicated across vehicle rows — aggregation queries that sum stats must +``DISTINCT`` on session_id first to avoid double-counting. -* "squadron" → "team" / "clan_id" → "team_id" -* Spectra ships an untransformed per-player stat blob, so we keep extra - columns the raw payload provides (``score``, ``missile_evades``, - ``shell_interceptions``, ``team_kills_stat``, ``country_id``, ``team_slot``). -* No vehicle-translate / stat-divide transform — one row per - ``(UID, session_id, vehicle_internal)`` for each used unit, with the - player's full stat line copied onto each row. Aggregation queries that - sum across vehicles must ``DISTINCT`` on session_id first. +Session IDs are stored as hex strings throughout (Spectra sends decimal integers; +``tss_ws._session_id`` normalises them before anything hits the DB). """ from __future__ import annotations import logging import os +import time from pathlib import Path +from typing import Any, Dict import aiosqlite @@ -53,31 +50,25 @@ TSS_TEAMS_DB_PATH: Path = STORAGE_DIR / "tss_teams.db" _MATCH_SUMMARY_SQL = """ CREATE TABLE IF NOT EXISTS match_summary ( - session_id TEXT PRIMARY KEY, - map_name TEXT, - mission_mode TEXT, - starttime_unix INTEGER, - endtime_unix INTEGER, - draw INTEGER NOT NULL DEFAULT 0, - winning_slot TEXT, - losing_slot TEXT, - winning_team TEXT, - losing_team TEXT, - winning_team_json TEXT, - losing_team_json TEXT, - received_unix INTEGER, - winning_team_id INTEGER, - losing_team_id INTEGER + session_id TEXT PRIMARY KEY, + map_name TEXT, + mission_mode TEXT, + difficulty TEXT, + starttime_unix INTEGER, + endtime_unix INTEGER, + duration REAL, + draw INTEGER NOT NULL DEFAULT 0, + winning_slot TEXT, + losing_slot TEXT, + received_unix INTEGER, + tournament_id INTEGER ) """ _MATCH_SUMMARY_INDEXES = [ - "CREATE INDEX IF NOT EXISTS idx_ms_map_name ON match_summary(map_name)", - "CREATE INDEX IF NOT EXISTS idx_ms_endtime ON match_summary(endtime_unix)", - "CREATE INDEX IF NOT EXISTS idx_ms_winning_team ON match_summary(winning_team)", - "CREATE INDEX IF NOT EXISTS idx_ms_losing_team ON match_summary(losing_team)", - "CREATE INDEX IF NOT EXISTS idx_ms_winning_team_id ON match_summary(winning_team_id)", - "CREATE INDEX IF NOT EXISTS idx_ms_losing_team_id ON match_summary(losing_team_id)", + "CREATE INDEX IF NOT EXISTS idx_ms_map_name ON match_summary(map_name)", + "CREATE INDEX IF NOT EXISTS idx_ms_endtime ON match_summary(endtime_unix)", + "CREATE INDEX IF NOT EXISTS idx_ms_difficulty ON match_summary(difficulty)", ] @@ -85,7 +76,7 @@ _PLAYER_GAMES_SQL = """ CREATE TABLE IF NOT EXISTS player_games_hist ( UID TEXT NOT NULL, nick TEXT NOT NULL, - team_name TEXT NOT NULL DEFAULT 'UNKNOWN', + team_name TEXT, team_tag TEXT NOT NULL DEFAULT 'UNKNOWN', team_slot TEXT, session_id TEXT NOT NULL, @@ -104,6 +95,10 @@ CREATE TABLE IF NOT EXISTS player_games_hist ( victor_bool TEXT NOT NULL DEFAULT 'Loss', endtime_unix INTEGER NOT NULL DEFAULT 0, team_id INTEGER, + tss_team_uuid TEXT, + tss_role TEXT, + tss_place INTEGER, + pvp_ratio REAL, UNIQUE (UID, session_id, vehicle_internal) ) """ @@ -243,6 +238,15 @@ async def _init_battles_db() -> None: await conn.execute(_PLAYER_GAMES_SQL) await _apply(conn, _MATCH_SUMMARY_INDEXES) await _apply(conn, _PLAYER_GAMES_INDEXES) + await _migrate(conn, "match_summary", { + "tournament_id": "tournament_id INTEGER", + }) + await _migrate(conn, "player_games_hist", { + "tss_team_uuid": "tss_team_uuid TEXT", + "tss_role": "tss_role TEXT", + "tss_place": "tss_place INTEGER", + "pvp_ratio": "pvp_ratio REAL", + }) await conn.commit() @@ -261,6 +265,7 @@ async def _init_teams_db() -> None: await _migrate(conn, "teams_data", { "clanrating": "clanrating INTEGER", }) + await _migrate(conn, "team_members", { "points": "points INTEGER NOT NULL DEFAULT 0", }) @@ -285,3 +290,109 @@ async def init_tss_dbs() -> None: TSS_BATTLES_DB_PATH, TSS_TEAMS_DB_PATH, ) + + +# --------------------------------------------------------------------------- +# Insertion helpers +# --------------------------------------------------------------------------- + +async def insert_match(game: Dict[str, Any]) -> None: + """Insert one row into match_summary from a normalised game dict. + + ``game["_id"]`` must already be a hex string (normalised by tss_ws). + Safe to call multiple times — INSERT OR IGNORE skips duplicates. + """ + async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: + for sql in _PRAGMAS: + await conn.execute(sql) + await conn.execute( + """ + INSERT OR IGNORE INTO match_summary + (session_id, map_name, mission_mode, difficulty, + starttime_unix, endtime_unix, duration, + draw, winning_slot, losing_slot, received_unix) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + str(game["_id"]), + str(game.get("mission_name") or game.get("level_path") or ""), + str(game.get("mission_mode") or ""), + str(game.get("difficulty") or ""), + int(game.get("start_ts") or 0), + int(game.get("end_ts") or 0), + float(game.get("duration") or 0), + 1 if game.get("draw") else 0, + str(game.get("winner") or ""), + str(game.get("loser") or ""), + int(time.time()), + ), + ) + await conn.commit() + + +async def insert_player_games(game: Dict[str, Any]) -> None: + """Insert one row per used vehicle per player into player_games_hist. + + victor_bool is set to 'Win' when the player's team slot matches the + winning slot, 'Loss' otherwise. + Safe to call multiple times — INSERT OR IGNORE skips duplicates. + """ + session_id = str(game["_id"]) + winner_slot = str(game.get("winner") or "") + end_ts = int(game.get("end_ts") or 0) + players = game.get("players") or {} + + rows = [] + for uid_str, p in players.items(): + victor_bool = "Win" if str(p.get("team", "")) == winner_slot else "Loss" + tag_raw = p.get("tag") or "" + team_tag = tag_raw[1:-1] if len(tag_raw) > 2 else tag_raw + + used_units = [u for u in (p.get("units") or []) if u.get("used")] + if not used_units: + continue + + for unit in used_units: + rows.append(( + str(uid_str), + str(p.get("name") or ""), + "", # team_name — resolved later + team_tag, + str(p.get("team") or ""), # team_slot ("1" or "2") + session_id, + str(unit.get("unit_normalized") or ""), + str(unit.get("unit") or ""), + int(p.get("ground_kills") or 0), + int(p.get("air_kills") or 0), + int(p.get("assists") or 0), + int(p.get("captures") or 0), + int(p.get("deaths") or 0), + int(p.get("score") or 0), + int(p.get("missile_evades") or 0), + int(p.get("shell_interceptions") or 0), + int(p.get("team_kills") or 0), + p.get("country_id"), + victor_bool, + end_ts, + None, # team_id — resolved later + )) + + if not rows: + return + + async with aiosqlite.connect(TSS_BATTLES_DB_PATH) as conn: + for sql in _PRAGMAS: + await conn.execute(sql) + await conn.executemany( + """ + INSERT OR IGNORE INTO player_games_hist + (UID, nick, team_name, team_tag, team_slot, session_id, + vehicle, vehicle_internal, + ground_kills, air_kills, assists, captures, deaths, score, + missile_evades, shell_interceptions, team_kills_stat, + country_id, victor_bool, endtime_unix, team_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + rows, + ) + await conn.commit() diff --git a/fetch_replays.sh b/fetch_replays.sh new file mode 100644 index 0000000..8462207 --- /dev/null +++ b/fetch_replays.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Pull TSS replay dirs from the server for local study. +# Run once manually: bash fetch_replays.sh + +set -euo pipefail + +REMOTE="srebot" +REMOTE_PATH="/mnt/HC_Volume_105581488/STORAGE/REPLAYS/TSS/" +LOCAL_PATH="./replays_sample" + +mkdir -p "$LOCAL_PATH" + +rsync -avz --progress \ + --include="*/" \ + --include="replay_data.json.gz" \ + --exclude="*" \ + "${REMOTE}:${REMOTE_PATH}" \ + "$LOCAL_PATH/" + +echo "Done. Files in: $LOCAL_PATH" diff --git a/scripts/migrate_replay_ids.py b/scripts/migrate_replay_ids.py new file mode 100644 index 0000000..08b053d --- /dev/null +++ b/scripts/migrate_replay_ids.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +migrate_replay_ids.py + +One-shot migration: renames decimal replay directories to hex and updates +the _id field inside each replay_data.json.gz to match. + +Run on the server: + python3 migrate_replay_ids.py + python3 migrate_replay_ids.py --dry-run # preview without touching anything + +Reads STORAGE_VOL_PATH from environment or TSSBOT/.env. +""" +from __future__ import annotations + +import argparse +import gzip +import json +import os +import sys +from pathlib import Path + +_HERE = Path(__file__).resolve().parent + +# Try loading .env if python-dotenv is available +try: + from dotenv import load_dotenv + load_dotenv(dotenv_path=_HERE / ".env") +except ImportError: + pass + +STORAGE_VOL_PATH = os.environ.get("STORAGE_VOL_PATH", "").strip() +if not STORAGE_VOL_PATH: + print("ERROR: STORAGE_VOL_PATH not set", file=sys.stderr) + sys.exit(1) + +REPLAYS_DIR = Path(STORAGE_VOL_PATH) / "REPLAYS" / "TSS" + + +def is_decimal_id(name: str) -> bool: + """True if the directory name looks like a plain decimal integer (not hex).""" + try: + int(name) + return True + except ValueError: + return False + + +def to_hex(decimal_str: str) -> str: + return hex(int(decimal_str))[2:].lower() + + +def migrate_one(src_dir: Path, *, dry_run: bool) -> tuple[str, str] | None: + """ + Rename src_dir from decimal to hex, update _id inside replay_data.json.gz. + Returns (old_name, new_name) on success, None if already hex or skipped. + """ + name = src_dir.name + if not is_decimal_id(name): + return None + + hex_name = to_hex(name) + dst_dir = src_dir.parent / hex_name + + if dst_dir.exists(): + print(f" SKIP {name} → {hex_name} (destination already exists)") + return None + + gz_path = src_dir / "replay_data.json.gz" + + if gz_path.exists(): + try: + with gzip.open(gz_path, "rb") as fh: + data = json.loads(fh.read().decode("utf-8")) + except Exception as exc: + print(f" ERROR reading {gz_path}: {exc}") + return None + + data["_id"] = hex_name + + if not dry_run: + with gzip.open(gz_path, "wb") as fh: + fh.write(json.dumps(data, ensure_ascii=False).encode("utf-8")) + + if not dry_run: + src_dir.rename(dst_dir) + + return name, hex_name + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--dry-run", action="store_true", help="Print what would happen without changing anything") + args = parser.parse_args() + + if not REPLAYS_DIR.is_dir(): + print(f"ERROR: {REPLAYS_DIR} does not exist", file=sys.stderr) + sys.exit(1) + + entries = sorted(p for p in REPLAYS_DIR.iterdir() if p.is_dir()) + candidates = [p for p in entries if is_decimal_id(p.name)] + + if not candidates: + print("No decimal-named replay directories found. Nothing to do.") + return + + mode = "DRY RUN — " if args.dry_run else "" + print(f"{mode}Found {len(candidates)} decimal director(ies) to migrate in {REPLAYS_DIR}\n") + + ok = skipped = errors = 0 + for src_dir in candidates: + result = migrate_one(src_dir, dry_run=args.dry_run) + if result is None: + skipped += 1 + else: + old, new = result + tag = "[dry-run] " if args.dry_run else "" + print(f" {tag}{old} → {new}") + ok += 1 + + print(f"\nDone. migrated={ok} skipped={skipped} errors={errors}") + if args.dry_run: + print("(dry run — nothing was changed)") + + +if __name__ == "__main__": + main() diff --git a/tss-stats-collector.py b/tss-stats-collector.py new file mode 100644 index 0000000..a9f6335 --- /dev/null +++ b/tss-stats-collector.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +""" +tss-stats-collector.py + +Queries the TSS tournament API (tss.warthunder.com) for a specific tournament +and returns enriched team/player data for a given set of player UIDs. + +Intended to run as an enrichment step on game receipt: + 1. Game arrives with player UIDs + 2. Call fetch_players_from_tournament(tournament_id, uids) to get team info + 3. Merge results into player rows before writing to DB + +Usage (standalone): + python tss-stats-collector.py --tournament-id 20000 --uids 627841 118846315 + python tss-stats-collector.py --tournament-id 20000 --uids 627841 --full-team +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +import random +from typing import Any, Optional + +import aiohttp + +log = logging.getLogger("tss-stats-collector") + +TSS_API_URL = "https://tss.warthunder.com/functions.php" +TSS_API_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"} + +RETRYABLE_STATUS = {408, 425, 429, 500, 502, 503, 504} + + +# --------------------------------------------------------------------------- +# HTTP client (ported from TSS/collector/client.py) +# --------------------------------------------------------------------------- + +class TSSClient: + def __init__( + self, + *, + request_timeout: float = 15.0, + retry_limit: int = 2, + retry_base_delay: float = 1.0, + retry_max_delay: float = 10.0, + ): + self._timeout = aiohttp.ClientTimeout(total=request_timeout) + self._retry_limit = retry_limit + self._retry_base = retry_base_delay + self._retry_max = retry_max_delay + self._session: Optional[aiohttp.ClientSession] = None + + async def __aenter__(self) -> "TSSClient": + self._session = aiohttp.ClientSession( + connector=aiohttp.TCPConnector(ttl_dns_cache=300, enable_cleanup_closed=True), + timeout=self._timeout, + headers=TSS_API_HEADERS, + ) + return self + + async def __aexit__(self, *_exc) -> None: + if self._session: + await self._session.close() + self._session = None + + async def call(self, action: str, **params: Any) -> Optional[dict]: + """POST to the TSS API; returns parsed JSON or None on failure.""" + assert self._session, "use `async with TSSClient()`" + data = {"action": action, **{k: str(v) for k, v in params.items() if v is not None}} + attempt = 0 + while True: + try: + async with self._session.post(TSS_API_URL, data=data) as resp: + if resp.status in RETRYABLE_STATUS: + raise aiohttp.ClientResponseError( + resp.request_info, resp.history, + status=resp.status, message=resp.reason or "", + ) + if resp.status != 200: + log.warning("%s %s -> HTTP %s", action, params, resp.status) + return None + text = await resp.text() + if not text: + return None + try: + return json.loads(text) + except json.JSONDecodeError: + log.error("%s -> non-JSON response", action) + return None + except (aiohttp.ClientError, asyncio.TimeoutError) as exc: + if attempt >= self._retry_limit: + log.error("%s failed after %d retries: %s", action, attempt, exc) + return None + delay = min(self._retry_max, self._retry_base * (2 ** attempt)) * (0.5 + random.random()) + await asyncio.sleep(delay) + attempt += 1 + + +# --------------------------------------------------------------------------- +# Enrichment functions (intended for use in TSSBOT receive pipeline) +# --------------------------------------------------------------------------- + +async def fetch_tournament_short(client: TSSClient, tournament_id: int) -> Optional[dict]: + """Raw GetStatsTournamentShort response for one tournament.""" + result = await client.call("GetStatsTournamentShort", tournamentID=tournament_id) + if not result or result.get("status") == "ERROR": + return None + return result + + +async def fetch_team_info(client: TSSClient, tournament_id: int, team_id: int) -> Optional[dict]: + """Raw infoTeam response for a specific team in a tournament.""" + return await client.call("infoTeam", tournamentID=tournament_id, teamID=team_id) + + +async def fetch_players_from_tournament( + client: TSSClient, + tournament_id: int, + target_uids: set[str], + *, + include_team_info: bool = False, +) -> list[dict]: + """ + Main enrichment function. Given a tournament ID and a set of player UIDs, + returns a list of dicts with TSS data for each matching player: + + { + tournament_id, player_id, nick, team_id, team_tag, team_uuid, + role, place, death, frag, exp_hit, pvp_ratio, + # if include_team_info=True, also: + team_country, team_hash, team_members, tournament_name_en, ... + } + + Returns empty list if tournament not found or no UIDs match. + """ + result = await fetch_tournament_short(client, tournament_id) + if not result: + return [] + + # Merge allUserStats + readyTopTeamsTournament, keyed by UID + combined: dict[str, dict] = {} + for entry in (result.get("allUserStats") or []) + (result.get("readyTopTeamsTournament") or []): + uid = str(entry.get("userID") or "") + if uid: + combined.setdefault(uid, entry) + + hits = [] + for uid, entry in combined.items(): + if uid not in target_uids: + continue + hits.append({ + "tournament_id": tournament_id, + "player_id": entry.get("userID"), + "nick": entry.get("nick"), + "team_id": entry.get("teamID"), + "team_tag": entry.get("realName"), + "team_uuid": entry.get("teamName"), + "role": entry.get("role"), + "place": entry.get("place"), + "death": entry.get("DEATH"), + "frag": entry.get("FRAG"), + "exp_hit": entry.get("EXP_HIT"), + "pvp_ratio": entry.get("pvp_ratio"), + }) + + if include_team_info and hits: + # One infoTeam call per unique team_id + team_ids = {h["team_id"] for h in hits if h.get("team_id")} + team_cache: dict = {} + for team_id in team_ids: + info = await fetch_team_info(client, tournament_id, int(team_id)) + if info: + team_cache[team_id] = info + + for h in hits: + info = team_cache.get(h.get("team_id")) + if not info: + continue + param_team = info.get("param_team") or {} + h["team_country"] = param_team.get("country") + h["team_hash"] = param_team.get("teamID") + h["team_members"] = [ + {"player_id": e.get("userID"), "nick": e.get("nick"), "role": e.get("role")} + for e in (info.get("users_team") or []) + ] + param_t = info.get("param_tournaments") or {} + h["tournament_name_en"] = param_t.get("nameEN") + h["tournament_name_ru"] = param_t.get("nameRU") + + return hits + + +# --------------------------------------------------------------------------- +# CLI (for manual lookups / testing) +# --------------------------------------------------------------------------- + +async def _main(args: argparse.Namespace) -> None: + logging.basicConfig(level=logging.WARNING, format="[%(levelname)s] %(message)s") + + target_uids = {str(u) for u in args.uids} + print(f"Querying tournament {args.tournament_id} for UIDs: {sorted(target_uids)}\n") + + async with TSSClient() as client: + results = await fetch_players_from_tournament( + client, + args.tournament_id, + target_uids, + include_team_info=args.full_team, + ) + + if not results: + print("No matching players found in that tournament.") + return + + print(f"Found {len(results)} match(es):\n") + for r in results: + print(json.dumps(r, indent=2, ensure_ascii=False)) + print() + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--tournament-id", type=int, required=True, help="TSS tournament ID to query") + p.add_argument("--uids", nargs="+", type=int, required=True, help="Player UIDs to look up") + p.add_argument("--full-team", action="store_true", help="Also fetch infoTeam for richer data (members, country, etc.)") + return p.parse_args() + + +if __name__ == "__main__": + asyncio.run(_main(_parse_args())) diff --git a/tss_ws.py b/tss_ws.py index 17b9a6f..e4f5e60 100644 --- a/tss_ws.py +++ b/tss_ws.py @@ -29,6 +29,8 @@ import zstandard as zstd from dotenv import load_dotenv from websockets.asyncio.client import connect as wsconnect +from BOT.storage import insert_match, insert_player_games + _HERE = Path(__file__).resolve().parent load_dotenv(dotenv_path=_HERE / ".env") @@ -56,20 +58,21 @@ def _auth_header() -> str: def _session_id(game: Dict[str, Any]) -> str: - """Extract a filesystem-safe session ID from a game dict.""" + """Return a hex session ID, converting decimal string/int IDs from Spectra.""" raw = game.get("_id") or game.get("id") if raw is None: return f"unknown_{int(time.time())}" - # Numeric ID → hex string (matches SREBOT convention) - if isinstance(raw, int): - return hex(raw)[2:].lower() - s = str(raw).strip().lower() - return s[2:] if s.startswith("0x") else s + try: + return hex(int(raw))[2:].lower() + except (ValueError, TypeError): + s = str(raw).strip().lower() + return s[2:] if s.startswith("0x") else s def _write_game(game: Dict[str, Any]) -> Path: - """Write game dict to REPLAYS/TSS//replay_data.json.gz.""" + """Normalize _id to hex, then write to REPLAYS/TSS//replay_data.json.gz.""" sid = _session_id(game) + game["_id"] = sid # hex from this point forward session_dir = REPLAYS_DIR / sid session_dir.mkdir(parents=True, exist_ok=True) out = session_dir / "replay_data.json.gz" @@ -161,8 +164,14 @@ async def listen( async def _handle_game(game: Dict[str, Any]) -> None: out = _write_game(game) - sid = _session_id(game) + sid = game["_id"] # hex, normalized by _write_game log.info("Saved game %s → %s", sid, out) + try: + await insert_match(game) + await insert_player_games(game) + log.info("Stored game %s in DB", sid) + except Exception as exc: + log.error("DB insert failed for %s: %s", sid, exc) print(json.dumps(game, indent=2, ensure_ascii=False)) print("-" * 80)