feat(coder): v2.6 Phase 2 — warm ACP backend for goose/qwen
WarmAcpBackend (AgentBackend) holds one persistent goose acp / qwen --acp child + ClientSideConnection + ACP session per (chat,agent); initialize+session/new once, reused across turns. Abort = session/cancel the prompt only (never kills the child); child exit -> agent_sessions.status='crashed' -> re-spawn next turn. Dispatcher routes goose/qwen chat-tab tasks to the pooled warm backend via pure shouldUseWarmBackend (needs session_id+chat_id); one-shot runExternalAgent kept as fallback for arena/MCP/new_task. handleSessionUpdate extracted to a shared pure acp-event-map.ts (one-shot path byte-identical). SDK: installed @agentclientprotocol/sdk@^0.22.1 has stable resumeSession/loadSession; resume moot in the warm hot path, deferred to Phase 3. 15 new tests (warm-acp-routing, acp-event-map); 180 coder tests pass; tsc + build clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
411
apps/coder/src/services/backends/warm-acp.ts
Normal file
411
apps/coder/src/services/backends/warm-acp.ts
Normal file
@@ -0,0 +1,411 @@
|
||||
/**
|
||||
* v2.6 Phase 2 — WarmAcpBackend (goose, qwen).
|
||||
*
|
||||
* One persistent stdio process + ONE `ClientSideConnection` per (chat, agent),
|
||||
* `initialize` + `session/new` done ONCE, reused across every turn — the warm
|
||||
* analogue of the previous one-shot `acp-dispatch.ts` (which spawned/torn-down a
|
||||
* fresh `goose acp` / `qwen --acp` per turn). Mirrors Paseo's `SpawnedACPProcess`.
|
||||
*
|
||||
* Implements the Phase 0 `AgentBackend` interface (same contract as
|
||||
* `OpenCodeServerBackend`). Emits transport-agnostic `AgentEvent`s via the SHARED
|
||||
* `mapSessionUpdate` (reused verbatim from the one-shot stack); the dispatcher maps
|
||||
* those to WS frames + `persistExternalAgentTurn`, unchanged.
|
||||
*
|
||||
* Lifecycle decisions (design.md §2b / §10):
|
||||
* - **Child lifetime is the pool's, not a request's.** Spawned once; never tied
|
||||
* to a per-turn abort signal. Only the in-flight `prompt` gets `ctx.signal` —
|
||||
* abort = ACP `session/cancel`, NOT killing the child.
|
||||
* - **Per-turn abort** cancels the prompt on the warm connection so the SAME
|
||||
* process serves the next turn.
|
||||
* - **Crash** (child exit) marks `agent_sessions.status='crashed'` + logs; the
|
||||
* next `ensureSession` re-spawns + re-`session/new` (Phase 3 hardens auto-restart).
|
||||
* - **Resume across a process restart is NOT attempted in Phase 2.** goose ACP
|
||||
* advertises no `loadSession`/`session.resume`; qwen does, but cross-restart
|
||||
* resume is Phase 3. Within ONE live process the ACP session persists across
|
||||
* turns (the whole point of "warm"); a restart re-`session/new` (memory loss
|
||||
* across restart, accepted per §10). The agent's resume capabilities ARE
|
||||
* probed and logged for forward-compat.
|
||||
*
|
||||
* Each WarmAcpBackend instance owns exactly one (chat, agent) — the dispatcher
|
||||
* pools them under `agentPool.register(chatId, agent, backend)`.
|
||||
*
|
||||
* SDK note (@agentclientprotocol/sdk@^0.22.1, cross-checked against the design's
|
||||
* `^0.14` worry): the resume method is the STABLE `resumeSession` (`session/resume`,
|
||||
* gated by `agentCapabilities.sessionCapabilities.resume`), NOT the `^0.14`
|
||||
* `unstable_resumeSession`. `loadSession` is gated by `agentCapabilities.loadSession`.
|
||||
*/
|
||||
import { spawn, type ChildProcess } from 'node:child_process';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import {
|
||||
ClientSideConnection,
|
||||
type Client,
|
||||
type SessionNotification,
|
||||
type RequestPermissionRequest,
|
||||
type RequestPermissionResponse,
|
||||
type ReadTextFileRequest,
|
||||
type ReadTextFileResponse,
|
||||
type WriteTextFileRequest,
|
||||
type WriteTextFileResponse,
|
||||
type CreateTerminalRequest,
|
||||
type CreateTerminalResponse,
|
||||
type CreateElicitationRequest,
|
||||
type CreateElicitationResponse,
|
||||
} from '@agentclientprotocol/sdk';
|
||||
import type { Sql } from '../../db.js';
|
||||
import { resolveLaunchSpec } from '../acp-spawn.js';
|
||||
import { isTurnOkForStopReason } from './warm-acp-routing.js';
|
||||
import { getResolvedRegistry, type ResolvedProviderDef } from '../provider-config-registry.js';
|
||||
import { createAcpNdJsonStream } from '../acp-stream.js';
|
||||
import { mapSessionUpdate } from '../acp-event-map.js';
|
||||
import { readWorktreeTextFile, writeWorktreeTextFile } from '../acp-client-fs.js';
|
||||
import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from '../permission-waiter.js';
|
||||
import { type AcpToolSnapshot, synthesizeCanceledSnapshots } from '../acp-tool-snapshot.js';
|
||||
import type {
|
||||
AgentBackend,
|
||||
AgentEvent,
|
||||
AgentSessionHandle,
|
||||
EnsureSessionOpts,
|
||||
PromptCtx,
|
||||
TurnResult,
|
||||
} from '../agent-backend.js';
|
||||
|
||||
/** State for one in-flight turn (only one at a time per backend — turns serialize). */
|
||||
interface TurnState {
|
||||
/** Per-turn task id, for routing permission prompts back to the UI. */
|
||||
taskId: string | undefined;
|
||||
/** BooCode session id for permission-waiter's broker frames. */
|
||||
sessionId: string;
|
||||
/** Per-turn mode id (autonomous-mode gate in permission-waiter). */
|
||||
modeId: string | undefined;
|
||||
onEvent: (e: AgentEvent) => void;
|
||||
/** Tool-call snapshot accumulator for this turn — merge across tool_call_update. */
|
||||
snapshots: Map<string, AcpToolSnapshot>;
|
||||
}
|
||||
|
||||
export interface WarmAcpBackendDeps {
|
||||
sql: Sql;
|
||||
log: FastifyBaseLogger;
|
||||
/** The (chat, agent) this backend serves — its pool identity + DB key. */
|
||||
chatId: string;
|
||||
agent: string;
|
||||
/** Resolved binary for the agent (from available_agents.install_path), or null. */
|
||||
installPath: string | null;
|
||||
/** Optional override of the resolved registry def (defaults to a live lookup). */
|
||||
resolved?: ResolvedProviderDef;
|
||||
}
|
||||
|
||||
export class WarmAcpBackend implements AgentBackend {
|
||||
readonly backend = 'acp_warm' as const;
|
||||
|
||||
private readonly sql: Sql;
|
||||
private readonly log: FastifyBaseLogger;
|
||||
private readonly chatId: string;
|
||||
private readonly agent: string;
|
||||
private readonly installPath: string | null;
|
||||
private readonly resolvedOverride: ResolvedProviderDef | undefined;
|
||||
|
||||
private child: ChildProcess | null = null;
|
||||
private connection: ClientSideConnection | null = null;
|
||||
/** The single ACP session id for this warm process; null until session/new. */
|
||||
private acpSessionId: string | null = null;
|
||||
private up = false;
|
||||
/** Idempotent spawn guard — one warm process per backend, started lazily. */
|
||||
private starting: Promise<void> | null = null;
|
||||
/** Resume capabilities probed at initialize, logged for forward-compat (Phase 3). */
|
||||
private supportsLoadSession = false;
|
||||
private supportsResumeSession = false;
|
||||
|
||||
/** The current in-flight turn; the Client closures read it. Null between turns. */
|
||||
private activeTurn: TurnState | null = null;
|
||||
|
||||
constructor(deps: WarmAcpBackendDeps) {
|
||||
this.sql = deps.sql;
|
||||
this.log = deps.log;
|
||||
this.chatId = deps.chatId;
|
||||
this.agent = deps.agent;
|
||||
this.installPath = deps.installPath;
|
||||
this.resolvedOverride = deps.resolved;
|
||||
}
|
||||
|
||||
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
|
||||
health(): 'up' | 'down' {
|
||||
return this.up ? 'up' : 'down';
|
||||
}
|
||||
|
||||
// ─── warm-process lifecycle (2.1 spawn + initialize + session/new ONCE) ───────
|
||||
|
||||
/** Lazy: spawn the warm process on first use. Idempotent — one process per backend. */
|
||||
private ensureProcess(worktreePath: string): Promise<void> {
|
||||
if (this.up && this.connection && this.acpSessionId) return Promise.resolve();
|
||||
if (!this.starting) {
|
||||
this.starting = this.startProcess(worktreePath).catch((err) => {
|
||||
// Reset so a later ensureSession can retry the spawn after a failed start.
|
||||
this.starting = null;
|
||||
throw err;
|
||||
});
|
||||
}
|
||||
return this.starting;
|
||||
}
|
||||
|
||||
private async startProcess(worktreePath: string): Promise<void> {
|
||||
const resolved = this.resolvedOverride ?? getResolvedRegistry().get(this.agent);
|
||||
const spec = resolved ? resolveLaunchSpec(resolved, this.installPath) : null;
|
||||
if (!spec) throw new Error(`warm-acp: agent '${this.agent}' does not support ACP (no launch spec)`);
|
||||
|
||||
this.log.info({ agent: this.agent, chatId: this.chatId, binary: spec.binary, worktreePath }, 'warm-acp: spawning warm process');
|
||||
// Child lifetime is the pool's. NOT tied to any per-turn abort signal — only
|
||||
// the in-flight prompt is cancellable (via ACP session/cancel in prompt()).
|
||||
const child = spawn(spec.binary, spec.args, {
|
||||
cwd: worktreePath,
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
env: { ...process.env, ...spec.env },
|
||||
});
|
||||
this.child = child;
|
||||
|
||||
// 2.3: supervise the child; react to its exit, never let a request scope kill it.
|
||||
child.on('exit', (code, signal) => {
|
||||
this.up = false;
|
||||
this.connection = null;
|
||||
this.acpSessionId = null;
|
||||
this.starting = null;
|
||||
this.log.warn({ agent: this.agent, chatId: this.chatId, code, signal }, 'warm-acp: warm process exited — marking crashed (rebuild on next turn)');
|
||||
void this.markCrashed();
|
||||
});
|
||||
// A spawn error (e.g. ENOENT) surfaces here, not as an exit.
|
||||
child.on('error', (err) => {
|
||||
this.up = false;
|
||||
this.log.error({ agent: this.agent, chatId: this.chatId, err: errMsg(err) }, 'warm-acp: warm process error');
|
||||
});
|
||||
|
||||
const stream = createAcpNdJsonStream(child);
|
||||
const connection = new ClientSideConnection(() => this.buildClient(worktreePath), stream);
|
||||
|
||||
const init = await connection.initialize({
|
||||
protocolVersion: 1,
|
||||
clientInfo: { name: 'boocoder', version: '2.6.0' },
|
||||
clientCapabilities: {},
|
||||
});
|
||||
const caps = init.agentCapabilities;
|
||||
this.supportsLoadSession = caps?.loadSession === true;
|
||||
this.supportsResumeSession = caps?.sessionCapabilities?.resume != null;
|
||||
|
||||
const session = await connection.newSession({ cwd: worktreePath, mcpServers: [] });
|
||||
this.connection = connection;
|
||||
this.acpSessionId = session.sessionId;
|
||||
this.up = true;
|
||||
this.log.info(
|
||||
{
|
||||
agent: this.agent,
|
||||
chatId: this.chatId,
|
||||
acpSessionId: session.sessionId,
|
||||
loadSession: this.supportsLoadSession,
|
||||
resumeSession: this.supportsResumeSession,
|
||||
},
|
||||
'warm-acp: warm session ready',
|
||||
);
|
||||
}
|
||||
|
||||
/** Build the ACP Client callbacks ONCE per connection. They read `this.activeTurn`
|
||||
* so each turn's events/permissions route to the right place — exactly the
|
||||
* opencode-server `activeTurn` pattern. Worktree-scoped FS like AcpStreamContext. */
|
||||
private buildClient(worktreePath: string): Client {
|
||||
return {
|
||||
sessionUpdate: async (params: SessionNotification): Promise<void> => {
|
||||
const turn = this.activeTurn;
|
||||
if (!turn) return; // between turns — drop (no orphan settles a future turn)
|
||||
for (const event of mapSessionUpdate(params, turn.snapshots)) {
|
||||
turn.onEvent(event);
|
||||
}
|
||||
},
|
||||
requestPermission: async (params: RequestPermissionRequest): Promise<RequestPermissionResponse> => {
|
||||
const turn = this.activeTurn;
|
||||
if (turn?.taskId) {
|
||||
// Route to the UI via the per-turn task id (same as the one-shot path).
|
||||
return waitForPermissionResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params);
|
||||
}
|
||||
const firstOption = params.options[0];
|
||||
if (firstOption) return { outcome: { outcome: 'selected', optionId: firstOption.optionId } };
|
||||
return { outcome: { outcome: 'cancelled' } };
|
||||
},
|
||||
readTextFile: async (params: ReadTextFileRequest): Promise<ReadTextFileResponse> => {
|
||||
const content = await readWorktreeTextFile(worktreePath, params.path, params.line, params.limit);
|
||||
return { content };
|
||||
},
|
||||
writeTextFile: async (params: WriteTextFileRequest): Promise<WriteTextFileResponse> => {
|
||||
await writeWorktreeTextFile(worktreePath, params.path, params.content);
|
||||
return {};
|
||||
},
|
||||
createTerminal: async (_params: CreateTerminalRequest): Promise<CreateTerminalResponse> => {
|
||||
return { terminalId: 'noop' };
|
||||
},
|
||||
unstable_createElicitation: async (params: CreateElicitationRequest): Promise<CreateElicitationResponse> => {
|
||||
const turn = this.activeTurn;
|
||||
if (turn?.taskId) {
|
||||
return waitForElicitationResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params);
|
||||
}
|
||||
return { action: 'decline' };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// ─── ensureSession: create-or-reuse the warm session (2.1) ───────────────────
|
||||
|
||||
async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
|
||||
await this.ensureProcess(opts.worktreePath);
|
||||
if (!this.acpSessionId) throw new Error('warm-acp: session not ready after ensureProcess');
|
||||
|
||||
// P1.5-b: agent_sessions keys on (chat_id, agent). The ACP session id is the
|
||||
// resume handle WITHIN the live process; across a process restart it's stale,
|
||||
// so ensureProcess re-`session/new` and we upsert the fresh id here.
|
||||
await this.sql`
|
||||
INSERT INTO agent_sessions
|
||||
(chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
|
||||
VALUES
|
||||
(${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'acp_warm', ${this.acpSessionId}, NULL, 'active', clock_timestamp())
|
||||
ON CONFLICT (chat_id, agent) DO UPDATE SET
|
||||
session_id = EXCLUDED.session_id,
|
||||
worktree_id = EXCLUDED.worktree_id,
|
||||
backend = 'acp_warm',
|
||||
agent_session_id = EXCLUDED.agent_session_id,
|
||||
server_port = NULL,
|
||||
status = 'active',
|
||||
last_active_at = clock_timestamp()
|
||||
`.catch((err) => {
|
||||
this.log.warn({ err: errMsg(err), chatId: opts.chatId, agent: opts.agent }, 'warm-acp: agent_sessions upsert failed (non-fatal)');
|
||||
});
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
agent: opts.agent,
|
||||
backend: 'acp_warm',
|
||||
chatId: opts.chatId,
|
||||
worktreeId: opts.worktreeId,
|
||||
agentSessionId: this.acpSessionId,
|
||||
serverPort: null,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── prompt: one turn on the warm connection (2.2) ───────────────────────────
|
||||
|
||||
async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
|
||||
// The warm process may have crashed between ensureSession and here, or this
|
||||
// backend was rebuilt — re-establish before prompting.
|
||||
await this.ensureProcess(ctx.worktreePath);
|
||||
const connection = this.connection;
|
||||
const acpSessionId = this.acpSessionId;
|
||||
if (!connection || !acpSessionId) {
|
||||
return { ok: false, error: 'warm-acp: no live ACP connection' };
|
||||
}
|
||||
|
||||
const snapshots = new Map<string, AcpToolSnapshot>();
|
||||
// taskId routes permission/elicitation prompts back to the UI. The dispatcher
|
||||
// passes it (plus mode) on the per-turn PromptCtx; permission-waiter keys on it.
|
||||
const turn: TurnState = {
|
||||
taskId: ctx.taskId,
|
||||
sessionId: handle.sessionId,
|
||||
modeId: ctx.modeId,
|
||||
onEvent: ctx.onEvent,
|
||||
snapshots,
|
||||
};
|
||||
this.activeTurn = turn;
|
||||
|
||||
// Per-turn abort: cancel the in-flight prompt on the SAME connection — never
|
||||
// kill the child (that's the pool's lifetime). On cancel we also synthesize
|
||||
// 'canceled' updates for any still-running tool calls so the UI doesn't leave
|
||||
// them spinning (mirrors AcpStreamContext.markAborted).
|
||||
let aborted = false;
|
||||
const onAbort = () => {
|
||||
if (aborted) return;
|
||||
aborted = true;
|
||||
connection.cancel({ sessionId: acpSessionId }).catch(() => {});
|
||||
if (ctx.taskId) cancelPendingPermission(ctx.taskId);
|
||||
for (const snap of synthesizeCanceledSnapshots(snapshots.values())) {
|
||||
snapshots.set(snap.toolCallId, snap);
|
||||
ctx.onEvent({ type: 'tool_update', toolCall: snap });
|
||||
}
|
||||
};
|
||||
|
||||
if (ctx.signal.aborted) {
|
||||
this.activeTurn = null;
|
||||
return { ok: false, error: 'aborted' };
|
||||
}
|
||||
ctx.signal.addEventListener('abort', onAbort, { once: true });
|
||||
|
||||
try {
|
||||
const result = await connection.prompt({
|
||||
sessionId: acpSessionId,
|
||||
prompt: [{ type: 'text', text: input }],
|
||||
});
|
||||
if (aborted) return { ok: false, error: 'aborted' };
|
||||
const stopReason = result.stopReason ?? 'end_turn';
|
||||
return isTurnOkForStopReason(stopReason)
|
||||
? { ok: true }
|
||||
: { ok: false, error: `stop_reason: ${stopReason}` };
|
||||
} catch (err) {
|
||||
if (aborted) return { ok: false, error: 'aborted' };
|
||||
return { ok: false, error: errMsg(err) };
|
||||
} finally {
|
||||
ctx.signal.removeEventListener('abort', onAbort);
|
||||
this.activeTurn = null;
|
||||
await this.sql`
|
||||
UPDATE agent_sessions SET status = 'idle', last_active_at = clock_timestamp()
|
||||
WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
|
||||
`.catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
// ─── teardown ────────────────────────────────────────────────────────────────
|
||||
|
||||
async closeSession(handle: AgentSessionHandle): Promise<void> {
|
||||
// Gracefully close the ACP session if the agent supports it; then kill the child.
|
||||
if (this.connection && this.acpSessionId) {
|
||||
await this.connection.closeSession({ sessionId: this.acpSessionId }).catch(() => {});
|
||||
}
|
||||
await this.killChild();
|
||||
await this.sql`
|
||||
UPDATE agent_sessions SET status = 'closed'
|
||||
WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
|
||||
`.catch(() => {});
|
||||
}
|
||||
|
||||
async dispose(): Promise<void> {
|
||||
this.up = false;
|
||||
this.activeTurn = null;
|
||||
if (this.connection && this.acpSessionId) {
|
||||
await this.connection.closeSession({ sessionId: this.acpSessionId }).catch(() => {});
|
||||
}
|
||||
await this.killChild();
|
||||
this.connection = null;
|
||||
this.acpSessionId = null;
|
||||
this.starting = null;
|
||||
}
|
||||
|
||||
private async killChild(): Promise<void> {
|
||||
const child = this.child;
|
||||
this.child = null;
|
||||
if (!child || child.killed) return;
|
||||
child.kill('SIGTERM');
|
||||
await new Promise<void>((resolve) => {
|
||||
const t = setTimeout(() => {
|
||||
if (!child.killed) child.kill('SIGKILL');
|
||||
resolve();
|
||||
}, 5_000);
|
||||
t.unref?.();
|
||||
child.once('close', () => {
|
||||
clearTimeout(t);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async markCrashed(): Promise<void> {
|
||||
await this.sql`
|
||||
UPDATE agent_sessions SET status = 'crashed'
|
||||
WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
|
||||
`.catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
function errMsg(e: unknown): string {
|
||||
return e instanceof Error ? e.message : String(e);
|
||||
}
|
||||
Reference in New Issue
Block a user