Files

728 lines
23 KiB
Python

"""
tasks.py
Scheduler definitions using discord.ext.tasks loops.
Thin wrappers that invoke the corresponding execute_* functions from task_executors.py
on configured intervals.
"""
# Standard Library Imports
import asyncio
import json
import logging
import os
import shutil
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
# Third-Party Library Imports
import aiofiles
import aiohttp
from discord.ext import tasks
# Local Module Imports
from . import lux_apis
from .autologging import handle_ws_replays
from .health import get_recent_ttl_stats, record_task_run, write_heartbeat
from .meta_manager import process_all_players, sync_all_guild_metas
from .task_executors import (
execute_ldb_alarm_task,
execute_points_alarm_task,
execute_leave_alarm_task,
execute_squadron_stats_tracker,
execute_update_squadrons_db_task,
execute_sync_squadron_members_bulk,
execute_sync_squadron_members_points_loop,
execute_cleanup_stale_squadrons,
execute_weekly_br_report_task,
init_squadrons_points_table,
cleanup_replays,
)
from .utils import (
get_bot,
STORAGE_DIR,
STACKS_DIR,
refresh_entitled_guilds,
SQB_STATS_TRACKER_WINDOWS,
SQB_BOUNDARY_TIMES,
)
async def _record(task_name: str, success: bool, error: str = ""):
    """Forward one task-run outcome to the health dashboard.

    Thin async shim over ``record_task_run``; the result of that call is
    awaited and discarded.
    """
    pending = record_task_run(task_name, success, error)
    await pending
def _format_duration(seconds: int | None) -> str:
if seconds is None:
return "No data"
minutes, rem = divmod(int(seconds), 60)
return f"{minutes}m {rem:02d}s"
def _build_ttl_monitor_embed(
    *,
    title: str,
    description: str,
    stats: dict,
    color: int,
) -> dict:
    """Assemble a Discord embed payload summarising recent TTL statistics.

    Optional keys read from *stats*: ``avg_delay``, ``min_delay``,
    ``max_delay``, ``sample_size`` and ``last_received_ts``. The embed is
    stamped with the current UTC time in ISO-8601 form.
    """
    ts = stats.get("last_received_ts")
    if not ts:
        last_seen = "No recent games"
    else:
        epoch = int(ts)
        # Discord timestamp markup: relative ("R") and long-time ("T") styles.
        last_seen = f"<t:{epoch}:R> (<t:{epoch}:T>)"

    average = _format_duration(stats.get("avg_delay"))
    low = _format_duration(stats.get("min_delay"))
    high = _format_duration(stats.get("max_delay"))
    games = int(stats.get("sample_size") or 0)

    fields = [
        {"name": "Average TTL", "value": average, "inline": True},
        {"name": "Min / Max", "value": f"{low} / {high}", "inline": True},
        {"name": "Sample", "value": f"{games} games", "inline": True},
        {"name": "Last received", "value": last_seen, "inline": False},
    ]
    embed = {
        "title": title,
        "description": description,
        "color": color,
        "fields": fields,
    }
    embed["timestamp"] = datetime.now(timezone.utc).isoformat()
    return embed
async def _send_ttl_alert_webhook(embed: dict) -> bool:
webhook_url = os.environ.get("SREBOT_TTL_ALERT_WEBHOOK_URL", "").strip()
if not webhook_url:
logging.warning("[TTL-ALERT] SREBOT_TTL_ALERT_WEBHOOK_URL is not configured.")
return False
payload = {
"username": "SREBOT Monitor",
"embeds": [embed],
"allowed_mentions": {"parse": []},
}
timeout = aiohttp.ClientTimeout(total=15)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(webhook_url, json=payload) as response:
if 200 <= response.status < 300:
return True
body = await response.text()
logging.error("[TTL-ALERT] Webhook failed with HTTP %s: %s", response.status, body[:500])
return False
except Exception as e:
logging.error("[TTL-ALERT] Webhook request failed: %s", e)
return False
# Average TTL (seconds) above which a degradation alert may be raised.
_TTL_ALERT_HIGH_THRESHOLD_SECONDS = 20 * 60
# Average TTL (seconds) below which an active alert is treated as recovered.
# It is lower than the high threshold, which gives the alert hysteresis.
_TTL_ALERT_RECOVERY_THRESHOLD_SECONDS = 10 * 60
# How long (seconds) the average must stay above the high threshold before
# the degradation webhook is sent.
_TTL_ALERT_HOLD_SECONDS = 10 * 60
# JSON file that persists the alert state below across restarts.
_TTL_ALERT_STATE_PATH = STORAGE_DIR / "ttl_alert_state.json"
# In-memory alert state, mirrored to _TTL_ALERT_STATE_PATH.
# Epoch seconds (time.time()) when the average was first seen above the high
# threshold, or None when it is not currently above it.
_ttl_alert_high_since: Optional[float] = None
# True once the degradation webhook was sent and recovery has not yet been sent.
_ttl_alert_active: bool = False
# Set the first time _load_ttl_alert_state() runs, whether or not the read succeeded.
_ttl_alert_state_loaded: bool = False
async def _load_ttl_alert_state() -> None:
    """Populate the in-memory TTL alert state from disk, once per process.

    The "loaded" flag is raised before any I/O. A missing state file, or a
    read/parse error (logged as a warning), is therefore not retried on later
    calls. Reads the ``active`` and ``high_since`` keys of the JSON object.
    """
    global _ttl_alert_high_since, _ttl_alert_active, _ttl_alert_state_loaded
    if _ttl_alert_state_loaded:
        return
    _ttl_alert_state_loaded = True
    try:
        if _TTL_ALERT_STATE_PATH.exists():
            async with aiofiles.open(_TTL_ALERT_STATE_PATH, "r", encoding="utf-8") as handle:
                raw_text = await handle.read()
            saved = json.loads(raw_text)
            _ttl_alert_active = bool(saved.get("active", False))
            raw_since = saved.get("high_since")
            _ttl_alert_high_since = None if raw_since is None else float(raw_since)
    except Exception as exc:
        logging.warning("[TTL-ALERT] Failed to load alert state: %s", exc)
async def _save_ttl_alert_state() -> None:
    """Persist the in-memory TTL alert state to ``_TTL_ALERT_STATE_PATH``.

    Writes ``active``, ``high_since`` and an ``updated_at`` epoch timestamp as
    JSON. The data goes to a sibling ``.tmp`` file that is then moved over the
    real file with ``os.replace``. This means a crash mid-write can never leave
    a truncated or unparseable state file, which would silently drop the
    "active" flag on the next load and suppress the recovery message.
    Best effort: any failure is logged as a warning and swallowed.
    """
    tmp_path = _TTL_ALERT_STATE_PATH.with_name(_TTL_ALERT_STATE_PATH.name + ".tmp")
    try:
        payload = json.dumps({
            "active": _ttl_alert_active,
            "high_since": _ttl_alert_high_since,
            "updated_at": time.time(),
        }, indent=2)
        _TTL_ALERT_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(tmp_path, "w", encoding="utf-8") as f:
            await f.write(payload)
        # Atomic rename within the same directory.
        os.replace(tmp_path, _TTL_ALERT_STATE_PATH)
    except Exception as e:
        logging.warning("[TTL-ALERT] Failed to save alert state: %s", e)
async def _execute_ttl_alert_check() -> None:
    """Evaluate recent receive TTL and send alert / recovery webhooks.

    Hysteresis state machine whose state (_ttl_alert_active,
    _ttl_alert_high_since) is restored from disk on first use and saved via
    _save_ttl_alert_state() whenever it changes.

    * Inactive: when the average TTL first exceeds
      _TTL_ALERT_HIGH_THRESHOLD_SECONDS, the current time is stored in
      _ttl_alert_high_since. Once the average has stayed above the threshold
      for _TTL_ALERT_HOLD_SECONDS, a "degradation" webhook is sent. The alert
      becomes active only if that send succeeded. An average at or below the
      threshold clears _ttl_alert_high_since.
    * Active: when the average falls below
      _TTL_ALERT_RECOVERY_THRESHOLD_SECONDS, a "normal" webhook is sent and,
      only on success, the alert is cleared. Averages between the recovery
      and high thresholds leave the state unchanged.

    With no TTL data, the high-since timer is cleared and nothing is sent.
    A failed webhook send leaves the state unchanged, so the send is
    re-attempted on the next call.
    """
    global _ttl_alert_high_since, _ttl_alert_active
    await _load_ttl_alert_state()
    # NOTE(review): limit=30 presumably means the 30 most recent games — confirm in health.py.
    stats = await get_recent_ttl_stats(limit=30)
    avg_delay = stats["avg_delay"]
    sample_size = stats["sample_size"]
    if avg_delay is None:
        _ttl_alert_high_since = None
        await _save_ttl_alert_state()
        logging.info("[TTL-ALERT] No TTL data available.")
        return
    now = time.time()
    if not _ttl_alert_active:
        if avg_delay > _TTL_ALERT_HIGH_THRESHOLD_SECONDS:
            if _ttl_alert_high_since is None:
                # First observation above threshold: start the hold timer.
                _ttl_alert_high_since = now
                await _save_ttl_alert_state()
                logging.warning(
                    "[TTL-ALERT] Avg TTL above threshold: %s across %s games.",
                    _format_duration(avg_delay),
                    sample_size,
                )
                return
            # Still high: alert only once the hold period has elapsed.
            if now - _ttl_alert_high_since >= _TTL_ALERT_HOLD_SECONDS:
                embed = _build_ttl_monitor_embed(
                    title="SREBOT status: TTL degradation active",
                    description=(
                        "Average TTL has stayed above "
                        f"{_format_duration(_TTL_ALERT_HIGH_THRESHOLD_SECONDS)} for at least "
                        f"{_format_duration(_TTL_ALERT_HOLD_SECONDS)}. "
                        "Historical game messages may be delayed or out of date."
                    ),
                    stats=stats,
                    color=0xF59E0B,
                )
                sent = await _send_ttl_alert_webhook(embed)
                if sent:
                    _ttl_alert_active = True
                    await _save_ttl_alert_state()
                    logging.warning("[TTL-ALERT] High TTL webhook sent.")
            return
        # Back at or below the high threshold before alerting: reset the hold timer.
        if _ttl_alert_high_since is not None:
            _ttl_alert_high_since = None
            await _save_ttl_alert_state()
        return
    # Alert is active: clear it only once TTL drops below the (lower) recovery threshold.
    if avg_delay < _TTL_ALERT_RECOVERY_THRESHOLD_SECONDS:
        embed = _build_ttl_monitor_embed(
            title="SREBOT status: all services normal",
            description="No sustained TTL degradation is currently active.",
            stats=stats,
            color=0x22C55E,
        )
        sent = await _send_ttl_alert_webhook(embed)
        if sent:
            _ttl_alert_active = False
            _ttl_alert_high_since = None
            await _save_ttl_alert_state()
            logging.info("[TTL-ALERT] Recovery webhook sent.")
# ============================================================================
# TTL ALERT TASK
# ============================================================================
@tasks.loop(minutes=5)
async def ttl_alert_task():
    """Every 5 minutes, run the TTL alert state machine.

    The outcome is recorded under "ttl_alert" for the health dashboard.
    Exceptions are logged rather than re-raised.
    """
    try:
        await _execute_ttl_alert_check()
        await _record("ttl_alert", True)
    except Exception as exc:
        await _record("ttl_alert", False, str(exc))
        logging.error("[TTL-ALERT] Failed to check TTL alert state: %s", exc)


@ttl_alert_task.before_loop
async def before_ttl_alert_task():
    """Hold the first TTL check until the bot is ready."""
    bot = get_bot()
    await bot.wait_until_ready()
# ============================================================================
# LEADERBOARD ALARM TASK
# ============================================================================
@tasks.loop(seconds=60)
async def ldb_alarm_task():
    """Trigger the leaderboard alarm at 22:35 and 07:35 UTC.

    Ticks every 60 seconds and fires when the current UTC (hour, minute)
    matches an alarm time. Errors are recorded for the health dashboard and
    logged with a traceback instead of being re-raised. discord.ext.tasks
    stops a loop for good when its body raises anything outside its small set
    of retried network/OS errors, so re-raising would silently disable every
    future alarm until the bot restarts.
    """
    now_utc = datetime.now(timezone.utc).time()
    if (now_utc.hour, now_utc.minute) not in [(22, 35), (7, 35)]:
        return
    try:
        await execute_ldb_alarm_task()
        await _record("ldb_alarm", True)
    except Exception as e:
        await _record("ldb_alarm", False, str(e))
        logging.exception("[LDB-ALARM] Leaderboard alarm failed")


@ldb_alarm_task.before_loop
async def before_ldb_alarm_task():
    await get_bot().wait_until_ready()
# ============================================================================
# SQUADRON STATS TRACKER TASKS
# ============================================================================
@tasks.loop(hours=1)
async def squadron_stats_tracker_task():
    """Track squadron points every hour while inside an SQB stats window.

    SQB_STATS_TRACKER_WINDOWS is iterated as (label, start, end) triples and
    compared against the current UTC time of day (presumably naive
    ``datetime.time`` values — confirm in utils). Errors are recorded and
    logged instead of re-raised. An exception escaping a discord.ext.tasks
    loop body (other than its retried network/OS errors) ends the loop
    permanently.
    """
    now_utc = datetime.now(timezone.utc).time()
    if not any(start <= now_utc <= end for _, start, end in SQB_STATS_TRACKER_WINDOWS):
        return
    try:
        await execute_squadron_stats_tracker()
        await _record("squadron_stats_tracker", True)
    except Exception as e:
        await _record("squadron_stats_tracker", False, str(e))
        logging.exception("[SQB-STATS] Hourly squadron stats tracking failed")


@squadron_stats_tracker_task.before_loop
async def before_squadron_stats_tracker_task():
    await get_bot().wait_until_ready()
    # Make sure the points table exists before the first tracking run.
    await init_squadrons_points_table()


@tasks.loop(seconds=60)
async def squadron_stats_boundary_task():
    """Track squadron points at timeslot boundaries (see SQB_BOUNDARY_TIMES in utils).

    Fires when the current UTC time, truncated to the minute, is one of
    SQB_BOUNDARY_TIMES. Errors are recorded and logged rather than re-raised,
    so a single failure does not end the loop.
    """
    now_utc = datetime.now(timezone.utc)
    if now_utc.time().replace(second=0, microsecond=0) not in SQB_BOUNDARY_TIMES:
        return
    try:
        await execute_squadron_stats_tracker()
        await _record("squadron_stats_boundary", True)
    except Exception as e:
        await _record("squadron_stats_boundary", False, str(e))
        logging.exception("[SQB-STATS] Boundary squadron stats tracking failed")


@squadron_stats_boundary_task.before_loop
async def before_squadron_stats_boundary_task():
    await get_bot().wait_until_ready()
# ============================================================================
# ENTITLEMENT CACHE REFRESH (hourly fallback)
# ============================================================================
@tasks.loop(hours=1)
async def entitlement_cache_task():
    """Refresh the entitlement cache hourly so new subscriptions are picked up
    even when no autolog batches or points alarms are running.

    Errors are recorded and logged rather than re-raised. Re-raising would end
    the discord.ext.tasks loop on any non-network error and stop all future
    refreshes.
    """
    try:
        await refresh_entitled_guilds(force=True)
        await _record("entitlement_cache", True)
    except Exception as e:
        await _record("entitlement_cache", False, str(e))
        logging.exception("[ENTITLEMENT] Entitlement cache refresh failed")


@entitlement_cache_task.before_loop
async def before_entitlement_cache():
    await get_bot().wait_until_ready()
# ============================================================================
# POINTS ALARM TASK
# ============================================================================
# UTC (hour, minute) -> region whose points alarm fires at that time.
_POINTS_ALARM_SCHEDULE = {(22, 25): "EU", (7, 25): "NA"}


@tasks.loop(seconds=60)
async def points_alarm_task():
    """Checks time and triggers points alarm at 22:25 (EU) and 7:25 (NA) UTC.

    Errors are recorded and logged instead of re-raised. An exception escaping
    a discord.ext.tasks loop body (other than its retried network/OS errors)
    stops the loop permanently.
    """
    now_utc = datetime.now(timezone.utc).time()
    region = _POINTS_ALARM_SCHEDULE.get((now_utc.hour, now_utc.minute))
    if region is None:
        return
    try:
        await execute_points_alarm_task(region)
        await _record("points_alarm", True)
    except Exception as e:
        await _record("points_alarm", False, str(e))
        logging.exception("[POINTS-ALARM] Points alarm failed for region %s", region)


@points_alarm_task.before_loop
async def before_points_alarm_task():
    await get_bot().wait_until_ready()
# ============================================================================
# REPLAY CLEANUP TASK
# ============================================================================
@tasks.loop(hours=4)
async def replay_cleanup_task():
    """Cleans up replay directories every 4 hours.

    Errors are recorded and logged rather than re-raised, so one failed
    cleanup does not end the discord.ext.tasks loop for good.
    """
    try:
        await cleanup_replays()
        await _record("replay_cleanup", True)
    except Exception as e:
        await _record("replay_cleanup", False, str(e))
        logging.exception("[REPLAY-CLEANUP] Replay cleanup failed")


@replay_cleanup_task.before_loop
async def before_replay_cleanup_task():
    await get_bot().wait_until_ready()
# ============================================================================
# LEAVE ALARM TASK
# ============================================================================
@tasks.loop(minutes=30)
async def leave_alarm_task():
    """Runs leave alarm every 30 minutes.

    Errors are recorded and logged rather than re-raised, so one failure does
    not end the discord.ext.tasks loop permanently.
    """
    try:
        await execute_leave_alarm_task()
        await _record("leave_alarm", True)
    except Exception as e:
        await _record("leave_alarm", False, str(e))
        logging.exception("[LEAVE-ALARM] Leave alarm failed")


@leave_alarm_task.before_loop
async def before_leave_alarm_task():
    await get_bot().wait_until_ready()
# ============================================================================
# UPDATE SQUADRONS DB TASK
# ============================================================================
# The loop's first (immediate) iteration is skipped: _startup_heavy_init()
# runs the same refresh just before starting this loop.
_startup_db_done = False


@tasks.loop(hours=1)
async def update_squadrons_db_task():
    """
    Every 1 hour:
    1. Refresh the main squadrons.db via Game API (count=1000).
    2. Sync squadron member UIDs for the top 1000 squadrons (bulk API).
    3. Run execute_cleanup_stale_squadrons().

    A failing step skips the remaining steps for this run. Errors are recorded
    and logged rather than re-raised, because an exception escaping a
    discord.ext.tasks loop body (other than its retried network/OS errors)
    stops the loop permanently.
    """
    global _startup_db_done
    if not _startup_db_done:
        _startup_db_done = True
        return  # Skip — already ran in _startup_heavy_init
    try:
        await execute_update_squadrons_db_task(count=1000)
        await execute_sync_squadron_members_bulk(count=1000)
        await execute_cleanup_stale_squadrons()
        await _record("update_squadrons_db", True)
    except Exception as e:
        await _record("update_squadrons_db", False, str(e))
        logging.exception("[SQUADRONS-DB] Hourly squadrons DB update failed")


@update_squadrons_db_task.before_loop
async def before_update_squadrons_db_task():
    await get_bot().wait_until_ready()
# ============================================================================
# UPDATE META DATA TASK
# ============================================================================
@tasks.loop(hours=12)
async def update_meta_data_task():
    """Every 12 hours, refresh all player vehicle data in Meta.db.

    Delegates to process_all_players() with skip_existing=True (per the
    previous notes, players updated within the last 2 days are skipped;
    that logic lives in process_all_players). The outcome is recorded under
    "update_meta_data"; errors are logged rather than re-raised.
    """
    run_options = dict(
        limit=None,
        skip_existing=True,
        batch_size=50,
        max_concurrent=5,
    )
    try:
        await process_all_players(**run_options)
        await _record("update_meta_data", True)
    except Exception as exc:
        await _record("update_meta_data", False, str(exc))
        logging.error(f"[META-DB] Error during meta data update: {exc}")


@update_meta_data_task.before_loop
async def before_update_meta_data_task():
    """Wait for the bot to be ready before the first meta refresh."""
    bot = get_bot()
    await bot.wait_until_ready()
# ============================================================================
# WEBSOCKET AUTOLOG TASK
# ============================================================================
@tasks.loop(count=1)
async def ws_autolog_task():
    """
    Single-run task that maintains persistent WebSocket connection.
    Replaces the polling-based auto_logging_task.

    Runs lux_apis.ws_replay_listener(handle_ws_replays) once. If it raises,
    after_ws_autolog() schedules a restart.
    """
    await lux_apis.ws_replay_listener(handle_ws_replays)


@ws_autolog_task.before_loop
async def before_ws_autolog():
    await get_bot().wait_until_ready()


@ws_autolog_task.after_loop
async def after_ws_autolog():
    """Restart the listener 10 s after it dies with an exception.

    ``after_loop`` runs inside the loop's own, still-unfinished asyncio task,
    and ``Loop.start()`` refuses to launch while that task is not done. A
    direct ``start()`` here would therefore raise RuntimeError ("Task is
    already launched and is not completed.") and the restart would never
    happen. The restart is instead deferred to a done-callback on the
    current task.
    """
    if not ws_autolog_task.failed():
        return
    logging.error("[WS] ws_autolog_task died, restarting in 10s...")
    await asyncio.sleep(10)

    def _restart(finished: asyncio.Task) -> None:
        if finished.cancelled():
            return  # Cancelled meanwhile: do not resurrect the loop.
        finished.exception()  # Mark the failure as retrieved.
        ws_autolog_task.start()

    current = ws_autolog_task.get_task()
    if current is None or current.done():
        ws_autolog_task.start()
    else:
        current.add_done_callback(_restart)
# ============================================================================
# SQUADRON POINTS CONTINUOUS UPDATER
# ============================================================================
@tasks.loop(count=1)
async def squadron_points_loop_task():
    """
    Continuously update squadron member points forever.
    Very slow (3s per clan), runs indefinitely in the background.

    If the call raises, after_squadron_points_loop() schedules a restart.
    """
    await execute_sync_squadron_members_points_loop(count=1000, delay=3.0)


@squadron_points_loop_task.before_loop
async def before_squadron_points_loop():
    await get_bot().wait_until_ready()


@squadron_points_loop_task.after_loop
async def after_squadron_points_loop():
    """Restart the points updater 10 s after it dies with an exception.

    ``after_loop`` executes inside the loop's own, not-yet-finished task, so
    ``Loop.start()`` would raise RuntimeError if called here directly. The
    restart is deferred to a done-callback on the current task instead.
    """
    if not squadron_points_loop_task.failed():
        return
    logging.error("[SQ-POINTS] squadron_points_loop_task died, restarting in 10s...")
    await asyncio.sleep(10)

    def _restart(finished: asyncio.Task) -> None:
        if finished.cancelled():
            return  # Cancelled meanwhile: do not resurrect the loop.
        finished.exception()  # Mark the failure as retrieved.
        squadron_points_loop_task.start()

    current = squadron_points_loop_task.get_task()
    if current is None or current.done():
        squadron_points_loop_task.start()
    else:
        current.add_done_callback(_restart)
# ============================================================================
# STACKS CLEANUP TASK
# ============================================================================
# Fires at 07:30 and 22:30 UTC — 20 minutes after each SQB timeslot ends.
_STACKS_CLEANUP_TIMES = [(7, 30), (22, 30)]


@tasks.loop(seconds=60)
async def stacks_cleanup_task():
    """Wipe all active stacks at the end of each SQB timeslot.

    At each cleanup minute, STACKS_DIR is deleted (if present) and recreated
    empty. The outcome is recorded under "stacks_cleanup"; errors are logged
    rather than re-raised.
    """
    current = datetime.now(timezone.utc)
    if (current.hour, current.minute) not in _STACKS_CLEANUP_TIMES:
        return
    try:
        if STACKS_DIR.exists():
            shutil.rmtree(STACKS_DIR)
        STACKS_DIR.mkdir(parents=True, exist_ok=True)
        logging.info("[STACKS] Wiped STACKS_DIR at timeslot end.")
        await _record("stacks_cleanup", True)
    except Exception as err:
        await _record("stacks_cleanup", False, str(err))
        logging.error(f"[STACKS] Failed to wipe STACKS_DIR: {err}")


@stacks_cleanup_task.before_loop
async def before_stacks_cleanup_task():
    """Wait until the bot is ready before checking the clock."""
    bot = get_bot()
    await bot.wait_until_ready()
# ============================================================================
# HEALTH HEARTBEAT TASK
# ============================================================================
@tasks.loop(seconds=30)
async def health_heartbeat_task():
    """Write bot health snapshot every 30 seconds.

    Failures are logged instead of propagated. An exception escaping a
    discord.ext.tasks loop body (other than its retried network/OS errors)
    ends the loop for good. A single transient failure would then stop all
    future heartbeats while the bot itself keeps running, making it look dead.
    """
    try:
        await write_heartbeat()
    except Exception:
        logging.exception("[HEALTH] Failed to write heartbeat")


@health_heartbeat_task.before_loop
async def before_health_heartbeat_task():
    await get_bot().wait_until_ready()
# ============================================================================
# GUILD META MEMBER SYNC TASK
# ============================================================================
@tasks.loop(hours=24)
async def sync_guild_metas_task():
    """Daily reconciliation of Guild_Metas with each configured guild's roster.

    Delegates to sync_all_guild_metas(). Per its earlier description, that
    function fetches the current squadron roster from the Game API, adds new
    members and removes players who left. The outcome is recorded under
    "sync_guild_metas"; errors are logged rather than re-raised.
    """
    try:
        await sync_all_guild_metas()
        await _record("sync_guild_metas", True)
    except Exception as exc:
        await _record("sync_guild_metas", False, str(exc))
        logging.error(f"[META-SYNC] Error during daily guild meta sync: {exc}")


@sync_guild_metas_task.before_loop
async def before_sync_guild_metas_task():
    """Delay the first roster sync until the bot is ready."""
    bot = get_bot()
    await bot.wait_until_ready()
# ============================================================================
# WEEKLY BR REPORT TASK
# ============================================================================
# Fires ~10 min after each BR window ends (10 * 60 = 600s).
_WEEKLY_BR_FIRE_OFFSET_S = 600
_WEEKLY_BR_FIRE_TOLERANCE_S = 60 # tolerance window for the 60s loop tick
_SCHEDULE_PATH = Path(__file__).parent / "SCHEDULE.json"
def _just_ended_br_window(now_ts: int) -> Optional[Dict[str, Any]]:
"""Return the SCHEDULE.json entry whose end is `_WEEKLY_BR_FIRE_OFFSET_S`
seconds in the past (within `_WEEKLY_BR_FIRE_TOLERANCE_S`), or None.
"""
try:
with open(_SCHEDULE_PATH, "r", encoding="utf-8") as fp:
schedule = json.load(fp)
except Exception as e:
logging.error("[WBR] Failed to read SCHEDULE.json: %s", e)
return None
target = now_ts - _WEEKLY_BR_FIRE_OFFSET_S
for entry in schedule:
try:
end_ts = int(entry["end"])
except (KeyError, TypeError, ValueError):
continue
if target - _WEEKLY_BR_FIRE_TOLERANCE_S <= end_ts <= target:
return entry
return None
# End timestamp of the BR window whose report was last produced. It guards
# against a second report for the same window when two loop ticks both land
# in the firing window (e.g. one tick delayed by event-loop latency).
_weekly_br_last_reported_end: Optional[int] = None


@tasks.loop(seconds=60)
async def weekly_br_report_task():
    """Fire the Weekly BR Report ~10 min after each BR window ends.

    Each window is reported at most once per process. Errors are recorded and
    logged rather than re-raised, because an exception escaping a
    discord.ext.tasks loop body (other than its retried network/OS errors)
    stops the loop permanently. A failed window is not marked as reported, so
    a later tick still inside the firing window may retry it.
    """
    global _weekly_br_last_reported_end
    now_ts = int(datetime.now(timezone.utc).timestamp())
    window = _just_ended_br_window(now_ts)
    if window is None:
        return
    # _just_ended_br_window only returns entries whose "end" parses as int.
    window_end = int(window["end"])
    if window_end == _weekly_br_last_reported_end:
        return
    try:
        await execute_weekly_br_report_task(window)
        _weekly_br_last_reported_end = window_end
        await _record("weekly_br_report", True)
    except Exception as e:
        await _record("weekly_br_report", False, str(e))
        logging.exception("[WBR] Weekly BR report failed for window ending %s", window_end)


@weekly_br_report_task.before_loop
async def before_weekly_br_report_task():
    await get_bot().wait_until_ready()
# ============================================================================
# TASK STARTER FUNCTION
# ============================================================================
async def _startup_heavy_init():
    """Do the one-off heavy DB/API refresh, then launch the dependent loops.

    Each startup step runs independently: a failure is logged and the next
    step still runs. After a 5-second pause, the recurring and continuous
    background loops are started in a fixed order.
    """
    startup_steps = (
        (lambda: execute_update_squadrons_db_task(count=1000),
         "[STARTUP] squadrons.db update failed"),
        (lambda: execute_sync_squadron_members_bulk(count=1000),
         "[STARTUP] squadron members sync failed"),
        (lambda: execute_cleanup_stale_squadrons(),
         "[STARTUP] stale squadron cleanup failed"),
    )
    for run_step, failure_prefix in startup_steps:
        try:
            await run_step()
        except Exception as err:
            logging.error(f"{failure_prefix}: {err}")

    await asyncio.sleep(5)

    for loop in (
        # Recurring tasks (started after heavy init completes)
        update_squadrons_db_task,
        squadron_stats_tracker_task,
        leave_alarm_task,
        # Continuous background tasks
        squadron_points_loop_task,
        update_meta_data_task,
        sync_guild_metas_task,
    ):
        loop.start()
# Strong references to fire-and-forget tasks spawned by start_all_tasks().
# The event loop keeps only weak references to tasks, so an unreferenced
# task can be garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Drop the finished task's strong reference and log any exception it raised."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logging.error("[STARTUP] Background task %s failed", task.get_name(), exc_info=exc)


async def start_all_tasks():
    """
    Start all background tasks. Heavy DB init runs in a background task
    so on_ready returns quickly.
    Order:
    1. Lightweight time-check tasks (just check clock, no heavy work)
    2. WebSocket listeners (persistent connections)
    3. Heavy DB/API tasks (background — does not block on_ready)

    The heavy-init task is kept in ``_background_tasks`` until it finishes,
    so it cannot be garbage-collected mid-run, and any exception it raises
    is logged.
    """
    # Phase 1: Lightweight time-check tasks
    entitlement_cache_task.start()
    ldb_alarm_task.start()
    squadron_stats_boundary_task.start()
    points_alarm_task.start()
    stacks_cleanup_task.start()
    replay_cleanup_task.start()
    health_heartbeat_task.start()
    weekly_br_report_task.start()
    ttl_alert_task.start()
    # Phase 2: WebSocket listeners
    ws_autolog_task.start()
    # Phase 3: Heavy DB tasks (background — doesn't block on_ready)
    heavy_init = asyncio.create_task(_startup_heavy_init(), name="startup_heavy_init")
    _background_tasks.add(heavy_init)
    heavy_init.add_done_callback(_on_background_task_done)
def stop_all_tasks():
    """Cancel every background loop owned by this module. Call on shutdown."""
    for loop in (
        entitlement_cache_task,
        ldb_alarm_task,
        squadron_stats_tracker_task,
        squadron_stats_boundary_task,
        points_alarm_task,
        stacks_cleanup_task,
        replay_cleanup_task,
        leave_alarm_task,
        update_squadrons_db_task,
        update_meta_data_task,
        ws_autolog_task,
        squadron_points_loop_task,
        sync_guild_metas_task,
        health_heartbeat_task,
        weekly_br_report_task,
        ttl_alert_task,
    ):
        loop.cancel()