import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { dispatchViaPty } from './pty-dispatch.js';
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
import { getManifestCommands } from './provider-commands.js';
import { persistExternalAgentTurn } from './agent-turn-persist.js';

/** The slice of the native inference runner that the dispatcher depends on. */
interface InferenceRunner {
  enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
  // The resolved value is never read in this module, so `unknown` accepts any implementation.
  cancel: (sessionId: string, chatId: string) => Promise<unknown>;
  hasActive: (chatId: string) => boolean;
}

/** Collaborators injected into {@link createDispatcher}. */
interface Deps {
  sql: Sql;
  inference: InferenceRunner;
  broker: Broker;
  log: FastifyBaseLogger;
  config: Config;
}

/** Columns read from `tasks` when a pending task is picked up. */
type TaskRow = {
  id: string;
  project_id: string;
  input: string;
  agent: string | null;
  model: string | null;
  mode_id: string | null;
  thinking_option_id: string | null;
  session_id: string | null;
};

/** Delay between two polls of the `tasks` table for pending work. */
const POLL_INTERVAL_MS = 5_000;
/** Delay between two status checks of a streaming assistant message. */
const COMPLETION_POLL_MS = 2_000;

/**
 * Builds the task dispatcher: a poll loop that picks one pending task at a
 * time and runs it either through native inference (path A) or through an
 * external agent in a worktree (path B).
 *
 * @param deps - Database handle, inference runner, frame broker, logger and config.
 * @returns `start()` begins polling; `stop()` halts polling, aborts a running
 *   external agent and resolves once the in-flight task (if any) has settled.
 */
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
  const { sql, inference, broker, log, config } = deps;

  let timer: ReturnType<typeof setInterval> | null = null;
  let running = false;
  let stopping = false;
  let inflightPromise: Promise<void> | null = null;
  // Abort handle of the external-agent task currently in flight, so stop() can interrupt it.
  let activeAbort: AbortController | null = null;

  /** Picks the oldest pending task, if any, and starts it in the background. */
  async function poll(): Promise<void> {
    if (running || stopping) return;

    // Claim the slot before the first await: otherwise a slow SELECT lets the
    // next interval tick pass the guard and pick up the same task.
    running = true;

    let task: TaskRow | undefined;
    try {
      // Grab one pending task
      const rows = await sql<TaskRow[]>`
        SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id
        FROM tasks
        WHERE state = 'pending'
        ORDER BY created_at
        LIMIT 1
      `;
      task = rows[0];
    } catch (err) {
      running = false;
      throw err;
    }

    // stop() may have been called while the query was in flight; do not start
    // new work after shutdown began.
    if (!task || stopping) {
      running = false;
      return;
    }

    inflightPromise = runTask(task)
      .catch((err: unknown) => {
        // Nothing awaits this promise except stop(); without a handler a throw
        // outside the per-path try/catch becomes an unhandled rejection.
        log.error({ err, taskId: task.id }, 'dispatcher: unhandled task error');
      })
      .finally(() => {
        running = false;
        inflightPromise = null;
      });
  }

  /** Chooses the execution path for a task and runs it to completion. */
  async function runTask(task: TaskRow): Promise<void> {
    const taskId = task.id;

    // Determine execution path: if agent is specified AND exists in available_agents → Path B
    if (task.agent) {
      const [agentRow] = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>`
        SELECT name, supports_acp, install_path
        FROM available_agents
        WHERE name = ${task.agent}
      `;
      if (agentRow) {
        await runExternalAgent(task, agentRow.supports_acp, agentRow.install_path);
        return;
      }
      // Agent specified but not available — fall through to Path A with a warning
      log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
    }

    // Path A — native inference (existing behavior)
    await runNativeInference(task);
  }

  // ─── Path A: Native Inference ───────────────────────────────────────────────

  /**
   * Runs a task through the native inference runner: creates a session, chat
   * and message pair, enqueues inference, waits for the assistant message to
   * leave the `streaming` status and records the outcome on the task row.
   * Errors are logged and recorded as a `failed` task; they are not rethrown.
   */
  async function runNativeInference(task: TaskRow): Promise<void> {
    const taskId = task.id;
    log.info({ taskId }, 'dispatcher: starting task (path A — native)');

    try {
      // Mark running
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
        WHERE id = ${taskId}
      `;

      // Create session + chat for this task
      const model = task.model ?? config.DEFAULT_MODEL;
      const sessionName = 'Task: ' + task.input.slice(0, 40);
      const [session] = await sql<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model, status)
        VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
        RETURNING id
      `;
      const sessionId = session!.id;

      const [chat] = await sql<{ id: string }[]>`
        INSERT INTO chats (session_id, name, status)
        VALUES (${sessionId}, 'Task execution', 'open')
        RETURNING id
      `;
      const chatId = chat!.id;

      // Link task to session
      await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;

      // Create user message + streaming assistant
      await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
        RETURNING id
      `;
      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;
      const assistantId = assistantMsg!.id;

      // Enqueue inference
      inference.enqueue(sessionId, chatId, assistantId, 'default');

      // Wait for inference to complete (poll message status)
      const finalStatus = await waitForCompletion(assistantId);

      if (stopping) {
        // NOTE(review): the enqueued inference is not cancelled here, matching
        // the previous behaviour — confirm whether inference.cancel() should be called.
        await sql`
          UPDATE tasks
          SET state = 'cancelled', ended_at = clock_timestamp()
          WHERE id = ${taskId}
        `;
        return;
      }

      // Aggregate token cost for the task's session
      const [costRow] = await sql<{ total: number | null }[]>`
        SELECT SUM(tokens_used)::int AS total
        FROM messages
        WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
      `;
      const costTokens = costRow?.total ?? null;

      const [msg] = await sql<{ content: string | null }[]>`
        SELECT content FROM messages WHERE id = ${assistantId}
      `;

      if (finalStatus === 'complete') {
        const summary = (msg?.content ?? '').slice(0, 500);
        await sql`
          UPDATE tasks
          SET state = 'completed', ended_at = clock_timestamp(),
              output_summary = ${summary}, cost_tokens = ${costTokens}
          WHERE id = ${taskId}
        `;
        log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
      } else {
        // `||` rather than `??`: the assistant row is created with content '',
        // which is not nullish, so `??` never reached the fallback text.
        const summary = (msg?.content || 'Inference failed').slice(0, 500);
        await sql`
          UPDATE tasks
          SET state = 'failed', ended_at = clock_timestamp(),
              output_summary = ${summary}, cost_tokens = ${costTokens}
          WHERE id = ${taskId}
        `;
        log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
      }
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
      // Best-effort: the database itself may be the thing that failed.
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
        WHERE id = ${taskId}
      `.catch(() => {});
    }
  }

  // ─── Path B: External Agent Dispatch ────────────────────────────────────────

  /**
   * Runs a task through an external agent inside a fresh worktree, then queues
   * the resulting diff as a pending change and records the outcome.
   *
   * @param task - The task row being executed; `task.agent` must be set.
   * @param supportsAcp - Selects `dispatchViaAcp` when true, `dispatchViaPty` otherwise.
   * @param installPath - Agent install path forwarded to the dispatch helper, if known.
   */
  async function runExternalAgent(
    task: TaskRow,
    supportsAcp: boolean,
    installPath: string | null,
  ): Promise<void> {
    const taskId = task.id;
    const agent = task.agent!;
    const executionPath = supportsAcp ? 'acp' : 'pty';
    log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');

    // Resolve the project's root path
    const [project] = await sql<{ path: string | null }[]>`
      SELECT path FROM projects WHERE id = ${task.project_id}
    `;
    const projectPath = project?.path;
    if (!projectPath) {
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(),
            output_summary = 'Project has no path — cannot create worktree'
        WHERE id = ${taskId}
      `;
      return;
    }

    // Create an abort controller for this task and expose it to stop().
    const ac = new AbortController();
    activeAbort = ac;
    // stop() may already have run before the controller existed.
    if (stopping) ac.abort();

    try {
      // Mark running
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
        WHERE id = ${taskId}
      `;

      let sessionId: string;
      let chatId: string;

      if (task.session_id) {
        // Reuse the task's existing session and its most recent open chat, if any.
        sessionId = task.session_id;
        const chats = await sql<{ id: string }[]>`
          SELECT id FROM chats
          WHERE session_id = ${sessionId} AND status = 'open'
          ORDER BY created_at DESC
          LIMIT 1
        `;
        if (chats.length === 0) {
          const [chat] = await sql<{ id: string }[]>`
            INSERT INTO chats (session_id, name, status)
            VALUES (${sessionId}, 'External agent execution', 'open')
            RETURNING id
          `;
          chatId = chat!.id;
        } else {
          chatId = chats[0]!.id;
        }
      } else {
        const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
        const [session] = await sql<{ id: string }[]>`
          INSERT INTO sessions (project_id, name, model, status)
          VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
          RETURNING id
        `;
        sessionId = session!.id;
        const [chat] = await sql<{ id: string }[]>`
          INSERT INTO chats (session_id, name, status)
          VALUES (${sessionId}, 'External agent execution', 'open')
          RETURNING id
        `;
        chatId = chat!.id;
        await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
      }

      // The user message is only written for sessions created by this task.
      if (!task.session_id) {
        await sql`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
        `;
      }

      // Step 1: Create worktree
      log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
      const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
      log.info({ taskId, worktreePath }, 'dispatcher: worktree created');

      // Step 2: Dispatch to agent
      let outputSummary: string;
      let assistantContent = '';
      let acpReasoning = '';

      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;
      const assistantId = assistantMsg!.id;

      broker.publishFrame(sessionId, {
        type: 'message_started',
        message_id: assistantId,
        chat_id: chatId,
        role: 'assistant',
      } as WsFrame);

      const manifestCommands = getManifestCommands(agent);
      if (manifestCommands.length > 0) {
        setTaskCommands(taskId, manifestCommands);
        broker.publishFrame(sessionId, {
          type: 'agent_commands',
          task_id: taskId,
          session_id: sessionId,
          commands: manifestCommands,
        } as WsFrame);
      }

      if (supportsAcp) {
        const result = await dispatchViaAcp({
          agent,
          task: task.input,
          worktreePath,
          installPath: installPath ?? undefined,
          model: task.model ?? undefined,
          modeId: task.mode_id ?? undefined,
          thinkingOptionId: task.thinking_option_id ?? undefined,
          taskId,
          sessionId,
          chatId,
          messageId: assistantId,
          broker,
          signal: ac.signal,
          log,
        });
        assistantContent = result.output.slice(0, 50_000);
        acpReasoning = result.reasoningText.slice(0, 200_000);
        outputSummary = result.output.slice(0, 500);
        await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning);
      } else {
        const result = await dispatchViaPty({
          agent,
          task: task.input,
          worktreePath,
          installPath: installPath ?? undefined,
          model: task.model ?? undefined,
          modeId: task.mode_id ?? undefined,
          thinkingOptionId: task.thinking_option_id ?? undefined,
          signal: ac.signal,
          log,
        });
        assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, 50_000);
        outputSummary = (result.stdout || result.stderr).slice(0, 500);
        // This path publishes the whole output as a single delta frame.
        if (assistantContent) {
          broker.publishFrame(sessionId, {
            type: 'delta',
            message_id: assistantId,
            chat_id: chatId,
            content: assistantContent,
          } as WsFrame);
        }
      }

      await sql`
        UPDATE messages
        SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
        WHERE id = ${assistantId}
      `;
      broker.publishFrame(sessionId, {
        type: 'message_complete',
        message_id: assistantId,
        chat_id: chatId,
      } as WsFrame);

      if (stopping) {
        await sql`
          UPDATE tasks
          SET state = 'cancelled', ended_at = clock_timestamp()
          WHERE id = ${taskId}
        `;
        await cleanupWorktree(projectPath, taskId);
        return;
      }

      // Step 3: Diff the worktree and queue pending changes
      log.info({ taskId }, 'dispatcher: diffing worktree');
      const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
      if (diff) {
        // Queue a single pending_change entry with the full unified diff
        await sql`
          INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
          VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff})
        `;
        log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
      } else {
        log.info({ taskId }, 'dispatcher: no changes detected in worktree');
      }

      // Step 4: Cleanup worktree
      await cleanupWorktree(projectPath, taskId);

      // Step 5: Aggregate token cost
      const [extCostRow] = await sql<{ total: number | null }[]>`
        SELECT SUM(tokens_used)::int AS total
        FROM messages
        WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
      `;
      const extCostTokens = extCostRow?.total ?? null;

      // Step 6: Mark task completed
      await sql`
        UPDATE tasks
        SET state = 'completed', ended_at = clock_timestamp(),
            output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);

      if (ac.signal.aborted) {
        // The controller is only aborted by stop(), so this is a shutdown, not
        // an agent failure; record it the same way as the non-throwing stop path.
        log.info({ taskId, agent, err: errMsg }, 'dispatcher: external agent aborted by shutdown');
        await sql`
          UPDATE tasks
          SET state = 'cancelled', ended_at = clock_timestamp()
          WHERE id = ${taskId}
        `.catch(() => {});
      } else {
        log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
        await sql`
          UPDATE tasks
          SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
          WHERE id = ${taskId}
        `.catch(() => {});
      }
      // NOTE(review): the assistant message, if already inserted, is left in
      // status 'streaming' here — confirm which status value should mark it failed.

      // Best-effort cleanup: a failure here must not escape the error handler.
      try {
        await cleanupWorktree(projectPath, taskId);
      } catch (cleanupErr) {
        log.warn({ taskId, err: cleanupErr }, 'dispatcher: worktree cleanup failed');
      }
    } finally {
      // Runs on success, shutdown and error alike so cached commands never outlive the task.
      clearTaskCommands(taskId);
      if (activeAbort === ac) activeAbort = null;
    }
  }

  // ─── Helpers ────────────────────────────────────────────────────────────────

  /**
   * Polls the status of an assistant message until it is no longer `streaming`.
   *
   * @returns The final status; `'cancelled'` once the dispatcher is stopping,
   *   `'failed'` when the message row cannot be found.
   */
  async function waitForCompletion(assistantId: string): Promise<string> {
    for (;;) {
      if (stopping) return 'cancelled';
      const [row] = await sql<{ status: string }[]>`
        SELECT status FROM messages WHERE id = ${assistantId}
      `;
      const status = row?.status ?? 'failed';
      if (status !== 'streaming') return status;
      await sleep(COMPLETION_POLL_MS);
    }
  }

  /** Resolves after `ms` milliseconds. */
  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  return {
    start() {
      // A second start() would otherwise overwrite `timer` and leak the first interval.
      if (timer) return;
      log.info('dispatcher: starting poll loop');
      timer = setInterval(() => {
        poll().catch((err) => {
          log.error({ err }, 'dispatcher: poll error');
        });
      }, POLL_INTERVAL_MS);
    },

    async stop() {
      stopping = true;
      if (timer) {
        clearInterval(timer);
        timer = null;
      }
      // Interrupt a running external agent: its result is discarded as
      // 'cancelled' during shutdown anyway, so waiting for it only delays exit.
      activeAbort?.abort();
      if (inflightPromise) {
        log.info('dispatcher: waiting for in-flight task');
        await inflightPromise;
      }
      log.info('dispatcher: stopped');
    },
  };
}