# Listing metadata: 230 lines, 6.5 KiB, Python.
"""sortof download_jobs drainer.

Long-running asyncio loop that claims queued jobs from download_jobs,
calls worker.process_one() to materialize mod_parsed rows via
DepotDownloader, and updates job status. Single connection per process;
multiple instances are safe because claims use FOR UPDATE SKIP LOCKED.

Manual requeue (after a transient failure):
    UPDATE download_jobs
       SET status='queued', attempts=0, error=NULL
     WHERE id='<uuid>';

Bulk requeue everything that hit MAX_ATTEMPTS:
    UPDATE download_jobs
       SET status='queued', attempts=0, error=NULL
     WHERE status='failed';
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
import os
import signal
import sys
import time
import urllib.parse
from pathlib import Path

import asyncpg
from dotenv import load_dotenv

from worker import (
    DEFAULT_DD_PATH,
    fetch_workshop_details,
    process_one,
)

# .env file located in the parent of this module's directory.
ENV_PATH = Path(__file__).resolve().parent.parent / ".env"

# Seconds to wait (or until shutdown is requested) after an empty claim.
IDLE_SLEEP_S = 5
# Minimum seconds between "idle, queue depth" log lines.
HEARTBEAT_S = 60
# Number of jobs requested per claim query.
BATCH_SIZE = 1
# Claim and depth queries only consider jobs with attempts below this value.
MAX_ATTEMPTS = 5
# Minutes without an update before a 'downloading' job is put back to 'queued'.
STALE_RECLAIM_MIN = 30

# Configure root logging at import time: INFO level, with each record showing
# timestamp, level, logger name and message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Module-wide logger used by every function in this file.
log = logging.getLogger("sortof.drain")


# Claim up to $2 queued jobs whose attempts are below $1, highest priority and
# oldest first. The inner SELECT locks its rows with FOR UPDATE SKIP LOCKED so
# concurrent drainers skip rows already locked by another transaction; the
# UPDATE marks them 'downloading', increments attempts and returns the rows.
CLAIM_SQL = """
UPDATE download_jobs
   SET status='downloading', attempts=attempts+1, updated_at=now()
 WHERE id IN (
    SELECT id FROM download_jobs
     WHERE status='queued' AND attempts < $1
     ORDER BY priority DESC, created_at ASC
     LIMIT $2
     FOR UPDATE SKIP LOCKED
 )
RETURNING id, workshop_id, attempts
"""

# Put jobs back to 'queued' when they have sat in 'downloading' without an
# update for more than STALE_RECLAIM_MIN minutes. The interval is formatted
# into the statement from the module constant (not from external input);
# attempts is left untouched.
RECLAIM_SQL = f"""
UPDATE download_jobs
   SET status='queued', updated_at=now()
 WHERE status='downloading'
   AND updated_at < now() - interval '{STALE_RECLAIM_MIN} minutes'
RETURNING id
"""

# Count queued jobs that are still claimable (attempts below $1).
DEPTH_SQL = """
SELECT COUNT(*) FROM download_jobs
 WHERE status='queued' AND attempts < $1
"""

# Mark job $1 as done, stamp completion time and clear any previous error.
DONE_SQL = """
UPDATE download_jobs
   SET status='done', completed_at=now(), updated_at=now(), error=NULL
 WHERE id=$1
"""

# Mark job $1 as failed and store the error text $2.
FAIL_SQL = """
UPDATE download_jobs
   SET status='failed', updated_at=now(), error=$2
 WHERE id=$1
"""


def build_dsn() -> str:
    """Build the PostgreSQL DSN used by the drainer.

    Loads ENV_PATH into the process environment first. A non-empty
    ``DATABASE_URL`` is returned verbatim; otherwise the DSN is assembled
    from ``POSTGRES_USER``, ``POSTGRES_PASSWORD`` and ``POSTGRES_DB``, with
    ``POSTGRES_HOST`` defaulting to 127.0.0.1 and ``POSTGRES_PORT`` to 5439.

    Returns:
        A ``postgresql://`` connection URI.

    Raises:
        KeyError: if ``DATABASE_URL`` is unset/empty and one of the required
            ``POSTGRES_*`` variables is missing.
    """
    load_dotenv(ENV_PATH)
    explicit = os.environ.get("DATABASE_URL")
    if explicit:
        return explicit
    # Percent-encode both credentials so reserved characters (':', '@', '/')
    # cannot corrupt the URI structure.
    user = urllib.parse.quote(os.environ["POSTGRES_USER"], safe="")
    pw = urllib.parse.quote(os.environ["POSTGRES_PASSWORD"], safe="")
    name = os.environ["POSTGRES_DB"]
    host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    port = os.environ.get("POSTGRES_PORT", "5439")
    return f"postgresql://{user}:{pw}@{host}:{port}/{name}"


def resolve_dd_path() -> Path:
    """Resolve the DepotDownloader binary or fail loudly.

    Order of precedence: $DD_PATH env var, then worker.py's argparse
    default (DEFAULT_DD_PATH). The first candidate that is an existing
    file is returned.

    Raises:
        RuntimeError: if none of the candidates is an existing file; the
            message lists every path that was tried.
    """
    candidates: list[Path] = []
    env_dd = os.environ.get("DD_PATH")
    if env_dd:
        candidates.append(Path(env_dd))
    candidates.append(Path(DEFAULT_DD_PATH))
    for p in candidates:
        if p.is_file():
            return p
    raise RuntimeError(
        "DepotDownloader not found. Tried: "
        + ", ".join(str(c) for c in candidates)
        + ". Set DD_PATH or place the binary at the default path."
    )


async def reclaim_stale(conn: asyncpg.Connection) -> int:
    """Run RECLAIM_SQL on *conn* and return how many job ids it returned."""
    rows = await conn.fetch(RECLAIM_SQL)
    return len(rows)


async def claim_batch(conn: asyncpg.Connection, n: int) -> list[asyncpg.Record]:
    """Claim up to *n* queued jobs via CLAIM_SQL.

    Returns the claimed rows (``id``, ``workshop_id``, ``attempts``); the
    list is empty when nothing is claimable.
    """
    return await conn.fetch(CLAIM_SQL, MAX_ATTEMPTS, n)


async def queue_depth(conn: asyncpg.Connection) -> int:
    """Return the COUNT(*) of queued jobs with attempts below MAX_ATTEMPTS."""
    return await conn.fetchval(DEPTH_SQL, MAX_ATTEMPTS)


async def mark_done(conn: asyncpg.Connection, job_id) -> None:
    """Execute DONE_SQL for *job_id* (status 'done', error cleared)."""
    await conn.execute(DONE_SQL, job_id)


async def mark_failed(conn: asyncpg.Connection, job_id, msg: str) -> None:
    """Execute FAIL_SQL for *job_id*, storing *msg* as the error.

    Only the first 500 characters of *msg* are stored.
    """
    await conn.execute(FAIL_SQL, job_id, msg[:500])


async def _drain_job(conn: asyncpg.Connection, row, details, dd_path: Path) -> None:
    """Process one claimed job row and record its final status."""
    wid = row["workshop_id"]
    log.info("claimed wsid=%s attempt=%d", wid, row["attempts"])

    detail = details.get(wid)
    if not detail or detail.get("result") != 1:
        reason = f"steam result={detail.get('result') if detail else 'none'}"
        log.info("failed wsid=%s reason=%s", wid, reason)
        await mark_failed(conn, row["id"], reason)
        return

    try:
        # The trailing False is a positional flag whose meaning is defined by
        # worker.process_one.
        outcome = await process_one(conn, wid, detail, dd_path, False)
    except Exception as e:
        log.exception("drain exception wsid=%s", wid)
        # Some exceptions have an empty str(); fall back to repr() so the
        # stored error is never blank. mark_failed truncates the text.
        await mark_failed(conn, row["id"], str(e) or repr(e))
        return

    if outcome in ("hit", "refreshed"):
        log.info("done wsid=%s outcome=%s", wid, outcome)
        await mark_done(conn, row["id"])
    else:
        reason = f"process_one={outcome}"
        log.info("failed wsid=%s reason=%s", wid, reason)
        await mark_failed(conn, row["id"], reason)


async def run() -> int:
    """Run the drain loop until SIGTERM or SIGINT is received.

    Resolves the DepotDownloader path, opens one database connection,
    re-queues stale 'downloading' jobs once at startup, then repeatedly
    claims batches of BATCH_SIZE jobs and processes them. When nothing is
    claimable it waits up to IDLE_SLEEP_S seconds, logging the queue depth
    at most once every HEARTBEAT_S seconds. The connection is always closed
    on exit.

    Returns:
        0 after a clean shutdown.

    Raises:
        RuntimeError: from resolve_dd_path() when the binary is not found.
    """
    # Load .env first so a DD_PATH defined there is visible to
    # resolve_dd_path(); build_dsn() loads the same file again.
    load_dotenv(ENV_PATH)
    dd_path = resolve_dd_path()  # raises before opening DB if missing

    dsn = build_dsn()
    conn = await asyncpg.connect(dsn=dsn)

    stop = asyncio.Event()

    def _handle(sig: signal.Signals) -> None:
        log.info("drain shutting down (signal=%s)", sig.name)
        stop.set()

    try:
        # Registered inside the try so the connection is still closed if
        # handler installation raises.
        loop = asyncio.get_running_loop()
        for s in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(s, _handle, s)

        n_reclaimed = await reclaim_stale(conn)
        log.info(
            "drain starting, reclaimed %d stale, dd_path=%s",
            n_reclaimed, dd_path,
        )

        # time.monotonic() has an undefined reference point, so start at
        # -inf to guarantee the first idle poll emits a heartbeat.
        last_heartbeat = float("-inf")
        while not stop.is_set():
            rows = await claim_batch(conn, BATCH_SIZE)

            if not rows:
                now = time.monotonic()
                if now - last_heartbeat >= HEARTBEAT_S:
                    depth = await queue_depth(conn)
                    log.info("idle, queue depth=%d", depth)
                    last_heartbeat = now
                # Sleep for IDLE_SLEEP_S, waking early if shutdown is requested.
                with contextlib.suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(stop.wait(), timeout=IDLE_SLEEP_S)
                continue

            ids = [r["workshop_id"] for r in rows]
            try:
                # Run the fetch in a worker thread so it does not block the
                # event loop.
                details = await asyncio.to_thread(fetch_workshop_details, ids)
            except Exception as e:
                log.warning("steam fetch failed: %s", e)
                for r in rows:
                    await mark_failed(conn, r["id"], "steam fetch error")
                continue

            for r in rows:
                await _drain_job(conn, r, details, dd_path)
    finally:
        await conn.close()
        log.info("drain stopped")

    return 0


# Script entry point: run the drain loop and use its return value as the
# process exit code.
if __name__ == "__main__":
    sys.exit(asyncio.run(run()))