import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { getResolvedRegistry } from './provider-config-registry.js';
import { dispatchViaPty } from './pty-dispatch.js';
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
import { getManifestCommands } from './provider-commands.js';
import { persistExternalAgentTurn } from './agent-turn-persist.js';
import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js';
import { agentPool } from './agent-pool.js';
import { OpenCodeServerBackend } from './backends/opencode-server.js';
import type { AgentBackend, AgentEvent } from './agent-backend.js';

/** Native inference queue used by Path A. Only `enqueue` is called from this file. */
interface InferenceRunner {
  enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
  // NOTE(review): the resolved value type was missing in the original declaration
  // and `cancel` is never called here, so it is left as `unknown` — confirm against
  // the real runner.
  cancel: (sessionId: string, chatId: string) => Promise<unknown>;
  hasActive: (chatId: string) => boolean;
}

/** Collaborators injected into the dispatcher. */
interface Deps {
  sql: Sql;
  inference: InferenceRunner;
  broker: Broker;
  log: FastifyBaseLogger;
  config: Config;
}

/** Columns selected from `tasks` by poll() and threaded through every execution path. */
interface DispatchTask {
  id: string;
  project_id: string;
  input: string;
  agent: string | null;
  model: string | null;
  mode_id: string | null;
  thinking_option_id: string | null;
  session_id: string | null;
}

// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new
// tasks immediately. The poll is only a safety net for notifications missed
// during a listen-connection drop (porsager auto-reconnects), so it can stay slow.
const POLL_INTERVAL_MS = 2_000;
const COMPLETION_POLL_MS = 2_000;

// Truncation limits applied before persisting agent output.
const SUMMARY_MAX_CHARS = 500;
const CONTENT_MAX_CHARS = 50_000;
const REASONING_MAX_CHARS = 200_000;

/**
 * Builds the task dispatcher: it picks up `pending` rows from `tasks` and runs
 * each one through native inference (Path A) or an external agent (Path B).
 *
 * @param deps database handle, inference runner, WS broker, logger and config.
 * @returns `start()` begins the poll timer and the `tasks_new` listener;
 *          `stop()` halts both and resolves once every in-flight task has settled.
 */
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
  const { sql, inference, broker, log, config } = deps;

  let timer: ReturnType<typeof setInterval> | null = null;
  let listener: { unlisten: () => Promise<void> } | null = null;
  let polling = false;
  let stopping = false;

  // v2.6 (1.9): per-session in-flight registry replaces the global `running`
  // boolean. Key = session_id (or `task:<id>` for sessionless tasks). Sessions
  // without an in-flight turn run concurrently; within a session, strictly one
  // turn at a time.
  const inflight = new Map<string, Promise<void>>();

  // Shared entry point for both the poll timer and the NOTIFY listener. poll()'s
  // `polling`/`stopping` guard makes this safe to call concurrently — a notify
  // arriving mid-poll returns immediately and never double-dispatches.
  function triggerPoll(reason: string): void {
    poll().catch((err: unknown) => {
      log.error({ err, reason }, 'dispatcher: poll error');
    });
  }

  /** Serialization key: the task's session, or a per-task key when it has none. */
  function concurrencyKey(task: { id: string; session_id: string | null }): string {
    return task.session_id ?? `task:${task.id}`;
  }

  /** Selects pending tasks (oldest first) and starts each one whose key is idle. */
  async function poll(): Promise<void> {
    // `polling` serializes poll() execution itself (timer + NOTIFY can fire
    // concurrently) so we never double-select a task. It does NOT serialize task
    // execution — that's what `inflight` (keyed per session) governs.
    if (polling || stopping) return;
    polling = true;
    try {
      // Oldest-first; start every pending task whose session isn't already busy.
      const rows = await sql<DispatchTask[]>`
        SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id
        FROM tasks
        WHERE state = 'pending'
        ORDER BY created_at
        LIMIT 50
      `;
      for (const task of rows) {
        if (stopping) break;
        const key = concurrencyKey(task);
        if (inflight.has(key)) continue; // this session already has an in-flight turn

        // Register synchronously (before any await) so a later row in this pass
        // with the same key is skipped and a concurrent poll can't re-pick it.
        // The catch matters: a throw that escapes runTask (e.g. the agent or
        // project lookup failing before a path's own try block) would otherwise
        // surface as an unhandled promise rejection. The task is left in whatever
        // state it reached; if still 'pending' a later poll picks it up again.
        const run = runTask(task)
          .catch((err: unknown) => {
            log.error({ err, taskId: task.id }, 'dispatcher: unhandled task error');
          })
          .finally(() => {
            inflight.delete(key);
          });
        inflight.set(key, run);
      }
    } finally {
      polling = false;
    }
  }

  /** Routes one task to the opencode server, a one-shot external agent, or native inference. */
  async function runTask(task: DispatchTask): Promise<void> {
    const taskId = task.id;

    // Determine execution path: if agent is specified AND exists in available_agents → Path B
    if (task.agent) {
      const [agentRow] = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>`
        SELECT name, supports_acp, install_path
        FROM available_agents
        WHERE name = ${task.agent}
      `;
      if (agentRow) {
        // v2.6 (1.7): opencode routes to the warm pool backend; every other
        // external agent keeps the existing one-shot ACP/PTY path untouched.
        if (task.agent === 'opencode') {
          await runOpenCodeServerTask(task, agentRow.install_path);
        } else {
          await runExternalAgent(task, agentRow.supports_acp, agentRow.install_path);
        }
        return;
      }
      // Agent specified but not available — fall through to Path A with a warning
      log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
    }

    // Path A — native inference (existing behavior)
    await runNativeInference(task);
  }

  // ─── Shared persistence helpers ─────────────────────────────────────────────

  /** Best-effort: records a failure on the task row; a DB error here is logged, never thrown. */
  async function failTask(taskId: string, summary: string): Promise<void> {
    try {
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary.slice(0, SUMMARY_MAX_CHARS)}
        WHERE id = ${taskId}
      `;
    } catch (err) {
      log.warn({ taskId, err }, 'dispatcher: could not record task failure');
    }
  }

  /** Sums `tokens_used` over the session's messages; null when nothing was recorded. */
  async function sumSessionTokens(sessionId: string): Promise<number | null> {
    const [row] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    return row?.total ?? null;
  }

  /** Returns the project's root path, or null after marking the task failed when it has none. */
  async function resolveProjectPath(task: DispatchTask): Promise<string | null> {
    const [project] = await sql<{ path: string | null }[]>`
      SELECT path FROM projects WHERE id = ${task.project_id}
    `;
    const projectPath = project?.path;
    if (projectPath) return projectPath;

    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(),
          output_summary = 'Project has no path — cannot create worktree'
      WHERE id = ${task.id}
    `;
    return null;
  }

  /**
   * Resolves the session + chat an external-agent turn writes into.
   * With an existing session: reuse its newest open chat, creating one if none.
   * Without: create session + chat, link the task, and insert the user message
   * (the user message is only inserted here, for sessions this function creates).
   */
  async function resolveSessionAndChat(
    task: DispatchTask,
    agent: string,
  ): Promise<{ sessionId: string; chatId: string }> {
    if (task.session_id) {
      const sessionId = task.session_id;
      const openChats = await sql<{ id: string }[]>`
        SELECT id FROM chats
        WHERE session_id = ${sessionId} AND status = 'open'
        ORDER BY created_at DESC
        LIMIT 1
      `;
      const existing = openChats[0];
      if (existing) return { sessionId, chatId: existing.id };

      const [chat] = await sql<{ id: string }[]>`
        INSERT INTO chats (session_id, name, status)
        VALUES (${sessionId}, 'External agent execution', 'open')
        RETURNING id
      `;
      return { sessionId, chatId: chat!.id };
    }

    const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
    const [session] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
      RETURNING id
    `;
    const sessionId = session!.id;
    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'External agent execution', 'open')
      RETURNING id
    `;
    const chatId = chat!.id;
    await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${task.id}`;
    await sql`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
    `;
    return { sessionId, chatId };
  }

  /**
   * Inserts the streaming assistant message, announces it on the broker, and
   * caches + publishes the agent's manifest commands when it has any.
   * @returns the new assistant message id.
   */
  async function startAssistantMessage(
    taskId: string,
    agent: string,
    sessionId: string,
    chatId: string,
  ): Promise<string> {
    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
      RETURNING id
    `;
    const assistantId = assistantMsg!.id;

    broker.publishFrame(sessionId, {
      type: 'message_started',
      message_id: assistantId,
      chat_id: chatId,
      role: 'assistant',
    } as WsFrame);

    const manifestCommands = getManifestCommands(agent);
    if (manifestCommands.length > 0) {
      setTaskCommands(taskId, manifestCommands);
      broker.publishFrame(sessionId, {
        type: 'agent_commands',
        task_id: taskId,
        session_id: sessionId,
        commands: manifestCommands,
      } as WsFrame);
    }
    return assistantId;
  }

  /** Stores the final content on the assistant message and publishes `message_complete`. */
  async function completeAssistantMessage(
    sessionId: string,
    chatId: string,
    assistantId: string,
    content: string,
  ): Promise<void> {
    await sql`
      UPDATE messages
      SET content = ${content}, status = 'complete', finished_at = clock_timestamp()
      WHERE id = ${assistantId}
    `;
    broker.publishFrame(sessionId, {
      type: 'message_complete',
      message_id: assistantId,
      chat_id: chatId,
    } as WsFrame);
  }

  // ─── Path A: Native Inference ───────────────────────────────────────────────

  /**
   * Runs a task through the native inference runner: creates a fresh session and
   * chat, enqueues the turn, then polls the assistant message until it leaves
   * 'streaming' and copies the outcome onto the task row.
   */
  async function runNativeInference(task: DispatchTask): Promise<void> {
    const taskId = task.id;
    log.info({ taskId }, 'dispatcher: starting task (path A — native)');

    try {
      // Mark running
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
        WHERE id = ${taskId}
      `;

      // Create session + chat for this task
      const model = task.model ?? config.DEFAULT_MODEL;
      const sessionName = 'Task: ' + task.input.slice(0, 40);
      const [session] = await sql<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model, status)
        VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
        RETURNING id
      `;
      const sessionId = session!.id;
      const [chat] = await sql<{ id: string }[]>`
        INSERT INTO chats (session_id, name, status)
        VALUES (${sessionId}, 'Task execution', 'open')
        RETURNING id
      `;
      const chatId = chat!.id;

      // Link task to session
      await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;

      // Create user message + streaming assistant
      await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
        RETURNING id
      `;
      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;
      const assistantId = assistantMsg!.id;

      // Enqueue inference
      inference.enqueue(sessionId, chatId, assistantId, 'default');

      // Wait for inference to complete (poll message status)
      const finalStatus = await waitForCompletion(assistantId);

      if (stopping) {
        await sql`
          UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
        `;
        return;
      }

      // Aggregate token cost for the task's session
      const costTokens = await sumSessionTokens(sessionId);

      const succeeded = finalStatus === 'complete';
      const [msg] = await sql<{ content: string | null }[]>`
        SELECT content FROM messages WHERE id = ${assistantId}
      `;
      // A missing/null content falls back to '' on success and a fixed marker on failure.
      const summary = (msg?.content ?? (succeeded ? '' : 'Inference failed')).slice(0, SUMMARY_MAX_CHARS);
      const finalState = succeeded ? 'completed' : 'failed';
      await sql`
        UPDATE tasks
        SET state = ${finalState}, ended_at = clock_timestamp(),
            output_summary = ${summary}, cost_tokens = ${costTokens}
        WHERE id = ${taskId}
      `;
      if (succeeded) {
        log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
      } else {
        log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
      }
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
      await failTask(taskId, errMsg);
    }
  }

  // ─── Path B: External Agent Dispatch ────────────────────────────────────────

  /**
   * Runs a task through a one-shot external agent (ACP when supported, PTY
   * otherwise) inside a per-task worktree, queues the resulting diff as a
   * pending change, and removes the worktree afterwards.
   */
  async function runExternalAgent(
    task: DispatchTask,
    supportsAcp: boolean,
    installPath: string | null,
  ): Promise<void> {
    const taskId = task.id;
    const agent = task.agent!;
    const executionPath = supportsAcp ? 'acp' : 'pty';
    log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');

    // Resolve the project's root path
    const projectPath = await resolveProjectPath(task);
    if (!projectPath) return;

    // Create an abort controller for this task.
    // NOTE(review): nothing in this file ever calls ac.abort(), so the signal
    // handed to the helpers below cannot fire from here; stop() waits for
    // in-flight tasks instead. Confirm whether stop() is meant to abort them.
    const ac = new AbortController();

    try {
      // Mark running
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
        WHERE id = ${taskId}
      `;

      const { sessionId, chatId } = await resolveSessionAndChat(task, agent);

      // Step 1: Create worktree
      log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
      const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
      log.info({ taskId, worktreePath }, 'dispatcher: worktree created');

      // Step 2: Dispatch to agent
      const assistantId = await startAssistantMessage(taskId, agent, sessionId, chatId);

      let outputSummary: string;
      let assistantContent: string;
      if (supportsAcp) {
        const result = await dispatchViaAcp({
          agent,
          resolved: getResolvedRegistry().get(agent),
          task: task.input,
          worktreePath,
          installPath: installPath ?? undefined,
          model: task.model ?? undefined,
          modeId: task.mode_id ?? undefined,
          thinkingOptionId: task.thinking_option_id ?? undefined,
          taskId,
          sessionId,
          chatId,
          messageId: assistantId,
          broker,
          signal: ac.signal,
          log,
        });
        assistantContent = result.output.slice(0, CONTENT_MAX_CHARS);
        outputSummary = result.output.slice(0, SUMMARY_MAX_CHARS);
        const acpReasoning = result.reasoningText.slice(0, REASONING_MAX_CHARS);
        await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning);
      } else {
        const result = await dispatchViaPty({
          agent,
          task: task.input,
          worktreePath,
          installPath: installPath ?? undefined,
          model: task.model ?? undefined,
          modeId: task.mode_id ?? undefined,
          thinkingOptionId: task.thinking_option_id ?? undefined,
          signal: ac.signal,
          log,
        });
        assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, CONTENT_MAX_CHARS);
        outputSummary = (result.stdout || result.stderr).slice(0, SUMMARY_MAX_CHARS);
        // The PTY path publishes its output here, as one delta, once dispatch returns.
        if (assistantContent) {
          broker.publishFrame(sessionId, {
            type: 'delta',
            message_id: assistantId,
            chat_id: chatId,
            content: assistantContent,
          } as WsFrame);
        }
      }

      await completeAssistantMessage(sessionId, chatId, assistantId, assistantContent);

      if (stopping) {
        await sql`
          UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
        `;
        await cleanupWorktree(projectPath, taskId);
        return;
      }

      // Step 3: Diff the worktree and queue pending changes
      log.info({ taskId }, 'dispatcher: diffing worktree');
      const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
      if (diff) {
        // Queue a single pending_change entry with the full unified diff
        await sql`
          INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
          VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff})
        `;
        log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
      } else {
        log.info({ taskId }, 'dispatcher: no changes detected in worktree');
      }

      // Step 4: Cleanup worktree
      await cleanupWorktree(projectPath, taskId);

      // Step 5: Aggregate token cost
      const extCostTokens = await sumSessionTokens(sessionId);

      // Step 6: Mark task completed
      await sql`
        UPDATE tasks
        SET state = 'completed', ended_at = clock_timestamp(),
            output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
      await failTask(taskId, errMsg);
      // Best-effort cleanup: a failure here must not escape the error handler.
      try {
        await cleanupWorktree(projectPath, taskId);
      } catch (cleanupErr) {
        log.warn({ taskId, err: cleanupErr }, 'dispatcher: worktree cleanup failed');
      }
    } finally {
      // Runs on success, failure AND the shutdown early-return, so the cached
      // command list never outlives the task.
      clearTaskCommands(taskId);
    }
  }

  // ─── Path B (opencode): warm OpenCode server backend (v2.6 1.7 + 1.10) ───────
  // OpenCode runs ONE server per BooCoder process, shared across all sessions
  // (the backend multiplexes sessions internally), so it's pooled under a fixed
  // key rather than per-session. Warm ACP backends (Phase 2) will be per-session.
  const OPENCODE_POOL_KEY = '__opencode_server__';

  /** Returns the pooled OpenCode backend, creating and registering it on first use. */
  function getOpenCodeBackend(installPath: string | null): AgentBackend {
    let backend = agentPool.get(OPENCODE_POOL_KEY, 'opencode');
    if (!backend) {
      backend = new OpenCodeServerBackend({ sql, log, opencodeBinary: installPath ?? 'opencode' });
      agentPool.register(OPENCODE_POOL_KEY, 'opencode', backend);
    }
    return backend;
  }

  /**
   * Runs a task on the shared OpenCode server backend using the session's
   * persistent worktree, streams backend events to the broker as WS frames, and
   * replaces the session's pending diff with the latest one.
   */
  async function runOpenCodeServerTask(task: DispatchTask, installPath: string | null): Promise<void> {
    const taskId = task.id;
    const agent = 'opencode';
    log.info({ taskId, agent }, 'dispatcher: starting task (path B — opencode server)');

    const projectPath = await resolveProjectPath(task);
    if (!projectPath) return;

    // NOTE(review): as in runExternalAgent, this controller is never aborted here.
    const ac = new AbortController();

    try {
      // execution_path = 'acp' — the schema CHECK has no 'opencode_server' value
      // (schema is frozen at Phase 0); the warm-vs-one-shot distinction lives in
      // agent_sessions.backend. Reuse the closest existing value.
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
        WHERE id = ${taskId}
      `;

      // Resolve session + chat (same rules as runExternalAgent).
      const { sessionId, chatId } = await resolveSessionAndChat(task, agent);

      // Persistent, session-keyed worktree (shared across turns; NOT torn down
      // per turn — Phase 3 reaps it). Captures base_commit for a stable diff.
      const { worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
        signal: ac.signal,
      });
      log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready');

      const assistantId = await startAssistantMessage(taskId, agent, sessionId, chatId);

      // Accumulate the turn's stream for persistence + the final message content.
      const textChunks: string[] = [];
      const reasoningChunks: string[] = [];
      const toolSnaps = new Map<string, AcpToolSnapshot>();

      // Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits.
      // This boundary is where message_id/chat_id get attached (the backend never
      // owns them).
      const onEvent = (e: AgentEvent): void => {
        switch (e.type) {
          case 'text':
            textChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'reasoning':
            reasoningChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'reasoning_delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'tool_call':
          case 'tool_update':
            // Keyed by toolCallId, so an update overwrites the earlier snapshot.
            toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
            broker.publishFrame(sessionId, {
              type: 'tool_call',
              message_id: assistantId,
              chat_id: chatId,
              tool_call: snapshotToWireToolCall(e.toolCall),
            } as WsFrame);
            break;
          case 'commands':
            // opencode-server doesn't emit these today; ignore if it ever does.
            break;
        }
      };

      // opencode expects provider-prefixed model ids (e.g. 'llama-swap/qwen3.6-35b…').
      // DEFAULT_MODEL is bare (no prefix) because native inference uses it directly
      // against llama-swap. Coalesce empty string (frontend sends '' when no models
      // listed) and prefix bare ids so parseModel always succeeds.
      const rawModel = (task.model && task.model.trim()) || config.DEFAULT_MODEL;
      const model = rawModel.includes('/') ? rawModel : `llama-swap/${rawModel}`;

      const backend = getOpenCodeBackend(installPath);
      const handle = await backend.ensureSession(sessionId, {
        agent,
        model,
        worktreePath,
        projectId: task.project_id,
      });
      const result = await backend.prompt(handle, task.input, {
        worktreePath,
        model,
        signal: ac.signal,
        onEvent,
      });

      const fullText = textChunks.join('');
      const assistantContent = fullText.slice(0, CONTENT_MAX_CHARS);
      const reasoningText = reasoningChunks.join('').slice(0, REASONING_MAX_CHARS);
      const outputSummary = (result.ok ? fullText : result.error ?? 'opencode turn failed').slice(
        0,
        SUMMARY_MAX_CHARS,
      );

      await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);
      await completeAssistantMessage(sessionId, chatId, assistantId, assistantContent);

      if (stopping) {
        await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
        return; // worktree persists (no cleanup); backend stays warm
      }

      // 1.10: diff the persistent worktree against its captured baseline and
      // SUPERSEDE the session's prior pending row (latest-wins, one accumulating
      // diff) instead of stacking. Stamp agent for DiffPanel attribution.
      const diff = await diffWorktree(worktreePath, projectPath, {
        signal: ac.signal,
        baseRef: baseCommit ?? 'HEAD',
      });
      if (diff) {
        await sql`
          DELETE FROM pending_changes
          WHERE session_id = ${sessionId} AND status = 'pending'
        `;
        await sql`
          INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
          VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
        `;
        log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change');
      } else {
        log.info({ taskId }, 'dispatcher: no changes detected in session worktree');
      }

      // NO worktree cleanup — it's persistent (Phase 3 reaps it). Backend stays warm.
      const extCostTokens = await sumSessionTokens(sessionId);
      const finalState = result.ok ? 'completed' : 'failed';
      await sql`
        UPDATE tasks
        SET state = ${finalState}, ended_at = clock_timestamp(),
            output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
        WHERE id = ${taskId}
      `;
      log.info(
        { taskId, agent, finalState, costTokens: extCostTokens },
        'dispatcher: task finished (opencode server)',
      );
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log.error({ taskId, agent, err: errMsg }, 'dispatcher: opencode server error');
      await failTask(taskId, errMsg);
      // No worktree cleanup (persistent); backend stays warm for the next turn.
    } finally {
      // Also covers the shutdown early-return above.
      clearTaskCommands(taskId);
    }
  }

  // ─── Helpers ────────────────────────────────────────────────────────────────

  /**
   * Polls the assistant message until its status is no longer 'streaming'.
   * @returns the final status; 'cancelled' once the dispatcher is stopping, and
   *          'failed' when the message row cannot be found.
   */
  async function waitForCompletion(assistantId: string): Promise<string> {
    for (;;) {
      if (stopping) return 'cancelled';
      const [row] = await sql<{ status: string }[]>`
        SELECT status FROM messages WHERE id = ${assistantId}
      `;
      const status = row?.status ?? 'failed';
      if (status !== 'streaming') return status;
      await sleep(COMPLETION_POLL_MS);
    }
  }

  /** Resolves after `ms` milliseconds. */
  function sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  return {
    start() {
      if (timer) {
        // A second start() would leak the first interval and open another LISTEN.
        log.warn('dispatcher: start() called while already running — ignored');
        return;
      }
      log.info('dispatcher: starting poll loop + tasks_new listener');

      // Fallback poll — catches notifications missed while the listen connection
      // was down. The fast path is the NOTIFY listener below.
      timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS);

      // Fast path: react immediately to new tasks. porsager reserves a dedicated
      // connection and auto-resubscribes on reconnect; the onlisten callback
      // fires on each (re)subscribe, so we kick a catch-up poll there too to
      // sweep up anything inserted during a disconnect.
      sql
        .listen(
          'tasks_new',
          () => triggerPoll('notify'),
          () => triggerPoll('listen-subscribed'),
        )
        .then((meta) => {
          if (!stopping) {
            listener = meta;
            return;
          }
          // stop() already ran while the subscription was being set up, so it
          // never saw this handle — release it here instead of leaking it.
          meta.unlisten().catch((err: unknown) => {
            log.error({ err }, 'dispatcher: unlisten error');
          });
        })
        .catch((err: unknown) => {
          log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback');
        });
    },

    async stop() {
      stopping = true;
      if (timer) {
        clearInterval(timer);
        timer = null;
      }
      if (listener) {
        await listener.unlisten().catch((err: unknown) => {
          log.error({ err }, 'dispatcher: unlisten error');
        });
        listener = null;
      }
      if (inflight.size > 0) {
        log.info({ count: inflight.size }, 'dispatcher: waiting for in-flight tasks');
        await Promise.allSettled([...inflight.values()]);
      }
      log.info('dispatcher: stopped');
    },
  };
}