Files
sortof/precacher/precacher.py

211 lines
7.0 KiB
Python

"""sortof precacher (Spec E): warm the cache by enqueueing the top-N PZ
Workshop wsids across four time windows (3 months, 6 months, 1 year, all time)
that aren't already known.
Pure feeder for the existing drain pipeline. Inserts into download_jobs and
returns; the drain workers (sortof-drain@1..4) handle the actual DD pulls.
Run on demand:
/opt/sortof/worker/.venv/bin/python /opt/sortof/precacher/precacher.py
Reuses the worker's venv (httpx + asyncpg) since dependencies overlap exactly.
Reads DATABASE_URL from /opt/sortof/.env; when it is unset, the DSN is built
from the POSTGRES_* variables instead.
Skip rule: a wsid is "already known" iff a row exists in mod_parsed for it
(any state) OR a row exists in download_jobs for it (any status). This is
deliberately conservative - we never re-queue a wsid the system has seen
before, including ones that previously failed (banned, deleted, no_mod_info).
Forced re-queue is the API's job, not the precacher's.
"""
from __future__ import annotations
import asyncio
import logging
import os
import re
import sys
import urllib.parse
from pathlib import Path
from typing import Dict, List, Set, Tuple
import asyncpg
import httpx
from dotenv import load_dotenv
# Env file located one directory above this script's own directory (the module
# docstring gives /opt/sortof/.env); _build_dsn() loads it.
ENV_PATH = Path(__file__).resolve().parent.parent / ".env"
def _build_dsn() -> str:
    """Return the PostgreSQL DSN the precacher connects with.

    Intended to mirror api/db.py: prefer DATABASE_URL, else build the DSN
    from the POSTGRES_* parts. The .env file at ENV_PATH is loaded before
    the environment is read.

    Returns:
        DATABASE_URL verbatim when it is set and non-empty; otherwise a
        ``postgresql://user:password@host:port/dbname`` URL assembled from
        POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST
        (default 127.0.0.1) and POSTGRES_PORT (default 5439).

    Raises:
        KeyError: DATABASE_URL is unset/empty and one of POSTGRES_USER,
            POSTGRES_PASSWORD or POSTGRES_DB is missing.
    """
    load_dotenv(ENV_PATH)
    explicit = os.environ.get("DATABASE_URL")
    if explicit:
        return explicit

    def _quoted(var: str) -> str:
        # Percent-encode every reserved character so that '@', ':' or '/'
        # inside a credential or database name cannot be mistaken for a URL
        # delimiter when the DSN is parsed.
        return urllib.parse.quote(os.environ[var], safe="")

    user = _quoted("POSTGRES_USER")
    pw = _quoted("POSTGRES_PASSWORD")
    name = _quoted("POSTGRES_DB")
    host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    port = os.environ.get("POSTGRES_PORT", "5439")
    return f"postgresql://{user}:{pw}@{host}:{port}/{name}"
# Steam app id sent as the `appid` query parameter on every browse request
# (the "PZ" Workshop named in the module docstring).
PZ_APPID = 108600
# Workshop browse endpoint queried by fetch_page().
BROWSE_URL = "https://steamcommunity.com/workshop/browse/"
PER_PAGE = 30  # Sent as `numperpage`. Steam HTML default; observed cap.
RATE_LIMIT_S = 0.6  # Polite gap (seconds) between page fetches.
# Number of wsids to collect per window when no target is given.
DEFAULT_TARGET = 1000
# Each entry is (window label, browsesort value, days). `days` is sent as a
# query parameter only when it is > 0; a negative value marks the "all time"
# window, because Steam's totalvotes sort doesn't take a days param.
WINDOWS: List[Tuple[str, str, int]] = [
    ("3m", "mostpopular", 90),
    ("6m", "mostpopular", 180),
    ("1y", "mostpopular", 365),
    ("all", "totalvotes", -1),
]
# Captures the numeric id (6 to 12 digits) from each
# data-publishedfileid="..." attribute in the browse page HTML.
WSID_RE = re.compile(r'data-publishedfileid="(\d{6,12})"')
# Module-level logger used by every function in this file.
log = logging.getLogger("sortof.precacher")
async def fetch_page(http: httpx.AsyncClient, sort: str, days: int, page: int) -> List[str]:
    """Fetch one Workshop browse page and return the wsids found on it.

    Args:
        http: client used to issue the GET against BROWSE_URL.
        sort: value for the `browsesort` query parameter.
        days: value for the `days` query parameter; omitted unless > 0.
        page: page number, sent as `p`.

    Returns:
        The distinct ids matched by WSID_RE, in order of first appearance.

    Errors from the GET or from raise_for_status() propagate to the caller.
    """
    query: Dict[str, object] = {
        "appid": PZ_APPID,
        "browsesort": sort,
        "section": "readytouseitems",
        "p": page,
        "numperpage": PER_PAGE,
    }
    # Non-positive days means "no time window": leave the parameter out.
    if days > 0:
        query["days"] = days

    response = await http.get(BROWSE_URL, params=query, timeout=30.0)
    response.raise_for_status()

    # The same wsid can appear in several HTML blocks on one page; an
    # insertion-ordered dict keeps only the first occurrence of each.
    first_seen: Dict[str, None] = {}
    for match in WSID_RE.finditer(response.text):
        first_seen.setdefault(match.group(1), None)
    return list(first_seen)
async def collect_top_wsids(
    http: httpx.AsyncClient, sort: str, days: int, target: int,
) -> List[str]:
    """Page through the browse listing until `target` distinct wsids are held.

    Paging stops early when a page fetch raises httpx.HTTPError (logged as a
    warning), when two pages in a row come back empty, or when a non-empty
    page contributes no new ids. Whatever was gathered up to that point is
    returned, in the order the ids were first encountered.
    """
    # Insertion-ordered dict: membership test and ordered result in one.
    ranked: Dict[str, None] = {}
    page_no = 1
    empty_streak = 0

    while len(ranked) < target:
        try:
            page_ids = await fetch_page(http, sort, days, page_no)
        except httpx.HTTPError as exc:
            log.warning("page %d fetch failed: %s", page_no, exc)
            break

        # A single empty page is tolerated; a second one in a row ends the walk.
        empty_streak = 0 if page_ids else empty_streak + 1
        if empty_streak >= 2:
            break

        held_before = len(ranked)
        for wsid in page_ids:
            if len(ranked) >= target:
                break
            ranked.setdefault(wsid, None)

        # If a page returns only duplicates we've already seen, we're cycling.
        if page_ids and len(ranked) == held_before:
            break

        page_no += 1
        await asyncio.sleep(RATE_LIMIT_S)

    return list(ranked)
async def already_known(conn: asyncpg.Connection, wsids: List[str]) -> Set[str]:
    """Return the wsids from `wsids` that the system has already seen.

    A wsid counts as seen when it has a row in mod_parsed or a row in
    download_jobs, whatever that row's status. This is the conservative
    superset: the precacher never re-queues anything previously touched.
    An empty input returns an empty set without querying the database.
    """
    known: Set[str] = set()
    if wsids:
        records = await conn.fetch(
            """
            SELECT workshop_id FROM mod_parsed WHERE workshop_id = ANY($1::text[])
            UNION
            SELECT workshop_id FROM download_jobs WHERE workshop_id = ANY($1::text[])
            """,
            wsids,
        )
        known.update(record["workshop_id"] for record in records)
    return known
async def enqueue(conn: asyncpg.Connection, wsids: List[str]) -> int:
    """Insert a 'queued' download_jobs row for each wsid that has none yet.

    The existence check and the insert are one ``INSERT ... SELECT ... WHERE
    NOT EXISTS`` statement per wsid, so no other writer can slip a row in
    between two separate round trips, and each wsid costs a single round
    trip.

    NOTE(review): this narrows the race but is not a hard uniqueness
    guarantee. Two such statements running concurrently can both pass the
    NOT EXISTS test; only a unique constraint on workshop_id (not visible
    from this file) would rule that out.

    Args:
        conn: connection the statements are executed on.
        wsids: workshop ids in priority order. Rows are inserted in this
            order, and an id repeated in the list is inserted only once.

    Returns:
        The number of rows actually inserted.
    """
    inserted = 0
    for wid in wsids:
        # RETURNING yields a row only when the INSERT happened, so fetchval
        # gives 1 for a new job and None when the wsid was already queued.
        created = await conn.fetchval(
            """
            INSERT INTO download_jobs (workshop_id, status)
            SELECT $1::text, 'queued'
            WHERE NOT EXISTS (
                SELECT 1 FROM download_jobs WHERE workshop_id = $1::text
            )
            RETURNING 1
            """,
            wid,
        )
        if created is not None:
            inserted += 1
    return inserted
async def main(target: int = DEFAULT_TARGET) -> int:
    """Run one precache pass over every entry in WINDOWS.

    For each window: collect up to `target` wsids, drop the ones
    already_known() reports, and enqueue the remainder. Per-window and
    overall counts are logged.

    Returns:
        0 after a completed run, or 2 when building the DSN fails because a
        required environment variable is missing.
    """
    try:
        dsn = _build_dsn()
    except KeyError as missing:
        log.error("missing required env var: %s", missing)
        return 2

    pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=2)
    http = httpx.AsyncClient(headers={"User-Agent": "sortof-precacher/1.0"})

    # Running totals across all windows.
    fetched_total = 0
    skipped_total = 0
    enqueued_total = 0

    try:
        for label, sort, days in WINDOWS:
            log.info(
                "window=%s sort=%s days=%d: collecting up to %d wsids",
                label, sort, days, target,
            )
            candidates = await collect_top_wsids(http, sort, days, target)
            log.info("window=%s: collected %d wsids", label, len(candidates))

            # One connection serves both the lookup and the inserts.
            async with pool.acquire() as conn:
                known = await already_known(conn, candidates)
                fresh = [wsid for wsid in candidates if wsid not in known]
                inserted = await enqueue(conn, fresh)

            log.info(
                "window=%s: known=%d fresh=%d enqueued=%d",
                label, len(known), len(fresh), inserted,
            )
            fetched_total += len(candidates)
            skipped_total += len(known)
            enqueued_total += inserted
    finally:
        # Release the HTTP client first, then the pool, on success or failure.
        await http.aclose()
        await pool.close()

    log.info(
        "precache run done: fetched=%d skipped_known=%d enqueued=%d",
        fetched_total, skipped_total, enqueued_total,
    )
    return 0
if __name__ == "__main__":
    # Script entry point: an optional first CLI argument overrides the
    # per-window target; main()'s return value becomes the exit status.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    try:
        target = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_TARGET
    except ValueError:
        # Report a bad argument as an error instead of a raw traceback, using
        # the same exit code main() returns for configuration problems.
        log.error("target must be an integer, got %r", sys.argv[1])
        sys.exit(2)
    sys.exit(asyncio.run(main(target)))