From 0d3d08f5f27f0401356f843446d17363e1ce66db Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Sun, 31 May 2026 23:57:03 +0000 Subject: [PATCH] =?UTF-8?q?feat(coder):=20v2.6=20Phase=202=20=E2=80=94=20w?= =?UTF-8?q?arm=20ACP=20backend=20for=20goose/qwen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WarmAcpBackend (AgentBackend) holds one persistent goose acp / qwen --acp child + ClientSideConnection + ACP session per (chat,agent); initialize+session/new once, reused across turns. Abort = session/cancel the prompt only (never kills the child); child exit -> agent_sessions.status='crashed' -> re-spawn next turn. Dispatcher routes goose/qwen chat-tab tasks to the pooled warm backend via pure shouldUseWarmBackend (needs session_id+chat_id); one-shot runExternalAgent kept as fallback for arena/MCP/new_task. handleSessionUpdate extracted to a shared pure acp-event-map.ts (one-shot path byte-identical). SDK: installed @agentclientprotocol/sdk@^0.22.1 has stable resumeSession/loadSession; resume moot in the warm hot path, deferred to Phase 3. 15 new tests (warm-acp-routing, acp-event-map); 180 coder tests pass; tsc + build clean. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/__tests__/acp-event-map.test.ts | 110 +++++ apps/coder/src/services/acp-dispatch.ts | 88 ++-- apps/coder/src/services/acp-event-map.ts | 68 +++ apps/coder/src/services/agent-backend.ts | 6 + .../__tests__/warm-acp-routing.test.ts | 59 +++ .../src/services/backends/warm-acp-routing.ts | 41 ++ apps/coder/src/services/backends/warm-acp.ts | 411 ++++++++++++++++++ apps/coder/src/services/dispatcher.ts | 250 ++++++++++- 8 files changed, 978 insertions(+), 55 deletions(-) create mode 100644 apps/coder/src/services/__tests__/acp-event-map.test.ts create mode 100644 apps/coder/src/services/acp-event-map.ts create mode 100644 apps/coder/src/services/backends/__tests__/warm-acp-routing.test.ts create mode 100644 apps/coder/src/services/backends/warm-acp-routing.ts create mode 100644 apps/coder/src/services/backends/warm-acp.ts diff --git a/apps/coder/src/services/__tests__/acp-event-map.test.ts b/apps/coder/src/services/__tests__/acp-event-map.test.ts new file mode 100644 index 0000000..f60ed26 --- /dev/null +++ b/apps/coder/src/services/__tests__/acp-event-map.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect } from 'vitest'; +import type { SessionNotification } from '@agentclientprotocol/sdk'; +import { mapSessionUpdate } from '../acp-event-map.js'; +import type { AcpToolSnapshot } from '../acp-tool-snapshot.js'; + +/** + * Pure event-mapping shared by the one-shot ACP dispatch (AcpStreamContext) and + * the warm ACP backend (Phase 2). Mirrors the original handleSessionUpdate switch + * verbatim but returns normalized AgentEvents instead of publishing broker frames. 
+ */ +describe('mapSessionUpdate (shared ACP event mapping)', () => { + function note(update: SessionNotification['update']): SessionNotification { + return { sessionId: 's1', update }; + } + + it('maps an agent_message_chunk text → a text event', () => { + const events = mapSessionUpdate( + note({ sessionUpdate: 'agent_message_chunk', content: { type: 'text', text: 'hello' } }), + ); + expect(events).toEqual([{ type: 'text', text: 'hello' }]); + }); + + it('maps an agent_thought_chunk text → a reasoning event', () => { + const events = mapSessionUpdate( + note({ sessionUpdate: 'agent_thought_chunk', content: { type: 'text', text: 'thinking' } }), + ); + expect(events).toEqual([{ type: 'reasoning', text: 'thinking' }]); + }); + + it('ignores non-text content on message/thought chunks', () => { + const img = mapSessionUpdate( + note({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'image', data: 'x', mimeType: 'image/png' }, + } as never), + ); + expect(img).toEqual([]); + }); + + it('maps a tool_call → a tool_call event with a merged snapshot', () => { + const events = mapSessionUpdate( + note({ + sessionUpdate: 'tool_call', + toolCallId: 't1', + title: 'read_file', + status: 'pending', + rawInput: { path: 'a.ts' }, + } as never), + ); + expect(events).toHaveLength(1); + expect(events[0]!.type).toBe('tool_call'); + const snap = (events[0] as { type: 'tool_call'; toolCall: AcpToolSnapshot }).toolCall; + expect(snap.toolCallId).toBe('t1'); + expect(snap.title).toBe('read_file'); + expect(snap.status).toBe('pending'); + expect(snap.rawInput).toEqual({ path: 'a.ts' }); + }); + + it('maps a tool_call_update → a tool_update event merged over the prior snapshot', () => { + const prior = new Map([ + ['t1', { toolCallId: 't1', title: 'read_file', status: 'pending', rawInput: { path: 'a.ts' } }], + ]); + const events = mapSessionUpdate( + note({ + sessionUpdate: 'tool_call_update', + toolCallId: 't1', + status: 'completed', + rawOutput: 'file body', + } as 
never), + prior, + ); + expect(events).toHaveLength(1); + expect(events[0]!.type).toBe('tool_update'); + const snap = (events[0] as { type: 'tool_update'; toolCall: AcpToolSnapshot }).toolCall; + expect(snap.toolCallId).toBe('t1'); + // merged: title carried from prior, status updated, output added, input retained + expect(snap.title).toBe('read_file'); + expect(snap.status).toBe('completed'); + expect(snap.rawOutput).toBe('file body'); + expect(snap.rawInput).toEqual({ path: 'a.ts' }); + }); + + it('maps available_commands_update → a commands event', () => { + const events = mapSessionUpdate( + note({ + sessionUpdate: 'available_commands_update', + availableCommands: [ + { name: 'plan', description: 'make a plan' }, + { name: 'review', description: null }, + ], + } as never), + ); + expect(events).toEqual([ + { + type: 'commands', + commands: [ + { name: 'plan', description: 'make a plan' }, + { name: 'review', description: undefined }, + ], + }, + ]); + }); + + it('returns [] for unhandled update kinds (plan, mode change)', () => { + expect(mapSessionUpdate(note({ sessionUpdate: 'plan', entries: [] } as never))).toEqual([]); + expect( + mapSessionUpdate(note({ sessionUpdate: 'current_mode_update', currentModeId: 'code' } as never)), + ).toEqual([]); + }); +}); diff --git a/apps/coder/src/services/acp-dispatch.ts b/apps/coder/src/services/acp-dispatch.ts index 0e5505b..404c5e2 100644 --- a/apps/coder/src/services/acp-dispatch.ts +++ b/apps/coder/src/services/acp-dispatch.ts @@ -32,9 +32,9 @@ import { createAcpNdJsonStream } from './acp-stream.js'; import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from './permission-waiter.js'; import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js'; import { readWorktreeTextFile, writeWorktreeTextFile } from './acp-client-fs.js'; +import { mapSessionUpdate } from './acp-event-map.js'; import { type AcpToolSnapshot, - mergeToolSnapshot, snapshotToWireToolCall, 
synthesizeCanceledSnapshots, } from './acp-tool-snapshot.js'; @@ -159,75 +159,57 @@ class AcpStreamContext { } as WsFrame); } - handleToolUpdate(toolCallId: string, update: Parameters[1]): void { - const previous = this.toolSnapshots.get(toolCallId); - const snapshot = mergeToolSnapshot(toolCallId, update, previous); - this.toolSnapshots.set(toolCallId, snapshot); - this.publishToolSnapshot(snapshot); - } - async handleSessionUpdate(params: SessionNotification): Promise { - const update = params.update; - switch (update.sessionUpdate) { - case 'agent_message_chunk': { - const content = update.content; - if (content.type === 'text' && 'text' in content) { - const text = (content as { text: string }).text; - this.textChunks.push(text); + // v2.6 Phase 2: the case-by-case mapping now lives in the shared, pure + // `mapSessionUpdate` (reused by the warm ACP backend). This method keeps the + // identical broker-publishing side effects — it just translates the normalized + // AgentEvents back into the same frames it always emitted. `this.toolSnapshots` + // is the merge accumulator, so a later tool_call_update merges over its + // tool_call (the prior `handleToolUpdate` behavior, byte-for-byte). 
+ for (const event of mapSessionUpdate(params, this.toolSnapshots)) { + switch (event.type) { + case 'text': + this.textChunks.push(event.text); if (this.canStream()) { this.opts.broker!.publishFrame(this.opts.sessionId!, { type: 'delta', message_id: this.opts.messageId!, chat_id: this.opts.chatId!, - content: text, + content: event.text, } as WsFrame); } - } - break; - } - case 'agent_thought_chunk': { - const content = update.content; - if (content.type === 'text' && 'text' in content) { - const text = (content as { text: string }).text; - this.reasoningChunks.push(text); + break; + case 'reasoning': + this.reasoningChunks.push(event.text); if (this.canStream()) { this.opts.broker!.publishFrame(this.opts.sessionId!, { type: 'reasoning_delta', message_id: this.opts.messageId!, chat_id: this.opts.chatId!, - content: text, + content: event.text, } as WsFrame); } - } - break; - } - case 'tool_call': - this.handleToolUpdate(update.toolCallId, update); - break; - case 'tool_call_update': - this.handleToolUpdate(update.toolCallId, update); - break; - case 'available_commands_update': { - const commands = update.availableCommands.map((cmd) => ({ - name: cmd.name, - description: cmd.description ?? undefined, - })); - if (this.opts.taskId && commands.length > 0) { - mergeTaskCommands(this.opts.taskId, commands); - if (this.canStream() && this.opts.sessionId) { - const all = getTaskCommands(this.opts.taskId) ?? commands; - this.opts.broker!.publishFrame(this.opts.sessionId, { - type: 'agent_commands', - task_id: this.opts.taskId, - session_id: this.opts.sessionId, - commands: all, - } as WsFrame); + break; + case 'tool_call': + case 'tool_update': + // mapSessionUpdate already stored the merged snapshot in this.toolSnapshots. 
+ this.publishToolSnapshot(event.toolCall); + break; + case 'commands': + if (this.opts.taskId && event.commands.length > 0) { + mergeTaskCommands(this.opts.taskId, event.commands); + if (this.canStream() && this.opts.sessionId) { + const all = getTaskCommands(this.opts.taskId) ?? event.commands; + this.opts.broker!.publishFrame(this.opts.sessionId, { + type: 'agent_commands', + task_id: this.opts.taskId, + session_id: this.opts.sessionId, + commands: all, + } as WsFrame); + } } - } - break; + break; } - default: - break; } } diff --git a/apps/coder/src/services/acp-event-map.ts b/apps/coder/src/services/acp-event-map.ts new file mode 100644 index 0000000..aea9f6b --- /dev/null +++ b/apps/coder/src/services/acp-event-map.ts @@ -0,0 +1,68 @@ +/** + * Shared ACP session-update → normalized AgentEvent mapping. + * + * Extracted verbatim (v2.6 Phase 2) from `AcpStreamContext.handleSessionUpdate` + * in `acp-dispatch.ts` so the warm ACP backend (`backends/warm-acp.ts`) and the + * one-shot dispatch share ONE mapping. The one-shot path translates the returned + * events into broker frames itself (preserving its prior behavior byte-for-byte); + * the warm backend forwards them to the dispatcher's `ctx.onEvent` exactly like + * the opencode-server backend does. No I/O, no broker — pure, so it's unit-testable. + * + * Spec: openspec/changes/v2-6-persistent-agent-sessions/design.md §2b. + */ +import type { SessionNotification } from '@agentclientprotocol/sdk'; +import type { AgentEvent } from './agent-backend.js'; +import { type AcpToolSnapshot, mergeToolSnapshot } from './acp-tool-snapshot.js'; + +/** + * Map one ACP `session/update` notification to zero-or-more normalized AgentEvents. + * + * `priorSnapshots` is the caller-owned tool-call snapshot accumulator (toolCallId → + * snapshot). 
For `tool_call` / `tool_call_update` the merged snapshot is written + * back into it (mutated in place, mirroring `AcpStreamContext.handleToolUpdate`) + * so a later `tool_call_update` merges over the earlier `tool_call`. Pass an empty + * Map for a stateless single call. + * + * Returns an array (never throws) so the caller can splat it onto `onEvent`. + */ +export function mapSessionUpdate( + params: SessionNotification, + priorSnapshots: Map = new Map(), +): AgentEvent[] { + const update = params.update; + switch (update.sessionUpdate) { + case 'agent_message_chunk': { + const content = update.content; + if (content.type === 'text' && 'text' in content) { + return [{ type: 'text', text: (content as { text: string }).text }]; + } + return []; + } + case 'agent_thought_chunk': { + const content = update.content; + if (content.type === 'text' && 'text' in content) { + return [{ type: 'reasoning', text: (content as { text: string }).text }]; + } + return []; + } + case 'tool_call': { + const snapshot = mergeToolSnapshot(update.toolCallId, update, priorSnapshots.get(update.toolCallId)); + priorSnapshots.set(update.toolCallId, snapshot); + return [{ type: 'tool_call', toolCall: snapshot }]; + } + case 'tool_call_update': { + const snapshot = mergeToolSnapshot(update.toolCallId, update, priorSnapshots.get(update.toolCallId)); + priorSnapshots.set(update.toolCallId, snapshot); + return [{ type: 'tool_update', toolCall: snapshot }]; + } + case 'available_commands_update': { + const commands = update.availableCommands.map((cmd) => ({ + name: cmd.name, + description: cmd.description ?? 
undefined, + })); + return [{ type: 'commands', commands }]; + } + default: + return []; + } +} diff --git a/apps/coder/src/services/agent-backend.ts b/apps/coder/src/services/agent-backend.ts index b0d1d59..ec52aaf 100644 --- a/apps/coder/src/services/agent-backend.ts +++ b/apps/coder/src/services/agent-backend.ts @@ -70,6 +70,12 @@ export interface PromptCtx { model: string; signal: AbortSignal; onEvent: (e: AgentEvent) => void; + /** Phase 2: per-turn task id, so a warm ACP backend can route permission / + * elicitation prompts back to the UI via the permission-waiter. Optional — + * the opencode-server backend (autonomous) ignores it. */ + taskId?: string; + /** Phase 2: per-turn mode id (gates autonomous mode in the permission-waiter). */ + modeId?: string; } /** Result of a completed turn (§2). Diff/persist happen outside the backend. */ diff --git a/apps/coder/src/services/backends/__tests__/warm-acp-routing.test.ts b/apps/coder/src/services/backends/__tests__/warm-acp-routing.test.ts new file mode 100644 index 0000000..40ee963 --- /dev/null +++ b/apps/coder/src/services/backends/__tests__/warm-acp-routing.test.ts @@ -0,0 +1,59 @@ +import { describe, it, expect } from 'vitest'; +import { shouldUseWarmBackend, isTurnOkForStopReason } from '../warm-acp-routing.js'; + +/** + * Phase 2 routing predicate: which goose/qwen tasks go to the warm pool backend + * vs the existing one-shot ACP path. + * + * The warm backend is keyed (chat_id, agent) — the persistent context unit (same + * as opencode-server). A task only routes warm when it carries BOTH a session_id + * and a chat_id, i.e. it originates from a real chat tab (the coder message route + * stamps both). Session-less creators (arena, MCP-created, generic /api/tasks, + * new_task) lack chat_id/session_id and keep the one-shot worktree-per-task path, + * which never spawns a warm process. 
+ */ +describe('shouldUseWarmBackend (Phase 2 routing)', () => { + it('routes a chat-tab task (session_id + chat_id) to the warm backend', () => { + expect(shouldUseWarmBackend({ agent: 'qwen', session_id: 's1', chat_id: 'c1' })).toBe(true); + expect(shouldUseWarmBackend({ agent: 'goose', session_id: 's1', chat_id: 'c1' })).toBe(true); + }); + + it('keeps a session-less arena/MCP task on the one-shot path', () => { + expect(shouldUseWarmBackend({ agent: 'qwen', session_id: null, chat_id: null })).toBe(false); + }); + + it('keeps a task with a session but no chat on the one-shot path', () => { + // chat_id is the warm-key half; without it ensureSession would get a degenerate + // (null, agent) key, so fall back to one-shot rather than synthesize a chat. + expect(shouldUseWarmBackend({ agent: 'goose', session_id: 's1', chat_id: null })).toBe(false); + }); + + it('keeps a task with a chat but no session on the one-shot path', () => { + expect(shouldUseWarmBackend({ agent: 'qwen', session_id: null, chat_id: 'c1' })).toBe(false); + }); + + it('only applies to warm-capable agents (goose, qwen); others never warm here', () => { + // opencode has its own dedicated warm path; native/claude/etc. are not ACP-warm. 
+ expect(shouldUseWarmBackend({ agent: 'opencode', session_id: 's1', chat_id: 'c1' })).toBe(false); + expect(shouldUseWarmBackend({ agent: 'claude', session_id: 's1', chat_id: 'c1' })).toBe(false); + expect(shouldUseWarmBackend({ agent: null, session_id: 's1', chat_id: 'c1' })).toBe(false); + }); +}); + +describe('isTurnOkForStopReason (ACP stop-reason → ok/fail)', () => { + it('treats normal completions as ok', () => { + expect(isTurnOkForStopReason('end_turn')).toBe(true); + expect(isTurnOkForStopReason('max_tokens')).toBe(true); + expect(isTurnOkForStopReason('max_turn_requests')).toBe(true); + }); + + it('treats refusal and cancelled as failures', () => { + expect(isTurnOkForStopReason('refusal')).toBe(false); + expect(isTurnOkForStopReason('cancelled')).toBe(false); + }); + + it('defaults an absent stop reason to a successful end_turn', () => { + expect(isTurnOkForStopReason(undefined)).toBe(true); + expect(isTurnOkForStopReason(null)).toBe(true); + }); +}); diff --git a/apps/coder/src/services/backends/warm-acp-routing.ts b/apps/coder/src/services/backends/warm-acp-routing.ts new file mode 100644 index 0000000..3736468 --- /dev/null +++ b/apps/coder/src/services/backends/warm-acp-routing.ts @@ -0,0 +1,41 @@ +/** + * v2.6 Phase 2 — warm-vs-one-shot routing predicate for goose/qwen. + * + * The warm ACP backend keys its persistent process + ACP session on (chat_id, + * agent) — exactly like the opencode-server backend. A task therefore only routes + * to the warm pool when it carries BOTH a `session_id` and a `chat_id`, i.e. it + * came from a real chat tab (the coder message route + skills route stamp both). + * + * Session-less creators — arena contestants, MCP-created tasks, generic + * `POST /api/tasks`, `new_task` — leave one or both null. Those keep the existing + * one-shot worktree-per-task ACP path (`runExternalAgent`), which spawns a fresh + * `goose acp` / `qwen --acp` per turn and never holds a warm process. 
Routing them + * warm would either synthesize a degenerate (null, agent) key or create a chat per + * arena contestant — neither is wanted, so they stay one-shot. + * + * Pure, so it's unit-testable; the dispatcher consumes it. + */ +const WARM_CAPABLE_AGENTS = new Set(['goose', 'qwen']); + +export function shouldUseWarmBackend(task: { + agent: string | null; + session_id: string | null; + chat_id: string | null; +}): boolean { + if (!task.agent || !WARM_CAPABLE_AGENTS.has(task.agent)) return false; + return task.session_id != null && task.chat_id != null; +} + +/** + * Map an ACP prompt `stopReason` to the backend's ok/fail contract (TurnResult.ok). + * + * ACP's `StopReason` union includes normal completions (`end_turn`, `max_tokens`, + * `max_turn_requests`) and abnormal ones (`refusal`, `cancelled`). Only the latter + * two read as a failed turn; everything else (including an undefined/absent reason, + * which we default to `end_turn`) is a successful completion. Pure so it's testable + * independently of the warm process. + */ +export function isTurnOkForStopReason(stopReason: string | null | undefined): boolean { + const reason = stopReason ?? 'end_turn'; + return reason !== 'refusal' && reason !== 'cancelled'; +} diff --git a/apps/coder/src/services/backends/warm-acp.ts b/apps/coder/src/services/backends/warm-acp.ts new file mode 100644 index 0000000..02838bd --- /dev/null +++ b/apps/coder/src/services/backends/warm-acp.ts @@ -0,0 +1,411 @@ +/** + * v2.6 Phase 2 — WarmAcpBackend (goose, qwen). + * + * One persistent stdio process + ONE `ClientSideConnection` per (chat, agent), + * `initialize` + `session/new` done ONCE, reused across every turn — the warm + * analogue of the previous one-shot `acp-dispatch.ts` (which spawned/torn-down a + * fresh `goose acp` / `qwen --acp` per turn). Mirrors Paseo's `SpawnedACPProcess`. + * + * Implements the Phase 0 `AgentBackend` interface (same contract as + * `OpenCodeServerBackend`). 
Emits transport-agnostic `AgentEvent`s via the SHARED + * `mapSessionUpdate` (reused verbatim from the one-shot stack); the dispatcher maps + * those to WS frames + `persistExternalAgentTurn`, unchanged. + * + * Lifecycle decisions (design.md §2b / §10): + * - **Child lifetime is the pool's, not a request's.** Spawned once; never tied + * to a per-turn abort signal. Only the in-flight `prompt` gets `ctx.signal` — + * abort = ACP `session/cancel`, NOT killing the child. + * - **Per-turn abort** cancels the prompt on the warm connection so the SAME + * process serves the next turn. + * - **Crash** (child exit) marks `agent_sessions.status='crashed'` + logs; the + * next `ensureSession` re-spawns + re-`session/new` (Phase 3 hardens auto-restart). + * - **Resume across a process restart is NOT attempted in Phase 2.** goose ACP + * advertises no `loadSession`/`session.resume`; qwen does, but cross-restart + * resume is Phase 3. Within ONE live process the ACP session persists across + * turns (the whole point of "warm"); a restart re-`session/new` (memory loss + * across restart, accepted per §10). The agent's resume capabilities ARE + * probed and logged for forward-compat. + * + * Each WarmAcpBackend instance owns exactly one (chat, agent) — the dispatcher + * pools them under `agentPool.register(chatId, agent, backend)`. + * + * SDK note (@agentclientprotocol/sdk@^0.22.1, cross-checked against the design's + * `^0.14` worry): the resume method is the STABLE `resumeSession` (`session/resume`, + * gated by `agentCapabilities.sessionCapabilities.resume`), NOT the `^0.14` + * `unstable_resumeSession`. `loadSession` is gated by `agentCapabilities.loadSession`. 
+ */ +import { spawn, type ChildProcess } from 'node:child_process'; +import type { FastifyBaseLogger } from 'fastify'; +import { + ClientSideConnection, + type Client, + type SessionNotification, + type RequestPermissionRequest, + type RequestPermissionResponse, + type ReadTextFileRequest, + type ReadTextFileResponse, + type WriteTextFileRequest, + type WriteTextFileResponse, + type CreateTerminalRequest, + type CreateTerminalResponse, + type CreateElicitationRequest, + type CreateElicitationResponse, +} from '@agentclientprotocol/sdk'; +import type { Sql } from '../../db.js'; +import { resolveLaunchSpec } from '../acp-spawn.js'; +import { isTurnOkForStopReason } from './warm-acp-routing.js'; +import { getResolvedRegistry, type ResolvedProviderDef } from '../provider-config-registry.js'; +import { createAcpNdJsonStream } from '../acp-stream.js'; +import { mapSessionUpdate } from '../acp-event-map.js'; +import { readWorktreeTextFile, writeWorktreeTextFile } from '../acp-client-fs.js'; +import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from '../permission-waiter.js'; +import { type AcpToolSnapshot, synthesizeCanceledSnapshots } from '../acp-tool-snapshot.js'; +import type { + AgentBackend, + AgentEvent, + AgentSessionHandle, + EnsureSessionOpts, + PromptCtx, + TurnResult, +} from '../agent-backend.js'; + +/** State for one in-flight turn (only one at a time per backend — turns serialize). */ +interface TurnState { + /** Per-turn task id, for routing permission prompts back to the UI. */ + taskId: string | undefined; + /** BooCode session id for permission-waiter's broker frames. */ + sessionId: string; + /** Per-turn mode id (autonomous-mode gate in permission-waiter). */ + modeId: string | undefined; + onEvent: (e: AgentEvent) => void; + /** Tool-call snapshot accumulator for this turn — merge across tool_call_update. 
*/ + snapshots: Map; +} + +export interface WarmAcpBackendDeps { + sql: Sql; + log: FastifyBaseLogger; + /** The (chat, agent) this backend serves — its pool identity + DB key. */ + chatId: string; + agent: string; + /** Resolved binary for the agent (from available_agents.install_path), or null. */ + installPath: string | null; + /** Optional override of the resolved registry def (defaults to a live lookup). */ + resolved?: ResolvedProviderDef; +} + +export class WarmAcpBackend implements AgentBackend { + readonly backend = 'acp_warm' as const; + + private readonly sql: Sql; + private readonly log: FastifyBaseLogger; + private readonly chatId: string; + private readonly agent: string; + private readonly installPath: string | null; + private readonly resolvedOverride: ResolvedProviderDef | undefined; + + private child: ChildProcess | null = null; + private connection: ClientSideConnection | null = null; + /** The single ACP session id for this warm process; null until session/new. */ + private acpSessionId: string | null = null; + private up = false; + /** Idempotent spawn guard — one warm process per backend, started lazily. */ + private starting: Promise | null = null; + /** Resume capabilities probed at initialize, logged for forward-compat (Phase 3). */ + private supportsLoadSession = false; + private supportsResumeSession = false; + + /** The current in-flight turn; the Client closures read it. Null between turns. */ + private activeTurn: TurnState | null = null; + + constructor(deps: WarmAcpBackendDeps) { + this.sql = deps.sql; + this.log = deps.log; + this.chatId = deps.chatId; + this.agent = deps.agent; + this.installPath = deps.installPath; + this.resolvedOverride = deps.resolved; + } + + /** §2: liveness for the health endpoint + dispatcher fallback decision. */ + health(): 'up' | 'down' { + return this.up ? 
'up' : 'down'; + } + + // ─── warm-process lifecycle (2.1 spawn + initialize + session/new ONCE) ─────── + + /** Lazy: spawn the warm process on first use. Idempotent — one process per backend. */ + private ensureProcess(worktreePath: string): Promise { + if (this.up && this.connection && this.acpSessionId) return Promise.resolve(); + if (!this.starting) { + this.starting = this.startProcess(worktreePath).catch((err) => { + // Reset so a later ensureSession can retry the spawn after a failed start. + this.starting = null; + throw err; + }); + } + return this.starting; + } + + private async startProcess(worktreePath: string): Promise { + const resolved = this.resolvedOverride ?? getResolvedRegistry().get(this.agent); + const spec = resolved ? resolveLaunchSpec(resolved, this.installPath) : null; + if (!spec) throw new Error(`warm-acp: agent '${this.agent}' does not support ACP (no launch spec)`); + + this.log.info({ agent: this.agent, chatId: this.chatId, binary: spec.binary, worktreePath }, 'warm-acp: spawning warm process'); + // Child lifetime is the pool's. NOT tied to any per-turn abort signal — only + // the in-flight prompt is cancellable (via ACP session/cancel in prompt()). + const child = spawn(spec.binary, spec.args, { + cwd: worktreePath, + stdio: ['pipe', 'pipe', 'pipe'], + env: { ...process.env, ...spec.env }, + }); + this.child = child; + + // 2.3: supervise the child; react to its exit, never let a request scope kill it. + child.on('exit', (code, signal) => { + this.up = false; + this.connection = null; + this.acpSessionId = null; + this.starting = null; + this.log.warn({ agent: this.agent, chatId: this.chatId, code, signal }, 'warm-acp: warm process exited — marking crashed (rebuild on next turn)'); + void this.markCrashed(); + }); + // A spawn error (e.g. ENOENT) surfaces here, not as an exit. 
+ child.on('error', (err) => { + this.up = false; + this.log.error({ agent: this.agent, chatId: this.chatId, err: errMsg(err) }, 'warm-acp: warm process error'); + }); + + const stream = createAcpNdJsonStream(child); + const connection = new ClientSideConnection(() => this.buildClient(worktreePath), stream); + + const init = await connection.initialize({ + protocolVersion: 1, + clientInfo: { name: 'boocoder', version: '2.6.0' }, + clientCapabilities: {}, + }); + const caps = init.agentCapabilities; + this.supportsLoadSession = caps?.loadSession === true; + this.supportsResumeSession = caps?.sessionCapabilities?.resume != null; + + const session = await connection.newSession({ cwd: worktreePath, mcpServers: [] }); + this.connection = connection; + this.acpSessionId = session.sessionId; + this.up = true; + this.log.info( + { + agent: this.agent, + chatId: this.chatId, + acpSessionId: session.sessionId, + loadSession: this.supportsLoadSession, + resumeSession: this.supportsResumeSession, + }, + 'warm-acp: warm session ready', + ); + } + + /** Build the ACP Client callbacks ONCE per connection. They read `this.activeTurn` + * so each turn's events/permissions route to the right place — exactly the + * opencode-server `activeTurn` pattern. Worktree-scoped FS like AcpStreamContext. */ + private buildClient(worktreePath: string): Client { + return { + sessionUpdate: async (params: SessionNotification): Promise => { + const turn = this.activeTurn; + if (!turn) return; // between turns — drop (no orphan settles a future turn) + for (const event of mapSessionUpdate(params, turn.snapshots)) { + turn.onEvent(event); + } + }, + requestPermission: async (params: RequestPermissionRequest): Promise => { + const turn = this.activeTurn; + if (turn?.taskId) { + // Route to the UI via the per-turn task id (same as the one-shot path). 
+ return waitForPermissionResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params); + } + const firstOption = params.options[0]; + if (firstOption) return { outcome: { outcome: 'selected', optionId: firstOption.optionId } }; + return { outcome: { outcome: 'cancelled' } }; + }, + readTextFile: async (params: ReadTextFileRequest): Promise => { + const content = await readWorktreeTextFile(worktreePath, params.path, params.line, params.limit); + return { content }; + }, + writeTextFile: async (params: WriteTextFileRequest): Promise => { + await writeWorktreeTextFile(worktreePath, params.path, params.content); + return {}; + }, + createTerminal: async (_params: CreateTerminalRequest): Promise => { + return { terminalId: 'noop' }; + }, + unstable_createElicitation: async (params: CreateElicitationRequest): Promise => { + const turn = this.activeTurn; + if (turn?.taskId) { + return waitForElicitationResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params); + } + return { action: 'decline' }; + }, + }; + } + + // ─── ensureSession: create-or-reuse the warm session (2.1) ─────────────────── + + async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise { + await this.ensureProcess(opts.worktreePath); + if (!this.acpSessionId) throw new Error('warm-acp: session not ready after ensureProcess'); + + // P1.5-b: agent_sessions keys on (chat_id, agent). The ACP session id is the + // resume handle WITHIN the live process; across a process restart it's stale, + // so ensureProcess re-`session/new` and we upsert the fresh id here. 
+ await this.sql` + INSERT INTO agent_sessions + (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at) + VALUES + (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'acp_warm', ${this.acpSessionId}, NULL, 'active', clock_timestamp()) + ON CONFLICT (chat_id, agent) DO UPDATE SET + session_id = EXCLUDED.session_id, + worktree_id = EXCLUDED.worktree_id, + backend = 'acp_warm', + agent_session_id = EXCLUDED.agent_session_id, + server_port = NULL, + status = 'active', + last_active_at = clock_timestamp() + `.catch((err) => { + this.log.warn({ err: errMsg(err), chatId: opts.chatId, agent: opts.agent }, 'warm-acp: agent_sessions upsert failed (non-fatal)'); + }); + + return { + sessionId, + agent: opts.agent, + backend: 'acp_warm', + chatId: opts.chatId, + worktreeId: opts.worktreeId, + agentSessionId: this.acpSessionId, + serverPort: null, + }; + } + + // ─── prompt: one turn on the warm connection (2.2) ─────────────────────────── + + async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise { + // The warm process may have crashed between ensureSession and here, or this + // backend was rebuilt — re-establish before prompting. + await this.ensureProcess(ctx.worktreePath); + const connection = this.connection; + const acpSessionId = this.acpSessionId; + if (!connection || !acpSessionId) { + return { ok: false, error: 'warm-acp: no live ACP connection' }; + } + + const snapshots = new Map(); + // taskId routes permission/elicitation prompts back to the UI. The dispatcher + // passes it (plus mode) on the per-turn PromptCtx; permission-waiter keys on it. + const turn: TurnState = { + taskId: ctx.taskId, + sessionId: handle.sessionId, + modeId: ctx.modeId, + onEvent: ctx.onEvent, + snapshots, + }; + this.activeTurn = turn; + + // Per-turn abort: cancel the in-flight prompt on the SAME connection — never + // kill the child (that's the pool's lifetime). 
On cancel we also synthesize + // 'canceled' updates for any still-running tool calls so the UI doesn't leave + // them spinning (mirrors AcpStreamContext.markAborted). + let aborted = false; + const onAbort = () => { + if (aborted) return; + aborted = true; + connection.cancel({ sessionId: acpSessionId }).catch(() => {}); + if (ctx.taskId) cancelPendingPermission(ctx.taskId); + for (const snap of synthesizeCanceledSnapshots(snapshots.values())) { + snapshots.set(snap.toolCallId, snap); + ctx.onEvent({ type: 'tool_update', toolCall: snap }); + } + }; + + if (ctx.signal.aborted) { + this.activeTurn = null; + return { ok: false, error: 'aborted' }; + } + ctx.signal.addEventListener('abort', onAbort, { once: true }); + + try { + const result = await connection.prompt({ + sessionId: acpSessionId, + prompt: [{ type: 'text', text: input }], + }); + if (aborted) return { ok: false, error: 'aborted' }; + const stopReason = result.stopReason ?? 'end_turn'; + return isTurnOkForStopReason(stopReason) + ? { ok: true } + : { ok: false, error: `stop_reason: ${stopReason}` }; + } catch (err) { + if (aborted) return { ok: false, error: 'aborted' }; + return { ok: false, error: errMsg(err) }; + } finally { + ctx.signal.removeEventListener('abort', onAbort); + this.activeTurn = null; + await this.sql` + UPDATE agent_sessions SET status = 'idle', last_active_at = clock_timestamp() + WHERE chat_id = ${this.chatId} AND agent = ${this.agent} + `.catch(() => {}); + } + } + + // ─── teardown ──────────────────────────────────────────────────────────────── + + async closeSession(handle: AgentSessionHandle): Promise { + // Gracefully close the ACP session if the agent supports it; then kill the child. 
+    if (this.connection && this.acpSessionId) {
+      await this.connection.closeSession({ sessionId: this.acpSessionId }).catch(() => {});
+    }
+    await this.killChild();
+    await this.sql`
+      UPDATE agent_sessions SET status = 'closed'
+      WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
+    `.catch(() => {});
+  }
+
+  /**
+   * Tear the backend down: flag it as not up, drop any per-turn state, make a
+   * best-effort `closeSession` call on the live connection (errors ignored),
+   * stop the child process, then clear the connection / session id / pending
+   * start so nothing stale is reused.
+   */
+  async dispose(): Promise<void> {
+    this.up = false;
+    this.activeTurn = null;
+    if (this.connection && this.acpSessionId) {
+      await this.connection.closeSession({ sessionId: this.acpSessionId }).catch(() => {});
+    }
+    await this.killChild();
+    this.connection = null;
+    this.acpSessionId = null;
+    this.starting = null;
+  }
+
+  /**
+   * Stop the child process: SIGTERM first, escalating to SIGKILL if it has not
+   * exited within 5 seconds. Resolves when the child emits 'close' or when the
+   * escalation timer fires, whichever comes first. Detaches `this.child`
+   * up front so a concurrent caller sees no child.
+   */
+  private async killChild(): Promise<void> {
+    const child = this.child;
+    this.child = null;
+    if (!child) return;
+
+    // `child.killed` only records that a signal was successfully SENT via
+    // kill(); it flips to true right after SIGTERM and says nothing about
+    // whether the process has terminated. Liveness must be read from
+    // exitCode / signalCode, which stay null until the process actually exits.
+    const hasExited = (): boolean => child.exitCode !== null || child.signalCode !== null;
+
+    // Already gone: nothing to signal, and 'close' may already have fired, so
+    // waiting for it would just burn the full timeout.
+    if (hasExited()) return;
+
+    await new Promise<void>((resolve) => {
+      const t = setTimeout(() => {
+        // Still alive after the grace period — SIGTERM was ignored or is stuck.
+        if (!hasExited()) child.kill('SIGKILL');
+        resolve();
+      }, 5_000);
+      // Don't let the escalation timer keep the event loop alive on its own.
+      t.unref?.();
+      child.once('close', () => {
+        clearTimeout(t);
+        resolve();
+      });
+      // Listener is registered before signalling so 'close' cannot be missed.
+      child.kill('SIGTERM');
+    });
+  }
+
+  /**
+   * Best-effort: set status = 'crashed' on this backend's (chat_id, agent) row
+   * in agent_sessions. Database errors are swallowed.
+   */
+  private async markCrashed(): Promise<void> {
+    await this.sql`
+      UPDATE agent_sessions SET status = 'crashed'
+      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
+    `.catch(() => {});
+  }
+}
+
+/** Reduce an unknown thrown value to a printable message string. */
+function errMsg(e: unknown): string {
+  return e instanceof Error ?
e.message : String(e); +} diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts index 465969d..fc62d6d 100644 --- a/apps/coder/src/services/dispatcher.ts +++ b/apps/coder/src/services/dispatcher.ts @@ -14,6 +14,8 @@ import { persistExternalAgentTurn } from './agent-turn-persist.js'; import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js'; import { agentPool } from './agent-pool.js'; import { OpenCodeServerBackend } from './backends/opencode-server.js'; +import { WarmAcpBackend } from './backends/warm-acp.js'; +import { shouldUseWarmBackend } from './backends/warm-acp-routing.js'; import type { AgentBackend, AgentEvent } from './agent-backend.js'; interface InferenceRunner { @@ -121,10 +123,15 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise { + const taskId = task.id; + const agent = task.agent!; + // shouldUseWarmBackend guarantees both non-null before we get here. + const sessionId = task.session_id!; + const chatId = task.chat_id!; + log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — warm ACP)'); + + const [project] = await sql<{ path: string | null }[]>` + SELECT path FROM projects WHERE id = ${task.project_id} + `; + const projectPath = project?.path; + if (!projectPath) { + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree' + WHERE id = ${taskId} + `; + return; + } + + const ac = new AbortController(); + + try { + await sql` + UPDATE tasks + SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp' + WHERE id = ${taskId} + `; + + // Persistent, session-keyed worktree (shared across turns + agents; NOT torn + // down per turn — Phase 3 reaps it). Same as the opencode-server path so a + // chat that switches opencode↔goose↔qwen shares one worktree. 
+ const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, { + signal: ac.signal, + }); + log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (warm ACP)'); + + const [assistantMsg] = await sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + const assistantId = assistantMsg!.id; + + broker.publishFrame(sessionId, { + type: 'message_started', + message_id: assistantId, + chat_id: chatId, + role: 'assistant', + } as WsFrame); + + const manifestCommands = getManifestCommands(agent); + if (manifestCommands.length > 0) { + setTaskCommands(taskId, manifestCommands); + broker.publishFrame(sessionId, { + type: 'agent_commands', + task_id: taskId, + session_id: sessionId, + commands: manifestCommands, + } as WsFrame); + } + + // Accumulate the turn's stream for persistence + the final message content. + const textChunks: string[] = []; + const reasoningChunks: string[] = []; + const toolSnaps = new Map(); + + // Map transport-agnostic AgentEvents → the SAME WS frames the one-shot ACP + // path emits (identical to runOpenCodeServerTask's onEvent). No dcp stripping: + // that's an opencode-plugin artifact; goose/qwen don't emit dcp tags. 
+ const onEvent = (e: AgentEvent): void => { + switch (e.type) { + case 'text': + textChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'reasoning': + reasoningChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'reasoning_delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'tool_call': + case 'tool_update': + toolSnaps.set(e.toolCall.toolCallId, e.toolCall); + broker.publishFrame(sessionId, { + type: 'tool_call', + message_id: assistantId, + chat_id: chatId, + tool_call: snapshotToWireToolCall(e.toolCall), + } as WsFrame); + break; + case 'commands': + if (e.commands.length > 0) { + setTaskCommands(taskId, e.commands); + broker.publishFrame(sessionId, { + type: 'agent_commands', + task_id: taskId, + session_id: sessionId, + commands: e.commands, + } as WsFrame); + } + break; + } + }; + + const model = task.model ?? undefined; + const backend = getWarmAcpBackend(chatId, agent, installPath); + const handle = await backend.ensureSession(sessionId, { + agent, + model: model ?? '', + chatId, + worktreePath, + worktreeId, + projectId: task.project_id, + }); + const result = await backend.prompt(handle, task.input, { + worktreePath, + model: model ?? '', + signal: ac.signal, + onEvent, + taskId, + modeId: task.mode_id ?? undefined, + }); + + const assistantContent = textChunks.join('').slice(0, 50_000); + const reasoningText = reasoningChunks.join('').slice(0, 200_000); + const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 
'warm ACP turn failed').slice(0, 500); + + await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText); + + await sql` + UPDATE messages + SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp() + WHERE id = ${assistantId} + `; + broker.publishFrame(sessionId, { + type: 'message_complete', + message_id: assistantId, + chat_id: chatId, + } as WsFrame); + + if (stopping) { + await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`; + return; // worktree persists (no cleanup); backend stays warm + } + + // Diff the persistent worktree against its captured baseline and SUPERSEDE + // the session's prior pending row (latest-wins) — identical to opencode. + const diff = await diffWorktree(worktreePath, projectPath, { + signal: ac.signal, + baseRef: baseCommit ?? 'HEAD', + }); + if (diff) { + await sql` + DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending' + `; + await sql` + INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent) + VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent}) + `; + log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (warm ACP)'); + } else { + log.info({ taskId }, 'dispatcher: no changes detected in session worktree (warm ACP)'); + } + + // NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm. + + const [extCostRow] = await sql<{ total: number | null }[]>` + SELECT SUM(tokens_used)::int AS total + FROM messages + WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL + `; + const extCostTokens = extCostRow?.total ?? null; + + const finalState = result.ok ? 
'completed' : 'failed'; + await sql` + UPDATE tasks + SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} + WHERE id = ${taskId} + `; + log.info({ taskId, agent, finalState }, 'dispatcher: task finished (warm ACP)'); + clearTaskCommands(taskId); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.error({ taskId, agent, err: errMsg }, 'dispatcher: warm ACP error'); + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} + WHERE id = ${taskId} + `.catch(() => {}); + clearTaskCommands(taskId); + // No worktree cleanup (persistent); backend stays warm for the next turn. + } + } + // ─── Helpers ──────────────────────────────────────────────────────────────── async function waitForCompletion(assistantId: string): Promise {