From f89c8f3f155333a202a2f148a072c2143dee1378 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Fri, 29 May 2026 03:11:34 +0000 Subject: [PATCH] coder(dispatcher): react to new tasks via LISTEN/NOTIFY, poll as fallback AFTER INSERT trigger on tasks fires pg_notify('tasks_new'); the dispatcher listens via porsager sql.listen and triggers an immediate poll, with the setInterval poll kept at 2s as a missed-notification safety net. Per-session guard unchanged (no double-dispatch). Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/coder/src/schema.sql | 19 +++++++++++ apps/coder/src/services/dispatcher.ts | 47 +++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 7edb8b9..2e1e4dd 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -71,3 +71,22 @@ ALTER TABLE available_agents ADD COLUMN IF NOT EXISTS transport TEXT DEFAULT 'pt ALTER TABLE tasks ADD COLUMN IF NOT EXISTS mode_id TEXT; ALTER TABLE tasks ADD COLUMN IF NOT EXISTS thinking_option_id TEXT; ALTER TABLE tasks ADD COLUMN IF NOT EXISTS feature_values JSONB; + +-- LISTEN/NOTIFY fast path: every tasks INSERT (from any call site — routes, +-- new_task tool, arena, MCP server) fires pg_notify('tasks_new') in the same +-- transaction, so the dispatcher reacts immediately instead of waiting for the +-- fallback poll. Postgres holds the notification until COMMIT, so the listener +-- always sees the committed row. A trigger covers all insert paths with no +-- app-code drift. Idempotent: re-applied on every startup. +CREATE OR REPLACE FUNCTION notify_tasks_new() RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('tasks_new', ''); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS tasks_notify_new ON tasks; +CREATE TRIGGER tasks_notify_new + AFTER INSERT ON tasks + FOR EACH ROW + EXECUTE FUNCTION notify_tasks_new(); diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts index 1890e72..7860f7c 100644 --- a/apps/coder/src/services/dispatcher.ts +++ b/apps/coder/src/services/dispatcher.ts @@ -24,16 +24,29 @@ interface Deps { config: Config; } -const POLL_INTERVAL_MS = 5_000; +// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new +// tasks immediately. The poll is only a safety net for notifications missed +// during a listen-connection drop (porsager auto-reconnects), so it can stay slow. +const POLL_INTERVAL_MS = 2_000; const COMPLETION_POLL_MS = 2_000; export function createDispatcher(deps: Deps): { start(): void; stop(): Promise } { const { sql, inference, broker, log, config } = deps; let timer: ReturnType | null = null; + let listener: { unlisten: () => Promise } | null = null; let running = false; let stopping = false; let inflightPromise: Promise | null = null; + // Shared entry point for both the poll timer and the NOTIFY listener. poll()'s + // `running`/`stopping` guard makes this safe to call concurrently — a notify + // arriving mid-task returns immediately and never double-dispatches. + function triggerPoll(reason: string): void { + poll().catch((err) => { + log.error({ err, reason }, 'dispatcher: poll error'); + }); + } + async function poll(): Promise { if (running || stopping) return; @@ -463,12 +476,28 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise { - poll().catch((err) => { - log.error({ err }, 'dispatcher: poll error'); + log.info('dispatcher: starting poll loop + tasks_new listener'); + + // Fallback poll — catches notifications missed while the listen connection + // was down. The fast path is the NOTIFY listener below. + timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS); + + // Fast path: react immediately to new tasks. porsager reserves a dedicated + // connection and auto-resubscribes on reconnect; the onlisten callback + // fires on each (re)subscribe, so we kick a catch-up poll there too to + // sweep up anything inserted during a disconnect. + sql + .listen( + 'tasks_new', + () => triggerPoll('notify'), + () => triggerPoll('listen-subscribed'), + ) + .then((meta) => { + listener = meta; + }) + .catch((err) => { + log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback'); }); - }, POLL_INTERVAL_MS); }, async stop() { @@ -477,6 +506,12 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise { + log.error({ err }, 'dispatcher: unlisten error'); + }); + listener = null; + } if (inflightPromise) { log.info('dispatcher: waiting for in-flight task'); await inflightPromise;