Files
TSSBOT/BOT/tss_tournaments.py
T
NotSoToothless a5abecb918 Auto merge dev → main (#1361)
* fix: don't freeze tournament brackets that overrun their scheduled end

date_end (TSS dateEndTournament) is only the *scheduled* end; tournaments
overrun it. compute_status now trusts in_active (GetActiveTournaments
presence) over date_end, and needs_scan no longer disables re-scans past
date_end + buffer. Fixes brackets stuck while their final was still pending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor: split _store_scan_sync into reusable upsert helpers

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: store_schedule writes meta/matches/standings without wiping battles

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor: extract gather_structure_sync; add fetch_schedule_sync (no battles)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: reconcile_battles gap-fills replay links for played matches only

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: debounced schedule refresh coalesces game bursts into one TSS call

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: maybe_link_battle links replays from game match_id with zero TSS calls

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat: route received games through maybe_link_battle (fast-path bracket link)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* tss_tournaments: three small final-review cleanups

- link_battle_sync: drop unused team_a_name/team_b_name from SELECT and
  fix status index from r[3] to r[1]
- _replace_battles: add clarifying comment about fast-path caveat and
  that schedule refresh deliberately skips this function
- store_scan / store_schedule: dispatch sqlite writes off the event loop
  via run_in_thread instead of calling sync writers directly

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 04:23:34 -07:00

1119 lines
41 KiB
Python

"""TSSBOT tournament structure layer — ``tss_tournaments.db``.
The website needs the *authoritative* bracket of each tournament, not one
reconstructed from the replays we happened to capture (a partial sample). The
TSS site exposes the full structure; this module fetches it and stores it so the
website can read it read-only and overlay the replays we hold.
Trigger: when a game arrives carrying ``tss.tournament_id`` we haven't indexed
(or whose tournament is still live and stale), ``tss_ws`` schedules
``scan_and_store`` in the background.
Everything we know about the TSS API (endpoints, fields, examples, edge cases)
is documented in ``TSSBOT/docs/tss_tournament_api_reference.md``. The pure parse
helpers below are intentionally network-free so they can be unit-tested against
the captured sample payloads.
"""
from __future__ import annotations
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import logging
import os
import sqlite3
import threading
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from BOT.storage import STORAGE_DIR
log = logging.getLogger("tssbot.tss_tournaments")
TSS_TOURNAMENTS_DB_PATH: Path = STORAGE_DIR / "tss_tournaments.db"
_API_URL = "https://tss.warthunder.com/functions.php"
_API_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}
_API_TIMEOUT = 20
_DEFAULT_BATTLE_WORKERS = max(1, int(os.environ.get("TSS_TOURNAMENT_BATTLE_WORKERS", "8")))
# Re-scan a live tournament at most this often; stop re-scanning this long after
# its end time (battle rows land shortly after a game finishes).
RESCAN_INTERVAL_SECONDS = 600
RESCAN_END_BUFFER_SECONDS = 6 * 3600
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
_TOURNAMENTS_SQL = """
CREATE TABLE IF NOT EXISTS tournaments (
tournament_id INTEGER PRIMARY KEY,
name TEXT,
format TEXT,
game_mode TEXT,
cluster TEXT,
date_start INTEGER,
date_end INTEGER,
team_count INTEGER NOT NULL DEFAULT 0,
match_count INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending',
first_seen_unix INTEGER,
scanned_unix INTEGER
)
"""
_TOURNAMENT_MATCHES_SQL = """
CREATE TABLE IF NOT EXISTS tournament_matches (
tournament_id INTEGER NOT NULL,
match_id TEXT NOT NULL,
type_bracket TEXT NOT NULL,
side TEXT,
round INTEGER,
position INTEGER,
team_a_uuid TEXT,
team_a_name TEXT,
team_b_uuid TEXT,
team_b_name TEXT,
winner_name TEXT,
score_a INTEGER NOT NULL DEFAULT 0,
score_b INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending',
time_start INTEGER,
PRIMARY KEY (tournament_id, match_id, type_bracket)
)
"""
_TOURNAMENT_BATTLES_SQL = """
CREATE TABLE IF NOT EXISTS tournament_battles (
session_hex TEXT PRIMARY KEY,
session_decimal TEXT,
tournament_id INTEGER NOT NULL,
match_id TEXT NOT NULL,
type_bracket TEXT NOT NULL,
position INTEGER,
winner_name TEXT,
status_replay TEXT
)
"""
_TOURNAMENT_STANDINGS_SQL = """
CREATE TABLE IF NOT EXISTS tournament_standings (
tournament_id INTEGER NOT NULL,
group_index INTEGER NOT NULL DEFAULT 0,
team_uuid TEXT NOT NULL,
team_name TEXT,
points INTEGER NOT NULL DEFAULT 0,
wins INTEGER NOT NULL DEFAULT 0,
draws INTEGER NOT NULL DEFAULT 0,
losses INTEGER NOT NULL DEFAULT 0,
buchholz REAL NOT NULL DEFAULT 0,
rank INTEGER,
PRIMARY KEY (tournament_id, group_index, team_uuid)
)
"""
_INDEXES = [
"CREATE INDEX IF NOT EXISTS idx_tm_tournament ON tournament_matches(tournament_id)",
"CREATE INDEX IF NOT EXISTS idx_tb_tournament ON tournament_battles(tournament_id)",
"CREATE INDEX IF NOT EXISTS idx_tb_match ON tournament_battles(tournament_id, match_id)",
"CREATE INDEX IF NOT EXISTS idx_ts_tournament ON tournament_standings(tournament_id)",
"CREATE INDEX IF NOT EXISTS idx_tournaments_end ON tournaments(date_end)",
]
_PRAGMAS = (
"PRAGMA journal_mode=WAL;",
"PRAGMA synchronous=NORMAL;",
"PRAGMA busy_timeout=5000;",
"PRAGMA temp_store=MEMORY;",
)
async def init_tss_tournaments_db() -> None:
"""Create ``tss_tournaments.db`` if absent. Idempotent."""
_init_tss_tournaments_db_sync()
log.info("tss_tournaments.db initialised: %s", TSS_TOURNAMENTS_DB_PATH)
def _connect() -> sqlite3.Connection:
conn = sqlite3.connect(TSS_TOURNAMENTS_DB_PATH, timeout=5)
for sql in _PRAGMAS:
conn.execute(sql)
return conn
def _init_tss_tournaments_db_sync() -> None:
with _connect() as conn:
conn.execute(_TOURNAMENTS_SQL)
conn.execute(_TOURNAMENT_MATCHES_SQL)
conn.execute(_TOURNAMENT_BATTLES_SQL)
conn.execute(_TOURNAMENT_STANDINGS_SQL)
for sql in _INDEXES:
conn.execute(sql)
conn.commit()
# ---------------------------------------------------------------------------
# Pure helpers (network-free, unit-tested)
# ---------------------------------------------------------------------------
def to_hex(decimal_url: Any) -> Optional[str]:
"""Convert a TSS decimal ``url`` to our hex session id (matches tss_ws)."""
if decimal_url in (None, ""):
return None
try:
return hex(int(decimal_url))[2:].lower()
except (ValueError, TypeError):
return None
def normalize_side(type_bracket: Optional[str]) -> str:
"""Bucket a TSS bracket/group label into a display side.
Order matters: ``LooserFinal`` contains both 'looser' and 'final'.
"""
t = (type_bracket or "").lower()
if "swiss" in t:
return "swiss"
if "group" in t:
return "group"
if "looser" in t or "loser" in t:
return "loser"
if "semifinal" in t or "final" in t:
return "final"
if "winner" in t:
return "winner"
return t or "match"
def apply_tournament_side_context(matches: List[Dict[str, Any]]) -> None:
"""Fix stage labels whose side depends on tournament format.
In double-elim data, TSS puts ``Semifinal`` under the loser bracket tree. In
single-elim data, ``Semifinal`` is just an elimination stage. Use presence of
Looser/Loser rows to disambiguate.
"""
has_loser_side = any(
"looser" in str(m.get("type_bracket") or "").lower()
or "loser" in str(m.get("type_bracket") or "").lower()
for m in matches
)
if not has_loser_side:
return
for match in matches:
if "semifinal" in str(match.get("type_bracket") or "").lower():
match["side"] = "loser"
def derive_format(
type_brackets: Any, type_tournament: Optional[str] = None
) -> str:
"""Authoritative format. ``typeTournament`` wins; else derive from sides."""
tt = (type_tournament or "").lower()
if "swiss" in tt:
return "swiss"
if tt.startswith("group"):
return "group"
if "double" in tt:
return "double-elim"
if "single" in tt:
return "single-elim"
sides = {str(t).lower() for t in (type_brackets or [])}
if any("swiss" in s for s in sides):
return "swiss"
if any("group" in s for s in sides):
return "group"
if any(("looser" in s or "loser" in s) for s in sides):
return "double-elim"
if any(("winner" in s or "final" in s) for s in sides):
return "single-elim"
return "unknown"
def team_name_map(all_teams: Any) -> Dict[str, str]:
"""Build {team_uuid: realName}, skipping the empty-UUID rows TSS sometimes emits."""
out: Dict[str, str] = {}
for row in all_teams or []:
if not isinstance(row, dict):
continue
uuid = row.get("teamName")
name = row.get("realName")
if uuid:
out[str(uuid)] = name or ""
return out
def _as_int(value: Any) -> Optional[int]:
try:
return int(value)
except (ValueError, TypeError):
return None
def parse_schedule_match(row: Dict[str, Any], names: Dict[str, str]) -> Dict[str, Any]:
"""Normalize an elimination match row.
Schedule rows carry ``matchNumber``/``finished``; bracket fallback rows do not,
but they often carry inline ``realNameA/B``. Handle both shapes.
"""
team_a = row.get("teamA") or ""
team_b = row.get("teamB") or ""
winner = row.get("winner") or ""
type_bracket = str(row.get("typeBracket") or "Winner")
finished = str(row.get("finished") or "") == "1"
if "finished" not in row and (winner or row.get("matchResult")):
finished = True
has_both = bool(team_a) and bool(team_b)
if has_both:
status = "played" if finished else "pending"
elif winner:
status = "bye"
else:
status = "pending"
return {
"match_id": str(row.get("id") or ""),
"type_bracket": type_bracket,
"side": normalize_side(type_bracket),
"round": _as_int(row.get("round")),
"position": _as_int(row.get("matchNumber")),
"team_a_uuid": team_a or None,
"team_a_name": (names.get(team_a) if team_a else None) or row.get("realNameA") or None,
"team_b_uuid": team_b or None,
"team_b_name": (names.get(team_b) if team_b else None) or row.get("realNameB") or None,
"winner_name": (names.get(winner) if winner else None)
or (row.get("realNameA") if winner and winner == team_a else None)
or (row.get("realNameB") if winner and winner == team_b else None),
"score_a": _as_int(row.get("scoreA")) or 0,
"score_b": _as_int(row.get("scoreB")) or 0,
"status": status,
"time_start": _as_int(row.get("timeStart")),
}
def parse_group_match(row: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize a ``GroupMatch[]`` (swiss/group) row — names are inline."""
team_a = row.get("teamA") or ""
team_b = row.get("teamB") or ""
winner = row.get("winner") or ""
type_bracket = str(row.get("typeGroup") or "Group")
name_a = row.get("realNameA")
name_b = row.get("realNameB")
winner_name = None
if winner:
if winner == team_a:
winner_name = name_a
elif winner == team_b:
winner_name = name_b
played = bool(row.get("statsStatus"))
return {
"match_id": str(row.get("id") or ""),
"type_bracket": type_bracket,
"side": normalize_side(type_bracket),
"round": None,
"position": None,
"team_a_uuid": team_a or None,
"team_a_name": name_a,
"team_b_uuid": team_b or None,
"team_b_name": name_b,
"winner_name": winner_name,
"score_a": _as_int(row.get("scoreA")) or 0,
"score_b": _as_int(row.get("scoreB")) or 0,
"status": "played" if played else "pending",
"time_start": _as_int(row.get("timeStart")),
}
def collect_group_rows(value: Any) -> List[Dict[str, Any]]:
"""Recursively pull match-like dicts from nested bracket/group/swiss structures."""
out: List[Dict[str, Any]] = []
if isinstance(value, dict):
if value.get("id") and ("teamA" in value or "idTeamA" in value):
out.append(value)
for child in value.values():
out.extend(collect_group_rows(child))
elif isinstance(value, list):
for child in value:
out.extend(collect_group_rows(child))
return out
def fill_names_from_battles(match: Dict[str, Any], rows: Any) -> None:
"""Use getListAllBattles team objects as a fallback name source.
``all_teams`` occasionally omits a UUID. Battle rows include full team
objects, but the object order is not a reliable slot signal, so match by UUID.
"""
if not isinstance(rows, list):
return
uuid_to_name: Dict[str, str] = {}
for row in rows:
if not isinstance(row, dict):
continue
for slot in ("teamA", "teamB"):
team = row.get(slot)
if not isinstance(team, dict):
continue
uuid = team.get("teamName")
name = team.get("realName")
if uuid and name:
uuid_to_name[str(uuid)] = str(name)
if not match.get("team_a_name") and match.get("team_a_uuid"):
match["team_a_name"] = uuid_to_name.get(str(match["team_a_uuid"]))
if not match.get("team_b_name") and match.get("team_b_uuid"):
match["team_b_name"] = uuid_to_name.get(str(match["team_b_uuid"]))
if not match.get("winner_name"):
for uuid_key in ("team_a_uuid", "team_b_uuid"):
uuid = match.get(uuid_key)
if uuid and str(uuid) in uuid_to_name:
if (
uuid_key == "team_a_uuid"
and match.get("score_a", 0) > match.get("score_b", 0)
) or (
uuid_key == "team_b_uuid"
and match.get("score_b", 0) > match.get("score_a", 0)
):
match["winner_name"] = uuid_to_name[str(uuid)]
def parse_standings(group_stage: Any) -> List[Dict[str, Any]]:
"""Flatten ``GroupStage`` (list of groups of standings rows)."""
out: List[Dict[str, Any]] = []
if not isinstance(group_stage, list):
return out
for group_index, group in enumerate(group_stage):
rows = group if isinstance(group, list) else [group]
for rank, row in enumerate(rows, start=1):
if not isinstance(row, dict) or not row.get("teamName"):
continue
try:
buchholz = float(row.get("buchholz_points") or 0)
except (ValueError, TypeError):
buchholz = 0.0
out.append({
"group_index": group_index,
"team_uuid": str(row.get("teamName")),
"team_name": row.get("realName"),
"points": _as_int(row.get("points")) or 0,
"wins": _as_int(row.get("won")) or 0,
"draws": _as_int(row.get("draw")) or 0,
"losses": _as_int(row.get("defeats")) or 0,
"buchholz": buchholz,
"rank": rank,
})
return out
def parse_battles(
rows: Any, tournament_id: int, match_id: str, type_bracket: str
) -> Tuple[List[Dict[str, Any]], bool]:
"""Return (battle rows with a session id, saw_technical_victory)."""
battles: List[Dict[str, Any]] = []
technical = False
for row in rows or []:
if not isinstance(row, dict):
continue
status_replay = row.get("statusReplay")
if status_replay == "technical victory":
technical = True
if status_replay in ("wait", "technical victory"):
continue
session_hex = to_hex(row.get("url"))
if not session_hex:
continue
battles.append({
"session_hex": session_hex,
"session_decimal": str(row.get("url")),
"tournament_id": tournament_id,
"match_id": match_id,
"type_bracket": type_bracket,
"position": _as_int(row.get("position")),
"winner_name": row.get("winner"),
"status_replay": status_replay,
})
return battles, technical
def compute_status(
matches: List[Dict[str, Any]],
date_end: Optional[int],
now: int,
in_active: Optional[bool] = True,
) -> str:
"""Tournament status from match completion + live-list presence + end date.
Presence in ``GetActiveTournaments`` is the authoritative live/over signal:
absence (``in_active`` False) is over, presence (True) is live. ``date_end``
is only TSS's *scheduled* end, which tournaments routinely overrun (finals run
hours/days late), so it must NOT finish a tournament that is still listed as
active — that froze brackets whose final landed after the scheduled end. The
``date_end`` heuristic is therefore a fallback used only when the active-list
state is unknown (``in_active`` None).
"""
if in_active is False:
return "finished"
if not matches:
return "pending"
if all(m["status"] in ("played", "bye", "technical") for m in matches):
return "finished"
# Has unplayed matches.
if in_active is True:
return "active" # still live/upcoming — overrunning the scheduled end is fine
# in_active is None: active list unavailable — fall back to scheduled end + buffer.
if date_end and now > date_end + RESCAN_END_BUFFER_SECONDS:
return "finished"
return "active"
def team_count(matches: List[Dict[str, Any]], standings: List[Dict[str, Any]]) -> int:
names = set()
for m in matches:
for key in ("team_a_name", "team_b_name"):
if m.get(key):
names.add(str(m[key]).lower())
for s in standings:
if s.get("team_name"):
names.add(str(s["team_name"]).lower())
return len(names)
# ---------------------------------------------------------------------------
# Network gather (sync — run via a dedicated thread)
# ---------------------------------------------------------------------------
def _request(method: str, action: str, **params: Any) -> Any:
fields = {"action": action, **{k: str(v) for k, v in params.items() if v is not None}}
if method == "GET":
url = f"{_API_URL}?{urllib.parse.urlencode(fields)}"
req = urllib.request.Request(url, headers=_API_HEADERS, method="GET")
else:
body = urllib.parse.urlencode(fields).encode()
req = urllib.request.Request(_API_URL, data=body, headers=_API_HEADERS, method="POST")
with urllib.request.urlopen(req, timeout=_API_TIMEOUT) as resp:
text = resp.read().decode("utf-8")
if not text:
return None
try:
return json.loads(text)
except json.JSONDecodeError:
return None
def _data_dict(response: Any) -> Dict[str, Any]:
data = response.get("data") if isinstance(response, dict) else None
return data if isinstance(data, dict) else {}
# GetActiveTournaments returns ~hundreds of rows; cache it briefly so a burst of
# scans doesn't refetch the whole list each time.
_ACTIVE_CACHE: Dict[str, Any] = {"at": 0, "by_id": {}}
_ACTIVE_TTL_SECONDS = 120
def _active_meta(tournament_id: int, now: int) -> Tuple[Optional[Dict[str, Any]], Optional[bool]]:
"""Return (metadata, in_active_list) for a tournament from GetActiveTournaments.
``in_active`` is False when a successful active-list fetch says the
tournament has dropped off the live/upcoming list. It is None when we could
not establish the active list, so callers must not mark the tournament
finished solely from absence.
"""
if now - _ACTIVE_CACHE["at"] > _ACTIVE_TTL_SECONDS:
resp = _request("POST", "GetActiveTournaments")
rows = resp.get("data") if isinstance(resp, dict) else None
by_id: Dict[str, Any] = {}
for row in rows or []:
if isinstance(row, dict) and row.get("tournamentID") is not None:
by_id[str(row["tournamentID"])] = row
# Only trust a non-empty fetch; a transient empty reply shouldn't mark
# every tournament finished.
if by_id:
_ACTIVE_CACHE["by_id"] = by_id
_ACTIVE_CACHE["at"] = now
by_id = _ACTIVE_CACHE["by_id"]
meta = by_id.get(str(tournament_id))
if not by_id:
return meta, None
return meta, meta is not None
def gather_structure_sync(
tournament_id: int,
*,
fallback_name: Optional[str] = None,
active_meta: Optional[Dict[str, Any]] = None,
now: Optional[int] = None,
) -> Dict[str, Any]:
"""Fetch + assemble the bracket structure (matches, standings, meta) for one
tournament. Network-bound; call via ``run_in_thread``. No battles fetched."""
now = now or int(time.time())
if active_meta is None:
active_meta, in_active = _active_meta(tournament_id, now)
else:
in_active = True
sched = _data_dict(_request("POST", "get_shedule_matches", tournamentId=tournament_id))
names = team_name_map(sched.get("all_teams"))
raw_matches = sched.get("all_matches") if isinstance(sched.get("all_matches"), list) else []
matches: List[Dict[str, Any]] = []
standings: List[Dict[str, Any]] = []
if raw_matches:
matches = [parse_schedule_match(r, names) for r in raw_matches if isinstance(r, dict)]
else:
bracket = _data_dict(_request("POST", "GetArrayBracketData", tournamentID=tournament_id))
bracket_rows = collect_group_rows(bracket.get("bracket") or bracket)
if bracket_rows:
matches = [parse_schedule_match(r, names) for r in bracket_rows if isinstance(r, dict)]
if not matches:
for action, kw in (
("GetArraySwissData", {"tournamentID": tournament_id, "id_group": 1}),
("GetArrayGroupData", {"tournamentID": tournament_id}),
):
data = _data_dict(_request("POST", action, **kw))
rows = collect_group_rows(data.get("GroupMatch"))
if rows:
matches = [parse_group_match(r) for r in rows]
standings = parse_standings(data.get("GroupStage"))
break
apply_tournament_side_context(matches)
type_set = {m["type_bracket"] for m in matches}
meta = active_meta or {}
name = meta.get("nameEN") or fallback_name
date_start = _as_int(meta.get("dateStartTournament"))
date_end = _as_int(meta.get("dateEndTournament"))
return {
"tournament_id": tournament_id,
"name": name,
"format": derive_format(type_set, meta.get("typeTournament")),
"game_mode": meta.get("gameMode"),
"cluster": meta.get("cluster"),
"date_start": date_start,
"date_end": date_end,
"team_count": team_count(matches, standings),
"match_count": len(matches),
"status": compute_status(matches, date_end, now, in_active),
"scanned_unix": now,
"matches": matches,
"standings": standings,
"in_active": in_active,
}
def fetch_schedule_sync(
tournament_id: int,
*,
fallback_name: Optional[str] = None,
now: Optional[int] = None,
) -> Dict[str, Any]:
"""Schedule-only gather (no battles), ready for ``store_schedule``."""
scan = gather_structure_sync(tournament_id, fallback_name=fallback_name, now=now)
scan["battles"] = []
return scan
def build_scan_sync(
tournament_id: int,
*,
fallback_name: Optional[str] = None,
active_meta: Optional[Dict[str, Any]] = None,
now: Optional[int] = None,
battle_workers: int = _DEFAULT_BATTLE_WORKERS,
progress: Optional[Callable[[str], None]] = None,
) -> Dict[str, Any]:
"""Fetch + assemble the full authoritative structure incl. battles."""
now = now or int(time.time())
scan = gather_structure_sync(
tournament_id, fallback_name=fallback_name, active_meta=active_meta, now=now
)
matches = scan["matches"]
battle_targets: List[Dict[str, Any]] = []
seen_battle_keys = set()
for match in matches:
mid, tb = match["match_id"], match["type_bracket"]
if not mid or (mid, tb) in seen_battle_keys:
continue
seen_battle_keys.add((mid, tb))
battle_targets.append(match)
if progress:
progress(f"{len(matches)} matches; fetching battles for {len(battle_targets)} match rows")
battles = fetch_battles_for_matches(
tournament_id, battle_targets, workers=max(1, battle_workers), progress=progress
)
scan["battles"] = battles
# Recompute status: fetch_battles_for_matches may have flipped matches to
# 'technical', which changes whether the tournament reads finished.
scan["status"] = compute_status(matches, scan["date_end"], now, scan["in_active"])
return scan
def fetch_battles_for_matches(
tournament_id: int,
matches: List[Dict[str, Any]],
*,
workers: int = _DEFAULT_BATTLE_WORKERS,
progress: Optional[Callable[[str], None]] = None,
) -> List[Dict[str, Any]]:
"""Fetch getListAllBattles rows for each match concurrently."""
if not matches:
return []
def one(match: Dict[str, Any]) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], bool]:
mid, tb = match["match_id"], match["type_bracket"]
rows = _request(
"GET",
"getListAllBattles",
tournamentID=tournament_id,
idMatch=mid,
typeBracket=tb,
)
match_battles, technical = parse_battles(rows, tournament_id, mid, tb)
return match, rows, match_battles, technical
battles: List[Dict[str, Any]] = []
done = 0
total = len(matches)
with ThreadPoolExecutor(max_workers=max(1, workers)) as pool:
futures = [pool.submit(one, match) for match in matches]
for future in as_completed(futures):
match, rows, match_battles, technical = future.result()
fill_names_from_battles(match, rows)
if technical and match["status"] in ("pending", "bye"):
match["status"] = "technical"
battles.extend(match_battles)
done += 1
if progress and (done == total or done % 25 == 0):
progress(f"battle lookups {done}/{total}")
return battles
def reconcile_targets_sync(tid: int) -> List[Dict[str, str]]:
"""Matches that should have a replay link but don't yet: played/technical with
zero stored battles (a game we missed, or a not-yet-linked technical victory).
Byes (empty team slot) are excluded — no battle is expected."""
with _connect() as conn:
rows = conn.execute(
"""
SELECT m.match_id, m.type_bracket
FROM tournament_matches m
WHERE m.tournament_id = ?
AND m.status IN ('played', 'technical')
AND NOT EXISTS (
SELECT 1 FROM tournament_battles b
WHERE b.tournament_id = m.tournament_id
AND b.match_id = m.match_id
AND b.type_bracket = m.type_bracket
)
""",
(tid,),
).fetchall()
return [{"match_id": r[0], "type_bracket": r[1]} for r in rows]
def _insert_battles_sync(battles: List[Dict[str, Any]]) -> None:
"""Upsert battle rows without deleting anything (preserves fast-path links)."""
if not battles:
return
with _connect() as conn:
conn.executemany(
"""
INSERT OR REPLACE INTO tournament_battles
(session_hex, session_decimal, tournament_id, match_id,
type_bracket, position, winner_name, status_replay)
VALUES (?,?,?,?,?,?,?,?)
""",
[
(
b["session_hex"], b["session_decimal"], b["tournament_id"],
b["match_id"], b["type_bracket"], b["position"],
b["winner_name"], b["status_replay"],
)
for b in battles
],
)
conn.commit()
async def reconcile_battles(
tid: int,
*,
now: Optional[int] = None,
force: bool = False,
workers: int = _DEFAULT_BATTLE_WORKERS,
) -> int:
"""Fill replay links for played matches that have no stored session. Calls
getListAllBattles ONLY for those gap matches. Rate-limited unless ``force``."""
now = now or int(time.time())
if not force and (_reconcile_last.get(tid, 0) + _RECONCILE_INTERVAL_SECONDS > now):
return 0
targets = await run_in_thread(reconcile_targets_sync, tid)
if not targets:
_reconcile_last[tid] = now
return 0
battles = await run_in_thread(fetch_battles_for_matches, tid, targets, workers=workers)
await run_in_thread(_insert_battles_sync, battles)
_reconcile_last[tid] = now
return len(battles)
# ---------------------------------------------------------------------------
# Storage
# ---------------------------------------------------------------------------
async def store_scan(scan: Dict[str, Any]) -> None:
"""Upsert one assembled scan into ``tss_tournaments.db`` transactionally."""
await run_in_thread(_store_scan_sync, scan)
def _upsert_tournament_meta(conn: sqlite3.Connection, scan: Dict[str, Any]) -> None:
tid = scan["tournament_id"]
now = scan["scanned_unix"]
conn.execute(
"""
INSERT INTO tournaments
(tournament_id, name, format, game_mode, cluster, date_start,
date_end, team_count, match_count, status, first_seen_unix, scanned_unix)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(tournament_id) DO UPDATE SET
name=COALESCE(excluded.name, tournaments.name),
format=excluded.format,
game_mode=COALESCE(excluded.game_mode, tournaments.game_mode),
cluster=COALESCE(excluded.cluster, tournaments.cluster),
date_start=COALESCE(excluded.date_start, tournaments.date_start),
date_end=COALESCE(excluded.date_end, tournaments.date_end),
team_count=excluded.team_count,
match_count=excluded.match_count,
status=excluded.status,
scanned_unix=excluded.scanned_unix
""",
(
tid, scan.get("name"), scan.get("format"), scan.get("game_mode"),
scan.get("cluster"), scan.get("date_start"), scan.get("date_end"),
scan.get("team_count", 0), scan.get("match_count", 0),
scan.get("status", "pending"), now, now,
),
)
def _replace_matches(conn: sqlite3.Connection, tid: int, matches: List[Dict[str, Any]]) -> None:
conn.execute("DELETE FROM tournament_matches WHERE tournament_id = ?", (tid,))
conn.executemany(
"""
INSERT OR REPLACE INTO tournament_matches
(tournament_id, match_id, type_bracket, side, round, position,
team_a_uuid, team_a_name, team_b_uuid, team_b_name, winner_name,
score_a, score_b, status, time_start)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""",
[
(
tid, m["match_id"], m["type_bracket"], m["side"], m["round"],
m["position"], m["team_a_uuid"], m["team_a_name"], m["team_b_uuid"],
m["team_b_name"], m["winner_name"], m["score_a"], m["score_b"],
m["status"], m["time_start"],
)
for m in matches
],
)
def _replace_standings(conn: sqlite3.Connection, tid: int, standings: List[Dict[str, Any]]) -> None:
conn.execute("DELETE FROM tournament_standings WHERE tournament_id = ?", (tid,))
conn.executemany(
"""
INSERT OR REPLACE INTO tournament_standings
(tournament_id, group_index, team_uuid, team_name, points,
wins, draws, losses, buchholz, rank)
VALUES (?,?,?,?,?,?,?,?,?,?)
""",
[
(
tid, s["group_index"], s["team_uuid"], s["team_name"],
s["points"], s["wins"], s["draws"], s["losses"],
s["buchholz"], s["rank"],
)
for s in standings
],
)
def _replace_battles(conn: sqlite3.Connection, tid: int, battles: List[Dict[str, Any]]) -> None:
# Authoritative full-scan replace. NOTE: this briefly drops any fast-path
# battle link (maybe_link_battle) for a session TSS has not yet listed in
# getListAllBattles; such a link is refilled by a later reconcile or scan.
# The debounced schedule refresh deliberately does NOT call this.
conn.execute("DELETE FROM tournament_battles WHERE tournament_id = ?", (tid,))
conn.executemany(
"""
INSERT OR REPLACE INTO tournament_battles
(session_hex, session_decimal, tournament_id, match_id,
type_bracket, position, winner_name, status_replay)
VALUES (?,?,?,?,?,?,?,?)
""",
[
(
b["session_hex"], b["session_decimal"], b["tournament_id"],
b["match_id"], b["type_bracket"], b["position"],
b["winner_name"], b["status_replay"],
)
for b in battles
],
)
def _store_scan_sync(scan: Dict[str, Any]) -> None:
"""Synchronous implementation for ``store_scan``; run off the event loop."""
tid = scan["tournament_id"]
with _connect() as conn:
_upsert_tournament_meta(conn, scan)
_replace_matches(conn, tid, scan["matches"])
_replace_battles(conn, tid, scan["battles"])
_replace_standings(conn, tid, scan["standings"])
conn.commit()
def store_schedule_sync(scan: Dict[str, Any]) -> None:
"""Persist a schedule-only refresh: meta + matches + standings.
Deliberately does NOT touch ``tournament_battles`` — those rows are
maintained incrementally by the fast-path linker and the reconcile, and a
schedule refresh that wiped them would drop every replay link.
"""
tid = scan["tournament_id"]
with _connect() as conn:
_upsert_tournament_meta(conn, scan)
_replace_matches(conn, tid, scan["matches"])
_replace_standings(conn, tid, scan["standings"])
conn.commit()
async def store_schedule(scan: Dict[str, Any]) -> None:
"""Async wrapper for ``store_schedule_sync``."""
await run_in_thread(store_schedule_sync, scan)
async def needs_scan(tournament_id: int, *, now: Optional[int] = None) -> bool:
"""True if we've never scanned this tournament, or it's live and stale.
The only permanent stop is ``status == 'finished'`` (which ``compute_status``
sets reliably once the tournament drops out of ``GetActiveTournaments``). We
deliberately do NOT gate on ``date_end``: it is only the *scheduled* end, and
scans are game-triggered — a game arriving for a tournament is itself evidence
it is still live, even days past that scheduled end. Gating on ``date_end``
froze brackets whose final was played late. Re-scans stay interval-limited.
"""
now = now or int(time.time())
row = _scan_state_sync(tournament_id)
if row is None:
return True
status, _date_end, scanned_unix = row
if status == "finished":
return False
return (scanned_unix or 0) + RESCAN_INTERVAL_SECONDS <= now
def _scan_state_sync(tournament_id: int) -> Optional[Tuple[str, Optional[int], Optional[int]]]:
with _connect() as conn:
return conn.execute(
"SELECT status, date_end, scanned_unix FROM tournaments WHERE tournament_id = ?",
(tournament_id,),
).fetchone()
_db_ready = False
_inflight: set = set()
_tasks: set = set()
_RECONCILE_INTERVAL_SECONDS = 600
_reconcile_last: Dict[int, int] = {}
_REFRESH_DEBOUNCE_SECONDS = 30
_refresh_tasks: Dict[int, "asyncio.Task"] = {}
_refresh_names: Dict[int, Optional[str]] = {}
async def _ensure_db() -> None:
global _db_ready
if not _db_ready:
await init_tss_tournaments_db()
_db_ready = True
def link_battle_sync(
tid: int,
match_id: str,
session_hex: str,
*,
session_decimal: Optional[str] = None,
winner_name: Optional[str] = None,
) -> bool:
"""Link a captured replay to its match using the game's own ``match_id``.
Returns True if the match exists locally (battle inserted), False otherwise
so the caller can fall back to a full scan. No network access.
"""
with _connect() as conn:
rows = conn.execute(
"SELECT type_bracket, status "
"FROM tournament_matches WHERE tournament_id = ? AND match_id = ?",
(tid, match_id),
).fetchall()
if not rows:
return False
# Prefer a still-pending row when a match_id spans >1 type_bracket (rare).
chosen = next((r for r in rows if r[1] == "pending"), rows[0])
type_bracket = chosen[0]
if session_decimal is None:
try:
session_decimal = str(int(session_hex, 16))
except (ValueError, TypeError):
session_decimal = None
conn.execute(
"""
INSERT OR REPLACE INTO tournament_battles
(session_hex, session_decimal, tournament_id, match_id,
type_bracket, position, winner_name, status_replay)
VALUES (?,?,?,?,?,?,?,?)
""",
(session_hex, session_decimal, tid, match_id, type_bracket, None,
winner_name, "view replay"),
)
conn.commit()
return True
async def maybe_link_battle(game: Dict[str, Any]) -> None:
"""Trigger entry point for each received game: link its replay to the bracket
slot from ``tss.match_id`` (zero TSS calls) and request a debounced schedule
refresh. Falls back to a full scan when the match is not yet indexed."""
tss = game.get("tss") or {}
raw_tid = tss.get("tournament_id")
match_id = tss.get("match_id")
try:
tid = int(raw_tid)
except (TypeError, ValueError):
return
if tid <= 0:
return
await _ensure_db()
session_hex = str(game.get("_id") or "")
name = tss.get("tournament_name") or None
winner_slot = str(game.get("winner") or "")
winner_name = None
slot = tss.get(winner_slot) if winner_slot in ("1", "2") else None
if isinstance(slot, dict):
winner_name = slot.get("name")
if match_id and session_hex:
linked = await run_in_thread(
link_battle_sync, tid, str(match_id), session_hex, winner_name=winner_name
)
if linked:
request_schedule_refresh(tid, fallback_name=name)
return
# Unknown match (or missing match_id): fall back to the existing full scan.
await maybe_scan_tournament(game)
async def maybe_scan_tournament(game: Dict[str, Any]) -> None:
"""Trigger entry point: schedule a background scan if this game's tournament
is new or live-and-stale. Non-blocking and deduped per tournament id."""
tss = game.get("tss") or {}
raw_tid = tss.get("tournament_id")
try:
tid = int(raw_tid)
except (TypeError, ValueError):
return
if tid <= 0 or tid in _inflight:
return
await _ensure_db()
if not await needs_scan(tid):
return
name = tss.get("tournament_name") or None
_inflight.add(tid)
async def _run() -> None:
try:
await scan_and_store(tid, fallback_name=name)
finally:
_inflight.discard(tid)
task = asyncio.create_task(_run())
_tasks.add(task)
task.add_done_callback(_tasks.discard)
async def scan_and_store(
tournament_id: int, *, fallback_name: Optional[str] = None
) -> None:
"""Fetch (off-thread) and persist one tournament. Safe to fire-and-forget."""
try:
scan = await run_in_thread(build_scan_sync, tournament_id, fallback_name=fallback_name)
await store_scan(scan)
log.info(
"scanned tournament %s: %d matches, %d battles, %d standings (%s)",
tournament_id, scan["match_count"], len(scan["battles"]),
len(scan["standings"]), scan["status"],
)
except Exception as exc: # pragma: no cover - network/transient
log.error("tournament scan failed for %s: %s", tournament_id, exc)
def request_schedule_refresh(tid: int, *, fallback_name: Optional[str] = None) -> None:
"""Schedule a debounced schedule-only refresh for ``tid``. Repeated calls
within the debounce window collapse into the single pending refresh."""
if fallback_name and not _refresh_names.get(tid):
_refresh_names[tid] = fallback_name
if tid in _refresh_tasks and not _refresh_tasks[tid].done():
return # a refresh is already pending; this game folds into it
task = asyncio.create_task(_run_schedule_refresh(tid))
_refresh_tasks[tid] = task
_tasks.add(task)
task.add_done_callback(_tasks.discard)
async def _run_schedule_refresh(tid: int) -> None:
try:
await asyncio.sleep(_REFRESH_DEBOUNCE_SECONDS)
name = _refresh_names.pop(tid, None)
scan = await run_in_thread(fetch_schedule_sync, tid, fallback_name=name)
await store_schedule(scan)
await reconcile_battles(tid, force=(scan["status"] == "finished"))
log.info("schedule refresh %s: %d matches (%s)",
tid, scan["match_count"], scan["status"])
except Exception as exc: # pragma: no cover - network/transient
log.error("schedule refresh failed for %s: %s", tid, exc)
finally:
_refresh_tasks.pop(tid, None)
async def run_in_thread(fn, *args, **kwargs):
"""Run a blocking callable without relying on asyncio's default executor."""
loop = asyncio.get_running_loop()
fut = loop.create_future()
def _target() -> None:
try:
result = fn(*args, **kwargs)
except Exception as exc: # pragma: no cover - pass-through
loop.call_soon_threadsafe(fut.set_exception, exc)
else:
loop.call_soon_threadsafe(fut.set_result, result)
threading.Thread(target=_target, daemon=True).start()
return await fut