WarmAcpBackend (AgentBackend) holds one persistent goose acp / qwen --acp child + ClientSideConnection + ACP session per (chat,agent); initialize+session/new once, reused across turns. Abort = session/cancel the prompt only (never kills the child); child exit -> agent_sessions.status='crashed' -> re-spawn next turn. Dispatcher routes goose/qwen chat-tab tasks to the pooled warm backend via pure shouldUseWarmBackend (needs session_id+chat_id); one-shot runExternalAgent kept as fallback for arena/MCP/new_task. handleSessionUpdate extracted to a shared pure acp-event-map.ts (one-shot path byte-identical). SDK: installed @agentclientprotocol/sdk@^0.22.1 has stable resumeSession/loadSession; resume moot in the warm hot path, deferred to Phase 3. 15 new tests (warm-acp-routing, acp-event-map); 180 coder tests pass; tsc + build clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
380 lines
12 KiB
TypeScript
380 lines
12 KiB
TypeScript
/**
|
|
* ACP dispatch — runs ACP-capable agents directly on the host.
|
|
*
|
|
* v2.3: Paseo-aligned tool lifecycle — stable toolCallId, merge on
|
|
* tool_call_update, reasoning stream, worktree FS client, persist-ready snapshots.
|
|
*/
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import {
|
|
ClientSideConnection,
|
|
type Client,
|
|
type SessionNotification,
|
|
type RequestPermissionRequest,
|
|
type RequestPermissionResponse,
|
|
type ReadTextFileRequest,
|
|
type ReadTextFileResponse,
|
|
type WriteTextFileRequest,
|
|
type WriteTextFileResponse,
|
|
type CreateTerminalRequest,
|
|
type CreateTerminalResponse,
|
|
type CreateElicitationRequest,
|
|
type CreateElicitationResponse,
|
|
type SessionConfigOption,
|
|
type ClientSideConnection as ConnectionType,
|
|
} from '@agentclientprotocol/sdk';
|
|
import type { Broker } from '@boocode/server/broker';
|
|
import type { WsFrame } from '@boocode/server/ws-frames';
|
|
import { spawn } from 'node:child_process';
|
|
import { findThoughtLevelConfigId } from './acp-derive.js';
|
|
import { resolveLaunchSpec } from './acp-spawn.js';
|
|
import { getResolvedRegistry, type ResolvedProviderDef } from './provider-config-registry.js';
|
|
import { createAcpNdJsonStream } from './acp-stream.js';
|
|
import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from './permission-waiter.js';
|
|
import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js';
|
|
import { readWorktreeTextFile, writeWorktreeTextFile } from './acp-client-fs.js';
|
|
import { mapSessionUpdate } from './acp-event-map.js';
|
|
import {
|
|
type AcpToolSnapshot,
|
|
snapshotToWireToolCall,
|
|
synthesizeCanceledSnapshots,
|
|
} from './acp-tool-snapshot.js';
|
|
|
|
export interface AcpDispatchResult {
|
|
exitCode: number;
|
|
output: string;
|
|
toolSnapshots: AcpToolSnapshot[];
|
|
reasoningText: string;
|
|
stopReason: string;
|
|
}
|
|
|
|
export interface AcpDispatchOpts {
|
|
agent: string;
|
|
task: string;
|
|
worktreePath: string;
|
|
model?: string;
|
|
modeId?: string;
|
|
thinkingOptionId?: string;
|
|
taskId?: string;
|
|
sessionId?: string;
|
|
chatId?: string;
|
|
messageId?: string;
|
|
broker?: Broker;
|
|
installPath?: string;
|
|
/** v2.3 phase 3: resolved registry def for launch-spec resolution. The
|
|
* dispatcher loads this by task.agent; falls back to a registry lookup here. */
|
|
resolved?: ResolvedProviderDef;
|
|
signal?: AbortSignal;
|
|
log: FastifyBaseLogger;
|
|
}
|
|
|
|
async function applySessionOverrides(
|
|
connection: ConnectionType,
|
|
acpSessionId: string,
|
|
configOptions: SessionConfigOption[] | null | undefined,
|
|
opts: Pick<AcpDispatchOpts, 'model' | 'modeId' | 'thinkingOptionId' | 'log'>,
|
|
): Promise<void> {
|
|
const { model, modeId, thinkingOptionId, log } = opts;
|
|
|
|
if (modeId) {
|
|
try {
|
|
await connection.setSessionMode({ sessionId: acpSessionId, modeId });
|
|
} catch (err) {
|
|
log.warn({ modeId, err: err instanceof Error ? err.message : String(err) }, 'acp-dispatch: setSessionMode failed');
|
|
}
|
|
}
|
|
|
|
if (model) {
|
|
try {
|
|
await connection.unstable_setSessionModel({ sessionId: acpSessionId, modelId: model });
|
|
} catch (err) {
|
|
log.warn({ model, err: err instanceof Error ? err.message : String(err) }, 'acp-dispatch: setSessionModel failed');
|
|
}
|
|
}
|
|
|
|
if (thinkingOptionId) {
|
|
const configId = findThoughtLevelConfigId(configOptions);
|
|
if (configId) {
|
|
try {
|
|
await connection.setSessionConfigOption({
|
|
sessionId: acpSessionId,
|
|
configId,
|
|
value: thinkingOptionId,
|
|
});
|
|
} catch (err) {
|
|
log.warn(
|
|
{ thinkingOptionId, err: err instanceof Error ? err.message : String(err) },
|
|
'acp-dispatch: setSessionConfigOption failed',
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
class AcpStreamContext {
|
|
readonly textChunks: string[] = [];
|
|
readonly reasoningChunks: string[] = [];
|
|
readonly toolSnapshots = new Map<string, AcpToolSnapshot>();
|
|
private aborted = false;
|
|
|
|
constructor(
|
|
private readonly opts: Pick<
|
|
AcpDispatchOpts,
|
|
'broker' | 'sessionId' | 'chatId' | 'messageId' | 'taskId'
|
|
>,
|
|
private readonly worktreePath: string,
|
|
) {}
|
|
|
|
get reasoningText(): string {
|
|
return this.reasoningChunks.join('');
|
|
}
|
|
|
|
get output(): string {
|
|
return this.textChunks.join('');
|
|
}
|
|
|
|
get snapshots(): AcpToolSnapshot[] {
|
|
return [...this.toolSnapshots.values()];
|
|
}
|
|
|
|
markAborted(): void {
|
|
this.aborted = true;
|
|
for (const snap of synthesizeCanceledSnapshots(this.toolSnapshots.values())) {
|
|
this.toolSnapshots.set(snap.toolCallId, snap);
|
|
this.publishToolSnapshot(snap);
|
|
}
|
|
}
|
|
|
|
private canStream(): boolean {
|
|
return !!(this.opts.broker && this.opts.sessionId && this.opts.chatId && this.opts.messageId);
|
|
}
|
|
|
|
private publishToolSnapshot(snapshot: AcpToolSnapshot): void {
|
|
if (!this.canStream()) return;
|
|
const wire = snapshotToWireToolCall(snapshot);
|
|
this.opts.broker!.publishFrame(this.opts.sessionId!, {
|
|
type: 'tool_call',
|
|
message_id: this.opts.messageId!,
|
|
chat_id: this.opts.chatId!,
|
|
tool_call: wire,
|
|
} as WsFrame);
|
|
}
|
|
|
|
async handleSessionUpdate(params: SessionNotification): Promise<void> {
|
|
// v2.6 Phase 2: the case-by-case mapping now lives in the shared, pure
|
|
// `mapSessionUpdate` (reused by the warm ACP backend). This method keeps the
|
|
// identical broker-publishing side effects — it just translates the normalized
|
|
// AgentEvents back into the same frames it always emitted. `this.toolSnapshots`
|
|
// is the merge accumulator, so a later tool_call_update merges over its
|
|
// tool_call (the prior `handleToolUpdate` behavior, byte-for-byte).
|
|
for (const event of mapSessionUpdate(params, this.toolSnapshots)) {
|
|
switch (event.type) {
|
|
case 'text':
|
|
this.textChunks.push(event.text);
|
|
if (this.canStream()) {
|
|
this.opts.broker!.publishFrame(this.opts.sessionId!, {
|
|
type: 'delta',
|
|
message_id: this.opts.messageId!,
|
|
chat_id: this.opts.chatId!,
|
|
content: event.text,
|
|
} as WsFrame);
|
|
}
|
|
break;
|
|
case 'reasoning':
|
|
this.reasoningChunks.push(event.text);
|
|
if (this.canStream()) {
|
|
this.opts.broker!.publishFrame(this.opts.sessionId!, {
|
|
type: 'reasoning_delta',
|
|
message_id: this.opts.messageId!,
|
|
chat_id: this.opts.chatId!,
|
|
content: event.text,
|
|
} as WsFrame);
|
|
}
|
|
break;
|
|
case 'tool_call':
|
|
case 'tool_update':
|
|
// mapSessionUpdate already stored the merged snapshot in this.toolSnapshots.
|
|
this.publishToolSnapshot(event.toolCall);
|
|
break;
|
|
case 'commands':
|
|
if (this.opts.taskId && event.commands.length > 0) {
|
|
mergeTaskCommands(this.opts.taskId, event.commands);
|
|
if (this.canStream() && this.opts.sessionId) {
|
|
const all = getTaskCommands(this.opts.taskId) ?? event.commands;
|
|
this.opts.broker!.publishFrame(this.opts.sessionId, {
|
|
type: 'agent_commands',
|
|
task_id: this.opts.taskId,
|
|
session_id: this.opts.sessionId,
|
|
commands: all,
|
|
} as WsFrame);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
buildClient(agent: string, modeId: string | undefined, taskId: string | undefined, sessionId: string | undefined): Client {
|
|
return {
|
|
sessionUpdate: (params) => this.handleSessionUpdate(params),
|
|
requestPermission: async (params: RequestPermissionRequest): Promise<RequestPermissionResponse> => {
|
|
if (taskId && sessionId) {
|
|
return waitForPermissionResponse(taskId, sessionId, agent, modeId, params);
|
|
}
|
|
const firstOption = params.options[0];
|
|
if (firstOption) {
|
|
return { outcome: { outcome: 'selected', optionId: firstOption.optionId } };
|
|
}
|
|
return { outcome: { outcome: 'cancelled' } };
|
|
},
|
|
readTextFile: async (params: ReadTextFileRequest): Promise<ReadTextFileResponse> => {
|
|
const content = await readWorktreeTextFile(
|
|
this.worktreePath,
|
|
params.path,
|
|
params.line,
|
|
params.limit,
|
|
);
|
|
return { content };
|
|
},
|
|
writeTextFile: async (params: WriteTextFileRequest): Promise<WriteTextFileResponse> => {
|
|
await writeWorktreeTextFile(this.worktreePath, params.path, params.content);
|
|
return {};
|
|
},
|
|
createTerminal: async (_params: CreateTerminalRequest): Promise<CreateTerminalResponse> => {
|
|
return { terminalId: 'noop' };
|
|
},
|
|
unstable_createElicitation: async (params: CreateElicitationRequest): Promise<CreateElicitationResponse> => {
|
|
if (taskId && sessionId) {
|
|
return waitForElicitationResponse(taskId, sessionId, agent, modeId, params);
|
|
}
|
|
return { action: 'decline' };
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
|
|
const {
|
|
agent,
|
|
task,
|
|
worktreePath,
|
|
installPath,
|
|
signal,
|
|
log,
|
|
taskId,
|
|
modeId,
|
|
sessionId,
|
|
chatId,
|
|
messageId,
|
|
broker,
|
|
} = opts;
|
|
|
|
// v2.3 phase 3: launch from the resolved registry def (config override /
|
|
// custom-ACP command) with the built-in switch as the fallback. The dispatcher
|
|
// passes `resolved`; fall back to a registry lookup if it didn't.
|
|
const resolved = opts.resolved ?? getResolvedRegistry().get(agent);
|
|
const spec = resolved ? resolveLaunchSpec(resolved, installPath ?? null) : null;
|
|
if (!spec) {
|
|
return {
|
|
exitCode: 1,
|
|
output: `Agent '${agent}' does not support ACP.`,
|
|
toolSnapshots: [],
|
|
reasoningText: '',
|
|
stopReason: 'error',
|
|
};
|
|
}
|
|
|
|
log.info({ agent, binary: spec.binary, worktreePath, modeId, model: opts.model }, 'acp-dispatch: spawning');
|
|
const child = spawn(spec.binary, spec.args, {
|
|
cwd: worktreePath,
|
|
stdio: ['pipe', 'pipe', 'pipe'],
|
|
env: { ...process.env, ...spec.env },
|
|
});
|
|
|
|
const streamCtx = new AcpStreamContext(
|
|
{ broker, sessionId, chatId, messageId, taskId },
|
|
worktreePath,
|
|
);
|
|
|
|
let killed = false;
|
|
const cleanup = () => {
|
|
if (!killed) {
|
|
killed = true;
|
|
streamCtx.markAborted();
|
|
child.kill('SIGTERM');
|
|
setTimeout(() => child.kill('SIGKILL'), 5_000);
|
|
}
|
|
if (taskId) cancelPendingPermission(taskId);
|
|
};
|
|
|
|
if (signal) {
|
|
if (signal.aborted) {
|
|
cleanup();
|
|
return {
|
|
exitCode: 130,
|
|
output: 'Aborted before start',
|
|
toolSnapshots: streamCtx.snapshots,
|
|
reasoningText: '',
|
|
stopReason: 'cancelled',
|
|
};
|
|
}
|
|
signal.addEventListener('abort', cleanup, { once: true });
|
|
}
|
|
|
|
try {
|
|
const stream = createAcpNdJsonStream(child);
|
|
const connection = new ClientSideConnection(
|
|
() => streamCtx.buildClient(agent, modeId, taskId, sessionId),
|
|
stream,
|
|
);
|
|
|
|
await connection.initialize({
|
|
protocolVersion: 1,
|
|
clientInfo: { name: 'boocoder', version: '2.3.0' },
|
|
clientCapabilities: {},
|
|
});
|
|
|
|
const acpSession = await connection.newSession({ cwd: worktreePath, mcpServers: [] });
|
|
log.info({ sessionId: acpSession.sessionId }, 'acp-dispatch: session created');
|
|
|
|
await applySessionOverrides(connection, acpSession.sessionId, acpSession.configOptions, opts);
|
|
|
|
const promptResult = await connection.prompt({
|
|
sessionId: acpSession.sessionId,
|
|
prompt: [{ type: 'text', text: task }],
|
|
});
|
|
|
|
const stopReason = promptResult.stopReason ?? 'end_turn';
|
|
log.info(
|
|
{ agent, stopReason, toolCallCount: streamCtx.snapshots.length, reasoningChars: streamCtx.reasoningText.length },
|
|
'acp-dispatch: prompt completed',
|
|
);
|
|
|
|
await connection.closeSession({ sessionId: acpSession.sessionId }).catch(() => {});
|
|
|
|
return {
|
|
exitCode: 0,
|
|
output: streamCtx.output,
|
|
toolSnapshots: streamCtx.snapshots,
|
|
reasoningText: streamCtx.reasoningText,
|
|
stopReason,
|
|
};
|
|
} catch (err) {
|
|
const message = err instanceof Error ? err.message : String(err);
|
|
log.error({ agent, err: message }, 'acp-dispatch: error');
|
|
return {
|
|
exitCode: 1,
|
|
output: message,
|
|
toolSnapshots: streamCtx.snapshots,
|
|
reasoningText: streamCtx.reasoningText,
|
|
stopReason: 'error',
|
|
};
|
|
} finally {
|
|
if (signal) signal.removeEventListener('abort', cleanup);
|
|
cleanup();
|
|
await new Promise<void>((resolve) => {
|
|
child.on('close', resolve);
|
|
setTimeout(resolve, 3_000);
|
|
});
|
|
}
|
|
}
|