#!/usr/bin/env python3
"""
Move legacy repo-root replays into STORAGE/REPLAYS.

Legacy directories were named replays/0<hex>.
The new canonical layout is STORAGE/REPLAYS/<hex>.
"""

from __future__ import annotations

import argparse
import os
import re
import shutil
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SOURCE = REPO_ROOT / "replays"

# The storage root comes from the environment, but it is only *required* when
# neither --dest nor --storage-dir is given. Deferring the check to main()
# keeps this module importable and lets explicit CLI paths work on their own.
_storage_env = os.environ.get("STORAGE_VOL_PATH", "").strip()
DEFAULT_STORAGE: Path | None = Path(_storage_env) if _storage_env else None

# "0" followed by one or more hex digits: the legacy directory naming.
LEGACY_REPLAY_DIR = re.compile(r"^0([0-9a-fA-F]+)$")
# Bare hex digits: already in canonical form (modulo letter case).
HEX_REPLAY_DIR = re.compile(r"^[0-9a-fA-F]+$")


def canonical_name(name: str) -> str | None:
    """Return the canonical (lower-case hex) directory name for *name*.

    A legacy name (``0`` followed by hex digits) loses its leading ``0``;
    a plain hex name is only lower-cased. Returns ``None`` when *name* is
    not a hex replay directory name at all.
    """
    legacy = LEGACY_REPLAY_DIR.fullmatch(name)
    if legacy:
        return legacy.group(1).lower()
    if HEX_REPLAY_DIR.fullmatch(name):
        return name.lower()
    return None


def iter_files(root: Path) -> list[Path]:
    """Return every regular file below *root* (recursively), sorted by path."""
    # Sorting makes the copy order deterministic across runs and platforms.
    return sorted(p for p in root.rglob("*") if p.is_file())


def copy_replay_dir(source: Path, dest: Path, dry_run: bool) -> tuple[int, int]:
    """Copy the files under *source* into *dest*, preserving relative paths.

    A file is skipped when the destination already has a file of the same
    size (contents are not compared). With *dry_run* set, nothing is written
    and the counts describe what would be done.

    Returns:
        ``(copied, skipped)`` file counts.
    """
    copied = 0
    skipped = 0
    for src_file in iter_files(source):
        dst_file = dest / src_file.relative_to(source)
        if dst_file.exists() and dst_file.stat().st_size == src_file.stat().st_size:
            skipped += 1
            continue
        copied += 1
        if dry_run:
            continue
        dst_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src_file, dst_file)
    return copied, skipped


def copied_completely(source: Path, dest: Path) -> bool:
    """Return True if every file under *source* has a same-sized twin in *dest*.

    Only existence and size are checked. A source tree with no files is
    trivially complete.
    """
    for src_file in iter_files(source):
        dst_file = dest / src_file.relative_to(source)
        if not dst_file.exists() or dst_file.stat().st_size != src_file.stat().st_size:
            return False
    return True


def _same_location(a: Path, b: Path) -> bool:
    """Return True when *a* and *b* both exist and refer to the same file/dir."""
    try:
        return a.samefile(b)
    except OSError:
        # samefile() raises when either path is missing; then they cannot
        # be the same on-disk object.
        return False


def main(argv: list[str] | None = None) -> int:
    """Run the migration CLI and return the process exit code.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Raises:
        RuntimeError: no destination can be derived (no ``--dest``, no
            ``--storage-dir`` and ``STORAGE_VOL_PATH`` unset).
        SystemExit: on argument errors or an unusable source directory.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--source", type=Path, default=DEFAULT_SOURCE, help="Legacy replays directory.")
    parser.add_argument(
        "--storage-dir",
        type=Path,
        default=DEFAULT_STORAGE,
        help="Storage root. Destination defaults to <storage-dir>/REPLAYS.",
    )
    parser.add_argument("--dest", type=Path, help="Destination replay directory.")
    parser.add_argument("--execute", action="store_true", help="Actually copy files. Default is dry-run.")
    parser.add_argument("--move", action="store_true", help="Remove source dirs after a successful executed copy.")
    args = parser.parse_args(argv)

    dry_run = not args.execute
    if args.move and dry_run:
        parser.error("--move requires --execute")

    if args.dest is not None:
        dest_base = args.dest
    elif args.storage_dir is not None:
        dest_base = args.storage_dir / "REPLAYS"
    else:
        raise RuntimeError("STORAGE_VOL_PATH must be set")

    source = args.source.expanduser().resolve()
    dest_root = dest_base.expanduser().resolve()

    if not source.exists():
        raise SystemExit(f"Source does not exist: {source}")
    if not source.is_dir():
        raise SystemExit(f"Source is not a directory: {source}")

    planned = 0
    copied_total = 0
    skipped_total = 0

    print(f"Source: {source}")
    print(f"Destination: {dest_root}")
    print(f"Mode: {'dry-run' if dry_run else 'execute'}")

    # sorted() snapshots the listing up front, so directories created while
    # copying are never picked up as new sources.
    for entry in sorted(source.iterdir()):
        if not entry.is_dir():
            continue
        new_name = canonical_name(entry.name)
        if new_name is None:
            print(f"skip non-replay dir: {entry.name}")
            continue

        dest = dest_root / new_name
        if _same_location(entry, dest):
            # Copying a directory onto itself "skips" every file, which would
            # make --move delete the only copy of the data.
            print(f"skip {entry.name}: source and destination are the same directory")
            continue

        planned += 1
        copied, skipped = copy_replay_dir(entry, dest, dry_run)
        copied_total += copied
        skipped_total += skipped
        print(f"{entry.name} -> {new_name}: copy {copied}, skip {skipped}")

        if args.move and copied_completely(entry, dest):
            shutil.rmtree(entry)
            print(f"removed source dir: {entry}")

    print(f"Replay dirs: {planned}; files to copy: {copied_total}; already present: {skipped_total}")
    if dry_run:
        print("Dry-run only. Re-run with --execute to copy files.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())