# Collection Expansion + Live Drain Progress Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Accept Steam Workshop collection URLs in `/api/sort`, expand them server-side via `GetCollectionDetails`, and drive a polling endpoint (`/api/jobs/{job_id}`) that gives the frontend live `cached / queued / draining` counters during cold loads.

**Architecture:** A new `sort_jobs` table tracks asynchronous expansion + drain lifecycles. `/api/sort` becomes polymorphic: all-cached bare wsids return synchronously (unchanged); anything that needs work returns a `job_id`. The frontend polls `GET /api/jobs/{job_id}` every 2.5s and renders phase-specific status-strip text. Phase is **derived live** from `download_jobs` counts on every poll - no event log, no leader, restart-resilient by construction.

**Tech Stack:** Postgres (new table + indexes via additive migration), FastAPI (two new routes + background `asyncio.create_task` for expansion), asyncpg parameterized queries (mirroring existing patterns), httpx for `GetCollectionDetails`, vanilla React + Babel-standalone on the frontend (no build step - same as Spec A).

> **Spec dependency:** Read `/opt/sortof/docs/specs/2026-05-01-collection-expansion.md` (270 lines, all decisions locked in §10) before starting. The acceptance criteria in §11 and the test recipes in §12 are what Task 11 verifies.

---

## File structure

| Path | Action | Responsibility |
|---|---|---|
| `/opt/sortof/init/02_sort_jobs.sql` | **Create** | Schema for fresh deploys (idempotent `CREATE TABLE IF NOT EXISTS`). Identical DDL also applied to the live DB via one-shot `psql` in Task 1. |
| `/opt/sortof/api/parse.py` | Modify | Add `parse_with_collections(text) -> (wsids, collection_ids)`. Reuses the existing wsid extractor; classifies URL-form IDs as candidate collections. |
| `/opt/sortof/api/steam.py` | Modify | Add `async fetch_collection_details(client, ids)` mirroring the existing `fetch_workshop_details` pattern. |
| `/opt/sortof/api/jobs.py` | **Create** | `sort_jobs` row CRUD, phase derivation (the §4 rule executed inside `GET`), live counts SQL, lifespan-startup stale-expansion sweep. |
| `/opt/sortof/api/expansion.py` | **Create** | Background expansion task (Task 5): resolves collection IDs via `GetCollectionDetails` (6h `collections` cache), fills `wsids[]`, advances the job to `queued`. |
| `/opt/sortof/api/app.py` | Modify | Polymorphic `/api/sort`; new `GET /api/jobs/{job_id}`; new `DELETE /api/jobs/{job_id}`; lifespan sweep wired in. |
| `/opt/sortof/frontend/sortof-app.jsx` | Modify | Detect `job_id` in `/api/sort` response; `pollJob()` async loop @ 2.5s; phase-specific status-strip text; cancel button; 404 expired-job toast. |
| `/opt/sortof/frontend/index.html` | Modify | CSS for new phase indicators (e.g. `.status-pill.expanding`, `.cancel-btn`). |

No changes to `/opt/sortof/worker/` - drain stays exactly as-is. Collections expand at API time; the resulting wsids flow into `download_jobs` via the existing queueing path.

**Verification fixtures** (referenced throughout):

- All-cached bare wsids: `2169435993;2392709985;2487022075` → MODS_LINE="modoptions;tsarslib;TMC_TrueActions" (canonical sync regression).
- Synthetic collection (no Steam round-trip): direct `INSERT INTO collections` with known children. Used for cache-hit verification.
- Real Steam collection: Task 11 step 2 instructs the implementer to find a public PZ collection URL on `https://steamcommunity.com/workshop/browse/?appid=108600&section=collections` and use its ID. Required for the cold-expansion path.

---

## Task 1: Schema migration - `sort_jobs` table

**Files:**
- Create: `/opt/sortof/init/02_sort_jobs.sql`
- One-shot apply to live DB via `docker exec sortof_db psql`

- [ ] **Step 1: Write the schema file**

Create `/opt/sortof/init/02_sort_jobs.sql` with:

```sql
-- Async sort jobs: lifecycle + result for collection expansion + cold drains.
-- Created 2026-05-01 (Spec B+F).
CREATE TABLE IF NOT EXISTS sort_jobs (
    job_id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    phase             TEXT NOT NULL
                      CHECK (phase IN ('expanding','queued','draining','done','failed')),
    phase_started_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    input_raw         TEXT NOT NULL,
    collection_ids    TEXT[] NOT NULL DEFAULT '{}',
    wsids             TEXT[],
    rules_raw         TEXT,
    result_json       JSONB,
    failure_reason    TEXT
);

CREATE INDEX IF NOT EXISTS sort_jobs_phase_idx   ON sort_jobs (phase);
CREATE INDEX IF NOT EXISTS sort_jobs_updated_idx ON sort_jobs (updated_at);

DROP TRIGGER IF EXISTS sort_jobs_touch ON sort_jobs;
CREATE TRIGGER sort_jobs_touch
    BEFORE UPDATE ON sort_jobs
    FOR EACH ROW EXECUTE FUNCTION touch_updated_at();
```

The `touch_updated_at()` function already exists (defined in `init/01_schema.sql` for `download_jobs`).

Note: `init/` is owned by root. Use `sudo tee` to write the file:

```bash
sudo tee /opt/sortof/init/02_sort_jobs.sql > /dev/null <<'SQL'
-- Async sort jobs: lifecycle + result for collection expansion + cold drains.
-- Created 2026-05-01 (Spec B+F).
CREATE TABLE IF NOT EXISTS sort_jobs (
    job_id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    phase             TEXT NOT NULL
                      CHECK (phase IN ('expanding','queued','draining','done','failed')),
    phase_started_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    input_raw         TEXT NOT NULL,
    collection_ids    TEXT[] NOT NULL DEFAULT '{}',
    wsids             TEXT[],
    rules_raw         TEXT,
    result_json       JSONB,
    failure_reason    TEXT
);

CREATE INDEX IF NOT EXISTS sort_jobs_phase_idx   ON sort_jobs (phase);
CREATE INDEX IF NOT EXISTS sort_jobs_updated_idx ON sort_jobs (updated_at);

DROP TRIGGER IF EXISTS sort_jobs_touch ON sort_jobs;
CREATE TRIGGER sort_jobs_touch
    BEFORE UPDATE ON sort_jobs
    FOR EACH ROW EXECUTE FUNCTION touch_updated_at();
SQL
```

- [ ] **Step 2: Apply DDL to the live DB**

```bash
sudo docker exec -i sortof_db psql -U sortof -d sortof < /opt/sortof/init/02_sort_jobs.sql
```

Expected: a few `CREATE TABLE / CREATE INDEX / CREATE TRIGGER` notices (or none if already applied).

- [ ] **Step 3: Verify the table exists with the right columns**

```bash
sudo docker exec -i sortof_db psql -U sortof -d sortof -c "\d sort_jobs"
```

Expected: 11 columns matching the schema, 3 indexes (PK + phase_idx + updated_idx), trigger present. The `\d` output should mention `gen_random_uuid()` as the default for `job_id` and the CHECK constraint on `phase`.

- [ ] **Step 4: Smoke insert / select**

```bash
sudo docker exec -i sortof_db psql -U sortof -d sortof -c "
  INSERT INTO sort_jobs (phase, input_raw) VALUES ('expanding', 'smoke') RETURNING job_id, phase;
  DELETE FROM sort_jobs WHERE input_raw='smoke';"
```

Expected: one row returned with a UUID and phase='expanding', followed by `DELETE 1`.

- [ ] **Step 5: Checkpoint** - schema is live; foundation for Tasks 4+ ready. No backup needed (DDL is idempotent and the table was empty).
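The CHECK constraint only restricts `phase` to five values; which other columns are populated depends on how later tasks write the row. A minimal sketch of an application-side consistency check, assuming the write pattern described in Tasks 4-7 (`expanding` rows have `wsids` NULL, `failed` rows carry `failure_reason`, `done` rows carry `result_json`). `row_problems` and `PHASES` are hypothetical names, not part of the plan's modules; the helper takes a plain dict such as `dict(record)` from asyncpg.

```python
from typing import Any, Dict, List

# Mirrors the CHECK constraint on sort_jobs.phase.
PHASES = ("expanding", "queued", "draining", "done", "failed")


def row_problems(row: Dict[str, Any]) -> List[str]:
    """Return human-readable invariant violations for one sort_jobs row.

    Hypothetical helper: these invariants encode how Tasks 4-7 write the
    row; the database itself only enforces the phase CHECK.
    """
    problems: List[str] = []
    phase = row.get("phase")
    if phase not in PHASES:
        problems.append(f"unknown phase {phase!r}")
        return problems
    if phase == "expanding" and row.get("wsids") is not None:
        problems.append("expanding row already has wsids")
    if phase in ("queued", "draining", "done") and row.get("wsids") is None:
        problems.append(f"{phase} row has no wsids")
    if phase == "done" and row.get("result_json") is None:
        problems.append("done row has no result_json")
    if phase == "failed" and not row.get("failure_reason"):
        problems.append("failed row has no failure_reason")
    return problems
```

Running it over `SELECT * FROM sort_jobs` during the Task 11 verification would be one way to spot a code path that advances `phase` without filling the matching column.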
---

## Task 2: Parser extension - `parse_with_collections()`

**Files:**
- Modify: `/opt/sortof/api/parse.py`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/api/parse.py /opt/sortof/api/parse.py.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Add the new function and a Steam-URL regex**

Open `/opt/sortof/api/parse.py`. The current file defines only `parse_workshop_input(text)`. Add at the bottom (do **not** modify the existing function - it's still used by `/api/sort` for backwards compat through the polymorphic path):

```python
import re as _re_module  # already imported at top; alias avoided if duplicate

# Steam Workshop URL form: https://steamcommunity.com/{sharedfiles,workshop}/filedetails/?id=NNNNNNN
_STEAM_URL_RE = re.compile(
    r"https?://steamcommunity\.com/(?:sharedfiles|workshop)/filedetails/\?id=(\d{7,12})",
    re.IGNORECASE,
)


def parse_with_collections(text: str) -> tuple[List[str], List[str]]:
    """Split an input blob into bare wsids and candidate collection IDs.

    A "candidate collection" is any 7-12-digit ID that appears inside a
    Steam Workshop URL. Bare numeric IDs in the same blob are treated as
    mod wsids (current behavior).

    Steam doesn't syntactically distinguish collection IDs from mod IDs; the
    candidate list is sent to GetCollectionDetails to confirm. If a candidate
    isn't actually a collection, the caller falls back to treating it as a wsid.

    Returns (wsids, collection_ids), each deduped and in first-seen order.
    """
    if not text:
        return ([], [])

    # 1. Find URL-form IDs FIRST (so they don't get double-counted as bare).
    url_ids: List[str] = []
    seen_url: set[str] = set()
    for m in _STEAM_URL_RE.finditer(text):
        i = m.group(1)
        if i not in seen_url:
            seen_url.add(i)
            url_ids.append(i)

    # 2. Strip the URLs out before extracting bare numbers.
    text_minus_urls = _STEAM_URL_RE.sub("", text)

    # 3. Bare wsids: same regex as parse_workshop_input.
    cleaned = re.sub(
        r"^\s*(WorkshopItems|Mods|Map)\s*=\s*",
        "",
        text_minus_urls,
        flags=re.MULTILINE | re.IGNORECASE,
    )
    bare_ids = re.findall(r"\b\d{7,12}\b", cleaned)
    seen_bare: set[str] = set()
    bare_unique: List[str] = []
    for i in bare_ids:
        if i not in seen_bare and i not in seen_url:
            seen_bare.add(i)
            bare_unique.append(i)

    return (bare_unique, url_ids)
```

(The `import re as _re_module` line is a paste-safe stub - `re` is already imported at the top of the file. Drop the alias line if a static check complains about a duplicate import.)

- [ ] **Step 3: py_compile**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/parse.py && echo PY_OK
```

- [ ] **Step 4: Functional smoke test in the venv REPL**

```bash
cd /opt/sortof/api && .venv/bin/python -c "
from parse import parse_with_collections, parse_workshop_input

# Pure bare wsids - backwards compat.
assert parse_with_collections('2169435993;2392709985') == (['2169435993','2392709985'], [])
# Pure URL.
assert parse_with_collections('https://steamcommunity.com/sharedfiles/filedetails/?id=2200148440') == ([], ['2200148440'])
# Mixed: URL + bare.
assert parse_with_collections('https://steamcommunity.com/sharedfiles/filedetails/?id=2200148440\n2169435993') == (['2169435993'], ['2200148440'])
# Same ID appearing both as URL AND as bare → URL wins, bare side dedups.
assert parse_with_collections('2200148440\nhttps://steamcommunity.com/sharedfiles/filedetails/?id=2200148440') == ([], ['2200148440'])
# Empty.
assert parse_with_collections('') == ([], [])
assert parse_with_collections(None) == ([], [])
# Existing parse_workshop_input still works.
assert parse_workshop_input('2169435993;2392709985') == ['2169435993','2392709985']
print('ALL_OK')
"
```

Expected: `ALL_OK`. Any AssertionError stops the task - fix the regex/dedupe logic before proceeding.

- [ ] **Step 5: Checkpoint** - parser ready for the API to consume.
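`_STEAM_URL_RE` only matches when `id` is the first query parameter. If pasted URLs can carry other parameters before `id=` (the `?l=english&id=…` form below is a hypothetical example, not a format this plan confirms Steam emits), the ID escapes the URL pass and is classified as a bare wsid, so it is never tried as a collection. A minimal sketch comparing the plan's strict pattern with a hypothetical lenient variant that accepts `id` anywhere in the query string; `LENIENT` and `url_ids` are illustrative names.

```python
import re
from typing import List

# Strict form, copied from Task 2: `id` must be the first query parameter.
STRICT = re.compile(
    r"https?://steamcommunity\.com/(?:sharedfiles|workshop)/filedetails/\?id=(\d{7,12})",
    re.IGNORECASE,
)

# Hypothetical lenient variant: any `&`-separated parameters may precede `id=`.
# `[^\s#]` stops at whitespace, so two URLs on one line stay separate; the
# trailing \b rejects IDs longer than 12 digits instead of truncating them.
LENIENT = re.compile(
    r"https?://steamcommunity\.com/(?:sharedfiles|workshop)/filedetails/"
    r"\?(?:[^\s#]*?&)?id=(\d{7,12})\b",
    re.IGNORECASE,
)


def url_ids(pattern: re.Pattern, text: str) -> List[str]:
    """All captured IDs, in order of appearance (no dedupe)."""
    return [m.group(1) for m in pattern.finditer(text)]
```

Both patterns agree on the plain `?id=` URLs used in the Step 4 smoke test; whether the lenient form is worth adopting depends on what users actually paste.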
---

## Task 3: Steam helper - `fetch_collection_details()`

**Files:**
- Modify: `/opt/sortof/api/steam.py`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/api/steam.py /opt/sortof/api/steam.py.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Add the helper, mirroring `fetch_workshop_details`**

Open `/opt/sortof/api/steam.py`. The current file has one async helper. Add a sibling at the bottom:

```python
COLLECTION_URL = (
    "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/"
)


async def fetch_collection_details(
    client: httpx.AsyncClient,
    collection_ids: List[str],
) -> Dict[str, Dict]:
    """Resolve candidate collection IDs to their child wsids.

    Returns a dict keyed by collection_id with shape:
        { "result": int, "children": List[str] }

    Anonymous endpoint; no API key needed. result==1 means valid collection;
    result!=1 means the ID isn't a collection (could be a mod, deleted, or
    private). Caller decides what to do with non-1 results - see Spec B+F
    §10 Q3 "Partial expansion failure" and Q4 "Flakiness".
    """
    if not collection_ids:
        return {}
    data: Dict[str, str] = {"collectioncount": str(len(collection_ids))}
    for i, cid in enumerate(collection_ids):
        data[f"publishedfileids[{i}]"] = cid
    r = await client.post(COLLECTION_URL, data=data)
    r.raise_for_status()
    body = r.json()
    out: Dict[str, Dict] = {}
    for item in body.get("response", {}).get("collectiondetails", []) or []:
        cid = item.get("publishedfileid")
        if not cid:
            continue
        out[cid] = {
            "result": int(item.get("result", 0)),
            "children": [
                c.get("publishedfileid", "")
                for c in (item.get("children") or [])
                if c.get("publishedfileid")
            ],
        }
    return out
```

- [ ] **Step 3: py_compile + smoke import**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/steam.py && echo PY_OK
cd /opt/sortof/api && .venv/bin/python -c "import app; from steam import fetch_collection_details; print(fetch_collection_details.__doc__.split(chr(10))[0])"
```

Expected: `PY_OK` and the first line of the helper's docstring.
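Step 4 needs network access. To exercise the request/response handling offline, the body of `fetch_collection_details` can be split into two pure functions. A minimal sketch, assuming only the response shape the helper already relies on (`response.collectiondetails[].children[].publishedfileid`); `build_form` and `parse_collection_body` are hypothetical names, and the canned payload in the checks is illustrative, not captured from Steam.

```python
from typing import Any, Dict, List


def build_form(collection_ids: List[str]) -> Dict[str, str]:
    """Form fields for GetCollectionDetails, as built in fetch_collection_details."""
    data: Dict[str, str] = {"collectioncount": str(len(collection_ids))}
    for i, cid in enumerate(collection_ids):
        data[f"publishedfileids[{i}]"] = cid
    return data


def parse_collection_body(body: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Same parsing rules as fetch_collection_details, minus the HTTP call."""
    out: Dict[str, Dict[str, Any]] = {}
    for item in body.get("response", {}).get("collectiondetails", []) or []:
        cid = item.get("publishedfileid")
        if not cid:
            continue  # entries without an ID cannot be keyed; skip them
        out[cid] = {
            "result": int(item.get("result", 0)),
            # Keep only children that actually carry an ID.
            "children": [
                c["publishedfileid"]
                for c in (item.get("children") or [])
                if c.get("publishedfileid")
            ],
        }
    return out
```

With this split, `fetch_collection_details` would reduce to `build_form`, the POST, `raise_for_status()`, and `parse_collection_body(r.json())`, and the parsing rules can be checked without a Steam round-trip.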
- [ ] **Step 4: Functional smoke test against real Steam (one collection ID)**

The implementer should pick a known PZ collection - search `https://steamcommunity.com/workshop/browse/?appid=108600&section=collections` for any active collection, copy its ID from the URL bar, and use it here. Substitute below:

```bash
COLL_ID=""
curl -sS -X POST 'https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/' \
  --data-urlencode 'collectioncount=1' \
  --data-urlencode "publishedfileids[0]=$COLL_ID" \
  | jq '.response.collectiondetails[0] | {result, n_children: (.children | length)}'
```

Expected: `result: 1`, `n_children > 0`.

Then call our helper through the venv:

```bash
cd /opt/sortof/api && .venv/bin/python -c "
import asyncio, httpx
from steam import fetch_collection_details

async def main():
    async with httpx.AsyncClient(timeout=30.0) as c:
        out = await fetch_collection_details(c, ['$COLL_ID'])
        print(out)

asyncio.run(main())
"
```

Expected: a dict like `{'': {'result': 1, 'children': ['', '', ...]}}`.

- [ ] **Step 5: Checkpoint** - Steam helper verified end-to-end.

---

## Task 4: `jobs.py` - CRUD, phase derivation, live counts, lifespan sweep

**Files:**
- Create: `/opt/sortof/api/jobs.py`

- [ ] **Step 1: Write the module**

Create `/opt/sortof/api/jobs.py` with:

```python
"""sort_jobs persistence + phase derivation.

Phase is *derived* on every GET (Spec B+F §4): never stored as the source of
truth except for terminal states. The function `derive_phase` reads live
counts from download_jobs and decides expanding/queued/draining/done. This
makes the system restart-resilient by construction - there is no event log
to replay.
"""
from __future__ import annotations

import json
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

import asyncpg


# ── CRUD ────────────────────────────────────────────────────────────────────

async def create_job(
    conn: asyncpg.Connection,
    *,
    input_raw: str,
    collection_ids: List[str],
    wsids: Optional[List[str]],
    rules_raw: Optional[str],
    initial_phase: str,
) -> str:
    """Insert a sort_jobs row and return the job_id (UUID as string).

    initial_phase: 'expanding' if collections still need resolving,
    'queued' if wsids are already resolved at submit time.
    """
    row = await conn.fetchrow(
        """
        INSERT INTO sort_jobs (phase, input_raw, collection_ids, wsids, rules_raw)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING job_id
        """,
        initial_phase, input_raw, collection_ids, wsids, rules_raw,
    )
    return str(row["job_id"])


async def get_job_row(conn: asyncpg.Connection, job_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a sort_jobs row by id. Returns None if not found.

    job_id may be either a string UUID or asyncpg-native UUID.
    """
    try:
        uid = UUID(job_id) if isinstance(job_id, str) else job_id
    except ValueError:
        return None
    row = await conn.fetchrow(
        "SELECT * FROM sort_jobs WHERE job_id = $1", uid,
    )
    return dict(row) if row else None


async def update_phase(
    conn: asyncpg.Connection,
    job_id: str,
    phase: str,
    *,
    wsids: Optional[List[str]] = None,
    result_json: Optional[Dict[str, Any]] = None,
    failure_reason: Optional[str] = None,
) -> None:
    """Advance a job's phase.

    wsids/result_json/failure_reason are optional column updates that pair
    with phase transitions.
    """
    sets = ["phase = $2", "phase_started_at = now()"]
    params: List[Any] = [UUID(job_id), phase]
    idx = 3
    if wsids is not None:
        sets.append(f"wsids = ${idx}::text[]")
        params.append(wsids)
        idx += 1
    if result_json is not None:
        sets.append(f"result_json = ${idx}::jsonb")
        params.append(json.dumps(result_json))
        idx += 1
    if failure_reason is not None:
        sets.append(f"failure_reason = ${idx}")
        params.append(failure_reason)
        idx += 1
    await conn.execute(
        f"UPDATE sort_jobs SET {', '.join(sets)} WHERE job_id = $1",
        *params,
    )


# ── live counts (Spec B+F §6) ───────────────────────────────────────────────

async def compute_counts(conn: asyncpg.Connection, wsids: List[str]) -> Dict[str, int]:
    """Compute live cached/queued/draining counts for a set of wsids.

    Empty wsids → all zeros."""
    if not wsids:
        return {"cached": 0, "queued": 0, "draining": 0}
    rows = await conn.fetch(
        """
        SELECT
          (SELECT COUNT(DISTINCT mp.workshop_id)
             FROM mod_parsed mp
             JOIN workshop_meta wm ON wm.workshop_id = mp.workshop_id
            WHERE mp.workshop_id = ANY($1::text[])
              AND mp.parsed_at_time_updated = wm.time_updated) AS cached,
          (SELECT COUNT(DISTINCT workshop_id)
             FROM download_jobs
            WHERE workshop_id = ANY($1::text[])
              AND status = 'queued') AS queued,
          (SELECT COUNT(DISTINCT workshop_id)
             FROM download_jobs
            WHERE workshop_id = ANY($1::text[])
              AND status = 'downloading') AS draining
        """,
        wsids,
    )
    r = rows[0]
    return {"cached": int(r["cached"]), "queued": int(r["queued"]), "draining": int(r["draining"])}


# ── phase derivation (Spec B+F §4) ──────────────────────────────────────────

def derive_phase(
    stored_phase: str,
    wsids: Optional[List[str]],
    counts: Dict[str, int],
) -> str:
    """Decide the live phase from the row's stored phase + current counts.

    Terminal phases (done/failed) are never demoted. Non-terminal phases are
    recomputed from current state.
    """
    if stored_phase in ("done", "failed"):
        return stored_phase
    if wsids is None:
        return "expanding"
    if counts["draining"] > 0:
        return "draining"
    if counts["queued"] > 0:
        return "queued"
    if counts["cached"] >= len(wsids):
        return "done"
    # Transient gap: a row just left 'queued' and hasn't shown up in
    # mod_parsed yet. Most likely just-failed and not yet re-queued.
    return "queued"


# ── stale-expansion sweep (Spec B+F §9) ─────────────────────────────────────

STALE_EXPANSION_SQL = """
UPDATE sort_jobs
   SET phase = 'failed',
       failure_reason = 'expansion timed out',
       updated_at = now()
 WHERE phase = 'expanding'
   AND phase_started_at < now() - interval '10 minutes'
RETURNING job_id;
"""


async def sweep_stale_expansions(conn: asyncpg.Connection) -> int:
    """Run on uvicorn lifespan startup. Returns the number of jobs reaped."""
    rows = await conn.fetch(STALE_EXPANSION_SQL)
    return len(rows)
```

- [ ] **Step 2: py_compile + smoke import**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/jobs.py && echo PY_OK
cd /opt/sortof/api && .venv/bin/python -c "import inspect, jobs; print(sorted(n for n, f in inspect.getmembers(jobs, inspect.isfunction) if f.__module__ == 'jobs'))"
```

Expected: `PY_OK` followed by `['compute_counts', 'create_job', 'derive_phase', 'get_job_row', 'sweep_stale_expansions', 'update_phase']`. (Filtering on functions defined in `jobs` keeps imported names such as `Any`, `Dict` and `UUID` out of the list; a plain `sorted(dir(jobs))[:8]` would show only those.)

- [ ] **Step 3: Phase derivation unit smoke**

Phase derivation is pure (no DB), so it's testable without a connection:

```bash
cd /opt/sortof/api && .venv/bin/python -c "
from jobs import derive_phase

# Terminal preserved.
assert derive_phase('done', ['a'], {'cached':1,'queued':0,'draining':0}) == 'done'
assert derive_phase('failed', ['a'], {'cached':0,'queued':0,'draining':0}) == 'failed'
# wsids null → expanding.
assert derive_phase('expanding', None, {'cached':0,'queued':0,'draining':0}) == 'expanding'
# Active drain.
assert derive_phase('queued', ['a','b'], {'cached':0,'queued':1,'draining':1}) == 'draining'
# Just queued.
assert derive_phase('queued', ['a','b'], {'cached':0,'queued':2,'draining':0}) == 'queued'
# All cached.
assert derive_phase('queued', ['a','b'], {'cached':2,'queued':0,'draining':0}) == 'done'
# Transient gap (between queued exit and parsed entry).
assert derive_phase('queued', ['a','b'], {'cached':1,'queued':0,'draining':0}) == 'queued'
print('PHASE_OK')
"
```

Expected: `PHASE_OK`.

- [ ] **Step 4: Live-counts smoke (DB round-trip)**

```bash
cd /opt/sortof/api && .venv/bin/python -c "
import asyncio, db
from jobs import compute_counts

async def main():
    pool = await db.create_pool()
    async with pool.acquire() as conn:
        c1 = await compute_counts(conn, [])
        assert c1 == {'cached':0,'queued':0,'draining':0}
        # canonical 3-mod test set is fully cached.
        c2 = await compute_counts(conn, ['2169435993','2392709985','2487022075'])
        assert c2['cached'] == 3
    await pool.close()
    print('COUNTS_OK')

asyncio.run(main())
"
```

Expected: `COUNTS_OK`.

- [ ] **Step 5: Checkpoint** - module reusable from `app.py`.

---

## Task 5: Background expansion task

**Files:**
- Create: `/opt/sortof/api/expansion.py`

- [ ] **Step 1: Write the expansion runner**

Create `/opt/sortof/api/expansion.py` with:

```python
"""Background async task: take a freshly-created sort_jobs row in
'expanding' phase, resolve its collection_ids via Steam, populate wsids[],
advance phase to 'queued' (and drop wsids into download_jobs as needed)."""
from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict, List

import asyncpg
import httpx

from jobs import update_phase
from steam import fetch_collection_details

log = logging.getLogger("sortof.expansion")

COLLECTION_TTL_SECONDS = 6 * 3600  # Spec B+F §5.3


async def _resolve_collections(
    conn: asyncpg.Connection,
    http: httpx.AsyncClient,
    collection_ids: List[str],
) -> tuple[Dict[str, List[str]], List[str]]:
    """Returns (resolved, unresolvable).

    resolved maps collection_id -> [child_wsids].
    unresolvable lists collection_ids that GetCollectionDetails couldn't
    fetch (after one retry)."""
    if not collection_ids:
        return ({}, [])

    # Cache lookup (TTL = 6h via last_fetched_at).
    cache_rows = await conn.fetch(
        """
        SELECT collection_id, child_workshop_ids
          FROM collections
         WHERE collection_id = ANY($1::text[])
           AND last_fetched_at > now() - interval '6 hours'
        """,
        collection_ids,
    )
    resolved: Dict[str, List[str]] = {
        r["collection_id"]: list(r["child_workshop_ids"]) for r in cache_rows
    }
    miss = [cid for cid in collection_ids if cid not in resolved]

    unresolvable: List[str] = []
    if miss:
        for attempt in (1, 2):
            try:
                api_out = await fetch_collection_details(http, miss)
            except httpx.HTTPError as e:
                log.warning("GetCollectionDetails attempt %d failed: %s", attempt, e)
                if attempt == 1:
                    await asyncio.sleep(2.0)
                    continue
                unresolvable = list(miss)
                api_out = {}
            for cid in miss:
                rec = api_out.get(cid)
                if rec is None or rec.get("result") != 1:
                    unresolvable.append(cid)
                    continue
                children = rec.get("children") or []
                resolved[cid] = list(children)
                await conn.execute(
                    """
                    INSERT INTO collections (collection_id, child_workshop_ids, last_fetched_at)
                    VALUES ($1, $2, now())
                    ON CONFLICT (collection_id) DO UPDATE
                      SET child_workshop_ids = EXCLUDED.child_workshop_ids,
                          last_fetched_at = now()
                    """,
                    cid, children,
                )
            break  # success - stop retrying

    # Dedupe (in case retry-on-flake added the same cid twice).
    seen: set[str] = set()
    out_unres: List[str] = []
    for u in unresolvable:
        if u not in seen:
            seen.add(u)
            out_unres.append(u)
    return (resolved, out_unres)


async def run_expansion(
    pool: asyncpg.Pool,
    http: httpx.AsyncClient,
    job_id: str,
    bare_wsids: List[str],
    collection_ids: List[str],
) -> None:
    """Top-level expansion task. Logs and persists; never raises out."""
    try:
        async with pool.acquire() as conn:
            resolved, unresolvable = await _resolve_collections(conn, http, collection_ids)

            # Compose wsids: collections (in input order) + bare wsids, deduped.
            seen: set[str] = set()
            wsids: List[str] = []
            for cid in collection_ids:
                for w in resolved.get(cid, []):
                    if w and w not in seen:
                        seen.add(w)
                        wsids.append(w)
            for w in bare_wsids:
                if w not in seen:
                    seen.add(w)
                    wsids.append(w)

            if not wsids:
                # All collections unresolvable AND no bare wsids. Job dies.
                await update_phase(
                    conn, job_id, "failed",
                    failure_reason="all input collections unresolvable",
                )
                log.info("expansion %s: failed - all collections unresolvable", job_id)
                return

            partial_warnings = [
                {
                    "tag": "collection-partial",
                    "level": "warning",
                    "msg": f"collection {cid} could not be fetched",
                }
                for cid in unresolvable
            ]
            seed_result: Dict[str, Any] = {"WARNINGS": partial_warnings} if partial_warnings else None

            # Hand uncached wsids to the drain worker. Without this step the
            # children of a collection never enter download_jobs and the job
            # sits in 'queued' forever. Same queueing loop as _route_to_job
            # (Task 6), but skipping wsids that are already cache-fresh
            # (same freshness rule as jobs.compute_counts).
            fresh_rows = await conn.fetch(
                """
                SELECT mp.workshop_id
                  FROM mod_parsed mp
                  JOIN workshop_meta wm ON wm.workshop_id = mp.workshop_id
                 WHERE mp.workshop_id = ANY($1::text[])
                   AND mp.parsed_at_time_updated = wm.time_updated
                """,
                wsids,
            )
            fresh = {r["workshop_id"] for r in fresh_rows}
            for w in wsids:
                if w in fresh:
                    continue
                active = await conn.fetchval(
                    "SELECT 1 FROM download_jobs "
                    "WHERE workshop_id = $1 AND status IN ('queued','downloading') LIMIT 1",
                    w,
                )
                if active is None:
                    await conn.execute(
                        "INSERT INTO download_jobs (workshop_id, status) VALUES ($1, 'queued')",
                        w,
                    )

            await update_phase(
                conn, job_id, "queued",
                wsids=wsids,
                result_json=seed_result,
            )
            log.info(
                "expansion %s: queued (wsids=%d unresolvable=%d)",
                job_id, len(wsids), len(unresolvable),
            )
    except Exception:
        log.exception("expansion %s: crashed", job_id)
        try:
            async with pool.acquire() as conn:
                await update_phase(conn, job_id, "failed", failure_reason="expansion crashed")
        except Exception:
            log.exception("expansion %s: cleanup failed", job_id)
```

- [ ] **Step 2: py_compile + smoke import**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/expansion.py && echo PY_OK
cd /opt/sortof/api && .venv/bin/python -c "from expansion import run_expansion; print('IMPORT_OK')"
```

Expected: `PY_OK` and `IMPORT_OK`.

- [ ] **Step 3: End-to-end smoke against the live DB + Steam**

```bash
cd /opt/sortof/api && .venv/bin/python -c "
import asyncio, httpx, db, jobs, expansion

COLL_ID = ''  # same one you used in Task 3 step 4

async def main():
    pool = await db.create_pool()
    async with pool.acquire() as conn:
        # Pre-clear any cached row to test the cold path.
        await conn.execute('DELETE FROM collections WHERE collection_id=\$1', COLL_ID)
        jid = await jobs.create_job(
            conn,
            input_raw=f'https://steamcommunity.com/sharedfiles/filedetails/?id={COLL_ID}',
            collection_ids=[COLL_ID],
            wsids=None,
            rules_raw=None,
            initial_phase='expanding',
        )
    async with httpx.AsyncClient(timeout=30.0) as http:
        await expansion.run_expansion(pool, http, jid, [], [COLL_ID])
    async with pool.acquire() as conn:
        row = await jobs.get_job_row(conn, jid)
        assert row['phase'] == 'queued', row
        assert row['wsids'] is not None and len(row['wsids']) > 0
        # Cleanup.
        await conn.execute('DELETE FROM sort_jobs WHERE job_id=\$1', row['job_id'])
    await pool.close()
    print('EXPANSION_OK')

asyncio.run(main())
"
```

Expected: `EXPANSION_OK`. Substitute `COLL_ID` with the real ID from Task 3.

- [ ] **Step 4: Checkpoint** - expansion runner ready to be triggered from `/api/sort`.

---

## Task 6: Polymorphic `/api/sort` + lifespan sweep wiring

**Files:**
- Modify: `/opt/sortof/api/app.py`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/api/app.py /opt/sortof/api/app.py.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Add new imports**

Find the existing import block (around lines 1-30). Add:

```python
import asyncio
import jobs
import expansion
from parse import parse_with_collections  # existing parse_workshop_input import stays
```

- [ ] **Step 3: Wire stale-expansion sweep into lifespan startup**

Find the existing `@asynccontextmanager async def lifespan(app: FastAPI):` block (around line 38). Inside the body, after the pool/http are created and before `yield`, add:

```python
async with pool.acquire() as conn:
    n_reaped = await jobs.sweep_stale_expansions(conn)
    if n_reaped:
        log.info("lifespan startup: reaped %d stale expansion job(s)", n_reaped)
```

- [ ] **Step 4: Make `/api/sort` polymorphic**

Find the `async def sort_endpoint(...)` function. The current body parses input via `parse_workshop_input`, hits Steam, queues misses, runs `mlos_sort`, returns sync.
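The fork added in this step reduces to three outcomes. A minimal pure-function sketch of that decision, useful as a checklist when reading the nested `if`/`elif` in the code that follows; `choose_route` is a hypothetical helper (not part of `app.py`), `all_fresh` stands for the result of the Steam + `mod_parsed` probe, and the Steam-error branch (which returns the error payload directly) is left out.

```python
from typing import List, Literal

Route = Literal["sync", "expanding", "queued"]


def choose_route(
    bare_wsids: List[str],
    collection_ids: List[str],
    all_fresh: bool,
) -> Route:
    """Mirror of the /api/sort fork (hypothetical helper).

    - any collection present      -> background expansion job ("expanding")
    - bare wsids, all cache-fresh -> existing synchronous path ("sync")
    - bare wsids, any miss        -> drain job ("queued")
    """
    if collection_ids:
        return "expanding"
    if not bare_wsids:
        # /api/sort has already rejected empty input with a 400 by this point.
        raise ValueError("empty input")
    return "sync" if all_fresh else "queued"
```

Steps 8, 9 and 10 exercise these three outcomes in that order.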
Replace the parsing line:

```python
input_ids = parse_workshop_input(req.input or "")
```

with:

```python
bare_wsids, collection_ids = parse_with_collections(req.input or "")
input_ids = bare_wsids  # used by existing code paths below
```

Then, immediately after the validation `raise HTTPException(...)` checks (so we still 400 on empty input and 413 on >MAX_IDS), but **before** the Steam metadata fetch, insert a fork:

```python
# ── B+F: route to async job if collections present OR uncached wsids
#        require drain time ───────────────────────────────────────────────
if collection_ids or len(input_ids) > 0:
    # Fast-path probe: are ALL bare wsids already cache-fresh? If so AND
    # there are no collections, fall through to the existing sync path.
    # (Spec B+F §10 Q1: "Bare wsid + all-cached → synchronous".)
    if not collection_ids and input_ids:
        try:
            steam_details = await steam.fetch_workshop_details(
                request.app.state.http, input_ids,
            )
        except httpx.HTTPError as e:
            log.warning("steam api error: %s", e)
            elapsed_ms = int((time.monotonic() - t0) * 1000)
            log.info(
                "sort done hits=0 misses=%d status=error ms=%d",
                len(input_ids), elapsed_ms,
            )
            return _empty_payload(input_ids, "error")

        # Cache check: are all input_ids in mod_parsed and fresh?
        pool = request.app.state.db
        async with pool.acquire() as conn:
            fresh = 0
            for wid in input_ids:
                d = steam_details.get(wid)
                if not d or d.get("result") != 1:
                    break  # bail out - there's a non-cacheable id, route to job
                tu = int(d.get("time_updated", 0))
                row = await conn.fetchrow(
                    "SELECT 1 FROM mod_parsed "
                    "WHERE workshop_id = $1 AND parsed_at_time_updated = $2 LIMIT 1",
                    wid, tu,
                )
                if row is not None:
                    fresh += 1
                else:
                    break
            if fresh == len(input_ids):
                # All cache-fresh - sync path. Re-use the existing flow
                # by NOT routing to a job. Fall through.
                pass
            else:
                # Async path.
                return await _route_to_job(
                    request, conn, req.input or "", req.rules,
                    bare_wsids, collection_ids,
                )
    elif collection_ids:
        pool = request.app.state.db
        async with pool.acquire() as conn:
            return await _route_to_job(
                request, conn, req.input or "", req.rules,
                bare_wsids, collection_ids,
            )
```

This is the routing fork. The fast-path probe lets all-cached bare-wsid input fall through to the existing sync code unchanged. Anything else (uncached wsids OR any collection) returns a job_id.

- [ ] **Step 5: Add `_route_to_job` helper near the route definition**

Just above the `@app.post("/api/sort")` decorator, insert:

```python
async def _route_to_job(
    request: Request,
    conn,
    input_raw: str,
    rules_raw: Optional[str],
    bare_wsids: List[str],
    collection_ids: List[str],
) -> Dict[str, Any]:
    """Create a sort_jobs row and (if needed) kick off background expansion.

    Returns {status, job_id} for the client to start polling."""
    if collection_ids:
        # Will resolve in the background.
        job_id = await jobs.create_job(
            conn,
            input_raw=input_raw,
            collection_ids=collection_ids,
            wsids=None,
            rules_raw=rules_raw,
            initial_phase="expanding",
        )
        asyncio.create_task(expansion.run_expansion(
            request.app.state.db, request.app.state.http,
            job_id, bare_wsids, collection_ids,
        ))
        return {"status": "expanding", "job_id": job_id}
    else:
        # Bare wsids that include uncached. Kick off cold drain by queueing.
        # We dedupe wsids before storing them on the job (the existing
        # /api/sort flow does this for bare input lists).
        seen: set = set()
        wsids: List[str] = []
        for w in bare_wsids:
            if w not in seen:
                seen.add(w)
                wsids.append(w)
        job_id = await jobs.create_job(
            conn,
            input_raw=input_raw,
            collection_ids=[],
            wsids=wsids,
            rules_raw=rules_raw,
            initial_phase="queued",
        )
        # Queue any wsids not already in download_jobs (mirrors the existing
        # flow at the bottom of sort_endpoint, but we don't need Steam validation
        # here since the GET poll will surface unknowns/non-mods naturally
        # via the counts contract).
        for wid in wsids:
            existing = await conn.fetchval(
                "SELECT 1 FROM download_jobs "
                "WHERE workshop_id = $1 AND status IN ('queued','downloading') LIMIT 1",
                wid,
            )
            if existing is None:
                await conn.execute(
                    "INSERT INTO download_jobs (workshop_id, status) VALUES ($1, 'queued')",
                    wid,
                )
        return {"status": "queued", "job_id": job_id}
```

- [ ] **Step 6: py_compile + smoke import**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/app.py && echo PY_OK
cd /opt/sortof/api && .venv/bin/python -c "import app" && echo IMPORT_OK
```

- [ ] **Step 7: Restart API**

```bash
sudo systemctl restart sortof-api && sleep 2 && sudo systemctl is-active sortof-api
```

Expected: `active`.

- [ ] **Step 8: Verify the sync fast path is unchanged**

```bash
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"2169435993;2392709985;2487022075"}' \
  | jq '{status, MODS_LINE, has_job_id: (has("job_id"))}'
```

Expected: `{"status":"success","MODS_LINE":"modoptions;tsarslib;TMC_TrueActions","has_job_id":false}`.

- [ ] **Step 9: Verify the bare-uncached path returns a job_id**

```bash
# First, find a wsid that ISN'T cached. The HellDrinx wsid is non_mod, not great.
# Use a real PZ mod that isn't in mod_parsed yet - implementer needs to find one
# fresh from Steam. Or simpler: temporarily delete a cached row to force a miss:
sudo docker exec -i sortof_db psql -U sortof -d sortof -c "
  DELETE FROM mod_parsed WHERE workshop_id='2196102849';
  DELETE FROM workshop_meta WHERE workshop_id='2196102849';"

curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"2196102849"}' \
  | jq '{status, has_job_id: (has("job_id")), job_id_preview: (.job_id // "" | .[0:8])}'
```

Expected: `{"status":"queued","has_job_id":true,"job_id_preview":"<8-hex-chars>"}`. The drain will reprocess `2196102849` (Raven Creek) and re-cache it; that's fine.
- [ ] **Step 10: Verify the collection path returns expanding + job_id**

```bash
COLL_ID=""
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\"}" \
  | jq '{status, has_job_id: (has("job_id"))}'
```

Expected: `{"status":"expanding","has_job_id":true}`.

- [ ] **Step 11: Checkpoint** - `/api/sort` polymorphism is live and jobs are being created. The polling endpoint is Task 7.

---

## Task 7: `GET` + `DELETE /api/jobs/{job_id}`

**Files:**
- Modify: `/opt/sortof/api/app.py` (add two endpoints near the existing routes)

- [ ] **Step 1: Backup (if more than ~15 minutes since the Task 6 backup)**

```bash
cp /opt/sortof/api/app.py /opt/sortof/api/app.py.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Add the GET endpoint**

Find the end of `sort_endpoint` (the function body ends with `return payload`). Below it, insert:

```python
@app.get("/api/jobs/{job_id}")
async def get_job_endpoint(job_id: str, request: Request) -> Dict[str, Any]:
    pool = request.app.state.db
    async with pool.acquire() as conn:
        row = await jobs.get_job_row(conn, job_id)
        if row is None:
            raise HTTPException(status_code=404, detail="job not found or expired")

        wsids = list(row["wsids"]) if row["wsids"] else None
        counts = await jobs.compute_counts(conn, wsids or [])
        phase = jobs.derive_phase(row["phase"], wsids, counts)

        # If we just transitioned a non-terminal job to 'done', persist the
        # final result for future polls (and for the §3 24h TTL artifact).
        result_json = row["result_json"]
        if phase == "done" and row["phase"] != "done":
            result_json = await _build_result_for_job(conn, wsids, row["rules_raw"])
            await jobs.update_phase(
                conn, job_id, "done", result_json=result_json,
            )
        elif phase != "done" and wsids:
            # Compute a fresh partial result on every poll - cheap, avoids
            # staleness. Don't persist; only `done` writes result_json.
            result_json = await _build_result_for_job(conn, wsids, row["rules_raw"])

    return {
        "job_id": str(row["job_id"]),
        "phase": phase,
        "counts": counts,
        "wsids": wsids,
        "result": result_json,
        "failure_reason": row["failure_reason"],
    }
```

- [ ] **Step 3: Add the `_build_result_for_job` helper**

Just above the `_empty_payload` helper (around line 100), insert:

```python
async def _build_result_for_job(
    conn,
    wsids: Optional[List[str]],
    rules_raw: Optional[str],
) -> Dict[str, Any]:
    """Compute the SORTOF_DATA payload from currently-cached mod_parsed rows
    for the given wsids. Used both for partial results during draining and
    for the final result on phase transition to 'done'."""
    if not wsids:
        return _empty_payload([], "success")

    rows = await conn.fetch(
        """
        SELECT mp.workshop_id, mp.mod_id, mp.name, mp.category,
               mp.requirements, mp.load_after, mp.load_before,
               mp.incompatible_mods, mp.load_first, mp.load_last,
               mp.tags, mp.maps
        FROM mod_parsed mp
        JOIN workshop_meta wm ON wm.workshop_id = mp.workshop_id
        WHERE mp.workshop_id = ANY($1::text[])
          AND mp.parsed_at_time_updated = wm.time_updated
        ORDER BY mp.workshop_id, mp.mod_id
        """,
        wsids,
    )
    mods = [_row_to_modinfo(r) for r in rows]

    rules: Dict[str, Any] = {}
    if rules_raw:
        try:
            rules = parse_sorting_rules(rules_raw)
        except Exception:
            log.warning("job result: failed to parse sorting_rules")

    sort_result = sort_mods(mods, rules)
    # Order-preserving dedupe (rows arrive ordered by workshop_id); a bare set
    # would make the hit_ids order nondeterministic.
    cached_ids = list(dict.fromkeys(r["workshop_id"] for r in rows))
    payload = adapters.build_response(
        input_ids=wsids,  # contract: WORKSHOP_ITEMS_LINE = wsids[] at job creation
        hit_ids=cached_ids,
        mods=mods,
        sort_result=sort_result,
        status="success" if len(cached_ids) >= len(wsids) else "partial",
    )
    # Forced override: WORKSHOP_ITEMS_LINE locked to the original wsids[]
    # regardless of which are currently cached (Spec A §8 / Spec B+F §6).
    payload["WORKSHOP_ITEMS_LINE"] = ";".join(wsids) + ";" if wsids else ""
    cached_set = set(cached_ids)  # build once instead of once per wsid
    payload["pending"] = [w for w in wsids if w not in cached_set]
    payload["unknown"] = []  # this endpoint doesn't compute Steam-result-9
    payload["non_mod"] = []  # nor non-mod classification - those are sync-path concerns
    return payload
```

- [ ] **Step 4: Add the DELETE endpoint**

Below the GET endpoint, insert:

```python
@app.delete("/api/jobs/{job_id}", status_code=204)
async def delete_job_endpoint(job_id: str, request: Request):
    """Cancel a job. Idempotent: cancelling a terminal job is a no-op 204.
    Does NOT touch download_jobs (Spec B+F §8)."""
    pool = request.app.state.db
    async with pool.acquire() as conn:
        row = await jobs.get_job_row(conn, job_id)
        if row is None:
            raise HTTPException(status_code=404, detail="job not found")
        if row["phase"] not in ("done", "failed"):
            await jobs.update_phase(conn, job_id, "failed", failure_reason="cancelled")
    return None
```

- [ ] **Step 5: py_compile + restart**

```bash
/opt/sortof/api/.venv/bin/python -m py_compile /opt/sortof/api/app.py && echo PY_OK
cd /opt/sortof/api && .venv/bin/python -c "import app" && echo IMPORT_OK
sudo systemctl restart sortof-api && sleep 2 && sudo systemctl is-active sortof-api
```

- [ ] **Step 6: Verify GET on a fresh collection job**

```bash
COLL_ID=""
JOB_RESP=$(curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\"}")
JID=$(echo "$JOB_RESP" | jq -r '.job_id')
echo "job_id=$JID"

# First poll: likely still expanding
curl -sS http://100.114.205.53:8801/api/jobs/$JID | jq '{phase, counts, has_wsids: (.wsids != null)}'

# Wait for expansion + initial drain
sleep 3
curl -sS http://100.114.205.53:8801/api/jobs/$JID | jq '{phase, counts, n_wsids: (.wsids|length)}'

# 404 on garbage id
curl -sS -o /dev/null -w 'http=%{http_code}\n' http://100.114.205.53:8801/api/jobs/00000000-0000-0000-0000-000000000000
```

Expected:
first poll has `phase: "expanding"`; second poll has `phase` in `(queued, draining, done)` with `n_wsids > 0`; the garbage id returns `http=404`.

- [ ] **Step 7: Verify DELETE on an active job**

```bash
# Submit a fresh job (reusing COLL_ID from Step 6) so we can cancel it before it drains
JID=$(curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\"}" \
  | jq -r '.job_id')

# Cancel immediately
curl -sS -o /dev/null -w 'cancel=%{http_code}\n' -X DELETE http://100.114.205.53:8801/api/jobs/$JID

# Idempotent re-cancel
curl -sS -o /dev/null -w 'recancel=%{http_code}\n' -X DELETE http://100.114.205.53:8801/api/jobs/$JID

# Confirm phase
curl -sS http://100.114.205.53:8801/api/jobs/$JID | jq '{phase, failure_reason}'
```

Expected: `cancel=204`, `recancel=204`, `phase: "failed"`, `failure_reason: "cancelled"`.

- [ ] **Step 8: Checkpoint** - backend complete. Frontend wiring is Tasks 8-10.

---

## Task 8: Frontend - detect `job_id`, polling loop, partial-result rendering

**Files:**
- Modify: `/opt/sortof/frontend/sortof-app.jsx`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/frontend/sortof-app.jsx /opt/sortof/frontend/sortof-app.jsx.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Add a polling helper near other top-of-file helpers**

Find the `buildModsLine` function (around line 32). Below it (and **above** the `isRadioMode` / `defaultSelectionForBranches` block), add:

```jsx
// Spec B+F: poll a job. Resolves with the last poll outcome: { kind: 'ok', body }
// on a terminal phase, { kind: 'expired' } / { kind: 'error' }, or
// { kind: 'aborted' } once the caller's AbortSignal fires. The caller owns the
// AbortSignal and uses it to stop polling on unmount / cancel-button / new-sort.
const POLL_INTERVAL_MS = 2500;

async function pollJobOnce(jobId, signal) {
  // Passing the signal lets an abort cancel an in-flight request too.
  const res = await fetch(`/api/jobs/${jobId}`, { signal });
  if (res.status === 404) return { kind: 'expired' };
  if (!res.ok) return { kind: 'error', status: res.status };
  const json = await res.json();
  return { kind: 'ok', body: json };
}

function pollJobLoop(jobId, signal, onTick) {
  // Returns a Promise that resolves on terminal phase, expiry/error, or abort.
  return new Promise((resolve) => {
    let timer = null;
    let settled = false;

    function finish(r) {
      if (settled) return;
      settled = true;
      if (timer) clearTimeout(timer);
      signal.removeEventListener('abort', onAbort);
      resolve(r);
    }
    // Stop immediately on abort instead of waiting out the current 2.5s sleep.
    function onAbort() { finish({ kind: 'aborted' }); }
    signal.addEventListener('abort', onAbort);

    async function tick() {
      if (signal.aborted) { finish({ kind: 'aborted' }); return; }
      let r;
      try {
        r = await pollJobOnce(jobId, signal);
      } catch (err) {
        // Network failure, bad JSON, or fetch's own AbortError. Without this
        // catch the rejection would escape and the promise would never settle.
        finish(signal.aborted ? { kind: 'aborted' } : { kind: 'error', error: err });
        return;
      }
      if (signal.aborted) { finish({ kind: 'aborted' }); return; }
      onTick(r);
      if (r.kind === 'expired' || r.kind === 'error') { finish(r); return; }
      const phase = r.body.phase;
      if (phase === 'done' || phase === 'failed') { finish(r); return; }
      timer = setTimeout(tick, POLL_INTERVAL_MS);
    }
    tick();
  });
}
```

- [ ] **Step 3: Add an AbortController ref + cancel-job state in App**

Find the App function's state declarations (search for `const [pzBuild, setPzBuild]`). Below the existing useState/useRef block but above the existing useEffects, add:

```jsx
const pollAbortRef = useRef(null);
const [activeJobId, setActiveJobId] = useState(null);
```

- [ ] **Step 4: Update `onSort` to branch on `job_id`**

Find the `async function onSort()` body. The current body POSTs to `/api/sort` and applies the response. Find the line that receives the response:

```jsx
const json = await res.json();
_liveSortData = json;
```

Insert a branch immediately before `_liveSortData = json`:

```jsx
const json = await res.json();
if (json.job_id) {
  // Async path - start polling and let the loop drive state.
  // Abort any in-flight previous poll.
  if (pollAbortRef.current) {
    pollAbortRef.current.abort();
  }
  const ctrl = new AbortController();
  pollAbortRef.current = ctrl;
  setActiveJobId(json.job_id);

  pollJobLoop(json.job_id, ctrl.signal, (r) => {
    if (r.kind !== 'ok') return;
    const b = r.body;
    if (b.result) {
      _liveSortData = b.result;
      sortContextRef.current = {
        workshopItemsLine: (b.result.WORKSHOP_ITEMS_LINE) || '',
        originalQueued: (b.result.pending || []).length,
        unknown: b.result.unknown || [],
        nonMod: b.result.non_mod || [],
      };
    }
    // Scale progress by the share of wsids cached, so large collections
    // don't pin the bar at 95% after the first few dozen mods.
    const total = (b.wsids || []).length;
    setProgress(b.phase === 'expanding' || total === 0
      ? 5
      : Math.min(95, 10 + Math.round((85 * b.counts.cached) / total)));
    setCounts({
      cached: b.counts.cached,
      queued: b.counts.queued,
      parsing: b.counts.draining,
      warnings: ((b.result && b.result.WARNINGS) || []).length,
      unknown: ((b.result && b.result.unknown) || []).length,
      nonMod: ((b.result && b.result.non_mod) || []).length,
    });
    setState(b.phase); // 'expanding' | 'queued' | 'draining' | 'done' | 'failed'
  }).then((final) => {
    // A poll superseded by a newer sort must not clear the newer job's state.
    if (pollAbortRef.current !== ctrl) return;
    setActiveJobId(null);
    if (final.kind === 'expired') {
      setState('error');
      _liveSortData = {
        ...(_liveSortData || {}),
        WARNINGS: [
          ...((_liveSortData?.WARNINGS) || []),
          { tag: 'retry', level: 'red', msg: 'this job expired - re-submit' },
        ],
      };
    } else if (final.kind === 'error') {
      setState('error');
    }
  });
  return;
}

// Sync fast path - existing code follows. Stop any poll left over from an
// earlier async sort so it can't overwrite this result.
if (pollAbortRef.current) {
  pollAbortRef.current.abort();
  pollAbortRef.current = null;
  setActiveJobId(null);
}
_liveSortData = json;
```

- [ ] **Step 5: Verify served file picks up the new symbols**

```bash
curl -sS http://100.114.205.53:8801/sortof-app.jsx | grep -cE 'pollJobLoop|pollJobOnce|activeJobId|pollAbortRef|/api/jobs/'
```

Expected: ≥ 6.

- [ ] **Step 6: Manual browser smoke (or curl-driven simulation)**

The implementer should open `https://sortof.indifferentketchup.com/` and submit a real PZ collection URL. Expect: status strip text changes from `expanding…` to `queued`/`draining` to `done`. The Network tab shows `GET /api/jobs/` calls every 2.5s. The output panel populates as mods land in `mod_parsed`. If headless: replicate by curling `/api/sort` with a collection URL, capturing the job_id, and curling `/api/jobs/` repeatedly until `phase=done`.
Confirm that the response's `result` field (persisted server-side as `result_json`) populates with `MODS_LINE` etc. once draining completes.

- [ ] **Step 7: Checkpoint** - polling drives `_liveSortData` updates. The phase-specific status strip is Task 9.

---

## Task 9: Frontend - phase-specific status strip

**Files:**
- Modify: `/opt/sortof/frontend/sortof-app.jsx`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/frontend/sortof-app.jsx /opt/sortof/frontend/sortof-app.jsx.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Update `StatusStrip` to render phase-specific text**

Find the existing `function StatusStrip({ state, counts, progress })`. The existing function returns either an idle strip or a counts strip based on `state`. Replace its body with:

```jsx
function StatusStrip({ state, counts, progress }) {
  // Idle / terminal states - single pill summary.
  if (state === 'idle' || state === 'success' || state === 'error' ||
      state === 'cold' || state === 'done' || state === 'failed') {
    return (
      {state === 'idle' && 'ready when you are'}
      {state === 'success' && `done. ${counts.cached} mods, ${counts.warnings} warnings`}
      {state === 'done' && `done. ${counts.cached} mods, ${counts.warnings} warnings`}
      {state === 'error' && 'something went sideways'}
      {state === 'failed' && 'job failed'}
      {state === 'cold' && 'cache miss - be patient'}
    );
  }

  // 'expanding' phase - no useful counts yet.
  if (state === 'expanding') {
    return (
expanding collection…
    );
  }

  // 'queued' or 'draining' - live counts. Also covers the existing 'partial'/'loading' states.
  return (
{counts.cached} cached {counts.queued} queued {counts.parsing} draining {counts.unknown > 0 && ( {counts.unknown} unknown )} {counts.nonMod > 0 && ( {counts.nonMod} non-mod )}
  );
}
```

- [ ] **Step 3: Verify served**

```bash
curl -sS http://100.114.205.53:8801/sortof-app.jsx | grep -cE 'expanding collection|state === .expanding|state === .done|state === .failed'
```

Expected: ≥ 4.

- [ ] **Step 4: Manual browser smoke**

Submit a collection URL. Expect: the strip starts with `expanding collection…`, transitions to live counts, and ends with a `done. N mods, …` summary.

---

## Task 10: Frontend - Cancel button + 404 expired-job handling + CSS

**Files:**
- Modify: `/opt/sortof/frontend/sortof-app.jsx`
- Modify: `/opt/sortof/frontend/index.html`

- [ ] **Step 1: Backup**

```bash
cp /opt/sortof/frontend/sortof-app.jsx /opt/sortof/frontend/sortof-app.jsx.bak-$(date +%Y%m%d-%H%M)
cp /opt/sortof/frontend/index.html /opt/sortof/frontend/index.html.bak-$(date +%Y%m%d-%H%M)
```

- [ ] **Step 2: Render a Cancel button when a job is active**

Find where the Sort button is rendered (search for `sort-btn` and `onClick={onSort}`). It's inside the left column. Below the existing Sort button JSX, add:

```jsx
{activeJobId && ( )}
```

- [ ] **Step 3: CSS for `.status-pill.expanding` and `.cancel-btn`**

Open `/opt/sortof/frontend/index.html`. Find the `.status-pill.nonmod` rule (added during the unknown/non-mod feature).
Below it, add:

```css
.status-pill.expanding { color: var(--acc-blue); }
.status-pill.expanding .dot-led {
  background: var(--acc-blue);
  animation: bl 1.2s ease-in-out infinite;
}
.cancel-btn {
  appearance: none;
  width: 100%;
  height: 32px;
  margin-top: 6px;
  border: 1px solid var(--line);
  background: transparent;
  color: var(--fg-2);
  border-radius: var(--radius);
  font-family: var(--mono);
  font-size: 12px;
  cursor: pointer;
  transition: color .12s, border-color .12s, background .12s;
}
.cancel-btn:hover {
  color: var(--acc-red);
  border-color: var(--acc-red);
  background: var(--acc-red-bg);
}
```

- [ ] **Step 4: Verify served**

```bash
curl -sS http://100.114.205.53:8801/sortof-app.jsx | grep -c 'cancel-btn'
curl -sS http://100.114.205.53:8801/ | grep -cE '\.status-pill\.expanding|\.cancel-btn'
```

Expected: ≥ 1 (jsx), ≥ 2 (CSS).

- [ ] **Step 5: Manual browser smoke**

Submit a fresh cold collection. While the strip reads `expanding` or `draining`, click cancel. Expect: the strip clears, `setActiveJobId(null)` fires, and no further GET polls appear in the Network tab.

---

## Task 11: Spec §11 acceptance + §12 test recipes

For each item, document expected vs actual. If any fails, return to the relevant task.

- [ ] **Step 1: §11.1 Sync fast path** - bare wsids, all cached.

```bash
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"2169435993;2392709985;2487022075"}' \
  | jq '{has_job_id: (has("job_id")), MODS_LINE}'
```

Expected: `{"has_job_id": false, "MODS_LINE": "modoptions;tsarslib;TMC_TrueActions"}`.
- [ ] **Step 2: §11.2 Async path on uncached bare wsid**

```bash
sudo docker exec -i sortof_db psql -U sortof -d sortof -c "DELETE FROM mod_parsed WHERE workshop_id='2196102849'; DELETE FROM workshop_meta WHERE workshop_id='2196102849';"
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"2196102849"}' | jq '{status, has_job_id: (has("job_id"))}'
```

Expected: `{"status":"queued","has_job_id":true}`.

- [ ] **Step 3: §11.3 Collection URL → expanding**

```bash
COLL_ID=""
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\"}" \
  | jq '{status, has_job_id: (has("job_id"))}'
```

Expected: `{"status":"expanding","has_job_id":true}`.

- [ ] **Step 4: §11.4 GET on bogus job → 404**

```bash
curl -sS -o /dev/null -w 'http=%{http_code}\n' http://100.114.205.53:8801/api/jobs/00000000-0000-0000-0000-000000000000
```

Expected: `http=404`.

- [ ] **Step 5: §11.5 DELETE → idempotent 204**

```bash
JID=$(curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\"}" | jq -r '.job_id')
curl -sS -o /dev/null -w 'cancel1=%{http_code}\n' -X DELETE http://100.114.205.53:8801/api/jobs/$JID
curl -sS -o /dev/null -w 'cancel2=%{http_code}\n' -X DELETE http://100.114.205.53:8801/api/jobs/$JID
curl -sS http://100.114.205.53:8801/api/jobs/$JID | jq '{phase, failure_reason}'
```

Expected: `cancel1=204`, `cancel2=204`, `{"phase":"failed","failure_reason":"cancelled"}`.

- [ ] **Step 6: §11.6 Steam URL detection + GetCollectionDetails routing**

Already exercised by Step 3 of this task. Confirm via the journal:

```bash
sudo journalctl -u sortof-api --since "2 min ago" | grep -E 'GetCollectionDetails|expansion'
```

Expected: at least one entry naming the collection ID.
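Step 6 exercises detection of Steam collection URLs. For orientation, pulling the numeric `id` out of a `steamcommunity.com/sharedfiles/filedetails/?id=…` URL needs only the standard library, as sketched below. `url_item_id` and the accepted host/path set are illustrative assumptions; the real `parse_with_collections` in `parse.py` is what this step actually tests, and a URL-form ID is only a *candidate* collection until `GetCollectionDetails` confirms it.

```python
import re
from typing import Optional
from urllib.parse import parse_qs, urlparse

# Assumed host/path for workshop item and collection pages (illustrative only).
_STEAM_HOSTS = {"steamcommunity.com", "www.steamcommunity.com"}
_STEAM_PATH = "/sharedfiles/filedetails"


def url_item_id(text: str) -> Optional[str]:
    """Return the numeric ?id= from a Steam workshop URL, else None."""
    parsed = urlparse(text.strip())
    if parsed.netloc.lower() not in _STEAM_HOSTS:
        return None  # bare wsids like "2169435993" have no netloc and land here
    if parsed.path.rstrip("/") != _STEAM_PATH:
        return None
    ids = parse_qs(parsed.query).get("id", [])
    # [0-9] rather than \d so non-ASCII digits are rejected.
    if ids and re.fullmatch(r"[0-9]+", ids[0]):
        return ids[0]
    return None
```

Bare numeric input returns `None` here on purpose: in the plan, bare wsids and URL-form IDs take different routes through `/api/sort`.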
- [ ] **Step 7: §11.7 Cache hit on second submit (within 6h)**

Re-submit the same collection URL. Confirm that the response is fast and that `journalctl` for the new request does NOT show a fresh GetCollectionDetails call (the expansion task's cache hit short-circuits the API call). Note that a second submit still creates a new `sort_jobs` row; only the collection's cached children are reused.

- [ ] **Step 8: §11.8 Partial-collection failure**

```bash
# Combine a real and a bogus collection URL
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d "{\"input\":\"https://steamcommunity.com/sharedfiles/filedetails/?id=$COLL_ID\nhttps://steamcommunity.com/sharedfiles/filedetails/?id=99999999\"}" \
  | jq '.job_id'
```

Then poll the returned job until `phase=done`, and check `result.WARNINGS`:

```bash
curl -sS http://100.114.205.53:8801/api/jobs/ | jq '.result.WARNINGS[] | select(.tag=="collection-partial")'
```

Expected: one entry with `msg` mentioning `99999999`.

- [ ] **Step 9: §11.9 All collections fail**

```bash
curl -sS -X POST http://100.114.205.53:8801/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"https://steamcommunity.com/sharedfiles/filedetails/?id=99999999"}' \
  | jq -r '.job_id' | tee /tmp/jid_test
sleep 3
curl -sS http://100.114.205.53:8801/api/jobs/$(cat /tmp/jid_test) | jq '{phase, failure_reason}'
```

Expected: `{"phase":"failed","failure_reason":"all input collections unresolvable"}`.
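Steps 8 and 9 both poll a job until it reaches a terminal phase. Below is a headless Python equivalent of the frontend's `pollJobLoop`, sketched with the HTTP call injected as a plain function so it runs without a live API. `poll_until_terminal` and the `fetch` callable are hypothetical; a real `fetch` would `GET /api/jobs/{job_id}` (for example with httpx) and return `None` on a 404.

```python
import time
from typing import Any, Callable, Dict, Optional

TERMINAL_PHASES = {"done", "failed"}


def poll_until_terminal(
    fetch: Callable[[], Optional[Dict[str, Any]]],
    interval_s: float = 2.5,
    timeout_s: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call fetch() until the job reaches 'done' or 'failed'.

    fetch() returns the decoded GET /api/jobs/{job_id} body, or None on 404.
    Raises LookupError for an expired/unknown job and TimeoutError when the
    accumulated sleep time (request latency not counted) reaches timeout_s.
    """
    waited = 0.0
    while True:
        body = fetch()
        if body is None:
            raise LookupError("job not found or expired")
        if body.get("phase") in TERMINAL_PHASES:
            return body
        if waited >= timeout_s:
            raise TimeoutError(f"still {body.get('phase')!r} after {waited:.0f}s")
        sleep(interval_s)
        waited += interval_s
```

The default 2.5s interval matches `POLL_INTERVAL_MS`; injecting `sleep` keeps tests instant.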
- [ ] **Step 10: §11.10 Stale-expansion sweep on restart**

```bash
# Manually create a stale expansion row (-qtA prints just the returned job_id)
sudo docker exec -i sortof_db psql -U sortof -d sortof -qtA -c "
INSERT INTO sort_jobs (phase, phase_started_at, input_raw, collection_ids)
VALUES ('expanding', now() - interval '15 minutes', 'sweep test', ARRAY['99999999'])
RETURNING job_id;" | head -1 | xargs -I{} echo "stale_jid={}"
sudo systemctl restart sortof-api && sleep 3
sudo docker exec -i sortof_db psql -U sortof -d sortof -c "
SELECT phase, failure_reason FROM sort_jobs WHERE input_raw='sweep test';"
```

Expected: phase=`failed`, failure_reason=`expansion timed out`.

- [ ] **Step 11: §11.11 Counts contract - sum equals total minus non_mod**

After a cold collection drains, sum the live counts and compare to the wsids count:

```bash
JID=
curl -sS http://100.114.205.53:8801/api/jobs/$JID | jq '
  .counts as $c | .wsids as $w | {
    sum: ($c.cached + $c.queued + $c.draining),
    total_wsids: ($w | length),
    delta: (($w | length) - ($c.cached + $c.queued + $c.draining))
  }'
```

Expected: `delta` is the count of wsids that ended up `non_mod` or `unknown` (not in any of the three count buckets), typically 0 or a small integer.

- [ ] **Step 12: §11.12 WORKSHOP_ITEMS_LINE locked at job creation**

After cancellation or partial failure, the final `result.WORKSHOP_ITEMS_LINE` from `/api/jobs/` must equal `wsids[]` joined by `;` (with a trailing `;`), regardless of how many landed in `non_mod` / `unknown`. Spot-check by comparing.

- [ ] **Step 13: Public-hostname mirror**

```bash
curl -sS -X POST https://sortof.indifferentketchup.com/api/sort \
  -H 'Content-Type: application/json' \
  -d '{"input":"2169435993;2392709985;2487022075"}' | jq '{status, MODS_LINE}'
```

Expected: success with the canonical MODS_LINE. The public side mirrors all backend behavior.

- [ ] **Step 14: Final regression** - re-run the canonical 3-mod sync sort once more and confirm the response shape did not gain a `job_id` field.
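The Step 11 arithmetic can also be captured as a small check for scripted runs. The sketch assumes the GET response shape used throughout Task 7 (`counts` with `cached`/`queued`/`draining`, `wsids` as a list or null) and reads the gap as the wsids that fell into no bucket (non_mod or unknown). `counts_delta` is a hypothetical name.

```python
from typing import Any, Dict


def counts_delta(job: Dict[str, Any]) -> int:
    """len(wsids) minus the three live count buckets.

    Under the counts contract this equals the number of wsids that ended up
    non_mod or unknown; a negative value would indicate over-counting.
    """
    c = job["counts"]
    bucketed = c["cached"] + c["queued"] + c["draining"]
    return len(job.get("wsids") or []) - bucketed


if __name__ == "__main__":
    job = {"counts": {"cached": 40, "queued": 3, "draining": 2},
           "wsids": [str(i) for i in range(47)]}
    print(counts_delta(job))
```

Treating `wsids: null` as an empty list mirrors how the GET endpoint calls `compute_counts(conn, wsids or [])` while a job is still expanding.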
---

## Self-review (already applied)

- **Spec coverage:** §1 (overview) - Tasks 1-7 cover backend, 8-10 cover frontend. §2 (API contract) - Tasks 6-7. §3 (schema) - Task 1. §4 (phase machine) - `derive_phase` in Task 4. §5 (Steam expansion) - Tasks 3, 5. §6 (counts contract) - `compute_counts` in Task 4, applied in Task 7. §7 (frontend) - Tasks 8-10. §8 (cancellation) - Task 7 Step 4 + Task 10. §9 (restart resilience) - `sweep_stale_expansions` in Task 4 + lifespan wiring in Task 6. §10 (open questions) - locked in spec; the plan implements them verbatim. §11/§12 - Task 11.
- **Placeholders:** all `` markers explicitly call out the implementer action; no TBDs.
- **Type consistency:** `wsids` is `List[str] | None` everywhere; `counts` is `{cached: int, queued: int, draining: int}` everywhere; `phase` is the locked enum throughout. The `pollJobLoop` callback's `r.body` has the same shape as the GET endpoint's return value.
- **No git, by design:** every code-changing task starts with a `cp file file.bak-$(date)` step in lieu of a commit. The schema migration in Task 1 is idempotent (`CREATE TABLE IF NOT EXISTS`), so no rollback file is needed.
- **Restart-vs-no-restart:** backend tasks (1, 4, 5, 6, 7) end with `sudo systemctl restart sortof-api`. Frontend tasks (8, 9, 10) end with a `curl`-and-`grep` check only, since `StaticFiles` serves from disk; hard-refresh in the browser. The worker is unchanged across the entire feature.
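The type-consistency bullet above pins down the shape of the `GET /api/jobs/{job_id}` body. One way to keep test scripts honest about that shape is a `TypedDict` plus a light runtime check, sketched below. `JobCounts`, `JobResponse`, and `check_job_response` are hypothetical names, and the phase set is taken from the phases named in this plan (`expanding`, `queued`, `draining`, `done`, `failed`).

```python
from typing import Any, Dict, List, Optional, TypedDict

PHASES = {"expanding", "queued", "draining", "done", "failed"}


class JobCounts(TypedDict):
    cached: int
    queued: int
    draining: int


class JobResponse(TypedDict):
    job_id: str
    phase: str
    counts: JobCounts
    wsids: Optional[List[str]]
    result: Optional[Dict[str, Any]]
    failure_reason: Optional[str]


def check_job_response(body: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations (empty when the body conforms)."""
    problems: List[str] = []
    missing = set(JobResponse.__annotations__) - set(body)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    if body.get("phase") not in PHASES:
        problems.append(f"unknown phase: {body.get('phase')!r}")
    counts = body.get("counts") or {}
    for key in JobCounts.__annotations__:
        if not isinstance(counts.get(key), int):
            problems.append(f"counts.{key} is not an int")
    wsids = body.get("wsids")
    if wsids is not None and not all(isinstance(w, str) for w in wsids):
        problems.append("wsids must be a list of str or null")
    return problems
```

Returning a list of problems rather than raising on the first one makes a failed acceptance run report every drift at once.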