""" wl.py Win/Loss tracking database. Manages the wl.db SQLite database which stores per-squadron win/loss/draw records and match history. Post clan_id migration: wl_standings is keyed by clan_id (stable) with the squadron short tag denormalized for display and joins. Callers still pass short squadron tags from the replay JSON; this module resolves them to clan_id via squadrons.db internally and returns dicts keyed by the input squadron text so the renderer doesn't need to change. """ # Standard Library Imports import asyncio import hashlib import json import logging import os import sqlite3 import time from pathlib import Path from typing import Dict, List, Optional, Tuple from .utils import STORAGE_DIR WL_DB = STORAGE_DIR / "wl.db" SQUADRONS_DB = STORAGE_DIR / "squadrons.db" _DB_PATH: str = str(WL_DB) _INITIALIZED = False # Bounded cache for squadron-name → clan_id lookups. Squadrons rarely change # clan_id, so a 5-minute TTL is plenty and keeps the hot path off disk. _CLAN_ID_CACHE: dict[str, Optional[int]] = {} _CLAN_ID_CACHE_AT: float = 0.0 _CLAN_ID_CACHE_TTL = 300.0 # ============================================================================ # DATABASE INITIALIZATION # ============================================================================ def init(db_path: Optional[str] = None) -> None: """Initialize the W/L database.""" global _DB_PATH, _INITIALIZED _DB_PATH = db_path or str(WL_DB) _ensure_db() _INITIALIZED = True def wl_bootstrap(): """Bootstrap the W/L database (alias for init).""" init(str(WL_DB)) def _conn() -> sqlite3.Connection: """Create a database connection.""" con = sqlite3.connect(_DB_PATH, timeout=10, isolation_level=None) con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA foreign_keys=ON;") return con def _ensure_db() -> None: """Ensure database tables exist (clan_id-keyed schema).""" with _conn() as con: con.executescript(""" CREATE TABLE IF NOT EXISTS wl_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, external_id TEXT UNIQUE, session_id TEXT, winner TEXT NOT NULL, winner_clan_id INTEGER, squads_json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS wl_standings ( clan_id INTEGER PRIMARY KEY, squadron TEXT NOT NULL, wins INTEGER NOT NULL DEFAULT 0, losses INTEGER NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_wl_standings_squadron ON wl_standings(squadron); """) # Best-effort ALTER for installs that predate the winner_clan_id column. cols = {r[1] for r in con.execute("PRAGMA table_info(wl_events)").fetchall()} if "winner_clan_id" not in cols: try: con.execute("ALTER TABLE wl_events ADD COLUMN winner_clan_id INTEGER") except sqlite3.OperationalError: pass # ============================================================================ # INTERNAL HELPERS # ============================================================================ def _normalize(s: Optional[str]) -> Optional[str]: """Normalize squadron name for consistency.""" if not s: return None s = s.strip() return s if s and s.upper() != "UNKNOWN" else None def _resolve_clan_id(name: str) -> Optional[int]: """Resolve a squadron short_name / long_name / tag_name → clan_id. Returns None if the squadron can't be found in squadrons_data (orphan). Cached with a TTL since squadrons_data is the source of truth. """ global _CLAN_ID_CACHE, _CLAN_ID_CACHE_AT now = time.time() if now - _CLAN_ID_CACHE_AT > _CLAN_ID_CACHE_TTL: _CLAN_ID_CACHE = {} _CLAN_ID_CACHE_AT = now key = (name or "").strip().lower() if not key or key == "unknown": return None if key in _CLAN_ID_CACHE: return _CLAN_ID_CACHE[key] cid: Optional[int] = None try: with sqlite3.connect(str(SQUADRONS_DB), timeout=5) as con: row = con.execute( "SELECT clan_id FROM squadrons_data " "WHERE LOWER(short_name) = ? OR LOWER(long_name) = ? OR LOWER(tag_name) = ? " "LIMIT 1", (key, key, key), ).fetchone() if row and row[0] is not None: cid = int(row[0]) except Exception: cid = None _CLAN_ID_CACHE[key] = cid return cid def _idempotency_key(session_id: Optional[str], winner: str, squads: List[str]) -> str: """Generate a unique key to prevent duplicate entries.""" base = f"{session_id or ''}|{winner}|{','.join(sorted(squads))}" return hashlib.sha1(base.encode("utf-8")).hexdigest()[:20] def _upsert_standing( con: sqlite3.Connection, *, clan_id: int, squadron: str, wins_delta: int, losses_delta: int, ) -> None: """Upsert a wl_standings row keyed by clan_id, refreshing the squadron text.""" con.execute( """ INSERT INTO wl_standings (clan_id, squadron, wins, losses) VALUES (?, ?, ?, ?) ON CONFLICT(clan_id) DO UPDATE SET squadron = excluded.squadron, wins = wl_standings.wins + excluded.wins, losses = wl_standings.losses + excluded.losses """, (int(clan_id), squadron, int(wins_delta), int(losses_delta)), ) def _tx_upsert_event_and_apply( con: sqlite3.Connection, *, external_id: str, session_id: Optional[str], winner: str, squads: List[str], ) -> None: """Insert event and update standings within a transaction. Squadrons that can't be resolved to a clan_id (orphan / not in squadrons_data) are skipped for the standings upsert but still recorded on the event so a later rebuild can replay them. """ winner_cid = _resolve_clan_id(winner) con.execute( "INSERT OR IGNORE INTO wl_events " "(external_id, session_id, winner, winner_clan_id, squads_json) " "VALUES (?, ?, ?, ?, ?)", ( external_id, session_id, winner, winner_cid, json.dumps(squads, ensure_ascii=False), ), ) if con.execute("SELECT changes()").fetchone()[0] == 0: return # duplicate, already applied if winner_cid is not None: _upsert_standing(con, clan_id=winner_cid, squadron=winner, wins_delta=1, losses_delta=0) else: logging.warning("[W/L] winner '%s' has no clan_id; skipping standings update", winner) for los in (s for s in squads if s != winner): los_cid = _resolve_clan_id(los) if los_cid is None: logging.warning("[W/L] loser '%s' has no clan_id; skipping standings update", los) continue _upsert_standing(con, clan_id=los_cid, squadron=los, wins_delta=0, losses_delta=1) # ============================================================================ # PUBLIC API - ASYNC # ============================================================================ async def record_result( winner: str, squadrons: List[str], session_id: Optional[str] = None ) -> Dict[str, Dict[str, int]]: """ Record a match result (async). Returns updated standings for all squadrons involved (keyed by squadron text). """ if not _INITIALIZED: _ensure_db() w = _normalize(winner) sqs = [s for s in (_normalize(x) for x in squadrons) if s] if w is None or len(sqs) < 2 or w not in sqs: return {} external_id = _idempotency_key(session_id, w, sqs) def _work() -> Dict[str, Dict[str, int]]: with _conn() as con: con.execute("BEGIN IMMEDIATE") try: _tx_upsert_event_and_apply( con, external_id=external_id, session_id=session_id, winner=w, squads=sqs ) con.execute("COMMIT") except Exception: con.execute("ROLLBACK") raise return _read_standings_for(con, sqs) loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _work) async def record_draw( squadrons: List[str], session_id: Optional[str] = None ) -> Dict[str, Dict[str, int]]: """ Record a draw (async) — counts as a loss for all involved squadrons. Returns updated standings (keyed by squadron text). """ if not _INITIALIZED: _ensure_db() sqs = [s for s in (_normalize(x) for x in squadrons) if s] if len(sqs) < 2: return {} external_id = _idempotency_key(session_id, "DRAW", sqs) def _work() -> Dict[str, Dict[str, int]]: with _conn() as con: con.execute("BEGIN IMMEDIATE") try: con.execute( "INSERT OR IGNORE INTO wl_events " "(external_id, session_id, winner, winner_clan_id, squads_json) " "VALUES (?, ?, ?, ?, ?)", (external_id, session_id, "DRAW", None, json.dumps(sqs, ensure_ascii=False)), ) if con.execute("SELECT changes()").fetchone()[0] > 0: for sq in sqs: cid = _resolve_clan_id(sq) if cid is None: logging.warning( "[W/L] draw participant '%s' has no clan_id; " "skipping standings update", sq ) continue _upsert_standing( con, clan_id=cid, squadron=sq, wins_delta=0, losses_delta=1, ) con.execute("COMMIT") except Exception: con.execute("ROLLBACK") raise return _read_standings_for(con, sqs) loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _work) async def schedule_wl_update( winner: str, squadrons: List[str], session_id: Optional[str] = None ) -> Dict[str, Dict[str, int]]: """Alias for record_result (for backwards compatibility).""" return await record_result(winner, list(squadrons), session_id) # ============================================================================ # PUBLIC API - SYNC # ============================================================================ def record_result_sync( winner: str, squadrons: List[str], session_id: Optional[str] = None ) -> Dict[str, Dict[str, int]]: """ Record a match result (sync). Returns updated standings for all squadrons involved (keyed by squadron text). """ if not _INITIALIZED: _ensure_db() w = _normalize(winner) sqs = [s for s in (_normalize(x) for x in squadrons) if s] if w is None or len(sqs) < 2 or w not in sqs: return {} external_id = _idempotency_key(session_id, w, sqs) with _conn() as con: con.execute("BEGIN IMMEDIATE") try: _tx_upsert_event_and_apply( con, external_id=external_id, session_id=session_id, winner=w, squads=sqs ) con.execute("COMMIT") except Exception: con.execute("ROLLBACK") raise return _read_standings_for(con, sqs) def _wl_update_sync( winner: str, squadrons: List[str], session_id: Optional[str] = None ) -> Dict[str, Dict[str, int]]: """Alias for record_result_sync (for backwards compatibility).""" return record_result_sync(winner, list(squadrons), session_id) # ============================================================================ # PUBLIC API - QUERIES # ============================================================================ def _read_standings_for( con: sqlite3.Connection, squadrons: List[str] ) -> Dict[str, Dict[str, int]]: """Return {squadron_text: {wins, losses}} for the given input squadrons. Looks up each squadron's clan_id then reads by clan_id so a squadron that just renamed still finds its row. Falls back to a squadron-text match for orphans. """ out: Dict[str, Dict[str, int]] = {sq: {"wins": 0, "losses": 0} for sq in squadrons} if not squadrons: return out cid_by_input: Dict[str, Optional[int]] = {sq: _resolve_clan_id(sq) for sq in squadrons} cids = [c for c in cid_by_input.values() if c is not None] if cids: rows = con.execute( f"SELECT clan_id, wins, losses FROM wl_standings " f"WHERE clan_id IN ({','.join(['?']*len(cids))})", cids, ).fetchall() by_cid = {int(r[0]): (int(r[1]), int(r[2])) for r in rows} for sq, cid in cid_by_input.items(): if cid is not None and cid in by_cid: w, l = by_cid[cid] out[sq] = {"wins": w, "losses": l} # Fallback by squadron text for orphans (no clan_id resolved). orphans = [sq for sq, cid in cid_by_input.items() if cid is None] if orphans: rows = con.execute( f"SELECT squadron, wins, losses FROM wl_standings " f"WHERE squadron IN ({','.join(['?']*len(orphans))})", orphans, ).fetchall() for s, w, l in rows: out[s] = {"wins": int(w), "losses": int(l)} return out def get_standings(squadrons: List[str]) -> Dict[str, Dict[str, int]]: """Get W/L standings for a list of squadrons (keyed by squadron text).""" if not _INITIALIZED: _ensure_db() if not squadrons: return {} with _conn() as con: return _read_standings_for(con, list(squadrons)) def get_wl_counts(squadrons: List[str]) -> Dict[str, Dict[str, int]]: """Alias for get_standings (for backwards compatibility).""" return get_standings(list(squadrons)) def get_leaderboard(limit: int = 20) -> List[Tuple[str, int, int]]: """Get the W/L leaderboard sorted by win difference.""" if not _INITIALIZED: _ensure_db() with _conn() as con: return [ (str(s), int(w), int(l)) for s, w, l in con.execute(""" SELECT squadron, wins, losses FROM wl_standings ORDER BY (wins - losses) DESC, wins DESC, squadron ASC LIMIT ? """, (limit,)).fetchall() ] def build_wl_leaderboard(limit: int = 20) -> List[Tuple[str, int, int]]: """Alias for get_leaderboard (for backwards compatibility).""" return get_leaderboard(int(limit)) # ============================================================================ # MAINTENANCE # ============================================================================ def rebuild_from_events() -> None: """Rebuild standings from events (for data recovery).""" if not _INITIALIZED: _ensure_db() with _conn() as con: con.execute("BEGIN IMMEDIATE") try: con.execute("DELETE FROM wl_standings") for ext_id, session_id, winner, squads_json in con.execute( "SELECT external_id, session_id, winner, squads_json " "FROM wl_events ORDER BY id ASC" ): squads = json.loads(squads_json) if winner == "DRAW": for sq in squads: cid = _resolve_clan_id(sq) if cid is None: continue _upsert_standing( con, clan_id=cid, squadron=sq, wins_delta=0, losses_delta=1, ) else: _tx_upsert_event_and_apply( con, external_id=ext_id, session_id=session_id, winner=winner, squads=squads, ) con.execute("COMMIT") except Exception: con.execute("ROLLBACK") raise def clean_WL() -> None: """Clear all W/L data (events and standings).""" con = sqlite3.connect(str(_DB_PATH), timeout=10, isolation_level=None) try: con.execute("BEGIN IMMEDIATE") con.execute("DELETE FROM wl_events") con.execute("DELETE FROM wl_standings") con.execute("COMMIT") except Exception: con.execute("ROLLBACK") raise finally: con.close()