feat: modpack-bundled sorting rules + B41/B42 build pair

- data/modpack_rules/helldrinx.txt: bundled rules for HellDrinx FULL/LITE
- app.py auto-injects modpack rules when a trigger wsid is in input;
  user-supplied rules are appended after and override on conflict
- MANUAL_BUILD_PAIRS: betterLockpicking (B41) ↔ NFsBetterLockpicking (B42)
- mlos_sort.py: minor adjustments (kept in lockstep across api/worker)
This commit is contained in:
2026-05-04 15:58:39 +00:00
parent b73325882e
commit cee433f47e
4 changed files with 242 additions and 25 deletions

View File

@@ -14,7 +14,7 @@ import os
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID
import httpx
@@ -290,9 +290,107 @@ MANUAL_BUILD_PAIRS: Dict[str, str] = {
# builds; B42 wsid bundles a TMMumble addon as a second mod_id.
"2613146550": "3632610172",
"3632610172": "2613146550",
# betterLockpicking (B41) ↔ NFsBetterLockpicking (B42). Different author
# picked up the B42 port and renamed the mod_id, so the mod_parsed-based
# cross-reference can't find the pair.
"2368058459": "3440867775",
"3440867775": "2368058459",
}
# Modpack-bundled sorting_rules.txt files: when a trigger wsid is in the
# user's input, we prepend the matching file's contents to the user's rules
# text before parse_sorting_rules runs. User-supplied rules come last so they
# override modpack defaults on conflicting keys.
_MODPACK_RULES_DIR = Path(__file__).resolve().parent.parent / "data" / "modpack_rules"
_MODPACK_RULES_TRIGGERS: Dict[str, str] = {
# HellDrinx FULL + LITE both bundle the same sorting_rules.txt.
"3672556207": "helldrinx.txt", # HellDrinx FULL
"3662909244": "helldrinx.txt", # HellDrinx LITE
}
# Human-readable trigger labels for the warning message.
_MODPACK_RULES_LABELS: Dict[str, str] = {
"3672556207": "HellDrinx FULL",
"3662909244": "HellDrinx LITE",
}
def _modpack_rules_for(input_wsids: List[str]) -> Tuple[str, List[Tuple[str, str]]]:
"""Return (rules_text, triggers) where triggers is a list of (wsid, label)
tuples that fired and rules_text is the concatenated content of every
distinct file referenced. Each rules file is included at most once even
if multiple trigger wsids share it (HellDrinx FULL + LITE case)."""
files_seen: set = set()
parts: List[str] = []
triggers: List[Tuple[str, str]] = []
for wsid in input_wsids:
fname = _MODPACK_RULES_TRIGGERS.get(wsid)
if not fname:
continue
triggers.append((wsid, _MODPACK_RULES_LABELS.get(wsid, wsid)))
if fname in files_seen:
continue
files_seen.add(fname)
try:
parts.append((_MODPACK_RULES_DIR / fname).read_text(encoding="utf-8"))
except OSError as e:
log.warning("modpack rules load failed for %s (%s): %s", wsid, fname, e)
return ("\n\n".join(parts), triggers)
def _parse_rules_with_modpacks(
rules_raw: Optional[str],
input_wsids: List[str],
) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
"""Parse user-supplied rules with any auto-detected modpack rules
prepended. Returns (parsed_rules, triggers). Caller emits a warning per
trigger so the user knows the auto-injection happened."""
modpack_text, triggers = _modpack_rules_for(input_wsids)
combined = (
(modpack_text + "\n\n" + (rules_raw or "")).strip()
if modpack_text else (rules_raw or "")
)
if not combined:
return ({}, triggers)
try:
return (parse_sorting_rules(combined), triggers)
except Exception:
log.warning("failed to parse modpack+user rules; ignoring")
return ({}, triggers)
def _emit_modpack_rules_warnings(
payload: Dict[str, Any],
triggers: List[Tuple[str, str]],
) -> None:
"""Append a `modpack-rules-applied` warning per modpack trigger so users
know the modpack's bundled sorting_rules.txt was auto-injected. Skips
triggers already flagged (idempotent across resort cycles)."""
if not triggers:
return
existing = payload.get("WARNINGS") or []
already_flagged = {
w.get("wsid") for w in existing
if w.get("tag") == "modpack-rules-applied" and w.get("wsid")
}
new_warnings: List[Dict[str, Any]] = []
for wsid, label in triggers:
if wsid in already_flagged:
continue
new_warnings.append({
"tag": "modpack-rules-applied",
"level": "amber",
"wsid": wsid,
"msg": (
f"{label} ({wsid}) detected — auto-applied its bundled "
f"sorting_rules.txt. Your manually-entered rules (if any) "
f"override modpack rules."
),
})
if new_warnings:
payload["WARNINGS"] = list(existing) + new_warnings
async def _find_swap_candidate(
conn,
wsid: str,
@@ -405,10 +503,13 @@ async def _emit_build_mismatch_warnings(
Build tags don't include the requested build.
`workshop_meta.tags` is Steam's controlled vocabulary — `Build 41` /
`Build 42` are the canonical signal. Three cases per wsid:
`Build 42` are the canonical signal. Cases per wsid:
- both builds tagged → multi-build, no warn
- only the OPPOSITE build tagged → wrong build, emit warn
- neither build tagged → author didn't mark, can't tell, no warn
- neither build tagged BUT a MANUAL_BUILD_PAIRS partner is tagged for
the target build → infer wrong build, emit warn (covers untagged
wsids whose author never set a Steam build tag)
- neither build tagged AND no paired partner → can't tell, no warn
- only the TARGET build tagged → fine, no warn
Multi-branch wsids already get their own `build-mismatch` from
@@ -417,14 +518,22 @@ async def _emit_build_mismatch_warnings(
"""
if not input_wsids or pz_build not in ("B41", "B42"):
return
# Expand the lookup set to include MANUAL_BUILD_PAIRS partners so we can
# check the partner's Steam tags in the same query (no N+1).
lookup_wsids = set(input_wsids)
for w in input_wsids:
if w in MANUAL_BUILD_PAIRS:
lookup_wsids.add(MANUAL_BUILD_PAIRS[w])
rows = await conn.fetch(
"""
SELECT workshop_id, title, tags
FROM workshop_meta
WHERE workshop_id = ANY($1::text[])
""",
list(input_wsids),
list(lookup_wsids),
)
tags_by_wsid: Dict[str, List[str]] = {r["workshop_id"]: list(r["tags"] or []) for r in rows}
titles_by_wsid: Dict[str, str] = {r["workshop_id"]: r["title"] for r in rows}
target_tag = "Build 41" if pz_build == "B41" else "Build 42"
other_tag = "Build 42" if pz_build == "B41" else "Build 41"
existing = payload.get("WARNINGS") or []
@@ -433,16 +542,24 @@ async def _emit_build_mismatch_warnings(
if w.get("tag") == "build-mismatch" and w.get("wsid")
}
new_warnings: List[Dict[str, Any]] = []
for r in rows:
wsid = r["workshop_id"]
for wsid in input_wsids:
if wsid in already_flagged:
continue
tags = list(r["tags"] or [])
tags = tags_by_wsid.get(wsid)
if tags is None:
continue # no workshop_meta row cached
if target_tag in tags:
continue # supports the picked build
if other_tag not in tags:
continue # author didn't tag a build, can't tell
title = r["title"] or wsid
# Two paths to "this is the wrong build":
# (a) Steam directly tags it as the OPPOSITE build.
# (b) MANUAL_BUILD_PAIRS partner is tagged for our target build.
partner_supports_target = (
wsid in MANUAL_BUILD_PAIRS
and target_tag in tags_by_wsid.get(MANUAL_BUILD_PAIRS[wsid], [])
)
if other_tag not in tags and not partner_supports_target:
continue # untagged with no helpful partner — can't tell
title = titles_by_wsid.get(wsid) or wsid
warn: Dict[str, Any] = {
"tag": "build-mismatch",
"level": "amber",
@@ -696,12 +813,7 @@ async def _build_result_for_job(
wsids,
)
mods = [_row_to_modinfo(r) for r in rows]
rules: Dict[str, Any] = {}
if rules_raw:
try:
rules = parse_sorting_rules(rules_raw)
except Exception:
log.warning("job result: failed to parse sorting_rules")
rules, modpack_triggers = _parse_rules_with_modpacks(rules_raw, wsids)
_inject_addon_loadafter(mods)
sort_result = sort_mods(mods, rules)
cached_ids = {r["workshop_id"] for r in rows}
@@ -724,6 +836,7 @@ async def _build_result_for_job(
await _augment_with_required_items(conn, payload, wsids)
if pz_build:
await _emit_build_mismatch_warnings(conn, payload, wsids, pz_build)
_emit_modpack_rules_warnings(payload, modpack_triggers)
_reorder_warnings(payload)
# Spec A §8 ownership: WORKSHOP_ITEMS_LINE preserves the SET of input
# wsids regardless of which are cached/non-mod/unknown. Within that set,
@@ -1017,13 +1130,7 @@ async def sort_endpoint(req: SortRequest, request: Request) -> Dict[str, Any]:
mods: List[ModInfo] = [_row_to_modinfo(r) for r in rows]
rules: Dict[str, Any] = {}
if req.rules:
try:
rules = parse_sorting_rules(req.rules)
except Exception:
log.warning("failed to parse sorting_rules; ignoring")
rules = {}
rules, modpack_triggers = _parse_rules_with_modpacks(req.rules, input_ids)
_inject_addon_loadafter(mods)
sort_result = sort_mods(mods, rules)
@@ -1057,6 +1164,7 @@ async def sort_endpoint(req: SortRequest, request: Request) -> Dict[str, Any]:
await _emit_build_mismatch_warnings(
conn, payload, input_ids, req.pz_build or "B42",
)
_emit_modpack_rules_warnings(payload, modpack_triggers)
_reorder_warnings(payload)
# Surface ghost / non-mod IDs separately from real pending so the UI can
# distinguish "indexing 2 mods" from "2 of your IDs are deleted/typo'd"