214 lines
7.5 KiB
Python
214 lines
7.5 KiB
Python
"""Chat + battle log builder and persistence for TSS matches.

Ports SREBOT/BOT/utils.py log formatting so both sites render identically.
Chat and battle logs are stored in match_logs (tss_battles.db) as JSON string
arrays; the event log is stored alongside them as a JSON object.
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
import aiosqlite
|
|
|
|
# data_parser is optional. When importing it fails for any reason, the module
# still loads: LangTableReader becomes None (so no translation table is built)
# and vehicle names pass through the stand-in filter below unchanged.
try:
    from data_parser import LangTableReader, apply_vehicle_name_filters  # type: ignore
except Exception:  # pragma: no cover
    LangTableReader = None  # type: ignore

    def apply_vehicle_name_filters(name, strip_decorations=True):  # type: ignore
        """Stand-in used without data_parser: hand ``name`` back untouched.

        ``strip_decorations`` is accepted only for signature compatibility
        with the real helper and is ignored.
        """
        return name
|
|
|
|
# Module-level logger used for decode failures in this file.
log = logging.getLogger("tssbot.match_logs")

# DDL for the per-session log table. One row per session_id; the three *_json
# columns hold JSON text produced by json.dumps and built_unix holds an integer
# Unix timestamp. IF NOT EXISTS makes the statement safe to run on every write.
MATCH_LOGS_SQL = """
CREATE TABLE IF NOT EXISTS match_logs (
    session_id TEXT PRIMARY KEY,
    chat_log_json TEXT,
    battle_log_json TEXT,
    event_log_json TEXT,
    built_unix INTEGER
)
"""
|
|
|
|
def _fmt_time(ms: int) -> str:
|
|
total_s = int(ms) // 1000
|
|
return f"{total_s // 60:02d}:{total_s % 60:02d}"
|
|
|
|
|
|
def _decompress_events(raw_events: Any) -> dict:
|
|
if isinstance(raw_events, str):
|
|
try:
|
|
import zstandard
|
|
compressed = base64.b85decode(raw_events)
|
|
return json.loads(zstandard.decompress(compressed).decode("utf-8"))
|
|
except Exception as exc: # pragma: no cover - corrupt payloads
|
|
log.error("Failed to decompress events: %s", exc)
|
|
return {}
|
|
return raw_events or {}
|
|
|
|
|
|
def build_event_log(game: dict[str, Any]) -> dict[str, Any]:
    """Collect the raw kill, damage and chat records of one game.

    Kills and damage are read from the (possibly compressed) ``events`` entry
    of ``game``; chat is read from ``game["chat"]``. Missing or falsy entries
    become empty lists, and each returned list is a shallow copy. If the
    decoded events are not a dict, kills and damage are empty.
    """
    decoded = _decompress_events(game.get("events", {}))
    source = decoded if isinstance(decoded, dict) else {}
    return {
        "kills": list(source.get("kills") or []),
        "damage": list(source.get("damage") or []),
        "chat": list(game.get("chat") or []),
    }
|
|
|
|
|
|
def _strip_tag(tag: str) -> str:
|
|
s = (tag or "").strip()
|
|
if len(s) >= 3 and not s[0].isalnum() and not s[-1].isalnum():
|
|
return s[1:-1]
|
|
return s
|
|
|
|
|
|
def build_match_logs(game: dict[str, Any]) -> tuple[list[str], list[str]]:
    """Return (chat_log, battle_log) as pre-formatted string lists.

    Reads ``players``, ``tss``, ``winner``, ``loser``, ``chat`` and ``events``
    from ``game``; every one of them may be missing.

    Each line starts with a one-character marker: ``+`` when the acting
    player's team slot equals the winner slot, ``-`` when it equals the loser
    slot, and a space otherwise. A player with no known slot always gets the
    space, even when the game has no recorded winner or loser.

    Chat lines look like::

        +[MM:SS] [TYPE] [Team] `Player`: message

    Battle lines merge kill and damage events, ordered by timestamp (kills
    come before damage at equal times), and look like::

        +[MM:SS] [Team]         Player (Vehicle) destroyed Victim (Vehicle)

    A kill with no offender, or one flagged ``crashed``, is reported as the
    victim crashing. Damage events with no offender are skipped. Missing or
    ``None`` timestamps are treated as 0.
    """
    players = game.get("players", {}) or {}
    tss = game.get("tss") or {}
    winner_slot = str(game.get("winner") or "")
    loser_slot = str(game.get("loser") or "")

    def _team_name(slot: str) -> str:
        """Name from the ``tss`` entry for ``slot``, else ``Team <slot>``."""
        raw = tss.get(slot)
        if isinstance(raw, dict) and raw.get("name"):
            return str(raw["name"])
        return f"Team {slot}" if slot else ""

    # uid (as str) -> display name, team slot and team display name.
    uid_lookup: dict[str, dict[str, str]] = {}
    for uid_str, p in players.items():
        slot = str(p.get("team") or "")
        uid_lookup[str(uid_str)] = {
            "name": p.get("name", "") or "",
            "team_slot": slot,
            "team_name": _team_name(slot),
        }

    # Translation is best effort: without a usable table, unit ids are shown raw.
    translate = None
    if LangTableReader is not None:
        try:
            translate = LangTableReader("English")
        except Exception:  # pragma: no cover
            translate = None

    def _veh(cdk: str | None) -> str:
        """Vehicle label: translated and filtered name, else the raw id."""
        if not cdk:
            return "Unknown"
        if translate is not None:
            t = translate.get_translate(cdk)
            if t:
                return apply_vehicle_name_filters(t)
        return cdk

    def _player(uid: str | None) -> tuple[str, str, str]:
        """Return (name, team_name, team_slot) for ``uid``."""
        if uid is None:
            return "Unknown", "", ""
        info = uid_lookup.get(str(uid))
        if info:
            return info["name"], info["team_name"], info["team_slot"]
        return f"Player#{uid}", "", ""

    def _prefix(slot: str) -> str:
        """Outcome marker for a team slot."""
        # An empty slot means "team unknown". Without this guard it would
        # compare equal to an empty winner/loser slot and be marked "+"/"-".
        if not slot:
            return " "
        if slot == winner_slot:
            return "+"
        if slot == loser_slot:
            return "-"
        return " "

    def _uid(raw: Any) -> str | None:
        """Normalise an event uid to str, keeping None as None."""
        return str(raw) if raw is not None else None

    def _line(slot: str, ts: str, team: str, text: str) -> str:
        """Assemble one battle-log line with the team column padded to 14."""
        team_col = f"[{team}]"
        return f"{_prefix(slot)}[{ts}] {team_col:<14} {text}"

    unknown_chatter = {"name": "Unknown", "team_name": "??", "team_slot": ""}
    chat_log: list[str] = []
    for c in game.get("chat", []) or []:
        info = uid_lookup.get(str(c.get("uid", "")), unknown_chatter)
        chat_log.append(
            f"{_prefix(info['team_slot'])}[{_fmt_time(c.get('time') or 0)}] "
            f"[{c.get('type', 'ALL')}] "
            f"[{info['team_name']}] `{info['name']}`: {c.get('message', '')}"
        )

    events = _decompress_events(game.get("events", {}))
    if not isinstance(events, dict):
        # Same guard as build_event_log: an unusable payload means no events.
        events = {}

    merged: list[dict[str, Any]] = []
    for k in events.get("kills", []) or []:
        merged.append({
            "kind": "kill",
            "time": k.get("time") or 0,
            "off_uid": _uid(k.get("offender_uid")),
            "off_unit": k.get("offender_unit"),
            "vic_uid": _uid(k.get("offended_uid")),
            "vic_unit": k.get("offended_unit"),
            "crashed": k.get("crashed", False),
            "afire": False,
        })
    for d in events.get("damage", []) or []:
        merged.append({
            "kind": "damage",
            "time": d.get("time") or 0,
            "off_uid": _uid(d.get("offender_uid")),
            "off_unit": d.get("offender_unit"),
            "vic_uid": _uid(d.get("offended_uid")),
            "vic_unit": d.get("offended_unit"),
            "crashed": False,
            "afire": d.get("afire", False),
        })
    # list.sort is stable, so kills stay ahead of damage at equal timestamps.
    merged.sort(key=lambda e: e["time"])

    battle_log: list[str] = []
    for ev in merged:
        ts = _fmt_time(ev["time"])
        vic_name, vic_team, vic_slot = _player(ev["vic_uid"])
        vic_veh = _veh(ev["vic_unit"])
        target = f"{vic_name} ({vic_veh})"
        is_kill = ev["kind"] == "kill"

        if is_kill and (ev["off_uid"] is None or ev["crashed"]):
            # Self-inflicted loss: attribute the line to the victim's team.
            battle_log.append(_line(vic_slot, ts, vic_team, f"{target} crashed"))
            continue
        if ev["off_uid"] is None:
            # Damage with no offender is not reported.
            continue

        name, team, slot = _player(ev["off_uid"])
        actor = f"{name} ({_veh(ev['off_unit'])})"
        if is_kill:
            text = f"{actor} destroyed {target}"
        else:
            afire = "(FIRE) " if ev["afire"] else ""
            text = f"{actor} damaged {afire}{target}"
        battle_log.append(_line(slot, ts, team, text))

    return chat_log, battle_log
|
|
|
|
|
|
async def upsert_match_logs(
    db_path,
    session_id: str,
    chat_log: list[str],
    battle_log: list[str],
    event_log: dict[str, Any] | None = None,
) -> None:
    """Insert or overwrite the stored logs for one match session.

    Opens ``db_path`` with aiosqlite, creates ``match_logs`` if needed, makes
    sure the ``event_log_json`` column exists, then upserts one row keyed by
    ``session_id`` and commits.

    Args:
        db_path: Database location passed straight to ``aiosqlite.connect``.
        session_id: Row key; coerced with ``str()``.
        chat_log: Serialised with ``json.dumps(..., ensure_ascii=False)``.
        battle_log: Serialised the same way.
        event_log: Serialised the same way; a falsy value is stored as
            ``{"kills": [], "damage": []}``.

    Raises:
        sqlite3.Error: Database failures propagate. The only error tolerated
            is "duplicate column name" from the column migration.
    """
    import sqlite3
    import time

    async with aiosqlite.connect(db_path) as conn:
        await conn.execute(MATCH_LOGS_SQL)

        # CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so a
        # table that lacks event_log_json (presumably from an older schema)
        # must be migrated. Check the schema first instead of issuing ALTER
        # on every call and swallowing every error.
        async with conn.execute("PRAGMA table_info(match_logs)") as cursor:
            columns = {row[1] for row in await cursor.fetchall()}
        if "event_log_json" not in columns:
            try:
                await conn.execute(
                    "ALTER TABLE match_logs ADD COLUMN event_log_json TEXT"
                )
            except sqlite3.OperationalError as exc:
                # Another writer may have added the column between the check
                # and the ALTER; anything else is a real failure.
                if "duplicate column name" not in str(exc).lower():
                    raise

        await conn.execute(
            """INSERT INTO match_logs (session_id, chat_log_json, battle_log_json, event_log_json, built_unix)
               VALUES (?, ?, ?, ?, ?)
               ON CONFLICT(session_id) DO UPDATE SET
                   chat_log_json=excluded.chat_log_json,
                   battle_log_json=excluded.battle_log_json,
                   event_log_json=excluded.event_log_json,
                   built_unix=excluded.built_unix""",
            (
                str(session_id),
                json.dumps(chat_log, ensure_ascii=False),
                json.dumps(battle_log, ensure_ascii=False),
                json.dumps(event_log or {"kills": [], "damage": []}, ensure_ascii=False),
                int(time.time()),
            ),
        )
        await conn.commit()
|