""" game_api.py War Thunder game API client. Handles JWT authentication, clan/squadron data retrieval, leaderboard fetching, player points tracking, and squadron member points diff calculation with SQLite caching for replay idempotency. """ # Standard Library Imports import asyncio import json import logging import os import sqlite3 import time from pathlib import Path from typing import Any, Dict, Tuple # Third-Party Library Imports import aiofiles import aiohttp import aiosqlite from dotenv import load_dotenv # DAGOR_FILES lives in BOTS/SHARED; BOT/__init__.py already put it on sys.path. from DAGOR_FILES.WtFileUtils.blk.BlkParser import BlkDecoder # Local Module Imports from .utils import CACHE_DIR, STORAGE_DIR, compress_json, decompress_json load_dotenv() session = None CHAR_URL = os.getenv("WT_CHAR_URL", "") # Auth file - use the required shared storage root AUTH_DIR = STORAGE_DIR / "AUTH" _auth_new_location = AUTH_DIR / "auth_JWT.json" _auth_old_location = Path(__file__).parent / "auth_JWT.json" AUTH_FILE = _auth_new_location if _auth_new_location.exists() else _auth_old_location SQ_DB_PATH = STORAGE_DIR / "squadrons.db" _POINTS_DB = STORAGE_DIR / "points.db" _POINTS_DB_READY = False # In-process cache of squadron-name → clan_id resolutions used by _apply_game_points. # Cleared periodically — squadrons_data is the authority and renames are rare. _CLAN_ID_CACHE: dict[str, int] = {} _CLAN_ID_CACHE_AT: float = 0.0 _CLAN_ID_CACHE_TTL = 300.0 def _resolve_clan_id_for_points(squadron_name: str) -> int: """Resolve a squadron's clan_id by name (long_name preferred, then short_name). Returns -1 if not found - the caller treats this as "orphan" but still persists data under that sentinel so it isn't lost. """ global _CLAN_ID_CACHE, _CLAN_ID_CACHE_AT now = time.time() if now - _CLAN_ID_CACHE_AT > _CLAN_ID_CACHE_TTL: _CLAN_ID_CACHE = {} _CLAN_ID_CACHE_AT = now key = (squadron_name or "").lower() if not key: return -1 cached = _CLAN_ID_CACHE.get(key) if cached is not None: return cached try: with sqlite3.connect(SQ_DB_PATH) as con: row = con.execute( "SELECT clan_id FROM squadrons_data " "WHERE LOWER(long_name) = ? OR LOWER(short_name) = ? LIMIT 1", (key, key), ).fetchone() cid = int(row[0]) if row and row[0] is not None else -1 except Exception: cid = -1 _CLAN_ID_CACHE[key] = cid return cid def _points_db_init(db_path: str | None = None) -> None: """Initialize the points SQLite database schema. Creates the ``profile_member_points``, ``profile_totals``, and ``game_cache`` tables if they do not already exist, and enables WAL journal mode. Subsequent calls are no-ops unless ``db_path`` changes the target file. Args: db_path: Optional override for the database file path. Defaults to ``_POINTS_DB`` under the required HC storage volume. """ global _POINTS_DB, _POINTS_DB_READY if db_path: _POINTS_DB = db_path if _POINTS_DB_READY: return # Schema is keyed on clan_id post-clan_id migration. Fresh installs get the # new shape directly. Existing installs were rebuilt by scripts/migrate_clan_id.py. with sqlite3.connect(_POINTS_DB) as con: con.executescript(""" PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL; PRAGMA foreign_keys=ON; CREATE TABLE IF NOT EXISTS profile_member_points( clan_id INTEGER NOT NULL, squadron TEXT NOT NULL, uid TEXT NOT NULL, points INTEGER NOT NULL, PRIMARY KEY (clan_id, uid) ); CREATE INDEX IF NOT EXISTS idx_pmp_squadron ON profile_member_points(squadron); CREATE TABLE IF NOT EXISTS profile_totals( clan_id INTEGER PRIMARY KEY, squadron TEXT NOT NULL, total INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS game_cache( game_id TEXT NOT NULL, clan_id INTEGER NOT NULL, squadron TEXT NOT NULL, diffs_json TEXT NOT NULL, diff_total INTEGER NOT NULL, updated_json TEXT NOT NULL, created_at REAL NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (game_id, clan_id) ); """) _POINTS_DB_READY = True def _points_conn() -> sqlite3.Connection: """Open a WAL-mode SQLite connection to the points database. Returns: A ``sqlite3.Connection`` with ``isolation_level=None`` (autocommit), WAL journal mode, and ``NORMAL`` synchronous pragma already set. """ con = sqlite3.connect(_POINTS_DB, timeout=10, isolation_level=None) con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA synchronous=NORMAL;") return con def _apply_game_points( game_id: str, squadron: str, team_uids: set[str], curr_points_by_uid: dict[str, int], curr_total: int, ) -> tuple[dict[str, int], int, dict[str, int]]: """ Returns (diffs, diff_total, updated_snapshot) diffs: {uid: delta} with rule: if previous==0 → delta=0 diff_total: curr_total - prev_total updated: {uid: curr, ..., "total": curr_total} Cached by (game_id, squadron) for replay idempotency. """ team_uids = {str(u) for u in team_uids} clan_id = _resolve_clan_id_for_points(squadron) with _points_conn() as con: # Cache hit? row = con.execute( "SELECT diffs_json, diff_total, updated_json FROM game_cache WHERE game_id=? AND clan_id=?", (game_id, clan_id) ).fetchone() if row: return decompress_json(row[0]), int(row[1]), decompress_json(row[2]) con.execute("BEGIN IMMEDIATE") try: prev_rows = [] if team_uids: placeholders = ",".join("?" * len(team_uids)) prev_rows = con.execute( f"SELECT uid, points FROM profile_member_points WHERE clan_id=? AND uid IN ({placeholders})", (clan_id, *team_uids) ).fetchall() prev = {u: int(p) for u, p in prev_rows} prev_total_row = con.execute( "SELECT total FROM profile_totals WHERE clan_id=?", (clan_id,) ).fetchone() prev_total = int(prev_total_row[0]) if prev_total_row else 0 # Compute per-UID diffs (preserve rule) diffs: dict[str, int] = {} updated: dict[str, int] = {} for uid in team_uids: curr = int(curr_points_by_uid.get(uid, 0)) was = int(prev.get(uid, 0)) diffs[uid] = 0 if was == 0 else (curr - was) updated[uid] = curr diff_total = int(curr_total) - prev_total updated["total"] = int(curr_total) # Debug logging for zero squadron diff with non-zero player diffs if diff_total == 0 and any(d != 0 for d in diffs.values()): logging.warning( f"[POINTS-DEBUG] Squadron diff=0 but players changed! " f"game={game_id}, squadron={squadron}, " f"curr_total={curr_total}, prev_total={prev_total}, " f"player_diffs={diffs}" ) # Upsert touched members if team_uids: con.executemany( """INSERT INTO profile_member_points (clan_id, squadron, uid, points) VALUES (?,?,?,?) ON CONFLICT(clan_id, uid) DO UPDATE SET points=excluded.points, squadron=excluded.squadron""", [(clan_id, squadron, uid, int(curr_points_by_uid.get(uid, 0))) for uid in team_uids] ) # Upsert total con.execute( """INSERT INTO profile_totals (clan_id, squadron, total) VALUES (?,?,?) ON CONFLICT(clan_id) DO UPDATE SET total=excluded.total, squadron=excluded.squadron""", (clan_id, squadron, int(curr_total)) ) # Cache this game's result con.execute( "INSERT INTO game_cache (game_id, clan_id, squadron, diffs_json, diff_total, updated_json, created_at) VALUES (?,?,?,?,?,?,?)", (game_id, clan_id, squadron, compress_json(diffs), int(diff_total), compress_json(updated), time.time()) ) con.execute("COMMIT") except Exception: con.execute("ROLLBACK"); raise return diffs, diff_total, updated def join_duplicate_keys(ordered_pairs): """JSON object-pairs hook that merges duplicate keys into lists. When the same key appears more than once in a JSON object, standard ``json.loads`` keeps only the last value. This hook collects all values for a repeated key into a list instead. Args: ordered_pairs: Sequence of ``(key, value)`` pairs produced by ``json.JSONDecoder``. Returns: A dict where single-occurrence keys map to their value and multi-occurrence keys map to a list of values. """ d = {} for k, v in ordered_pairs: if k in d: if type(d[k]) == list: d[k].append(v) else: newlist = [] newlist.append(d[k]) newlist.append(v) d[k] = newlist else: d[k] = v return d # --- BEGIN dump helper --- def _dump_failed_payload(status: int, content_type: str, raw: bytes, clan_name: str) -> None: """Save a failed API response payload to disk for debugging. If the server returned JSON, saves a pretty-printed JSON file. Otherwise, saves a text file with headers and raw bytes appended. Files are written to the CACHE_DIR directory. Args: status: HTTP status code from the response. content_type: Content-Type header value. raw: Raw response bytes. clan_name: Clan name used in the request (used for filename). """ out_dir = CACHE_DIR try: if raw[:1] in (b"{", b"[") or "json" in (content_type or "").lower(): try: body = json.loads(raw.decode("utf-8", "replace")) except Exception: body = {"_note": "Failed to decode JSON. UTF-8 text follows.", "_text": raw.decode("utf-8", "replace")} payload = { "note": "Unexpected JSON from cln_clan_get", "status": status, "content_type": content_type, "clan": clan_name, "body": body, } (out_dir / "failed_points_api.json").write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8" ) else: with open(out_dir / "failed_points_api.txt", "wb") as f: f.write(f"status: {status}\n".encode("ascii", "ignore")) f.write(f"content-type: {content_type}\n".encode("ascii", "ignore")) f.write(f"clan: {clan_name}\n\n".encode("ascii", "ignore")) f.write(b"raw-bytes:\n") f.write(raw) except Exception: # Don’t let dumping mask the original error logging.exception("Failed to dump unexpected payload") # --- END dump helper --- class ClanInfoError(Exception): """Raised when we fail to fetch or parse clan info.""" async def bin_blk_to_json(data: bytes) -> Dict[str, Any]: """ Parse the War Thunder BLK blob into a Python dict. Raises ClanInfoError on ANY parse failure. """ try: return BlkDecoder(data).to_dict() except Exception: # don’t log the raw KeyError:123 here—just wrap it raise ClanInfoError("BLK payload malformed or squadron doesn’t exist") def api_init_session(): """Lazily initialize the module-level ``aiohttp.ClientSession``. Creates a new session only if one does not already exist. Must be called inside a running asyncio event loop. """ global session if not session: session = aiohttp.ClientSession() async def obtain_clans_leaderboard(start=0, count=5): """Fetch the squadron leaderboard sorted by era-5 historical rating. Args: start: Zero-based offset into the leaderboard. count: Number of squadrons to retrieve. Returns: A list of dicts, each containing squadron metadata (``clan_id``, ``tag``, ``long_name``, ``clanrating``, ``members``, etc.). Raises: Exception: If the HTTP request returns a non-200 status. ClanInfoError: If the BLK response cannot be parsed. """ api_init_session() logging.info('Loading squadrons leaderboard %s:%s', start, count) headers = { 'action': 'cln_clan_get_leaderboard', 'token': json.load(open(AUTH_FILE))['jwt'], 'start': str(start), 'count': str(count), 'sortField': 'dr_era5_hist', 'shortMode': 'off', } if session: async with session.get(CHAR_URL, headers=headers) as res: if res.status != 200: raise Exception('cln_clan_get_leaderboard HTTP '+str(res.status)) data = await bin_blk_to_json(await res.read()) # Unwrap root if present if 'root' in data: data = data['root'] result = [] for clan in data['clan']: astat = clan.get('astat', {}) result.append({ 'position': clan.get('pos', 0), 'clan_id': clan['_id'], 'tag': clan['tag'], 'short_name': clan['tag'][1:-1], 'long_name': clan['name'], 'description': clan.get('desc', ''), 'slogan': clan.get('slogan', ''), 'region': clan.get('region', ''), 'members': clan.get('members_cnt', 0), 'clanrating': astat.get('dr_era5_hist', 0), 'wins': astat.get('wins_hist', 0), 'battles': astat.get('battles_hist', 0), 'a_kills': astat.get('akills_hist', 0), 'g_kills': astat.get('gkills_hist', 0), 'deaths': astat.get('deaths_hist', 0), 'playtime': astat.get('ftime_hist', 0), }) return result async def save_squadrons_to_db(count: int = 1000): """Fetch top squadrons via Game API and upsert into squadrons.db. Performs schema migration for missing columns, inserts or updates squadron records, and detects season resets by comparing old vs new total ratings (>50% drop threshold). When a season reset is detected, resets all member points in the squadron_members table. Args: count: Number of top squadrons to fetch from the leaderboard. Returns: Number of squadrons inserted or updated. """ logging.info(f"[SQ-DB] Fetching top {count} squadrons via Game API...") try: clans = await obtain_clans_leaderboard(start=0, count=count) except Exception as e: logging.error(f"[SQ-DB] Failed to fetch leaderboard: {e}") raise if not clans: logging.warning("[SQ-DB] No clans returned from API") return 0 # Initialize DB + ensure schema async with aiosqlite.connect(SQ_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") await db.execute(""" CREATE TABLE IF NOT EXISTS squadrons_data ( clan_id INTEGER PRIMARY KEY, long_name TEXT UNIQUE, short_name TEXT UNIQUE, tag_name TEXT ) """) await db.commit() expected_cols = { "position": "INTEGER", "description": "TEXT", "slogan": "TEXT", "region": "TEXT", "members": "INTEGER", "wins": "INTEGER", "battles": "INTEGER", "a_kills": "INTEGER", "g_kills": "INTEGER", "deaths": "INTEGER", "playtime": "INTEGER", "clanrating": "INTEGER", } async with db.execute("PRAGMA table_info(squadrons_data);") as cursor: existing_cols = {row[1] for row in await cursor.fetchall()} for col, typ in expected_cols.items(): if col not in existing_cols: try: await db.execute(f"ALTER TABLE squadrons_data ADD COLUMN {col} {typ}") logging.info(f"[SQ-DB] Added missing column: {col} ({typ})") except Exception as e: logging.warning(f"[SQ-DB] Could not add column {col}: {e}") await db.commit() # Snapshot old total before reset to detect season resets later. async with db.execute("SELECT COALESCE(SUM(clanrating), 0) FROM squadrons_data") as cursor: row = await cursor.fetchone() old_total = row[0] if row is not None else 0 # Reset both rating and position before inserting fresh data so clans # that drop out of the current top-N do not keep a stale rank. await db.execute("UPDATE squadrons_data SET clanrating = 0, position = NULL") await db.commit() # Insert/update clans inserted = 0 for clan in clans: try: await db.execute(""" INSERT OR REPLACE INTO squadrons_data ( clan_id, position, long_name, short_name, tag_name, description, slogan, region, members, wins, battles, a_kills, g_kills, deaths, playtime, clanrating ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( clan['clan_id'], clan['position'], clan['long_name'], clan['short_name'], clan['tag'], clan['description'], clan['slogan'], clan['region'], clan['members'], clan['wins'], clan['battles'], clan['a_kills'], clan['g_kills'], clan['deaths'], clan['playtime'], clan['clanrating'], )) inserted += 1 except Exception as e: logging.error(f"[SQ-DB] Failed to insert {clan.get('long_name')}: {e}") await db.commit() # Season-reset detection: if total points dropped by more than 50% from a # meaningful baseline, the season rolled over. Reset per-member points so # the squadron profile pages don't show stale pre-season values while the # slow points-loop catches up. async with db.execute("SELECT COALESCE(SUM(clanrating), 0) FROM squadrons_data") as cursor: row = await cursor.fetchone() new_total = row[0] if row is not None else 0 if old_total > 10000 and new_total < old_total * 0.5: logging.warning( f"[SQ-DB] Season reset detected (old={old_total}, new={new_total}). " "Resetting squadron_members.points to 0." ) await db.execute("UPDATE squadron_members SET points = 0") await db.commit() logging.info("[SQ-DB] Member points reset complete.") async with db.execute(""" SELECT COUNT(*) AS total_rows, SUM(CASE WHEN clanrating > 0 THEN 1 ELSE 0 END) AS active_rows, SUM(CASE WHEN clanrating = 0 AND position IS NOT NULL THEN 1 ELSE 0 END) AS stale_ranked_rows FROM squadrons_data """) as cursor: stats_row = await cursor.fetchone() total_rows = stats_row[0] if stats_row else 0 active_rows = stats_row[1] if stats_row else 0 stale_ranked_rows = stats_row[2] if stats_row else 0 logging.info( "[SQ-DB] Post-refresh stats: total=%s active=%s stale_ranked=%s", total_rows, active_rows, stale_ranked_rows, ) logging.info(f"[SQ-DB] Done. Inserted/updated {inserted} squadrons.") return inserted async def obtain_clan_new_points( clan_name: str, max_retries: int = 5, base_wait: float = 2.0, ) -> Tuple[Dict[str, Dict[str, Any]], int]: """Fetch clan member points by long name from the War Thunder API. Parses the BLK payload, unwraps nested wrappers, and extracts rounded ``dr_era5_hist`` for each member. Retries with exponential backoff on 403 or network errors. Args: clan_name: The squadron's long (display) name. max_retries: Maximum number of retry attempts. base_wait: Base wait time in seconds for exponential backoff. Returns: Tuple of ({uid: {"nick": str, "points": int}, ...}, total_points). Raises: ClanInfoError: If all retries are exhausted or the response is unparseable. """ if not clan_name or not clan_name.strip(): raise ValueError("clan_name must be a non-empty string") # 1) Load JWT try: async with aiofiles.open(AUTH_FILE, "r", encoding="utf-8") as f: jwt = json.loads(await f.read())["jwt"] except Exception as e: logging.error("Failed to load JWT from %s", AUTH_FILE, exc_info=e) raise ClanInfoError("Authentication token load failed") from e headers = {"action": "cln_clan_get", "token": jwt, "clanName": clan_name} raw = b"" status = 0 content_type = "" attempt = 0 while attempt < max_retries: try: async with aiohttp.ClientSession() as session: async with session.get( CHAR_URL, headers=headers ) as res: raw = await res.read() status = res.status content_type = res.headers.get("Content-Type", "") # Retry on 403 Forbidden if status == 403: raise aiohttp.ClientResponseError( res.request_info, res.history, status=status, message="Forbidden" ) res.raise_for_status() # Inspect JSON for token expiry if raw[:1] in (b"{", b"[") or "json" in content_type.lower(): try: j = json.loads(raw.decode("utf-8", "replace")) except Exception: j = {} err = (j.get("result") or {}).get("error") if err in ("TOKEN_EXPIRED", "INVALID_TOKEN"): logging.warning( f"JWT ERROR ({err}) for '%s'; refreshing token and exiting early.", clan_name, ) try: await get_JWT() logging.info("JWT refreshed at %s", AUTH_FILE) except Exception: logging.exception("JWT refresh failed") return {}, 0 try: _dump_failed_payload(status, content_type, raw, clan_name) except Exception: pass raise ClanInfoError(f"Server returned JSON (error={err!r})") except (aiohttp.ClientError, aiohttp.ClientResponseError) as e: attempt += 1 wait = base_wait * attempt await asyncio.sleep(wait) continue # try again with new proxy # 2) Parse BLK if we got here without exceptions try: data = await bin_blk_to_json(raw) # unwrap single-key envelopes while isinstance(data, dict) and "member_ratings" not in data and len(data) == 1: data = next(iter(data.values())) # A real clan record always carries a `members` roster. Deleted # clans come back as JSON (handled above), and truncated payloads # raise inside bin_blk_to_json, so a parsed BLK without `members` # is genuinely unavailable. if not isinstance(data, dict) or "members" not in data: raise KeyError("members") # After a season reset the game API can omit the aggregate `astat` # block and/or the per-member `member_ratings` block for inactive # squadrons even though the clan and its roster still exist. Default # both to zero so the squadron renders at 0 points instead of being # reported as deleted. total_score = (data.get("astat") or {}).get("dr_era5_hist", 0) raw_ratings = data.get("member_ratings") or {} except ClanInfoError as e: try: _dump_failed_payload(status, content_type, raw, clan_name) except Exception: pass raise ClanInfoError(f"Squadron '{clan_name}' unavailable or deleted") from e except (TypeError, KeyError) as e: try: _dump_failed_payload(status, content_type, raw, clan_name) except Exception: pass raise ClanInfoError(f"Squadron '{clan_name}' unavailable or deleted") from e # 3) Build uid→nick lookup uid_to_nick = { str(m.get("uid", "")): m.get("nick", "") for m in data.get("members", []) } # 4) Build full ratings including zero-point members full_ratings: Dict[str, Dict[str, Any]] = {} for uid, nick in uid_to_nick.items(): pts = int(round(raw_ratings.get(uid, {}).get("dr_era5_hist", 0))) full_ratings[uid] = {"nick": nick, "points": pts} # 5) Sort descending by points sorted_items = sorted( full_ratings.items(), key=lambda kv: kv[1]["points"], reverse=True, ) return dict(sorted_items), total_score # Retries exhausted raise ClanInfoError( f"Failed to fetch points for '{clan_name}' after {max_retries} attempts" ) async def get_point_diff( game_id: str, team: Dict[str, Any], ) -> Tuple[Dict[str, int], int, Dict[str, int]]: """ Fast per-game diffs using SQLite (no JSON files). Returns (per-UID diffs, total diff, updated snapshot). """ _points_db_init(str(_POINTS_DB)) squadron = team["squadron_long"] team_uids = { str(p["uid"]) for p in team.get("players", []) } # Pull current snapshot from your (already correct) parser clan_ratings, total_score = await obtain_clan_new_points(squadron) # returns {uid:{"nick","points"}}, total. :contentReference[oaicite:2]{index=2} if not clan_ratings and total_score == 0: # same contract as before: caller can retry if token was refreshed return {}, 0, {} # per-UID points for this team curr_points_by_uid = { uid: int((clan_ratings.get(uid) or {}).get("points", 0)) for uid in team_uids } diffs, diff_total, updated = _apply_game_points( game_id=game_id, squadron=squadron, team_uids=team_uids, curr_points_by_uid=curr_points_by_uid, curr_total=int(total_score), ) return diffs, diff_total, updated def _clean_player_data(data: dict) -> dict: """ Clean up raw player data to only include essential fields. Returns a dict with: - nick: Player nickname - userID: Player user ID - clanTag: Clan tag (if in clan) - clanName: Clan name (if in clan) - clanID: Clan ID (if in clan) - lastDay: Unix timestamp of last login - vehicles: List of vehicles with stats from arcade/historical/simulation total sections """ root = data[0].get("root", {}) if isinstance(data, list) and len(data) > 0 else data.get("root", {}) cleaned = { "nick": root.get("nick", ""), "userID": root.get("userid", ""), "clanTag": root.get("clanTag", ""), "clanName": root.get("clanName", ""), "clanID": root.get("clanId", ""), "lastDay": root.get("lastDay", 0), "vehicles": [] } # Extract vehicles from userstat > arcade/historical/simulation > total userstat = root.get("userstat", {}) game_modes = ["arcade", "historical", "simulation"] for mode in game_modes: mode_data = userstat.get(mode, {}) total = mode_data.get("total", {}) for vehicle_name, vehicle_stats in total.items(): # Create vehicle entry vehicle_entry = { "name": vehicle_name, "mode": mode, "flyouts": vehicle_stats.get("flyouts", 0), "was_in_session": vehicle_stats.get("was_in_session", 0), } # Include all other stats that exist for key, value in vehicle_stats.items(): if key not in ["flyouts", "was_in_session"]: vehicle_entry[key] = value cleaned["vehicles"].append(vehicle_entry) return cleaned async def obtain_player_data_api(player_uid: str, raw: bool = False): """ Fetch player info by uid, parse the BLK payload Args: player_uid: The player's UID raw: If True, return raw parsed data. If False, return cleaned data with only essential fields. If the server returns JSON with error TOKEN_EXPIRED, refresh the JWT via get_JWT() and exit early by returning {} so the caller can retry later. """ if not player_uid or not player_uid.strip(): raise ValueError("player_uid must be a non-empty string") # 1) Load JWT try: async with aiofiles.open(AUTH_FILE, "r", encoding="utf-8") as f: jwt = json.loads(await f.read())["jwt"] except Exception as e: logging.error("Failed to load JWT from %s", AUTH_FILE, exc_info=e) raise ClanInfoError("Authentication token load failed") from e # 2) Fetch raw payload headers = {"action": "ano_get_public_userstat", "token": jwt, "userid": player_uid} raw_payload = b"" status = 0 content_type = "" try: async with aiohttp.ClientSession() as session: async with session.get(CHAR_URL, headers=headers) as res: res.raise_for_status() raw_payload = await res.read() status = res.status content_type = res.headers.get("Content-Type", "") #type: ignore # If server returned JSON, inspect for TOKEN_EXPIRED and bail after refreshing if raw_payload[:1] in (b"{", b"[") or "json" in content_type.lower(): try: j = json.loads(raw_payload.decode("utf-8", "replace")) except Exception: j = {} err = (j.get("result") or {}).get("error") if err in ("TOKEN_EXPIRED", "INVALID_TOKEN"): logging.warning(f"JWT ERROR ({err}) for '%s'; refreshing token and exiting early.", player_uid) try: await get_JWT() logging.info("JWT refreshed at %s", AUTH_FILE) except Exception: logging.exception("JWT refresh failed") return {} try: _dump_failed_payload(status, content_type, raw_payload, player_uid) except Exception: pass raise ClanInfoError(f"Server returned JSON (error={err!r})") except aiohttp.ClientError as e: # Check if it's a rate limit (503) error - don't spam logs for these error_msg = str(e) if "503" in error_msg or "RETRY" in error_msg: # Rate limit - log at debug level only logging.info("HTTP request to %s rate limited (503)", CHAR_URL) else: # Other errors - log normally logging.error("HTTP request to %s failed", CHAR_URL, exc_info=e) raise ClanInfoError("Network request failed") from e # 3) Parse BLK, unwrap, and extract try: data = await bin_blk_to_json(raw_payload) # If raw=True, return the full data if raw: return data # Otherwise, clean up the data cleaned_data = _clean_player_data(data) return cleaned_data except ClanInfoError as e: # BLK parsing failures try: _dump_failed_payload(status, content_type, raw_payload, player_uid) except Exception: pass raise ClanInfoError(f"Squadron '{player_uid}' unavailable or deleted") from e except (TypeError, KeyError) as e: # missing fields try: _dump_failed_payload(status, content_type, raw_payload, player_uid) except Exception: pass raise ClanInfoError(f"Squadron '{player_uid}' unavailable or deleted") from e async def obtain_clan_info_api( squadron_key: str, key_type: str, # must be one of: "clanName", "clanTag", "clanID" max_retries: int = 5, base_wait: float = 2.0, ): """Fetch full clan/squadron info from the War Thunder API. Looks up a clan by name, tag, or numeric ID depending on ``key_type``, parses the BLK response, and returns the unwrapped data dict. Retries with linear backoff on 403 or network errors. Args: squadron_key: The clan identifier (name, tag, or ID string). key_type: One of ``"clanName"``, ``"clanTag"``, or ``"clanID"``. max_retries: Maximum number of retry attempts. base_wait: Base wait time in seconds between retries. Returns: A dict of parsed clan data with the ``"root"`` wrapper removed. Returns an empty dict if the JWT is expired and was refreshed. Raises: ValueError: If ``squadron_key`` is empty or ``key_type`` is invalid. ClanInfoError: If all retries are exhausted or parsing fails. """ if not squadron_key or not squadron_key.strip(): raise ValueError("squadron_key must be a non-empty string") if key_type not in ("clanName", "clanTag", "clanID"): raise ValueError("key_type must be one of: 'clanName', 'clanTag', 'clanID'") # 1) Load JWT try: async with aiofiles.open(AUTH_FILE, "r", encoding="utf-8") as f: jwt = json.loads(await f.read())["jwt"] except Exception as e: logging.error("Failed to load JWT from %s", AUTH_FILE, exc_info=e) raise ClanInfoError("Authentication token load failed") from e headers = {"action": "cln_clan_get", "token": jwt, key_type: squadron_key} attempt = 0 while attempt < max_retries: try: async with aiohttp.ClientSession() as session: async with session.get( CHAR_URL, headers=headers ) as res: raw = await res.read() status = res.status content_type = res.headers.get("Content-Type", "") # Retry on 403 if status == 403: raise aiohttp.ClientResponseError( res.request_info, res.history, status=status, message="Forbidden" ) res.raise_for_status() # Handle JSON responses if raw[:1] in (b"{", b"[") or "json" in content_type.lower(): try: j = json.loads(raw.decode("utf-8", "replace")) except Exception: j = {} err = (j.get("result") or {}).get("error") if err in ("TOKEN_EXPIRED", "INVALID_TOKEN"): logging.warning( f"JWT ERROR ({err}) for '%s'; refreshing token and exiting early.", squadron_key, ) try: await get_JWT() logging.info("JWT refreshed at %s", AUTH_FILE) except Exception: logging.exception("JWT refresh failed") return {} try: _dump_failed_payload(status, content_type, raw, squadron_key) except Exception: pass raise ClanInfoError(f"Server returned JSON (error={err!r})") except (aiohttp.ClientError, aiohttp.ClientResponseError) as e: attempt += 1 wait = base_wait * attempt await asyncio.sleep(wait) continue # try again with new proxy # 4) Parse BLK if we got here without exceptions try: data = await bin_blk_to_json(raw) # unwrap "root" if it's the only key if isinstance(data, dict) and "root" in data and isinstance(data["root"], dict): data = data["root"] return data except (ClanInfoError, TypeError, KeyError) as e: try: _dump_failed_payload(status, content_type, raw, squadron_key) except Exception: pass raise ClanInfoError(f"Clan '{squadron_key}' unavailable or deleted") from e # If retries exhausted raise ClanInfoError( f"Failed to fetch clan '{squadron_key}' after {max_retries} attempts" ) ### JWT MANAGEMENT ### # Load credentials from environment _wt_client_id = os.getenv("WT_CLIENT_ID", "") _wt_auth_url = os.getenv("WT_AUTH_URL", "https://auth.gaijinent.com/login.php") _wt_login_email = os.getenv("WT_LOGIN_EMAIL", "") _wt_login_password = os.getenv("WT_LOGIN_PASSWORD", "") _jwt_headers = { "Host": "auth.gaijinent.com", "User-Agent": "yuplay2 lib / WarThunder", "Accept": "*/*", "Accept-Encoding": "deflate, gzip, br, zstd", "Content-Type": "application/x-www-form-urlencoded", "X-Client-Id": _wt_client_id } _jwt_payload = { "client": _wt_client_id, "game": "wt", "gapp_id": 50278, "login": _wt_login_email, "meta": 1, "password": _wt_login_password, "v": 2, } async def get_JWT(): """Authenticate with the War Thunder auth server and persist the JWT. Posts login credentials to the Gaijin auth endpoint, writes the returned JSON (containing the ``jwt`` field) to ``AUTH_FILE``. Returns: The parsed JSON response dict from the auth server. Raises: aiohttp.ClientError: If the HTTP request fails. """ async with aiohttp.ClientSession() as session: async with session.post(_wt_auth_url, headers=_jwt_headers, data=_jwt_payload) as resp: logging.info("JWT Status: %s", resp.status) data = await resp.json() # Write where obtain_clan_new_points() reads from async with aiofiles.open(AUTH_FILE, "w", encoding="utf-8") as f: await f.write(json.dumps(data, ensure_ascii=False, indent=4)) return data # optional: return for logging/tests ### JWT MANAGEMENT ### pts__api_test = True if __name__ == "__main__" and pts__api_test: pts = asyncio.run(obtain_clan_new_points("Death Sentence")) out_path = Path("clan_pts.json") with out_path.open("w", encoding="utf-8") as f: json.dump(pts, f, indent=2, ensure_ascii=False) print(f"✅ Clan data dumped to {out_path.resolve()}") clan_info_api_test = False if __name__ == "__main__" and clan_info_api_test: clan_data = asyncio.run(obtain_clan_info_api("Death Sentence", "clanName")) out_path = Path("clan_data.json") with out_path.open("w", encoding="utf-8") as f: json.dump(clan_data, f, indent=2, ensure_ascii=False) print(f"✅ Clan data dumped to {out_path.resolve()}") player_test = False if __name__ == "__main__" and player_test: player_data = asyncio.run(obtain_player_data_api("199874256", raw=True)) out_path = Path("player_data.json") with out_path.open("w", encoding="utf-8") as f: json.dump(player_data, f, indent=2, ensure_ascii=False) print(f"✅ Player data dumped to {out_path.resolve()}") jwt_test = False if __name__ == "__main__" and jwt_test: pts = asyncio.run(get_JWT()) logging.info("Saved JSON response to JWT.json") leaderboard_test = False if __name__ == "__main__" and leaderboard_test: lb_data = asyncio.run(obtain_clans_leaderboard(start=0, count=1000)) out_path = Path("leaderboard_data.json") with out_path.open("w", encoding="utf-8") as f: json.dump(lb_data, f, indent=2, ensure_ascii=False) print(f"✅ Leaderboard data dumped to {out_path.resolve()}")