Add full sortof codebase: API, drain workers, frontend, schema, specs

This commit is contained in:
2026-05-04 03:27:54 +00:00
parent acda2c90f8
commit 55d3794bfb
43 changed files with 13375 additions and 53 deletions

683
api/adapters.py Normal file
View File

@@ -0,0 +1,683 @@
"""Translate mlos_sort + mod_parsed rows into the SORTOF_DATA shape the frontend expects."""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional, Set, Tuple
from mlos_sort import ModInfo
CAT_MAP: Dict[str, str] = {
"coreRequirement": "lib",
"tweaks": "lib",
"tile": "tile",
"debug": "debug",
"resource": "resource",
"map": "map",
"qol": "qol",
"moodle": "moodle",
"tweak_minor": "tweak",
"music": "music",
"wearable": "wearable",
"profession": "profession",
"movement": "movement",
"building": "building",
"farming": "farming",
"zombie": "zombie",
"zone": "zone",
"armor": "armor",
"food": "food",
"health": "health",
"weapon": "weapon",
"crafting": "crafting",
"container": "container",
"vehicle": "vehicle",
"vehicle_spawn": "v.spawn",
"loot": "loot",
"code": "gameplay",
"ui": "ui",
"sound": "sound",
"texture": "texture",
"translation": "i18n",
"multiplayer": "mp",
"server_only": "server",
"fix": "fix",
"other": "gameplay",
"undefined": "gameplay",
# Spec G-patch §6: dedicated frontend pill so patches are visible at a
# glance even though their position (last) already telegraphs the role.
"patch": "patch",
}
def to_frontend_cat(mlos_cat: str) -> str:
return CAT_MAP.get(mlos_cat, "gameplay")
def position(load_first: str, load_last: str) -> str:
if load_first == "on":
return "first"
if load_last == "on":
return "last"
return ""
# Hand-curated mod_id alias map: when the user has any of the value mod_ids,
# the key mod_id is considered "satisfied" for missing-dep purposes. This
# stops a B42 mod whose mod_id was renamed across builds (e.g. TrueMoozic)
# from showing up as a missing dep for downstream B41 mods that still
# require the old mod_id (`truemusic`).
MOD_ID_ALIASES: Dict[str, List[str]] = {
"truemusic": ["TrueMoozic"],
}
def _resolve_satisfied_via_alias(input_modids: Set[str]) -> Set[str]:
"""Return the input mod_id set expanded with any keys whose alias values
are present in the input."""
out = set(input_modids)
for key, values in MOD_ID_ALIASES.items():
if any(v in input_modids for v in values):
out.add(key)
return out
def build_warnings(
mlos_warnings: Dict[str, Any],
wsid_lookup: Dict[str, str] | None = None,
source_wsids: Dict[str, str] | None = None,
input_modids: Set[str] | None = None,
) -> List[Dict[str, Any]]:
"""Translate mlos_sort warnings to the SORTOF_DATA WARNINGS shape.
wsid_lookup: optional {mod_id -> workshop_id} map of *deps* we already
have cached. When provided, each 'missing' warning is augmented with
`actions: [{type: "add-wsid", wsid, label}]` so the frontend can render
a click-to-add button. Unknown deps fall back to a Steam Workshop search
link the user follows manually.
source_wsids: optional {mod_id -> workshop_id} map of *every cached mod
in the current sort*. Used to attach the source mod's wsid onto
'missing' / 'conflict' warnings so the frontend can deep-link to the
Workshop page of the mod that's complaining.
"""
lookup = wsid_lookup or {}
sources = source_wsids or {}
out: List[Dict[str, Any]] = []
for path in mlos_warnings.get("cycles", []) or []:
out.append({
"tag": "cycle",
"level": "red",
"msg": "cycle: " + "".join(path),
})
satisfied_via_alias = _resolve_satisfied_via_alias(input_modids or set())
for mod_id, deps in (mlos_warnings.get("missing_requirements") or {}).items():
# Drop deps that the user already has via an alias (e.g.
# `truemusic` is satisfied if the user has `TrueMoozic`). If every
# listed dep is satisfied via alias, skip the warning entirely.
deps = [d for d in deps if d not in satisfied_via_alias]
if not deps:
continue
actions: List[Dict[str, str]] = []
for dep in deps:
wsid = lookup.get(dep)
if wsid:
actions.append({
"type": "add-wsid",
"wsid": wsid,
"modId": dep,
"label": f"add {dep}",
})
else:
# No cache hit -> link to Steam Workshop search so the user
# can find the wsid manually. PZ appid is 108600.
from urllib.parse import quote_plus
actions.append({
"type": "search-workshop",
"modId": dep,
"url": (
"https://steamcommunity.com/workshop/browse/"
f"?appid=108600&searchtext={quote_plus(dep)}"
),
"label": f"find {dep}",
})
src_wsid = sources.get(mod_id)
# When we know the source mod's wsid, the warning's leading mod name
# is hyperlinked to the source's Workshop page (which lists every
# required item natively). The 'find …' Steam Workshop search action
# is redundant in that case (returns noisy generic results), so drop
# it; keep concrete 'add <wsid>' actions for cache-resolved deps.
if src_wsid:
actions = [a for a in actions if a.get("type") != "search-workshop"]
warn: Dict[str, Any] = {
"tag": "missing",
"level": "red",
"msg": f"{mod_id} requires {', '.join(deps)} - not in your list.",
}
if src_wsid:
warn["wsid"] = src_wsid
if actions:
warn["actions"] = actions
out.append(warn)
for mod_id, ene in (mlos_warnings.get("incompatible_enabled") or {}).items():
warn = {
"tag": "conflict",
"level": "amber",
"msg": f"{mod_id} marked incompatible with {', '.join(ene)}.",
}
src_wsid = sources.get(mod_id)
if src_wsid:
warn["wsid"] = src_wsid
out.append(warn)
return out
# ── Spec C §4: branch flavoring + prefix-base + suffix-token utilities ──
# Build flavor detection for Rule A. Matches:
# - "B42" / "B41" at end-of-string when preceded by a separator (`_`, `-`,
# space, start-of-string) OR a lowercase letter (CamelCase boundary like
# "ArmorB42").
# - "_b42" / "_b41" at end (snake_case).
# - "_b42_" / "_b41_" mid-string token.
# - "_legacy_" anywhere -> B41 (legacy is always old-build behavior).
_RE_B41 = re.compile(
r"_legacy_"
r"|(?:^|[_\- ]|(?<=[a-z]))[Bb]41$"
r"|_[Bb]41(?:[_\- ]|$)"
)
_RE_B42 = re.compile(
r"(?:^|[_\- ]|(?<=[a-z]))[Bb]42$"
r"|_[Bb]42(?:[_\- ]|$)"
)
def _build_flavor(mod_id: str) -> Optional[str]:
"""Return 'B41' / 'B42' / None per Spec C §4.3 suffix rule."""
if not mod_id:
return None
if _RE_B41.search(mod_id):
return "B41"
if _RE_B42.search(mod_id):
return "B42"
return None
def _strict_prefix_of(a: str, b: str) -> bool:
"""`a` is a strict prefix of `b` per Spec C §4.4: a != b, b starts with a,
boundary char is non-lowercase-letter (separator/digit/uppercase)."""
if not a or not b or a == b or not b.startswith(a):
return False
boundary = b[len(a)]
return boundary in "_- " or boundary.isdigit() or boundary.isupper()
def _prefix_base(group: List[ModInfo]) -> ModInfo:
"""Return the branch whose mod_id is a strict prefix of every other branch
in `group` (the 'core'/'main'). Falls back to alphabetical-first if no
such universal prefix exists. Spec C §4.4."""
sorted_grp = sorted(group, key=lambda m: m.id)
for cand in sorted_grp:
if all(_strict_prefix_of(cand.id, m.id) for m in sorted_grp if m.id != cand.id):
return cand
return sorted_grp[0]
# Spec C §4.6 hint patterns. Order matters: first match wins.
_HINT_PATTERNS: List[Tuple[re.Pattern, str]] = [
(re.compile(r"_legacy_", re.IGNORECASE), "legacy build - usually not what you want"),
(re.compile(r"_v\d+(?:_\d+)*$", re.IGNORECASE), "legacy build - usually not what you want"),
(re.compile(r"(?:_lite|_light)$", re.IGNORECASE), "lighter alternate variant - pick one"),
(re.compile(r"(?:_hd|_detailshd)$", re.IGNORECASE), "high-resolution variant"),
(re.compile(r"_(?:noce|novanilla|farmdisable|disable[a-z]*)$", re.IGNORECASE),
"opt-out variant"),
(re.compile(r"_(?:usdm|imports|exotics|realnames)$", re.IGNORECASE),
"alternate variant - pick one"),
]
def _branch_hint(mod_id: str) -> Optional[str]:
"""Return the hint text per Spec C §4.6 D/G patterns, or None."""
if not mod_id:
return None
hits: List[str] = []
for pat, text in _HINT_PATTERNS:
if pat.search(mod_id):
hits.append(text)
return " · ".join(dict.fromkeys(hits)) if hits else None
def _suffix_token(mod_id: str, base_id: str) -> Optional[str]:
"""For Rule C: extract the trailing token after the last `_` if mod_id is
a `<base>_<TOKEN>` form. Returns None for one-letter tokens (`_a`, `_x`).
Two-letter tokens like `_AZ` (AuthenticZ patch) and `_GG` are kept - the
user's spec parenthetical excludes only one-letter false positives."""
if not mod_id or not base_id or mod_id == base_id:
return None
if "_" not in mod_id:
return None
suffix = mod_id.rsplit("_", 1)[-1]
if len(suffix) < 2:
return None
return suffix
def _input_match_terms(mod_id: str) -> Set[str]:
"""Build the set of search terms for one input mod_id, used by Rule C.
Produces both:
- normalized form (lowercase + alphanumeric only) -> substring match
- acronym (first letters of CamelCase / whitespace-separated tokens,
lowercased) -> covers cases like "Authentic Z - Current" where the
token `_AZ` is the initials, not a literal substring.
"""
out: Set[str] = set()
if not mod_id:
return out
out.add(re.sub(r"[^a-z0-9]+", "", mod_id.lower()))
pieces: List[str] = []
for chunk in re.split(r"[^A-Za-z0-9]+", mod_id):
if not chunk:
continue
# CamelCase split: each capital-led run is a piece, plus trailing lowercase runs.
parts = re.findall(r"[A-Z]+[a-z0-9]*|[a-z0-9]+", chunk)
pieces.extend(parts)
if pieces:
acro = "".join(p[0].lower() for p in pieces if p and p[0].isalnum())
if len(acro) >= 2:
out.add(acro)
return out
def _apply_branch_rules(
mods: List[ModInfo],
*,
pz_build: str,
input_modids: Set[str],
is_resort: bool = False,
selected_modids: Optional[Set[str]] = None,
) -> Tuple[Set[str], List[Dict[str, Any]], Dict[str, str]]:
"""Spec C §4 auto-disambiguation pipeline.
Returns (drop_ids, warnings, hints). drop_ids = mod_ids to filter out of
SORTED_ORDER/MODS_LINE. hints = {mod_id -> hint_text} for picker rows.
Order per wsid: detect coordinated/radio (exempt), then A → C → B.
A and C may both fire (orthogonal axes). B is the auto-pick fallback.
`input_modids` is the full set across the user's input. Rule C excludes
same-wsid mod_ids when matching to avoid self-references (a wsid's
branches would otherwise always match each other's tokens).
"""
by_wsid: Dict[str, List[ModInfo]] = {}
for m in mods:
wsid = m.workshop_id or ""
if not wsid:
continue
by_wsid.setdefault(wsid, []).append(m)
# Per-wsid mod_id sets so Rule C can exclude same-wsid siblings.
modids_by_wsid: Dict[str, Set[str]] = {
w: {m.id for m in g} for w, g in by_wsid.items()
}
drop_ids: Set[str] = set()
warnings: List[Dict[str, Any]] = []
hints: Dict[str, str] = {}
pz_build = (pz_build or "").upper() if pz_build else "B42"
if pz_build not in ("B41", "B42"):
pz_build = "B42"
for wsid, group in by_wsid.items():
if len(group) < 2:
continue
ids_in_group = {m.id for m in group}
# Exemption 0: ADDON wsid — at least one mod self-identifies as an
# "Optional add-on" (mod.info description) and at least one mod
# doesn't. Treat as additive: primaries stay in MODS_LINE by default,
# addons land in drop_ids so the user has to explicitly tick them in
# the picker. Picker is naturally checkbox-mode (no incompatibles
# between siblings), so toggling lights addons up.
#
# Resort path: user's explicit selection wins. Drop addons NOT in
# the user's selection; keep those they ticked.
addon_mods = [m for m in group if m.is_addon]
primary_mods = [m for m in group if not m.is_addon]
if addon_mods and primary_mods:
for m in addon_mods:
if not is_resort:
drop_ids.add(m.id)
elif selected_modids is not None and m.id not in selected_modids:
drop_ids.add(m.id)
hints[m.id] = "addon"
for m in primary_mods:
hints[m.id] = "main"
continue
# Exemption 1: radio-mode (incompatibleMods sibling). Picker handles.
radio = any(
any(other in ids_in_group for other in m.incompatibleMods)
for m in group
)
if radio:
continue
# Exemption 2: coordinated (cross-refs in requirements/loadAfter/loadBefore).
coordinated = any(
any(other in ids_in_group for other in m.requirements)
or any(other in ids_in_group for other in m.loadAfter)
or any(other in ids_in_group for other in m.loadBefore)
for m in group
)
if coordinated:
# Coordinated wsids stay whole BY DEFAULT, but if a prefix-base
# exists and a non-base addon branch declares an EXTERNAL require
# (a mod_id not in the wsid's siblings) that isn't in input_modids,
# drop the addon. The author's `require=` here means "this addon
# only makes sense if you have the external mod"; if you don't,
# the addon shouldn't be in the load order. Suppresses the
# would-be missing-dep warning at the same time (post-build_warnings
# filter strips warnings whose source mod_id is in drop_ids).
base_candidates = [
m for m in group
if any(_strict_prefix_of(m.id, o.id) for o in group if o.id != m.id)
]
if base_candidates:
base = max(base_candidates, key=lambda m: len(m.id))
for m in group:
if m.id == base.id:
continue
externals = [r for r in m.requirements if r not in ids_in_group]
if externals and not all(r in input_modids for r in externals):
drop_ids.add(m.id)
for m in group:
h = _branch_hint(m.id)
if h:
hints[m.id] = h
continue
# Truly ambiguous: apply A → C → B.
title = (group[0].name or wsid)
kept_by_a: Optional[ModInfo] = None
kept_by_c: List[ModInfo] = []
# ── Rule A: build-aware default ─────────────────────────────────
flavored = [(m, _build_flavor(m.id)) for m in group]
match_active = [m for m, fl in flavored if fl == pz_build]
opposite_build = "B41" if pz_build == "B42" else "B42"
match_opposite = [m for m, fl in flavored if fl == opposite_build]
unflavored = [m for m, fl in flavored if fl is None]
if len(match_active) == 1 and len(group) - 1 == len(match_opposite) + len(unflavored):
# Exactly one branch matches active build; rest are opposite/unflavored.
kept_by_a = match_active[0]
elif not match_active and match_opposite:
# No active-build variant exists -> author-default fallback (un-flavored
# treated as B42), emit build-mismatch.
warnings.append({
"tag": "build-mismatch",
"level": "amber",
"msg": (
f"no {pz_build} variant for {title} ({wsid}); using author default"
),
"wsid": wsid,
})
# ── Rule C: input cross-reference ───────────────────────────────
# Identify the base branch (longest mod_id that is a strict prefix
# of any other) -- if multiple branches share an _<TOKEN>_ form.
base_candidates = [
m for m in group
if any(_strict_prefix_of(m.id, other.id) for other in group if other.id != m.id)
]
rule_c_base: Optional[ModInfo] = None
if base_candidates:
# Pick the longest base that's a prefix of any sibling (i.e., the
# most-specific shared root). For Jeeve's: `JeevesPatches` qualifies.
rule_c_base = max(base_candidates, key=lambda m: len(m.id))
# Cross-reference set: every input mod_id EXCEPT the ones from
# this wsid (otherwise sibling branches always match each other).
same_wsid_ids = modids_by_wsid.get(wsid, set())
other_modids = input_modids - same_wsid_ids
# Two flavors of search term per input mod_id: the normalized
# form (substring match) and the acronym (first letters - so
# token "AZ" matches "Authentic Z - Current"). See _input_match_terms.
norm_input: Set[str] = set()
for mid in other_modids:
norm_input.update(_input_match_terms(mid))
unticked_addons: List[str] = []
for m in group:
if m is rule_c_base:
continue
tok = _suffix_token(m.id, rule_c_base.id)
if not tok:
continue
tok_l = re.sub(r"[^a-z0-9]+", "", tok.lower())
if not tok_l:
continue
hit = any(tok_l in mid_norm for mid_norm in norm_input)
if hit:
kept_by_c.append(m)
else:
unticked_addons.append(m.id)
if kept_by_c:
# Always include the base alongside matched addons.
if rule_c_base not in kept_by_c:
kept_by_c = [rule_c_base] + kept_by_c
elif any(_suffix_token(m.id, rule_c_base.id) for m in group if m is not rule_c_base):
# Suffix-tokened branches exist but none matched - emit warning.
# `picked`/`alternatives` reuse the auto-picked-branch shape
# so the WarnRow renders an inline picker (the warning's
# "tick any you want" is otherwise unreachable from the
# warnings panel).
if unticked_addons:
warnings.append({
"tag": "unmatched-addons",
"level": "amber",
"msg": (
f"{title} ({wsid}) ships addons for other mods you "
f"don't have: {', '.join(unticked_addons[:8])}"
+ ("" if len(unticked_addons) > 8 else "")
+ ". Tick any you want:"
),
"wsid": wsid,
"picked": rule_c_base.id,
"alternatives": [m.id for m in group],
})
# ── Resolve which set of branches is kept ───────────────────────
# A and C are orthogonal: build-flavor pick + addon picks both apply.
kept_set: Set[str] = set()
if kept_by_a:
kept_set.add(kept_by_a.id)
if kept_by_c:
for m in kept_by_c:
kept_set.add(m.id)
if not kept_set:
# Rule B fallback: prefix-base if available, else alphabetical first.
primary = _prefix_base(group)
kept_set.add(primary.id)
skipped = [m.id for m in group if m.id != primary.id]
warnings.append({
"tag": "auto-picked-branch",
"level": "amber",
"msg": (
f"{title} ({wsid}) ships {len(group)} branches; auto-picked "
f"{primary.id}. Pick another:"
),
"wsid": wsid,
"picked": primary.id,
"alternatives": [m.id for m in group],
})
# Resort-mode override: the user has explicitly picked branches via
# the picker. Replace the rules' kept_set with the user's selection
# restricted to this group, and update the matching auto-picked-
# branch warning's `picked` field so the picker UI shows their
# current choice ticked. Without this, narrowing a multi-branch
# wsid to one branch erases the picker section because the warning
# was tied to the rule's auto-pick, not the user's pick.
if is_resort and selected_modids is not None:
user_picks = [m.id for m in group if m.id in selected_modids]
if user_picks:
kept_set = set(user_picks)
for w in warnings:
if (w.get("tag") == "auto-picked-branch"
and w.get("wsid") == wsid):
w["picked"] = user_picks[0]
w["msg"] = (
f"{title} ({wsid}) ships {len(group)} branches; "
f"selected {user_picks[0]}. Pick another:"
)
for m in group:
if m.id not in kept_set:
drop_ids.add(m.id)
h = _branch_hint(m.id)
if h:
hints[m.id] = h
return (drop_ids, warnings, hints)
# Backwards-compat shim used by existing call sites that don't yet pass
# pz_build/input_modids. Removed once all callers migrate.
def _autopick_ambiguous(mods: List[ModInfo]) -> Tuple[Set[str], List[Dict[str, Any]]]:
drop_ids, warns, _hints = _apply_branch_rules(
mods, pz_build="B42", input_modids={m.id for m in mods},
)
return (drop_ids, warns)
def build_response(
input_ids: List[str],
hit_ids: List[str],
mods: List[ModInfo],
sort_result: Dict[str, Any],
status: str,
wsid_lookup: Dict[str, str] | None = None,
pz_build: str = "B42",
is_resort: bool = False,
selected_modids: Optional[Set[str]] = None,
) -> Dict[str, Any]:
by_id = {m.id: m for m in mods}
sorted_order: List[str] = list(sort_result.get("Mods", []))
workshop_line_parts: List[str] = list(sort_result.get("WorkshopItems", []))
map_folders: List[str] = list(sort_result.get("Map", []))
# MAP_LINE composition rules:
# 1. Hoisted bases (MAP_FIRST) at the FRONT, in the listed order.
# These are library/parent map cells that many other map mods
# overlay onto; they must load before everything else for the
# dependents to attach correctly.
# 2. Remaining modded map folders, in mod-load order.
# 3. Vanilla Muldraugh, KY at the end ALWAYS.
BASE_MAP = "Muldraugh, KY"
MAP_FIRST = ("map_distanciado",)
map_line_parts: List[str] = []
seen_maps: set = set()
map_folder_set = set(map_folders)
for m in MAP_FIRST:
if m in map_folder_set and m not in seen_maps:
seen_maps.add(m)
map_line_parts.append(m)
for m in map_folders:
if m and m not in seen_maps:
seen_maps.add(m)
map_line_parts.append(m)
if BASE_MAP not in seen_maps:
map_line_parts.append(BASE_MAP)
# Spec C §4: build-aware + input-cross-reference + prefix-base auto-pick.
# MOD_DB still contains every cached branch so the picker can offer
# alternates; SORTED_ORDER and MODS_LINE reflect the rule output only.
input_modids = {m.id for m in mods}
drop_ids, autopick_warnings, hints = _apply_branch_rules(
mods, pz_build=pz_build or "B42", input_modids=input_modids,
is_resort=is_resort, selected_modids=selected_modids,
)
# Resort path: `sorted_order` already reflects the user's explicit
# selection (sort_mods ran on the selected subset); the drop_ids from
# _apply_branch_rules are for MOD_DB picker visibility, not for filtering
# out user-selected mods.
if not is_resort:
sorted_order = [mid for mid in sorted_order if mid not in drop_ids]
def _modentry(m: ModInfo) -> Dict[str, Any]:
entry: Dict[str, Any] = {
"wsid": m.workshop_id or "",
"modId": m.id,
"name": m.name or m.id,
"cat": to_frontend_cat(m.category),
"deps": list(m.requirements),
"conflicts": list(m.incompatibleMods),
"pos": position(m.loadFirst, m.loadLast),
"isMap": bool(m.maps),
"mapName": m.maps[0] if m.maps else None,
}
h = hints.get(m.id)
if h:
entry["hint"] = h
return entry
mod_db: List[Dict[str, Any]] = []
seen_in_db: set = set()
for mod_id in sorted_order:
m = by_id.get(mod_id)
if m is None:
continue
seen_in_db.add(mod_id)
mod_db.append(_modentry(m))
# Append auto-skipped branches so the picker can show them as alternatives.
# They aren't in SORTED_ORDER, so the table doesn't render rows for them
# directly - they only surface inside their parent wsid's BranchPicker.
for mod_id in sorted(drop_ids):
m = by_id.get(mod_id)
if m is None or mod_id in seen_in_db:
continue
mod_db.append(_modentry(m))
hit_set = set(hit_ids)
pending = [w for w in input_ids if w not in hit_set]
# Filter out missing-dep warnings whose SOURCE mod_id was dropped by
# _apply_branch_rules. Rationale: if we silently dropped, e.g.,
# fhqExpVehSpawnRedRace because its external require RedRacer wasn't
# in input, the would-be "fhqExpVehSpawnRedRace requires RedRacer" warning
# is just noise — that mod isn't in the user's effective load order
# anymore. Pattern-match the leading source mod_id on each warning.
# Map each cached mod_id to its source wsid so build_warnings can attach
# `wsid` onto missing/conflict warnings (lets the UI deep-link to the
# Workshop page of the mod that's actually complaining).
source_wsids = {m.id: m.workshop_id for m in mods if m.workshop_id}
raw_warnings = build_warnings(
sort_result.get("warnings", {}) or {},
wsid_lookup,
source_wsids=source_wsids,
input_modids=input_modids,
)
if drop_ids:
kept_warnings: List[Dict[str, Any]] = []
for w in raw_warnings:
if w.get("tag") not in ("missing", "conflict"):
kept_warnings.append(w)
continue
m = re.match(r"^([A-Za-z0-9_+\-]{2,})\s+", w.get("msg", ""))
if m and m.group(1) in drop_ids:
continue # source mod_id was dropped → suppress its warning
kept_warnings.append(w)
raw_warnings = kept_warnings
return {
"status": status,
"MOD_DB": mod_db,
"SORTED_ORDER": sorted_order,
"WORKSHOP_ITEMS_LINE": ";".join(workshop_line_parts),
"MODS_LINE": ";".join(sorted_order),
"MAP_LINE": ";".join(map_line_parts),
"WARNINGS": raw_warnings + autopick_warnings,
"pending": pending,
}

1480
api/app.py Normal file

File diff suppressed because it is too large Load Diff

34
api/db.py Normal file
View File

@@ -0,0 +1,34 @@
"""asyncpg pool factory. DSN is built from /opt/sortof/.env at startup."""
from __future__ import annotations
import os
import urllib.parse
from pathlib import Path
import asyncpg
from dotenv import load_dotenv
ENV_PATH = Path(__file__).resolve().parent.parent / ".env"
def _build_dsn() -> str:
load_dotenv(ENV_PATH)
explicit = os.environ.get("DATABASE_URL")
if explicit:
return explicit
user = os.environ["POSTGRES_USER"]
pw = urllib.parse.quote(os.environ["POSTGRES_PASSWORD"], safe="")
name = os.environ["POSTGRES_DB"]
host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
port = os.environ.get("POSTGRES_PORT", "5439")
return f"postgresql://{user}:{pw}@{host}:{port}/{name}"
async def create_pool() -> asyncpg.Pool:
return await asyncpg.create_pool(
dsn=_build_dsn(),
min_size=1,
max_size=8,
command_timeout=15,
)

140
api/expansion.py Normal file
View File

@@ -0,0 +1,140 @@
"""Background async task: take a freshly-created sort_jobs row in 'expanding'
phase, resolve its collection_ids via Steam, populate wsids[], advance phase
to 'queued' (and drop wsids into download_jobs as needed)."""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict, List, Tuple
import asyncpg
import httpx
from jobs import update_phase
from steam import fetch_collection_details
log = logging.getLogger("sortof.expansion")
async def _resolve_collections(
conn: asyncpg.Connection,
http: httpx.AsyncClient,
collection_ids: List[str],
) -> Tuple[Dict[str, List[str]], List[str]]:
"""Returns (resolved, unresolvable). resolved maps collection_id ->
[child_wsids]. unresolvable lists collection_ids that GetCollectionDetails
couldn't fetch (after one retry)."""
if not collection_ids:
return ({}, [])
# Cache lookup (TTL = 6h via last_fetched_at).
cache_rows = await conn.fetch(
"""
SELECT collection_id, child_workshop_ids
FROM collections
WHERE collection_id = ANY($1::text[])
AND last_fetched_at > now() - interval '6 hours'
""",
collection_ids,
)
resolved: Dict[str, List[str]] = {
r["collection_id"]: list(r["child_workshop_ids"])
for r in cache_rows
}
miss = [cid for cid in collection_ids if cid not in resolved]
unresolvable: List[str] = []
if miss:
# Spec §5.4: 1 retry with 2s backoff on HTTPError. If both attempts
# raise, api_out stays {} and the per-cid pass below uniformly marks
# every miss as unresolvable (rec is None branch).
api_out: Dict[str, Any] = {}
for attempt in (1, 2):
try:
api_out = await fetch_collection_details(http, miss)
break
except httpx.HTTPError as e:
log.warning("GetCollectionDetails attempt %d failed: %s", attempt, e)
if attempt == 1:
await asyncio.sleep(2.0)
for cid in miss:
rec = api_out.get(cid)
if rec is None or rec.get("result") != 1:
unresolvable.append(cid)
continue
children = rec.get("children") or []
resolved[cid] = list(children)
await conn.execute(
"""
INSERT INTO collections (collection_id, child_workshop_ids, last_fetched_at)
VALUES ($1, $2, now())
ON CONFLICT (collection_id) DO UPDATE
SET child_workshop_ids = EXCLUDED.child_workshop_ids,
last_fetched_at = now()
""",
cid, children,
)
return (resolved, unresolvable)
async def run_expansion(
pool: asyncpg.Pool,
http: httpx.AsyncClient,
job_id: str,
bare_wsids: List[str],
collection_ids: List[str],
) -> None:
"""Top-level expansion task. Logs and persists; never raises out."""
try:
async with pool.acquire() as conn:
resolved, unresolvable = await _resolve_collections(conn, http, collection_ids)
# Compose wsids: collections (in input order) + bare wsids, deduped.
seen: set = set()
wsids: List[str] = []
for cid in collection_ids:
for w in resolved.get(cid, []):
if w and w not in seen:
seen.add(w)
wsids.append(w)
for w in bare_wsids:
if w not in seen:
seen.add(w)
wsids.append(w)
if not wsids:
# All collections unresolvable AND no bare wsids. Job dies.
await update_phase(
conn, job_id, "failed",
failure_reason="all input collections unresolvable",
)
log.info("expansion %s: failed - all collections unresolvable", job_id)
return
partial_warnings = [
{
"tag": "collection-partial",
"level": "warning",
"msg": f"collection {cid} could not be fetched",
}
for cid in unresolvable
]
seed_result = {"WARNINGS": partial_warnings} if partial_warnings else None
await update_phase(
conn, job_id, "queued",
wsids=wsids,
result_json=seed_result,
)
log.info(
"expansion %s: queued (wsids=%d unresolvable=%d)",
job_id, len(wsids), len(unresolvable),
)
except Exception:
log.exception("expansion %s: crashed", job_id)
try:
async with pool.acquire() as conn:
await update_phase(conn, job_id, "failed", failure_reason="expansion crashed")
except Exception:
log.exception("expansion %s: cleanup failed", job_id)

199
api/jobs.py Normal file
View File

@@ -0,0 +1,199 @@
"""sort_jobs persistence + phase derivation.
Phase is *derived* on every GET (Spec B+F §4): never stored as the source
of truth except for terminal states. The function `derive_phase` reads
live counts from download_jobs and decides expanding/queued/draining/done.
This makes the system restart-resilient by construction - there is no
event log to replay.
"""
from __future__ import annotations
import json
from typing import Any, Dict, List, Optional
from uuid import UUID
import asyncpg
# ── CRUD ────────────────────────────────────────────────────────────────────
async def create_job(
conn: asyncpg.Connection,
*,
input_raw: str,
collection_ids: List[str],
wsids: Optional[List[str]],
rules_raw: Optional[str],
initial_phase: str,
pz_build: Optional[str] = None,
) -> str:
"""Insert a sort_jobs row and return the job_id (UUID as string).
initial_phase: 'expanding' if collections still need resolving,
'queued' if wsids are already resolved at submit time.
pz_build: 'B41' / 'B42' captured at submit so the polling-path
result regen can emit build-mismatch warnings against the
user's chosen build.
"""
row = await conn.fetchrow(
"""
INSERT INTO sort_jobs (phase, input_raw, collection_ids, wsids, rules_raw, pz_build)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING job_id
""",
initial_phase, input_raw, collection_ids, wsids, rules_raw, pz_build,
)
return str(row["job_id"])
async def get_job_row(conn: asyncpg.Connection, job_id: str) -> Optional[Dict[str, Any]]:
"""Fetch a sort_jobs row by id. Returns None if not found.
job_id may be either a string UUID or asyncpg-native UUID.
"""
try:
uid = UUID(job_id) if isinstance(job_id, str) else job_id
except ValueError:
return None
row = await conn.fetchrow(
"SELECT * FROM sort_jobs WHERE job_id = $1",
uid,
)
if row is None:
return None
out = dict(row)
# asyncpg returns jsonb as raw text by default (no codec registered
# in db.py). Decode result_json so callers always receive a dict.
rj = out.get("result_json")
if isinstance(rj, str):
out["result_json"] = json.loads(rj)
return out
async def update_phase(
conn: asyncpg.Connection,
job_id: str,
phase: str,
*,
wsids: Optional[List[str]] = None,
result_json: Optional[Dict[str, Any]] = None,
failure_reason: Optional[str] = None,
) -> None:
"""Advance a job's phase. wsids/result_json/failure_reason are optional
column updates that pair with phase transitions."""
# Accept str OR UUID; mirrors get_job_row's input shape.
uid = UUID(job_id) if isinstance(job_id, str) else job_id
sets = ["phase = $2", "phase_started_at = now()"]
# Convention: $1=job_id, $2=phase; optional fields start at $3.
params: List[Any] = [uid, phase]
idx = 3
if wsids is not None:
sets.append(f"wsids = ${idx}::text[]")
params.append(wsids)
idx += 1
if result_json is not None:
sets.append(f"result_json = ${idx}::jsonb")
params.append(json.dumps(result_json))
idx += 1
if failure_reason is not None:
sets.append(f"failure_reason = ${idx}")
params.append(failure_reason)
idx += 1
await conn.execute(
f"UPDATE sort_jobs SET {', '.join(sets)} WHERE job_id = $1",
*params,
)
# ── live counts (Spec B+F §6) ───────────────────────────────────────────────
async def compute_counts(conn: asyncpg.Connection, wsids: List[str]) -> Dict[str, int]:
"""Compute live cached/queued/draining/terminal_failed counts.
Empty wsids → all zeros.
terminal_failed: wsids whose LATEST download_jobs row has status='failed'.
These will not appear in mod_parsed and are not coming back; without this,
derive_phase would loop forever when a job's wsids include non-mods or
permanently-broken downloads.
"""
if not wsids:
return {"cached": 0, "queued": 0, "draining": 0, "terminal_failed": 0}
rows = await conn.fetch(
"""
SELECT
(SELECT COUNT(DISTINCT mp.workshop_id)
FROM mod_parsed mp
JOIN workshop_meta wm ON wm.workshop_id = mp.workshop_id
WHERE mp.workshop_id = ANY($1::text[])
AND mp.parsed_at_time_updated = wm.time_updated) AS cached,
(SELECT COUNT(DISTINCT workshop_id)
FROM download_jobs
WHERE workshop_id = ANY($1::text[]) AND status = 'queued') AS queued,
(SELECT COUNT(DISTINCT workshop_id)
FROM download_jobs
WHERE workshop_id = ANY($1::text[]) AND status = 'downloading') AS draining,
(SELECT COUNT(*) FROM (
SELECT DISTINCT ON (workshop_id) workshop_id, status
FROM download_jobs
WHERE workshop_id = ANY($1::text[])
ORDER BY workshop_id, updated_at DESC
) latest WHERE status = 'failed') AS terminal_failed
""",
wsids,
)
r = rows[0]
return {
"cached": int(r["cached"]),
"queued": int(r["queued"]),
"draining": int(r["draining"]),
"terminal_failed": int(r["terminal_failed"]),
}
# ── phase derivation (Spec B+F §4) ──────────────────────────────────────────
def derive_phase(
stored_phase: str,
wsids: Optional[List[str]],
counts: Dict[str, int],
) -> str:
"""Decide the live phase from the row's stored phase + current counts.
Terminal phases (done/failed) are never demoted. Non-terminal phases
are recomputed from current state.
"""
if stored_phase in ("done", "failed"):
return stored_phase
if wsids is None:
return "expanding"
if counts["draining"] > 0:
return "draining"
if counts["queued"] > 0:
return "queued"
# Terminal: every wsid is either cached or permanently-failed.
settled = counts["cached"] + counts.get("terminal_failed", 0)
if settled >= len(wsids):
return "done"
# Transient gap: a row just left 'queued' and hasn't shown up in
# mod_parsed yet. Most likely just-failed and not yet re-queued.
return "queued"
# ── stale-expansion sweep (Spec B+F §9) ─────────────────────────────────────
STALE_EXPANSION_SQL = """
UPDATE sort_jobs
SET phase = 'failed',
failure_reason = 'expansion timed out',
updated_at = now()
WHERE phase = 'expanding'
AND phase_started_at < now() - interval '10 minutes'
RETURNING job_id;
"""
async def sweep_stale_expansions(conn: asyncpg.Connection) -> int:
"""Run on uvicorn lifespan startup. Returns the number of jobs reaped."""
rows = await conn.fetch(STALE_EXPANSION_SQL)
return len(rows)

712
api/mlos_sort.py Normal file
View File

@@ -0,0 +1,712 @@
"""
mlos_sort.py
Python port of MLOS_sorting.lua (Mod Load Order Sorter, by REfRigERatoR).
Faithful to the Lua algorithm:
- preorder: ModManager, ModManagerServer, modoptions
- category buckets: coreRequirement -> tweaks -> resource -> map -> vehicle ->
code -> clothes -> ui -> other -> translation -> undefined
- loadFirst / loadLast: on (0) | category (1) | off (2)
- topological sort by `require` + `loadAfter` with cycle detection
- sorting_rules.txt overrides supported (loadAfter/loadBefore/incompatibleMods/
loadFirst/loadLast/category)
Limitations vs in-game Lua:
- mod.info-only input. We do NOT walk /media/* folders for category detection.
We rely on mod.info `category=` if present, then `frameworkKeys` name
heuristic, then default "other" (or "undefined" if uncategorizable).
- `loadBefore` is converted into corresponding `loadAfter` edges on other mods,
matching the Lua mod's behavior.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# -----------------------------------------------------------------------------
# Constants (mirrors MLOS_sorting.lua)
# -----------------------------------------------------------------------------
PREORDER: Dict[str, int] = {"ModManager": 1, "ModManagerServer": 2, "modoptions": 3}
RAW_CATEGORY_ORDER: List[str] = [
"coreRequirement",
"tweaks", # libraries / frameworks / APIs (matches existing 'lib' pill)
"tile", # tile asset packs
"debug", # error logger, cheat menus
"resource", # other generic resources
"map",
"qol", # QOL changes
"moodle", # moodles / moodlets
"tweak_minor", # tiny tweaks (working aircon, ear protection, …)
"music", # music + music addons (load before vehicles per user spec)
"wearable", # clothing, hair, tattoos (NOT armor)
"profession", # profession mods
"movement", # drop-and-roll, crawl, jump, ladders
"building", # building menus, barricades, light switches
"farming",
"zombie", # zombie behaviour mods (OccultZed, HordeNight, ReactiveZombies)
"zone", # hazardous zones, spore zones
"armor", # armor mods (separate from wearables)
"food",
"health", # first aid, medical
"weapon", # weapons (load before vehicles per user spec)
"crafting",
"container", # backpacks, boxes, tubs
"vehicle",
"vehicle_spawn", # vehicle spawn zones
"loot", # loot tables
"code", # generic gameplay code (legacy fallback)
"ui", # interface
"sound", # audio (non-music)
"texture",
"translation",
"multiplayer", # MP-specific utilities
"server_only", # admin tools, server logs
"fix", # bug-fix overlays (distinct from 'patch' loadLast tier)
"other",
"undefined",
# Spec G-patch: "patch" is a category like any other, but `_initial_sort_key`
# routes patches above all sub-axes via a leading is_patch tuple element so
# they sort strictly LAST (after every loadLast=on map mod).
"patch",
]
CATEGORY_ORDER: Dict[str, int] = {c: i for i, c in enumerate(RAW_CATEGORY_ORDER)}
LOAD_CATEGORIES: Dict[str, int] = {"on": 0, "category": 1, "off": 2}
# from MLOS_sorting.lua: frameworkKeys for name-based tweak detection.
# Lua uses string.find on lowercased name (substring match, no regex anchors).
FRAMEWORK_KEYS: List[str] = [
"framework",
" api",
"_api",
"tweak",
"interface",
"utilit", # matches utility, utilities
"bugfix",
"librar", # matches library, libraries — covers damnlib/tsarslib/StarlitLibrary/etc.
# `derive_category` checks mod.maps before FRAMEWORK_KEYS, so a map
# mod whose name contains "library" still classifies as `map` first.
]
# Multi-key list fields in mod.info (lowercased keys)
LIST_KEYS_MAP = {
"require": "requirements",
"loadafter": "loadAfter",
"loadbefore": "loadBefore",
"incompatiblemods": "incompatibleMods",
"tags": "tags",
}
# -----------------------------------------------------------------------------
# Dataclasses
# -----------------------------------------------------------------------------
@dataclass
class ModInfo:
id: str
name: str = ""
workshop_id: Optional[str] = None
category: str = "undefined"
requirements: List[str] = field(default_factory=list)
loadAfter: List[str] = field(default_factory=list)
loadBefore: List[str] = field(default_factory=list)
incompatibleMods: List[str] = field(default_factory=list)
loadFirst: str = "off"
loadLast: str = "off"
tags: List[str] = field(default_factory=list)
maps: List[str] = field(default_factory=list) # map folder names from media/maps/
flags: List[str] = field(default_factory=list)
is_addon: bool = False # Spec A addon: default-off in multi-mod wsids
# Steam Workshop's controlled-vocab tags (workshop_meta.tags). Canonical
# signal for build / multiplayer / category detection. Distinct from
# `tags` which is mod.info-side (freeform).
workshop_tags: List[str] = field(default_factory=list)
warnings: Dict[str, List[str]] = field(default_factory=dict)
@dataclass
class SortingRule:
loadAfter: List[str] = field(default_factory=list)
loadBefore: List[str] = field(default_factory=list)
incompatibleMods: List[str] = field(default_factory=list)
loadFirst: str = "off"
loadLast: str = "off"
category: Optional[str] = None
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def _split_csv(value: str) -> List[str]:
"""
Mirrors Refr_Utils:splitStringBySeparator - split on commas, trim, drop empties.
Defensively strips `word=` prefixes that some malformed mod.info lines include
(the Lua does this too). Also strips a leading backslash on each entry: B42
mod.info files write deps as `require=\\StarlitLibrary` (path-style); we want
plain modIds so dep names match for the topo sort and missing-dep warnings.
Finally strips a leading "<digits>/" wsid-path prefix some authors put in
front of the modId (e.g. require=2256623447/firearmmod).
"""
if value is None:
return []
cleaned = re.sub(r"\w+\s*=", "", value)
out: List[str] = []
for p in cleaned.split(","):
s = p.strip().lstrip("\\")
s = re.sub(r"^\d+/", "", s)
if s:
out.append(s)
return out
def _convert_load_category(value) -> str:
"""Mirrors convertToLoadCategoryString: normalize to 'on' | 'category' | 'off'."""
if value in (True, "true", 0, "0"):
return "on"
if value in (None, False, "false", 2, "2", ""):
return "off"
if value not in LOAD_CATEGORIES:
return "off"
return str(value)
def _str_contains_any(haystack: str, needles: List[str]) -> bool:
if not haystack:
return False
h = haystack.lower()
return any(n and n in h for n in needles)
# -----------------------------------------------------------------------------
# Parsers
# -----------------------------------------------------------------------------
def parse_mod_info(text: str, workshop_id: Optional[str] = None) -> Optional[ModInfo]:
"""
Parse a mod.info file body. Returns None if no `id=` line found.
Lines are `key=value`; keys lowercased; list-fields comma-separated.
"""
fields: Dict[str, object] = {}
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
m = re.match(r"^\s*(.+?)\s*=\s*(.*?)\s*$", line)
if not m:
continue
key = m.group(1).strip().lower()
value = m.group(2).strip()
if key in LIST_KEYS_MAP:
fields[LIST_KEYS_MAP[key]] = _split_csv(value)
elif key == "loadfirst":
fields["loadFirst"] = _convert_load_category(value)
elif key == "loadlast":
fields["loadLast"] = _convert_load_category(value)
elif key == "category":
fields["category"] = value if value in CATEGORY_ORDER else "undefined"
elif key == "name":
fields["name"] = value
elif key == "id":
# Some authors prefix the wsid into the id (e.g. id=2256623447/firearmmod).
# Strip a leading "<digits>/" so the canonical mod_id is the clean form.
fields["id"] = re.sub(r"^\d+/", "", value)
if "id" not in fields:
return None
return ModInfo(
id=fields["id"],
name=fields.get("name", ""),
workshop_id=workshop_id,
category=fields.get("category", "undefined"),
requirements=fields.get("requirements", []),
loadAfter=fields.get("loadAfter", []),
loadBefore=fields.get("loadBefore", []),
incompatibleMods=fields.get("incompatibleMods", []),
loadFirst=fields.get("loadFirst", "off"),
loadLast=fields.get("loadLast", "off"),
tags=fields.get("tags", []),
)
def parse_sorting_rules(text: str) -> Dict[str, SortingRule]:
"""
Parse a sorting_rules.txt file. Format:
[modId]
loadAfter=mod1,mod2
loadBefore=mod3
incompatibleMods=mod4
loadFirst=on
loadLast=off
category=tweaks
"""
rules: Dict[str, SortingRule] = {}
current: Optional[str] = None
for raw in text.splitlines():
line = raw.strip()
if not line:
continue
m = re.match(r"^\s*\[\s*(.+?)\s*\]\s*$", line)
if m:
current = m.group(1)
rules.setdefault(current, SortingRule())
continue
if current is None:
continue
kv = re.match(r"^\s*(.+?)\s*=\s*(.*?)\s*$", line)
if not kv:
continue
key, value = kv.group(1).lower(), kv.group(2)
rule = rules[current]
if key in ("loadafter", "loadmodafter"):
rule.loadAfter = list(dict.fromkeys(rule.loadAfter + _split_csv(value)))
elif key in ("loadbefore", "loadmodbefore"):
rule.loadBefore = list(dict.fromkeys(rule.loadBefore + _split_csv(value)))
elif key in ("incompatiblemods", "incompatible"):
rule.incompatibleMods = list(dict.fromkeys(rule.incompatibleMods + _split_csv(value)))
elif key == "loadfirst":
rule.loadFirst = _convert_load_category(value)
elif key == "loadlast":
rule.loadLast = _convert_load_category(value)
elif key == "category":
rule.category = value if value in CATEGORY_ORDER else None
return rules
# -----------------------------------------------------------------------------
# Filesystem ingestion (DepotDownloader output layout)
# -----------------------------------------------------------------------------
def load_mods_from_dir(root: Path) -> List[ModInfo]:
"""
Walk `<root>/<workshop_id>/mods/<mod_id>/mod.info` (DepotDownloader output).
Also: `<root>/<workshop_id>/mods/<mod_id>/media/maps/<map_folder>/map.info`
populates `maps` for that mod.
"""
mods: List[ModInfo] = []
if not root.exists():
raise FileNotFoundError(f"Mods root does not exist: {root}")
for workshop_dir in sorted(root.iterdir()):
if not workshop_dir.is_dir():
continue
workshop_id = workshop_dir.name if workshop_dir.name.isdigit() else None
mods_root = workshop_dir / "mods"
if not mods_root.exists():
# also support: some layouts put mods/ at root level directly
mods_root = workshop_dir
if not mods_root.exists() or not mods_root.is_dir():
continue
for mod_dir in sorted(mods_root.iterdir()):
if not mod_dir.is_dir():
continue
mod_info_path = mod_dir / "mod.info"
if not mod_info_path.exists():
continue
try:
text = mod_info_path.read_text(encoding="utf-8", errors="replace")
except OSError as e:
print(f"WARN: cannot read {mod_info_path}: {e}", file=sys.stderr)
continue
mod = parse_mod_info(text, workshop_id=workshop_id)
if mod is None:
print(f"WARN: no `id=` in {mod_info_path}, skipping", file=sys.stderr)
continue
# Collect map folders
maps_dir = mod_dir / "media" / "maps"
if maps_dir.exists() and maps_dir.is_dir():
for map_folder in sorted(maps_dir.iterdir()):
if map_folder.is_dir() and (map_folder / "map.info").exists():
mod.maps.append(map_folder.name)
mods.append(mod)
return mods
# -----------------------------------------------------------------------------
# Category derivation (degraded vs in-game; no folder walk)
# -----------------------------------------------------------------------------
_PATCH_NAME_RE = re.compile(r"\b(patch|compat|compatibility)\b", re.IGNORECASE)
# Substring lists used for derive_category name heuristics. Plain substring
# matching (vs. \b regex) survives PZ's mishmash of camelCase + underscore
# + version-suffix mod names (TrueActions_1.09, TrueMusic, TMMumble, …)
# that strict word boundaries fail on. False positives are accepted in
# exchange — names containing "music" without being music-related are rare
# in PZ.
_LIB_NAME_HINTS = [
"library", "libraries", "framework",
]
_LIB_NAME_RE = re.compile(
r'(?<![A-Za-z])(?:lib|api|core)(?![A-Za-z])'
r'|(?<=[a-z])(?:Lib|API|Core)(?![A-Za-z])',
re.IGNORECASE,
)
_MUSIC_NAME_HINTS = ["music", "moozic", "jukebox"]
_MOODLE_NAME_HINTS = ["moodle", "moodlet"]
_PROFESSION_HINTS = ["profession"]
_MOVEMENT_HINTS = [
"true action", "trueaction", "true_action",
"drop and roll", "dropandroll", "drop_and_roll",
"crawl", "ladder",
]
_ARMOR_NAME_HINTS = ["armor", "armour"]
_HEALTH_NAME_HINTS = ["first aid", "firstaid", "medical", "injur", "disease", "sickness"]
_CRAFTING_HINTS = ["craft"]
_CONTAINER_HINTS = ["backpack", "container", "storage"]
_LOOT_NAME_HINTS = ["loot"]
_TILE_NAME_HINTS = ["tiles", "tileset", "tilepack"]
_DEBUG_NAME_HINTS = ["debug menu", "cheat menu", "error log", "errormagnifier"]
_ZONE_NAME_HINTS = ["hazard zone", "spore zone", "spore zones"]
_ZOMBIE_NAME_HINTS = ["zombie", "horde", "undead"]
_FIX_NAME_HINTS = [" fix", "_fix", "bugfix", "hotfix"]
def _name_has(name: str, hints: List[str]) -> bool:
"""Case-insensitive substring containment against any of `hints`."""
if not name:
return False
n = name.lower()
return any(h in n for h in hints)
def derive_category(mod: ModInfo) -> str:
"""Best-effort category from mod.info + workshop_meta.tags + name.
Detection order (most specific → least):
1. mod.info `category=` if explicit and recognized.
2. patch / fix name regex (Spec G-patch).
3. library/framework name regex (extends FRAMEWORK_KEYS).
4. mod.maps non-empty → map.
5. moodle / profession / movement / specific gameplay axes by name.
6. Workshop tags (canonical Steam controlled vocab): Audio + 'music'
music; Audio → sound; Weapons → weapon; Vehicles → vehicle;
Clothing/Armor + 'armor' → armor, else wearable; Building →
building; Farming → farming; Food → food; Skills → profession
(or moodle); Interface → ui; Textures → texture;
Language/Translation → translation; QOL → qol; Multiplayer alone
→ multiplayer.
7. mod.info tags (freeform fallback).
8. FRAMEWORK_KEYS substring match → tweaks.
9. Default → other.
"""
if mod.category in CATEGORY_ORDER and mod.category != "undefined":
return mod.category
name = mod.name or ""
if name and _PATCH_NAME_RE.search(name):
return "patch"
if _name_has(name, _LIB_NAME_HINTS) or (name and _LIB_NAME_RE.search(name)):
return "tweaks"
if mod.maps:
return "map"
# Music gets PROMOTED before tag-based dispatch: many B41 music mods
# (truemusic, etc.) are tagged "Items"/"Realistic" not "Audio", but the
# name carries the signal. Same for movement (TrueActions_1.09).
if _name_has(name, _MUSIC_NAME_HINTS):
return "music"
if _name_has(name, _MOVEMENT_HINTS):
return "movement"
if _name_has(name, _MOODLE_NAME_HINTS):
return "moodle"
if _name_has(name, _DEBUG_NAME_HINTS):
return "debug"
# Tile packs win against generic "Textures"/"Models" Workshop tags so
# they land in the tile sort bucket (CATEGORY_ORDER 2) before maps load.
if _name_has(name, _TILE_NAME_HINTS):
return "tile"
ws_tags = set(mod.workshop_tags or [])
has_audio = "Audio" in ws_tags
if "Weapons" in ws_tags:
return "weapon"
if "Vehicles" in ws_tags:
if name and "spawn zone" in name.lower():
return "vehicle_spawn"
return "vehicle"
if "Clothing/Armor" in ws_tags:
if _name_has(name, _ARMOR_NAME_HINTS):
return "armor"
return "wearable"
if "Food" in ws_tags:
return "food"
if "building" in {t.lower() for t in ws_tags}:
return "building"
if "Farming" in ws_tags:
return "farming"
if "Skills" in ws_tags:
if _name_has(name, _PROFESSION_HINTS):
return "profession"
return "code"
if "Interface" in ws_tags:
return "ui"
if "Textures" in ws_tags:
return "texture"
if "Language/Translation" in ws_tags:
return "translation"
if "QOL" in ws_tags:
return "qol"
if "Map" in ws_tags:
return "map"
if _name_has(name, _TILE_NAME_HINTS):
return "tile"
if _name_has(name, _ZOMBIE_NAME_HINTS):
return "zombie"
if _name_has(name, _HEALTH_NAME_HINTS):
return "health"
if _name_has(name, _CRAFTING_HINTS):
return "crafting"
if _name_has(name, _CONTAINER_HINTS):
return "container"
if _name_has(name, _LOOT_NAME_HINTS):
return "loot"
if _name_has(name, _FIX_NAME_HINTS):
return "fix"
if _name_has(name, _ZONE_NAME_HINTS):
return "zone"
if has_audio:
return "sound"
# mod.info-side tags as final fallback before name heuristic.
tags_lc = [t.lower() for t in mod.tags]
if any("translation" in t for t in tags_lc):
return "translation"
if any("vehicle" in t for t in tags_lc):
return "vehicle"
if any("interface" in t or "ui" in t for t in tags_lc):
return "ui"
if any("clothing" in t or "skin" in t for t in tags_lc):
return "wearable"
if any("armor" in t for t in tags_lc):
return "armor"
if any("map" in t for t in tags_lc):
return "map"
if _str_contains_any(name, FRAMEWORK_KEYS):
return "tweaks"
if "Multiplayer" in ws_tags:
return "multiplayer"
return "other"
# -----------------------------------------------------------------------------
# Sort
# -----------------------------------------------------------------------------
def _apply_overrides(mods: List[ModInfo], rules: Dict[str, SortingRule]) -> None:
"""In-place: merge rules into mods, then propagate loadBefore -> reverse loadAfter."""
by_id = {m.id: m for m in mods}
for mod in mods:
rule = rules.get(mod.id)
if rule:
mod.loadAfter = list(dict.fromkeys(mod.loadAfter + rule.loadAfter))
mod.loadBefore = list(dict.fromkeys(mod.loadBefore + rule.loadBefore))
mod.incompatibleMods = list(dict.fromkeys(mod.incompatibleMods + rule.incompatibleMods))
mod.loadFirst = _convert_load_category(rule.loadFirst if rule.loadFirst != "off" else mod.loadFirst)
mod.loadLast = _convert_load_category(rule.loadLast if rule.loadLast != "off" else mod.loadLast)
if rule.category:
mod.category = rule.category
# Derive category if still undefined
if mod.category not in CATEGORY_ORDER or mod.category == "undefined":
mod.category = derive_category(mod)
# Translate loadBefore into reverse loadAfter on the target mod
# (mirrors updateSortingRulesLoadAfter)
for mod in mods:
for target in mod.loadBefore:
target_mod = by_id.get(target)
if target_mod and mod.id not in target_mod.loadAfter:
target_mod.loadAfter.append(mod.id)
def _initial_sort_key(mod: ModInfo):
"""Mirrors initialSortMods comparator. Returns sortable tuple.
Spec G-patch: index 0 is `is_patch` so patches sort strictly last - they
have to override loadLast=on map mods at runtime. Within the patch tier
the existing sub-axes still apply (PREORDER, alpha, etc.).
"""
is_patch = 1 if mod.category == "patch" else 0
pre = PREORDER.get(mod.id, 10000)
return (
is_patch,
pre,
LOAD_CATEGORIES[mod.loadFirst], # global loadFirst (on first)
-LOAD_CATEGORIES[mod.loadLast] + 100, # global loadLast (on last) -- keep parity by
# sorting "on" last; we invert so smaller=earlier
CATEGORY_ORDER.get(mod.category, CATEGORY_ORDER["undefined"]),
LOAD_CATEGORIES[mod.loadFirst], # in-category loadFirst
-LOAD_CATEGORIES[mod.loadLast] + 100, # in-category loadLast
mod.id.lower(),
)
def _topological_sort(mods: List[ModInfo]) -> Tuple[List[str], List[List[str]]]:
"""DFS topo sort on (requirements + loadAfter). Returns (order, cycles)."""
by_id = {m.id: m for m in mods}
visited: Dict[str, bool] = {}
visiting: Dict[str, bool] = {}
order: List[str] = []
cycles: List[List[str]] = []
def visit(mod: ModInfo, path: List[str]):
if visiting.get(mod.id):
cycles.append(path + [mod.id])
return
if visited.get(mod.id):
return
visiting[mod.id] = True
for dep in mod.requirements:
target = by_id.get(dep)
if target:
visit(target, path + [mod.id])
for dep in mod.loadAfter:
target = by_id.get(dep)
if target:
visit(target, path + [mod.id])
visiting[mod.id] = False
visited[mod.id] = True
order.append(mod.id)
for mod in mods:
visit(mod, [])
return order, cycles
def sort_mods(
mods: List[ModInfo],
rules: Optional[Dict[str, SortingRule]] = None,
) -> Dict[str, object]:
"""
Top-level entry: returns dict with ordered IDs + warnings.
"""
rules = rules or {}
_apply_overrides(mods, rules)
# Initial deterministic sort (preorder, loadFirst, category, loadLast, alpha)
mods.sort(key=_initial_sort_key)
order, cycles = _topological_sort(mods)
by_id = {m.id: m for m in mods}
enabled = set(by_id.keys())
missing: Dict[str, List[str]] = {}
incompat: Dict[str, List[str]] = {}
for mod in mods:
miss = [r for r in mod.requirements if r not in enabled]
if miss:
missing[mod.id] = miss
inc = [r for r in mod.incompatibleMods if r in enabled]
if inc:
incompat[mod.id] = inc
# Output blocks for the server's .ini file
mods_line = order # already mod IDs in load order
workshop_seen: List[str] = []
workshop_set = set()
for mod_id in order:
wid = by_id[mod_id].workshop_id
if wid and wid not in workshop_set:
workshop_seen.append(wid)
workshop_set.add(wid)
# MAP_LINE convention: dependencies first (leftmost), dependents last
# (rightmost). Vanilla Muldraugh, KY is the ultimate base and is
# prepended at the very front by adapters.build_response. `order` is
# already topo-sorted by mod-level deps so dependencies appear before
# their dependents — walk it forward.
map_folders: List[str] = []
for mod_id in order:
for mf in by_id[mod_id].maps:
if mf not in map_folders:
map_folders.append(mf)
return {
"Mods": mods_line,
"WorkshopItems": workshop_seen,
"Map": map_folders,
"warnings": {
"cycles": cycles,
"missing_requirements": missing,
"incompatible_enabled": incompat,
},
}
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(
description="Sort PZ mods by load order. Reads DepotDownloader output layout.",
)
ap.add_argument("mods_root", help="Path containing <workshop_id>/mods/<mod_id>/mod.info trees")
ap.add_argument("--rules", help="Optional sorting_rules.txt path")
ap.add_argument("--json", action="store_true", help="Output JSON instead of ini blocks")
args = ap.parse_args()
root = Path(args.mods_root).resolve()
mods = load_mods_from_dir(root)
if not mods:
print("ERROR: no mods found", file=sys.stderr)
sys.exit(2)
rules: Dict[str, SortingRule] = {}
if args.rules:
rules = parse_sorting_rules(Path(args.rules).read_text(encoding="utf-8"))
result = sort_mods(mods, rules)
if args.json:
print(json.dumps(result, indent=2))
return
print("WorkshopItems=" + ";".join(result["WorkshopItems"]))
print("Mods=" + ";".join(result["Mods"]))
if result["Map"]:
print("Map=" + ";".join(result["Map"]))
w = result["warnings"]
if w["cycles"] or w["missing_requirements"] or w["incompatible_enabled"]:
print("\n# Warnings")
if w["cycles"]:
print("# cycles:", w["cycles"])
if w["missing_requirements"]:
print("# missing_requirements:", w["missing_requirements"])
if w["incompatible_enabled"]:
print("# incompatible_enabled:", w["incompatible_enabled"])
if __name__ == "__main__":
main()

75
api/parse.py Normal file
View File

@@ -0,0 +1,75 @@
"""Parse a raw textarea blob into a deduped, ordered list of workshop IDs."""
from __future__ import annotations
import re
from typing import List
def parse_workshop_input(text: str) -> List[str]:
cleaned = re.sub(
r"^\s*(WorkshopItems|Mods|Map)\s*=\s*",
"",
text,
flags=re.MULTILINE | re.IGNORECASE,
)
ids = re.findall(r"\b\d{7,12}\b", cleaned)
seen: set[str] = set()
out: List[str] = []
for i in ids:
if i not in seen:
seen.add(i)
out.append(i)
return out
# Steam Workshop URL form: https://steamcommunity.com/{sharedfiles,workshop}/filedetails/?id=NNNNNNN
_STEAM_URL_RE = re.compile(
r"https?://steamcommunity\.com/(?:sharedfiles|workshop)/filedetails/\?id=(\d{7,12})",
re.IGNORECASE,
)
def parse_with_collections(text: str) -> tuple[List[str], List[str]]:
"""Split an input blob into bare wsids and candidate collection IDs.
A "candidate collection" is any 7-12-digit ID that appears inside a
Steam Workshop URL. Bare numeric IDs in the same blob are treated as
mod wsids (current behavior). Steam doesn't syntactically distinguish
collection IDs from mod IDs; the candidate list is sent to
GetCollectionDetails to confirm. If a candidate isn't actually a
collection, the caller falls it back to wsids.
Returns (wsids, collection_ids), each deduped and in first-seen order.
"""
if not text:
return ([], [])
# 1. Find URL-form IDs FIRST (so they don't get double-counted as bare).
url_ids: List[str] = []
seen_url: set[str] = set()
for m in _STEAM_URL_RE.finditer(text):
i = m.group(1)
if i not in seen_url:
seen_url.add(i)
url_ids.append(i)
# 2. Strip the URLs out before extracting bare numbers.
text_minus_urls = _STEAM_URL_RE.sub("", text)
# 3. Bare wsids: same regex as parse_workshop_input.
cleaned = re.sub(
r"^\s*(WorkshopItems|Mods|Map)\s*=\s*",
"",
text_minus_urls,
flags=re.MULTILINE | re.IGNORECASE,
)
bare_ids = re.findall(r"\b\d{7,12}\b", cleaned)
seen_bare: set[str] = set()
bare_unique: List[str] = []
for i in bare_ids:
if i not in seen_bare and i not in seen_url:
seen_bare.add(i)
bare_unique.append(i)
return (bare_unique, url_ids)

5
api/requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
fastapi
uvicorn[standard]
asyncpg
httpx
python-dotenv

82
api/steam.py Normal file
View File

@@ -0,0 +1,82 @@
"""Async wrapper for Steam's anonymous GetPublishedFileDetails endpoint."""
from __future__ import annotations
from typing import Dict, List, TypedDict
import httpx
class CollectionEntry(TypedDict):
"""One element of fetch_collection_details's response.
result == 1 → valid collection; children populated.
result != 1 → not a collection / deleted / private; children typically [].
"""
result: int
children: List[str]
STEAM_URL = (
"https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
)
async def fetch_workshop_details(
client: httpx.AsyncClient,
workshop_ids: List[str],
) -> Dict[str, dict]:
if not workshop_ids:
return {}
data: Dict[str, str] = {"itemcount": str(len(workshop_ids))}
for i, wid in enumerate(workshop_ids):
data[f"publishedfileids[{i}]"] = wid
r = await client.post(STEAM_URL, data=data)
r.raise_for_status()
body = r.json()
out: Dict[str, dict] = {}
for item in body.get("response", {}).get("publishedfiledetails", []) or []:
out[item["publishedfileid"]] = item
return out
COLLECTION_URL = (
"https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
)
async def fetch_collection_details(
client: httpx.AsyncClient,
collection_ids: List[str],
) -> Dict[str, CollectionEntry]:
"""Resolve candidate collection IDs to their child wsids.
Returns a dict keyed by collection_id with shape:
{ "result": int, "children": List[str] }
Anonymous endpoint; no API key needed. result==1 means valid collection;
result!=1 means the ID isn't a collection (could be a mod, deleted, or
private). Caller decides what to do with non-1 results - see Spec B+F
§10 Q3 "Partial expansion failure" and Q4 "Flakiness".
"""
if not collection_ids:
return {}
data: Dict[str, str] = {"collectioncount": str(len(collection_ids))}
for i, cid in enumerate(collection_ids):
data[f"publishedfileids[{i}]"] = cid
r = await client.post(COLLECTION_URL, data=data)
r.raise_for_status()
body = r.json()
out: Dict[str, CollectionEntry] = {}
for item in body.get("response", {}).get("collectiondetails", []) or []:
cid = item.get("publishedfileid")
if not cid:
continue
out[cid] = {
"result": int(item.get("result") or 0),
"children": [
c.get("publishedfileid", "")
for c in (item.get("children") or [])
if c.get("publishedfileid")
],
}
return out