import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
import { createCheckpoint } from './checkpoints.js';
import { makeDcpStreamStripper } from './dcp-strip.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { getResolvedRegistry } from './provider-config-registry.js';
import { dispatchViaPty } from './pty-dispatch.js';
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
import { getManifestCommands } from './provider-commands.js';
import { persistExternalAgentTurn } from './agent-turn-persist.js';
import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js';
import { agentPool, OPENCODE_POOL_KEY } from './agent-pool.js';
import { OpenCodeServerBackend } from './backends/opencode-server.js';
import { WarmAcpBackend } from './backends/warm-acp.js';
import { ClaudeSdkBackend } from './backends/claude-sdk.js';
import { shouldUseWarmBackend } from './backends/warm-acp-routing.js';
import { shouldUseClaudeSdk } from './backends/claude-sdk-routing.js';
import type { AgentBackend, AgentEvent } from './agent-backend.js';
import { publishAgentStatus } from './agent-status-publish.js';
import type { AgentStatus } from './normalize-agent-status.js';
import { createCancelRegistry } from './cancel-registry.js';
import {
  finalizeStreamingMessage,
  classifyTerminalStatus,
  type TerminalMessageStatus,
} from './finalize-message.js';

// The slice of the native inference runner the dispatcher depends on.
interface InferenceRunner {
  enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
  // NOTE(review): the resolved type is not visible here — `unknown` accepts any
  // implementation; narrow it once confirmed against the runner.
  cancel: (sessionId: string, chatId: string) => Promise<unknown>;
  hasActive: (chatId: string) => boolean;
}

// Collaborators injected into createDispatcher.
interface Deps {
  sql: Sql;
  inference: InferenceRunner;
  broker: Broker;
  log: FastifyBaseLogger;
  config: Config;
}

// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new
// tasks immediately. The poll is only a safety net for notifications missed
// during a listen-connection drop (porsager auto-reconnects), so it can stay slow.
const POLL_INTERVAL_MS = 2_000;
const COMPLETION_POLL_MS = 2_000;

export function createDispatcher(deps: Deps): {
  cancelExternalTask(taskId: string): boolean;
  start(): void;
  // NOTE(review): Promise<void> assumed — confirm against the returned stop().
  stop(): Promise<void>;
} {
  const { sql, inference, broker, log, config } = deps;
  // NOTE(review): presumably an interval handle (see POLL_INTERVAL_MS) — confirm
  // against start().
  let timer: ReturnType<typeof setInterval> | null = null;
  let listener: { unlisten: () => Promise<void> } | null = null;
  let polling = false;
  let stopping = false;

  // v2.6 (1.9): per-session in-flight registry replaces the global `running`
  // boolean. Key = session_id (or `task:` + task id for sessionless tasks). Sessions
  // without an in-flight turn run concurrently; within a session, strictly one
  // turn at a time.
  const inflight = new Map<string, Promise<void>>();

  // F1: per-task abort registry. Each external run-function registers its per-turn
  // AbortController here (keyed by task id); the cancel route reaches it through the
  // exported `cancelExternalTask`; the run's `.finally` deletes the entry. Native
  // boocode tasks are never registered, so a Stop on one returns false and falls
  // through to the unchanged inference.cancel path.
  const taskControllers = createCancelRegistry();

  // Shared entry point for both the poll timer and the NOTIFY listener. poll()'s
  // `polling`/`stopping` guard makes this safe to call concurrently — a notify
  // arriving mid-poll returns immediately and never double-dispatches.
  function triggerPoll(reason: string): void {
    poll().catch((err) => {
      log.error({ err, reason }, 'dispatcher: poll error');
    });
  }

  // Key under which a task's run is serialized: its session, or the task itself
  // when it has no session.
  function concurrencyKey(task: { id: string; session_id: string | null }): string {
    return task.session_id ?? `task:${task.id}`;
  }

  // agent-status-normalize (#10): publish a normalized per-(chat,agent) status on
  // the session channel.
Every external-agent path (warm-acp / opencode / claude-sdk / // pty one-shot) reports `working` at turn start, `idle` on clean completion, and // `error` on the failure path through this single helper so the four paths stay // DRY and consistent. Best-effort — publishAgentStatus never throws. function emitAgentStatus( sessionId: string, chatId: string, agent: string, status: AgentStatus, reason: string, ): void { publishAgentStatus(broker.publishFrame, sessionId, chatId, agent, status, reason); } // F1 (OCE-001/OCE-002): finalize a streaming assistant message into a terminal // state and publish the matching message_complete frame. Best-effort + idempotent // (the helper's `WHERE status='streaming'` guard) — a failure here must never mask // the original abort/error, so it logs and swallows. function finalizeMessage( sessionId: string, chatId: string, assistantId: string, status: TerminalMessageStatus, model: string | null, content?: string, ): Promise { return finalizeStreamingMessage(sql, broker.publishFrame, { sessionId, chatId, assistantId, status, model, content, }).catch((err) => { log.error({ err: err instanceof Error ? err.message : String(err), assistantId }, 'dispatcher: finalizeStreamingMessage failed'); return false; }); } // F1: the cancel route's reach into an in-flight external run. Idempotent — a // double-Stop re-aborts an already-aborted controller (no-op) and a Stop on a // finished/native task returns false. Aborting only fires the backend's per-turn // cancel (session.abort / session/cancel / interrupt / child.kill); it never kills // a warm pool process, so persistent worktrees + pooled backends are preserved. function cancelExternalTask(taskId: string): boolean { return taskControllers.cancel(taskId); } async function poll(): Promise { // `polling` serializes poll() execution itself (timer + NOTIFY can fire // concurrently) so we never double-select a task. 
It does NOT serialize task // execution — that's what `inflight` (keyed per session) governs. if (polling || stopping) return; polling = true; try { // Oldest-first; start every pending task whose session isn't already busy. const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null; mode_id: string | null; thinking_option_id: string | null; session_id: string | null; chat_id: string | null; }[]>` SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id, chat_id FROM tasks WHERE state = 'pending' ORDER BY created_at LIMIT 50 `; for (const task of rows) { if (stopping) break; const key = concurrencyKey(task); if (inflight.has(key)) continue; // this session already has an in-flight turn // Register synchronously (before any await) so a later row in this pass // with the same key is skipped and a concurrent poll can't re-pick it. const p = runTask(task).finally(() => { inflight.delete(key); // F1: drop the abort controller once the run settles. After this, a Stop // on the (now-finished) task returns false — cancel-after-exit is safe. taskControllers.delete(task.id); }); inflight.set(key, p); } } finally { polling = false; } } async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; mode_id: string | null; thinking_option_id: string | null; session_id: string | null; chat_id: string | null; }): Promise { const taskId = task.id; // Determine execution path: if agent is specified AND exists in available_agents → Path B if (task.agent) { const [agentRow] = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>` SELECT name, supports_acp, install_path FROM available_agents WHERE name = ${task.agent} `; if (agentRow) { // v2.6 (1.7): opencode routes to its warm HTTP-server backend. 
// v2.6 Phase 2 (2.4): goose/qwen route to the warm ACP backend WHEN the // task came from a real chat tab (session_id + chat_id) — shouldUseWarmBackend. // Session-less creators (arena, MCP, new_task, generic /api/tasks) keep the // existing one-shot worktree-per-task ACP/PTY path untouched. if (task.agent === 'opencode') { await runOpenCodeServerTask(task, agentRow.install_path); } else if (shouldUseClaudeSdk(task)) { // claude-sdk-sessionstore #9 (Part 2): env-flagged (CLAUDE_SDK_BACKEND, default // OFF) warm Claude-SDK backend for chat-tab claude tasks. When the flag is off // (production default) this predicate returns false and claude falls through to // the UNCHANGED one-shot PTY runExternalAgent path below. await runClaudeSdkTask(task, agentRow.install_path); } else if (shouldUseWarmBackend(task)) { await runWarmAcpTask(task, agentRow.install_path); } else { await runExternalAgent(task, agentRow.supports_acp, agentRow.install_path); } return; } // Agent specified but not available — fall through to Path A with a warning log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native'); } // Path A — native inference (existing behavior) await runNativeInference(task); } // ─── Path A: Native Inference ─────────────────────────────────────────────── async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; session_id: string | null }): Promise { const taskId = task.id; log.info({ taskId }, 'dispatcher: starting task (path A — native)'); try { // Mark running await sql` UPDATE tasks SET state = 'running', started_at = clock_timestamp(), execution_path = 'native' WHERE id = ${taskId} `; // Create session + chat for this task const model = task.model ?? 
config.DEFAULT_MODEL; const sessionName = 'Task: ' + task.input.slice(0, 40); const [session] = await sql<{ id: string }[]>` INSERT INTO sessions (project_id, name, model, status) VALUES (${task.project_id}, ${sessionName}, ${model}, 'open') RETURNING id `; const sessionId = session!.id; const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'Task execution', 'open') RETURNING id `; const chatId = chat!.id; // Link task to session await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; // Create user message + streaming assistant await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) RETURNING id `; const [assistantMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp()) RETURNING id `; const assistantId = assistantMsg!.id; // Enqueue inference inference.enqueue(sessionId, chatId, assistantId, 'default'); // Wait for inference to complete (poll message status) const finalStatus = await waitForCompletion(assistantId); if (stopping) { await sql` UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId} `; return; } // Aggregate token cost for the task's session const [costRow] = await sql<{ total: number | null }[]>` SELECT SUM(tokens_used)::int AS total FROM messages WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL `; const costTokens = costRow?.total ?? null; if (finalStatus === 'complete') { const [msg] = await sql<{ content: string | null }[]>` SELECT content FROM messages WHERE id = ${assistantId} `; const summary = (msg?.content ?? 
'').slice(0, 500); await sql` UPDATE tasks SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens} WHERE id = ${taskId} `; log.info({ taskId, costTokens }, 'dispatcher: task completed (native)'); } else { const [msg] = await sql<{ content: string | null }[]>` SELECT content FROM messages WHERE id = ${assistantId} `; const summary = (msg?.content ?? 'Inference failed').slice(0, 500); await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens} WHERE id = ${taskId} `; log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)'); } } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)'); await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} WHERE id = ${taskId} `.catch(() => {}); } } // ─── Path B: External Agent Dispatch ──────���───────────────────────────────── async function runExternalAgent( task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; mode_id: string | null; thinking_option_id: string | null; session_id: string | null; }, supportsAcp: boolean, installPath: string | null, ): Promise { const taskId = task.id; const agent = task.agent!; const executionPath = supportsAcp ? 'acp' : 'pty'; log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)'); // Resolve the project's root path const [project] = await sql<{ path: string | null }[]>` SELECT path FROM projects WHERE id = ${task.project_id} `; const projectPath = project?.path; if (!projectPath) { await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree' WHERE id = ${taskId} `; return; } // F1: register the per-task abort controller so a Stop reaches this run. 
const ac = taskControllers.register(taskId); // #10: hoisted above the try so the catch block can report `error` status with // the (chat, agent) key. Empty until resolved below; guarded before use. let sessionId = ''; let chatId = ''; // F1: hoisted so the catch / abort short-circuit can finalize the streaming // assistant row. Empty until the row is created; finalize no-ops on ''. let assistantId = ''; try { // Mark running await sql` UPDATE tasks SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath} WHERE id = ${taskId} `; if (task.session_id) { sessionId = task.session_id; const chats = await sql<{ id: string }[]>` SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1 `; if (chats.length === 0) { const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'External agent execution', 'open') RETURNING id `; chatId = chat!.id; } else { chatId = chats[0]!.id; } } else { const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`; const [session] = await sql<{ id: string }[]>` INSERT INTO sessions (project_id, name, model, status) VALUES (${task.project_id}, ${sessionName}, ${task.model ?? 
config.DEFAULT_MODEL}, 'open') RETURNING id `; sessionId = session!.id; const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'External agent execution', 'open') RETURNING id `; chatId = chat!.id; await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; } if (!task.session_id) { await sql` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) `; } // Step 1: Create worktree log.info({ taskId, projectPath }, 'dispatcher: creating worktree'); const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal }); log.info({ taskId, worktreePath }, 'dispatcher: worktree created'); // Step 2: Dispatch to agent let outputSummary: string; let assistantContent = ''; let acpReasoning = ''; const [assistantMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp()) RETURNING id `; assistantId = assistantMsg!.id; // write-edit-robustness #4: pre-turn worktree checkpoint (best-effort; a // failure logs and never breaks dispatch). This path uses a per-task worktree // (createWorktree, not the session worktree), so there's no worktrees-table id // — pass null for worktreeId, the path is enough for restore's reset. await createCheckpoint( sql, { chatId, sessionId, worktreeId: null, worktreePath, messageId: assistantId }, { signal: ac.signal, log }, ).catch(() => null); broker.publishFrame(sessionId, { type: 'message_started', message_id: assistantId, chat_id: chatId, role: 'assistant', } as WsFrame); // #10: external-agent turn begins. 
emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start'); const manifestCommands = getManifestCommands(agent); if (manifestCommands.length > 0) { setTaskCommands(taskId, manifestCommands); broker.publishFrame(sessionId, { type: 'agent_commands', task_id: taskId, session_id: sessionId, commands: manifestCommands, } as WsFrame); } if (supportsAcp) { const result = await dispatchViaAcp({ agent, resolved: getResolvedRegistry().get(agent), task: task.input, worktreePath, installPath: installPath ?? undefined, model: task.model ?? undefined, modeId: task.mode_id ?? undefined, thinkingOptionId: task.thinking_option_id ?? undefined, taskId, sessionId, chatId, messageId: assistantId, broker, signal: ac.signal, log, }); assistantContent = result.output.slice(0, 50_000); acpReasoning = result.reasoningText.slice(0, 200_000); outputSummary = result.output.slice(0, 500); await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning); } else { // v#7 (stream-json): claude + qwen run with --output-format stream-json. // Parse the NDJSON live in pty-dispatch and forward AgentEvents here so we // publish the SAME live frames the warm-ACP / opencode paths emit (text, // reasoning, tool) and persist structured parts. Accumulate for the final // message content + persistence; fall back to the opaque stdout slice when // nothing parsed (agent ran without the flag, or crashed before emitting). 
const ptyTextChunks: string[] = []; const ptyReasoningChunks: string[] = []; const ptyToolSnaps = new Map(); const onPtyEvent = (e: AgentEvent): void => { switch (e.type) { case 'text': ptyTextChunks.push(e.text); broker.publishFrame(sessionId, { type: 'delta', message_id: assistantId, chat_id: chatId, content: e.text, } as WsFrame); break; case 'reasoning': ptyReasoningChunks.push(e.text); broker.publishFrame(sessionId, { type: 'reasoning_delta', message_id: assistantId, chat_id: chatId, content: e.text, } as WsFrame); break; case 'tool_call': case 'tool_update': ptyToolSnaps.set(e.toolCall.toolCallId, e.toolCall); broker.publishFrame(sessionId, { type: 'tool_call', message_id: assistantId, chat_id: chatId, tool_call: snapshotToWireToolCall(e.toolCall), } as WsFrame); break; case 'commands': // stream-json carries no commands today; ignore if it ever does. break; } }; const result = await dispatchViaPty({ agent, task: task.input, worktreePath, installPath: installPath ?? undefined, model: task.model ?? undefined, modeId: task.mode_id ?? undefined, thinkingOptionId: task.thinking_option_id ?? undefined, signal: ac.signal, log, onEvent: onPtyEvent, }); if (result.streamed) { assistantContent = ptyTextChunks.join('').slice(0, 50_000); // stream-json text can be empty for a tool-only turn — surface stderr or a // placeholder so the message row isn't blank. if (!assistantContent) { assistantContent = (result.stderr || '(no text output)').slice(0, 50_000); } outputSummary = (ptyTextChunks.join('') || result.stderr).slice(0, 500); acpReasoning = ptyReasoningChunks.join('').slice(0, 200_000); await persistExternalAgentTurn(sql, assistantId, [...ptyToolSnaps.values()], acpReasoning); } else { // Fallback: agent produced no parseable NDJSON (ran without the flag, or // crashed). Preserve today's opaque stdout-slice + single delta behavior. 
assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, 50_000); outputSummary = (result.stdout || result.stderr).slice(0, 500); if (assistantContent) { broker.publishFrame(sessionId, { type: 'delta', message_id: assistantId, chat_id: chatId, content: assistantContent, } as WsFrame); } } } // F1: abort short-circuit BEFORE the unconditional 'complete' write. A Stop // (cancelExternalTask → ac.abort) or shutdown finalizes the streaming row as // 'cancelled' (keeping whatever streamed) instead of recording 'complete', // and skips the diff. This one-shot path owns a per-task worktree, so we DO // tear it down here (unlike the warm paths, which keep their persistent one). if (ac.signal.aborted || stopping) { await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent); await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`; emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled'); await cleanupWorktree(projectPath, taskId); clearTaskCommands(taskId); return; } await sql` UPDATE messages SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp() WHERE id = ${assistantId} `; broker.publishFrame(sessionId, { type: 'message_complete', message_id: assistantId, chat_id: chatId, model: task.model, } as WsFrame); // Step 3: Diff the worktree and queue pending changes log.info({ taskId }, 'dispatcher: diffing worktree'); const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal }); if (diff) { // Queue a single pending_change entry with the full unified diff, stamped // with the dispatched agent for DiffPanel attribution (v2.6 Phase 1-UX). 
await sql` INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent) VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent}) `; log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change'); } else { log.info({ taskId }, 'dispatcher: no changes detected in worktree'); } // Step 4: Cleanup worktree await cleanupWorktree(projectPath, taskId); // Step 5: Aggregate token cost const [extCostRow] = await sql<{ total: number | null }[]>` SELECT SUM(tokens_used)::int AS total FROM messages WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL `; const extCostTokens = extCostRow?.total ?? null; // Step 6: Mark task completed await sql` UPDATE tasks SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} WHERE id = ${taskId} `; log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)'); // #10: external-agent turn completed cleanly. emitAgentStatus(sessionId, chatId, agent, 'idle', 'turn_complete'); clearTaskCommands(taskId); } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err }); log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error'); // Guard `NOT IN ('cancelled','completed')` so a genuine error in the catch // never overwrites a state the cancel route already wrote (user-Stop wins). await sql` UPDATE tasks SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed') `.catch(() => {}); // F1 (OCE-001): finalize the streaming assistant message — the catch // previously updated only `tasks` and left the message 'streaming' forever // (the BooChat 5-min sweep runs in a different process and can't reach it). 
await finalizeMessage(sessionId, chatId, assistantId, status, task.model); // #10: external-agent turn failed/crashed. chatId may be unbound if the throw // preceded its assignment — guard so the status publish never masks the real // error. if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'failed'); // Best-effort cleanup await cleanupWorktree(projectPath, taskId); clearTaskCommands(taskId); } } // ─── Path B (opencode): warm OpenCode server backend (v2.6 1.7 + 1.10) ─────── // OpenCode runs ONE server per BooCoder process, shared across all sessions // (the backend multiplexes sessions internally), so it's pooled under a fixed // key (OPENCODE_POOL_KEY, shared with the lifecycle close-hook) rather than // per-session. Warm ACP backends (Phase 2) are per (chat, agent). function getOpenCodeBackend(installPath: string | null): AgentBackend { let backend = agentPool.get(OPENCODE_POOL_KEY, 'opencode'); if (!backend) { backend = new OpenCodeServerBackend({ sql, log, opencodeBinary: installPath ?? 
'opencode' }); agentPool.register(OPENCODE_POOL_KEY, 'opencode', backend); } return backend; } async function runOpenCodeServerTask( task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; mode_id: string | null; thinking_option_id: string | null; session_id: string | null; chat_id: string | null; }, installPath: string | null, ): Promise { const taskId = task.id; const agent = 'opencode'; log.info({ taskId, agent }, 'dispatcher: starting task (path B — opencode server)'); const [project] = await sql<{ path: string | null }[]>` SELECT path FROM projects WHERE id = ${task.project_id} `; const projectPath = project?.path; if (!projectPath) { await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree' WHERE id = ${taskId} `; return; } // F1: register the per-task abort controller so a Stop reaches this run. const ac = taskControllers.register(taskId); // #10: hoisted so the catch can report `error` with the (chat, agent) key. let sessionId = ''; let chatId = ''; // F1: hoisted so the catch / abort short-circuit can finalize the streaming row. let assistantId = ''; try { // execution_path = 'acp' — the schema CHECK has no 'opencode_server' value // (schema is frozen at Phase 0); the warm-vs-one-shot distinction lives in // agent_sessions.backend. Reuse the closest existing value. await sql` UPDATE tasks SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp' WHERE id = ${taskId} `; // Resolve session + chat. P1.5-b: the chat (tab) is the context key, so the // chat_id MUST be non-null and stable before ensureSession. The coder message // route + skills route stamp task.chat_id with the frontend tab's chat — use // it directly. Session-less creators (arena, MCP, new_task, generic // /api/tasks) leave it null; fall back to resolving/creating a real chat so // ensureSession never receives a degenerate (null, agent) key. 
if (task.chat_id && task.session_id) { sessionId = task.session_id; chatId = task.chat_id; } else if (task.session_id) { sessionId = task.session_id; const chats = await sql<{ id: string }[]>` SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1 `; if (chats.length === 0) { const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'External agent execution', 'open') RETURNING id `; chatId = chat!.id; } else { chatId = chats[0]!.id; } } else { const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`; const [session] = await sql<{ id: string }[]>` INSERT INTO sessions (project_id, name, model, status) VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open') RETURNING id `; sessionId = session!.id; const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'External agent execution', 'open') RETURNING id `; chatId = chat!.id; await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; } if (!task.session_id) { await sql` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) `; } // Persistent, session-keyed worktree (shared across turns; NOT torn down // per turn — Phase 3 reaps it). Captures base_commit for a stable diff. 
const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, { signal: ac.signal, }); log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready'); const [assistantMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp()) RETURNING id `; assistantId = assistantMsg!.id; // write-edit-robustness #4: pre-turn checkpoint of the persistent session // worktree (best-effort; never breaks dispatch). worktreeId comes from the // worktrees table (ensureSessionWorktree above). await createCheckpoint( sql, { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId }, { signal: ac.signal, log }, ).catch(() => null); broker.publishFrame(sessionId, { type: 'message_started', message_id: assistantId, chat_id: chatId, role: 'assistant', } as WsFrame); // #10: opencode-server turn begins. emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start'); const manifestCommands = getManifestCommands(agent); if (manifestCommands.length > 0) { setTaskCommands(taskId, manifestCommands); broker.publishFrame(sessionId, { type: 'agent_commands', task_id: taskId, session_id: sessionId, commands: manifestCommands, } as WsFrame); } // Accumulate the turn's stream for persistence + the final message content. const textChunks: string[] = []; const reasoningChunks: string[] = []; const toolSnaps = new Map(); // opencode's dcp plugin appends to the // text, streamed split across deltas — a per-chunk regex misses it (see // dcp-strip.ts). Buffer text through a cross-chunk stripper so neither the // live `delta` frames nor the persisted content ever carry the tag. const dcp = makeDcpStreamStripper(); // Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits. // This boundary is where message_id/chat_id get attached (the backend never // owns them). 
const onEvent = (e: AgentEvent): void => { switch (e.type) { case 'text': { const safe = dcp.push(e.text); if (safe) { textChunks.push(safe); broker.publishFrame(sessionId, { type: 'delta', message_id: assistantId, chat_id: chatId, content: safe, } as WsFrame); } break; } case 'reasoning': reasoningChunks.push(e.text); broker.publishFrame(sessionId, { type: 'reasoning_delta', message_id: assistantId, chat_id: chatId, content: e.text, } as WsFrame); break; case 'tool_call': case 'tool_update': toolSnaps.set(e.toolCall.toolCallId, e.toolCall); broker.publishFrame(sessionId, { type: 'tool_call', message_id: assistantId, chat_id: chatId, tool_call: snapshotToWireToolCall(e.toolCall), } as WsFrame); break; case 'commands': // opencode-server doesn't emit these today; ignore if it ever does. break; } }; // opencode expects provider-prefixed model ids (e.g. 'llama-swap/qwen3.6-35b…'). // DEFAULT_MODEL is bare (no prefix) because native inference uses it directly // against llama-swap. Coalesce empty string (frontend sends '' when no models // listed) and prefix bare ids so parseModel always succeeds. const rawModel = (task.model && task.model.trim()) || config.DEFAULT_MODEL; const model = rawModel.includes('/') ? rawModel : `llama-swap/${rawModel}`; const backend = getOpenCodeBackend(installPath); const handle = await backend.ensureSession(sessionId, { agent, model, chatId, worktreePath, worktreeId, projectId: task.project_id, }); const result = await backend.prompt(handle, task.input, { worktreePath, model, signal: ac.signal, onEvent, }); // Phase 3: keep the pooled backend's slot warm across this (possibly long) // turn so the idle sweep measures from turn END, not start. agentPool.touch(OPENCODE_POOL_KEY, agent); // Flush any text held back mid-tag at stream end (complete tags stripped). 
const dcpTail = dcp.flush(); if (dcpTail) { textChunks.push(dcpTail); broker.publishFrame(sessionId, { type: 'delta', message_id: assistantId, chat_id: chatId, content: dcpTail, } as WsFrame); } const assistantContent = textChunks.join('').slice(0, 50_000); const reasoningText = reasoningChunks.join('').slice(0, 200_000); const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'opencode turn failed').slice(0, 500); await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText); // F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes // the warm success-path recording 'complete' on a Stop'd turn. The abort fired // session.abort on the prompt only: the persistent session worktree is kept // (no cleanup) and the pooled opencode server stays warm for the next turn. if (ac.signal.aborted || stopping) { await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent); await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`; emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled'); clearTaskCommands(taskId); return; // worktree persists (no cleanup); backend stays warm } await sql` UPDATE messages SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp() WHERE id = ${assistantId} `; broker.publishFrame(sessionId, { type: 'message_complete', message_id: assistantId, chat_id: chatId, model: task.model, } as WsFrame); // 1.10: diff the persistent worktree against its captured baseline and // SUPERSEDE the session's prior pending row (latest-wins, one accumulating // diff) instead of stacking. Stamp agent for DiffPanel attribution. const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal, baseRef: baseCommit ?? 
'HEAD', }); if (diff) { await sql` DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending' `; await sql` INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent) VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent}) `; log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change'); } else { log.info({ taskId }, 'dispatcher: no changes detected in session worktree'); } // NO worktree cleanup — it's persistent (Phase 3 reaps it). Backend stays warm. const [extCostRow] = await sql<{ total: number | null }[]>` SELECT SUM(tokens_used)::int AS total FROM messages WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL `; const extCostTokens = extCostRow?.total ?? null; const finalState = result.ok ? 'completed' : 'failed'; await sql` UPDATE tasks SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} WHERE id = ${taskId} `; log.info({ taskId, agent, finalState, costTokens: extCostTokens }, 'dispatcher: task finished (opencode server)'); // #10: clean completion → idle; backend-reported failure → error. emitAgentStatus( sessionId, chatId, agent, result.ok ? 'idle' : 'error', result.ok ? 'turn_complete' : 'failed', ); clearTaskCommands(taskId); } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err }); log.error({ taskId, agent, err: errMsg }, 'dispatcher: opencode server error'); await sql` UPDATE tasks SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed') `.catch(() => {}); // F1 (OCE-001): finalize the streaming message (was left 'streaming'). await finalizeMessage(sessionId, chatId, assistantId, status, task.model); // #10: turn crashed. 
if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
      clearTaskCommands(taskId);
      // No worktree cleanup (persistent); backend stays warm for the next turn.
    }
  }

  // ─── Path B (warm ACP): goose / qwen warm backend (v2.6 Phase 2) ─────────────
  // Warm ACP backends are per (chat, agent): each owns ONE stdio process + ACP
  // connection + session. Pool key = chatId; the AgentPool's secondary key is the
  // agent. This mirrors agent_sessions' (chat_id, agent) PK.
  //
  // Returns the backend already pooled under (chatId, agent), or constructs a
  // new WarmAcpBackend, registers it in the pool and returns it.
  function getWarmAcpBackend(chatId: string, agent: string, installPath: string | null): WarmAcpBackend {
    let backend = agentPool.get(chatId, agent);
    if (!backend) {
      backend = new WarmAcpBackend({
        sql,
        log,
        chatId,
        agent,
        installPath,
        resolved: getResolvedRegistry().get(agent),
      });
      agentPool.register(chatId, agent, backend);
    }
    // NOTE(review): unchecked cast — assumes nothing else registers a different
    // backend class under the same (chatId, agent) key; confirm against routing.
    return backend as WarmAcpBackend;
  }

  // Runs one task turn on the warm ACP backend for (chat, agent).
  //
  // Steps: mark the task running → ensure the session worktree → insert the
  // streaming assistant row → best-effort checkpoint → stream backend events to
  // the session channel → persist the turn → diff the worktree and supersede the
  // session's pending change → record the task's final state.
  //
  // @param task        Task row. `agent`, `session_id` and `chat_id` are asserted
  //                    non-null (the routing check runs before this is called).
  // @param installPath Forwarded to the backend constructor.
  // @returns Resolves when the turn has been fully recorded. Errors thrown inside
  //          the turn are recorded on the task/message rows rather than rethrown;
  //          only a failure inside the error handler itself propagates.
  async function runWarmAcpTask(
    task: {
      id: string;
      project_id: string;
      input: string;
      agent: string | null;
      model: string | null;
      mode_id: string | null;
      thinking_option_id: string | null;
      session_id: string | null;
      chat_id: string | null;
    },
    installPath: string | null,
  ): Promise<void> {
    const taskId = task.id;
    const agent = task.agent!;
    // shouldUseWarmBackend guarantees both non-null before we get here.
    const sessionId = task.session_id!;
    const chatId = task.chat_id!;
    log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — warm ACP)');

    const [project] = await sql<{ path: string | null }[]>`
      SELECT path FROM projects WHERE id = ${task.project_id}
    `;
    const projectPath = project?.path;
    if (!projectPath) {
      // Without a project path there is nothing to build a worktree from: fail
      // the task up front (no assistant message has been created yet).
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(),
            output_summary = 'Project has no path — cannot create worktree'
        WHERE id = ${taskId}
      `;
      return;
    }

    // F1: register the per-task abort controller so a Stop reaches this run.
    const ac = taskControllers.register(taskId);
    // F1: hoisted so the catch / abort short-circuit can finalize the streaming row.
    let assistantId = '';
    try {
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
        WHERE id = ${taskId}
      `;

      // Persistent, session-keyed worktree (shared across turns + agents; NOT torn
      // down per turn — Phase 3 reaps it). Same as the opencode-server path so a
      // chat that switches opencode↔goose↔qwen shares one worktree.
      const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
        signal: ac.signal,
      });
      log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (warm ACP)');

      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
        RETURNING id
      `;
      assistantId = assistantMsg!.id;

      // write-edit-robustness #4: pre-turn checkpoint of the persistent session
      // worktree (best-effort; never breaks dispatch). Same worktree the opencode
      // path uses — a chat that switches opencode↔goose↔qwen shares one worktree.
      await createCheckpoint(
        sql,
        { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId },
        { signal: ac.signal, log },
      ).catch(() => null);

      broker.publishFrame(sessionId, {
        type: 'message_started',
        message_id: assistantId,
        chat_id: chatId,
        role: 'assistant',
      } as WsFrame);
      // #10: warm-ACP turn begins.
      emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');

      const manifestCommands = getManifestCommands(agent);
      if (manifestCommands.length > 0) {
        setTaskCommands(taskId, manifestCommands);
        broker.publishFrame(sessionId, {
          type: 'agent_commands',
          task_id: taskId,
          session_id: sessionId,
          commands: manifestCommands,
        } as WsFrame);
      }

      // Accumulate the turn's stream for persistence + the final message content.
      const textChunks: string[] = [];
      const reasoningChunks: string[] = [];
      const toolSnaps = new Map<string, AcpToolSnapshot>();

      // Map transport-agnostic AgentEvents → the SAME WS frames the one-shot ACP
      // path emits (identical to runOpenCodeServerTask's onEvent). No dcp stripping:
      // that's an opencode-plugin artifact; goose/qwen don't emit dcp tags.
      const onEvent = (e: AgentEvent): void => {
        switch (e.type) {
          case 'text':
            textChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'reasoning':
            reasoningChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'reasoning_delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'tool_call':
          case 'tool_update':
            // Keyed by toolCallId, so a later update replaces the earlier snapshot.
            toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
            broker.publishFrame(sessionId, {
              type: 'tool_call',
              message_id: assistantId,
              chat_id: chatId,
              tool_call: snapshotToWireToolCall(e.toolCall),
            } as WsFrame);
            break;
          case 'commands':
            if (e.commands.length > 0) {
              setTaskCommands(taskId, e.commands);
              broker.publishFrame(sessionId, {
                type: 'agent_commands',
                task_id: taskId,
                session_id: sessionId,
                commands: e.commands,
              } as WsFrame);
            }
            break;
        }
      };

      // The backend receives '' when the task carries no model.
      const model = task.model ?? '';
      const backend = getWarmAcpBackend(chatId, agent, installPath);
      const handle = await backend.ensureSession(sessionId, {
        agent,
        model,
        chatId,
        worktreePath,
        worktreeId,
        projectId: task.project_id,
      });
      const result = await backend.prompt(handle, task.input, {
        worktreePath,
        model,
        signal: ac.signal,
        onEvent,
        taskId,
        modeId: task.mode_id ?? undefined,
      });
      // Phase 3: keep the pooled (chat,agent) backend warm across the turn.
      agentPool.touch(chatId, agent);

      // Join once; the persisted content and the task summary are both capped
      // slices of the same streamed text.
      const fullText = textChunks.join('');
      const assistantContent = fullText.slice(0, 50_000);
      const reasoningText = reasoningChunks.join('').slice(0, 200_000);
      const outputSummary = (result.ok ? fullText : (result.error ?? 'warm ACP turn failed')).slice(0, 500);

      await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);

      // F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes
      // the warm success-path recording 'complete' on a Stop'd turn. The abort fired
      // session/cancel on the warm connection only (never killed the child), so the
      // persistent worktree is kept and the pooled (chat,agent) backend stays warm.
      if (ac.signal.aborted || stopping) {
        await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
        await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
        emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
        return; // worktree persists (no cleanup); backend stays warm
      }

      await sql`
        UPDATE messages
        SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
        WHERE id = ${assistantId}
      `;
      broker.publishFrame(sessionId, {
        type: 'message_complete',
        message_id: assistantId,
        chat_id: chatId,
        model: task.model,
      } as WsFrame);

      // Diff the persistent worktree against its captured baseline and SUPERSEDE
      // the session's prior pending row (latest-wins) — identical to opencode.
      const diff = await diffWorktree(worktreePath, projectPath, {
        signal: ac.signal,
        baseRef: baseCommit ?? 'HEAD',
      });
      if (diff) {
        await sql`
          DELETE FROM pending_changes
          WHERE session_id = ${sessionId} AND status = 'pending'
        `;
        await sql`
          INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
          VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
        `;
        log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (warm ACP)');
      } else {
        log.info({ taskId }, 'dispatcher: no changes detected in session worktree (warm ACP)');
      }
      // NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm.

      // cost_tokens is the session-wide sum of tokens_used across its messages.
      const [extCostRow] = await sql<{ total: number | null }[]>`
        SELECT SUM(tokens_used)::int AS total
        FROM messages
        WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
      `;
      const extCostTokens = extCostRow?.total ?? null;

      const finalState = result.ok ? 'completed' : 'failed';
      await sql`
        UPDATE tasks
        SET state = ${finalState}, ended_at = clock_timestamp(),
            output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, agent, finalState }, 'dispatcher: task finished (warm ACP)');
      // #10: clean completion → idle; backend-reported failure → error.
      emitAgentStatus(
        sessionId,
        chatId,
        agent,
        result.ok ? 'idle' : 'error',
        result.ok ? 'turn_complete' : 'failed',
      );
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
      log.error({ taskId, agent, err: errMsg }, 'dispatcher: warm ACP error');
      // Best-effort: never overwrite a state that is already terminal. A failure
      // of this write is logged instead of silently dropped.
      await sql`
        UPDATE tasks
        SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
        WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
      `.catch((updateErr: unknown) => {
        log.error({ taskId, err: updateErr }, 'dispatcher: failed to record task failure (warm ACP)');
      });
      try {
        // F1 (OCE-001): finalize the streaming message (was left 'streaming').
        await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
      } finally {
        // #10: turn crashed. Emitted even if finalizeMessage rejects, so the
        // (chat, agent) status never stays stuck on 'working'.
        emitAgentStatus(
          sessionId,
          chatId,
          agent,
          status === 'cancelled' ? 'idle' : 'error',
          status === 'cancelled' ? 'cancelled' : 'crashed',
        );
      }
      // No worktree cleanup (persistent); backend stays warm for the next turn.
    } finally {
      // Single cleanup point for every exit (success, abort short-circuit, error,
      // and an error raised by the error handler itself).
      clearTaskCommands(taskId);
    }
  }

  // ─── Path B (claude SDK): warm Claude-SDK backend (v2.6 #9 Part 2) ───────────
  // Claude-SDK backends are per (chat, agent) — each owns ONE persistent query()
  // generator driven in streaming-input mode. Pool key = chatId (secondary = agent),
  // mirroring agent_sessions' (chat_id, agent) PK + the warm-ACP pooling.
function getClaudeSdkBackend(chatId: string, agent: string, installPath: string | null): ClaudeSdkBackend {
    // Returns the backend already pooled under (chatId, agent), or constructs a
    // new ClaudeSdkBackend, registers it in the pool and returns it.
    let backend = agentPool.get(chatId, agent);
    if (!backend) {
      backend = new ClaudeSdkBackend({ sql, log, chatId, agent, installPath });
      agentPool.register(chatId, agent, backend);
    }
    // NOTE(review): unchecked cast — assumes nothing else registers a different
    // backend class under the same (chatId, agent) key; confirm against routing.
    return backend as ClaudeSdkBackend;
  }

  // Runs one task turn on the warm Claude-SDK backend for (chat, agent).
  //
  // Same pipeline as the warm-ACP path: mark the task running → ensure the
  // session worktree → insert the streaming assistant row → best-effort
  // checkpoint → stream backend events to the session channel → persist the
  // turn → diff the worktree and supersede the session's pending change →
  // record the task's final state. Additionally stores the backend-reported
  // ctx_used / ctx_max on the assistant message.
  //
  // @param task        Task row. `agent`, `session_id` and `chat_id` are asserted
  //                    non-null (the routing check runs before this is called).
  // @param installPath Forwarded to the backend constructor.
  // @returns Resolves when the turn has been fully recorded. Errors thrown inside
  //          the turn are recorded on the task/message rows rather than rethrown;
  //          only a failure inside the error handler itself propagates.
  async function runClaudeSdkTask(
    task: {
      id: string;
      project_id: string;
      input: string;
      agent: string | null;
      model: string | null;
      mode_id: string | null;
      thinking_option_id: string | null;
      session_id: string | null;
      chat_id: string | null;
    },
    installPath: string | null,
  ): Promise<void> {
    const taskId = task.id;
    const agent = task.agent!;
    // shouldUseClaudeSdk guarantees both non-null before we get here.
    const sessionId = task.session_id!;
    const chatId = task.chat_id!;
    log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — claude SDK)');

    const [project] = await sql<{ path: string | null }[]>`
      SELECT path FROM projects WHERE id = ${task.project_id}
    `;
    const projectPath = project?.path;
    if (!projectPath) {
      // Without a project path there is nothing to build a worktree from: fail
      // the task up front (no assistant message has been created yet).
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(),
            output_summary = 'Project has no path — cannot create worktree'
        WHERE id = ${taskId}
      `;
      return;
    }

    // F1: register the per-task abort controller so a Stop reaches this run.
    const ac = taskControllers.register(taskId);
    // F1: hoisted so the catch / abort short-circuit can finalize the streaming row.
    let assistantId = '';
    try {
      // NOTE(review): execution_path is recorded as 'acp' on the Claude-SDK path
      // as well — confirm this is intended rather than a carry-over.
      await sql`
        UPDATE tasks
        SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
        WHERE id = ${taskId}
      `;

      // Persistent, session-keyed worktree (shared across turns + agents; NOT torn
      // down per turn — Phase 3 reaps it). Same as the opencode/warm-ACP paths so a
      // chat that switches agents shares one worktree.
      const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
        signal: ac.signal,
      });
      log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (claude SDK)');

      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
        RETURNING id
      `;
      assistantId = assistantMsg!.id;

      // write-edit-robustness #4: pre-turn checkpoint of the persistent session
      // worktree (best-effort; never breaks dispatch).
      await createCheckpoint(
        sql,
        { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId },
        { signal: ac.signal, log },
      ).catch(() => null);

      broker.publishFrame(sessionId, {
        type: 'message_started',
        message_id: assistantId,
        chat_id: chatId,
        role: 'assistant',
      } as WsFrame);
      // #10: claude-SDK turn begins.
      emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');

      const manifestCommands = getManifestCommands(agent);
      if (manifestCommands.length > 0) {
        setTaskCommands(taskId, manifestCommands);
        broker.publishFrame(sessionId, {
          type: 'agent_commands',
          task_id: taskId,
          session_id: sessionId,
          commands: manifestCommands,
        } as WsFrame);
      }

      // Accumulate the turn's stream for persistence + the final message content.
      const textChunks: string[] = [];
      const reasoningChunks: string[] = [];
      const toolSnaps = new Map<string, AcpToolSnapshot>();

      // Map transport-agnostic AgentEvents → the SAME WS frames the warm-ACP /
      // opencode paths emit. This boundary attaches message_id/chat_id.
      const onEvent = (e: AgentEvent): void => {
        switch (e.type) {
          case 'text':
            textChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'reasoning':
            reasoningChunks.push(e.text);
            broker.publishFrame(sessionId, {
              type: 'reasoning_delta',
              message_id: assistantId,
              chat_id: chatId,
              content: e.text,
            } as WsFrame);
            break;
          case 'tool_call':
          case 'tool_update':
            // Keyed by toolCallId, so a later update replaces the earlier snapshot.
            toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
            broker.publishFrame(sessionId, {
              type: 'tool_call',
              message_id: assistantId,
              chat_id: chatId,
              tool_call: snapshotToWireToolCall(e.toolCall),
            } as WsFrame);
            break;
          case 'commands':
            if (e.commands.length > 0) {
              setTaskCommands(taskId, e.commands);
              broker.publishFrame(sessionId, {
                type: 'agent_commands',
                task_id: taskId,
                session_id: sessionId,
                commands: e.commands,
              } as WsFrame);
            }
            break;
        }
      };

      // The backend receives '' when the task carries no model.
      const model = task.model ?? '';
      const backend = getClaudeSdkBackend(chatId, agent, installPath);
      const handle = await backend.ensureSession(sessionId, {
        agent,
        model,
        chatId,
        worktreePath,
        worktreeId,
        projectId: task.project_id,
      });
      const result = await backend.prompt(handle, task.input, {
        worktreePath,
        model,
        signal: ac.signal,
        onEvent,
        taskId,
        modeId: task.mode_id ?? undefined,
      });
      // Phase 3: keep the pooled (chat,agent) backend warm across the turn.
      agentPool.touch(chatId, agent);

      // Join once; the persisted content and the task summary are both capped
      // slices of the same streamed text.
      const fullText = textChunks.join('');
      const assistantContent = fullText.slice(0, 50_000);
      const reasoningText = reasoningChunks.join('').slice(0, 200_000);
      const outputSummary = (result.ok ? fullText : (result.error ?? 'claude SDK turn failed')).slice(0, 500);

      await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);

      // F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes
      // the warm success-path recording 'complete' on a Stop'd turn. The abort fired
      // the SDK interrupt on the same query generator only (never killed the warm
      // process), so the persistent worktree is kept and the backend stays warm.
      if (ac.signal.aborted || stopping) {
        await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
        await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
        emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
        return; // worktree persists (no cleanup); backend stays warm
      }

      // ctx_used/ctx_max from the SDK result (1M-aware) → the assistant message, so
      // the ContextBar renders a real context-window fill for claude.
      await sql`
        UPDATE messages
        SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp(),
            ctx_used = ${result.ctxUsed ?? null}, ctx_max = ${result.ctxMax ?? null}
        WHERE id = ${assistantId}
      `;
      broker.publishFrame(sessionId, {
        type: 'message_complete',
        message_id: assistantId,
        chat_id: chatId,
        model: task.model,
      } as WsFrame);

      // Diff the persistent worktree against its captured baseline and SUPERSEDE
      // the session's prior pending row (latest-wins) — identical to opencode/ACP.
      const diff = await diffWorktree(worktreePath, projectPath, {
        signal: ac.signal,
        baseRef: baseCommit ?? 'HEAD',
      });
      if (diff) {
        await sql`
          DELETE FROM pending_changes
          WHERE session_id = ${sessionId} AND status = 'pending'
        `;
        await sql`
          INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
          VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
        `;
        log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (claude SDK)');
      } else {
        log.info({ taskId }, 'dispatcher: no changes detected in session worktree (claude SDK)');
      }
      // NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm.

      // cost_tokens is the session-wide sum of tokens_used across its messages.
      const [extCostRow] = await sql<{ total: number | null }[]>`
        SELECT SUM(tokens_used)::int AS total
        FROM messages
        WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
      `;
      const extCostTokens = extCostRow?.total ?? null;

      const finalState = result.ok ? 'completed' : 'failed';
      await sql`
        UPDATE tasks
        SET state = ${finalState}, ended_at = clock_timestamp(),
            output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, agent, finalState }, 'dispatcher: task finished (claude SDK)');
      // #10: clean completion → idle; backend-reported failure → error.
      emitAgentStatus(
        sessionId,
        chatId,
        agent,
        result.ok ? 'idle' : 'error',
        result.ok ? 'turn_complete' : 'failed',
      );
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
      log.error({ taskId, agent, err: errMsg }, 'dispatcher: claude SDK error');
      // Best-effort: never overwrite a state that is already terminal. A failure
      // of this write is logged instead of silently dropped.
      await sql`
        UPDATE tasks
        SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
        WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
      `.catch((updateErr: unknown) => {
        log.error({ taskId, err: updateErr }, 'dispatcher: failed to record task failure (claude SDK)');
      });
      try {
        // F1 (OCE-001): finalize the streaming message (was left 'streaming').
        await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
      } finally {
        // #10: turn crashed. Emitted even if finalizeMessage rejects, so the
        // (chat, agent) status never stays stuck on 'working'.
        emitAgentStatus(
          sessionId,
          chatId,
          agent,
          status === 'cancelled' ? 'idle' : 'error',
          status === 'cancelled' ? 'cancelled' : 'crashed',
        );
      }
      // No worktree cleanup (persistent); backend stays warm for the next turn.
    } finally {
      // Single cleanup point for every exit (success, abort short-circuit, error,
      // and an error raised by the error handler itself).
      clearTaskCommands(taskId);
    }
  }

  // ─── Helpers ────────────────────────────────────────────────────────────────

  // Polls the messages row every COMPLETION_POLL_MS until its status is no longer
  // 'streaming', then returns that status. Returns 'cancelled' as soon as the
  // dispatcher is stopping, and 'failed' if the row does not exist.
  async function waitForCompletion(assistantId: string): Promise<string> {
    for (;;) {
      if (stopping) return 'cancelled';
      const [row] = await sql<{ status: string }[]>`
        SELECT status FROM messages WHERE id = ${assistantId}
      `;
      const status = row?.status ?? 'failed';
      if (status !== 'streaming') return status;
      await sleep(COMPLETION_POLL_MS);
    }
  }

  // Resolves after `ms` milliseconds.
  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  return {
    cancelExternalTask,

    // Starts the fallback poll timer and subscribes to the 'tasks_new' channel.
    start() {
      log.info('dispatcher: starting poll loop + tasks_new listener');
      // Fallback poll — catches notifications missed while the listen connection
      // was down. The fast path is the NOTIFY listener below.
      timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS);
      // Fast path: react immediately to new tasks. porsager reserves a dedicated
      // connection and auto-resubscribes on reconnect; the onlisten callback
      // fires on each (re)subscribe, so we kick a catch-up poll there too to
      // sweep up anything inserted during a disconnect.
      sql
        .listen(
          'tasks_new',
          () => triggerPoll('notify'),
          () => triggerPoll('listen-subscribed'),
        )
        .then((meta) => {
          if (stopping) {
            // stop() already ran while the subscription was still being set up
            // and saw `listener === null`. Release it here so the subscription
            // does not outlive the dispatcher.
            void meta.unlisten().catch((err: unknown) => {
              log.error({ err }, 'dispatcher: unlisten error');
            });
            return;
          }
          listener = meta;
        })
        .catch((err) => {
          log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback');
        });
    },

    // Stops the timer and the listener, then waits for every in-flight task to
    // settle (fulfilled or rejected) before resolving.
    async stop() {
      // Set first: poll(), waitForCompletion() and the run functions all read it.
      stopping = true;
      if (timer) {
        clearInterval(timer);
        timer = null;
      }
      if (listener) {
        await listener.unlisten().catch((err) => {
          log.error({ err }, 'dispatcher: unlisten error');
        });
        listener = null;
      }
      if (inflight.size > 0) {
        log.info({ count: inflight.size }, 'dispatcher: waiting for in-flight tasks');
        await Promise.allSettled([...inflight.values()]);
      }
      log.info('dispatcher: stopped');
    },
  };
}