/**
 * ACP dispatch — runs ACP-capable agents (opencode, goose) on the host via SSH.
 *
 * Uses the @agentclientprotocol/sdk to establish a structured JSON-RPC session
 * with the agent subprocess. The SSH tunnel provides stdio transport.
 *
 * Flow:
 *   1. SSH to host, start `opencode acp` (or `goose acp`) in the worktree
 *   2. Wrap SSH child's stdin/stdout into NDJSON streams
 *   3. Create a ClientSideConnection from the SDK
 *   4. Initialize → newSession → prompt(task)
 *   5. Collect session updates (tool calls, text output)
 *   6. On prompt completion → return collected output
 */

import type { FastifyBaseLogger } from 'fastify';
import { Readable, Writable } from 'node:stream';
import {
  ClientSideConnection,
  ndJsonStream,
  type Client,
  type SessionNotification,
  type RequestPermissionRequest,
  type RequestPermissionResponse,
  type ReadTextFileRequest,
  type ReadTextFileResponse,
  type WriteTextFileRequest,
  type WriteTextFileResponse,
  type CreateTerminalRequest,
  type CreateTerminalResponse,
} from '@agentclientprotocol/sdk';
import { sshSpawn } from './ssh.js';

/** Outcome of one ACP dispatch. */
export interface AcpDispatchResult {
  /** 0 on success, 1 on error, 130 when the dispatch was aborted. */
  exitCode: number;
  /** Concatenated agent text on success; the error/abort message otherwise. */
  output: string;
  /** Tool calls reported by the agent, in the order they were announced. */
  toolCalls: Array<{ title: string; input: unknown; output?: unknown }>;
  /** Stop reason reported by the agent, or 'error' / 'cancelled'. */
  stopReason: string;
}

/** Inputs for {@link dispatchViaAcp}. */
export interface AcpDispatchOpts {
  /** Agent name; must be one that `acpCommand` knows about. */
  agent: string;
  /** Prompt text sent to the agent. */
  task: string;
  /** Directory on the host the agent is started in and uses as session cwd. */
  worktreePath: string;
  /** Accepted for interface compatibility; not read by `dispatchViaAcp`. */
  model?: string;
  /** Aborting this signal terminates the SSH child process. */
  signal?: AbortSignal;
  log: FastifyBaseLogger;
}

/** Map agent name to the ACP command it exposes, or `null` if unsupported. */
function acpCommand(agent: string): string | null {
  switch (agent) {
    case 'opencode':
      return 'opencode acp';
    case 'goose':
      return 'goose acp';
    default:
      return null;
  }
}

/**
 * Convert a Node.js Readable stream to a web ReadableStream.
 *
 * Once the stream has ended, errored, or been cancelled, later events are
 * ignored: calling `enqueue`/`close` on a finished controller throws, and that
 * throw would escape from inside an event handler.
 */
function nodeReadableToWeb(nodeStream: NodeJS.ReadableStream): ReadableStream<Uint8Array> {
  let finished = false;
  let detach = (): void => {};

  return new ReadableStream<Uint8Array>({
    start(controller) {
      const onData = (chunk: Buffer | string): void => {
        if (finished) return;
        // A stream with an encoding set emits strings; `new Uint8Array(string)`
        // would silently produce an empty array, so encode those explicitly.
        controller.enqueue(
          typeof chunk === 'string' ? new TextEncoder().encode(chunk) : new Uint8Array(chunk),
        );
      };
      const onEnd = (): void => {
        if (finished) return;
        finished = true;
        detach();
        controller.close();
      };
      const onError = (err: unknown): void => {
        if (finished) return;
        finished = true;
        detach();
        controller.error(err);
      };

      // The 'error' listener is deliberately never removed: an 'error' event
      // emitted with no listener attached is thrown by the emitter.
      detach = (): void => {
        nodeStream.removeListener('data', onData);
        nodeStream.removeListener('end', onEnd);
      };

      nodeStream.on('data', onData);
      nodeStream.on('end', onEnd);
      nodeStream.on('error', onError);
    },
    cancel() {
      finished = true;
      detach();
      if ('destroy' in nodeStream && typeof (nodeStream as Readable).destroy === 'function') {
        (nodeStream as Readable).destroy();
      }
    },
  });
}

/**
 * Convert a Node.js Writable stream to a web WritableStream.
 *
 * A write resolves as soon as the Node stream accepts the chunk without
 * signalling backpressure, otherwise on the next 'drain'. Stream-level errors
 * are forwarded to the web stream so that later writes reject.
 */
function nodeWritableToWeb(nodeStream: NodeJS.WritableStream): WritableStream<Uint8Array> {
  const target = nodeStream as Writable;

  return new WritableStream<Uint8Array>({
    start(controller) {
      // Surfaces failures that arrive after a write was already acknowledged,
      // and keeps an 'error' event from being emitted with no listener.
      target.on('error', (err: Error) => {
        controller.error(err);
      });
    },
    write(chunk) {
      return new Promise<void>((resolve, reject) => {
        let settled = false;
        const onDrain = (): void => {
          if (settled) return;
          settled = true;
          resolve();
        };
        const accepted = target.write(chunk, (err) => {
          if (!err || settled) return;
          settled = true;
          // Do not leave a stale 'drain' listener behind for a failed write.
          target.removeListener('drain', onDrain);
          reject(err);
        });
        if (accepted) {
          settled = true;
          resolve();
        } else if (!settled) {
          target.once('drain', onDrain);
        }
      });
    },
    close() {
      return new Promise<void>((resolve) => {
        target.end(() => resolve());
      });
    },
    abort() {
      target.destroy();
    },
  });
}

/**
 * Dispatch a task to an ACP-capable agent via SSH.
 *
 * Opens a structured ACP session, sends the task as a prompt, and collects
 * all session updates. Returns the collected output and tool calls.
 *
 * This function does not throw for dispatch failures; they are reported
 * through the result:
 *   - unsupported agent or any session error → `exitCode: 1`, `stopReason: 'error'`
 *   - `opts.signal` aborted (before or during the run) → `exitCode: 130`,
 *     `stopReason: 'cancelled'`
 *
 * @param opts - Agent, task, worktree and logging configuration.
 * @returns The agent's concatenated text output, its tool calls and stop reason.
 */
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
  const { agent, task, worktreePath, signal, log } = opts;

  const cmd = acpCommand(agent);
  if (!cmd) {
    return {
      exitCode: 1,
      output: `Agent '${agent}' does not support ACP.`,
      toolCalls: [],
      stopReason: 'error',
    };
  }

  // Nothing to do if the caller already gave up — avoid spawning SSH at all.
  if (signal?.aborted) {
    return { exitCode: 130, output: 'Aborted before start', toolCalls: [], stopReason: 'cancelled' };
  }

  // Spawn SSH with the ACP command running in the worktree.
  // Single quotes in the path are closed, escaped and reopened for the shell.
  const escapedPath = worktreePath.replace(/'/g, "'\\''");
  const fullCommand = `cd '${escapedPath}' && ${cmd}`;

  log.info({ agent, worktreePath }, 'acp-dispatch: spawning');

  const child = sshSpawn(fullCommand);

  let closed = false;
  let killed = false;
  let killTimer: ReturnType<typeof setTimeout> | undefined;

  // Rejects when the child fails or exits. Every ACP request is raced against
  // it so a dead agent cannot leave this function waiting on a reply forever.
  const childGone = new Promise<never>((_resolve, reject) => {
    child.once('error', (err: Error) => {
      reject(err);
    });
    child.once('close', (code: number | null) => {
      closed = true;
      if (killTimer !== undefined) clearTimeout(killTimer);
      reject(new Error(`ACP agent process exited before the request completed (code ${String(code)})`));
    });
  });
  // The child always closes eventually; that rejection is expected.
  childGone.catch(() => {});
  const untilExit = <T>(work: Promise<T>): Promise<T> => Promise.race([work, childGone]);

  // Terminate the child: SIGTERM first, SIGKILL if it has not closed after 5s.
  const cleanup = (): void => {
    if (killed || closed) return;
    killed = true;
    child.kill('SIGTERM');
    killTimer = setTimeout(() => {
      if (!closed) child.kill('SIGKILL');
    }, 5_000);
  };

  signal?.addEventListener('abort', cleanup, { once: true });

  try {
    if (!child.stdout || !child.stdin) {
      throw new Error('SSH child process has no stdio pipes');
    }

    // Create web streams from the child process stdio
    const inputStream = nodeReadableToWeb(child.stdout);
    const outputStream = nodeWritableToWeb(child.stdin);

    // Create the NDJSON ACP stream
    const stream = ndJsonStream(outputStream, inputStream);

    // Collected session updates
    type ToolCallRecord = AcpDispatchResult['toolCalls'][number];
    const textChunks: string[] = [];
    const toolCalls: ToolCallRecord[] = [];
    const toolCallsById = new Map<string, ToolCallRecord>();

    // Create client-side connection — we are the "client" (editor), the agent is remote
    const connection = new ClientSideConnection(
      (_agentInterface): Client => ({
        // Handle session updates from the agent
        async sessionUpdate(params: SessionNotification): Promise<void> {
          const update = params.update;
          if (update.sessionUpdate === 'agent_message_chunk') {
            const content = update.content;
            if (content.type === 'text' && 'text' in content && typeof content.text === 'string') {
              textChunks.push(content.text);
            }
          } else if (update.sessionUpdate === 'tool_call') {
            const record: ToolCallRecord = { title: update.title, input: update.rawInput };
            toolCalls.push(record);
            toolCallsById.set(update.toolCallId, record);
          } else if (update.sessionUpdate === 'tool_call_update') {
            // Match on the call id so overlapping tool calls get their own
            // output; fall back to the most recent call for unknown ids.
            const record = toolCallsById.get(update.toolCallId) ?? toolCalls[toolCalls.length - 1];
            if (record && update.rawOutput !== undefined) {
              record.output = update.rawOutput;
            }
          }
        },

        // Permission requests — auto-approve (worktree is isolated).
        // The option order is chosen by the agent, so look for an allow-type
        // option explicitly rather than trusting that the first one approves.
        async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
          const chosen =
            params.options.find((option) => option.kind === 'allow_always') ??
            params.options.find((option) => option.kind === 'allow_once') ??
            params.options[0];
          if (chosen) {
            return {
              outcome: { outcome: 'selected', optionId: chosen.optionId },
            };
          }
          // No options available — cancel
          return { outcome: { outcome: 'cancelled' } };
        },

        // File system and terminal operations are stubs: no such capabilities
        // are advertised in `initialize` below, and the agent is expected to
        // work directly in the worktree.
        async readTextFile(_params: ReadTextFileRequest): Promise<ReadTextFileResponse> {
          return { content: '' };
        },
        async writeTextFile(_params: WriteTextFileRequest): Promise<WriteTextFileResponse> {
          return {};
        },
        async createTerminal(_params: CreateTerminalRequest): Promise<CreateTerminalResponse> {
          return { terminalId: 'noop' };
        },
      }),
      stream,
    );

    // Initialize the connection
    // ProtocolVersion is a number in this SDK version
    const initResult = await untilExit(
      connection.initialize({
        protocolVersion: 1,
        clientInfo: { name: 'boocoder', version: '2.0.1' },
        clientCapabilities: {},
      }),
    );

    log.info({ agentInfo: initResult.agentInfo }, 'acp-dispatch: initialized');

    // Create a new session
    const session = await untilExit(
      connection.newSession({
        cwd: worktreePath,
        mcpServers: [],
      }),
    );

    log.info({ sessionId: session.sessionId }, 'acp-dispatch: session created');

    // Send the prompt
    const promptResult = await untilExit(
      connection.prompt({
        sessionId: session.sessionId,
        prompt: [{ type: 'text', text: task }],
      }),
    );

    const stopReason = promptResult.stopReason ?? 'end_turn';

    log.info({ agent, stopReason, toolCallCount: toolCalls.length }, 'acp-dispatch: prompt completed');

    // Clean shutdown — best effort; the child is terminated right after anyway.
    await untilExit(connection.closeSession({ sessionId: session.sessionId })).catch((err: unknown) => {
      log.debug(
        { agent, err: err instanceof Error ? err.message : String(err) },
        'acp-dispatch: closeSession failed',
      );
    });

    return {
      exitCode: 0,
      output: textChunks.join(''),
      toolCalls,
      stopReason,
    };
  } catch (err) {
    // A failure after an abort is a consequence of killing the child; report
    // it the same way as an abort that happened before the start.
    if (signal?.aborted) {
      log.info({ agent }, 'acp-dispatch: aborted');
      return { exitCode: 130, output: 'Aborted', toolCalls: [], stopReason: 'cancelled' };
    }
    const message = err instanceof Error ? err.message : String(err);
    log.error({ agent, err: message }, 'acp-dispatch: error');
    return {
      exitCode: 1,
      output: message,
      toolCalls: [],
      stopReason: 'error',
    };
  } finally {
    signal?.removeEventListener('abort', cleanup);
    cleanup();
    // Wait up to 3s for the child to exit — skipped if it is already gone.
    if (!closed) {
      await new Promise<void>((resolve) => {
        const giveUp = setTimeout(resolve, 3_000);
        child.once('close', () => {
          clearTimeout(giveUp);
          resolve();
        });
      });
    }
  }
}