a5abecb918
* fix: don't freeze tournament brackets that overrun their scheduled end date_end (TSS dateEndTournament) is only the *scheduled* end; tournaments overrun it. compute_status now trusts in_active (GetActiveTournaments presence) over date_end, and needs_scan no longer disables re-scans past date_end + buffer. Fixes brackets stuck while their final was still pending. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * refactor: split _store_scan_sync into reusable upsert helpers Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat: store_schedule writes meta/matches/standings without wiping battles Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * refactor: extract gather_structure_sync; add fetch_schedule_sync (no battles) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat: reconcile_battles gap-fills replay links for played matches only Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat: debounced schedule refresh coalesces game bursts into one TSS call Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat: maybe_link_battle links replays from game match_id with zero TSS calls Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat: route received games through maybe_link_battle (fast-path bracket link) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * tss_tournaments: three small final-review cleanups - link_battle_sync: drop unused team_a_name/team_b_name from SELECT and fix status index from r[3] to r[1] - _replace_battles: add clarifying comment about fast-path caveat and that schedule refresh deliberately skips this function - store_scan / store_schedule: dispatch sqlite writes off the event loop via run_in_thread instead of calling sync writers directly Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1119 lines
41 KiB
Python
1119 lines
41 KiB
Python
"""TSSBOT tournament structure layer — ``tss_tournaments.db``.
|
|
|
|
The website needs the *authoritative* bracket of each tournament, not one
|
|
reconstructed from the replays we happened to capture (a partial sample). The
|
|
TSS site exposes the full structure; this module fetches it and stores it so the
|
|
website can read it read-only and overlay the replays we hold.
|
|
|
|
Trigger: when a game arrives carrying ``tss.tournament_id`` we haven't indexed
|
|
(or whose tournament is still live and stale), ``tss_ws`` schedules
|
|
``scan_and_store`` in the background.
|
|
|
|
Everything we know about the TSS API (endpoints, fields, examples, edge cases)
|
|
is documented in ``TSSBOT/docs/tss_tournament_api_reference.md``. The pure parse
|
|
helpers below are intentionally network-free so they can be unit-tested against
|
|
the captured sample payloads.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
from BOT.storage import STORAGE_DIR
|
|
|
|
# Module logger; records are emitted under the "tssbot.tss_tournaments" name.
log = logging.getLogger("tssbot.tss_tournaments")

# Location of the SQLite database file inside the shared storage directory.
TSS_TOURNAMENTS_DB_PATH: Path = STORAGE_DIR / "tss_tournaments.db"

# Single TSS endpoint; the operation is selected by the ``action`` form field.
_API_URL = "https://tss.warthunder.com/functions.php"
# Headers attached to every request built by ``_request`` (GET and POST alike).
_API_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}
# Timeout handed to ``urlopen`` for each request, in seconds.
_API_TIMEOUT = 20
# Default thread-pool size for per-match battle lookups. Overridable through the
# TSS_TOURNAMENT_BATTLE_WORKERS environment variable; clamped to at least 1.
_DEFAULT_BATTLE_WORKERS = max(1, int(os.environ.get("TSS_TOURNAMENT_BATTLE_WORKERS", "8")))

# Re-scan a live tournament at most this often; stop re-scanning this long after
# its end time (battle rows land shortly after a game finishes).
# NOTE(review): in the code visible here RESCAN_END_BUFFER_SECONDS is only read by
# ``compute_status`` as a fallback when the active list is unknown — confirm the
# "stop re-scanning" wording above still matches the scan scheduler.
RESCAN_INTERVAL_SECONDS = 600
RESCAN_END_BUFFER_SECONDS = 6 * 3600
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# One row per tournament: display metadata, aggregate counts, computed status and
# the first-seen / last-scan timestamps.
_TOURNAMENTS_SQL = """
CREATE TABLE IF NOT EXISTS tournaments (
    tournament_id INTEGER PRIMARY KEY,
    name TEXT,
    format TEXT,
    game_mode TEXT,
    cluster TEXT,
    date_start INTEGER,
    date_end INTEGER,
    team_count INTEGER NOT NULL DEFAULT 0,
    match_count INTEGER NOT NULL DEFAULT 0,
    status TEXT NOT NULL DEFAULT 'pending',
    first_seen_unix INTEGER,
    scanned_unix INTEGER
)
"""

# One row per match; a match is identified by (tournament_id, match_id, type_bracket).
_TOURNAMENT_MATCHES_SQL = """
CREATE TABLE IF NOT EXISTS tournament_matches (
    tournament_id INTEGER NOT NULL,
    match_id TEXT NOT NULL,
    type_bracket TEXT NOT NULL,
    side TEXT,
    round INTEGER,
    position INTEGER,
    team_a_uuid TEXT,
    team_a_name TEXT,
    team_b_uuid TEXT,
    team_b_name TEXT,
    winner_name TEXT,
    score_a INTEGER NOT NULL DEFAULT 0,
    score_b INTEGER NOT NULL DEFAULT 0,
    status TEXT NOT NULL DEFAULT 'pending',
    time_start INTEGER,
    PRIMARY KEY (tournament_id, match_id, type_bracket)
)
"""

# One row per battle (game session), keyed by the hex session id and pointing back
# at the match it belongs to.
_TOURNAMENT_BATTLES_SQL = """
CREATE TABLE IF NOT EXISTS tournament_battles (
    session_hex TEXT PRIMARY KEY,
    session_decimal TEXT,
    tournament_id INTEGER NOT NULL,
    match_id TEXT NOT NULL,
    type_bracket TEXT NOT NULL,
    position INTEGER,
    winner_name TEXT,
    status_replay TEXT
)
"""

# Swiss / group standings: one row per team per group of a tournament.
_TOURNAMENT_STANDINGS_SQL = """
CREATE TABLE IF NOT EXISTS tournament_standings (
    tournament_id INTEGER NOT NULL,
    group_index INTEGER NOT NULL DEFAULT 0,
    team_uuid TEXT NOT NULL,
    team_name TEXT,
    points INTEGER NOT NULL DEFAULT 0,
    wins INTEGER NOT NULL DEFAULT 0,
    draws INTEGER NOT NULL DEFAULT 0,
    losses INTEGER NOT NULL DEFAULT 0,
    buchholz REAL NOT NULL DEFAULT 0,
    rank INTEGER,
    PRIMARY KEY (tournament_id, group_index, team_uuid)
)
"""

# Secondary indexes, created idempotently right after the tables.
_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_tm_tournament ON tournament_matches(tournament_id)",
    "CREATE INDEX IF NOT EXISTS idx_tb_tournament ON tournament_battles(tournament_id)",
    "CREATE INDEX IF NOT EXISTS idx_tb_match ON tournament_battles(tournament_id, match_id)",
    "CREATE INDEX IF NOT EXISTS idx_ts_tournament ON tournament_standings(tournament_id)",
    "CREATE INDEX IF NOT EXISTS idx_tournaments_end ON tournaments(date_end)",
]

# Executed on every new connection by ``_connect``: write-ahead-log journalling,
# relaxed fsync, a 5000 ms busy timeout and in-memory temp storage.
_PRAGMAS = (
    "PRAGMA journal_mode=WAL;",
    "PRAGMA synchronous=NORMAL;",
    "PRAGMA busy_timeout=5000;",
    "PRAGMA temp_store=MEMORY;",
)
|
|
|
|
|
|
async def init_tss_tournaments_db() -> None:
    """Ensure ``tss_tournaments.db`` exists with all tables and indexes.

    Safe to call repeatedly: the underlying DDL uses ``IF NOT EXISTS``. The
    schema work itself is done synchronously by
    ``_init_tss_tournaments_db_sync``; this wrapper only adds the log line.
    """
    _init_tss_tournaments_db_sync()
    db_path = TSS_TOURNAMENTS_DB_PATH
    log.info("tss_tournaments.db initialised: %s", db_path)
|
|
|
|
|
|
def _connect() -> sqlite3.Connection:
    """Open a connection to ``tss_tournaments.db`` with the module pragmas applied.

    Returns:
        An open, configured connection. The caller owns it and must close it.

    Raises:
        sqlite3.Error: if the database cannot be opened or a pragma fails. In
            the latter case the half-configured connection is closed before
            the error propagates, so no handle is leaked.
    """
    conn = sqlite3.connect(TSS_TOURNAMENTS_DB_PATH, timeout=5)
    try:
        for pragma in _PRAGMAS:
            conn.execute(pragma)
    except Exception:
        # Configuration failed part-way: release the handle, then re-raise.
        conn.close()
        raise
    return conn
|
|
|
|
|
|
def _init_tss_tournaments_db_sync() -> None:
    """Create every table and index of ``tss_tournaments.db`` if missing.

    All statements use ``IF NOT EXISTS``, so the call is idempotent.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    statements = (
        _TOURNAMENTS_SQL,
        _TOURNAMENT_MATCHES_SQL,
        _TOURNAMENT_BATTLES_SQL,
        _TOURNAMENT_STANDINGS_SQL,
        *_INDEXES,
    )
    conn = _connect()
    try:
        # ``with conn`` commits on success / rolls back on error, but it does
        # NOT close the connection — hence the explicit ``finally``.
        with conn:
            for sql in statements:
                conn.execute(sql)
    finally:
        conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pure helpers (network-free, unit-tested)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def to_hex(decimal_url: Any) -> Optional[str]:
    """Convert a TSS decimal ``url`` to our hex session id (matches tss_ws).

    Args:
        decimal_url: The decimal session number, as an int or a numeric string.

    Returns:
        The lowercase hexadecimal form without a ``0x`` prefix, or ``None``
        when the value is missing, not an integer, or negative (a negative
        number cannot be a session id and previously produced a malformed
        string such as ``"x5"``).
    """
    if decimal_url in (None, ""):
        return None
    try:
        number = int(decimal_url)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); treat like any other bad value.
        return None
    if number < 0:
        return None
    return format(number, "x")
|
|
|
|
|
|
def normalize_side(type_bracket: Optional[str]) -> str:
    """Map a TSS bracket/group label onto the display side it belongs to.

    The rules are tried top to bottom and the first hit wins; the order is
    significant because a label such as ``LooserFinal`` matches both the
    loser and the final rule. A label matching no rule is returned lowercased,
    and an empty/missing label becomes ``"match"``.
    """
    label = (type_bracket or "").lower()
    rules = (
        ("swiss", ("swiss",)),
        ("group", ("group",)),
        ("loser", ("looser", "loser")),
        ("final", ("semifinal", "final")),
        ("winner", ("winner",)),
    )
    for side, needles in rules:
        for needle in needles:
            if needle in label:
                return side
    return label if label else "match"
|
|
|
|
|
|
def apply_tournament_side_context(matches: List[Dict[str, Any]]) -> None:
    """Re-label ``Semifinal`` rows when the tournament has a loser bracket.

    In double-elimination data TSS files ``Semifinal`` under the loser tree,
    while in single-elimination it is an ordinary stage. The presence of any
    Looser/Loser row is used as the double-elimination signal; when found,
    every match whose bracket label contains "semifinal" gets ``side`` set to
    ``"loser"``. The list is modified in place; nothing is returned.
    """

    def bracket_label(entry: Dict[str, Any]) -> str:
        return str(entry.get("type_bracket") or "").lower()

    has_loser_tree = False
    for entry in matches:
        label = bracket_label(entry)
        if "looser" in label or "loser" in label:
            has_loser_tree = True
            break

    if has_loser_tree:
        for entry in matches:
            if "semifinal" in bracket_label(entry):
                entry["side"] = "loser"
|
|
|
|
|
|
def derive_format(
    type_brackets: Any, type_tournament: Optional[str] = None
) -> str:
    """Decide the tournament format.

    The declared ``typeTournament`` is authoritative when it names a known
    format. Otherwise the format is inferred from the bracket labels that
    actually occur. Returns one of ``"swiss"``, ``"group"``,
    ``"double-elim"``, ``"single-elim"`` or ``"unknown"``.
    """
    declared = (type_tournament or "").lower()
    declared_rules = (
        ("swiss" in declared, "swiss"),
        (declared.startswith("group"), "group"),
        ("double" in declared, "double-elim"),
        ("single" in declared, "single-elim"),
    )
    for matched, fmt in declared_rules:
        if matched:
            return fmt

    labels = {str(item).lower() for item in (type_brackets or [])}

    def seen(*needles: str) -> bool:
        # True when any label contains any of the given fragments.
        return any(needle in label for label in labels for needle in needles)

    if seen("swiss"):
        return "swiss"
    if seen("group"):
        return "group"
    if seen("looser", "loser"):
        return "double-elim"
    if seen("winner", "final"):
        return "single-elim"
    return "unknown"
|
|
|
|
|
|
def team_name_map(all_teams: Any) -> Dict[str, str]:
    """Index team display names by UUID.

    Reads ``teamName`` (the UUID) and ``realName`` from each dict row.
    Non-dict rows and rows with an empty UUID are ignored; a missing display
    name is stored as ``""``. When a UUID repeats, the last row wins.
    """
    return {
        str(entry["teamName"]): entry.get("realName") or ""
        for entry in (all_teams or [])
        if isinstance(entry, dict) and entry.get("teamName")
    }
|
|
|
|
|
|
def _as_int(value: Any) -> Optional[int]:
|
|
try:
|
|
return int(value)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def parse_schedule_match(row: Dict[str, Any], names: Dict[str, str]) -> Dict[str, Any]:
    """Turn one elimination-match row into our normalized match dict.

    Two row shapes are accepted. Schedule rows carry ``matchNumber`` and
    ``finished``; bracket-fallback rows lack them but usually carry inline
    ``realNameA``/``realNameB``.

    Args:
        row: Raw match row from TSS.
        names: ``{team_uuid: display name}`` lookup; inline names on the row
            are used when the lookup has no entry.

    Returns:
        A dict with the column names of ``tournament_matches`` (minus
        ``tournament_id``). ``status`` is ``"played"``, ``"pending"`` or
        ``"bye"`` (a winner with at least one empty team slot).
    """
    uuid_a = row.get("teamA") or ""
    uuid_b = row.get("teamB") or ""
    winner_uuid = row.get("winner") or ""
    bracket = str(row.get("typeBracket") or "Winner")

    if "finished" in row:
        is_finished = str(row.get("finished") or "") == "1"
    else:
        # No explicit flag (bracket-fallback shape): a winner or a result
        # implies the match is over.
        is_finished = bool(winner_uuid or row.get("matchResult"))

    if uuid_a and uuid_b:
        status = "played" if is_finished else "pending"
    else:
        status = "bye" if winner_uuid else "pending"

    def mapped(uuid: Any) -> Any:
        # Name from the lookup table, only attempted for a non-empty UUID.
        return names.get(uuid) if uuid else None

    def inline_if_winner(slot_uuid: Any, key: str) -> Any:
        # Inline name of a slot, but only when that slot is the winner.
        return row.get(key) if winner_uuid and winner_uuid == slot_uuid else None

    winner_name = (
        mapped(winner_uuid)
        or inline_if_winner(uuid_a, "realNameA")
        or inline_if_winner(uuid_b, "realNameB")
    )

    return {
        "match_id": str(row.get("id") or ""),
        "type_bracket": bracket,
        "side": normalize_side(bracket),
        "round": _as_int(row.get("round")),
        "position": _as_int(row.get("matchNumber")),
        "team_a_uuid": uuid_a or None,
        "team_a_name": mapped(uuid_a) or row.get("realNameA") or None,
        "team_b_uuid": uuid_b or None,
        "team_b_name": mapped(uuid_b) or row.get("realNameB") or None,
        "winner_name": winner_name,
        "score_a": _as_int(row.get("scoreA")) or 0,
        "score_b": _as_int(row.get("scoreB")) or 0,
        "status": status,
        "time_start": _as_int(row.get("timeStart")),
    }
|
|
|
|
|
|
def parse_group_match(row: Dict[str, Any]) -> Dict[str, Any]:
    """Turn one ``GroupMatch[]`` (swiss/group) row into our normalized match dict.

    Team names come inline on the row (``realNameA``/``realNameB``). The
    winner name is whichever inline name belongs to the ``winner`` UUID. The
    match counts as played when ``statsStatus`` is truthy. ``round`` and
    ``position`` are always ``None`` for this row shape.
    """
    uuid_a = row.get("teamA") or ""
    uuid_b = row.get("teamB") or ""
    winner_uuid = row.get("winner") or ""
    group_label = str(row.get("typeGroup") or "Group")
    inline_a = row.get("realNameA")
    inline_b = row.get("realNameB")

    if winner_uuid and winner_uuid == uuid_a:
        winner_name = inline_a
    elif winner_uuid and winner_uuid == uuid_b:
        winner_name = inline_b
    else:
        winner_name = None

    status = "played" if row.get("statsStatus") else "pending"

    return {
        "match_id": str(row.get("id") or ""),
        "type_bracket": group_label,
        "side": normalize_side(group_label),
        "round": None,
        "position": None,
        "team_a_uuid": uuid_a or None,
        "team_a_name": inline_a,
        "team_b_uuid": uuid_b or None,
        "team_b_name": inline_b,
        "winner_name": winner_name,
        "score_a": _as_int(row.get("scoreA")) or 0,
        "score_b": _as_int(row.get("scoreB")) or 0,
        "status": status,
        "time_start": _as_int(row.get("timeStart")),
    }
|
|
|
|
|
|
def collect_group_rows(value: Any) -> List[Dict[str, Any]]:
    """Walk a nested bracket/group/swiss payload and return its match-like dicts.

    A dict is match-like when it has a truthy ``id`` and a ``teamA`` or
    ``idTeamA`` key. Dicts and lists are descended depth-first; a match-like
    dict is emitted before anything nested inside it, so the result is in
    document order. Other value types are ignored.
    """

    def walk(node: Any) -> Any:
        if isinstance(node, dict):
            if node.get("id") and ("teamA" in node or "idTeamA" in node):
                yield node
            children = node.values()
        elif isinstance(node, list):
            children = node
        else:
            return
        for child in children:
            yield from walk(child)

    return list(walk(value))
|
|
|
|
|
|
def fill_names_from_battles(match: Dict[str, Any], rows: Any) -> None:
    """Fill missing names on ``match`` from getListAllBattles team objects.

    ``all_teams`` occasionally omits a UUID, while battle rows embed full team
    objects. The slot order inside a battle row is not a reliable signal, so
    names are resolved by UUID only.

    ``match`` is updated in place: a missing ``team_a_name``/``team_b_name``
    is looked up by the slot's UUID, and a missing ``winner_name`` is set to
    the known name of the side with the strictly higher score. Nothing
    happens when ``rows`` is not a list.
    """
    if not isinstance(rows, list):
        return

    known = {
        str(team["teamName"]): str(team["realName"])
        for battle in rows
        if isinstance(battle, dict)
        for team in (battle.get("teamA"), battle.get("teamB"))
        if isinstance(team, dict) and team.get("teamName") and team.get("realName")
    }

    for name_key, uuid_key in (("team_a_name", "team_a_uuid"), ("team_b_name", "team_b_uuid")):
        if not match.get(name_key) and match.get(uuid_key):
            match[name_key] = known.get(str(match[uuid_key]))

    if match.get("winner_name"):
        return

    slots = (
        ("team_a_uuid", "score_a", "score_b"),
        ("team_b_uuid", "score_b", "score_a"),
    )
    for uuid_key, own_score, other_score in slots:
        uuid = match.get(uuid_key)
        if not uuid or str(uuid) not in known:
            continue
        if match.get(own_score, 0) > match.get(other_score, 0):
            match["winner_name"] = known[str(uuid)]
|
|
|
|
|
|
def parse_standings(group_stage: Any) -> List[Dict[str, Any]]:
    """Flatten ``GroupStage`` into one standings row per team.

    ``group_stage`` is expected to be a list of groups; a group that is not
    itself a list is treated as a single-row group. ``rank`` is the 1-based
    position of the row inside its group as delivered. Rows that are not
    dicts or have no ``teamName`` are skipped but still occupy a position.
    Anything other than a list yields an empty result.
    """
    if not isinstance(group_stage, list):
        return []

    flattened: List[Dict[str, Any]] = []
    for group_index, group in enumerate(group_stage):
        members = group if isinstance(group, list) else [group]
        for rank, entry in enumerate(members, start=1):
            if not (isinstance(entry, dict) and entry.get("teamName")):
                continue
            try:
                tiebreak = float(entry.get("buchholz_points") or 0)
            except (ValueError, TypeError):
                tiebreak = 0.0
            flattened.append(
                {
                    "group_index": group_index,
                    "team_uuid": str(entry.get("teamName")),
                    "team_name": entry.get("realName"),
                    "points": _as_int(entry.get("points")) or 0,
                    "wins": _as_int(entry.get("won")) or 0,
                    "draws": _as_int(entry.get("draw")) or 0,
                    "losses": _as_int(entry.get("defeats")) or 0,
                    "buchholz": tiebreak,
                    "rank": rank,
                }
            )
    return flattened
|
|
|
|
|
|
def parse_battles(
    rows: Any, tournament_id: int, match_id: str, type_bracket: str
) -> Tuple[List[Dict[str, Any]], bool]:
    """Extract storable battle rows from a getListAllBattles reply.

    Returns:
        ``(battles, saw_technical_victory)``. ``battles`` holds one dict per
        row that has a usable session id. Rows whose ``statusReplay`` is
        ``"wait"`` or ``"technical victory"`` are never stored; the latter
        only sets the flag.
    """
    kept: List[Dict[str, Any]] = []
    saw_technical = False
    for entry in rows or []:
        if not isinstance(entry, dict):
            continue
        replay_state = entry.get("statusReplay")
        if replay_state == "technical victory":
            saw_technical = True
            continue
        if replay_state == "wait":
            continue
        raw_url = entry.get("url")
        session_hex = to_hex(raw_url)
        if not session_hex:
            continue
        kept.append(
            {
                "session_hex": session_hex,
                "session_decimal": str(raw_url),
                "tournament_id": tournament_id,
                "match_id": match_id,
                "type_bracket": type_bracket,
                "position": _as_int(entry.get("position")),
                "winner_name": entry.get("winner"),
                "status_replay": replay_state,
            }
        )
    return kept, saw_technical
|
|
|
|
|
|
def compute_status(
    matches: List[Dict[str, Any]],
    date_end: Optional[int],
    now: int,
    in_active: Optional[bool] = True,
) -> str:
    """Derive the tournament status: ``"pending"``, ``"active"`` or ``"finished"``.

    Presence in ``GetActiveTournaments`` is the authoritative live/over
    signal: ``in_active`` False means over, True means live. ``date_end`` is
    only TSS's *scheduled* end, which tournaments routinely overrun, so it
    must never finish a tournament that is still listed as active. It is used
    solely as a fallback when the active-list state is unknown
    (``in_active`` None), and then only after the re-scan buffer has elapsed.

    Order of evaluation: dropped off the active list -> finished; no matches
    -> pending; every match played/bye/technical -> finished; otherwise the
    active-list / scheduled-end rules above decide.
    """
    if in_active is False:
        return "finished"
    if not matches:
        return "pending"

    terminal = ("played", "bye", "technical")
    has_open_match = any(m["status"] not in terminal for m in matches)
    if not has_open_match:
        return "finished"

    if in_active is True:
        # Still live/upcoming — overrunning the scheduled end is fine.
        return "active"

    # Active list unavailable: fall back to scheduled end + buffer.
    overdue = bool(date_end) and now > date_end + RESCAN_END_BUFFER_SECONDS
    return "finished" if overdue else "active"
|
|
|
|
|
|
def team_count(matches: List[Dict[str, Any]], standings: List[Dict[str, Any]]) -> int:
    """Count distinct team names across matches and standings.

    Names are compared case-insensitively; empty or missing names are ignored.
    """
    from_matches = {
        str(m[key]).lower()
        for m in matches
        for key in ("team_a_name", "team_b_name")
        if m.get(key)
    }
    from_standings = {
        str(s["team_name"]).lower() for s in standings if s.get("team_name")
    }
    return len(from_matches | from_standings)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Network gather (sync — run via a dedicated thread)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _request(method: str, action: str, **params: Any) -> Any:
    """Call one TSS API action and return its decoded JSON reply.

    Args:
        method: ``"GET"`` sends the fields as a query string; any other value
            sends them as a form-encoded POST body.
        action: Value of the ``action`` form field.
        **params: Extra form fields; values are stringified and ``None``
            values are dropped.

    Returns:
        The parsed JSON value, or ``None`` when the body is empty or not valid
        JSON. Transport errors from ``urlopen`` are not caught here.
    """
    form: Dict[str, str] = {"action": action}
    for key, value in params.items():
        if value is not None:
            form[key] = str(value)
    encoded = urllib.parse.urlencode(form)

    if method == "GET":
        req = urllib.request.Request(
            f"{_API_URL}?{encoded}", headers=_API_HEADERS, method="GET"
        )
    else:
        req = urllib.request.Request(
            _API_URL, data=encoded.encode(), headers=_API_HEADERS, method="POST"
        )

    with urllib.request.urlopen(req, timeout=_API_TIMEOUT) as resp:
        text = resp.read().decode("utf-8")

    if not text:
        return None
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return None
    return parsed
|
|
|
|
|
|
def _data_dict(response: Any) -> Dict[str, Any]:
|
|
data = response.get("data") if isinstance(response, dict) else None
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
# GetActiveTournaments returns ~hundreds of rows; cache it briefly so a burst of
# scans doesn't refetch the whole list each time.
# "at" is the ``now`` value of the last non-empty fetch and "by_id" maps
# str(tournamentID) -> raw row; both are written only by ``_active_meta``.
_ACTIVE_CACHE: Dict[str, Any] = {"at": 0, "by_id": {}}
# Cache age (in the units of ``now``, i.e. seconds) after which a refetch is attempted.
_ACTIVE_TTL_SECONDS = 120
|
|
|
|
|
|
def _active_meta(tournament_id: int, now: int) -> Tuple[Optional[Dict[str, Any]], Optional[bool]]:
    """Look a tournament up in the (cached) GetActiveTournaments list.

    Returns:
        ``(metadata, in_active)``. ``metadata`` is the raw list row or
        ``None``. ``in_active`` is True when the tournament is listed and
        False when a known list does not contain it. It is ``None`` when no
        list could be established at all, in which case callers must not
        treat the absence as "finished".

    The list is refetched once the cached copy is older than
    ``_ACTIVE_TTL_SECONDS``. An empty reply never replaces the cache, so a
    transient empty answer cannot mark every tournament finished.
    """
    cache = _ACTIVE_CACHE
    if now - cache["at"] > _ACTIVE_TTL_SECONDS:
        reply = _request("POST", "GetActiveTournaments")
        listing = reply.get("data") if isinstance(reply, dict) else None
        fresh: Dict[str, Any] = {
            str(entry["tournamentID"]): entry
            for entry in (listing or [])
            if isinstance(entry, dict) and entry.get("tournamentID") is not None
        }
        if fresh:
            cache["by_id"] = fresh
            cache["at"] = now

    known = cache["by_id"]
    if not known:
        # Nothing cached and nothing fetched: list state is unknown.
        return None, None
    meta = known.get(str(tournament_id))
    return meta, meta is not None
|
|
|
|
|
|
def gather_structure_sync(
    tournament_id: int,
    *,
    fallback_name: Optional[str] = None,
    active_meta: Optional[Dict[str, Any]] = None,
    now: Optional[int] = None,
) -> Dict[str, Any]:
    """Fetch + assemble the bracket structure (matches, standings, meta) for one
    tournament. Network-bound; call via ``run_in_thread``. No battles fetched.

    Args:
        tournament_id: TSS tournament id.
        fallback_name: Name to store when the active-list row has no ``nameEN``.
        active_meta: Pre-fetched GetActiveTournaments row. When given, the
            tournament is assumed to be on the active list and no lookup is made.
        now: Scan timestamp; defaults to the current time.

    Returns:
        A scan dict holding the ``tournaments`` columns plus ``matches``,
        ``standings`` and ``in_active``. It has no ``battles`` key.
    """
    now = now or int(time.time())
    if active_meta is None:
        active_meta, in_active = _active_meta(tournament_id, now)
    else:
        # Caller supplied the active-list row, so the tournament is listed.
        in_active = True
    # "get_shedule_matches" is the action name as sent; do not "fix" the spelling.
    sched = _data_dict(_request("POST", "get_shedule_matches", tournamentId=tournament_id))
    names = team_name_map(sched.get("all_teams"))
    raw_matches = sched.get("all_matches") if isinstance(sched.get("all_matches"), list) else []

    matches: List[Dict[str, Any]] = []
    standings: List[Dict[str, Any]] = []

    if raw_matches:
        # Source 1: the schedule endpoint.
        matches = [parse_schedule_match(r, names) for r in raw_matches if isinstance(r, dict)]
    else:
        # Source 2: bracket data, searched recursively for match-like rows.
        bracket = _data_dict(_request("POST", "GetArrayBracketData", tournamentID=tournament_id))
        bracket_rows = collect_group_rows(bracket.get("bracket") or bracket)
        if bracket_rows:
            matches = [parse_schedule_match(r, names) for r in bracket_rows if isinstance(r, dict)]

    if not matches:
        # Source 3: swiss, then group data; the first one with rows wins and is
        # the only path that produces standings.
        for action, kw in (
            ("GetArraySwissData", {"tournamentID": tournament_id, "id_group": 1}),
            ("GetArrayGroupData", {"tournamentID": tournament_id}),
        ):
            data = _data_dict(_request("POST", action, **kw))
            rows = collect_group_rows(data.get("GroupMatch"))
            if rows:
                matches = [parse_group_match(r) for r in rows]
                standings = parse_standings(data.get("GroupStage"))
                break

    apply_tournament_side_context(matches)

    type_set = {m["type_bracket"] for m in matches}
    # All display metadata comes from the active-list row; empty when unlisted.
    meta = active_meta or {}
    name = meta.get("nameEN") or fallback_name
    date_start = _as_int(meta.get("dateStartTournament"))
    date_end = _as_int(meta.get("dateEndTournament"))

    return {
        "tournament_id": tournament_id,
        "name": name,
        "format": derive_format(type_set, meta.get("typeTournament")),
        "game_mode": meta.get("gameMode"),
        "cluster": meta.get("cluster"),
        "date_start": date_start,
        "date_end": date_end,
        "team_count": team_count(matches, standings),
        "match_count": len(matches),
        "status": compute_status(matches, date_end, now, in_active),
        "scanned_unix": now,
        "matches": matches,
        "standings": standings,
        "in_active": in_active,
    }
|
|
|
|
|
|
def fetch_schedule_sync(
    tournament_id: int,
    *,
    fallback_name: Optional[str] = None,
    now: Optional[int] = None,
) -> Dict[str, Any]:
    """Gather the schedule only and return it in the shape ``store_schedule`` takes.

    Same as ``gather_structure_sync`` with an empty ``battles`` list added; no
    battle lookups are made.
    """
    schedule = gather_structure_sync(
        tournament_id, fallback_name=fallback_name, now=now
    )
    schedule.update(battles=[])
    return schedule
|
|
|
|
|
|
def build_scan_sync(
    tournament_id: int,
    *,
    fallback_name: Optional[str] = None,
    active_meta: Optional[Dict[str, Any]] = None,
    now: Optional[int] = None,
    battle_workers: int = _DEFAULT_BATTLE_WORKERS,
    progress: Optional[Callable[[str], None]] = None,
) -> Dict[str, Any]:
    """Build the full authoritative scan: structure plus battles.

    Gathers the structure, then looks up battles once per distinct
    ``(match_id, type_bracket)`` (matches without an id are skipped), and
    finally recomputes the tournament status. ``progress``, when given,
    receives human-readable progress messages.
    """
    now = now or int(time.time())
    scan = gather_structure_sync(
        tournament_id, fallback_name=fallback_name, active_meta=active_meta, now=now
    )
    matches = scan["matches"]

    # Keep the first match seen for each (match_id, type_bracket) key.
    first_by_key: Dict[Tuple[Any, Any], Dict[str, Any]] = {}
    for entry in matches:
        match_id = entry["match_id"]
        bracket = entry["type_bracket"]
        if match_id:
            first_by_key.setdefault((match_id, bracket), entry)
    battle_targets: List[Dict[str, Any]] = list(first_by_key.values())

    if progress:
        progress(f"{len(matches)} matches; fetching battles for {len(battle_targets)} match rows")
    scan["battles"] = fetch_battles_for_matches(
        tournament_id, battle_targets, workers=max(1, battle_workers), progress=progress
    )
    # The battle lookup may have flipped matches to 'technical', which can
    # change whether the tournament reads as finished.
    scan["status"] = compute_status(matches, scan["date_end"], now, scan["in_active"])
    return scan
|
|
|
|
|
|
def fetch_battles_for_matches(
    tournament_id: int,
    matches: List[Dict[str, Any]],
    *,
    workers: int = _DEFAULT_BATTLE_WORKERS,
    progress: Optional[Callable[[str], None]] = None,
) -> List[Dict[str, Any]]:
    """Fetch getListAllBattles rows for each match concurrently.

    Args:
        tournament_id: TSS tournament id.
        matches: Match dicts; each needs ``match_id`` and ``type_bracket``.
            Full match dicts are also updated in place: missing names are
            filled from the battle rows, and a ``pending``/``bye`` match that
            shows a technical victory is flipped to ``technical``.
        workers: Thread-pool size (clamped to at least 1).
        progress: Optional callback receiving progress messages.

    Returns:
        All storable battle rows, in completion order.

    Raises:
        Whatever a failed lookup raised; one failure aborts the whole fetch.
    """
    if not matches:
        return []

    def lookup(match: Dict[str, Any]) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], bool]:
        mid, tb = match["match_id"], match["type_bracket"]
        rows = _request(
            "GET",
            "getListAllBattles",
            tournamentID=tournament_id,
            idMatch=mid,
            typeBracket=tb,
        )
        found, technical = parse_battles(rows, tournament_id, mid, tb)
        return match, rows, found, technical

    battles: List[Dict[str, Any]] = []
    total = len(matches)
    with ThreadPoolExecutor(max_workers=max(1, workers)) as pool:
        futures = [pool.submit(lookup, match) for match in matches]
        for done, future in enumerate(as_completed(futures), start=1):
            match, rows, found, technical = future.result()
            fill_names_from_battles(match, rows)
            # Reconcile targets carry only match_id/type_bracket, so "status"
            # may be absent; indexing it raised KeyError exactly when a
            # technical victory was found. Such targets are left untouched.
            if technical and match.get("status") in ("pending", "bye"):
                match["status"] = "technical"
            battles.extend(found)
            if progress and (done == total or done % 25 == 0):
                progress(f"battle lookups {done}/{total}")
    return battles
|
|
|
|
|
|
def reconcile_targets_sync(tid: int) -> List[Dict[str, str]]:
    """Matches that should have a replay link but don't yet: played/technical with
    zero stored battles (a game we missed, or a not-yet-linked technical victory).
    Byes (empty team slot) are excluded — no battle is expected.

    Returns:
        One ``{"match_id", "type_bracket"}`` dict per gap match of tournament
        ``tid``. The dicts carry no other match fields.
    """
    conn = _connect()
    try:
        rows = conn.execute(
            """
            SELECT m.match_id, m.type_bracket
            FROM tournament_matches m
            WHERE m.tournament_id = ?
              AND m.status IN ('played', 'technical')
              AND NOT EXISTS (
                SELECT 1 FROM tournament_battles b
                WHERE b.tournament_id = m.tournament_id
                  AND b.match_id = m.match_id
                  AND b.type_bracket = m.type_bracket
              )
            """,
            (tid,),
        ).fetchall()
    finally:
        # A sqlite3 connection used as a context manager is not closed on
        # exit, so close it explicitly to avoid leaking the handle.
        conn.close()
    return [{"match_id": r[0], "type_bracket": r[1]} for r in rows]
|
|
|
|
|
|
def _insert_battles_sync(battles: List[Dict[str, Any]]) -> None:
    """Upsert battle rows without deleting anything (preserves fast-path links).

    Rows are written with ``INSERT OR REPLACE`` keyed on ``session_hex`` in a
    single transaction. An empty list is a no-op and opens no connection.
    """
    if not battles:
        return
    conn = _connect()
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, which is why ``finally`` does.
        with conn:
            conn.executemany(
                """
                INSERT OR REPLACE INTO tournament_battles
                    (session_hex, session_decimal, tournament_id, match_id,
                     type_bracket, position, winner_name, status_replay)
                VALUES (?,?,?,?,?,?,?,?)
                """,
                [
                    (
                        b["session_hex"], b["session_decimal"], b["tournament_id"],
                        b["match_id"], b["type_bracket"], b["position"],
                        b["winner_name"], b["status_replay"],
                    )
                    for b in battles
                ],
            )
    finally:
        conn.close()
|
|
|
|
|
|
async def reconcile_battles(
    tid: int,
    *,
    now: Optional[int] = None,
    force: bool = False,
    workers: int = _DEFAULT_BATTLE_WORKERS,
) -> int:
    """Gap-fill replay links for played matches that have no stored session.

    getListAllBattles is called ONLY for the gap matches. Unless ``force`` is
    set, the call is skipped (returning 0) while the previous reconcile of
    this tournament is younger than ``_RECONCILE_INTERVAL_SECONDS``.

    Returns:
        The number of battle rows written.
    """
    now = now or int(time.time())
    if not force:
        next_allowed = _reconcile_last.get(tid, 0) + _RECONCILE_INTERVAL_SECONDS
        if next_allowed > now:
            return 0

    targets = await run_in_thread(reconcile_targets_sync, tid)
    linked = 0
    if targets:
        battles = await run_in_thread(
            fetch_battles_for_matches, tid, targets, workers=workers
        )
        await run_in_thread(_insert_battles_sync, battles)
        linked = len(battles)
    # Stamped only after a completed pass, so a failed pass is retried at once.
    _reconcile_last[tid] = now
    return linked
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Storage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def store_scan(scan: Dict[str, Any]) -> None:
    """Persist one assembled full scan into ``tss_tournaments.db``.

    The write itself is ``_store_scan_sync``, dispatched through
    ``run_in_thread`` so the SQLite work does not run on the event loop.
    """
    writer = _store_scan_sync
    await run_in_thread(writer, scan)
|
|
|
|
|
|
def _upsert_tournament_meta(conn: sqlite3.Connection, scan: Dict[str, Any]) -> None:
|
|
tid = scan["tournament_id"]
|
|
now = scan["scanned_unix"]
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO tournaments
|
|
(tournament_id, name, format, game_mode, cluster, date_start,
|
|
date_end, team_count, match_count, status, first_seen_unix, scanned_unix)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
|
|
ON CONFLICT(tournament_id) DO UPDATE SET
|
|
name=COALESCE(excluded.name, tournaments.name),
|
|
format=excluded.format,
|
|
game_mode=COALESCE(excluded.game_mode, tournaments.game_mode),
|
|
cluster=COALESCE(excluded.cluster, tournaments.cluster),
|
|
date_start=COALESCE(excluded.date_start, tournaments.date_start),
|
|
date_end=COALESCE(excluded.date_end, tournaments.date_end),
|
|
team_count=excluded.team_count,
|
|
match_count=excluded.match_count,
|
|
status=excluded.status,
|
|
scanned_unix=excluded.scanned_unix
|
|
""",
|
|
(
|
|
tid, scan.get("name"), scan.get("format"), scan.get("game_mode"),
|
|
scan.get("cluster"), scan.get("date_start"), scan.get("date_end"),
|
|
scan.get("team_count", 0), scan.get("match_count", 0),
|
|
scan.get("status", "pending"), now, now,
|
|
),
|
|
)
|
|
|
|
|
|
def _replace_matches(conn: sqlite3.Connection, tid: int, matches: List[Dict[str, Any]]) -> None:
|
|
conn.execute("DELETE FROM tournament_matches WHERE tournament_id = ?", (tid,))
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR REPLACE INTO tournament_matches
|
|
(tournament_id, match_id, type_bracket, side, round, position,
|
|
team_a_uuid, team_a_name, team_b_uuid, team_b_name, winner_name,
|
|
score_a, score_b, status, time_start)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
tid, m["match_id"], m["type_bracket"], m["side"], m["round"],
|
|
m["position"], m["team_a_uuid"], m["team_a_name"], m["team_b_uuid"],
|
|
m["team_b_name"], m["winner_name"], m["score_a"], m["score_b"],
|
|
m["status"], m["time_start"],
|
|
)
|
|
for m in matches
|
|
],
|
|
)
|
|
|
|
|
|
def _replace_standings(conn: sqlite3.Connection, tid: int, standings: List[Dict[str, Any]]) -> None:
|
|
conn.execute("DELETE FROM tournament_standings WHERE tournament_id = ?", (tid,))
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR REPLACE INTO tournament_standings
|
|
(tournament_id, group_index, team_uuid, team_name, points,
|
|
wins, draws, losses, buchholz, rank)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
tid, s["group_index"], s["team_uuid"], s["team_name"],
|
|
s["points"], s["wins"], s["draws"], s["losses"],
|
|
s["buchholz"], s["rank"],
|
|
)
|
|
for s in standings
|
|
],
|
|
)
|
|
|
|
|
|
def _replace_battles(conn: sqlite3.Connection, tid: int, battles: List[Dict[str, Any]]) -> None:
|
|
# Authoritative full-scan replace. NOTE: this briefly drops any fast-path
|
|
# battle link (maybe_link_battle) for a session TSS has not yet listed in
|
|
# getListAllBattles; such a link is refilled by a later reconcile or scan.
|
|
# The debounced schedule refresh deliberately does NOT call this.
|
|
conn.execute("DELETE FROM tournament_battles WHERE tournament_id = ?", (tid,))
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR REPLACE INTO tournament_battles
|
|
(session_hex, session_decimal, tournament_id, match_id,
|
|
type_bracket, position, winner_name, status_replay)
|
|
VALUES (?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
b["session_hex"], b["session_decimal"], b["tournament_id"],
|
|
b["match_id"], b["type_bracket"], b["position"],
|
|
b["winner_name"], b["status_replay"],
|
|
)
|
|
for b in battles
|
|
],
|
|
)
|
|
|
|
|
|
def _store_scan_sync(scan: Dict[str, Any]) -> None:
    """Blocking body of ``store_scan``; meant to run off the event loop.

    Writes the tournament meta row, then replaces matches, battles and
    standings (in that order) on one connection, and commits once at the end.
    """
    tid = scan["tournament_id"]
    with _connect() as conn:
        _upsert_tournament_meta(conn, scan)
        # (writer, key in ``scan``) pairs, applied in this order.
        for writer, key in (
            (_replace_matches, "matches"),
            (_replace_battles, "battles"),
            (_replace_standings, "standings"),
        ):
            writer(conn, tid, scan[key])
        conn.commit()
|
|
|
|
|
|
def store_schedule_sync(scan: Dict[str, Any]) -> None:
    """Persist a schedule-only refresh: meta, matches and standings.

    ``tournament_battles`` is deliberately left alone. Those rows are
    maintained incrementally by the fast-path linker and the reconcile, so a
    schedule refresh that wiped them would drop every replay link.
    """
    tid = scan["tournament_id"]
    with _connect() as conn:
        _upsert_tournament_meta(conn, scan)
        # No battles writer here on purpose (see docstring).
        for writer, key in (
            (_replace_matches, "matches"),
            (_replace_standings, "standings"),
        ):
            writer(conn, tid, scan[key])
        conn.commit()
|
|
|
|
|
|
async def store_schedule(scan: Dict[str, Any]) -> None:
    """Awaitable front for ``store_schedule_sync``.

    The blocking sqlite write is handed to ``run_in_thread`` so it does not run
    on the event loop.
    """
    await run_in_thread(store_schedule_sync, scan)
|
|
|
|
|
|
async def needs_scan(tournament_id: int, *, now: Optional[int] = None) -> bool:
    """True if we've never scanned this tournament, or it's live and stale.

    The only permanent stop is ``status == 'finished'`` (which ``compute_status``
    sets reliably once the tournament drops out of ``GetActiveTournaments``). We
    deliberately do NOT gate on ``date_end``: it is only the *scheduled* end, and
    scans are game-triggered — a game arriving for a tournament is itself evidence
    it is still live, even days past that scheduled end. Gating on ``date_end``
    froze brackets whose final was played late. Re-scans stay interval-limited.

    Args:
        tournament_id: Tournament whose scan state is looked up.
        now: Reference unix time in seconds; defaults to the current time.
            An explicit ``0`` is honoured rather than treated as "not given".

    Returns:
        ``True`` when there is no stored row for the tournament, or when it is
        not finished and its last scan is at least ``RESCAN_INTERVAL_SECONDS``
        old; ``False`` otherwise.
    """
    # ``is None`` rather than ``now or ...`` so a caller-supplied 0 is not
    # silently replaced by the wall clock.
    if now is None:
        now = int(time.time())
    # NOTE(review): this is a blocking sqlite read executed on the event loop.
    row = _scan_state_sync(tournament_id)
    if row is None:
        return True
    status, _date_end, scanned_unix = row
    if status == "finished":
        return False
    # A NULL scanned_unix counts as "scanned at time 0", i.e. always stale.
    return (scanned_unix or 0) + RESCAN_INTERVAL_SECONDS <= now
|
|
|
|
|
|
def _scan_state_sync(tournament_id: int) -> Optional[Tuple[str, Optional[int], Optional[int]]]:
    """Return ``(status, date_end, scanned_unix)`` for a tournament, or ``None``.

    ``None`` means the ``tournaments`` table has no row for ``tournament_id``.
    """
    query = "SELECT status, date_end, scanned_unix FROM tournaments WHERE tournament_id = ?"
    with _connect() as conn:
        cursor = conn.execute(query, (tournament_id,))
        return cursor.fetchone()
|
|
|
|
|
|
# --- Module-level runtime state -------------------------------------------

# Set to True by _ensure_db() once init_tss_tournaments_db() has completed.
_db_ready = False

# Tournament ids that currently have a full scan in progress
# (added and discarded by maybe_scan_tournament).
_inflight: set = set()

# Holds a reference to every background task until it finishes; each task
# removes itself through its done-callback.
_tasks: set = set()

# Reconcile throttling. Not used in this part of the file — presumably read by
# reconcile_battles; confirm there.
_RECONCILE_INTERVAL_SECONDS = 600
_reconcile_last: Dict[int, int] = {}

# Seconds _run_schedule_refresh sleeps before fetching, so bursts of games
# fold into a single refresh.
_REFRESH_DEBOUNCE_SECONDS = 30

# tournament id -> the schedule-refresh task currently pending for it.
_refresh_tasks: Dict[int, "asyncio.Task"] = {}

# tournament id -> fallback tournament name remembered for the next refresh.
_refresh_names: Dict[int, Optional[str]] = {}
|
|
|
|
|
|
async def _ensure_db() -> None:
    """Initialise the tournaments database on first use.

    Awaits ``init_tss_tournaments_db()`` and then flips ``_db_ready``; later
    calls return immediately.
    """
    global _db_ready
    if _db_ready:
        return
    await init_tss_tournaments_db()
    _db_ready = True
|
|
|
|
|
|
def link_battle_sync(
    tid: int,
    match_id: str,
    session_hex: str,
    *,
    session_decimal: Optional[str] = None,
    winner_name: Optional[str] = None,
) -> bool:
    """Attach a captured replay to its match via the game's own ``match_id``.

    Looks the match up in ``tournament_matches`` and, when found, writes a
    ``tournament_battles`` row for ``session_hex`` and commits. No network
    access.

    Args:
        tid: Tournament id.
        match_id: Match id reported by the game.
        session_hex: Replay session id as a hex string.
        session_decimal: Decimal form of the session id; derived from
            ``session_hex`` when omitted (left ``None`` if it is not valid hex).
        winner_name: Winning team name, if known.

    Returns:
        ``True`` if the match exists locally (battle row written), ``False``
        otherwise so the caller can fall back to a full scan.
    """
    with _connect() as conn:
        candidates = conn.execute(
            "SELECT type_bracket, status "
            "FROM tournament_matches WHERE tournament_id = ? AND match_id = ?",
            (tid, match_id),
        ).fetchall()
        if not candidates:
            return False

        # Prefer a still-pending row when a match_id spans >1 type_bracket
        # (rare); otherwise take the first row returned.
        picked = candidates[0]
        for candidate in candidates:
            if candidate[1] == "pending":
                picked = candidate
                break
        type_bracket = picked[0]

        if session_decimal is None:
            try:
                session_decimal = str(int(session_hex, 16))
            except (ValueError, TypeError):
                session_decimal = None

        # The fast path does not know the bracket position, so it stores NULL.
        battle_row = (
            session_hex, session_decimal, tid, match_id,
            type_bracket, None, winner_name, "view replay",
        )
        conn.execute(
            """
            INSERT OR REPLACE INTO tournament_battles
                (session_hex, session_decimal, tournament_id, match_id,
                 type_bracket, position, winner_name, status_replay)
            VALUES (?,?,?,?,?,?,?,?)
            """,
            battle_row,
        )
        conn.commit()
        return True
|
|
|
|
|
|
async def maybe_link_battle(game: Dict[str, Any]) -> None:
    """Trigger entry point for each received game.

    Links the game's replay to its bracket slot from ``tss.match_id`` (zero TSS
    calls) and requests a debounced schedule refresh. When the match is not yet
    indexed, or the game has no match id / session id, it falls back to
    ``maybe_scan_tournament``. Games without a positive integer
    ``tss.tournament_id`` are ignored.
    """
    tss = game.get("tss") or {}
    match_id = tss.get("match_id")
    try:
        tid = int(tss.get("tournament_id"))
    except (TypeError, ValueError):
        return
    if tid <= 0:
        return
    await _ensure_db()

    session_hex = str(game.get("_id") or "")
    name = tss.get("tournament_name") or None

    # ``game["winner"]`` names the winning slot ("1" or "2"); the slot's dict
    # under ``tss`` carries the team name.
    winner_name = None
    winner_slot = str(game.get("winner") or "")
    if winner_slot in ("1", "2"):
        slot = tss.get(winner_slot)
        if isinstance(slot, dict):
            winner_name = slot.get("name")

    if match_id and session_hex:
        if await run_in_thread(
            link_battle_sync, tid, str(match_id), session_hex, winner_name=winner_name
        ):
            request_schedule_refresh(tid, fallback_name=name)
            return
    # Unknown match (or missing match_id): fall back to the existing full scan.
    await maybe_scan_tournament(game)
|
|
|
|
|
|
async def maybe_scan_tournament(game: Dict[str, Any]) -> None:
    """Trigger entry point: schedule a background scan if this game's tournament
    is new or live-and-stale. Non-blocking and deduped per tournament id.

    Games without a positive integer ``tss.tournament_id`` are ignored. The
    scan itself runs as a background task; this coroutine returns as soon as
    the task has been created.
    """
    tss = game.get("tss") or {}
    raw_tid = tss.get("tournament_id")
    try:
        tid = int(raw_tid)
    except (TypeError, ValueError):
        return
    # Cheap early exit before touching the database.
    if tid <= 0 or tid in _inflight:
        return
    await _ensure_db()
    if not await needs_scan(tid):
        return
    # The awaits above may have yielded to another call for the same
    # tournament that already claimed the scan. Re-check right before the add:
    # there is no suspension point between this test and ``_inflight.add``, so
    # the per-tournament dedupe actually holds.
    if tid in _inflight:
        return

    name = tss.get("tournament_name") or None
    _inflight.add(tid)

    async def _run() -> None:
        try:
            await scan_and_store(tid, fallback_name=name)
        finally:
            # Always release the slot, even if the scan is cancelled.
            _inflight.discard(tid)

    task = asyncio.create_task(_run())
    # Keep a reference until the task completes, then drop it.
    _tasks.add(task)
    task.add_done_callback(_tasks.discard)
|
|
|
|
|
|
async def scan_and_store(
    tournament_id: int, *, fallback_name: Optional[str] = None
) -> None:
    """Fetch (off-thread) and persist one tournament. Safe to fire-and-forget.

    Any ``Exception`` raised while fetching, storing or logging is caught and
    logged as an error, so nothing propagates to the caller.
    """
    try:
        scan = await run_in_thread(
            build_scan_sync, tournament_id, fallback_name=fallback_name
        )
        await store_scan(scan)
        match_count = scan["match_count"]
        battle_count = len(scan["battles"])
        standing_count = len(scan["standings"])
        log.info(
            "scanned tournament %s: %d matches, %d battles, %d standings (%s)",
            tournament_id, match_count, battle_count, standing_count, scan["status"],
        )
    except Exception as exc:  # pragma: no cover - network/transient
        log.error("tournament scan failed for %s: %s", tournament_id, exc)
|
|
|
|
|
|
def request_schedule_refresh(tid: int, *, fallback_name: Optional[str] = None) -> None:
    """Schedule a debounced schedule-only refresh for ``tid``.

    Calls made while a refresh task for ``tid`` is still running collapse into
    that task. The first non-empty ``fallback_name`` seen is remembered for
    the refresh to use.
    """
    # Remember a name only if none is stored yet for this tournament.
    if fallback_name and not _refresh_names.get(tid):
        _refresh_names[tid] = fallback_name

    pending = _refresh_tasks.get(tid)
    if pending is not None and not pending.done():
        return  # a refresh is already pending; this game folds into it

    task = asyncio.create_task(_run_schedule_refresh(tid))
    _refresh_tasks[tid] = task
    # Keep a reference until the task completes, then drop it.
    _tasks.add(task)
    task.add_done_callback(_tasks.discard)
|
|
|
|
|
|
async def _run_schedule_refresh(tid: int) -> None:
    """Body of the debounced schedule refresh task for ``tid``.

    Sleeps for the debounce window, fetches the schedule off-thread, stores
    it, then reconciles battles (forced when the tournament is finished).
    Any ``Exception`` is logged rather than raised, and the task's entry in
    ``_refresh_tasks`` is always removed on exit.
    """
    try:
        await asyncio.sleep(_REFRESH_DEBOUNCE_SECONDS)
        # Consume the remembered fallback name (if any) for this refresh.
        fallback = _refresh_names.pop(tid, None)
        scan = await run_in_thread(fetch_schedule_sync, tid, fallback_name=fallback)
        await store_schedule(scan)
        is_finished = scan["status"] == "finished"
        await reconcile_battles(tid, force=is_finished)
        log.info(
            "schedule refresh %s: %d matches (%s)",
            tid, scan["match_count"], scan["status"],
        )
    except Exception as exc:  # pragma: no cover - network/transient
        log.error("schedule refresh failed for %s: %s", tid, exc)
    finally:
        _refresh_tasks.pop(tid, None)
|
|
|
|
|
|
async def run_in_thread(fn, *args, **kwargs):
    """Run a blocking callable without relying on asyncio's default executor.

    ``fn(*args, **kwargs)`` is executed on a freshly started daemon thread and
    its outcome is handed back to the awaiting coroutine through a future.

    Args:
        fn: Blocking callable to execute.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        Exception: Any ``Exception`` raised by ``fn`` is re-raised in the
            awaiting coroutine.

    Cancelling the awaiting task does not stop the worker thread; its outcome
    is simply discarded once it finishes.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def _settle(setter, value) -> None:
        # Runs on the loop thread. If the awaiter was cancelled in the
        # meantime the future is already done, and setting it again would
        # raise InvalidStateError inside the loop's callback machinery.
        if not fut.done():
            setter(value)

    def _target() -> None:
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:  # pragma: no cover - pass-through
            outcome = (fut.set_exception, exc)
        else:
            outcome = (fut.set_result, result)
        try:
            loop.call_soon_threadsafe(_settle, *outcome)
        except RuntimeError:
            # The loop was closed while the worker was still running, so
            # nobody is left to receive the outcome.
            pass

    threading.Thread(target=_target, daemon=True).start()
    return await fut
|