The permission_requested WS frame now carries kind ('tool'|'question'|'plan'|
'elicitation'), input (the tool's rawInput payload), and description fields.
PermissionCard detects question-type permissions (Claude Code's AskUserQuestion)
and renders an interactive radio/checkbox form instead of approve/deny buttons.
Submitting answers auto-selects the first allow option.
Also wires up ACP createElicitation (unstable/experimental) — JSON Schema-driven
forms for structured user input. The same PermissionCard renders elicitation
fields with type-appropriate inputs. Both flows use the existing permission-waiter
blocking pattern with a 120s timeout.
The response path (POST /api/coder/tasks/:id/permission) now accepts optional
updated_input alongside option_id, forwarded to the ACP agent as the user's
answer payload. Elicitation responses map to accept/decline/cancel actions.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
391 lines
12 KiB
TypeScript
391 lines
12 KiB
TypeScript
/**
|
|
* ACP dispatch — runs ACP-capable agents directly on the host.
|
|
*
|
|
* v2.3: Paseo-aligned tool lifecycle — stable toolCallId, merge on
|
|
* tool_call_update, reasoning stream, worktree FS client, persist-ready snapshots.
|
|
*/
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import {
|
|
ClientSideConnection,
|
|
type Client,
|
|
type SessionNotification,
|
|
type RequestPermissionRequest,
|
|
type RequestPermissionResponse,
|
|
type ReadTextFileRequest,
|
|
type ReadTextFileResponse,
|
|
type WriteTextFileRequest,
|
|
type WriteTextFileResponse,
|
|
type CreateTerminalRequest,
|
|
type CreateTerminalResponse,
|
|
type CreateElicitationRequest,
|
|
type CreateElicitationResponse,
|
|
type SessionConfigOption,
|
|
type ClientSideConnection as ConnectionType,
|
|
} from '@agentclientprotocol/sdk';
|
|
import type { Broker } from '@boocode/server/broker';
|
|
import type { WsFrame } from '@boocode/server/ws-frames';
|
|
import { spawn } from 'node:child_process';
|
|
import { findThoughtLevelConfigId } from './acp-derive.js';
|
|
import { resolveAcpSpawnArgs } from './acp-spawn.js';
|
|
import { createAcpNdJsonStream } from './acp-stream.js';
|
|
import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from './permission-waiter.js';
|
|
import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js';
|
|
import { readWorktreeTextFile, writeWorktreeTextFile } from './acp-client-fs.js';
|
|
import {
|
|
type AcpToolSnapshot,
|
|
mergeToolSnapshot,
|
|
snapshotToWireToolCall,
|
|
synthesizeCanceledSnapshots,
|
|
} from './acp-tool-snapshot.js';
|
|
|
|
/** Outcome of a single `dispatchViaAcp` run. */
export interface AcpDispatchResult {
  /**
   * Process-style status: 0 when the prompt turn completed, 1 on failure
   * (including an agent without ACP support), 130 when the abort signal had
   * already fired before the run started.
   */
  exitCode: number;
  /** Concatenated agent message text on success; otherwise the error message or a fixed notice. */
  output: string;
  /** Tool snapshots accumulated during the run, keyed internally by toolCallId. */
  toolSnapshots: AcpToolSnapshot[];
  /** Concatenated agent thought text; empty on the early-return paths. */
  reasoningText: string;
  /** Stop reason reported by the prompt call (`'end_turn'` when absent), or `'error'` / `'cancelled'`. */
  stopReason: string;
}
|
|
|
|
/** Inputs for `dispatchViaAcp`. */
export interface AcpDispatchOpts {
  /** Agent identifier; resolved to spawn args and used as the binary name when `installPath` is absent. */
  agent: string;
  /** Prompt text sent to the agent as a single text block. */
  task: string;
  /** Working directory for the spawned agent, the ACP session cwd, and the root for the FS client. */
  worktreePath: string;
  /** Optional model id applied to the session after creation. */
  model?: string;
  /** Optional session mode id applied after creation; also forwarded to the permission waiter. */
  modeId?: string;
  /** Optional value for the session's thought-level config option, when the agent exposes one. */
  thinkingOptionId?: string;
  /** Task id used for permission/elicitation waits and the per-task command cache. */
  taskId?: string;
  /** Session id frames are published to; also required for permission/elicitation waits. */
  sessionId?: string;
  /** Chat id stamped on streamed frames. */
  chatId?: string;
  /** Message id stamped on streamed frames. */
  messageId?: string;
  /** Frame publisher; streaming only happens when broker, sessionId, chatId and messageId are all set. */
  broker?: Broker;
  /** Explicit binary path; overrides `agent` as the executable to spawn. */
  installPath?: string;
  /** Abort signal; aborting terminates the agent process and cancels pending permission waits. */
  signal?: AbortSignal;
  /** Logger used for dispatch diagnostics. */
  log: FastifyBaseLogger;
}
|
|
|
|
/**
 * Applies the optional per-dispatch session settings (mode, model, thinking
 * level) to a freshly created ACP session, in that order.
 *
 * Every setter is best-effort: a failure is logged as a warning and the
 * remaining overrides are still attempted. The thinking level is only sent
 * when `findThoughtLevelConfigId` locates a matching config option.
 *
 * @param connection ACP connection the session lives on.
 * @param acpSessionId Id of the ACP session to configure.
 * @param configOptions Config options reported by the agent for the session.
 * @param opts Requested overrides plus the logger.
 */
async function applySessionOverrides(
  connection: ConnectionType,
  acpSessionId: string,
  configOptions: SessionConfigOption[] | null | undefined,
  opts: Pick<AcpDispatchOpts, 'model' | 'modeId' | 'thinkingOptionId' | 'log'>,
): Promise<void> {
  const { model, modeId, thinkingOptionId, log } = opts;

  // Runs one setter; on failure logs `context` plus the error text and carries on.
  const attempt = async (
    action: () => Promise<unknown>,
    context: Record<string, unknown>,
    failureMessage: string,
  ): Promise<void> => {
    try {
      await action();
    } catch (reason) {
      const err = reason instanceof Error ? reason.message : String(reason);
      log.warn({ ...context, err }, failureMessage);
    }
  };

  if (modeId) {
    await attempt(
      () => connection.setSessionMode({ sessionId: acpSessionId, modeId }),
      { modeId },
      'acp-dispatch: setSessionMode failed',
    );
  }

  if (model) {
    await attempt(
      () => connection.unstable_setSessionModel({ sessionId: acpSessionId, modelId: model }),
      { model },
      'acp-dispatch: setSessionModel failed',
    );
  }

  if (!thinkingOptionId) return;

  const configId = findThoughtLevelConfigId(configOptions);
  if (!configId) return;

  await attempt(
    () =>
      connection.setSessionConfigOption({
        sessionId: acpSessionId,
        configId,
        value: thinkingOptionId,
      }),
    { thinkingOptionId },
    'acp-dispatch: setSessionConfigOption failed',
  );
}
|
|
|
|
/**
 * Per-dispatch accumulator and fan-out for ACP session traffic.
 *
 * Collects agent text, thought text and per-tool snapshots (keyed by
 * toolCallId) and mirrors each update to the broker as a WS frame when a
 * complete streaming target (broker + sessionId + chatId + messageId) is
 * configured. Also builds the `Client` callbacks handed to the ACP connection.
 */
class AcpStreamContext {
  /** Agent message text, in arrival order. */
  readonly textChunks: string[] = [];
  /** Agent thought text, in arrival order. */
  readonly reasoningChunks: string[] = [];
  /** Latest merged snapshot per toolCallId. */
  readonly toolSnapshots = new Map<string, AcpToolSnapshot>();
  // NOTE(review): set by markAborted() but never read in this class — confirm
  // whether late updates were meant to be ignored after an abort.
  private aborted = false;

  constructor(
    private readonly opts: Pick<AcpDispatchOpts, 'broker' | 'sessionId' | 'chatId' | 'messageId' | 'taskId'>,
    private readonly worktreePath: string,
  ) {}

  /** All thought chunks joined into one string. */
  get reasoningText(): string {
    return this.reasoningChunks.join('');
  }

  /** All message chunks joined into one string. */
  get output(): string {
    return this.textChunks.join('');
  }

  /** Copy of the current tool snapshots, in map insertion order. */
  get snapshots(): AcpToolSnapshot[] {
    return Array.from(this.toolSnapshots.values());
  }

  /**
   * Flags the context as aborted, then stores and publishes every snapshot
   * produced by `synthesizeCanceledSnapshots` for the current tool set.
   */
  markAborted(): void {
    this.aborted = true;
    for (const canceled of synthesizeCanceledSnapshots(this.toolSnapshots.values())) {
      this.toolSnapshots.set(canceled.toolCallId, canceled);
      this.publishToolSnapshot(canceled);
    }
  }

  /** Returns the fully-populated streaming target, or `undefined` when any part is missing. */
  private streamTarget(): Required<Pick<AcpDispatchOpts, 'broker' | 'sessionId' | 'chatId' | 'messageId'>> | undefined {
    const { broker, sessionId, chatId, messageId } = this.opts;
    if (broker && sessionId && chatId && messageId) {
      return { broker, sessionId, chatId, messageId };
    }
    return undefined;
  }

  /** Publishes a frame when streaming is possible; the frame is only built in that case. */
  private publish(
    build: (target: Required<Pick<AcpDispatchOpts, 'broker' | 'sessionId' | 'chatId' | 'messageId'>>) => WsFrame,
  ): void {
    const target = this.streamTarget();
    if (!target) return;
    target.broker.publishFrame(target.sessionId, build(target));
  }

  /** Sends one tool snapshot to the broker as a `tool_call` frame. */
  private publishToolSnapshot(snapshot: AcpToolSnapshot): void {
    this.publish(
      (target) =>
        ({
          type: 'tool_call',
          message_id: target.messageId,
          chat_id: target.chatId,
          tool_call: snapshotToWireToolCall(snapshot),
        }) as WsFrame,
    );
  }

  /** Stores a text chunk in `sink` and streams it as a frame of the given type. */
  private recordChunk(sink: string[], text: string, build: (messageId: string, chatId: string) => WsFrame): void {
    sink.push(text);
    this.publish((target) => build(target.messageId, target.chatId));
  }

  /** Merges an incoming tool update onto the stored snapshot for that id and publishes the result. */
  handleToolUpdate(toolCallId: string, update: Parameters<typeof mergeToolSnapshot>[1]): void {
    const merged = mergeToolSnapshot(toolCallId, update, this.toolSnapshots.get(toolCallId));
    this.toolSnapshots.set(toolCallId, merged);
    this.publishToolSnapshot(merged);
  }

  /**
   * Routes one ACP session notification: text/thought chunks are accumulated
   * and streamed, tool events are merged into snapshots, and command lists are
   * cached per task and re-published. Other update kinds are ignored.
   */
  async handleSessionUpdate(params: SessionNotification): Promise<void> {
    const update = params.update;
    switch (update.sessionUpdate) {
      case 'agent_message_chunk': {
        const block = update.content;
        // Only text content is accumulated; other content types are dropped.
        if (block.type === 'text' && 'text' in block) {
          const text = (block as { text: string }).text;
          this.recordChunk(
            this.textChunks,
            text,
            (messageId, chatId) =>
              ({ type: 'delta', message_id: messageId, chat_id: chatId, content: text }) as WsFrame,
          );
        }
        break;
      }
      case 'agent_thought_chunk': {
        const block = update.content;
        if (block.type === 'text' && 'text' in block) {
          const text = (block as { text: string }).text;
          this.recordChunk(
            this.reasoningChunks,
            text,
            (messageId, chatId) =>
              ({ type: 'reasoning_delta', message_id: messageId, chat_id: chatId, content: text }) as WsFrame,
          );
        }
        break;
      }
      case 'tool_call':
      case 'tool_call_update':
        // Both event kinds merge onto the same snapshot keyed by toolCallId.
        this.handleToolUpdate(update.toolCallId, update);
        break;
      case 'available_commands_update': {
        const commands = update.availableCommands.map((cmd) => ({
          name: cmd.name,
          description: cmd.description ?? undefined,
        }));
        const taskId = this.opts.taskId;
        if (!taskId || commands.length === 0) break;

        mergeTaskCommands(taskId, commands);
        // Publish the merged per-task list, falling back to this update's list.
        this.publish(
          (target) =>
            ({
              type: 'agent_commands',
              task_id: taskId,
              session_id: target.sessionId,
              commands: getTaskCommands(taskId) ?? commands,
            }) as WsFrame,
        );
        break;
      }
      default:
        break;
    }
  }

  /**
   * Builds the ACP `Client` implementation for this dispatch.
   *
   * Permission and elicitation requests are delegated to the waiter helpers
   * when both `taskId` and `sessionId` are known. Without them, permissions
   * fall back to the first offered option (or `cancelled` when none is
   * offered) and elicitations are declined. File reads/writes go through the
   * worktree FS helpers; `createTerminal` returns a fixed placeholder id.
   */
  buildClient(agent: string, modeId: string | undefined, taskId: string | undefined, sessionId: string | undefined): Client {
    const requestPermission = async (request: RequestPermissionRequest): Promise<RequestPermissionResponse> => {
      if (taskId && sessionId) {
        return waitForPermissionResponse(taskId, sessionId, agent, modeId, request);
      }
      const fallback = request.options[0];
      if (!fallback) {
        return { outcome: { outcome: 'cancelled' } };
      }
      return { outcome: { outcome: 'selected', optionId: fallback.optionId } };
    };

    const readTextFile = async (request: ReadTextFileRequest): Promise<ReadTextFileResponse> => {
      return {
        content: await readWorktreeTextFile(this.worktreePath, request.path, request.line, request.limit),
      };
    };

    const writeTextFile = async (request: WriteTextFileRequest): Promise<WriteTextFileResponse> => {
      await writeWorktreeTextFile(this.worktreePath, request.path, request.content);
      return {};
    };

    const createTerminal = async (_request: CreateTerminalRequest): Promise<CreateTerminalResponse> => {
      return { terminalId: 'noop' };
    };

    const unstable_createElicitation = async (
      request: CreateElicitationRequest,
    ): Promise<CreateElicitationResponse> => {
      if (taskId && sessionId) {
        return waitForElicitationResponse(taskId, sessionId, agent, modeId, request);
      }
      return { action: 'decline' };
    };

    return {
      sessionUpdate: (notification) => this.handleSessionUpdate(notification),
      requestPermission,
      readTextFile,
      writeTextFile,
      createTerminal,
      unstable_createElicitation,
    };
  }
}
|
|
|
|
/** Grace period between SIGTERM and the follow-up SIGKILL sent to the agent process. */
const ACP_SIGKILL_GRACE_MS = 5_000;

/** Upper bound on how long a dispatch waits for the agent process to close before returning. */
const ACP_CLOSE_WAIT_MS = 3_000;

/**
 * Runs one prompt turn against an ACP-capable agent spawned on the host.
 *
 * Flow: resolve spawn args → spawn the agent in `worktreePath` → initialize
 * the ACP connection → create a session → apply mode/model/thinking overrides
 * → send `task` as a single text prompt → close the session. Streamed text,
 * thoughts and tool snapshots are collected (and published through the broker
 * when a streaming target is configured) by an `AcpStreamContext`.
 *
 * The function never rejects for protocol or process failures; those are
 * reported through the result:
 * - exitCode 0: the prompt completed; `stopReason` comes from the agent
 *   (`'end_turn'` when absent).
 * - exitCode 1 / `'error'`: the agent has no ACP support, failed to spawn, or
 *   the ACP exchange threw.
 * - exitCode 130 / `'cancelled'`: `opts.signal` was already aborted on entry;
 *   no process is spawned in that case.
 *
 * On every exit path the agent process is sent SIGTERM (then SIGKILL after a
 * grace period if it has not closed), pending permission waits for `taskId`
 * are cancelled, and the function waits a bounded time for the process to
 * close.
 *
 * @param opts Dispatch options; see `AcpDispatchOpts`.
 * @returns The collected output, reasoning text, tool snapshots and status.
 */
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
  const {
    agent,
    task,
    worktreePath,
    installPath,
    signal,
    log,
    taskId,
    modeId,
    sessionId,
    chatId,
    messageId,
    broker,
  } = opts;

  const args = resolveAcpSpawnArgs(agent);
  if (!args) {
    return {
      exitCode: 1,
      output: `Agent '${agent}' does not support ACP.`,
      toolSnapshots: [],
      reasoningText: '',
      stopReason: 'error',
    };
  }

  // Already cancelled: report it without starting (and then killing) a process.
  if (signal?.aborted) {
    if (taskId) cancelPendingPermission(taskId);
    return {
      exitCode: 130,
      output: 'Aborted before start',
      toolSnapshots: [],
      reasoningText: '',
      stopReason: 'cancelled',
    };
  }

  const binary = installPath ?? agent;
  log.info({ agent, binary, worktreePath, modeId, model: opts.model }, 'acp-dispatch: spawning');
  const child = spawn(binary, args, {
    cwd: worktreePath,
    stdio: ['pipe', 'pipe', 'pipe'],
    env: { ...process.env },
  });

  let closed = false;
  let killTimer: ReturnType<typeof setTimeout> | undefined;

  // Registered immediately so a close that happens mid-turn is not missed later.
  const childClosed = new Promise<void>((resolve) => {
    child.once('close', () => {
      closed = true;
      if (killTimer) clearTimeout(killTimer);
      resolve();
    });
  });

  // A ChildProcess 'error' (e.g. the binary does not exist) with no listener
  // is thrown as an uncaught exception; turn it into a rejection instead.
  const childFailed = new Promise<never>((_resolve, reject) => {
    child.on('error', (err) => {
      log.warn({ agent, binary, err: err.message }, 'acp-dispatch: child process error');
      reject(err);
    });
  });
  // Keep a late process error from surfacing as an unhandled rejection.
  childFailed.catch(() => {});

  const streamCtx = new AcpStreamContext(
    { broker, sessionId, chatId, messageId, taskId },
    worktreePath,
  );

  let killed = false;
  const cleanup = (): void => {
    if (!killed) {
      killed = true;
      streamCtx.markAborted();
      if (!closed) {
        child.kill('SIGTERM');
        // Escalate if the agent ignores SIGTERM; cleared once the child closes.
        killTimer = setTimeout(() => child.kill('SIGKILL'), ACP_SIGKILL_GRACE_MS);
      }
    }
    if (taskId) cancelPendingPermission(taskId);
  };

  signal?.addEventListener('abort', cleanup, { once: true });

  // The ACP exchange itself: handshake, session setup, one prompt, session close.
  const runTurn = async (): Promise<AcpDispatchResult> => {
    const stream = createAcpNdJsonStream(child);
    const connection = new ClientSideConnection(
      () => streamCtx.buildClient(agent, modeId, taskId, sessionId),
      stream,
    );

    await connection.initialize({
      protocolVersion: 1,
      clientInfo: { name: 'boocoder', version: '2.3.0' },
      clientCapabilities: {},
    });

    const acpSession = await connection.newSession({ cwd: worktreePath, mcpServers: [] });
    log.info({ sessionId: acpSession.sessionId }, 'acp-dispatch: session created');

    await applySessionOverrides(connection, acpSession.sessionId, acpSession.configOptions, opts);

    const promptResult = await connection.prompt({
      sessionId: acpSession.sessionId,
      prompt: [{ type: 'text', text: task }],
    });

    const stopReason = promptResult.stopReason ?? 'end_turn';
    log.info(
      { agent, stopReason, toolCallCount: streamCtx.snapshots.length, reasoningChars: streamCtx.reasoningText.length },
      'acp-dispatch: prompt completed',
    );

    // Best-effort: the process is terminated right after, so a failure here is ignored.
    await connection.closeSession({ sessionId: acpSession.sessionId }).catch(() => {});

    return {
      exitCode: 0,
      output: streamCtx.output,
      toolSnapshots: streamCtx.snapshots,
      reasoningText: streamCtx.reasoningText,
      stopReason,
    };
  };

  try {
    // Whichever settles first wins: a finished turn or a process-level failure.
    return await Promise.race([runTurn(), childFailed]);
  } catch (err) {
    // NOTE(review): an abort during the turn also lands here (if the SDK
    // rejects once the process is killed) and is reported as exitCode 1 /
    // 'error' — confirm whether callers would prefer 130 / 'cancelled'.
    const message = err instanceof Error ? err.message : String(err);
    log.error({ agent, err: message }, 'acp-dispatch: error');
    return {
      exitCode: 1,
      output: message,
      toolSnapshots: streamCtx.snapshots,
      reasoningText: streamCtx.reasoningText,
      stopReason: 'error',
    };
  } finally {
    signal?.removeEventListener('abort', cleanup);
    cleanup();
    // Wait for the child to close, but never longer than ACP_CLOSE_WAIT_MS.
    // Resolves immediately when the child has already closed.
    await new Promise<void>((resolve) => {
      const waitTimer = setTimeout(resolve, ACP_CLOSE_WAIT_MS);
      void childClosed.then(() => {
        clearTimeout(waitTimer);
        resolve();
      });
    });
  }
}
|