Files
SREBOT/scripts/leaderboard_score_probe.py
T
FURRO404 2b399fdb81 add SREBOT, SHARED, TSSBOT contents (fixup for #1223)
PR #1223 only staged the deletions of the old paths because the new
top-level directories were still untracked when the commit was authored.
This commit adds the actual restructured tree: SREBOT/ (existing bot),
SHARED/ (vromfs, data_parser, ICONS/MAPS/FONTS, DAGOR_FILES,
update_game_files), and TSSBOT/ (skeleton).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 23:17:02 -07:00

185 lines
5.9 KiB
Python

#!/usr/bin/env python3
"""Probe the War Thunder clan leaderboard for a specific squadron.
This mirrors the bot's ``obtain_clans_leaderboard()`` flow:
1. Load the JWT from the storage auth file.
2. Call the clan leaderboard endpoint with ``action=cln_clan_get_leaderboard``.
3. Decode the BLK payload.
4. Print leaderboard counts and the first matching squadron entry.
Usage:
python3 scripts/leaderboard_score_probe.py
python3 scripts/leaderboard_score_probe.py --needle SCORE --count 1000
python3 scripts/leaderboard_score_probe.py --auth-file /mnt/.../STORAGE/auth_JWT.json --char-url "$WT_CHAR_URL"
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
def _infer_storage_root(auth_file: Path) -> Path:
if auth_file.parent.name == "AUTH":
return auth_file.parent.parent
return auth_file.parent
def _prepare_env(storage_root: Path) -> None:
os.environ["SREBOT_STORAGE_VOL_PATH"] = str(storage_root)
def _score_hit(clan: dict[str, Any], needle: str) -> bool:
needle_u = needle.upper()
values = (
str(clan.get("short_name", "")).upper(),
str(clan.get("tag", "")).upper(),
str(clan.get("long_name", "")).upper(),
)
return any(v == needle_u or needle_u in v for v in values)
async def _main() -> int:
repo_root = Path(__file__).resolve().parents[1]
env_path = repo_root / ".env"
if str(repo_root) not in sys.path:
sys.path.insert(0, str(repo_root))
parser = argparse.ArgumentParser()
parser.add_argument("--needle", default="SCORE", help="Squadron name to search for")
parser.add_argument("--start", type=int, default=0, help="Leaderboard offset")
parser.add_argument("--count", type=int, default=1000, help="Leaderboard page size")
parser.add_argument(
"--auth-file",
default="",
help="Path to the JWT auth JSON file. If omitted, the script will use a temp file under the storage root.",
)
parser.add_argument(
"--char-url",
default="",
help="War Thunder char API URL (defaults to WT_CHAR_URL env var)",
)
args = parser.parse_args()
if args.auth_file:
auth_file = Path(args.auth_file).expanduser().resolve()
storage_root = _infer_storage_root(auth_file)
else:
storage_root_env = os.environ.get("SREBOT_STORAGE_VOL_PATH", "").strip()
if not storage_root_env:
print(
"SREBOT_STORAGE_VOL_PATH is not set. Export it or pass --auth-file.",
file=sys.stderr,
)
return 2
storage_root = Path(storage_root_env)
auth_file = storage_root / "AUTH" / "auth_JWT.json"
_prepare_env(storage_root)
auth_file.parent.mkdir(parents=True, exist_ok=True)
load_dotenv(dotenv_path=env_path)
if not args.char_url:
args.char_url = os.environ.get("WT_CHAR_URL", "")
if args.char_url:
os.environ["WT_CHAR_URL"] = args.char_url
elif not os.environ.get("WT_CHAR_URL"):
print(
"WT_CHAR_URL is not set. Pass --char-url or export WT_CHAR_URL first.",
file=sys.stderr,
)
return 2
# Import after env setup so BOT.game_api picks up the correct paths.
from BOT import game_api # type: ignore
import aiohttp
game_api.AUTH_FILE = auth_file
bin_blk_to_json = game_api.bin_blk_to_json
if not auth_file.exists():
await game_api.get_JWT()
with auth_file.open("r", encoding="utf-8") as f:
jwt = json.load(f)["jwt"]
headers = {
"action": "cln_clan_get_leaderboard",
"token": jwt,
"start": str(args.start),
"count": str(args.count),
"sortField": "dr_era5_hist",
"shortMode": "off",
}
url = os.environ["WT_CHAR_URL"]
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as res:
raw = await res.read()
print(f"http_status={res.status}")
if res.status != 200:
print(raw[:1000].decode("utf-8", "replace"), file=sys.stderr)
return 1
data = await bin_blk_to_json(raw)
if "root" in data:
data = data["root"]
clans = data.get("clan") or []
print(f"returned_clans={len(clans)}")
positive = sum(1 for clan in clans if int((clan.get("astat") or {}).get("dr_era5_hist") or 0) > 0)
print(f"positive_clans={positive}")
matches = []
for idx, clan in enumerate(clans, start=1):
if _score_hit(clan, args.needle):
astat = clan.get("astat") or {}
matches.append(
{
"index": idx,
"clan_id": clan.get("_id"),
"tag": clan.get("tag"),
"name": clan.get("name"),
"short_name": clan.get("tag", "")[1:-1] if clan.get("tag") else "",
"clanrating": astat.get("dr_era5_hist"),
"members": clan.get("members_cnt"),
"position": clan.get("pos"),
}
)
if not matches:
print(f"needle={args.needle!r} not found")
return 0
print("matches=" + json.dumps(matches[:10], ensure_ascii=False))
first = matches[0]["index"]
lo = max(1, first - 3)
hi = min(len(clans), first + 3)
window = []
for idx in range(lo, hi + 1):
clan = clans[idx - 1]
astat = clan.get("astat") or {}
window.append(
{
"index": idx,
"tag": clan.get("tag"),
"name": clan.get("name"),
"clanrating": astat.get("dr_era5_hist"),
"position": clan.get("pos"),
}
)
print("window=" + json.dumps(window, ensure_ascii=False))
return 0
if __name__ == "__main__":
raise SystemExit(asyncio.run(_main()))