/**
 * ACP dispatch — runs ACP-capable agents directly on the host.
 *
 * v2.3: Paseo-aligned tool lifecycle — stable toolCallId, merge on
 * tool_call_update, reasoning stream, worktree FS client, persist-ready snapshots.
 */
import type { FastifyBaseLogger } from 'fastify';
import {
  ClientSideConnection,
  type Client,
  type SessionNotification,
  type RequestPermissionRequest,
  type RequestPermissionResponse,
  type ReadTextFileRequest,
  type ReadTextFileResponse,
  type WriteTextFileRequest,
  type WriteTextFileResponse,
  type CreateTerminalRequest,
  type CreateTerminalResponse,
  type CreateElicitationRequest,
  type CreateElicitationResponse,
  type SessionConfigOption,
  type ClientSideConnection as ConnectionType,
} from '@agentclientprotocol/sdk';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';
import { spawn } from 'node:child_process';
import { findThoughtLevelConfigId } from './acp-derive.js';
import { resolveLaunchSpec } from './acp-spawn.js';
import { getResolvedRegistry, type ResolvedProviderDef } from './provider-config-registry.js';
import { createAcpNdJsonStream } from './acp-stream.js';
import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from './permission-waiter.js';
import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js';
import { readWorktreeTextFile, writeWorktreeTextFile } from './acp-client-fs.js';
import {
  type AcpToolSnapshot,
  mergeToolSnapshot,
  snapshotToWireToolCall,
  synthesizeCanceledSnapshots,
} from './acp-tool-snapshot.js';

/** Delay between the SIGTERM sent on cleanup and the follow-up SIGKILL. */
const FORCE_KILL_DELAY_MS = 5_000;

/** Upper bound on how long dispatch waits for the child's 'close' event before returning. */
const CLOSE_WAIT_MS = 3_000;

/**
 * Outcome of a single ACP dispatch.
 *
 * `exitCode` is 0 when the prompt turn completed, 1 when the agent has no ACP
 * launch spec or the turn threw, and 130 when the abort signal was already
 * set before the agent was started.
 */
export interface AcpDispatchResult {
  exitCode: number;
  /** Concatenated agent message text, or the error message on failure. */
  output: string;
  /** Latest merged snapshot per tool call seen during the turn. */
  toolSnapshots: AcpToolSnapshot[];
  /** Concatenated agent thought text. */
  reasoningText: string;
  stopReason: string;
}

/** Inputs for {@link dispatchViaAcp}. */
export interface AcpDispatchOpts {
  agent: string;
  /** Prompt text sent to the agent as a single text block. */
  task: string;
  /** Used as the child's cwd, the ACP session cwd and the root for FS client calls. */
  worktreePath: string;
  model?: string;
  modeId?: string;
  thinkingOptionId?: string;
  taskId?: string;
  sessionId?: string;
  chatId?: string;
  messageId?: string;
  /** Frames are only published when broker, sessionId, chatId and messageId are all set. */
  broker?: Broker;
  installPath?: string;
  /**
   * v2.3 phase 3: resolved registry def for launch-spec resolution. The
   * dispatcher loads this by task.agent; falls back to a registry lookup here.
   */
  resolved?: ResolvedProviderDef;
  signal?: AbortSignal;
  log: FastifyBaseLogger;
}

/** Fully-populated routing info required to publish a frame to the broker. */
interface StreamTarget {
  broker: Broker;
  sessionId: string;
  chatId: string;
  messageId: string;
}

/** Renders a caught value as a log-friendly string. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Applies the optional mode / model / thinking-level overrides to a freshly
 * created ACP session.
 *
 * Each override is best-effort: a failure is logged at warn level and the
 * remaining overrides are still attempted. The thinking option is only sent
 * when `findThoughtLevelConfigId` yields a config id for the session's options.
 */
async function applySessionOverrides(
  connection: ConnectionType,
  acpSessionId: string,
  configOptions: SessionConfigOption[] | null | undefined,
  opts: Pick<AcpDispatchOpts, 'model' | 'modeId' | 'thinkingOptionId' | 'log'>,
): Promise<void> {
  const { model, modeId, thinkingOptionId, log } = opts;

  if (modeId) {
    try {
      await connection.setSessionMode({ sessionId: acpSessionId, modeId });
    } catch (err) {
      log.warn({ modeId, err: describeError(err) }, 'acp-dispatch: setSessionMode failed');
    }
  }

  if (model) {
    try {
      await connection.unstable_setSessionModel({ sessionId: acpSessionId, modelId: model });
    } catch (err) {
      log.warn({ model, err: describeError(err) }, 'acp-dispatch: setSessionModel failed');
    }
  }

  if (!thinkingOptionId) return;
  const configId = findThoughtLevelConfigId(configOptions);
  if (!configId) return;
  try {
    await connection.setSessionConfigOption({
      sessionId: acpSessionId,
      configId,
      value: thinkingOptionId,
    });
  } catch (err) {
    log.warn(
      { thinkingOptionId, err: describeError(err) },
      'acp-dispatch: setSessionConfigOption failed',
    );
  }
}

/**
 * Per-dispatch accumulator for everything the agent streams back: message
 * text, reasoning text and tool-call snapshots. When a complete stream target
 * is configured, each update is also forwarded to the broker as a WS frame.
 */
class AcpStreamContext {
  readonly textChunks: string[] = [];
  readonly reasoningChunks: string[] = [];
  /** Latest merged snapshot keyed by toolCallId. */
  readonly toolSnapshots = new Map<string, AcpToolSnapshot>();
  private aborted = false;

  constructor(
    private readonly opts: Pick<
      AcpDispatchOpts,
      'broker' | 'sessionId' | 'chatId' | 'messageId' | 'taskId'
    >,
    private readonly worktreePath: string,
  ) {}

  /** All reasoning chunks received so far, concatenated. */
  get reasoningText(): string {
    return this.reasoningChunks.join('');
  }

  /** All message chunks received so far, concatenated. */
  get output(): string {
    return this.textChunks.join('');
  }

  /** A fresh array of the current tool snapshots (not a live view). */
  get snapshots(): AcpToolSnapshot[] {
    return [...this.toolSnapshots.values()];
  }

  /**
   * Stores and publishes whatever snapshots `synthesizeCanceledSnapshots`
   * derives from the current set. Only the first call has any effect.
   */
  markAborted(): void {
    if (this.aborted) return;
    this.aborted = true;
    for (const snap of synthesizeCanceledSnapshots(this.toolSnapshots.values())) {
      this.toolSnapshots.set(snap.toolCallId, snap);
      this.publishToolSnapshot(snap);
    }
  }

  /** Returns the broker routing info, or null unless every part of it is set. */
  private streamTarget(): StreamTarget | null {
    const { broker, sessionId, chatId, messageId } = this.opts;
    if (broker && sessionId && chatId && messageId) {
      return { broker, sessionId, chatId, messageId };
    }
    return null;
  }

  /** Publishes a text-bearing frame (`delta` / `reasoning_delta`) if streaming is possible. */
  private publishText(type: 'delta' | 'reasoning_delta', text: string): void {
    const target = this.streamTarget();
    if (!target) return;
    target.broker.publishFrame(target.sessionId, {
      type,
      message_id: target.messageId,
      chat_id: target.chatId,
      content: text,
    } as WsFrame);
  }

  /** Publishes a `tool_call` frame carrying the wire form of the snapshot. */
  private publishToolSnapshot(snapshot: AcpToolSnapshot): void {
    const target = this.streamTarget();
    if (!target) return;
    target.broker.publishFrame(target.sessionId, {
      type: 'tool_call',
      message_id: target.messageId,
      chat_id: target.chatId,
      tool_call: snapshotToWireToolCall(snapshot),
    } as WsFrame);
  }

  /** Merges an update into the snapshot stored under `toolCallId` and publishes the result. */
  handleToolUpdate(toolCallId: string, update: Parameters<typeof mergeToolSnapshot>[1]): void {
    const merged = mergeToolSnapshot(toolCallId, update, this.toolSnapshots.get(toolCallId));
    this.toolSnapshots.set(toolCallId, merged);
    this.publishToolSnapshot(merged);
  }

  /**
   * Routes one ACP session notification: text and thought chunks are
   * accumulated and streamed, tool updates are merged, and command lists are
   * cached per task. Other update kinds are ignored.
   */
  async handleSessionUpdate(params: SessionNotification): Promise<void> {
    const update = params.update;
    switch (update.sessionUpdate) {
      case 'agent_message_chunk': {
        const content = update.content;
        // Non-text content blocks are dropped.
        if (content.type === 'text' && 'text' in content) {
          const text = (content as { text: string }).text;
          this.textChunks.push(text);
          this.publishText('delta', text);
        }
        break;
      }
      case 'agent_thought_chunk': {
        const content = update.content;
        if (content.type === 'text' && 'text' in content) {
          const text = (content as { text: string }).text;
          this.reasoningChunks.push(text);
          this.publishText('reasoning_delta', text);
        }
        break;
      }
      case 'tool_call':
        this.handleToolUpdate(update.toolCallId, update);
        break;
      case 'tool_call_update':
        this.handleToolUpdate(update.toolCallId, update);
        break;
      case 'available_commands_update': {
        const { taskId } = this.opts;
        const commands = update.availableCommands.map((cmd) => ({
          name: cmd.name,
          description: cmd.description ?? undefined,
        }));
        if (!taskId || commands.length === 0) break;

        mergeTaskCommands(taskId, commands);
        const target = this.streamTarget();
        if (target) {
          // Publish the merged per-task list, falling back to this update's own commands.
          const all = getTaskCommands(taskId) ?? commands;
          target.broker.publishFrame(target.sessionId, {
            type: 'agent_commands',
            task_id: taskId,
            session_id: target.sessionId,
            commands: all,
          } as WsFrame);
        }
        break;
      }
      default:
        break;
    }
  }

  /**
   * Builds the ACP `Client` callbacks handed to the connection.
   *
   * Permission and elicitation requests are delegated to the waiter helpers
   * when both `taskId` and `sessionId` are known. Otherwise permissions select
   * the first offered option (or cancel when none is offered) and
   * elicitations are declined.
   */
  buildClient(
    agent: string,
    modeId: string | undefined,
    taskId: string | undefined,
    sessionId: string | undefined,
  ): Client {
    return {
      sessionUpdate: (params) => this.handleSessionUpdate(params),

      requestPermission: async (
        params: RequestPermissionRequest,
      ): Promise<RequestPermissionResponse> => {
        if (taskId && sessionId) {
          return waitForPermissionResponse(taskId, sessionId, agent, modeId, params);
        }
        const firstOption = params.options[0];
        if (firstOption) {
          return { outcome: { outcome: 'selected', optionId: firstOption.optionId } };
        }
        return { outcome: { outcome: 'cancelled' } };
      },

      readTextFile: async (params: ReadTextFileRequest): Promise<ReadTextFileResponse> => {
        const content = await readWorktreeTextFile(
          this.worktreePath,
          params.path,
          params.line,
          params.limit,
        );
        return { content };
      },

      writeTextFile: async (params: WriteTextFileRequest): Promise<WriteTextFileResponse> => {
        await writeWorktreeTextFile(this.worktreePath, params.path, params.content);
        return {};
      },

      // Terminals are not implemented; a fixed placeholder id is returned.
      createTerminal: async (_params: CreateTerminalRequest): Promise<CreateTerminalResponse> => {
        return { terminalId: 'noop' };
      },

      unstable_createElicitation: async (
        params: CreateElicitationRequest,
      ): Promise<CreateElicitationResponse> => {
        if (taskId && sessionId) {
          return waitForElicitationResponse(taskId, sessionId, agent, modeId, params);
        }
        return { action: 'decline' };
      },
    };
  }
}

/**
 * Drives one full ACP turn over an established connection: initialize, create
 * a session, apply overrides, send the prompt, then close the session
 * (close failures are ignored).
 */
async function runPromptTurn(
  connection: ConnectionType,
  opts: AcpDispatchOpts,
  streamCtx: AcpStreamContext,
): Promise<AcpDispatchResult> {
  const { agent, task, worktreePath, log } = opts;

  // NOTE(review): buildClient supplies readTextFile/writeTextFile handlers, but no
  // fs capability is advertised here — confirm whether that is intentional.
  await connection.initialize({
    protocolVersion: 1,
    clientInfo: { name: 'boocoder', version: '2.3.0' },
    clientCapabilities: {},
  });

  const acpSession = await connection.newSession({ cwd: worktreePath, mcpServers: [] });
  log.info({ sessionId: acpSession.sessionId }, 'acp-dispatch: session created');

  await applySessionOverrides(connection, acpSession.sessionId, acpSession.configOptions, opts);

  const promptResult = await connection.prompt({
    sessionId: acpSession.sessionId,
    prompt: [{ type: 'text', text: task }],
  });
  const stopReason = promptResult.stopReason ?? 'end_turn';
  log.info(
    {
      agent,
      stopReason,
      toolCallCount: streamCtx.snapshots.length,
      reasoningChars: streamCtx.reasoningText.length,
    },
    'acp-dispatch: prompt completed',
  );

  await connection.closeSession({ sessionId: acpSession.sessionId }).catch(() => {});

  return {
    exitCode: 0,
    output: streamCtx.output,
    toolSnapshots: streamCtx.snapshots,
    reasoningText: streamCtx.reasoningText,
    stopReason,
  };
}

/**
 * Spawns the agent's ACP binary in the worktree, runs a single prompt turn
 * against it and returns the collected output.
 *
 * Never rejects for turn-level failures: errors are logged and reported as
 * `{ exitCode: 1, stopReason: 'error' }` with the error message as `output`.
 * The child is always sent SIGTERM on the way out (followed by SIGKILL after
 * {@link FORCE_KILL_DELAY_MS} if it has not closed), and the function waits up
 * to {@link CLOSE_WAIT_MS} for it to close before returning.
 *
 * @param opts Dispatch inputs; `opts.signal` aborts the run by killing the child.
 * @returns The dispatch result; see {@link AcpDispatchResult} for exit codes.
 */
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
  const {
    agent,
    worktreePath,
    installPath,
    signal,
    log,
    taskId,
    modeId,
    sessionId,
    chatId,
    messageId,
    broker,
  } = opts;

  // v2.3 phase 3: launch from the resolved registry def (config override /
  // custom-ACP command) with the built-in switch as the fallback. The dispatcher
  // passes `resolved`; fall back to a registry lookup if it didn't.
  const resolved = opts.resolved ?? getResolvedRegistry().get(agent);
  const spec = resolved ? resolveLaunchSpec(resolved, installPath ?? null) : null;
  if (!spec) {
    return {
      exitCode: 1,
      output: `Agent '${agent}' does not support ACP.`,
      toolSnapshots: [],
      reasoningText: '',
      stopReason: 'error',
    };
  }

  // Bail out before spawning anything if the caller has already aborted.
  if (signal?.aborted) {
    if (taskId) cancelPendingPermission(taskId);
    return {
      exitCode: 130,
      output: 'Aborted before start',
      toolSnapshots: [],
      reasoningText: '',
      stopReason: 'cancelled',
    };
  }

  log.info(
    { agent, binary: spec.binary, worktreePath, modeId, model: opts.model },
    'acp-dispatch: spawning',
  );
  const child = spawn(spec.binary, spec.args, {
    cwd: worktreePath,
    stdio: ['pipe', 'pipe', 'pipe'],
    env: { ...process.env, ...spec.env },
  });

  const streamCtx = new AcpStreamContext(
    { broker, sessionId, chatId, messageId, taskId },
    worktreePath,
  );

  let closed = false;
  let killed = false;
  let forceKillTimer: ReturnType<typeof setTimeout> | undefined;

  // Track 'close' from the start so later code never waits for an event that already fired.
  child.once('close', () => {
    closed = true;
    if (forceKillTimer !== undefined) clearTimeout(forceKillTimer);
  });

  // A child 'error' (e.g. the binary could not be spawned) with no listener would be
  // thrown as an uncaught exception; surface it as a rejection of the turn instead.
  const childFailure = new Promise<never>((_resolve, reject) => {
    child.on('error', (err) => {
      log.warn({ agent, err: err.message }, 'acp-dispatch: child process error');
      reject(err);
    });
  });
  // The rejection may arrive when nothing is racing on it any more.
  childFailure.catch(() => {});

  const cleanup = (): void => {
    if (!killed) {
      killed = true;
      streamCtx.markAborted();
      if (!closed) {
        child.kill('SIGTERM');
        forceKillTimer = setTimeout(() => child.kill('SIGKILL'), FORCE_KILL_DELAY_MS);
      }
    }
    if (taskId) cancelPendingPermission(taskId);
  };

  signal?.addEventListener('abort', cleanup, { once: true });

  try {
    const stream = createAcpNdJsonStream(child);
    const connection = new ClientSideConnection(
      () => streamCtx.buildClient(agent, modeId, taskId, sessionId),
      stream,
    );

    const turn = runPromptTurn(connection, opts, streamCtx);
    // If the child failure wins the race, the turn may still reject later; mark it handled.
    turn.catch(() => {});
    return await Promise.race([turn, childFailure]);
  } catch (err) {
    // NOTE(review): a mid-flight abort that surfaces here is reported as exit 1 / 'error',
    // unlike the pre-start abort (130 / 'cancelled') — confirm callers expect that.
    const message = describeError(err);
    log.error({ agent, err: message }, 'acp-dispatch: error');
    return {
      exitCode: 1,
      output: message,
      toolSnapshots: streamCtx.snapshots,
      reasoningText: streamCtx.reasoningText,
      stopReason: 'error',
    };
  } finally {
    signal?.removeEventListener('abort', cleanup);
    cleanup();
    if (!closed) {
      await new Promise<void>((resolve) => {
        const waitTimer = setTimeout(resolve, CLOSE_WAIT_MS);
        child.once('close', () => {
          clearTimeout(waitTimer);
          resolve();
        });
      });
    }
  }
}