"""TSSBOT tournament structure layer — ``tss_tournaments.db``. The website needs the *authoritative* bracket of each tournament, not one reconstructed from the replays we happened to capture (a partial sample). The TSS site exposes the full structure; this module fetches it and stores it so the website can read it read-only and overlay the replays we hold. Trigger: when a game arrives carrying ``tss.tournament_id`` we haven't indexed (or whose tournament is still live and stale), ``tss_ws`` schedules ``scan_and_store`` in the background. Everything we know about the TSS API (endpoints, fields, examples, edge cases) is documented in ``TSSBOT/docs/tss_tournament_api_reference.md``. The pure parse helpers below are intentionally network-free so they can be unit-tested against the captured sample payloads. """ from __future__ import annotations import asyncio from concurrent.futures import ThreadPoolExecutor, as_completed import json import logging import os import sqlite3 import threading import time import urllib.parse import urllib.request from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple from BOT.storage import STORAGE_DIR log = logging.getLogger("tssbot.tss_tournaments") TSS_TOURNAMENTS_DB_PATH: Path = STORAGE_DIR / "tss_tournaments.db" _API_URL = "https://tss.warthunder.com/functions.php" _API_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"} _API_TIMEOUT = 20 _DEFAULT_BATTLE_WORKERS = max(1, int(os.environ.get("TSS_TOURNAMENT_BATTLE_WORKERS", "8"))) # Re-scan a live tournament at most this often; stop re-scanning this long after # its end time (battle rows land shortly after a game finishes). RESCAN_INTERVAL_SECONDS = 600 RESCAN_END_BUFFER_SECONDS = 6 * 3600 # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- _TOURNAMENTS_SQL = """ CREATE TABLE IF NOT EXISTS tournaments ( tournament_id INTEGER PRIMARY KEY, name TEXT, format TEXT, game_mode TEXT, cluster TEXT, date_start INTEGER, date_end INTEGER, team_count INTEGER NOT NULL DEFAULT 0, match_count INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'pending', first_seen_unix INTEGER, scanned_unix INTEGER ) """ _TOURNAMENT_MATCHES_SQL = """ CREATE TABLE IF NOT EXISTS tournament_matches ( tournament_id INTEGER NOT NULL, match_id TEXT NOT NULL, type_bracket TEXT NOT NULL, side TEXT, round INTEGER, position INTEGER, team_a_uuid TEXT, team_a_name TEXT, team_b_uuid TEXT, team_b_name TEXT, winner_name TEXT, score_a INTEGER NOT NULL DEFAULT 0, score_b INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'pending', time_start INTEGER, PRIMARY KEY (tournament_id, match_id, type_bracket) ) """ _TOURNAMENT_BATTLES_SQL = """ CREATE TABLE IF NOT EXISTS tournament_battles ( session_hex TEXT PRIMARY KEY, session_decimal TEXT, tournament_id INTEGER NOT NULL, match_id TEXT NOT NULL, type_bracket TEXT NOT NULL, position INTEGER, winner_name TEXT, status_replay TEXT ) """ _TOURNAMENT_STANDINGS_SQL = """ CREATE TABLE IF NOT EXISTS tournament_standings ( tournament_id INTEGER NOT NULL, group_index INTEGER NOT NULL DEFAULT 0, team_uuid TEXT NOT NULL, team_name TEXT, points INTEGER NOT NULL DEFAULT 0, wins INTEGER NOT NULL DEFAULT 0, draws INTEGER NOT NULL DEFAULT 0, losses INTEGER NOT NULL DEFAULT 0, buchholz REAL NOT NULL DEFAULT 0, rank INTEGER, PRIMARY KEY (tournament_id, group_index, team_uuid) ) """ _INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_tm_tournament ON tournament_matches(tournament_id)", "CREATE INDEX IF NOT EXISTS idx_tb_tournament ON tournament_battles(tournament_id)", "CREATE INDEX IF NOT EXISTS idx_tb_match ON tournament_battles(tournament_id, match_id)", "CREATE INDEX IF NOT EXISTS idx_ts_tournament ON tournament_standings(tournament_id)", "CREATE INDEX IF NOT EXISTS idx_tournaments_end ON tournaments(date_end)", ] _PRAGMAS = ( "PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;", "PRAGMA busy_timeout=5000;", "PRAGMA temp_store=MEMORY;", ) async def init_tss_tournaments_db() -> None: """Create ``tss_tournaments.db`` if absent. Idempotent.""" _init_tss_tournaments_db_sync() log.info("tss_tournaments.db initialised: %s", TSS_TOURNAMENTS_DB_PATH) def _connect() -> sqlite3.Connection: conn = sqlite3.connect(TSS_TOURNAMENTS_DB_PATH, timeout=5) for sql in _PRAGMAS: conn.execute(sql) return conn def _init_tss_tournaments_db_sync() -> None: with _connect() as conn: conn.execute(_TOURNAMENTS_SQL) conn.execute(_TOURNAMENT_MATCHES_SQL) conn.execute(_TOURNAMENT_BATTLES_SQL) conn.execute(_TOURNAMENT_STANDINGS_SQL) for sql in _INDEXES: conn.execute(sql) conn.commit() # --------------------------------------------------------------------------- # Pure helpers (network-free, unit-tested) # --------------------------------------------------------------------------- def to_hex(decimal_url: Any) -> Optional[str]: """Convert a TSS decimal ``url`` to our hex session id (matches tss_ws).""" if decimal_url in (None, ""): return None try: return hex(int(decimal_url))[2:].lower() except (ValueError, TypeError): return None def normalize_side(type_bracket: Optional[str]) -> str: """Bucket a TSS bracket/group label into a display side. Order matters: ``LooserFinal`` contains both 'looser' and 'final'. """ t = (type_bracket or "").lower() if "swiss" in t: return "swiss" if "group" in t: return "group" if "looser" in t or "loser" in t: return "loser" if "semifinal" in t or "final" in t: return "final" if "winner" in t: return "winner" return t or "match" def apply_tournament_side_context(matches: List[Dict[str, Any]]) -> None: """Fix stage labels whose side depends on tournament format. In double-elim data, TSS puts ``Semifinal`` under the loser bracket tree. In single-elim data, ``Semifinal`` is just an elimination stage. Use presence of Looser/Loser rows to disambiguate. """ has_loser_side = any( "looser" in str(m.get("type_bracket") or "").lower() or "loser" in str(m.get("type_bracket") or "").lower() for m in matches ) if not has_loser_side: return for match in matches: if "semifinal" in str(match.get("type_bracket") or "").lower(): match["side"] = "loser" def derive_format( type_brackets: Any, type_tournament: Optional[str] = None ) -> str: """Authoritative format. ``typeTournament`` wins; else derive from sides.""" tt = (type_tournament or "").lower() if "swiss" in tt: return "swiss" if tt.startswith("group"): return "group" if "double" in tt: return "double-elim" if "single" in tt: return "single-elim" sides = {str(t).lower() for t in (type_brackets or [])} if any("swiss" in s for s in sides): return "swiss" if any("group" in s for s in sides): return "group" if any(("looser" in s or "loser" in s) for s in sides): return "double-elim" if any(("winner" in s or "final" in s) for s in sides): return "single-elim" return "unknown" def team_name_map(all_teams: Any) -> Dict[str, str]: """Build {team_uuid: realName}, skipping the empty-UUID rows TSS sometimes emits.""" out: Dict[str, str] = {} for row in all_teams or []: if not isinstance(row, dict): continue uuid = row.get("teamName") name = row.get("realName") if uuid: out[str(uuid)] = name or "" return out def _as_int(value: Any) -> Optional[int]: try: return int(value) except (ValueError, TypeError): return None def parse_schedule_match(row: Dict[str, Any], names: Dict[str, str]) -> Dict[str, Any]: """Normalize an elimination match row. Schedule rows carry ``matchNumber``/``finished``; bracket fallback rows do not, but they often carry inline ``realNameA/B``. Handle both shapes. """ team_a = row.get("teamA") or "" team_b = row.get("teamB") or "" winner = row.get("winner") or "" type_bracket = str(row.get("typeBracket") or "Winner") finished = str(row.get("finished") or "") == "1" if "finished" not in row and (winner or row.get("matchResult")): finished = True has_both = bool(team_a) and bool(team_b) if has_both: status = "played" if finished else "pending" elif winner: status = "bye" else: status = "pending" return { "match_id": str(row.get("id") or ""), "type_bracket": type_bracket, "side": normalize_side(type_bracket), "round": _as_int(row.get("round")), "position": _as_int(row.get("matchNumber")), "team_a_uuid": team_a or None, "team_a_name": (names.get(team_a) if team_a else None) or row.get("realNameA") or None, "team_b_uuid": team_b or None, "team_b_name": (names.get(team_b) if team_b else None) or row.get("realNameB") or None, "winner_name": (names.get(winner) if winner else None) or (row.get("realNameA") if winner and winner == team_a else None) or (row.get("realNameB") if winner and winner == team_b else None), "score_a": _as_int(row.get("scoreA")) or 0, "score_b": _as_int(row.get("scoreB")) or 0, "status": status, "time_start": _as_int(row.get("timeStart")), } def parse_group_match(row: Dict[str, Any]) -> Dict[str, Any]: """Normalize a ``GroupMatch[]`` (swiss/group) row — names are inline.""" team_a = row.get("teamA") or "" team_b = row.get("teamB") or "" winner = row.get("winner") or "" type_bracket = str(row.get("typeGroup") or "Group") name_a = row.get("realNameA") name_b = row.get("realNameB") winner_name = None if winner: if winner == team_a: winner_name = name_a elif winner == team_b: winner_name = name_b played = bool(row.get("statsStatus")) return { "match_id": str(row.get("id") or ""), "type_bracket": type_bracket, "side": normalize_side(type_bracket), "round": None, "position": None, "team_a_uuid": team_a or None, "team_a_name": name_a, "team_b_uuid": team_b or None, "team_b_name": name_b, "winner_name": winner_name, "score_a": _as_int(row.get("scoreA")) or 0, "score_b": _as_int(row.get("scoreB")) or 0, "status": "played" if played else "pending", "time_start": _as_int(row.get("timeStart")), } def collect_group_rows(value: Any) -> List[Dict[str, Any]]: """Recursively pull match-like dicts from nested bracket/group/swiss structures.""" out: List[Dict[str, Any]] = [] if isinstance(value, dict): if value.get("id") and ("teamA" in value or "idTeamA" in value): out.append(value) for child in value.values(): out.extend(collect_group_rows(child)) elif isinstance(value, list): for child in value: out.extend(collect_group_rows(child)) return out def fill_names_from_battles(match: Dict[str, Any], rows: Any) -> None: """Use getListAllBattles team objects as a fallback name source. ``all_teams`` occasionally omits a UUID. Battle rows include full team objects, but the object order is not a reliable slot signal, so match by UUID. """ if not isinstance(rows, list): return uuid_to_name: Dict[str, str] = {} for row in rows: if not isinstance(row, dict): continue for slot in ("teamA", "teamB"): team = row.get(slot) if not isinstance(team, dict): continue uuid = team.get("teamName") name = team.get("realName") if uuid and name: uuid_to_name[str(uuid)] = str(name) if not match.get("team_a_name") and match.get("team_a_uuid"): match["team_a_name"] = uuid_to_name.get(str(match["team_a_uuid"])) if not match.get("team_b_name") and match.get("team_b_uuid"): match["team_b_name"] = uuid_to_name.get(str(match["team_b_uuid"])) if not match.get("winner_name"): for uuid_key in ("team_a_uuid", "team_b_uuid"): uuid = match.get(uuid_key) if uuid and str(uuid) in uuid_to_name: if ( uuid_key == "team_a_uuid" and match.get("score_a", 0) > match.get("score_b", 0) ) or ( uuid_key == "team_b_uuid" and match.get("score_b", 0) > match.get("score_a", 0) ): match["winner_name"] = uuid_to_name[str(uuid)] def parse_standings(group_stage: Any) -> List[Dict[str, Any]]: """Flatten ``GroupStage`` (list of groups of standings rows).""" out: List[Dict[str, Any]] = [] if not isinstance(group_stage, list): return out for group_index, group in enumerate(group_stage): rows = group if isinstance(group, list) else [group] for rank, row in enumerate(rows, start=1): if not isinstance(row, dict) or not row.get("teamName"): continue try: buchholz = float(row.get("buchholz_points") or 0) except (ValueError, TypeError): buchholz = 0.0 out.append({ "group_index": group_index, "team_uuid": str(row.get("teamName")), "team_name": row.get("realName"), "points": _as_int(row.get("points")) or 0, "wins": _as_int(row.get("won")) or 0, "draws": _as_int(row.get("draw")) or 0, "losses": _as_int(row.get("defeats")) or 0, "buchholz": buchholz, "rank": rank, }) return out def parse_battles( rows: Any, tournament_id: int, match_id: str, type_bracket: str ) -> Tuple[List[Dict[str, Any]], bool]: """Return (battle rows with a session id, saw_technical_victory).""" battles: List[Dict[str, Any]] = [] technical = False for row in rows or []: if not isinstance(row, dict): continue status_replay = row.get("statusReplay") if status_replay == "technical victory": technical = True if status_replay in ("wait", "technical victory"): continue session_hex = to_hex(row.get("url")) if not session_hex: continue battles.append({ "session_hex": session_hex, "session_decimal": str(row.get("url")), "tournament_id": tournament_id, "match_id": match_id, "type_bracket": type_bracket, "position": _as_int(row.get("position")), "winner_name": row.get("winner"), "status_replay": status_replay, }) return battles, technical def compute_status( matches: List[Dict[str, Any]], date_end: Optional[int], now: int, in_active: Optional[bool] = True, ) -> str: """Tournament status from match completion + end date + live-list presence. A tournament absent from ``GetActiveTournaments`` (``in_active`` False) is over — this is the reliable finished signal for old tournaments that carry no ``date_end``, and it stops ``needs_scan`` from re-scanning them forever. """ if in_active is False: return "finished" if not matches: return "pending" if date_end and now > date_end + RESCAN_END_BUFFER_SECONDS: return "finished" if all(m["status"] in ("played", "bye", "technical") for m in matches): return "finished" return "active" def team_count(matches: List[Dict[str, Any]], standings: List[Dict[str, Any]]) -> int: names = set() for m in matches: for key in ("team_a_name", "team_b_name"): if m.get(key): names.add(str(m[key]).lower()) for s in standings: if s.get("team_name"): names.add(str(s["team_name"]).lower()) return len(names) # --------------------------------------------------------------------------- # Network gather (sync — run via a dedicated thread) # --------------------------------------------------------------------------- def _request(method: str, action: str, **params: Any) -> Any: fields = {"action": action, **{k: str(v) for k, v in params.items() if v is not None}} if method == "GET": url = f"{_API_URL}?{urllib.parse.urlencode(fields)}" req = urllib.request.Request(url, headers=_API_HEADERS, method="GET") else: body = urllib.parse.urlencode(fields).encode() req = urllib.request.Request(_API_URL, data=body, headers=_API_HEADERS, method="POST") with urllib.request.urlopen(req, timeout=_API_TIMEOUT) as resp: text = resp.read().decode("utf-8") if not text: return None try: return json.loads(text) except json.JSONDecodeError: return None def _data_dict(response: Any) -> Dict[str, Any]: data = response.get("data") if isinstance(response, dict) else None return data if isinstance(data, dict) else {} # GetActiveTournaments returns ~hundreds of rows; cache it briefly so a burst of # scans doesn't refetch the whole list each time. _ACTIVE_CACHE: Dict[str, Any] = {"at": 0, "by_id": {}} _ACTIVE_TTL_SECONDS = 120 def _active_meta(tournament_id: int, now: int) -> Tuple[Optional[Dict[str, Any]], Optional[bool]]: """Return (metadata, in_active_list) for a tournament from GetActiveTournaments. ``in_active`` is False when a successful active-list fetch says the tournament has dropped off the live/upcoming list. It is None when we could not establish the active list, so callers must not mark the tournament finished solely from absence. """ if now - _ACTIVE_CACHE["at"] > _ACTIVE_TTL_SECONDS: resp = _request("POST", "GetActiveTournaments") rows = resp.get("data") if isinstance(resp, dict) else None by_id: Dict[str, Any] = {} for row in rows or []: if isinstance(row, dict) and row.get("tournamentID") is not None: by_id[str(row["tournamentID"])] = row # Only trust a non-empty fetch; a transient empty reply shouldn't mark # every tournament finished. if by_id: _ACTIVE_CACHE["by_id"] = by_id _ACTIVE_CACHE["at"] = now by_id = _ACTIVE_CACHE["by_id"] meta = by_id.get(str(tournament_id)) if not by_id: return meta, None return meta, meta is not None def build_scan_sync( tournament_id: int, *, fallback_name: Optional[str] = None, active_meta: Optional[Dict[str, Any]] = None, now: Optional[int] = None, battle_workers: int = _DEFAULT_BATTLE_WORKERS, progress: Optional[Callable[[str], None]] = None, ) -> Dict[str, Any]: """Fetch + assemble the full authoritative structure for one tournament. Network-bound; call via ``run_in_thread``. Returns a dict of rows ready for ``store_scan``. """ now = now or int(time.time()) if active_meta is None: active_meta, in_active = _active_meta(tournament_id, now) else: in_active = True sched = _data_dict(_request("POST", "get_shedule_matches", tournamentId=tournament_id)) names = team_name_map(sched.get("all_teams")) raw_matches = sched.get("all_matches") if isinstance(sched.get("all_matches"), list) else [] matches: List[Dict[str, Any]] = [] standings: List[Dict[str, Any]] = [] if raw_matches: matches = [parse_schedule_match(r, names) for r in raw_matches if isinstance(r, dict)] else: # Empty schedule can mean bracket fallback, swiss/group, or pre-start. bracket = _data_dict(_request("POST", "GetArrayBracketData", tournamentID=tournament_id)) bracket_rows = collect_group_rows(bracket.get("bracket") or bracket) if bracket_rows: matches = [parse_schedule_match(r, names) for r in bracket_rows if isinstance(r, dict)] if not matches: # Empty schedule/bracket → swiss/group. Try swiss, then group. for action, kw in ( ("GetArraySwissData", {"tournamentID": tournament_id, "id_group": 1}), ("GetArrayGroupData", {"tournamentID": tournament_id}), ): data = _data_dict(_request("POST", action, **kw)) rows = collect_group_rows(data.get("GroupMatch")) if rows: matches = [parse_group_match(r) for r in rows] standings = parse_standings(data.get("GroupStage")) break apply_tournament_side_context(matches) # Battles per match → session links. Dedupe match_ids (same id can repeat # across sources); fetch once per (match_id, type_bracket). battle_targets: List[Dict[str, Any]] = [] seen_battle_keys = set() for match in matches: mid, tb = match["match_id"], match["type_bracket"] if not mid or (mid, tb) in seen_battle_keys: continue seen_battle_keys.add((mid, tb)) battle_targets.append(match) if progress: progress(f"{len(matches)} matches; fetching battles for {len(battle_targets)} match rows") battles = fetch_battles_for_matches( tournament_id, battle_targets, workers=max(1, battle_workers), progress=progress, ) type_set = {m["type_bracket"] for m in matches} meta = active_meta or {} name = meta.get("nameEN") or fallback_name date_start = _as_int(meta.get("dateStartTournament")) date_end = _as_int(meta.get("dateEndTournament")) return { "tournament_id": tournament_id, "name": name, "format": derive_format(type_set, meta.get("typeTournament")), "game_mode": meta.get("gameMode"), "cluster": meta.get("cluster"), "date_start": date_start, "date_end": date_end, "team_count": team_count(matches, standings), "match_count": len(matches), "status": compute_status(matches, date_end, now, in_active), "scanned_unix": now, "matches": matches, "battles": battles, "standings": standings, } def fetch_battles_for_matches( tournament_id: int, matches: List[Dict[str, Any]], *, workers: int = _DEFAULT_BATTLE_WORKERS, progress: Optional[Callable[[str], None]] = None, ) -> List[Dict[str, Any]]: """Fetch getListAllBattles rows for each match concurrently.""" if not matches: return [] def one(match: Dict[str, Any]) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], bool]: mid, tb = match["match_id"], match["type_bracket"] rows = _request( "GET", "getListAllBattles", tournamentID=tournament_id, idMatch=mid, typeBracket=tb, ) match_battles, technical = parse_battles(rows, tournament_id, mid, tb) return match, rows, match_battles, technical battles: List[Dict[str, Any]] = [] done = 0 total = len(matches) with ThreadPoolExecutor(max_workers=max(1, workers)) as pool: futures = [pool.submit(one, match) for match in matches] for future in as_completed(futures): match, rows, match_battles, technical = future.result() fill_names_from_battles(match, rows) if technical and match["status"] in ("pending", "bye"): match["status"] = "technical" battles.extend(match_battles) done += 1 if progress and (done == total or done % 25 == 0): progress(f"battle lookups {done}/{total}") return battles # --------------------------------------------------------------------------- # Storage # --------------------------------------------------------------------------- async def store_scan(scan: Dict[str, Any]) -> None: """Upsert one assembled scan into ``tss_tournaments.db`` transactionally.""" _store_scan_sync(scan) def _store_scan_sync(scan: Dict[str, Any]) -> None: """Synchronous implementation for ``store_scan``; run off the event loop.""" tid = scan["tournament_id"] now = scan["scanned_unix"] with _connect() as conn: conn.execute( """ INSERT INTO tournaments (tournament_id, name, format, game_mode, cluster, date_start, date_end, team_count, match_count, status, first_seen_unix, scanned_unix) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(tournament_id) DO UPDATE SET name=COALESCE(excluded.name, tournaments.name), format=excluded.format, game_mode=COALESCE(excluded.game_mode, tournaments.game_mode), cluster=COALESCE(excluded.cluster, tournaments.cluster), date_start=COALESCE(excluded.date_start, tournaments.date_start), date_end=COALESCE(excluded.date_end, tournaments.date_end), team_count=excluded.team_count, match_count=excluded.match_count, status=excluded.status, scanned_unix=excluded.scanned_unix """, ( tid, scan.get("name"), scan.get("format"), scan.get("game_mode"), scan.get("cluster"), scan.get("date_start"), scan.get("date_end"), scan.get("team_count", 0), scan.get("match_count", 0), scan.get("status", "pending"), now, now, ), ) # Replace child rows for this tournament (authoritative snapshot). conn.execute("DELETE FROM tournament_matches WHERE tournament_id = ?", (tid,)) conn.execute("DELETE FROM tournament_battles WHERE tournament_id = ?", (tid,)) conn.execute("DELETE FROM tournament_standings WHERE tournament_id = ?", (tid,)) conn.executemany( """ INSERT OR REPLACE INTO tournament_matches (tournament_id, match_id, type_bracket, side, round, position, team_a_uuid, team_a_name, team_b_uuid, team_b_name, winner_name, score_a, score_b, status, time_start) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) """, [ ( tid, m["match_id"], m["type_bracket"], m["side"], m["round"], m["position"], m["team_a_uuid"], m["team_a_name"], m["team_b_uuid"], m["team_b_name"], m["winner_name"], m["score_a"], m["score_b"], m["status"], m["time_start"], ) for m in scan["matches"] ], ) conn.executemany( """ INSERT OR REPLACE INTO tournament_battles (session_hex, session_decimal, tournament_id, match_id, type_bracket, position, winner_name, status_replay) VALUES (?,?,?,?,?,?,?,?) """, [ ( b["session_hex"], b["session_decimal"], b["tournament_id"], b["match_id"], b["type_bracket"], b["position"], b["winner_name"], b["status_replay"], ) for b in scan["battles"] ], ) conn.executemany( """ INSERT OR REPLACE INTO tournament_standings (tournament_id, group_index, team_uuid, team_name, points, wins, draws, losses, buchholz, rank) VALUES (?,?,?,?,?,?,?,?,?,?) """, [ ( tid, s["group_index"], s["team_uuid"], s["team_name"], s["points"], s["wins"], s["draws"], s["losses"], s["buchholz"], s["rank"], ) for s in scan["standings"] ], ) conn.commit() async def needs_scan(tournament_id: int, *, now: Optional[int] = None) -> bool: """True if we've never scanned this tournament, or it's live and stale.""" now = now or int(time.time()) row = _scan_state_sync(tournament_id) if row is None: return True status, date_end, scanned_unix = row if status == "finished": return False if date_end and now > date_end + RESCAN_END_BUFFER_SECONDS: return False return (scanned_unix or 0) + RESCAN_INTERVAL_SECONDS <= now def _scan_state_sync(tournament_id: int) -> Optional[Tuple[str, Optional[int], Optional[int]]]: with _connect() as conn: return conn.execute( "SELECT status, date_end, scanned_unix FROM tournaments WHERE tournament_id = ?", (tournament_id,), ).fetchone() _db_ready = False _inflight: set = set() _tasks: set = set() async def _ensure_db() -> None: global _db_ready if not _db_ready: await init_tss_tournaments_db() _db_ready = True async def maybe_scan_tournament(game: Dict[str, Any]) -> None: """Trigger entry point: schedule a background scan if this game's tournament is new or live-and-stale. Non-blocking and deduped per tournament id.""" tss = game.get("tss") or {} raw_tid = tss.get("tournament_id") try: tid = int(raw_tid) except (TypeError, ValueError): return if tid <= 0 or tid in _inflight: return await _ensure_db() if not await needs_scan(tid): return name = tss.get("tournament_name") or None _inflight.add(tid) async def _run() -> None: try: await scan_and_store(tid, fallback_name=name) finally: _inflight.discard(tid) task = asyncio.create_task(_run()) _tasks.add(task) task.add_done_callback(_tasks.discard) async def scan_and_store( tournament_id: int, *, fallback_name: Optional[str] = None ) -> None: """Fetch (off-thread) and persist one tournament. Safe to fire-and-forget.""" try: scan = await run_in_thread(build_scan_sync, tournament_id, fallback_name=fallback_name) await store_scan(scan) log.info( "scanned tournament %s: %d matches, %d battles, %d standings (%s)", tournament_id, scan["match_count"], len(scan["battles"]), len(scan["standings"]), scan["status"], ) except Exception as exc: # pragma: no cover - network/transient log.error("tournament scan failed for %s: %s", tournament_id, exc) async def run_in_thread(fn, *args, **kwargs): """Run a blocking callable without relying on asyncio's default executor.""" loop = asyncio.get_running_loop() fut = loop.create_future() def _target() -> None: try: result = fn(*args, **kwargs) except Exception as exc: # pragma: no cover - pass-through loop.call_soon_threadsafe(fut.set_exception, exc) else: loop.call_soon_threadsafe(fut.set_result, result) threading.Thread(target=_target, daemon=True).start() return await fut