"""File-level conflict detection from cached manifests. Port of pzmm core/scanner.py:scan_file_conflicts adapted to read from the mod_files table (populated by worker.build_manifest_and_types) instead of walking on-disk media trees. See docs/plans/2026-05-04-pzmm-conflict-and-typing.md. """ from __future__ import annotations from collections import defaultdict from dataclasses import dataclass from typing import Dict, List, Tuple from mlos_sort import ModInfo @dataclass class FileConflict: rel_path: str providers: List[str] # mod_ids in input load order winner: str # mod_id (last in load order) _FETCH_MANIFEST = """ WITH inputs AS ( SELECT unnest($1::text[]) AS workshop_id, unnest($2::text[]) AS mod_id ) SELECT mf.workshop_id, mf.mod_id, mf.rel_path, mf.sha1 FROM mod_files mf JOIN inputs i ON mf.workshop_id = i.workshop_id AND mf.mod_id = i.mod_id """ async def scan_file_conflicts(conn, mods: List[ModInfo]) -> List[FileConflict]: """For the given (already-loaded) ModInfos, report rel_paths claimed by ≥2 mods with non-equal sha1. Returns list ordered by rel_path. Mods without manifest rows (`files_manifest_built=false`) silently contribute nothing to the conflict scan; the caller is responsible for surfacing them as `missing_manifests` in any user-facing payload. """ if len(mods) < 2: return [] wsids = [m.workshop_id or "" for m in mods] mod_ids = [m.id for m in mods] rows = await conn.fetch(_FETCH_MANIFEST, wsids, mod_ids) # mod_id → load-order index (input order = load order, mirroring pzmm) order_index: Dict[str, int] = {m.id: i for i, m in enumerate(mods)} # rel_path → list of (load_order_index, mod_id, sha1) by_path: Dict[str, List[Tuple[int, str, str]]] = defaultdict(list) for r in rows: mod_id = r["mod_id"] idx = order_index.get(mod_id) if idx is None: continue by_path[r["rel_path"]].append((idx, mod_id, r["sha1"])) conflicts: List[FileConflict] = [] for rel, entries in by_path.items(): # Need ≥2 distinct providers AND ≥2 distinct sha1s. If every # provider ships byte-identical content (same sha1), it's a # duplicate, not a conflict — pzmm scanner.py:55–66. unique_providers = {mod_id for _, mod_id, _ in entries} if len(unique_providers) < 2: continue unique_hashes = {sha for _, _, sha in entries} if len(unique_hashes) < 2: continue # Order providers by input load-order index. Winner = last loaded. ordered = sorted(entries, key=lambda e: e[0]) providers = [mod_id for _, mod_id, _ in ordered] # De-dup providers preserving order (a mod could ship the same # rel_path under both B41 and B42 layouts → seen twice). seen: set = set() dedup_providers: List[str] = [] for p in providers: if p not in seen: seen.add(p) dedup_providers.append(p) conflicts.append(FileConflict( rel_path=rel, providers=dedup_providers, winner=dedup_providers[-1], )) conflicts.sort(key=lambda c: c.rel_path) return conflicts