#!/usr/bin/env python3
"""
tss-stats-collector.py

Queries the TSS tournament API (tss.warthunder.com) for a specific tournament
and returns enriched team/player data for a given set of player UIDs.

Intended to run as an enrichment step on game receipt:
  1. Game arrives with player UIDs
  2. Call fetch_players_from_tournament(tournament_id, uids) to get team info
  3. Merge results into player rows before writing to DB

Usage (standalone):
  python tss-stats-collector.py --tournament-id 20000 --uids 627841 118846315
  python tss-stats-collector.py --tournament-id 20000 --uids 627841 --full-team
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import random
from typing import Any, Iterable, Optional, Sequence

import aiohttp

log = logging.getLogger("tss-stats-collector")

TSS_API_URL = "https://tss.warthunder.com/functions.php"
TSS_API_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}
# HTTP statuses that are retried with backoff instead of failing immediately.
RETRYABLE_STATUS = {408, 425, 429, 500, 502, 503, 504}


# ---------------------------------------------------------------------------
# HTTP client (ported from TSS/collector/client.py)
# ---------------------------------------------------------------------------

class TSSClient:
    """Async client for the TSS ``functions.php`` endpoint with retry/backoff.

    Use it as an async context manager; the underlying ``aiohttp`` session is
    created in ``__aenter__`` and closed in ``__aexit__``::

        async with TSSClient() as client:
            payload = await client.call("infoTeam", tournamentID=1, teamID=2)

    Args:
        request_timeout: Total per-request timeout in seconds.
        retry_limit: Number of retries after the first failed attempt.
        retry_base_delay: Base delay (seconds) for exponential backoff.
        retry_max_delay: Upper bound (seconds) on the un-jittered delay.
    """

    def __init__(
        self,
        *,
        request_timeout: float = 15.0,
        retry_limit: int = 2,
        retry_base_delay: float = 1.0,
        retry_max_delay: float = 10.0,
    ):
        self._timeout = aiohttp.ClientTimeout(total=request_timeout)
        self._retry_limit = retry_limit
        self._retry_base = retry_base_delay
        self._retry_max = retry_max_delay
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "TSSClient":
        self._session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(ttl_dns_cache=300, enable_cleanup_closed=True),
            timeout=self._timeout,
            headers=TSS_API_HEADERS,
        )
        return self

    async def __aexit__(self, *_exc) -> None:
        if self._session:
            await self._session.close()
            self._session = None

    @staticmethod
    def _decode_payload(action: str, text: str) -> Optional[dict]:
        """Parse a response body; return None unless it is a JSON object."""
        if not text:
            return None
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            log.error("%s -> non-JSON response", action)
            return None
        if not isinstance(payload, dict):
            # Callers use .get() on the result, so anything but an object
            # is treated as a failed call rather than handed back.
            log.error("%s -> unexpected JSON payload type %s", action, type(payload).__name__)
            return None
        return payload

    async def call(self, action: str, **params: Any) -> Optional[dict]:
        """POST to the TSS API; returns parsed JSON or None on failure.

        Args:
            action: Value of the ``action`` form field.
            **params: Extra form fields; ``None`` values are dropped and the
                rest are stringified.

        Returns:
            The decoded JSON object, or ``None`` when the request keeps
            failing, the status is a non-retryable non-200, or the body is
            empty, undecodable, or not a JSON object.

        Raises:
            RuntimeError: If called outside ``async with TSSClient()``.
        """
        if self._session is None:
            # Explicit check instead of `assert`, which is stripped under -O.
            raise RuntimeError("use `async with TSSClient()`")

        data = {"action": action, **{k: str(v) for k, v in params.items() if v is not None}}
        attempt = 0
        while True:
            try:
                async with self._session.post(TSS_API_URL, data=data) as resp:
                    if resp.status in RETRYABLE_STATUS:
                        # Raise so transient statuses share the retry path
                        # with connection errors and timeouts.
                        raise aiohttp.ClientResponseError(
                            resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=resp.reason or "",
                        )
                    if resp.status != 200:
                        log.warning("%s %s -> HTTP %s", action, params, resp.status)
                        return None
                    try:
                        text = await resp.text()
                    except UnicodeDecodeError:
                        log.error("%s -> undecodable response body", action)
                        return None
                    return self._decode_payload(action, text)
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                if attempt >= self._retry_limit:
                    log.error("%s failed after %d retries: %s", action, attempt, exc)
                    return None
                # Exponential backoff capped at retry_max, with 0.5x-1.5x jitter.
                delay = min(self._retry_max, self._retry_base * (2 ** attempt)) * (0.5 + random.random())
                await asyncio.sleep(delay)
                attempt += 1


# ---------------------------------------------------------------------------
# Enrichment functions (intended for use in TSSBOT receive pipeline)
# ---------------------------------------------------------------------------

def _dict_entries(value: Any) -> list[dict]:
    """Return the dict items of a list- or object-shaped JSON collection."""
    if isinstance(value, dict):
        value = list(value.values())
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _as_dict(value: Any) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _normalize_uids(target_uids: Any) -> set[str]:
    """Return the target UIDs as a set of strings (accepts ints and scalars)."""
    if target_uids is None:
        return set()
    if isinstance(target_uids, (str, int)):
        # A bare UID must not be iterated character by character.
        return {str(target_uids)}
    return {str(uid) for uid in target_uids}


async def fetch_tournament_short(client: TSSClient, tournament_id: int) -> Optional[dict]:
    """Raw GetStatsTournamentShort response for one tournament.

    Returns None when the call fails, the payload is empty or not an object,
    or its ``status`` field equals ``"ERROR"``.
    """
    result = await client.call("GetStatsTournamentShort", tournamentID=tournament_id)
    if not result or not isinstance(result, dict):
        return None
    if result.get("status") == "ERROR":
        return None
    return result


async def fetch_team_info(client: TSSClient, tournament_id: int, team_id: int) -> Optional[dict]:
    """Raw infoTeam response for a specific team in a tournament."""
    return await client.call("infoTeam", tournamentID=tournament_id, teamID=team_id)


async def fetch_players_from_tournament(
    client: TSSClient,
    tournament_id: int,
    target_uids: set[str],
    *,
    include_team_info: bool = False,
) -> list[dict]:
    """
    Main enrichment function.

    Given a tournament ID and a set of player UIDs, returns a list of dicts
    with TSS data for each matching player:
      {
        tournament_id, player_id, nick, team_id, team_tag, team_uuid,
        role, place, death, frag, exp_hit, pvp_ratio,
        # if include_team_info=True, also:
        team_country, team_hash, team_members,
        tournament_name_en, tournament_name_ru
      }

    Args:
        client: An entered ``TSSClient`` (or any object with a compatible
            async ``call`` method).
        tournament_id: Tournament to query.
        target_uids: UIDs to look up. Compared as strings, so an iterable of
            ints (or a single UID) is accepted as well as a set of strings.
        include_team_info: When true, issue one ``infoTeam`` call per
            distinct team among the matches and merge the team fields in.

    Returns empty list if tournament not found or no UIDs match.
    """
    wanted = _normalize_uids(target_uids)
    if not wanted:
        # Nothing can match, so skip the API round trip.
        return []

    result = await fetch_tournament_short(client, tournament_id)
    if not result:
        return []

    # Merge allUserStats + readyTopTeamsTournament, keyed by UID.
    # allUserStats comes first and setdefault keeps the first entry seen.
    combined: dict[str, dict] = {}
    entries = _dict_entries(result.get("allUserStats")) + _dict_entries(
        result.get("readyTopTeamsTournament")
    )
    for entry in entries:
        uid = str(entry.get("userID") or "")
        if uid:
            combined.setdefault(uid, entry)

    hits = [
        {
            "tournament_id": tournament_id,
            "player_id": entry.get("userID"),
            "nick": entry.get("nick"),
            "team_id": entry.get("teamID"),
            "team_tag": entry.get("realName"),
            "team_uuid": entry.get("teamName"),
            "role": entry.get("role"),
            "place": entry.get("place"),
            "death": entry.get("DEATH"),
            "frag": entry.get("FRAG"),
            "exp_hit": entry.get("EXP_HIT"),
            "pvp_ratio": entry.get("pvp_ratio"),
        }
        for uid, entry in combined.items()
        if uid in wanted
    ]

    if include_team_info and hits:
        await _attach_team_info(client, tournament_id, hits)

    return hits


async def _attach_team_info(client: TSSClient, tournament_id: int, hits: list[dict]) -> None:
    """Fetch infoTeam once per distinct team and merge it into *hits* in place."""
    team_cache: dict = {}
    for team_id in {h["team_id"] for h in hits if h.get("team_id")}:
        try:
            numeric_id = int(team_id)
        except (TypeError, ValueError):
            # One malformed id must not abort enrichment of the other teams.
            log.warning("skipping infoTeam for non-numeric team id %r", team_id)
            continue
        info = await fetch_team_info(client, tournament_id, numeric_id)
        if info and isinstance(info, dict):
            team_cache[team_id] = info

    for h in hits:
        info = team_cache.get(h.get("team_id"))
        if not info:
            continue
        param_team = _as_dict(info.get("param_team"))
        h["team_country"] = param_team.get("country")
        h["team_hash"] = param_team.get("teamID")
        h["team_members"] = [
            {"player_id": e.get("userID"), "nick": e.get("nick"), "role": e.get("role")}
            for e in _dict_entries(info.get("users_team"))
        ]
        param_t = _as_dict(info.get("param_tournaments"))
        h["tournament_name_en"] = param_t.get("nameEN")
        h["tournament_name_ru"] = param_t.get("nameRU")


# ---------------------------------------------------------------------------
# CLI (for manual lookups / testing)
# ---------------------------------------------------------------------------

async def _main(args: argparse.Namespace) -> None:
    """Run one lookup from parsed CLI arguments and print the matches as JSON."""
    logging.basicConfig(level=logging.WARNING, format="[%(levelname)s] %(message)s")

    target_uids = {str(u) for u in args.uids}
    print(f"Querying tournament {args.tournament_id} for UIDs: {sorted(target_uids)}\n")

    async with TSSClient() as client:
        results = await fetch_players_from_tournament(
            client,
            args.tournament_id,
            target_uids,
            include_team_info=args.full_team,
        )

    if not results:
        print("No matching players found in that tournament.")
        return

    print(f"Found {len(results)} match(es):\n")
    for r in results:
        print(json.dumps(r, indent=2, ensure_ascii=False))
        print()


def _parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse CLI arguments; *argv* defaults to ``sys.argv[1:]`` when None."""
    p = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("--tournament-id", type=int, required=True, help="TSS tournament ID to query")
    p.add_argument("--uids", nargs="+", type=int, required=True, help="Player UIDs to look up")
    p.add_argument(
        "--full-team",
        action="store_true",
        help="Also fetch infoTeam for richer data (members, country, etc.)",
    )
    return p.parse_args(argv)


if __name__ == "__main__":
    asyncio.run(_main(_parse_args()))