Files
sortof/worker/worker.py
indifferentketchup b73325882e feat: pzmm conflict detection + content-type categorization
- mod_files manifest table populated at parse time
- POST /api/conflicts endpoint
- mod_types fingerprinting feeds derive_category
- DD filelist regex broadened to cover conflict-eligible exts
- media/maps/<*>/* excluded from manifest (per-mod namespaced,
  no conflict value, can be tens of MB per mod)

Plan: docs/plans/2026-05-04-pzmm-conflict-and-typing.md
2026-05-04 15:22:35 +00:00

797 lines
30 KiB
Python

"""
worker.py - pzsort cache filler
Single-shot CLI that takes Steam Workshop IDs on argv, refreshes metadata
from Steam's anonymous API, and only runs DepotDownloader for cache misses
(where workshop_meta.time_updated has changed since last parse).
Usage:
python3 worker.py <workshop_id> [<workshop_id> ...]
python3 worker.py --force <workshop_id> ... # ignore cache, re-download
Env (or .env file):
DATABASE_URL postgresql://pzsort:<pw>@127.0.0.1:5439/pzsort
DD_PATH path to DepotDownloader executable
PZ_APP_ID 108600 (default)
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import asyncpg
import httpx
# Reuse the parser from the sorter
sys.path.insert(0, str(Path(__file__).parent))
from mlos_sort import parse_mod_info, ModInfo # noqa: E402
# Steam app id that workshop items must belong to; overridable through the
# PZ_APP_ID environment variable (defaults to 108600).
PZ_APP_ID = int(os.environ.get("PZ_APP_ID", "108600"))
# Default DepotDownloader executable location, taken from DD_PATH when set.
# main() uses this as the default for --dd-path.
DEFAULT_DD_PATH = os.environ.get("DD_PATH", "./DepotDownloader")
# Endpoint that fetch_workshop_details POSTs to for item metadata.
STEAM_API = "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
# -----------------------------------------------------------------------------
# Steam API
# -----------------------------------------------------------------------------
def fetch_workshop_details(workshop_ids: List[str]) -> Dict[str, dict]:
    """
    Look up Workshop item metadata via the legacy GetPublishedFileDetails
    endpoint (anonymous POST, no API key needed).

    Returns {publishedfileid: detail_dict} for every entry Steam sent back.
    An empty id list returns {} without making a request. Transport errors
    and non-2xx responses propagate as httpx exceptions.
    """
    if not workshop_ids:
        return {}
    form: Dict[str, str] = {"itemcount": str(len(workshop_ids))}
    form.update(
        (f"publishedfileids[{idx}]", item_id)
        for idx, item_id in enumerate(workshop_ids)
    )
    with httpx.Client(timeout=30.0) as session:
        response = session.post(STEAM_API, data=form)
        response.raise_for_status()
        payload = response.json()
    entries = payload.get("response", {}).get("publishedfiledetails", [])
    # Later duplicates overwrite earlier ones, same as a sequential fill.
    return {entry["publishedfileid"]: entry for entry in entries}
def flatten_tags(detail: dict) -> List[str]:
    """Return the non-empty "tag" values from detail["tags"], in order.

    A missing "tags" key yields []; entries without a truthy "tag" are
    dropped.
    """
    names: List[str] = []
    for entry in detail.get("tags", []):
        label = entry.get("tag")
        if label:
            names.append(label)
    return names
# Public Steam Workshop page URL. The anonymous GetPublishedFileDetails API
# does NOT return `children` for individual mods (only collections), so to
# learn a mod's "Required Items" we have to scrape the public HTML page.
_WORKSHOP_PAGE_URL = "https://steamcommunity.com/sharedfiles/filedetails/?id={wsid}"
# Captures the inner HTML of the element whose id is "RequiredItems". The
# lazy group stops at the first "</div>" that is followed (after optional
# whitespace) by another "</div>"; DOTALL lets it span multiple lines.
_RE_REQUIRED_BLOCK = re.compile(
    r'<div[^>]*id="RequiredItems"[^>]*>(.*?)</div>\s*</div>',
    re.DOTALL,
)
# Pulls the numeric workshop id out of each filedetails link in that block.
_RE_REQUIRED_LINK = re.compile(r'filedetails/\?id=(\d+)')
# ── rate-limit safety for Steam HTML scraping ─────────────────────────────
# Steam aggressively 429s anonymous /sharedfiles/filedetails/ HTML requests;
# during a 2026-05-03 backfill at ~1 RPS our IP was blocked for hours and a
# subsequent single curl probe still got 429. Two file-locked, multi-process
# safeguards now sit in front of every scrape:
#
# 1. THROTTLE FILE — records the timestamp of the last attempted scrape.
# Every worker waits via flock until at least
# `_MIN_SCRAPE_INTERVAL_S` seconds have elapsed since the last one.
# Serializes 4 concurrent drain processes so they can't burst.
#
# 2. COOLDOWN FILE — when we observe a hard 429 (after retries), we write
# `now() + _COOLDOWN_S` here. While active, every fetch returns None
# instantly without touching Steam, preserving cached values until the
# IP block ages out.
#
# Defaults: 6s spacing → ≤10 RPM steady-state, 1h cooldown after a 429
# storm. Overridable via SORTOF_STEAM_MIN_INTERVAL / SORTOF_STEAM_COOLDOWN.
# fcntl supplies the flock() advisory lock used by _throttle_scrape.
import fcntl as _fcntl
# Holds the epoch-seconds timestamp of the most recent scrape attempt.
_THROTTLE_FILE = "/tmp/sortof_steam_throttle"
# Holds the epoch-seconds deadline until which scraping is suspended.
_COOLDOWN_FILE = "/tmp/sortof_steam_cooldown"
# Minimum spacing between scrapes, in seconds (env: SORTOF_STEAM_MIN_INTERVAL).
_MIN_SCRAPE_INTERVAL_S = float(os.environ.get("SORTOF_STEAM_MIN_INTERVAL", "6"))
# Cooldown length armed after a persistent 429, in seconds
# (env: SORTOF_STEAM_COOLDOWN).
_COOLDOWN_S = float(os.environ.get("SORTOF_STEAM_COOLDOWN", "3600"))
def _read_cooldown_until() -> float:
    """Return the epoch-seconds deadline stored in `_COOLDOWN_FILE`.

    Yields 0.0 when the file is missing, unreadable, empty, or does not
    contain a parseable float.
    """
    try:
        with open(_COOLDOWN_FILE, "r") as handle:
            text = handle.read().strip()
    except (OSError, ValueError):
        return 0.0
    if not text:
        return 0.0
    try:
        return float(text)
    except ValueError:
        return 0.0
def _write_cooldown_until(epoch_s: float) -> None:
    """Persist `epoch_s` (as text) to `_COOLDOWN_FILE`, replacing its content.

    Best effort: an OSError while writing is swallowed so a failed write
    never interrupts the caller.
    """
    stamp = str(epoch_s)
    try:
        Path(_COOLDOWN_FILE).write_text(stamp)
    except OSError:
        # Deliberately ignored: the cooldown is an optimisation, not a
        # correctness requirement.
        return
def _throttle_scrape() -> None:
    """Block until at least `_MIN_SCRAPE_INTERVAL_S` has elapsed since the
    last scrape by ANY drain process (multi-process safe via flock).

    The timestamp of this scrape is written to `_THROTTLE_FILE` while the
    exclusive lock is held. A stamp that cannot be parsed is treated as
    "never scraped", and the wait is capped at one interval so a corrupt or
    future-dated stamp cannot stall the worker.
    """
    import time as _t
    Path(_THROTTLE_FILE).touch(exist_ok=True)
    with open(_THROTTLE_FILE, "r+") as f:
        _fcntl.flock(f.fileno(), _fcntl.LOCK_EX)
        try:
            f.seek(0)
            raw = f.read().strip()
            try:
                last = float(raw) if raw else 0.0
            except ValueError:
                # Garbage in the file must not break every future scrape.
                last = 0.0
            now = _t.time()
            # Never wait longer than one full interval, whatever the stamp says.
            wait = min(_MIN_SCRAPE_INTERVAL_S - (now - last), _MIN_SCRAPE_INTERVAL_S)
            if wait > 0:
                _t.sleep(wait)
                now = _t.time()
            f.seek(0)
            f.truncate()
            f.write(str(now))
            # Push the stamp to disk BEFORE unlocking; otherwise the next
            # lock holder can read the truncated (empty) file and skip its wait.
            f.flush()
        finally:
            _fcntl.flock(f.fileno(), _fcntl.LOCK_UN)
def fetch_required_wsids(
    workshop_id: str,
    timeout: int = 15,
    max_attempts: int = 4,
    backoff_429: float = 30.0,
) -> Optional[List[str]]:
    """Scrape the public Workshop page for Required Items wsids.

    Every attempt, including each retry after a 429, first checks the
    shared cooldown file and then passes through `_throttle_scrape()`, so a
    retry can neither burst against other workers nor hit Steam after
    another worker has armed the cooldown.

    Args:
        workshop_id: Workshop item whose page is fetched.
        timeout: httpx timeout (seconds) for each request.
        max_attempts: Maximum number of GETs when Steam keeps answering 429.
        backoff_429: Base sleep (seconds) after a 429, multiplied by the
            attempt number.

    Returns
        None  — fetch/parse error, persistent 429, or active cooldown.
                Caller MUST NOT overwrite the existing cached value.
        []    — page loaded successfully but has no required items section.
        list  — required item wsids in declaration order, deduped.
    """
    import time as _time
    url = _WORKSHOP_PAGE_URL.format(wsid=workshop_id)
    html: Optional[str] = None
    for attempt in range(1, max_attempts + 1):
        cooldown_until = _read_cooldown_until()
        if cooldown_until and _time.time() < cooldown_until:
            return None  # Steam recently 429'd us — back off entirely.
        _throttle_scrape()
        try:
            with httpx.Client(timeout=timeout, follow_redirects=True) as client:
                r = client.get(url)
                if r.status_code == 429:
                    if attempt < max_attempts:
                        _time.sleep(backoff_429 * attempt)
                        continue
                    # Final 429 → arm the global cooldown so other workers
                    # (and this one's next call) skip Steam entirely.
                    _write_cooldown_until(_time.time() + _COOLDOWN_S)
                    print(f"  ! required_wsids 429 (gave up) for {workshop_id}; "
                          f"cooldown {int(_COOLDOWN_S)}s armed", file=sys.stderr)
                    return None
                r.raise_for_status()
                html = r.text
                break
        except (httpx.HTTPError, httpx.TimeoutException) as e:
            print(f"  ! required_wsids fetch failed for {workshop_id}: {e}",
                  file=sys.stderr)
            return None
    if html is None:
        return None
    m = _RE_REQUIRED_BLOCK.search(html)
    if not m:
        return []
    seen: set = set()
    out: List[str] = []
    for w in _RE_REQUIRED_LINK.findall(m.group(1)):
        # Skip repeats and any self-reference to the page being scraped.
        if w not in seen and w != workshop_id:
            seen.add(w)
            out.append(w)
    return out
# -----------------------------------------------------------------------------
# DepotDownloader
# -----------------------------------------------------------------------------
def run_depot_downloader(
    workshop_id: str,
    output_dir: Path,
    dd_path: Path,
    # mod.info / map.info for the existing parse path, plus _CONFLICT_EXTS
    # (lua/txt/xml/json/ini) for build_manifest_and_types — both the conflict
    # manifest and pzmm-style content sniffing live on these. Binary assets
    # (.png/.dds/.bank/.ogg/.wav/.X) are intentionally excluded; their type
    # signals degrade gracefully via workshop_meta.tags fallback in
    # mlos_sort.derive_category.
    filelist_regex: str = r"regex:.*\.(info|lua|txt|xml|json|ini)$",
    timeout: int = 300,
    max_attempts: int = 3,
    backoff_s: float = 2.0,
) -> bool:
    """
    Fetch workshop item using DepotDownloader, filtered to mod.info plus
    conflict-eligible asset files (lua/txt/xml/json/ini). Writes
    <output_dir>/mods/<mod_id>/mod.info (and the asset tree under media/).
    Returns True on success.

    Retries up to max_attempts times on rc!=0 or timeout - Steam Workshop's
    CDN occasionally flakes on the manifest fetch and a fresh DD invocation
    typically succeeds. Caller is also free to retry at a higher level
    (drain.py's MAX_ATTEMPTS), but in-process retry avoids the full re-claim
    cycle for the common transient case.

    A failure to launch the executable at all (missing file, no execute
    permission) is reported and returns False immediately; retrying cannot
    fix it.

    Args:
        workshop_id: Published file id passed to `-pubfile`.
        output_dir: Download directory; created if missing. The filter is
            written to `_filelist.txt` inside it.
        dd_path: DepotDownloader executable.
        filelist_regex: Single filter line written to the file list.
        timeout: Per-attempt timeout in seconds.
        max_attempts: Number of invocations before giving up.
        backoff_s: Sleep between attempts, in seconds.
    """
    import time as _time
    output_dir.mkdir(parents=True, exist_ok=True)
    filelist = output_dir / "_filelist.txt"
    filelist.write_text(filelist_regex + "\n", encoding="utf-8")
    # pathlib collapses "./DepotDownloader" to "DepotDownloader", which the OS
    # would then look up on PATH instead of the working directory. When the
    # file exists, hand subprocess an absolute path; otherwise keep the
    # caller's spelling so a bare command name still resolves via PATH.
    dd_path = Path(dd_path)
    executable = os.path.abspath(dd_path) if dd_path.is_file() else str(dd_path)
    cmd = [
        executable,
        "-app", str(PZ_APP_ID),
        "-pubfile", workshop_id,
        "-filelist", str(filelist),
        "-dir", str(output_dir),
    ]
    last_err = ""
    for attempt in range(1, max_attempts + 1):
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                # Tool output is only echoed for diagnostics; never let an
                # undecodable byte abort the download.
                errors="replace",
                timeout=timeout,
                check=False,
            )
        except subprocess.TimeoutExpired:
            last_err = "timeout"
            print(f"  ! DepotDownloader timeout for {workshop_id} (attempt {attempt}/{max_attempts})",
                  file=sys.stderr)
        except OSError as e:
            print(f"  !! DepotDownloader could not be launched for {workshop_id}: {e}",
                  file=sys.stderr)
            return False
        else:
            if proc.returncode == 0:
                if attempt > 1:
                    print(f"  ✓ DepotDownloader recovered for {workshop_id} on attempt {attempt}",
                          file=sys.stderr)
                return True
            last_err = f"rc={proc.returncode}"
            print(f"  ! DepotDownloader rc={proc.returncode} for {workshop_id} "
                  f"(attempt {attempt}/{max_attempts})", file=sys.stderr)
            # Show the tail of stderr, falling back to stdout when stderr is empty.
            print(proc.stderr[-500:] if proc.stderr else proc.stdout[-500:], file=sys.stderr)
        if attempt < max_attempts:
            _time.sleep(backoff_s)
    print(f"  !! DepotDownloader gave up on {workshop_id} after {max_attempts} attempts (last: {last_err})",
          file=sys.stderr)
    return False
def discover_mod_infos(output_dir: Path) -> List[Path]:
    """Collect every mod.info one or two levels below `output_dir`/mods.

    Layouts seen in the wild:
      B41: mods/<mod_id>/mod.info
      B42: mods/<mod_id>/<gameVersion>/mod.info   e.g. mods/Foo/42/mod.info
    A single mod can ship both. Results come back in sorted path order, so
    for one mod_id the version sub-folders precede the root-level (B41)
    file; callers that UPSERT on (workshop_id, mod_id) therefore keep
    whichever variant sorts last.
    """
    patterns = ("mods/*/mod.info", "mods/*/*/mod.info")
    return sorted(hit for pattern in patterns for hit in output_dir.glob(pattern))
def discover_map_folders(mip_parent: Path) -> List[str]:
    """List map folder names for the mod whose mod.info sits in `mip_parent`.

    Layouts handled:
      B41:  mods/<modId>/mod.info
            mods/<modId>/media/maps/<x>/map.info
      B42:  mods/<modId>/<branch>/mod.info      (branch e.g. '42', '42.13')
            mods/<modId>/<branch>/media/maps/<x>/map.info
      B42 split: mod.info under '42/' but map data under a sibling 'common/'
            branch (observed in Project RV Interior), which is why the
            search starts from the mod-id root and covers every branch.

    Returns each distinct folder name once, ordered by the sorted path of
    the first map.info that introduced it.
    """
    # mip_parent is either the mod-id root itself or one branch below it.
    modid_root = mip_parent if mip_parent.parent.name == "mods" else mip_parent.parent
    hits = sorted(
        [
            *modid_root.glob("media/maps/*/map.info"),
            *modid_root.glob("*/media/maps/*/map.info"),
        ]
    )
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return list(dict.fromkeys(hit.parent.name for hit in hits))
# -----------------------------------------------------------------------------
# Single-pass manifest + content-type detection (Plan: 2026-05-04 pzmm port)
#
# Both Integration A (file-conflict manifest) and Integration B (mod_types
# fingerprinting) read the same files from the temp DD extraction. We walk
# the mod_id root once, hashing conflict-eligible files into a manifest map
# AND collecting pzmm-style content signals into a tag set in the same loop.
# No two-pass implementations.
# -----------------------------------------------------------------------------
# File extensions (lowercase, with dot) that are hashed into the conflict
# manifest by build_manifest_and_types.
_CONFLICT_EXTS = {".lua", ".txt", ".xml", ".json", ".ini"}
# Map subtrees are namespaced per-mod by directory; conflict surface is ~zero,
# and worldmap.xml can be tens of MB. Anything under maps/<MapName>/ is
# therefore skipped for manifest insertion AND for the content-blob sniffing
# that feeds mod_types (path-based tags such as "Maps" still fire). map.info
# itself still drives `mod.maps` (via discover_map_folders) which feeds the
# existing `mod.maps non-empty → map` rule in derive_category.
_RE_MAP_SUBTREE = re.compile(r"^maps/[^/]+/")
# Ordered by pzmm preference: first match wins when types_to_category maps
# this list down to a single sortof CATEGORY_ORDER bucket.
_TYPE_PREFERRED = [
    "Maps", "Vehicles", "Weapons", "Items", "Clothing", "Traits",
    "Professions", "Recipes", "Tiles", "Textures", "Sounds",
    "Animations", "UI", "Translations", "Lua", "Patch", "Dependency",
    "Framework",
]
def _sha1(path: Path) -> str:
    """Return the hex SHA-1 digest of the file at `path`.

    The file is streamed in 128 KiB pieces so large files are never held in
    memory at once. Errors from opening or reading propagate to the caller.
    """
    digest = hashlib.sha1()
    piece_size = 1024 * 128
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
def _read_small_text(path: Path, limit: int = 256_000) -> str:
    """Return up to `limit` leading bytes of `path` as lowercased text.

    Bytes are decoded as UTF-8 with undecodable sequences dropped. Any
    failure (missing file, permission error, ...) yields "" rather than an
    exception.
    """
    try:
        with path.open("rb") as stream:
            head = stream.read(limit)
        text = str(head, "utf-8", "ignore")
        return text.lower()
    except Exception:
        return ""
def build_manifest_and_types(
    mip_parent: Path, mod_id: str, raw: str
) -> Tuple[List[Tuple[str, str, int]], List[str]]:
    """Single-pass walk under the mod_id root.

    Directories and files are visited in sorted path order, so the result
    is reproducible across runs and filesystems: branch folders such as
    "42/media" or "common/media" are visited before the root-level "media",
    and the capped content samples always cover the same files.

    Args:
        mip_parent: Directory containing the mod.info being processed; the
            mod_id root is this directory when its parent is named "mods",
            otherwise one level up.
        mod_id: Accepted for the caller's convenience; not used here.
        raw: Full mod.info text, scanned for patch/dependency keywords and
            a `require=` line.

    Returns:
        manifest_rows: list of (rel_path, sha1, size_bytes) for conflict-eligible
                       files (lua/txt/xml/json/ini). rel_path is lowercased,
                       posix-style, prefixed with "media/" so it's comparable
                       across mods regardless of B41/B42/split branch layout.
                       Last-wins on duplicate rel_paths from multi-branch mods
                       (in sorted order, so the root-level media/ wins).
        mod_types:     pzmm-ordered content tag list (Maps, Vehicles, …).
                       Falls back to ["Dependency"] or ["Unknown"] when no
                       media/ subtree exists.
    """
    if mip_parent.parent.name == "mods":
        modid_root = mip_parent
    else:
        modid_root = mip_parent.parent
    tags: set = set()
    name_blob = (raw or "").lower()
    if any(w in name_blob for w in ("compat", "compatibility", "patch", "fix")):
        tags.add("Patch")
    if any(w in name_blob for w in ("api", "core", "dependency", "framework", "library", "required")):
        tags.add("Dependency")
    manifest_map: Dict[str, Tuple[str, int]] = {}
    script_text_parts: List[str] = []
    lua_text_parts: List[str] = []
    has_media = False
    # sorted(): rglob yields entries in filesystem order, which would make
    # both last-wins and the sampling caps below vary between runs.
    for media_dir in sorted(modid_root.rglob("media")):
        if not media_dir.is_dir():
            continue
        # A "media" directory nested inside another media tree is already
        # covered by the outer walk under its real relative path; walking it
        # again would add the same files under truncated, bogus rel_paths.
        if "media" in media_dir.relative_to(modid_root).parts[:-1]:
            continue
        has_media = True
        for path in sorted(media_dir.rglob("*")):
            if not path.is_file():
                continue
            try:
                rel_below = path.relative_to(media_dir).as_posix().lower()
            except ValueError:
                continue
            suffix = path.suffix.lower()
            in_map_subtree = bool(_RE_MAP_SUBTREE.match(rel_below))
            # Manifest: skip per-map subtree (see _RE_MAP_SUBTREE comment).
            if suffix in _CONFLICT_EXTS and not in_map_subtree:
                rel = "media/" + rel_below
                try:
                    size = path.stat().st_size
                    sha = _sha1(path)
                    manifest_map[rel] = (sha, size)
                except OSError:
                    # Unreadable file: leave it out of the manifest.
                    pass
            # Path-based mod_type signals — always fire, even for files inside
            # the per-map subtree. The "Maps" tag is the canonical signal that
            # this mod ships a map and shouldn't be lost when we exclude the
            # heavy worldmap.xml / objects.lua content from the manifest.
            if rel_below.startswith("maps/"):
                tags.add("Maps")
            if rel_below.startswith("texturepacks/") or "tiledefinitions" in rel_below:
                tags.add("Tiles")
            if (rel_below.startswith(("textures/", "models_x/", "models/"))
                    or suffix in {".png", ".dds"}):
                if path.name.lower() != "poster.png":
                    tags.add("Textures")
            if rel_below.startswith(("scripts/vehicles/", "scripts/vehicle")):
                tags.add("Vehicles")
            if rel_below.startswith(("clothing/", "scripts/clothing/")):
                tags.add("Clothing")
            if (rel_below.startswith(("sound/", "sounds/", "fmod/"))
                    or suffix in {".bank", ".ogg", ".wav"}):
                tags.add("Sounds")
            if rel_below.startswith(("ui/", "lua/client/ui/")):
                tags.add("UI")
            if rel_below.startswith(("anims/", "animsets/", "actiongroups/")):
                tags.add("Animations")
            if rel_below.startswith("lua/shared/translate/") or "/translate/" in rel_below:
                tags.add("Translations")
            # Content-blob accumulation: skip per-map subtree. Map-internal lua
            # / scripts (rare but possible) wouldn't normally collide with
            # other mods anyway, and reading them just costs memory for no
            # detection upside.
            if not in_map_subtree:
                if rel_below.startswith("lua/"):
                    tags.add("Lua")
                    # Sample at most 60 lua files, 64 kB each.
                    if suffix == ".lua" and len(lua_text_parts) < 60:
                        lua_text_parts.append(_read_small_text(path, 64_000))
                # Sample at most 80 script files, 96 kB each.
                if (rel_below.startswith("scripts/") and suffix in {".txt", ".xml"}
                        and len(script_text_parts) < 80):
                    script_text_parts.append(_read_small_text(path, 96_000))
    if not has_media:
        return [], (["Dependency"] if "Dependency" in tags else ["Unknown"])
    script_blob = "\n".join(script_text_parts)
    lua_blob = "\n".join(lua_text_parts)
    # Content-based signals; both blobs are already lowercased.
    if " type = weapon" in script_blob or "displaycategory = weapon" in script_blob:
        tags.add("Weapons")
    if " vehicle " in script_blob or "module vehicles" in script_blob:
        tags.add("Vehicles")
    if "item " in script_blob:
        tags.add("Items")
    if "recipe " in script_blob or " evolvedrecipe " in script_blob:
        tags.add("Recipes")
    if "bodylocation" in script_blob or "clothingitem" in script_blob:
        tags.add("Clothing")
    if "traitfactory.addtrait" in lua_blob:
        tags.add("Traits")
    if "professionfactory.addprofession" in lua_blob:
        tags.add("Professions")
    has_require = bool(re.search(r"^\s*require\s*=", raw or "", re.IGNORECASE | re.MULTILINE))
    # Nothing else matched, no scripts, but it declares requirements.
    if not tags and has_require and not script_blob:
        tags.add("Framework")
    ordered = [t for t in _TYPE_PREFERRED if t in tags] or ["Unknown"]
    manifest_rows = [(rel, sha, size) for rel, (sha, size) in sorted(manifest_map.items())]
    return manifest_rows, ordered
# -----------------------------------------------------------------------------
# DB upserts
# -----------------------------------------------------------------------------
# Insert-or-update one workshop_meta row keyed on workshop_id. Takes 12
# positional parameters in the column order listed below; last_checked_at is
# stamped by the database with now() on both insert and update.
UPSERT_WORKSHOP_META = """
INSERT INTO workshop_meta (
workshop_id, title, description, tags, creator_steamid,
time_created, time_updated, file_size, preview_url,
consumer_app_id, visibility, banned, last_checked_at
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12, now())
ON CONFLICT (workshop_id) DO UPDATE SET
title = EXCLUDED.title,
description = EXCLUDED.description,
tags = EXCLUDED.tags,
creator_steamid = EXCLUDED.creator_steamid,
time_created = EXCLUDED.time_created,
time_updated = EXCLUDED.time_updated,
file_size = EXCLUDED.file_size,
preview_url = EXCLUDED.preview_url,
consumer_app_id = EXCLUDED.consumer_app_id,
visibility = EXCLUDED.visibility,
banned = EXCLUDED.banned,
last_checked_at = now();
"""
# Parameters: $1 = workshop_id taking ownership, $2 = mod_id. Deletes the
# mod_parsed rows other workshop ids hold for that mod_id and logs one
# mod_id_conflicts row per deleted claim (refreshing recorded_at on repeats).
EVICT_AND_RECORD_CONFLICT = """
-- Per the cache invariant: a mod_id is owned by exactly one wsid at a time.
-- When we're about to UPSERT (wsid, mod_id), evict any (other_wsid, mod_id)
-- claims so the new pull becomes canonical, and record the eviction in
-- mod_id_conflicts so /api/sort can warn users who paste the displaced wsid.
WITH evicted AS (
DELETE FROM mod_parsed
WHERE mod_id = $2 AND workshop_id <> $1
RETURNING workshop_id
)
INSERT INTO mod_id_conflicts (mod_id, evicting_wsid, evicted_wsid)
SELECT $2, $1, workshop_id FROM evicted
ON CONFLICT (mod_id, evicting_wsid, evicted_wsid)
DO UPDATE SET recorded_at = now();
"""
# Insert-or-update one mod_parsed row keyed on (workshop_id, mod_id). Takes 18
# positional parameters in the column order listed below; parsed_at is stamped
# by the database with now(). parsed_at_time_updated ($18) is the value that
# CHECK_PARSED_FRESH later compares against.
UPSERT_MOD_PARSED = """
INSERT INTO mod_parsed (
workshop_id, mod_id, name, category,
requirements, load_after, load_before, incompatible_mods,
load_first, load_last, tags, maps,
raw_mod_info, version_min, is_addon,
mod_types, files_manifest_built,
parsed_at_time_updated, parsed_at
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18, now())
ON CONFLICT (workshop_id, mod_id) DO UPDATE SET
name = EXCLUDED.name,
category = EXCLUDED.category,
requirements = EXCLUDED.requirements,
load_after = EXCLUDED.load_after,
load_before = EXCLUDED.load_before,
incompatible_mods = EXCLUDED.incompatible_mods,
load_first = EXCLUDED.load_first,
load_last = EXCLUDED.load_last,
tags = EXCLUDED.tags,
maps = EXCLUDED.maps,
raw_mod_info = EXCLUDED.raw_mod_info,
version_min = EXCLUDED.version_min,
is_addon = EXCLUDED.is_addon,
mod_types = EXCLUDED.mod_types,
files_manifest_built = EXCLUDED.files_manifest_built,
parsed_at_time_updated = EXCLUDED.parsed_at_time_updated,
parsed_at = now();
"""
# Removes every manifest row of one (workshop_id, mod_id) pair; $1/$2.
DELETE_MOD_FILES = """
DELETE FROM mod_files WHERE workshop_id = $1 AND mod_id = $2;
"""
# Inserts one manifest row ($1..$5 = workshop_id, mod_id, rel_path, sha1,
# size_bytes); an existing row for the same path has its hash and size replaced.
INSERT_MOD_FILE = """
INSERT INTO mod_files (workshop_id, mod_id, rel_path, sha1, size_bytes)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workshop_id, mod_id, rel_path) DO UPDATE SET
sha1 = EXCLUDED.sha1,
size_bytes = EXCLUDED.size_bytes;
"""
# Description-text heuristic for "this mod is an optional add-on to the
# primary mod published by the same wsid". Matches:
# "Optional add-on: removes ..." (TMMumble)
# "optional addon ..."
# "Optional add on ..."
# Strict "optional + add-on" keyword pair to avoid false positives on
# generic "addon" naming. Author-driven signal — set via the description=
# field of mod.info.
# Case-insensitive. Requires "description", "=", then "Optional" followed by
# whitespace and "Add-on" / "Add on" / "Addon" with no line break between the
# description text and the keyword pair.
_RE_OPTIONAL_ADDON = re.compile(
    r"description\s*=\s*[^\r\n]*\bOptional\s+Add[- ]?on\b",
    re.IGNORECASE,
)
def detect_is_addon(raw: str) -> bool:
    """Tell whether a mod.info text declares itself an optional add-on.

    True when `_RE_OPTIONAL_ADDON` matches anywhere in `raw` (for example a
    description starting `Optional add-on: …`); empty or None input is False.
    """
    haystack = raw if raw else ""
    return _RE_OPTIONAL_ADDON.search(haystack) is not None
# Deletes the mod_parsed rows of workshop $1 whose mod_id is NOT in the text
# array $2 (the mod ids seen in the current parse).
DELETE_STALE_MOD_PARSED = """
DELETE FROM mod_parsed
WHERE workshop_id = $1 AND mod_id <> ALL($2::text[]);
"""
# Returns the mod_ids of workshop $1 that were parsed at time_updated $2; any
# row at all is treated by process_one as a cache hit.
CHECK_PARSED_FRESH = """
SELECT mod_id FROM mod_parsed
WHERE workshop_id = $1 AND parsed_at_time_updated = $2;
"""
def extract_version_min(raw: str) -> Optional[str]:
    """Return the value of the first `versionMin` line in a mod.info text.

    The key is matched case-insensitively as a line prefix after stripping
    whitespace. Only the first matching line is considered; if its value
    (the text after the first "=") is empty or absent, the result is None.
    """
    first_hit = next(
        (
            entry
            for entry in raw.splitlines()
            if entry.strip().lower().startswith("versionmin")
        ),
        None,
    )
    if first_hit is None:
        return None
    value = first_hit.partition("=")[2].strip()
    return value if value else None
# -----------------------------------------------------------------------------
# Main flow
# -----------------------------------------------------------------------------
async def process_one(
    conn: asyncpg.Connection,
    workshop_id: str,
    detail: dict,
    dd_path: Path,
    force: bool,
) -> str:
    """Returns 'hit' | 'refreshed' | 'banned' | 'missing' | 'no_mod_info' | 'failed'.

    'no_mod_info' = DepotDownloader succeeded but the workshop item contained
    no parseable mod.info file (typical for collections, art-only items, and
    other non-mod uploads that share the PZ consumer_app_id). Distinct from
    'failed' (DD itself errored), so the API can surface "this isn't a mod"
    differently from "we couldn't fetch this." This covers both "no mod.info
    at all" and "every mod.info was rejected by parse_mod_info".

    The work is split in two phases: everything that touches the download
    directory (parse, map discovery, manifest hashing) happens first, then
    all mod_parsed / mod_files writes for the item run inside ONE database
    transaction. Without that, a crash after the first of several mods would
    leave it stamped with the new time_updated, the next run would report
    'hit', and the remaining mods would never be refreshed.

    Args:
        conn: Open asyncpg connection.
        workshop_id: Item being processed.
        detail: Steam detail dict for the item.
        dd_path: DepotDownloader executable.
        force: Skip the freshness check and always re-download.
    """
    # Pre-flight: bad results
    if detail.get("result") != 1:
        return "missing"
    if detail.get("banned"):
        return "banned"
    if detail.get("consumer_app_id") != PZ_APP_ID:
        return "failed"  # wrong app
    time_updated = int(detail.get("time_updated", 0))
    # Always refresh meta (cheap)
    await conn.execute(
        UPSERT_WORKSHOP_META,
        workshop_id,
        detail.get("title", ""),
        detail.get("description", "") or "",
        flatten_tags(detail),
        str(detail.get("creator", "")) or None,
        int(detail.get("time_created", 0)) or None,
        time_updated,
        int(detail.get("file_size", 0)) or None,
        detail.get("preview_url"),
        detail.get("consumer_app_id"),
        detail.get("visibility"),
        bool(detail.get("banned", False)),
    )
    # Cache check
    if not force:
        rows = await conn.fetch(CHECK_PARSED_FRESH, workshop_id, time_updated)
        if rows:
            return "hit"
    # Cache miss → download + parse. Phase 1: filesystem work only; results
    # are collected so the temp directory can be released before any write.
    parsed: List[tuple] = []
    with tempfile.TemporaryDirectory(prefix=f"pzsort_{workshop_id}_") as tmpdir:
        tmp = Path(tmpdir)
        ok = run_depot_downloader(workshop_id, tmp, dd_path)
        if not ok:
            return "failed"
        mod_info_paths = discover_mod_infos(tmp)
        if not mod_info_paths:
            print(f"  ! no mod.info found in {workshop_id}", file=sys.stderr)
            return "no_mod_info"
        for mip in mod_info_paths:
            raw = mip.read_text(encoding="utf-8", errors="replace")
            mod = parse_mod_info(raw, workshop_id=workshop_id)
            if mod is None:
                continue
            maps = discover_map_folders(mip.parent)
            # Single-pass walk under the mod_id root: produces both the
            # conflict manifest and the pzmm-style mod_types list. See
            # build_manifest_and_types.
            manifest_rows, mod_types = build_manifest_and_types(mip.parent, mod.id, raw)
            parsed.append((mod, raw, maps, manifest_rows, mod_types))
    if not parsed:
        # mod.info files existed but none could be parsed: report it as the
        # documented status instead of claiming a refresh that wrote nothing.
        print(f"  ! no parseable mod.info in {workshop_id}", file=sys.stderr)
        return "no_mod_info"
    # Phase 2: all-or-nothing database update for this workshop item.
    seen_mod_ids: List[str] = []
    async with conn.transaction():
        for mod, raw, maps, manifest_rows, mod_types in parsed:
            # Evict any other wsid's claim on this mod_id before we install
            # ours. Cache invariant: at most one wsid per mod_id, with the
            # most-recent pull winning.
            await conn.execute(EVICT_AND_RECORD_CONFLICT, workshop_id, mod.id)
            await conn.execute(
                UPSERT_MOD_PARSED,
                workshop_id,
                mod.id,
                mod.name,
                mod.category,
                mod.requirements,
                mod.loadAfter,
                mod.loadBefore,
                mod.incompatibleMods,
                mod.loadFirst,
                mod.loadLast,
                mod.tags,
                maps,
                raw,
                extract_version_min(raw),
                detect_is_addon(raw),
                mod_types,
                True,  # files_manifest_built
                time_updated,
            )
            # Replace any stale manifest rows for this (workshop_id, mod_id)
            # so a re-parse can't leave behind orphans from a prior layout.
            await conn.execute(DELETE_MOD_FILES, workshop_id, mod.id)
            if manifest_rows:
                await conn.executemany(
                    INSERT_MOD_FILE,
                    [(workshop_id, mod.id, rel, sha, size)
                     for rel, sha, size in manifest_rows],
                )
            seen_mod_ids.append(mod.id)
        # Drop rows for mods that no longer exist in this workshop item
        await conn.execute(DELETE_STALE_MOD_PARSED, workshop_id, seen_mod_ids)
    # Scrape the public Workshop page for the "Required Items" section so the
    # API can auto-resolve missing-dep warnings against this mod's declared
    # Steam-side dependencies. Best-effort: None on fetch error → leave the
    # existing cached value; [] or list → overwrite.
    required = await asyncio.to_thread(fetch_required_wsids, workshop_id)
    if required is not None:
        await conn.execute(
            """
            UPDATE workshop_meta
            SET required_wsids = $1, required_scraped_at = now()
            WHERE workshop_id = $2
            """,
            required, workshop_id,
        )
    return "refreshed"
async def main_async(workshop_ids: List[str], dd_path: Path, force: bool, dsn: str) -> int:
    """Process every workshop id in order and print a per-status summary.

    Metadata for all ids is fetched from Steam in one call, then each id is
    handed to `process_one` over a single database connection that is
    closed when the loop ends, normally or by exception.

    Args:
        workshop_ids: Ids given on the command line.
        dd_path: DepotDownloader executable.
        force: Passed through to `process_one` to bypass the cache check.
        dsn: Database connection string for asyncpg.

    Returns:
        0 when no item ended as 'failed', otherwise 1.
    """
    print(f"[steam] fetching metadata for {len(workshop_ids)} item(s)")
    details = fetch_workshop_details(workshop_ids)
    missing_from_steam = [w for w in workshop_ids if w not in details]
    if missing_from_steam:
        print(f"[steam] no detail returned for: {missing_from_steam}", file=sys.stderr)
    # One counter per status process_one can return. 'no_mod_info' was
    # missing here, so any such item crashed the run with a KeyError.
    summary: Dict[str, int] = {
        "hit": 0,
        "refreshed": 0,
        "banned": 0,
        "missing": 0,
        "no_mod_info": 0,
        "failed": 0,
    }
    conn = await asyncpg.connect(dsn=dsn)
    try:
        for wid in workshop_ids:
            detail = details.get(wid)
            if detail is None:
                summary["missing"] += 1
                print(f"  - {wid} -> missing (no Steam response)")
                continue
            status = await process_one(conn, wid, detail, dd_path, force)
            # .get() keeps the tally alive even if a new status is added later.
            summary[status] = summary.get(status, 0) + 1
            print(f"  - {wid} -> {status}")
    finally:
        await conn.close()
    print(f"[done] {summary}")
    return 0 if summary["failed"] == 0 else 1
def main():
    """Command-line entry point: parse argv, validate inputs, run the worker.

    Exits with status 2 when no DSN is available or the DepotDownloader
    path is not an existing file; otherwise exits with the code returned by
    `main_async`.
    """
    def _usage_error(message: str) -> None:
        # Report a bad invocation on stderr and stop with exit status 2.
        print(f"ERROR: {message}", file=sys.stderr)
        sys.exit(2)

    parser = argparse.ArgumentParser()
    parser.add_argument("workshop_ids", nargs="+")
    parser.add_argument("--force", action="store_true", help="ignore cache, always re-download")
    parser.add_argument("--dd-path", default=DEFAULT_DD_PATH)
    parser.add_argument("--dsn", default=os.environ.get("DATABASE_URL"))
    opts = parser.parse_args()
    if not opts.dsn:
        _usage_error("--dsn or DATABASE_URL required")
    depot_tool = Path(opts.dd_path)
    if not depot_tool.is_file():
        _usage_error(f"DepotDownloader not found at {depot_tool}")
    exit_code = asyncio.run(
        main_async(opts.workshop_ids, depot_tool, opts.force, opts.dsn)
    )
    sys.exit(exit_code)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()