Files
boocode/apps/coder/src/services/acp-dispatch.ts
indifferentketchup 1937af8df9 feat: in-app Orchestrator (Phase 2) — multi-agent conductor
Brings the deterministic Han-flow conductor into BooCode: launch any read-only
flow from BooChat or BooCoder, watch each agent stream live in a Paseo-style
run pane, get an evidence-disciplined report — on local Qwen, persisted and
resumable. Read-only enforced hard via qwen --approval-mode plan (orchestrator
tasks fail closed if qwen is unavailable; never fall to write-capable native).

Backend (apps/coder): re-homed conductor defs, flow_runs/flow_steps schema,
flow-runner + dispatcher onTaskTerminal hook, restart-resume, runs routes
(launch/list/get/cancel), user-channel WS. Contracts: two flow_run_* frames.
Web: orchestrator pane kind + OrchestratorPane, Workflow button + slash flows
(BooChat/BooCoder parity), FlowLauncherDialog, "New Orchestrator" in the + and
split menus, runs history + export. Plan: openspec/changes/orchestrator.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 15:22:48 +00:00

307 lines
9.9 KiB
TypeScript

/**
* ACP dispatch — runs ACP-capable agents directly on the host.
*
* v2.3: Paseo-aligned tool lifecycle — stable toolCallId, merge on
* tool_call_update, reasoning stream, worktree FS client, persist-ready snapshots.
*/
import type { FastifyBaseLogger } from 'fastify';
import {
ClientSideConnection,
type Client,
type SessionNotification,
type SessionConfigOption,
type ClientSideConnection as ConnectionType,
} from '@agentclientprotocol/sdk';
import type { Broker } from '@boocode/server/broker';
import { spawn } from 'node:child_process';
import { findThoughtLevelConfigId } from './acp-derive.js';
import { resolveLaunchSpec } from './acp-spawn.js';
import { getResolvedRegistry, type ResolvedProviderDef } from './provider-config-registry.js';
import { createAcpNdJsonStream } from './acp-stream.js';
import { cancelPendingPermission } from './permission-waiter.js';
import { mapSessionUpdate } from './acp-event-map.js';
import { type AcpToolSnapshot, synthesizeCanceledSnapshots } from './acp-tool-snapshot.js';
import { makeFrameEmitter, type FrameEmitter } from './frame-emitter.js';
import { buildAcpClient } from './acp-client.js';
/**
 * Session mode ids whose whole purpose is to keep the agent read-only at its
 * tool layer. Requesting one of them makes the mode switch safety-critical: if
 * the switch cannot be applied the turn has to be aborted (fail closed) instead
 * of carrying on write-capable.
 *
 * `plan` is qwen's read-only approval mode (the orchestrator's gate, D-4). Add
 * further ids here when flows start using another agent's read-only mode.
 */
const READ_ONLY_MODE_IDS = new Set<string>(['plan']);
/** Outcome of one ACP dispatch turn, as returned by `dispatchViaAcp`. */
export interface AcpDispatchResult {
/** Process-style status set by `dispatchViaAcp`: 0 when the prompt completed,
* 1 when the agent has no launch spec or the turn threw, 130 when the abort
* signal was already set before the turn started. */
exitCode: number;
/** Text accumulated from the agent's stream on success; on the failure and
* abort paths, the error message or a fixed explanatory string instead. */
output: string;
/** Tool-call snapshots collected by the stream context during the turn
* (empty when the agent has no launch spec). */
toolSnapshots: AcpToolSnapshot[];
/** Reasoning text accumulated from the agent's stream; empty on the
* early-return paths. */
reasoningText: string;
/** The stop reason reported by the prompt call (`end_turn` when the agent
* reports none), or `error` / `cancelled` when set by `dispatchViaAcp` itself. */
stopReason: string;
}
/** Inputs for one ACP dispatch turn (`dispatchViaAcp`). */
export interface AcpDispatchOpts {
/** Agent id; used as the resolved-registry key when `resolved` is not supplied,
* and handed to the ACP client. */
agent: string;
/** Prompt text, sent to the agent as a single text block. */
task: string;
/** Working directory of the spawned agent process and `cwd` of the ACP session;
* also handed to the ACP client builder. */
worktreePath: string;
/** Optional model id applied to the session; a failure to apply it is only logged. */
model?: string;
/** Optional session mode id. A failure to apply it is logged and ignored, unless
* the id is one of `READ_ONLY_MODE_IDS`, in which case the turn is aborted. */
modeId?: string;
/** Optional value for the session's thought-level config option; skipped when
* the session exposes no such option. */
thinkingOptionId?: string;
/** Task id forwarded to the frame emitter and the ACP client; also used to
* cancel a pending permission request on cleanup. */
taskId?: string;
/** Caller-side session id forwarded to the frame emitter and the ACP client
* (not the ACP session id, which is created per turn by `newSession`). */
sessionId?: string;
/** Chat id forwarded to the frame emitter. */
chatId?: string;
/** Message id forwarded to the frame emitter as its `assistantId`. */
messageId?: string;
/** Broker forwarded to the frame emitter. */
broker?: Broker;
/** Install path passed to launch-spec resolution (`null` is passed when absent). */
installPath?: string;
/** v2.3 phase 3: resolved registry def for launch-spec resolution. The
* dispatcher loads this by task.agent; falls back to a registry lookup here. */
resolved?: ResolvedProviderDef;
/** Abort signal: if already aborted the turn is not started; an abort during
* the turn terminates the agent process. */
signal?: AbortSignal;
/** Logger used for all dispatch diagnostics. */
log: FastifyBaseLogger;
}
/**
 * Applies the optional per-turn overrides — session mode, model and thinking
 * level — to a freshly created ACP session, in that order.
 *
 * Every override is best-effort (a failure is logged at `warn` and the next one
 * is still attempted) with one exception: a mode listed in `READ_ONLY_MODE_IDS`
 * that cannot be applied is logged at `error` and re-thrown, so the caller
 * aborts the turn instead of running it write-capable (fail closed, D-4).
 *
 * @param connection ACP connection the session lives on.
 * @param acpSessionId Id of the ACP session to configure.
 * @param configOptions Config options advertised by the session; used to find
 *   the thought-level option the thinking override is written to.
 * @param opts The requested overrides plus the logger.
 * @throws The original error (wrapped in `Error` if it was not one) when a
 *   read-only mode could not be set.
 */
async function applySessionOverrides(
  connection: ConnectionType,
  acpSessionId: string,
  configOptions: SessionConfigOption[] | null | undefined,
  opts: Pick<AcpDispatchOpts, 'model' | 'modeId' | 'thinkingOptionId' | 'log'>,
): Promise<void> {
  // Snapshot the options up front so later awaits cannot observe a mutated `opts`.
  const {
    model: requestedModel,
    modeId: requestedMode,
    thinkingOptionId: thinkingLevel,
    log: logger,
  } = opts;
  const describe = (failure: unknown): string =>
    failure instanceof Error ? failure.message : String(failure);

  if (requestedMode) {
    try {
      await connection.setSessionMode({ sessionId: acpSessionId, modeId: requestedMode });
    } catch (failure) {
      if (!READ_ONLY_MODE_IDS.has(requestedMode)) {
        // Ordinary modes are a preference, not a guarantee: note it and move on.
        logger.warn({ modeId: requestedMode, err: describe(failure) }, 'acp-dispatch: setSessionMode failed');
      } else {
        // Read-only invariant (D-4): continuing here would run the agent
        // write-capable under a plan-mode request. Re-throw so the caller marks
        // the task failed. The orchestrator's qwen+plan route uses the PTY hard
        // gate and never gets here; this backstops any other route that puts a
        // read-only mode on the ACP path.
        logger.error(
          { modeId: requestedMode, err: describe(failure) },
          'acp-dispatch: read-only setSessionMode failed — failing closed (aborting turn)',
        );
        throw failure instanceof Error ? failure : new Error(String(failure));
      }
    }
  }

  if (requestedModel) {
    try {
      await connection.unstable_setSessionModel({ sessionId: acpSessionId, modelId: requestedModel });
    } catch (failure) {
      logger.warn({ model: requestedModel, err: describe(failure) }, 'acp-dispatch: setSessionModel failed');
    }
  }

  // Thinking level is the last override, so the remaining checks can bail out early.
  if (!thinkingLevel) return;
  const configId = findThoughtLevelConfigId(configOptions);
  if (!configId) return;
  try {
    await connection.setSessionConfigOption({
      sessionId: acpSessionId,
      configId,
      value: thinkingLevel,
    });
  } catch (failure) {
    logger.warn(
      { thinkingOptionId: thinkingLevel, err: describe(failure) },
      'acp-dispatch: setSessionConfigOption failed',
    );
  }
}
/**
 * Per-turn glue between ACP session notifications and the shared frame emitter:
 * it feeds mapped agent events into the emitter, exposes what the emitter has
 * accumulated, and builds the ACP client bound to this turn.
 */
class AcpStreamContext {
  /** Shared `makeFrameEmitter` instance that turns agent events into WS frames
   * and accumulates text, reasoning and tool snapshots. No `dcp` stripper is
   * passed on this one-shot path, so text goes out verbatim. */
  private readonly emitter: FrameEmitter;
  /** Worktree the ACP client is built against. */
  private readonly worktreePath: string;

  /**
   * @param opts Routing ids and broker handed to the frame emitter
   *   (`messageId` is passed on as the emitter's `assistantId`).
   * @param worktreePath Worktree path later given to the ACP client builder.
   */
  constructor(
    opts: Pick<AcpDispatchOpts, 'broker' | 'sessionId' | 'chatId' | 'messageId' | 'taskId'>,
    worktreePath: string,
  ) {
    this.worktreePath = worktreePath;
    const { broker, sessionId, chatId, messageId: assistantId, taskId } = opts;
    this.emitter = makeFrameEmitter({ broker, sessionId, chatId, assistantId, taskId });
  }

  /** Reasoning text as currently held by the emitter. */
  get reasoningText(): string {
    const { reasoningText } = this.emitter;
    return reasoningText;
  }

  /** Output text as currently held by the emitter. */
  get output(): string {
    const { output } = this.emitter;
    return output;
  }

  /** Tool-call snapshots as currently held by the emitter. */
  get snapshots(): AcpToolSnapshot[] {
    const { snapshots } = this.emitter;
    return snapshots;
  }

  /**
   * Closes out tool calls that are still running: a synthesized 'canceled'
   * snapshot is pushed for each through the regular `tool_update` path (the same
   * `tool_call` wire frame the original used), so the UI stops showing them as
   * in progress.
   */
  markAborted(): void {
    const canceled = synthesizeCanceledSnapshots(this.emitter.toolSnapshots.values());
    for (const toolCall of canceled) {
      this.emitter.onEvent({ type: 'tool_update', toolCall });
    }
  }

  /**
   * Maps one ACP session notification to agent events and emits each of them.
   * The accumulator passed to the mapper is the emitter's own snapshot Map, so a
   * later `tool_call_update` merges over the `tool_call` it belongs to.
   */
  handleSessionUpdate(params: SessionNotification): void {
    const events = mapSessionUpdate(params, this.emitter.toolSnapshots);
    for (const agentEvent of events) {
      this.emitter.onEvent(agentEvent);
    }
  }

  /**
   * Builds the ACP client for this turn. The client obtains its turn context
   * (ids, mode, agent and the session-update callback) through a factory, and
   * session updates it receives are routed back into `handleSessionUpdate`.
   */
  buildClient(agent: string, modeId: string | undefined, taskId: string | undefined, sessionId: string | undefined): Client {
    const describeTurn = () => ({
      taskId,
      sessionId,
      modeId,
      agent,
      onSessionUpdate: (update: SessionNotification) => this.handleSessionUpdate(update),
    });
    return buildAcpClient(this.worktreePath, describeTurn);
  }
}
/**
 * Runs a single prompt turn against an ACP-capable agent spawned on the host.
 *
 * Flow: resolve the launch spec → spawn the agent in `worktreePath` → ACP
 * `initialize` → `newSession` → apply mode/model/thinking overrides → `prompt`
 * → best-effort `closeSession`. Session updates received meanwhile are routed
 * through an `AcpStreamContext`, which accumulates the text, reasoning and tool
 * snapshots reported in the result.
 *
 * Failures during the turn are folded into the returned result rather than thrown:
 * - exitCode 0 — the prompt completed; `stopReason` is the agent's (default `end_turn`).
 * - exitCode 1 / `error` — no launch spec for the agent, the child process
 *   reported an error (e.g. it could not be spawned), or a step of the turn
 *   threw (including the fail-closed read-only mode failure raised by
 *   `applySessionOverrides`).
 * - exitCode 130 / `cancelled` — `signal` was already aborted on entry.
 *
 * The child is always terminated before returning (SIGTERM, then SIGKILL after
 * 5 s if it still has not closed), and the function waits at most 3 s for its
 * `close` event.
 *
 * @param opts Dispatch options; see `AcpDispatchOpts`.
 * @returns The outcome of the turn; see `AcpDispatchResult`.
 */
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
  const {
    agent,
    task,
    worktreePath,
    installPath,
    signal,
    log,
    taskId,
    modeId,
    sessionId,
    chatId,
    messageId,
    broker,
  } = opts;

  // v2.3 phase 3: launch from the resolved registry def (config override /
  // custom-ACP command) with the built-in switch as the fallback. The dispatcher
  // passes `resolved`; fall back to a registry lookup if it didn't.
  const resolved = opts.resolved ?? getResolvedRegistry().get(agent);
  const spec = resolved ? resolveLaunchSpec(resolved, installPath ?? null) : null;
  if (!spec) {
    return {
      exitCode: 1,
      output: `Agent '${agent}' does not support ACP.`,
      toolSnapshots: [],
      reasoningText: '',
      stopReason: 'error',
    };
  }

  log.info({ agent, binary: spec.binary, worktreePath, modeId, model: opts.model }, 'acp-dispatch: spawning');
  const child = spawn(spec.binary, spec.args, {
    cwd: worktreePath,
    stdio: ['pipe', 'pipe', 'pipe'],
    env: { ...process.env, ...spec.env },
  });

  // Observe 'close' from the moment of spawn. Registering the listener only at
  // the end of the turn would miss a child that had already closed, turning the
  // final wait into an unconditional 3 s stall.
  let closed = false;
  let forceKillTimer: ReturnType<typeof setTimeout> | undefined;
  child.once('close', () => {
    closed = true;
    // The child is gone: the pending SIGKILL escalation has nothing left to do.
    if (forceKillTimer !== undefined) clearTimeout(forceKillTimer);
  });

  // A child-process 'error' (e.g. the binary could not be spawned) with no
  // listener is an uncaught exception. Keep a persistent listener, and surface
  // the first error as a rejection the turn is raced against, so a dead process
  // fails the turn right away instead of leaving the ACP handshake pending.
  const spawnFailure = new Promise<never>((_resolve, reject) => {
    child.on('error', (err) => {
      log.error({ agent, binary: spec.binary, err: err.message }, 'acp-dispatch: child process error');
      reject(err);
    });
  });
  // Mark the rejection as handled for the paths that never race against it.
  void spawnFailure.catch(() => {});

  const streamCtx = new AcpStreamContext(
    { broker, sessionId, chatId, messageId, taskId },
    worktreePath,
  );

  let killed = false;
  const cleanup = () => {
    if (!killed) {
      killed = true;
      streamCtx.markAborted();
      if (!closed) {
        child.kill('SIGTERM');
        // Escalate if SIGTERM is ignored; cancelled by the 'close' listener above.
        forceKillTimer = setTimeout(() => child.kill('SIGKILL'), 5_000);
      }
    }
    if (taskId) cancelPendingPermission(taskId);
  };

  if (signal) {
    if (signal.aborted) {
      cleanup();
      return {
        exitCode: 130,
        output: 'Aborted before start',
        toolSnapshots: streamCtx.snapshots,
        reasoningText: '',
        stopReason: 'cancelled',
      };
    }
    signal.addEventListener('abort', cleanup, { once: true });
  }

  try {
    const runTurn = async (): Promise<AcpDispatchResult> => {
      const stream = createAcpNdJsonStream(child);
      const connection = new ClientSideConnection(
        () => streamCtx.buildClient(agent, modeId, taskId, sessionId),
        stream,
      );
      await connection.initialize({
        protocolVersion: 1,
        clientInfo: { name: 'boocoder', version: '2.3.0' },
        clientCapabilities: {},
      });
      const acpSession = await connection.newSession({ cwd: worktreePath, mcpServers: [] });
      log.info({ sessionId: acpSession.sessionId }, 'acp-dispatch: session created');
      // May throw for a read-only mode that could not be applied (fail closed).
      await applySessionOverrides(connection, acpSession.sessionId, acpSession.configOptions, opts);
      const promptResult = await connection.prompt({
        sessionId: acpSession.sessionId,
        prompt: [{ type: 'text', text: task }],
      });
      const stopReason = promptResult.stopReason ?? 'end_turn';
      log.info(
        { agent, stopReason, toolCallCount: streamCtx.snapshots.length, reasoningChars: streamCtx.reasoningText.length },
        'acp-dispatch: prompt completed',
      );
      // Closing the session is a courtesy; its failure must not fail the turn.
      await connection.closeSession({ sessionId: acpSession.sessionId }).catch(() => {});
      return {
        exitCode: 0,
        output: streamCtx.output,
        toolSnapshots: streamCtx.snapshots,
        reasoningText: streamCtx.reasoningText,
        stopReason,
      };
    };
    return await Promise.race([runTurn(), spawnFailure]);
  } catch (err) {
    // NOTE(review): an abort that lands mid-turn kills the child, so the pending
    // ACP call presumably rejects and is reported here as `error`, not
    // `cancelled` — confirm that is what callers expect.
    const message = err instanceof Error ? err.message : String(err);
    log.error({ agent, err: message }, 'acp-dispatch: error');
    return {
      exitCode: 1,
      output: message,
      toolSnapshots: streamCtx.snapshots,
      reasoningText: streamCtx.reasoningText,
      stopReason: 'error',
    };
  } finally {
    if (signal) signal.removeEventListener('abort', cleanup);
    cleanup();
    if (!closed) {
      // Give the child up to 3 s to close; stop the timer as soon as it does.
      await new Promise<void>((resolve) => {
        const graceTimer = setTimeout(resolve, 3_000);
        child.once('close', () => {
          clearTimeout(graceTimer);
          resolve();
        });
      });
    }
  }
}