"""Read-only FastAPI routes under ``/api/tss`` backed by the battles database."""
from __future__ import annotations

import json

from fastapi import APIRouter, HTTPException

from web.api import db

router = APIRouter()

# Upper bound applied to every caller-supplied ``limit`` query parameter.
_MAX_LIMIT = 200


def _clamp_limit(limit: int) -> int:
    """Clamp *limit* into the inclusive range ``1.._MAX_LIMIT``."""
    return max(1, min(limit, _MAX_LIMIT))


def _parse_json_column(value):
    """Decode a JSON text column; return ``None`` when empty or undecodable."""
    if not value:
        return None
    try:
        return json.loads(value)
    except (json.JSONDecodeError, TypeError):
        # Best effort: a corrupt or non-text log column is reported as
        # missing rather than failing the whole scoreboard response.
        return None


@router.get("/api/tss/live")
async def live(limit: int = 50) -> dict:
    """Return the latest ``match_summary`` rows, newest ``endtime_unix`` first.

    ``limit`` is clamped to ``1..200``. ``total`` is the number of rows
    returned, not the size of the table.
    """
    rows = await db.query(
        db.battles_path(),
        "SELECT * FROM match_summary ORDER BY endtime_unix DESC LIMIT ?",
        (_clamp_limit(limit),),
    )
    return {"total": len(rows), "matches": rows}


def _roster_team(rows: list[dict], slot: str, winning: str, losing: str, draw: int) -> dict:
    """Build the roster entry for the team occupying *slot*.

    Args:
        rows: Player rows of one match; each must carry a ``UID`` key.
        slot: Team slot to select players for.
        winning: Slot recorded as the winner.
        losing: Slot recorded as the loser.
        draw: Truthy when the match was a draw; suppresses both flags.

    Returns:
        A dict with the team's slot, name and id (taken from its first
        player, ``None`` when the slot has no players), winner/loser flags
        and the player rows with ``UID`` converted to ``str``.
    """
    # Copy each row instead of rewriting it in place so the caller's ``rows``
    # are left untouched; re-assigning an existing key keeps its position.
    players = [
        {**row, "UID": str(row["UID"])}
        for row in rows
        if (row.get("team_slot") or "") == slot
    ]
    first = players[0] if players else None
    decided = not draw
    return {
        "team_slot": slot,
        "team_name": first["team_name"] if first else None,
        "team_id": first["team_id"] if first else None,
        "is_winner": decided and slot == winning,
        "is_loser": decided and slot == losing,
        "players": players,
    }


@router.get("/api/tss/match/{session_id}")
async def match(session_id: str) -> dict:
    """Return one match summary together with its per-team rosters.

    Raises:
        HTTPException: 404 when no ``match_summary`` row has *session_id*.
    """
    summary = await db.query_one(
        db.battles_path(),
        "SELECT * FROM match_summary WHERE session_id = ?",
        (session_id,),
    )
    if not summary:
        raise HTTPException(status_code=404, detail=f"match {session_id} not found")

    rows = await db.query(
        db.battles_path(),
        "SELECT * FROM player_games_hist WHERE session_id = ?",
        (session_id,),
    )
    # Distinct non-empty slots, sorted so the team order is deterministic.
    slots = sorted({r["team_slot"] for r in rows if r.get("team_slot")})
    teams = [
        _roster_team(rows, slot, summary["winning_slot"], summary["losing_slot"], summary["draw"])
        for slot in slots
    ]
    return {"match": summary, "teams": teams}


@router.get("/api/tss/match/{session_id}/scoreboard")
async def scoreboard(session_id: str) -> dict:
    """Return the ``match`` payload extended with the match's decoded logs.

    ``logs["available"]`` is ``False`` when the match has no ``match_logs``
    row; otherwise ``chat``, ``battle`` and ``events`` hold the decoded JSON
    columns (``None`` for any that are empty or invalid).

    Raises:
        HTTPException: 404 when the match does not exist (raised by ``match``).
    """
    base = await match(session_id)  # reuses 404 + roster logic
    logs_row = await db.query_one(
        db.battles_path(),
        "SELECT chat_log_json, battle_log_json, event_log_json FROM match_logs WHERE session_id = ?",
        (session_id,),
    )
    if logs_row:
        logs = {
            "available": True,
            "chat": _parse_json_column(logs_row["chat_log_json"]),
            "battle": _parse_json_column(logs_row["battle_log_json"]),
            "events": _parse_json_column(logs_row["event_log_json"]),
        }
    else:
        logs = {"available": False}
    return {**base, "logs": logs}


@router.get("/api/tss/matches/search")
async def matches_search(
    player: str | None = None,
    team: str | None = None,
    mission: str | None = None,
    tournament: str | None = None,
    time_from: int | None = None,
    time_to: int | None = None,
    limit: int = 50,
) -> dict:
    """Search match summaries; every supplied filter is AND-ed together.

    Args:
        player: Exact match on ``player_games_hist.UID``.
        team: Exact match on ``player_games_hist.team_name``.
        mission: Substring match (SQL ``LIKE``) on ``mission_name``.
        tournament: Substring match (SQL ``LIKE``) on ``tournament_name``.
        time_from: Inclusive lower bound on ``endtime_unix``.
        time_to: Inclusive upper bound on ``endtime_unix``.
        limit: Maximum rows returned, clamped to ``1..200``.

    Returns:
        ``{"total": <rows returned>, "matches": [...]}``, newest first.
    """
    clauses: list[str] = []
    params: list = []
    join = ""

    # The roster table is joined only when a player/team filter needs it;
    # SELECT DISTINCT below collapses the per-player duplicates it creates.
    if player or team:
        join = "JOIN player_games_hist p ON p.session_id = m.session_id"
    if player:
        clauses.append("p.UID = ?")
        params.append(player)
    if team:
        clauses.append("p.team_name = ?")
        params.append(team)
    if mission:
        clauses.append("m.mission_name LIKE ?")
        params.append(f"%{mission}%")
    if tournament:
        clauses.append("m.tournament_name LIKE ?")
        params.append(f"%{tournament}%")
    # Time bounds use ``is not None`` so that 0 remains a usable bound.
    if time_from is not None:
        clauses.append("m.endtime_unix >= ?")
        params.append(time_from)
    if time_to is not None:
        clauses.append("m.endtime_unix <= ?")
        params.append(time_to)

    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    params.append(_clamp_limit(limit))

    # Only fixed SQL fragments are interpolated; all user values are bound.
    rows = await db.query(
        db.battles_path(),
        f"SELECT DISTINCT m.* FROM match_summary m {join} {where} "
        f"ORDER BY m.endtime_unix DESC LIMIT ?",
        tuple(params),
    )
    return {"total": len(rows), "matches": rows}


@router.get("/api/tss/maps")
async def maps() -> dict:
    """Return the distinct ``(mission_name, level_path)`` pairs, by mission name."""
    rows = await db.query(
        db.battles_path(),
        "SELECT DISTINCT mission_name, level_path FROM match_summary "
        "WHERE mission_name IS NOT NULL ORDER BY mission_name",
    )
    return {"total": len(rows), "maps": rows}