""" task_executors.py Task execution logic for scheduled tasks. Contains the execute_* functions for leaderboard alarms, points tracking, leave detection, squadron stats, and database synchronization routines. """ # Standard Library Imports import asyncio import json import logging import os import shutil import time as T from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # Third-Party Library Imports import aiofiles import aiosqlite import discord from discord import TextChannel, Thread # Local Module Imports from .game_api import obtain_clan_new_points, save_squadrons_to_db, ClanInfoError from .health import record_task_run from .utils import t, lang_from_features from .wl import clean_WL as _clean_wl_db, get_standings from .utils import ( STORAGE_DIR, SQUADRONS_DB_PATH, SQ_BATTLES_DB_PATH, BLACKLISTED_SERVER_IDS, DEFAULT_FOOTER_CAT, compress_json, discord_len, pad_display_width, get_bot, load_features, parse_channel_id, remove_guild_pref_notification, resolve_clan, resolve_pref_key, is_guild_entitled, get_guild_tier, refresh_entitled_guilds, PREMIUM_ACTIVATION_TS, tier_enforcement_active, allowed_pref_keys_for, enabled_pref_keys_for, esc, REPLAYS_DIR, WILDCARD_KEYS, get_most_recent_posted_timeslot_window, ) from .autologging import send_over_cap_warning from .weekly_br_elo import ( br_color_int, player_scores as _wbr_player_scores, squadron_report_for_variants_from_maps, squadron_scores as _wbr_squadron_scores, top_n_squadrons_with_top_k_players_from_maps, ) # Snapshots directory for points alarm SNAPSHOTS_DIR = STORAGE_DIR / "SNAPSHOTS" SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True) # Marker file directory for the Weekly BR Report (idempotency) WEEKLY_BR_DIR = STORAGE_DIR / "WEEKLY_BR" WEEKLY_BR_DIR.mkdir(parents=True, exist_ok=True) WEEKLY_BR_MARKER_PATH = WEEKLY_BR_DIR / "last_fired.json" # ============================================================================ # SNAPSHOT FUNCTIONS (for points alarm) # ============================================================================ ORDINAL_SUFFIXES = { "en": lambda n: f"{n}th" if 11 <= n % 100 <= 13 else f"{n}{['th','st','nd','rd','th'][min(n%10,4)]}", "de": lambda n: f"{n}.", "fr": lambda n: f"{n}er" if n == 1 else f"{n}e", "es": lambda n: f"{n}\u00ba", "it": lambda n: f"{n}\u00ba", "pt": lambda n: f"{n}\u00ba", "pl": lambda n: f"{n}.", "ru": lambda n: f"{n}-е", "uk": lambda n: f"{n}-е", "cs": lambda n: f"{n}.", } def ordinal(n: int, lang: str) -> str: """Format an integer as an ordinal string for the given language.""" fmt = ORDINAL_SUFFIXES.get(lang, ORDINAL_SUFFIXES["en"]) return fmt(n) async def get_squadron_placement(squadron_name: str) -> int | None: """Return 1-based leaderboard placement for a squadron, or None if not ranked.""" try: async with aiosqlite.connect(f"file:{SQUADRONS_DB_PATH}?mode=ro", uri=True) as con: async with con.execute( """ SELECT long_name FROM squadrons_data WHERE clanrating IS NOT NULL AND clanrating > 0 ORDER BY clanrating DESC """ ) as cur: rank = 1 async for row in cur: if row[0] and row[0].lower() == squadron_name.lower(): return rank rank += 1 except Exception as e: logging.error("[PLACEMENT] Error fetching placement for %s: %s", squadron_name, e) return None async def take_snapshot(squadron_name: str, region: str) -> dict | None: """ Return { total_points, members, placement } or None if we hit ANY problem or get no members. """ try: ratings, total = await obtain_clan_new_points(squadron_name) except ClanInfoError: return None if not ratings: return None placement = await get_squadron_placement(squadron_name) return {"total_points": total, "members": ratings, "placement": placement} async def save_snapshot(snapshot: dict, squadron_name: str, region: str) -> None: """Save a squadron points snapshot to a JSON file in the snapshots directory. Args: snapshot: Dict containing total_points and members data. squadron_name: Squadron identifier used in the filename. region: Region string used in the filename. """ filename = f"{squadron_name}-{region}.json" path = SNAPSHOTS_DIR / filename async with aiofiles.open(path, "w", encoding="utf-8") as f: await f.write(json.dumps(snapshot, ensure_ascii=False, indent=2)) async def load_snapshot(squadron_name: str, region: str) -> dict | None: """Load a previously saved squadron points snapshot from disk. Args: squadron_name: Squadron identifier used in the filename. region: Region string used in the filename. Returns: The parsed snapshot dict, or None if not found or on error. """ filename = f"{squadron_name}-{region}.json" path = SNAPSHOTS_DIR / filename try: async with aiofiles.open(path, "r", encoding="utf-8") as f: return json.loads(await f.read()) except FileNotFoundError: return None except Exception as e: logging.error(f"Error loading snapshot {filename}: {e}") return None def compare_points(old_snapshot: dict, new_snapshot: dict) -> tuple[dict, int]: """ Compare `old_snapshot` vs `new_snapshot`, returning: - diffs: { uid: (delta_points:int, new_points:int), ... } • delta_points: how much the player's points changed (positive/negative) • new_points: the player's current total points (0 if they left) - old_total: total points from the old snapshot """ if not old_snapshot or not new_snapshot: return {}, 0 old_members = old_snapshot.get("members", {}) new_members = new_snapshot.get("members", {}) old_total = old_snapshot.get("total_points", 0) diffs = {} # Pass 1: check all UIDs that exist in the new snapshot for uid, info in new_members.items(): new_pts = info.get("points", 0) old_pts = old_members.get(uid, {}).get("points", 0) delta = new_pts - old_pts if delta != 0: diffs[uid] = (delta, new_pts) # Pass 2: check for UIDs that existed before but are missing now for uid, info in old_members.items(): if uid not in new_members: old_pts = info.get("points", 0) if old_pts > 0: diffs[uid] = (-old_pts, 0) return diffs, old_total async def _load_points_alarm_player_slot_stats( uids: List[str], region: str ) -> Dict[str, Dict[str, float]]: """Return per-UID KDR/KPS for the most recently completed posted slot.""" slot = get_most_recent_posted_timeslot_window(region, end_grace_minutes=10) if not slot or not uids: return {} start_ts, end_ts = slot clean_uids = [str(uid) for uid in uids if str(uid)] if not clean_uids: return {} placeholders = ",".join("?" for _ in clean_uids) sql = f""" WITH player_sessions AS ( SELECT UID, session_id, SUM(ground_kills + air_kills) AS kills, SUM(deaths) AS deaths FROM player_games_hist WHERE UID IN ({placeholders}) AND endtime_unix >= ? AND endtime_unix <= ? GROUP BY UID, session_id ) SELECT UID, COUNT(*) AS games, SUM(kills) AS total_kills, SUM(deaths) AS total_deaths FROM player_sessions GROUP BY UID """ out: Dict[str, Dict[str, float]] = {} try: async with aiosqlite.connect( f"file:{SQ_BATTLES_DB_PATH}?mode=ro", uri=True, timeout=30.0 ) as db: await db.execute("PRAGMA busy_timeout=30000;") params = [*clean_uids, start_ts, end_ts] async with db.execute(sql, params) as cursor: async for uid, games, total_kills, total_deaths in cursor: games_i = max(0, int(games or 0)) kills_i = max(0, int(total_kills or 0)) deaths_i = max(0, int(total_deaths or 0)) if games_i <= 0: continue out[str(uid)] = { "kdr": (kills_i / deaths_i) if deaths_i > 0 else float(kills_i), "kps": (kills_i / games_i), } except Exception as e: logging.error("(POINTS) Failed loading slot-scoped KDR/KPS stats: %s", e) return out # ============================================================================ # CLEANUP FUNCTIONS # ============================================================================ async def cleanup_WL() -> None: """ Clears WL tables in SQLite (events + standings). Keeps the async signature for existing callers. """ loop = asyncio.get_running_loop() # run the synchronous DB clear off the event loop await loop.run_in_executor(None, _clean_wl_db) async def cleanup_replays(): """ Cleans up replay directories in STORAGE/REPLAYS/: - After 12 hours: deletes regenerable files (PNGs, MP4s) - After 48 hours: deletes entire directory (GOB + JSON included) Age is determined from the mtime of replay.gob / replay_data.json (written once at capture time), not the directory mtime — directory mtime is bumped whenever files inside are added or removed (including by this cleanup), which would otherwise keep dirs perpetually "fresh". """ KEEP_FILES = {"replay.gob", "replay_data.json"} def _sync_cleanup_replays(): """Synchronous helper that walks replay dirs and deletes stale files.""" replay_file_path = REPLAYS_DIR if not replay_file_path.exists(): return current_time = T.time() cutoff_regen = current_time - (12 * 60 * 60) # 12h — delete regenerable files cutoff_full = current_time - (48 * 60 * 60) # 48h — delete entire directory full_deleted: list[str] = [] regen_cleaned: list[tuple[str, int]] = [] for entry in os.listdir(replay_file_path): entry_path = replay_file_path / entry if not entry_path.is_dir(): continue gob_path = entry_path / "replay.gob" json_path = entry_path / "replay_data.json" if gob_path.exists(): entry_mtime = gob_path.stat().st_mtime elif json_path.exists(): entry_mtime = json_path.stat().st_mtime else: entry_mtime = entry_path.stat().st_mtime if entry_mtime < cutoff_full: try: shutil.rmtree(entry_path) full_deleted.append(entry) except OSError as e: logging.warning(f"cleanup_replays: failed to rmtree {entry_path}: {e}") elif entry_mtime < cutoff_regen: removed_count = 0 for fname in os.listdir(entry_path): if fname not in KEEP_FILES: fpath = entry_path / fname try: os.remove(fpath) removed_count += 1 except OSError: pass if removed_count: regen_cleaned.append((entry, removed_count)) if full_deleted: logging.info( f"cleanup_replays: removed {len(full_deleted)} dirs >48h old: " f"{', '.join(full_deleted)}" ) if regen_cleaned: total_files = sum(n for _, n in regen_cleaned) preview = ", ".join(f"{name}({n})" for name, n in regen_cleaned[:10]) suffix = f" (+{len(regen_cleaned) - 10} more)" if len(regen_cleaned) > 10 else "" logging.info( f"cleanup_replays: cleaned regen files in {len(regen_cleaned)} dirs " f"({total_files} files): {preview}{suffix}" ) if not full_deleted and not regen_cleaned: logging.info("cleanup_replays: nothing to clean") await asyncio.to_thread(_sync_cleanup_replays) async def cleanup_masterLog(): """ Deletes all files in STORAGE_DIR / "PROFILES" / "MASTER_LOG". """ def _sync_cleanup_masterLog(): """Synchronous helper that deletes all files in the MASTER_LOG directory.""" masterLog_dir = STORAGE_DIR / "PROFILES" / "MASTER_LOG" if not masterLog_dir.exists(): logging.warning(f"Master log dir does not exist: {masterLog_dir}") return for entry in masterLog_dir.iterdir(): if entry.is_file(): try: entry.unlink() except Exception as e: logging.error(f"Failed to delete {entry}: {e}") await asyncio.to_thread(_sync_cleanup_masterLog) async def cleanup_comps(): """ Deletes all files in STORAGE_DIR / "COMPS". """ def _sync_cleanup_comps(): """Synchronous helper that deletes all files in the COMPS directory.""" comps_dir = STORAGE_DIR / "COMPS" if not comps_dir.exists(): logging.warning(f"Comps dir does not exist: {comps_dir}") return for entry in comps_dir.iterdir(): if entry.is_file(): try: entry.unlink() except Exception as e: logging.error(f"Failed to delete {entry}: {e}") await asyncio.to_thread(_sync_cleanup_comps) # ============================================================================ # SQUADRON POINTS TABLE # ============================================================================ async def init_squadrons_points_table(): """Initialize the squadrons_points table if it doesn't exist.""" async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") await db.execute(""" CREATE TABLE IF NOT EXISTS squadrons_points ( id INTEGER PRIMARY KEY AUTOINCREMENT, clan_id INTEGER NOT NULL, long_name TEXT NOT NULL, unix_time INTEGER NOT NULL, clan_pts TEXT NOT NULL, total_score INTEGER NOT NULL ) """) # Create index for efficient queries await db.execute(""" CREATE INDEX IF NOT EXISTS idx_squadrons_points_longname_time ON squadrons_points(long_name, unix_time) """) await db.commit() # ============================================================================ # EXECUTE FUNCTIONS # ============================================================================ async def execute_ldb_alarm_task(): """Execute the leaderboard alarm task. Refreshes squadrons.db, loads the top-50 leaderboard, computes stat deltas (kills, deaths, win rate, play time) against the previous snapshot, builds paginated Discord embeds (top-20 and 21-50), and sends them to all guilds with a configured leaderboard channel. """ bot = get_bot() # 0. Update squadrons.db with fresh data try: await execute_update_squadrons_db_task(count=100) except Exception as e: logging.error(f"(LDB) Error refreshing squadrons.db: {e}") # Continue anyway with existing data # 1. Load top 50 squadrons from squadrons.db (track more, display top 30) try: async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") async with db.execute(""" SELECT clan_id, position, short_name, tag_name, wins, battles, a_kills, g_kills, deaths, playtime, clanrating, members FROM squadrons_data WHERE position IS NOT NULL AND position < 50 ORDER BY position ASC """) as cursor: rows = list(await cursor.fetchall()) except Exception as e: logging.error(f"(LDB) Fatal error loading squadrons from DB: {e}") return if not rows: logging.warning("(LDB) No squadron data available; skipping") return # 2. Build current leaderboard data current_data = {} for row in rows: clan_id, position, short_name, tag_name, wins, battles, a_kills, g_kills, deaths, playtime, clanrating, members = row # Calculate derived stats wins_val = wins or 0 battles_val = battles or 0 a_kills_val = a_kills or 0 g_kills_val = g_kills or 0 deaths_val = deaths or 0 win_rate = (wins_val / battles_val * 100) if battles_val > 0 else 0.0 total_kills = a_kills_val + g_kills_val kd_ratio = (total_kills / deaths_val) if deaths_val > 0 else 0.0 current_data[str(clan_id)] = { "position": position, "short_name": short_name, "tag_name": tag_name, "wins": wins_val, "battles": battles_val, "a_kills": a_kills_val, "g_kills": g_kills_val, "deaths": deaths_val, "playtime": playtime or 0, "clanrating": clanrating or 0, "members": members or 0, "win_rate": win_rate, "total_kills": total_kills, "kd_ratio": kd_ratio } # 3. Calculate session stats and add to current_data prefs_dir = STORAGE_DIR / "PREFERENCES" leaderboards_dir = STORAGE_DIR / "LEADERBOARDS" leaderboards_dir.mkdir(parents=True, exist_ok=True) # Load previous data to calculate session stats prev_data_all = {} for clan_id_str in current_data.keys(): clan_file = leaderboards_dir / f"{clan_id_str}.json" if clan_file.exists(): try: async with aiofiles.open(clan_file, "r", encoding="utf-8") as fp: prev_data_all[clan_id_str] = json.loads(await fp.read()) except Exception as e: logging.error(f"(LDB) Error loading previous data for clan {clan_id_str}: {e}") # Calculate and store session stats for clan_id_str, data in current_data.items(): prev = prev_data_all.get(clan_id_str, {}) if prev: session_kills = data["total_kills"] - prev.get("total_kills", 0) session_deaths = data["deaths"] - prev.get("deaths", 0) data["session_kd"] = (session_kills / session_deaths) if session_deaths > 0 else None session_wins = data["wins"] - prev.get("wins", 0) session_battles = data["battles"] - prev.get("battles", 0) data["session_wr"] = (session_wins / session_battles * 100) if session_battles > 0 else None else: data["session_kd"] = None data["session_wr"] = None # 4. Helper function to build an embed for a group of squadrons def build_squadron_embed(clans_to_display, sorted_clans, prev_data_all, title, description, color, lang="en"): """Build a Discord embed showing leaderboard stats for a group of squadrons. Args: clans_to_display: List of (clan_id_str, data) tuples to render. sorted_clans: Full sorted list of all clans for position lookups. prev_data_all: Dict mapping clan_id_str to previous snapshot data. title: Embed title string. description: Embed description string. color: discord.Color for the embed sidebar. lang: Locale code for field labels. Returns: A discord.Embed with one inline field per squadron. """ embed = discord.Embed(title=title, description=description, color=color) embed.set_footer(text=DEFAULT_FOOTER_CAT) # Create position lookup map for O(1) access instead of O(n) search position_map = {data['position']: data for _, data in sorted_clans} for clan_id_str, data in clans_to_display: prev = prev_data_all.get(clan_id_str, {}) # Universal indicator: shows either change amount or previous value def get_indicator(current_val, prev_val, precision=0, suffix="", show_diff_instead_of_prev=True): """Return a formatted up/down indicator string comparing current vs previous values.""" if prev_val is None: return "" diff = current_val - prev_val # Choose what to display: difference or current value display_val = abs(diff) if show_diff_instead_of_prev else current_val # Format the value if isinstance(current_val, float): display_fmt = f"{display_val:.{precision}f}" else: display_fmt = f"{display_val}" # Return with appropriate indicator if current_val > prev_val: return f"(🌲 {display_fmt}{suffix})" elif current_val < prev_val: return f"(🔻 {display_fmt}{suffix})" return "" # Build position indicator and details def get_pos_indicator(data, prev): """Return a formatted position-change indicator and stat details line.""" # Position change indicator if prev: prev_pos = prev.get("position", 999) pos_change = prev_pos - data["position"] if pos_change > 0: pos_indicator = f"(🌲 {pos_change})" elif pos_change < 0: pos_indicator = f"(🔻 {abs(pos_change)})" else: pos_indicator = "" else: pos_indicator = "🆕" # Points behind next rank (for non-first places) position_details = "" if data['position'] > 0: # Find squadron directly ahead using O(1) lookup prev_position_data = position_map.get(data['position'] - 1) if prev_position_data: points_behind = prev_position_data['clanrating'] - data['clanrating'] position_details = f"({points_behind:,} Behind)" return pos_indicator, position_details pos_indicator, position_details = get_pos_indicator(data, prev) # Get session stats from data (already calculated) session_kd = data.get("session_kd") session_wr = data.get("session_wr") rating_indicator = get_indicator(data["clanrating"], prev.get("clanrating", 0)) battles_indicator = get_indicator(data["battles"], prev.get("battles", 0)) wins_indicator = get_indicator(data["wins"], prev.get("wins", 0)) # For WR and K/D: compare overall to current session performance win_rate_indicator = get_indicator(session_wr, data["win_rate"], precision=1, suffix="%", show_diff_instead_of_prev=False) if session_wr is not None else "" kd_indicator = get_indicator(session_kd, data["kd_ratio"], precision=2, show_diff_instead_of_prev=False) if session_kd is not None else "" total_kills_indicator = get_indicator(data["total_kills"], prev.get("total_kills", 0)) deaths_indicator = get_indicator(data["deaths"], prev.get("deaths", 0)) members_indicator = get_indicator(data["members"], prev.get("members", 0)) field_name = f"**#{data['position'] + 1} {esc(data['short_name'])} {pos_indicator}\n{position_details}**" field_value = ( f"**{t(lang, 'common.rating_field')}:** {data['clanrating']:,} {rating_indicator}\n" f"**{t(lang, 'common.battles_field')}:** {data['battles']:,} {battles_indicator}\n" f"**{t(lang, 'common.wins_field')}:** {data['wins']:,} {wins_indicator}\n" f"**WR:** {data['win_rate']:.1f}% {win_rate_indicator}\n" f"**{t(lang, 'common.kills_field')}:** {data['total_kills']:,} {total_kills_indicator}\n" f"**{t(lang, 'common.deaths_field')}:** {data['deaths']:,} {deaths_indicator}\n" f"**{t(lang, 'common.kd_field')}:** {data['kd_ratio']:.2f} {kd_indicator}\n" f"**{t(lang, 'common.members_field')}:** {data['members']} {members_indicator}" "\u200b" # Adds spacing ) embed.add_field(name=field_name, value=field_value, inline=True) return embed # 5. Compute shared values used when building embeds per-guild current_timestamp = int(datetime.now(timezone.utc).timestamp()) # Sort by position sorted_clans = sorted(current_data.items(), key=lambda x: x[1]["position"]) # Split into groups Zero_To_Fifteen = [(cid, data) for cid, data in sorted_clans if data["position"] < 15] Sixteen_To_Thirty = [(cid, data) for cid, data in sorted_clans if 15 <= data["position"] < 30] # 7. Loop through guilds and send all embeds for guild in bot.guilds: guild_id = guild.id guild_name = guild.name # Load this guild's preferences pref_path = prefs_dir / f"{guild_id}-preferences.json" try: async with aiofiles.open(pref_path, "r", encoding="utf-8") as fp: prefs = json.loads(await fp.read()) except FileNotFoundError: continue except Exception as e: logging.error(f"(LDB) Error loading prefs for guild {guild_id}: {e}") continue # Find the first usable Leaderboard channel; stale entries are removed. channel = None for squadron_name, squad_prefs in prefs.items(): if not (isinstance(squad_prefs, dict) and "Leaderboard" in squad_prefs): continue channel_str = squad_prefs["Leaderboard"] channel_id = parse_channel_id(str(channel_str)) if channel_id is None: logging.warning(f"(LDB) Invalid channel ID '{channel_str}' for {squadron_name} in {guild_name}") await remove_guild_pref_notification(guild_id, squadron_name, "Leaderboard", preferences=prefs) continue channel = bot.get_channel(channel_id) if channel is None: try: channel = await bot.fetch_channel(channel_id) except discord.NotFound: logging.warning(f"(LDB) Removing stale Leaderboard channel {channel_id} for {squadron_name} in {guild_name}") await remove_guild_pref_notification(guild_id, squadron_name, "Leaderboard", preferences=prefs) channel = None continue except discord.Forbidden: channel = None continue break if channel is None: continue # Resolve guild language guild_features = await load_features(guild_id) lang = lang_from_features(guild_features) # Premium gate — block non-entitled guilds when premium is active if not await is_guild_entitled(guild_id): try: gate_embed = discord.Embed( title=t(lang, "leaderboard_alarm.not_logged_title"), description=t(lang, "leaderboard_alarm.not_logged_desc"), color=discord.Color.red(), ) await channel.send(embed=gate_embed) # type: ignore except Exception: pass continue # Build locale-aware embeds for this guild guild_embeds = [] if Zero_To_Fifteen: embed1 = build_squadron_embed( clans_to_display=Zero_To_Fifteen, sorted_clans=sorted_clans, prev_data_all=prev_data_all, title=t(lang, "leaderboard_alarm.title"), description=t(lang, "leaderboard_alarm.top15_desc", timestamp=current_timestamp), color=discord.Color.gold(), lang=lang, ) guild_embeds.append(embed1) if Sixteen_To_Thirty: embed2 = build_squadron_embed( clans_to_display=Sixteen_To_Thirty, sorted_clans=sorted_clans, prev_data_all=prev_data_all, title=t(lang, "leaderboard_alarm.title"), description=t(lang, "leaderboard_alarm.top30_desc"), color=discord.Color.gold(), lang=lang, ) guild_embeds.append(embed2) # Send all locale-aware embeds to this guild's channel try: for embed in guild_embeds: await channel.send(embed=embed) # type: ignore except Exception as e: logging.error(f"(LDB) Failed to send embeds to guild {guild_name}: {e}") # Premium upsell for non-entitled guilds (pre-deadline) if not await is_guild_entitled(guild_id): warn_embed = discord.Embed( title=t(lang, "leaderboard_alarm.server_not_upgraded_title"), description=t(lang, "leaderboard_alarm.server_not_upgraded_desc", deadline=PREMIUM_ACTIVATION_TS), color=discord.Color.orange(), ) warn_embed.set_footer(text=DEFAULT_FOOTER_CAT) try: await channel.send(embed=warn_embed) # type: ignore except Exception as e: logging.error("(LDB) Error sending premium warning: %s", e) # 8. Save current data for next comparison for clan_id_str, data in current_data.items(): clan_file = leaderboards_dir / f"{clan_id_str}.json" try: async with aiofiles.open(clan_file, "w", encoding="utf-8") as fp: await fp.write(json.dumps(data, indent=2)) except Exception as e: logging.error(f"(LDB) Error saving data for clan {clan_id_str}: {e}") async def _process_squadron_points( squadron_name: str, region: str, opposite_region: str, channels: list[tuple[int, str, str, str]], ) -> None: """ Process a single squadron's points update and fan out to all subscribed channels. Each squadron is processed exactly once regardless of how many guilds track it. """ bot = get_bot() old_snap = await load_snapshot(squadron_name, opposite_region) new_snap = await take_snapshot(squadron_name, region) if not new_snap or not new_snap.get("members"): return # First run for this squadron? if old_snap is None: await save_snapshot(new_snap, squadron_name, region) return # Compute diffs sq_total, old_total = new_snap["total_points"], old_snap["total_points"] points_changes, _ = compare_points(old_snap, new_snap) # SEND UPDATE if points_changes: # Sort by biggest gains first sorted_changes = sorted( points_changes.items(), key=lambda kv: kv[1][1], reverse=True ) slot_stats = await _load_points_alarm_player_slot_stats( [uid for uid, _ in sorted_changes], region ) # SAFE CHUNK BUILDER max_len = 1024 chunks = [] buf = "```\nName Chg Now KDR KPS\n" for uid, (delta, now) in sorted_changes: name_raw = ( new_snap["members"].get(uid, {}).get("nick") or old_snap["members"].get(uid, {}).get("nick") or uid ) stats = slot_stats.get(uid, {}) kdr_str = f"{float(stats['kdr']):.2f}" if "kdr" in stats else "-" kps_str = f"{float(stats['kps']):.2f}" if "kps" in stats else "-" # DO NOT CHANGE arrow = "🌲" if delta > 0 else "🔻" member_str = pad_display_width(name_raw, 16) change_str = f"{arrow} {abs(delta):<5}" current_points_str = f"{now:>6}" # DO NOT CHANGE line = f"{member_str}{change_str}{current_points_str} {kdr_str:>5} {kps_str:>5}\n" # Width-safe check if discord_len(buf) + discord_len(line) > max_len: buf += "```" chunks.append(buf) buf = "```\n" + line else: buf += line # Close final chunk buf += "```" chunks.append(buf) # Final clamp to guarantee <1024 even in absurd cases safe_chunks = [] for c in chunks: if discord_len(c) > max_len: # Truncate by display width, not character count budget = max_len - discord_len("\n```(truncated)```") trimmed = "" width = 0 for ch in c: ch_w = discord_len(ch) if width + ch_w > budget: break trimmed += ch width += ch_w trimmed += "\n```(truncated)```" safe_chunks.append(trimmed) else: safe_chunks.append(c) chunks = safe_chunks # Chart arrow if sq_total > old_total: chart = "📈" elif sq_total < old_total: chart = "📉" else: chart = "ᓚᘏᗢ" # WL record for this session # Preferences are keyed by long_name but wl_standings stores short_name # from replay data, so resolve via squadrons_data first. wl_lookup_name = squadron_name try: clan = await resolve_clan(long=squadron_name) if clan and clan.get("short_name") and clan["long_name"] != "": wl_lookup_name = clan["short_name"] except Exception as e: logging.warning("(POINTS) Could not resolve short name for WL lookup: %s", e) standings = get_standings([wl_lookup_name]).get(wl_lookup_name, {}) w, l = standings.get("wins", 0), standings.get("losses", 0) # Placement change detection old_placement = old_snap.get("placement") new_placement = new_snap.get("placement") # Fan out: send to all subscribed channels for guild_id, raw_chan, flag, pref_key in channels: channel_id = parse_channel_id(str(raw_chan)) if channel_id is None: logging.error( "(POINTS) Invalid channel ID '%s' for guild %s", raw_chan, guild_id ) await remove_guild_pref_notification(guild_id, pref_key, "Points") continue channel = bot.get_channel(channel_id) if not channel: try: channel = await bot.fetch_channel(channel_id) except discord.NotFound: await remove_guild_pref_notification(guild_id, pref_key, "Points") channel = None except discord.Forbidden: channel = None if not channel: logging.error( "(POINTS) Channel %s not found in guild %s", channel_id, guild_id ) continue # Safe sendable channel check if isinstance(channel, (TextChannel, Thread)): # Resolve guild language guild_features = await load_features(guild_id) lang = lang_from_features(guild_features) # Premium gate — block non-entitled guilds when premium is active if not await is_guild_entitled(guild_id): try: gate_embed = discord.Embed( title=t(lang, "autolog.points_not_logged_title"), description=t(lang, "autolog.points_not_logged_desc"), color=discord.Color.red(), ) await channel.send(embed=gate_embed) except Exception: pass continue # Tier cap — send orange upgrade warning and skip the update if flag == "over_cap": tier = await get_guild_tier(guild_id) await send_over_cap_warning( channel, lang, tier, "Points", squadron_name, reason="over_cap" ) continue # Build locale-aware WL line and main embed for this guild wl_line = t(lang, "autolog.wl_line", squadron=esc(squadron_name), wins=w, losses=l) if (w or l) else "" # Placement change line — only if placement actually changed placement_line = "" if (old_placement is not None and new_placement is not None and old_placement != new_placement): old_ord = ordinal(old_placement, lang) new_ord = ordinal(new_placement, lang) if new_placement < old_placement: placement_line = t(lang, "autolog.placement_rose", squadron=esc(squadron_name), old_place=old_ord, new_place=new_ord) else: placement_line = t(lang, "autolog.placement_fell", squadron=esc(squadron_name), old_place=old_ord, new_place=new_ord) embed = discord.Embed( title=t(lang, "autolog.points_update_title", squadron=esc(squadron_name), region=region), description=t(lang, "autolog.points_update_desc", old_total=old_total, new_total=sq_total, chart=chart, wl_line=wl_line, placement_line=placement_line), color=discord.Color.blue() ) for c in chunks: embed.add_field(name="\u200A", value=c, inline=False) embed.set_footer(text=DEFAULT_FOOTER_CAT) try: await channel.send(embed=embed) except Exception as e: logging.error("(POINTS) Error sending update: %s", e) # Premium upsell for non-entitled guilds (pre-deadline) if not await is_guild_entitled(guild_id): warn_embed = discord.Embed( title=t(lang, "autolog.server_not_upgraded_title"), description=t(lang, "autolog.server_not_upgraded_points_desc", deadline=PREMIUM_ACTIVATION_TS), color=discord.Color.orange(), ) warn_embed.set_footer(text=DEFAULT_FOOTER_CAT) try: await channel.send(embed=warn_embed) except Exception as e: logging.error("(POINTS) Error sending premium warning: %s", e) else: logging.error( "(POINTS) Channel %s in guild %s is not a sendable channel (type=%s)", channel_id, guild_id, type(channel).__name__ ) # Overwrite the snapshot await save_snapshot(new_snap, squadron_name, region) async def execute_points_alarm_task(region: str): """ Once-a-minute task that: 1. Cleans old files 2. Collects all squadron -> channel mappings across guilds 3. Processes each unique squadron once (single API call + snapshot) 4. Fans out notifications to all subscribed channels """ bot = get_bot() # CLEANUP (concurrent) — WL cleared after embeds so session record is available cleanup_results = await asyncio.gather( cleanup_replays(), cleanup_masterLog(), cleanup_comps(), return_exceptions=True ) for r in cleanup_results: if isinstance(r, Exception): logging.error(f"[TASK] cleanup sub-task failed: {r}") PREFS_DIR = STORAGE_DIR / "PREFERENCES" opposite_region = "EU" if region == "NA" else "NA" # Refresh entitlement cache once up-front — tier lookups below rely on it await refresh_entitled_guilds() # Step 1: Collect squadron -> [(guild_id, channel_str, flag)] mappings # flag: "ok" (dispatch normal) | "over_cap" (send orange upgrade warning) squadron_channels: dict[str, list[tuple[int, str, str, str]]] = {} for guild in bot.guilds: guild_id = guild.id if guild_id in BLACKLISTED_SERVER_IDS: continue prefs_path = PREFS_DIR / f"{guild_id}-preferences.json" try: preferences = json.loads(prefs_path.read_text(encoding="utf-8")) except FileNotFoundError: continue except Exception as e: logging.error("(POINTS) Error loading prefs for guild %s: %s", guild_id, e) continue tier = await get_guild_tier(guild_id) allowed_points = allowed_pref_keys_for(preferences, tier, "Points") enabled_points = set(enabled_pref_keys_for(preferences, "Points")) over_cap_points = enabled_points - allowed_points enforcement = tier_enforcement_active() for pref_key, squad_prefs in preferences.items(): if not isinstance(squad_prefs, dict): continue if "Points" not in squad_prefs: continue # Wildcard / Global keys aren't squadron points subjects. if str(pref_key).lower() in WILDCARD_KEYS or str(pref_key).lower() == "global": continue if pref_key in allowed_points: flag = "ok" elif enforcement and pref_key in over_cap_points: flag = "over_cap" else: continue # disabled entry or silent drop # Resolve the key to the squadron's CURRENT long_name for the # downstream WT API call. Post-rename, the prefs key is the stable # clan_id while the long_name on squadrons_data tracks the rename. resolved = await resolve_pref_key(pref_key, squad_prefs) target_name = resolved["long_name"] if resolved and resolved.get("long_name") else pref_key squadron_channels.setdefault(target_name, []).append( (guild_id, squad_prefs["Points"], flag, pref_key) ) if not squadron_channels: await cleanup_WL() return # Step 2: Process each unique squadron once (concurrent) squadron_tasks = [ _process_squadron_points(squadron_name, region, opposite_region, channels) for squadron_name, channels in squadron_channels.items() ] if squadron_tasks: results = await asyncio.gather(*squadron_tasks, return_exceptions=True) for r in results: if isinstance(r, Exception): logging.error(f"[TASK] points alarm sub-task failed: {r}") # Clear WL after all embeds have been sent await cleanup_WL() async def execute_leave_alarm_task(): """ Optimized leave-update alarm that: - Loads each guild's squadron preferences - Groups guilds by squadron - Fetches all squadron snapshots in parallel - Detects leavers for each squadron - Sends all notifications in parallel with rate limiting - Saves all snapshots """ bot = get_bot() PREFS_DIR = STORAGE_DIR / "PREFERENCES" # Refresh entitlement cache once — used to filter over-cap squadrons below await refresh_entitled_guilds() # Step 1: Collect all squadron -> [(guild_id, channel_id)] mappings. # Over-cap squadrons are silently dropped here — the Points task already sends # the orange upgrade warning on the shared Points channel (rate-limit prevents dupes). squadron_channels: dict[str, list[tuple[int, int]]] = {} for guild in bot.guilds: guild_id = guild.id guild_name = guild.name prefs_path = PREFS_DIR / f"{guild_id}-preferences.json" try: preferences = json.loads(prefs_path.read_text(encoding="utf-8")) except FileNotFoundError: continue except Exception as e: logging.error("(LEAVE) Error loading prefs for guild %s: %s", guild_id, e) continue tier = await get_guild_tier(guild_id) allowed_points = allowed_pref_keys_for(preferences, tier, "Points") for pref_key, squad_prefs in preferences.items(): if not isinstance(squad_prefs, dict): continue if str(pref_key).lower() in WILDCARD_KEYS or str(pref_key).lower() == "global": continue raw_chan = squad_prefs.get("Leave") or squad_prefs.get("Points") if not raw_chan: continue # Respect the Points tier cap — identical gate used by execute_points_alarm_task. if tier_enforcement_active() and pref_key not in allowed_points: continue try: channel_id = int(raw_chan.strip("<#>")) except Exception: continue resolved = await resolve_pref_key(pref_key, squad_prefs) target_name = resolved["long_name"] if resolved and resolved.get("long_name") else pref_key squadron_channels.setdefault(target_name, []).append((guild_id, channel_id)) if not squadron_channels: return # Step 2: Fetch all snapshots in parallel squadron_names = list(squadron_channels.keys()) # Semaphore to limit concurrent API calls (avoid overwhelming the API) api_semaphore = asyncio.Semaphore(5) async def fetch_squadron_snapshots(sq_name): """Fetch old and new snapshots for a squadron, rate-limited by semaphore.""" async with api_semaphore: old_snap = await load_snapshot(sq_name, "GLOBAL") new_snap = await take_snapshot(sq_name, "GLOBAL") return sq_name, old_snap, new_snap # Fetch all snapshots concurrently snapshot_results = await asyncio.gather( *[fetch_squadron_snapshots(sq) for sq in squadron_names], return_exceptions=True ) # Step 3: Process snapshots and collect all notifications all_notifications = [] # List of (channel, embed, metadata) tuples squadrons_to_save = [] # List of (squadron_name, new_snap) tuples for result in snapshot_results: # Check if result is an exception (including BaseException) if isinstance(result, BaseException): logging.error(f"(LEAVE) Error fetching snapshot: {result}") continue squadron_name, old_snap, new_snap = result guild_channels = squadron_channels[squadron_name] # Validate new snapshot if not new_snap or not new_snap.get("members"): continue # First run - save and skip if not old_snap or not old_snap.get("members"): squadrons_to_save.append((squadron_name, new_snap)) continue # Detect leavers old_members = old_snap.get("members", {}) new_members = new_snap.get("members", {}) leavers = { uid: info.get("points", 0) for uid, info in old_members.items() if uid not in new_members and info.get("points", 0) > 0 } if not leavers: squadrons_to_save.append((squadron_name, new_snap)) continue # Collect notifications for all leavers for uid, old_pts in leavers.items(): nick = old_members.get(uid, {}).get("nick", uid) for guild_id, channel_id in guild_channels: channel = bot.get_channel(channel_id) if not channel: logging.error("(LEAVE) Channel %s not found in guild %s", channel_id, guild_id) continue # Resolve guild language and create locale-aware embed guild_features_leave = await load_features(guild_id) lang_leave = lang_from_features(guild_features_leave) embed = discord.Embed( title=t(lang_leave, "autolog.leave_title", squadron=esc(squadron_name)), description=t(lang_leave, "autolog.leave_desc", nick=esc(nick), uid=uid, points=old_pts), color=discord.Color.red() ) embed.set_footer(text=DEFAULT_FOOTER_CAT) all_notifications.append((channel, embed, squadron_name, nick, uid, guild_id)) squadrons_to_save.append((squadron_name, new_snap)) # Step 4: Send all notifications in parallel if all_notifications: # Semaphore to limit concurrent Discord sends (avoid rate limiting) discord_semaphore = asyncio.Semaphore(10) async def send_notification(channel, embed, sq_name, nick, uid, guild_id): """Send a leave-notice embed to a channel, rate-limited by semaphore.""" async with discord_semaphore: try: await channel.send(embed=embed) # type: ignore except Exception as e: logging.error("(LEAVE) Error sending leave notice: %s", e) # Send all notifications concurrently notif_results = await asyncio.gather( *[send_notification(ch, em, sq, n, u, g) for ch, em, sq, n, u, g in all_notifications], return_exceptions=True ) for r in notif_results: if isinstance(r, Exception): logging.error(f"[TASK] leave notification sub-task failed: {r}") # Step 5: Save all snapshots for squadron_name, new_snap in squadrons_to_save: await save_snapshot(new_snap, squadron_name, "GLOBAL") async def execute_squadron_stats_tracker(): """ Read all squadrons with clanrating > 0 from squadrons_data table, fetch their current clan points, and store in squadrons_points table. """ # Ensure table exists await init_squadrons_points_table() current_time = int(T.time()) tracked_count = 0 error_count = 0 async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") # Get all squadrons with clanrating > 0 cursor = await db.execute(""" SELECT clan_id, long_name FROM squadrons_data WHERE clanrating > 0 ORDER BY clanrating DESC """) squadrons = list(await cursor.fetchall()) total_squadrons = len(squadrons) # Track each squadron for clan_id, long_name in squadrons: try: # Fetch current clan points using Game_API clan_data = await obtain_clan_new_points(long_name) if clan_data: members_dict, total_score = clan_data # Store as gzip-compressed JSON blob clan_pts_json = compress_json([members_dict, total_score], ensure_ascii=False) # Insert into database await db.execute(""" INSERT INTO squadrons_points (clan_id, long_name, unix_time, clan_pts, total_score) VALUES (?, ?, ?, ?, ?) """, (clan_id, long_name, current_time, clan_pts_json, total_score)) tracked_count += 1 if tracked_count % 10 == 0: await db.commit() # Commit periodically # Small delay to avoid rate limiting await asyncio.sleep(0.5) except Exception as e: error_count += 1 logging.error(f"Error tracking squadron {long_name}: {e}") continue # Final commit await db.commit() async def execute_update_squadrons_db_task(count: int = 1000): """Refresh squadrons.db by fetching the leaderboard from the Game API. Args: count: Number of top squadrons to fetch and upsert. """ try: await save_squadrons_to_db(count) except Exception as e: logging.error(f"[SQ-DB] Error during squadrons.db update: {e}") async def execute_sync_squadron_members_points_loop(count: int = 1000, delay: float = 3.0): """ Continuously update points for existing squadron members forever. Does NOT add/remove members - only updates points for members already in DB. Very slow, runs indefinitely in the background. """ cycle = 0 while True: cycle += 1 try: async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") # Only process clans that are currently active in the latest # leaderboard snapshot. Stale rows may still exist in the DB. async with db.execute(""" SELECT clan_id, long_name FROM squadrons_data WHERE clanrating > 0 ORDER BY position ASC LIMIT ? """, (count,)) as cursor: squadrons = list(await cursor.fetchall()) if not squadrons: logging.warning("[SQ-POINTS] No squadrons found, sleeping 5 min...") await asyncio.sleep(300) continue total_squadrons = len(squadrons) updated_count = 0 failed_count = 0 members_updated = 0 for i, (clan_id, long_name) in enumerate(squadrons): try: members_data, total_score = await obtain_clan_new_points(long_name) current_time = int(T.time()) if not members_data: failed_count += 1 else: async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") # Only update points for existing members (don't insert new ones) for uid, info in members_data.items(): result = await db.execute(""" UPDATE squadron_members SET points = ?, updated_at = ? WHERE clan_id = ? AND uid = ? """, (info.get('points', 0), current_time, clan_id, uid)) members_updated += result.rowcount await db.commit() updated_count += 1 except ClanInfoError as e: logging.info(f"[SQ-POINTS] Skipped {long_name}: {e}") failed_count += 1 except Exception as e: logging.error(f"[SQ-POINTS] Error for {long_name}: {e}") failed_count += 1 # Slow delay between clans (skip after the last one) if i < total_squadrons - 1: await asyncio.sleep(delay) logging.info( f"[SQ-POINTS] Cycle {cycle} done: " f"{updated_count}/{total_squadrons} clans updated, " f"{members_updated} member rows written, " f"{failed_count} failures" ) await record_task_run("squadron_points_loop", True) except Exception as e: logging.error(f"[SQ-POINTS] Cycle {cycle} error: {e}") try: await record_task_run("squadron_points_loop", False, str(e)) except Exception: pass await asyncio.sleep(60) # Wait a bit before retrying async def execute_sync_squadron_members_bulk(count: int = 1000, delay: float = 1.0): """ Sync squadron member rosters using the War Thunder game API. For each tracked squadron in squadrons_data, calls obtain_clan_new_points to get the authoritative member list (including zero-point players), then: - Removes players who left the clan - Adds new members with their current points - Updates nicks and points for existing members """ try: async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") # Ensure table exists await db.execute(""" CREATE TABLE IF NOT EXISTS squadron_members ( clan_id INTEGER, uid TEXT, nick TEXT, points INTEGER DEFAULT 0, updated_at INTEGER, PRIMARY KEY (clan_id, uid) ) """) await db.execute(""" CREATE INDEX IF NOT EXISTS idx_squadron_members_clan ON squadron_members(clan_id) """) await db.commit() # Only process clans that are currently active in the latest # leaderboard snapshot. Stale rows may still exist in the DB. async with db.execute(""" SELECT clan_id, long_name FROM squadrons_data WHERE clanrating > 0 ORDER BY position ASC LIMIT ? """, (count,)) as cursor: squadrons = list(await cursor.fetchall()) if not squadrons: logging.warning("[SQ-MEMBERS] No squadrons found in squadrons_data") return 0 total_squadrons = len(squadrons) logging.info("[SQ-MEMBERS] Starting sync for %d squadrons", total_squadrons) updated_count = 0 failed_count = 0 total_members_added = 0 total_members_removed = 0 current_time = int(T.time()) for i, (clan_id, long_name) in enumerate(squadrons): try: members_data, _ = await obtain_clan_new_points(long_name) if not members_data: logging.info("[SQ-MEMBERS] No members returned for %s (clan %s), skipping", long_name, clan_id) failed_count += 1 continue #logging.info("[SQ-MEMBERS] Updated %s (clan %s) — %d members", long_name, clan_id, len(members_data)) async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: await db.execute("PRAGMA busy_timeout=30000;") # Get current member UIDs from DB async with db.execute( "SELECT uid, points FROM squadron_members WHERE clan_id = ?", (clan_id,) ) as cursor: existing = {row[0]: row[1] for row in await cursor.fetchall()} # Build new member set from game API new_members = {uid: info for uid, info in members_data.items()} # Remove players who left the clan to_remove = set(existing.keys()) - set(new_members.keys()) if to_remove: await db.execute( f"DELETE FROM squadron_members WHERE clan_id = ? AND uid IN ({','.join('?' * len(to_remove))})", (clan_id, *to_remove) ) total_members_removed += len(to_remove) # Upsert all current members with fresh points and nicks new_uids = set(new_members.keys()) - set(existing.keys()) total_members_added += len(new_uids) for uid, info in new_members.items(): await db.execute(""" INSERT INTO squadron_members (clan_id, uid, nick, points, updated_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(clan_id, uid) DO UPDATE SET nick = excluded.nick, points = excluded.points, updated_at = excluded.updated_at """, (clan_id, uid, info.get("nick", ""), info.get("points", 0), current_time)) await db.commit() updated_count += 1 except ClanInfoError as e: logging.warning("[SQ-MEMBERS] Failed to update %s (clan %s): %s", long_name, clan_id, e) failed_count += 1 except Exception as e: logging.error("[SQ-MEMBERS] Failed to update %s (clan %s): %s", long_name, clan_id, e) failed_count += 1 # Throttle between clans if i < total_squadrons - 1: await asyncio.sleep(delay) logging.info( "[SQ-MEMBERS] Sync done: %d/%d clans updated, +%d members, -%d members, %d failures", updated_count, total_squadrons, total_members_added, total_members_removed, failed_count ) return updated_count except Exception as e: logging.error("[SQ-MEMBERS] Sync error: %s", e) return 0 async def execute_cleanup_stale_squadrons() -> int: """Remove squadrons from squadrons.db that have never played SQB. A squadron is stale if none of its members (by clan long_name matching squadron_name in player_games_hist) have any recorded game data. Returns the number of squadrons removed. """ try: # Step 1: Get all squadron_names that have game data active_names: set[str] = set() async with aiosqlite.connect( f"file:{SQ_BATTLES_DB_PATH}?mode=ro", uri=True ) as battles_db: cursor = await battles_db.execute( "SELECT DISTINCT squadron_name FROM player_games_hist " "WHERE squadron_name IS NOT NULL AND squadron_name != 'UNKNOWN'" ) rows = await cursor.fetchall() active_names = {row[0] for row in rows} logging.info( "[SQ-CLEANUP] %d squadron names have game data", len(active_names) ) # Step 2: Find and delete stale squadrons async with aiosqlite.connect(SQUADRONS_DB_PATH, timeout=30.0) as db: cursor = await db.execute( "SELECT clan_id, long_name, short_name, tag_name " "FROM squadrons_data" ) all_squads = list(await cursor.fetchall()) stale_clan_ids: list[int] = [] for clan_id, long_name, short_name, tag_name in all_squads: # A squadron is active if any of its names appear in game data names_to_check = { n for n in (long_name, short_name, tag_name) if n } if not names_to_check & active_names: stale_clan_ids.append(clan_id) if not stale_clan_ids: logging.info("[SQ-CLEANUP] No stale squadrons found") return 0 logging.info( "[SQ-CLEANUP] Removing %d stale squadrons (of %d total)", len(stale_clan_ids), len(all_squads), ) # Batch delete in chunks of 500 for i in range(0, len(stale_clan_ids), 500): chunk = stale_clan_ids[i : i + 500] placeholders = ",".join("?" * len(chunk)) await db.execute( f"DELETE FROM squadron_members WHERE clan_id IN ({placeholders})", chunk, ) await db.execute( f"DELETE FROM squadrons_points WHERE clan_id IN ({placeholders})", chunk, ) await db.execute( f"DELETE FROM squadrons_data WHERE clan_id IN ({placeholders})", chunk, ) await db.commit() logging.info( "[SQ-CLEANUP] Purged %d stale squadrons", len(stale_clan_ids) ) return len(stale_clan_ids) except Exception as e: logging.error("[SQ-CLEANUP] Cleanup error: %s", e) return 0 # ============================================================================ # WEEKLY BR REPORT # ============================================================================ async def _load_squadron_name_lookup() -> Dict[str, Dict[str, Any]]: """Build a name->squadron_data map keyed by long_name, tag_name, short_name. Used to resolve `squadron_name` strings from player_games_hist back to the canonical (clan_id, tag_name, long_name, short_name) for display. """ out: Dict[str, Dict[str, Any]] = {} try: async with aiosqlite.connect( f"file:{SQUADRONS_DB_PATH}?mode=ro", uri=True, timeout=30.0 ) as db: await db.execute("PRAGMA busy_timeout=30000;") async with db.execute( "SELECT clan_id, long_name, tag_name, short_name FROM squadrons_data" ) as cur: async for clan_id, long_name, tag_name, short_name in cur: info = { "clan_id": int(clan_id) if clan_id is not None else None, "long_name": str(long_name or ""), "tag_name": str(tag_name or ""), "short_name": str(short_name or ""), } for n in (long_name, tag_name, short_name): if n: out.setdefault(str(n), info) except Exception as e: logging.error("(WBR) Failed to load squadron name lookup: %s", e) return out def _br_window_already_fired(window_start: int) -> bool: """Idempotency: returns True if the marker file already records this window.""" try: if not WEEKLY_BR_MARKER_PATH.exists(): return False with open(WEEKLY_BR_MARKER_PATH, "r", encoding="utf-8") as fp: data = json.load(fp) return int(data.get("br_window_start") or 0) == int(window_start) except Exception: return False def _mark_br_window_fired(window: Dict[str, Any]) -> None: try: payload = { "br_window_start": int(window["start"]), "br_window_end": int(window["end"]), "max_br": float(window["max_br"]), "fired_at": int(T.time()), } with open(WEEKLY_BR_MARKER_PATH, "w", encoding="utf-8") as fp: json.dump(payload, fp) except Exception as e: logging.error("(WBR) Failed to write marker: %s", e) def _strip_tag_decoration(tag: str) -> str: """Strip the decorative glyph chars surrounding a WT squadron tag. Tags in WT often appear as `┤DCRSS├` or `[CLAN]` -- one decoration char on each side. Slice both off only if neither end is alphanumeric, so plain short tags like `AVR` are left alone. """ if len(tag) >= 3 and not tag[0].isalnum() and not tag[-1].isalnum(): return tag[1:-1] return tag _TROPHY = "\U0001F3C6" def _format_player_line_short( rank: int, nick: str, score: float, games: int, is_top: bool ) -> str: """Render one player row in the wildcard roster. Bold-rank prefix (`**1.**`) dodges Discord's ordered-list detection so items 2-5 don't render as continuation lines under item 1. """ safe_nick = discord.utils.escape_markdown(nick) line = f"**{rank}.** {safe_nick} • {score:.2f} • {games}g" if is_top: line += f" {_TROPHY}" return line def _format_player_line_full( rank: int, nick: str, score: float, games: int, kdr: float, is_top: bool ) -> str: """Render one player row in the per-squadron roster (top-15 listing).""" safe_nick = discord.utils.escape_markdown(nick) line = f"**{rank}.** {safe_nick} • {score:.2f} • {games}g • K/D {kdr:.2f}" if is_top: line += f" {_TROPHY}" return line def _build_wildcard_embeds( window: Dict[str, Any], payload: List[Dict[str, Any]], name_lookup: Dict[str, Dict[str, Any]], lang: str, ) -> List[discord.Embed]: """Build the two wildcard-mode embeds (ranks 1-10, 11-20).""" color = discord.Color(br_color_int(float(window["max_br"]))) title = t(lang, "weekly_br.title_wildcard", br=window["max_br"]) window_line = t( lang, "weekly_br.window_label", start=f"", end=f"", ) # The "highest ELO person for the entire week" -- single trophy on the # top-scoring player across all squadrons in the payload. top_uid: Optional[str] = None top_score = -1.0 for sq in payload: for p in sq.get("top_players") or []: score = float(p.get("score") or 0.0) if score > top_score: top_score = score top_uid = str(p.get("uid") or "") def _format_squadron_field(rank: int, sq: Dict[str, Any]) -> tuple[str, str]: sq_name = str(sq.get("squadron_name") or "") info = name_lookup.get(sq_name) or {} raw_tag = info.get("tag_name") or info.get("short_name") or sq_name tag = _strip_tag_decoration(raw_tag) long_name = info.get("long_name") or sq_name score = float(sq.get("score") or 0.0) games = int(sq.get("games") or 0) kdr = float(sq.get("kdr") or 0.0) wr = float(sq.get("win_rate") or 0.0) safe_tag = discord.utils.escape_markdown(tag) safe_long = discord.utils.escape_markdown(long_name) field_name = f"{rank}. [{safe_tag}] {safe_long} - {score:.2f}" stats_line = t( lang, "weekly_br.squadron_stats_line", games=games, kdr=f"{kdr:.2f}", wr=f"{wr:.0f}", ) roster = sq.get("top_players") or [] roster_lines: List[str] = [] for i, p in enumerate(roster, start=1): nick = str(p.get("nick") or "?") p_score = float(p.get("score") or 0.0) p_games = int(p.get("games") or 0) is_top = top_uid is not None and str(p.get("uid") or "") == top_uid roster_lines.append( _format_player_line_short(i, nick, p_score, p_games, is_top) ) if roster_lines: value = stats_line + "\n" + "\n".join(roster_lines) else: value = stats_line return field_name, value half_n = (len(payload) + 1) // 2 # split point (eg. 20 -> 10) first, second = payload[:half_n], payload[half_n:] embeds: List[discord.Embed] = [] if first: e1 = discord.Embed( title=title, description=window_line + "\n" + t( lang, "weekly_br.wildcard_desc_first", count=len(payload), low=1, high=len(first), ), color=color, ) for i, sq in enumerate(first, start=1): name, value = _format_squadron_field(i, sq) e1.add_field(name=name, value=value, inline=False) e1.set_footer(text=DEFAULT_FOOTER_CAT) embeds.append(e1) if second: e2 = discord.Embed( title=title, description=t( lang, "weekly_br.wildcard_desc_second", count=len(payload), low=len(first) + 1, high=len(payload), ), color=color, ) for i, sq in enumerate(second, start=len(first) + 1): name, value = _format_squadron_field(i, sq) e2.add_field(name=name, value=value, inline=False) e2.set_footer(text=DEFAULT_FOOTER_CAT) embeds.append(e2) return embeds def _build_squadron_embed( window: Dict[str, Any], sq_info: Dict[str, Any], sq_row: Optional[Dict[str, Any]], roster: List[Dict[str, Any]], lang: str, ) -> discord.Embed: """Build the per-squadron embed (top-K players for one squadron).""" color = discord.Color(br_color_int(float(window["max_br"]))) raw_tag = sq_info.get("tag_name") or sq_info.get("short_name") or "?" tag = _strip_tag_decoration(raw_tag) long_name = sq_info.get("long_name") or sq_info.get("short_name") or "?" safe_tag = discord.utils.escape_markdown(tag) safe_long = discord.utils.escape_markdown(long_name) title = t( lang, "weekly_br.title_squadron", tag=safe_tag, long=safe_long, br=window["max_br"], ) window_line = t( lang, "weekly_br.window_label", start=f"", end=f"", ) if sq_row is None and not roster: embed = discord.Embed( title=title, description=window_line + "\n\n" + t(lang, "weekly_br.no_data", tag=safe_tag), color=color, ) embed.set_footer(text=DEFAULT_FOOTER_CAT) return embed if sq_row is not None: sq_score = float(sq_row.get("score") or 0.0) sq_games = int(sq_row.get("games") or 0) sq_kdr = float(sq_row.get("kdr") or 0.0) sq_wr = float(sq_row.get("win_rate") or 0.0) header = t( lang, "weekly_br.squadron_header_line", score=f"{sq_score:.2f}", games=sq_games, kdr=f"{sq_kdr:.2f}", wr=f"{sq_wr:.0f}", ) else: header = t(lang, "weekly_br.squadron_header_no_aggregate") parts = [window_line, header] roster_lines: List[str] = [] for i, p in enumerate(roster, start=1): nick = str(p.get("nick") or "?") p_score = float(p.get("score") or 0.0) p_games = int(p.get("games") or 0) p_kdr = float(p.get("kdr") or 0.0) roster_lines.append( _format_player_line_full(i, nick, p_score, p_games, p_kdr, is_top=(i == 1)) ) if roster_lines: parts.append("") # blank line before roster parts.append("\n".join(roster_lines)) embed = discord.Embed(title=title, description="\n".join(parts), color=color) embed.set_footer(text=DEFAULT_FOOTER_CAT) return embed async def execute_weekly_br_report_task(window: Dict[str, Any]) -> None: """Send the Weekly BR Report for a just-closed BR window to all subscribers. `window` is a SCHEDULE.json entry (max_br, start, end). The function: - is idempotent per `start` via a marker file - precomputes the wildcard payload (top 20 squadrons + top 5 players each) once and reuses it across all guilds - iterates each guild's preferences for `everything.WeeklyBR` (wildcard) and `.WeeklyBR` (per-squadron) channel slots - dedupes per channel id within each guild (wildcard wins on collision) - tolerates per-channel send failures """ bot = get_bot() start_ts = int(window["start"]) end_ts = int(window["end"]) if _br_window_already_fired(start_ts): logging.info("(WBR) Skip — marker says window %s already fired", start_ts) return # 1. Hit the heavy ELO pipeline ONCE per fire. The maps are reused below for # both the wildcard payload and every per-squadron variant, so 30 guilds # subscribing don't trigger 30 full ELO recomputes. sq_map = await _wbr_squadron_scores(start_ts, end_ts) pl_map = await _wbr_player_scores(start_ts, end_ts) wildcard_payload = top_n_squadrons_with_top_k_players_from_maps( sq_map, pl_map, n=20, k=5 ) name_lookup = await _load_squadron_name_lookup() if not wildcard_payload: logging.warning( "(WBR) No squadron activity in window %s..%s — skipping all sends", start_ts, end_ts, ) _mark_br_window_fired(window) return logging.info( "(WBR) Window %.1f BR [%s..%s] — %d squadrons, %d players in cached maps", float(window["max_br"]), start_ts, end_ts, len(sq_map), len(pl_map), ) prefs_dir = STORAGE_DIR / "PREFERENCES" sent_guilds = 0 sent_channels = 0 for guild in bot.guilds: guild_id = guild.id guild_name = guild.name pref_path = prefs_dir / f"{guild_id}-preferences.json" try: async with aiofiles.open(pref_path, "r", encoding="utf-8") as fp: prefs = json.loads(await fp.read()) except FileNotFoundError: continue except Exception as e: logging.error("(WBR) Error loading prefs for guild %s: %s", guild_id, e) continue # Collect channels: wildcard + per-squadron, deduped (wildcard wins). wildcard_channel_id: Optional[int] = None wildcard_pref_key: Optional[str] = None squadron_channels: List[Tuple[int, str, Dict[str, Any]]] = [] # (channel_id, pref_key, sq_info) for pref_key, entry in prefs.items(): if not isinstance(entry, dict): continue chan_str = entry.get("WeeklyBR") if not chan_str: continue channel_id = parse_channel_id(str(chan_str)) if channel_id is None: logging.warning( "(WBR) Invalid channel '%s' for key %s in %s", chan_str, pref_key, guild_name, ) await remove_guild_pref_notification(guild_id, pref_key, "WeeklyBR", preferences=prefs) continue if pref_key.lower() in WILDCARD_KEYS: wildcard_channel_id = channel_id wildcard_pref_key = pref_key continue # Per-squadron entry — resolve to name variants resolved = await resolve_pref_key(pref_key, entry) if resolved: long_name = resolved.get("long_name") or entry.get("Long") or "" tag_name = resolved.get("tag_name") or "" short_name = resolved.get("short_name") or entry.get("Short") or "" clan_id = resolved.get("clan_id") else: long_name = entry.get("Long") or "" tag_name = "" short_name = entry.get("Short") or "" clan_id = None squadron_channels.append( ( channel_id, pref_key, { "long_name": str(long_name or ""), "tag_name": str(tag_name or ""), "short_name": str(short_name or ""), "clan_id": clan_id, }, ) ) if wildcard_channel_id is None and not squadron_channels: continue # Dedupe: drop per-squadron entries that target the same channel as wildcard. if wildcard_channel_id is not None: squadron_channels = [ (cid, pref_key, info) for cid, pref_key, info in squadron_channels if cid != wildcard_channel_id ] # Resolve guild language once per guild guild_features = await load_features(guild_id) lang = lang_from_features(guild_features) guild_did_send = False # Wildcard send if wildcard_channel_id is not None: channel = bot.get_channel(wildcard_channel_id) if channel is None: try: channel = await bot.fetch_channel(wildcard_channel_id) except discord.NotFound: if wildcard_pref_key: await remove_guild_pref_notification( guild_id, wildcard_pref_key, "WeeklyBR", preferences=prefs ) channel = None except discord.Forbidden: channel = None if channel is None: logging.warning( "(WBR) Wildcard channel %s missing in guild %s", wildcard_channel_id, guild_name, ) elif not isinstance(channel, (TextChannel, Thread)): logging.warning( "(WBR) Wildcard channel %s is not text-capable in guild %s", wildcard_channel_id, guild_name, ) else: try: embeds = _build_wildcard_embeds( window, wildcard_payload, name_lookup, lang ) if embeds: await channel.send(embeds=embeds) sent_channels += 1 guild_did_send = True except discord.Forbidden: logging.warning( "(WBR) Forbidden sending wildcard report to %s in %s", wildcard_channel_id, guild_name, ) except discord.NotFound: if wildcard_pref_key: await remove_guild_pref_notification( guild_id, wildcard_pref_key, "WeeklyBR", preferences=prefs ) logging.warning( "(WBR) Wildcard channel %s not found in %s", wildcard_channel_id, guild_name, ) except Exception as e: logging.error( "(WBR) Wildcard send failed in %s: %s", guild_name, e ) # Per-squadron sends for channel_id, pref_key, sq_info in squadron_channels: channel = bot.get_channel(channel_id) if channel is None: try: channel = await bot.fetch_channel(channel_id) except discord.NotFound: await remove_guild_pref_notification(guild_id, pref_key, "WeeklyBR", preferences=prefs) channel = None except discord.Forbidden: channel = None if channel is None: logging.warning( "(WBR) Squadron channel %s missing in guild %s", channel_id, guild_name, ) continue if not isinstance(channel, (TextChannel, Thread)): logging.warning( "(WBR) Squadron channel %s not text-capable in guild %s", channel_id, guild_name, ) continue variants = [ sq_info.get("long_name") or "", sq_info.get("tag_name") or "", sq_info.get("short_name") or "", ] try: sq_row, roster = squadron_report_for_variants_from_maps( sq_map, pl_map, variants, k=15 ) embed = _build_squadron_embed(window, sq_info, sq_row, roster, lang) await channel.send(embed=embed) sent_channels += 1 guild_did_send = True except discord.Forbidden: logging.warning( "(WBR) Forbidden sending squadron report to %s in %s", channel_id, guild_name, ) except discord.NotFound: await remove_guild_pref_notification(guild_id, pref_key, "WeeklyBR", preferences=prefs) logging.warning( "(WBR) Squadron channel %s not found in %s", channel_id, guild_name, ) except Exception as e: logging.error( "(WBR) Squadron send failed in %s: %s", guild_name, e ) if guild_did_send: sent_guilds += 1 logging.info( "(WBR) Dispatched window %s..%s to %d guilds across %d channels", start_ts, end_ts, sent_guilds, sent_channels, ) _mark_br_window_fired(window)