Files
sortof/worker/drain.py

230 lines
6.5 KiB
Python

"""sortof download_jobs drainer.
Long-running asyncio loop that claims queued jobs from download_jobs,
calls worker.process_one() to materialize mod_parsed rows via
DepotDownloader, and updates job status. Single connection per process;
multiple instances are safe because claims use FOR UPDATE SKIP LOCKED.
Manual requeue (after a transient failure):
UPDATE download_jobs
SET status='queued', attempts=0, error=NULL
WHERE id='<uuid>';
Bulk requeue every job currently in status 'failed':
UPDATE download_jobs
SET status='queued', attempts=0, error=NULL
WHERE status='failed';
"""
from __future__ import annotations
import asyncio
import logging
import os
import signal
import sys
import time
import urllib.parse
from pathlib import Path
import asyncpg
from dotenv import load_dotenv
from worker import (
DEFAULT_DD_PATH,
fetch_workshop_details,
process_one,
)
# .env file located one directory above the directory that holds this module.
ENV_PATH = Path(__file__).resolve().parent.parent.joinpath(".env")

# Seconds the main loop waits before polling again when no job was claimed.
IDLE_SLEEP_S = 5
# Minimum number of seconds between two "idle, queue depth" log lines.
HEARTBEAT_S = 60
# Row limit handed to CLAIM_SQL on every claim round trip.
BATCH_SIZE = 1
# Attempts ceiling: queued jobs at or above this count are not claimed.
MAX_ATTEMPTS = 5
# Minutes without an update after which RECLAIM_SQL re-queues a
# 'downloading' job.
STALE_RECLAIM_MIN = 30

# Root logger configuration is applied at import time.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("sortof.drain")
# Claim up to $2 queued jobs whose attempts counter is below $1: the selected
# rows are switched to 'downloading', their attempts counter is incremented,
# and id / workshop_id / attempts are returned. Candidates are ordered by
# priority (highest first), then by created_at (oldest first). The inner
# SELECT uses FOR UPDATE SKIP LOCKED, so rows locked by another transaction
# are skipped instead of waited on.
CLAIM_SQL = """
UPDATE download_jobs
SET status='downloading', attempts=attempts+1, updated_at=now()
WHERE id IN (
SELECT id FROM download_jobs
WHERE status='queued' AND attempts < $1
ORDER BY priority DESC, created_at ASC
LIMIT $2
FOR UPDATE SKIP LOCKED
)
RETURNING id, workshop_id, attempts
"""
# Put 'downloading' jobs whose updated_at is older than STALE_RECLAIM_MIN
# minutes back to 'queued' and return their ids. The attempts counter is not
# modified. The interval is interpolated from the module constant when this
# module is imported; it is not a bind parameter.
RECLAIM_SQL = f"""
UPDATE download_jobs
SET status='queued', updated_at=now()
WHERE status='downloading'
AND updated_at < now() - interval '{STALE_RECLAIM_MIN} minutes'
RETURNING id
"""
# Count queued jobs whose attempts counter is below $1 (the same predicate
# CLAIM_SQL uses to pick candidates).
DEPTH_SQL = """
SELECT COUNT(*) FROM download_jobs
WHERE status='queued' AND attempts < $1
"""
# Mark job $1 as 'done': stamps completed_at and updated_at, clears error.
DONE_SQL = """
UPDATE download_jobs
SET status='done', completed_at=now(), updated_at=now(), error=NULL
WHERE id=$1
"""
# Mark job $1 as 'failed' and store $2 as its error text.
FAIL_SQL = """
UPDATE download_jobs
SET status='failed', updated_at=now(), error=$2
WHERE id=$1
"""
def build_dsn() -> str:
    """Return the PostgreSQL connection URL used by the drainer.

    ``ENV_PATH`` is loaded with ``load_dotenv`` first. If ``DATABASE_URL``
    is set and non-empty it is returned verbatim. Otherwise the URL is
    assembled from ``POSTGRES_USER``, ``POSTGRES_PASSWORD`` and
    ``POSTGRES_DB`` (all required) plus ``POSTGRES_HOST`` (default
    ``127.0.0.1``) and ``POSTGRES_PORT`` (default ``5439``).

    Raises:
        KeyError: ``DATABASE_URL`` is unset or empty and one of the required
            ``POSTGRES_*`` variables is missing.
    """
    load_dotenv(ENV_PATH)
    explicit = os.environ.get("DATABASE_URL")
    if explicit:
        return explicit

    def _enc(value: str) -> str:
        # Percent-encode every reserved character so that '@', ':', '/',
        # '?' or '#' inside a credential or database name cannot be mistaken
        # for URL structure.
        return urllib.parse.quote(value, safe="")

    user = _enc(os.environ["POSTGRES_USER"])
    pw = _enc(os.environ["POSTGRES_PASSWORD"])
    name = _enc(os.environ["POSTGRES_DB"])
    host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    port = os.environ.get("POSTGRES_PORT", "5439")
    return f"postgresql://{user}:{pw}@{host}:{port}/{name}"
def resolve_dd_path() -> Path:
    """Locate the DepotDownloader binary, or raise.

    Candidates are tried in order: the ``DD_PATH`` environment variable
    (only when set and non-empty), then ``DEFAULT_DD_PATH`` imported from
    ``worker``. The first candidate that is an existing file is returned.

    Raises:
        RuntimeError: no candidate is a file; the message lists every path
            that was tried.
    """
    override = os.environ.get("DD_PATH")
    search_order = [Path(override)] if override else []
    search_order.append(Path(DEFAULT_DD_PATH))

    found = next((path for path in search_order if path.is_file()), None)
    if found is not None:
        return found

    tried = ", ".join(map(str, search_order))
    raise RuntimeError(
        "DepotDownloader not found. Tried: "
        + tried
        + ". Set DD_PATH or place the binary at the default path."
    )
async def reclaim_stale(conn: asyncpg.Connection) -> int:
    """Execute RECLAIM_SQL on *conn* and return how many rows it returned."""
    return len(await conn.fetch(RECLAIM_SQL))
async def claim_batch(conn: asyncpg.Connection, n: int):
    """Claim up to *n* queued jobs and return the claimed rows.

    Runs CLAIM_SQL with MAX_ATTEMPTS as the attempts ceiling ($1) and *n* as
    the row limit ($2). Each returned row exposes ``id``, ``workshop_id``
    and ``attempts``.
    """
    claimed = await conn.fetch(CLAIM_SQL, MAX_ATTEMPTS, n)
    return claimed
async def queue_depth(conn: asyncpg.Connection) -> int:
    """Return the DEPTH_SQL count: queued jobs still below MAX_ATTEMPTS."""
    depth = await conn.fetchval(DEPTH_SQL, MAX_ATTEMPTS)
    return depth
async def mark_done(conn: asyncpg.Connection, job_id) -> None:
    """Run DONE_SQL for *job_id*, recording the job as done."""
    sql_args = (job_id,)
    await conn.execute(DONE_SQL, *sql_args)
async def mark_failed(conn: asyncpg.Connection, job_id, msg: str) -> None:
    """Run FAIL_SQL for *job_id*, storing *msg* as the job's error text.

    Only the first 500 characters of *msg* are stored.
    """
    stored_error = msg[:500]
    await conn.execute(FAIL_SQL, job_id, stored_error)
async def _wait_or_stop(stop: asyncio.Event, seconds: float) -> None:
    """Pause for up to *seconds*, returning early as soon as *stop* is set."""
    try:
        await asyncio.wait_for(stop.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        pass


async def _handle_job(conn: asyncpg.Connection, row, details, dd_path: Path) -> None:
    """Process one claimed job row and record its final status.

    *row* is a row returned by CLAIM_SQL (``id``, ``workshop_id``,
    ``attempts``). *details* is the object returned by
    ``fetch_workshop_details``; it is looked up by workshop id.
    """
    wid = row["workshop_id"]
    log.info("claimed wsid=%s attempt=%d", wid, row["attempts"])

    detail = details.get(wid)
    if not detail or detail.get("result") != 1:
        reason = f"steam result={detail.get('result') if detail else 'none'}"
        log.info("failed wsid=%s reason=%s", wid, reason)
        await mark_failed(conn, row["id"], reason)
        return

    try:
        # The trailing positional False is forwarded as-is; its meaning is
        # defined by worker.process_one.
        outcome = await process_one(conn, wid, detail, dd_path, False)
    except Exception as e:
        log.exception("drain exception wsid=%s", wid)
        # Some exceptions stringify to "" (e.g. a bare TimeoutError()); fall
        # back to the class name so the stored error is never blank.
        await mark_failed(conn, row["id"], str(e) or type(e).__name__)
        return

    if outcome in ("hit", "refreshed"):
        log.info("done wsid=%s outcome=%s", wid, outcome)
        await mark_done(conn, row["id"])
    else:
        reason = f"process_one={outcome}"
        log.info("failed wsid=%s reason=%s", wid, reason)
        await mark_failed(conn, row["id"], reason)


async def run() -> int:
    """Run the drain loop until SIGTERM or SIGINT is received.

    Resolves the DepotDownloader path, opens one database connection,
    re-queues stale 'downloading' jobs once at startup, then repeatedly
    claims BATCH_SIZE jobs, fetches their workshop details in a worker
    thread, and processes each job. When nothing can be claimed the loop
    waits IDLE_SLEEP_S seconds and logs the queue depth at most once every
    HEARTBEAT_S seconds.

    Returns:
        0 after a signal-initiated shutdown.

    Raises:
        RuntimeError: the DepotDownloader binary cannot be found (raised by
            ``resolve_dd_path`` before any connection is opened).
    """
    dd_path = resolve_dd_path()  # raises before opening DB if missing
    dsn = build_dsn()
    conn = await asyncpg.connect(dsn=dsn)
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()

    def _handle(sig: signal.Signals) -> None:
        log.info("drain shutting down (signal=%s)", sig.name)
        stop.set()

    try:
        # Registered inside the try block so the connection is still closed
        # if handler registration itself raises.
        for s in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(s, _handle, s)

        n_reclaimed = await reclaim_stale(conn)
        log.info(
            "drain starting, reclaimed %d stale, dd_path=%s",
            n_reclaimed, dd_path,
        )

        last_heartbeat = 0.0
        while not stop.is_set():
            rows = await claim_batch(conn, BATCH_SIZE)
            if not rows:
                now = time.monotonic()
                if now - last_heartbeat >= HEARTBEAT_S:
                    depth = await queue_depth(conn)
                    log.info("idle, queue depth=%d", depth)
                    last_heartbeat = now
                await _wait_or_stop(stop, IDLE_SLEEP_S)
                continue

            ids = [r["workshop_id"] for r in rows]
            try:
                # Run in a worker thread so the event loop (and with it the
                # signal handlers) is not blocked during the call.
                details = await asyncio.to_thread(fetch_workshop_details, ids)
            except Exception as e:
                log.warning("steam fetch failed: %s", e)
                for r in rows:
                    await mark_failed(conn, r["id"], "steam fetch error")
                # Back off before claiming again; without a pause a fetch
                # outage would mark the whole queue failed at full speed.
                await _wait_or_stop(stop, IDLE_SLEEP_S)
                continue

            for r in rows:
                await _handle_job(conn, r, details, dd_path)
    finally:
        await conn.close()
        log.info("drain stopped")
    return 0
if __name__ == "__main__":
    # Run the drain loop to completion and use its return value as the
    # process exit status.
    exit_status = asyncio.run(run())
    sys.exit(exit_status)