From f3a0197d6a4046d759df1190ec72c3b4ab41662a Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 1 Jun 2026 13:37:57 +0000 Subject: [PATCH] feat: Claude Agent SDK backend + clean-room PostgresSessionStore (v2.7.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the lean-SDK direction (boocode_code_review_v2 §1 #9) behind a flag. Adds @anthropic-ai/claude-agent-sdk@0.3.159 (Commercial Terms, runtime dep). - PostgresSessionStore: clean-room impl of the SDK's real SessionStore type over a new claude_session_entries table. Typechecks against the SDK type; 8 DB-integration tests. - ClaudeSdkBackend (implements AgentBackend): one warm query() per (chat,claude) in streaming-input mode via a pushable async-iterable pump, sessionStore + resume continuity, pure mapSdkMessage->AgentEvent, session_id from init, usage/cost onto agent_sessions (backend CHECK gains 'claude_sdk'). - Routing env-gated by CLAUDE_SDK_BACKEND (default off) -> PTY path UNCHANGED. - Built against real SDK 0.3.159 types (install paid off: partial=stream_event needing includePartialMessages, MessageParam, result error arm). - Fix latent test-infra deadlock: serialize DB suites (fileParallelism:false). Coder 269 passing default / 290 with DB; tsc clean vs SDK types; builds clean. LIVE pump + resume + actual claude turn need a host smoke (CLAUDE_SDK_BACKEND=1 + claude binary + auth). zod peer-dep wants ^4 (workspace 3.25). Builds on v2.7.4. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 4 + apps/coder/package.json | 3 +- apps/coder/src/schema.sql | 28 ++ apps/coder/src/services/agent-backend.ts | 2 +- .../backends/__tests__/claude-sdk-map.test.ts | 181 +++++++++ .../__tests__/claude-sdk-routing.test.ts | 49 +++ .../__tests__/claude-session-store.test.ts | 135 +++++++ .../__tests__/pushable-iterable.test.ts | 96 +++++ .../src/services/backends/claude-sdk-map.ts | 192 +++++++++ .../services/backends/claude-sdk-routing.ts | 38 ++ .../coder/src/services/backends/claude-sdk.ts | 364 ++++++++++++++++++ .../services/backends/claude-session-store.ts | 117 ++++++ .../services/backends/pushable-iterable.ts | 96 +++++ apps/coder/src/services/dispatcher.ts | 249 ++++++++++++ apps/coder/src/services/provider-registry.ts | 6 + apps/coder/vitest.config.ts | 6 + .../claude-sdk-sessionstore/proposal.md | 68 ++++ pnpm-lock.yaml | 144 +++++++ 18 files changed, 1776 insertions(+), 2 deletions(-) create mode 100644 apps/coder/src/services/backends/__tests__/claude-sdk-map.test.ts create mode 100644 apps/coder/src/services/backends/__tests__/claude-sdk-routing.test.ts create mode 100644 apps/coder/src/services/backends/__tests__/claude-session-store.test.ts create mode 100644 apps/coder/src/services/backends/__tests__/pushable-iterable.test.ts create mode 100644 apps/coder/src/services/backends/claude-sdk-map.ts create mode 100644 apps/coder/src/services/backends/claude-sdk-routing.ts create mode 100644 apps/coder/src/services/backends/claude-sdk.ts create mode 100644 apps/coder/src/services/backends/claude-session-store.ts create mode 100644 apps/coder/src/services/backends/pushable-iterable.ts create mode 100644 openspec/changes/claude-sdk-sessionstore/proposal.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5beda..f285b35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes per release tag. Most recent on top, ordered by tag creation date (which matches the git history). 
Tag names follow `vMAJOR.MINOR.PATCH-slug` — the slug describes what shipped, so the tag name alone is enough to recall the batch. +## v2.7.5-claude-sdk-sessionstore — 2026-06-01 + +Lands the Claude Agent SDK direction (`boocode_code_review_v2.md` §1 #9, §6.2 "lean SDK") behind a flag. Adds `@anthropic-ai/claude-agent-sdk@0.3.159` (Commercial Terms — runtime dep, code reference-only) and builds a warm, resumable claude backend to supersede one-shot PTY dispatch — env-gated (`CLAUDE_SDK_BACKEND`, default off) so production claude stays on the unchanged PTY path until a host smoke test passes. **Clean-room `PostgresSessionStore`** implements the SDK's real `SessionStore` type (`append`/`load`/`listSessions`/`delete`/`listSubkeys`) over a new `claude_session_entries` table — typechecked against the installed SDK type, 8 DB-integration tests. **`ClaudeSdkBackend`** (`implements AgentBackend`, mirroring warm-acp/opencode-server) drives one persistent `query()` per `(chat,'claude')` in streaming-input mode via a pushable async-iterable pump, with `sessionStore` + `resume` for cross-turn/cross-restart continuity, a pure `mapSdkMessage`→`AgentEvent` mapper, `session_id` captured from the `init` message, and `result.usage`/`total_cost_usd` accumulated onto `agent_sessions` (backend CHECK gains `'claude_sdk'`). Built against the REAL SDK 0.3.159 types after installing it — surfacing shapes a blind build would have missed (`SDKPartialAssistantMessage` is `type:'stream_event'` needing `includePartialMessages`; `SDKUserMessage.message` is `MessageParam`; the `SDKResultMessage` error arm). Also fixes a latent test-infra deadlock — three DB-integration suites applying the full schema in parallel under `DATABASE_URL` deadlocked, now serialized via `fileParallelism:false`. 32 new tests (8 store + 10 mapper + 8 pushable + 6 routing); coder suite 269 passing default / 290 with DB; tsc clean against the SDK types; builds clean. 
**The live streaming pump + resume + an actual claude turn need a host smoke (`CLAUDE_SDK_BACKEND=1` + claude binary + ANTHROPIC auth) — cannot run from the dev container.** The zod peer-dep wants `^4` (workspace `3.25`) — watch at runtime. Builds on `v2.7.4-mistake-tracker-ledger`; openspec `claude-sdk-sessionstore`. + ## v2.7.4-mistake-tracker-ledger — 2026-06-01 Two native-inference hardening features from `boocode_code_review_v2.md` §1 #12 (cline, algorithm-reimplemented). **MistakeTracker:** complements the doom-loop guard (identical repeats) and cap-hit (budget) by catching a run of consecutive tool *failures*. A new pure `mistake-tracker.ts` tracks heterogeneous failure kinds (`zod_reject`/`tool_not_found`/`exec_error`/`api_error`/`permission_denied`, surfaced per tool from `tool-phase.ts`); after 3 consecutive failures the `turn.ts` loop does a **soft nudge** — injects model-facing recovery guidance into the next step + drops a `mistake_recovery` UI sentinel + resets — then **escalates** to stopping the turn (cap-hit-style, with a Continue affordance) if it re-trips without an intervening success, so heterogeneous failures can't burn the whole step budget. **File-provenance ledger:** `compaction.ts` now derives a deterministic, sorted `## Files Read` list from the head messages' read-tool calls (`view_file`/`grep`/`find_files`/`list_dir`) and injects it into the rolling-summary prompt so file provenance survives compaction (no new table; prompt-driven merge, read-only since BooChat has no write tools). The `mistake_recovery` sentinel adds an arm to `MessageMetadata` in both server + web type copies plus a `MessageBubble` render branch. Built by two parallel agents (backend + frontend sentinel) over disjoint apps; server 545 tests passing (23 new: 12 mistake-tracker + 11 compaction), build + web tsc clean. Native-inference only (external agents run their own loops). Builds on `v2.7.3-sampling-streamjson-tokens`; openspec `mistake-tracker-file-ledger`. 
diff --git a/apps/coder/package.json b/apps/coder/package.json index d87e093..bfc65ab 100644 --- a/apps/coder/package.json +++ b/apps/coder/package.json @@ -14,11 +14,12 @@ }, "dependencies": { "@agentclientprotocol/sdk": "^0.22.1", + "@anthropic-ai/claude-agent-sdk": "^0.3.159", "@boocode/server": "workspace:*", "@fastify/static": "^7.0.4", - "@opencode-ai/sdk": "~1.15.0", "@fastify/websocket": "^10.0.1", "@modelcontextprotocol/sdk": "^1.29.0", + "@opencode-ai/sdk": "~1.15.0", "fastify": "^4.28.1", "postgres": "^3.4.4", "ws": "^8.18.0", diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index b546126..bde4061 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -261,6 +261,34 @@ CREATE TABLE IF NOT EXISTS checkpoints ( ); CREATE INDEX IF NOT EXISTS checkpoints_chat_created_idx ON checkpoints(chat_id, created_at); +-- claude-sdk-sessionstore #9 (Part 1): append-only mirror of Claude Agent SDK +-- session transcripts. The SDK's SessionStore adapter writes one JSONL line per +-- entry; PostgresSessionStore (services/backends/claude-session-store.ts) inserts +-- one row per entry and replays them ORDER BY id on resume. The store is generic +-- per the SDK's SessionKey (project_key, session_id, subpath) — chat↔session +-- ownership lives in agent_sessions, not here. subpath '' is the main transcript +-- (the SDK's undefined subpath maps to '' in the column). +CREATE TABLE IF NOT EXISTS claude_session_entries ( + id BIGSERIAL PRIMARY KEY, + project_key TEXT NOT NULL, + session_id TEXT NOT NULL, + subpath TEXT NOT NULL DEFAULT '', -- '' = main transcript (SDK's undefined subpath maps here) + entry JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() +); +CREATE INDEX IF NOT EXISTS claude_session_entries_key_idx ON claude_session_entries (project_key, session_id, subpath, id); + +-- claude-sdk-sessionstore #9 (Part 2): the warm Claude-SDK backend persists its +-- agent_sessions rows with backend='claude_sdk'. 
Widen the named CHECK to accept +-- it. Idempotent: DROP the named constraint (the inline CREATE TABLE check above +-- carries this explicit name, so DROP IF EXISTS targets it) + re-ADD the widened +-- list. Re-runs/fresh deploys land on the same final constraint (the table-level +-- CREATE already includes only the old two values on a fresh DB; this block then +-- replaces it with the three-value list). +ALTER TABLE agent_sessions DROP CONSTRAINT IF EXISTS agent_sessions_backend_chk; +ALTER TABLE agent_sessions ADD CONSTRAINT agent_sessions_backend_chk + CHECK (backend IN ('opencode_server', 'acp_warm', 'claude_sdk')); + -- LISTEN/NOTIFY fast path: every tasks INSERT (from any call site — routes, -- new_task tool, arena, MCP server) fires pg_notify('tasks_new') in the same -- transaction, so the dispatcher reacts immediately instead of waiting for the diff --git a/apps/coder/src/services/agent-backend.ts b/apps/coder/src/services/agent-backend.ts index 3fd8529..efd3628 100644 --- a/apps/coder/src/services/agent-backend.ts +++ b/apps/coder/src/services/agent-backend.ts @@ -13,7 +13,7 @@ import type { AcpToolSnapshot } from './acp-tool-snapshot.js'; import type { AgentCommand } from './provider-types.js'; /** Backend transport kind. Mirrors `agent_sessions.backend` CHECK in schema.sql. */ -export type AgentBackendKind = 'opencode_server' | 'acp_warm'; +export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk'; /** * Normalized, transport-agnostic events a backend emits during a turn (§2). 
diff --git a/apps/coder/src/services/backends/__tests__/claude-sdk-map.test.ts b/apps/coder/src/services/backends/__tests__/claude-sdk-map.test.ts new file mode 100644 index 0000000..329316f --- /dev/null +++ b/apps/coder/src/services/backends/__tests__/claude-sdk-map.test.ts @@ -0,0 +1,181 @@ +import { describe, it, expect } from 'vitest'; +import type { SDKMessage } from '@anthropic-ai/claude-agent-sdk'; +import { mapSdkMessage, createClaudeSdkMapState } from '../claude-sdk-map.js'; +import type { AgentEvent } from '../../agent-backend.js'; + +/** + * Pure mapper for Claude-SDK messages → AgentEvents (claude-sdk-sessionstore #9 Part 2). + * Verifies the partial-stream → live-delta mapping, tool assembly across blocks, and + * the final-assistant dedup, with no live `claude` binary involved. + * + * Messages are cast through `unknown` to `SDKMessage`: the real SDK shapes carry many + * fields (uuid, parent_tool_use_id, …) irrelevant to the mapper, which reads only the + * `type`/`event`/`message.content` it discriminates on. The cast keeps the fixtures + * minimal while the production code path sees the full real types (the backend's + * typecheck against the real SDK is the type-safety proof). + */ +function msg(m: unknown): SDKMessage { + return m as SDKMessage; +} + +/** A partial-stream message wrapping one BetaRawMessageStreamEvent. 
*/ +function streamEvent(event: unknown): SDKMessage { + return msg({ type: 'stream_event', event, parent_tool_use_id: null, uuid: 'u', session_id: 's' }); +} + +describe('mapSdkMessage — partial stream deltas', () => { + it('maps a text_delta to a text event', () => { + const state = createClaudeSdkMapState(); + const out = mapSdkMessage( + streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello' } }), + state, + ); + expect(out).toEqual([{ type: 'text', text: 'Hello' }]); + }); + + it('maps a thinking_delta to a reasoning event', () => { + const state = createClaudeSdkMapState(); + const out = mapSdkMessage( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'pondering', estimated_tokens: null }, + }), + state, + ); + expect(out).toEqual([{ type: 'reasoning', text: 'pondering' }]); + }); + + it('drops empty text/thinking deltas', () => { + const state = createClaudeSdkMapState(); + expect( + mapSdkMessage(streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: '' } }), state), + ).toEqual([]); + expect( + mapSdkMessage( + streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: '', estimated_tokens: null } }), + state, + ), + ).toEqual([]); + }); + + it('ignores message framing + signature/citation deltas', () => { + const state = createClaudeSdkMapState(); + expect(mapSdkMessage(streamEvent({ type: 'message_start', message: {} }), state)).toEqual([]); + expect(mapSdkMessage(streamEvent({ type: 'message_stop' }), state)).toEqual([]); + expect( + mapSdkMessage(streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'signature_delta', signature: 'x' } }), state), + ).toEqual([]); + }); +}); + +describe('mapSdkMessage — tool assembly across blocks', () => { + it('opens a tool_call on content_block_start, buffers input_json_delta, emits tool_update with parsed input on stop', () => { + const 
state = createClaudeSdkMapState(); + + const started = mapSdkMessage( + streamEvent({ + type: 'content_block_start', + index: 1, + content_block: { type: 'tool_use', id: 'tool-1', name: 'view_file', input: {} }, + }), + state, + ); + expect(started).toEqual([ + { type: 'tool_call', toolCall: { toolCallId: 'tool-1', title: 'view_file', kind: null, status: 'in_progress', rawInput: {}, rawOutput: undefined } }, + ]); + + // args stream in fragments under the same block index + expect( + mapSdkMessage(streamEvent({ type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '{"path":' } }), state), + ).toEqual([]); + expect( + mapSdkMessage(streamEvent({ type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '"a.ts"}' } }), state), + ).toEqual([]); + + const stopped = mapSdkMessage(streamEvent({ type: 'content_block_stop', index: 1 }), state); + expect(stopped).toHaveLength(1); + const ev = stopped[0]!; + expect(ev.type).toBe('tool_update'); + if (ev.type === 'tool_update') { + expect(ev.toolCall.toolCallId).toBe('tool-1'); + expect(ev.toolCall.title).toBe('view_file'); + expect(ev.toolCall.rawInput).toEqual({ path: 'a.ts' }); + } + }); + + it('content_block_stop for a non-tool block (no tracked index) emits nothing', () => { + const state = createClaudeSdkMapState(); + // text block was streamed at index 0 but never tracked as a tool + mapSdkMessage(streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'hi' } }), state); + expect(mapSdkMessage(streamEvent({ type: 'content_block_stop', index: 0 }), state)).toEqual([]); + }); + + it('falls back to the prior input when the buffered tool JSON is invalid', () => { + const state = createClaudeSdkMapState(); + mapSdkMessage( + streamEvent({ type: 'content_block_start', index: 2, content_block: { type: 'tool_use', id: 't2', name: 'grep', input: { q: 'seed' } } }), + state, + ); + mapSdkMessage(streamEvent({ type: 
'content_block_delta', index: 2, delta: { type: 'input_json_delta', partial_json: '{not json' } }), state); + const stopped = mapSdkMessage(streamEvent({ type: 'content_block_stop', index: 2 }), state); + const ev = stopped[0]!; + if (ev.type === 'tool_update') { + expect(ev.toolCall.rawInput).toEqual({ q: 'seed' }); + } else { + throw new Error('expected tool_update'); + } + }); +}); + +describe('mapSdkMessage — final assistant message', () => { + function assistant(content: unknown[]): SDKMessage { + return msg({ type: 'assistant', message: { content }, parent_tool_use_id: null, uuid: 'u', session_id: 's' }); + } + + it('dedups text/thinking (already streamed) and emits a completed tool_update per tool_use block', () => { + const state = createClaudeSdkMapState(); + const out = mapSdkMessage( + assistant([ + { type: 'text', text: 'final answer', citations: null }, + { type: 'thinking', thinking: 'reasoned', signature: 'sig' }, + { type: 'tool_use', id: 'tool-9', name: 'find_files', input: { glob: '**/*.ts' } }, + ]), + state, + ); + expect(out).toEqual([ + { + type: 'tool_update', + toolCall: { toolCallId: 'tool-9', title: 'find_files', kind: null, status: 'completed', rawInput: { glob: '**/*.ts' }, rawOutput: undefined }, + }, + ]); + }); + + it('preserves a title from a prior partial tool_call snapshot', () => { + const state = createClaudeSdkMapState(); + mapSdkMessage( + streamEvent({ type: 'content_block_start', index: 0, content_block: { type: 'tool_use', id: 'tool-x', name: 'view_file', input: {} } }), + state, + ); + const out = mapSdkMessage(assistant([{ type: 'tool_use', id: 'tool-x', name: 'view_file', input: { path: 'z' } }]), state); + const ev = out[0]!; + if (ev.type === 'tool_update') { + expect(ev.toolCall.status).toBe('completed'); + expect(ev.toolCall.title).toBe('view_file'); + expect(ev.toolCall.rawInput).toEqual({ path: 'z' }); + } else { + throw new Error('expected tool_update'); + } + }); +}); + +describe('mapSdkMessage — non-content 
messages', () => { + it('returns [] for system/init, status, result, and other variants', () => { + const state = createClaudeSdkMapState(); + expect(mapSdkMessage(msg({ type: 'system', subtype: 'init', session_id: 's', uuid: 'u' }), state)).toEqual([]); + expect(mapSdkMessage(msg({ type: 'system', subtype: 'status', status: null, session_id: 's', uuid: 'u' }), state)).toEqual([]); + expect( + mapSdkMessage(msg({ type: 'result', subtype: 'success', result: 'done', session_id: 's', uuid: 'u' }), state), + ).toEqual([]); + }); +}); diff --git a/apps/coder/src/services/backends/__tests__/claude-sdk-routing.test.ts b/apps/coder/src/services/backends/__tests__/claude-sdk-routing.test.ts new file mode 100644 index 0000000..d215608 --- /dev/null +++ b/apps/coder/src/services/backends/__tests__/claude-sdk-routing.test.ts @@ -0,0 +1,49 @@ +import { describe, it, expect } from 'vitest'; +import { shouldUseClaudeSdk, claudeSdkBackendEnabled } from '../claude-sdk-routing.js'; + +/** + * Env-flagged routing for the warm Claude-SDK backend. With CLAUDE_SDK_BACKEND off + * (the production default) every claude task falls through to the unchanged PTY path; + * with it on, only chat-tab claude tasks (session_id + chat_id) route to the SDK. 
+ */ +const ON = { CLAUDE_SDK_BACKEND: '1' } as NodeJS.ProcessEnv; +const OFF = {} as NodeJS.ProcessEnv; + +describe('claudeSdkBackendEnabled', () => { + it('is false when unset or falsy', () => { + expect(claudeSdkBackendEnabled({} as NodeJS.ProcessEnv)).toBe(false); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: '' } as NodeJS.ProcessEnv)).toBe(false); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: '0' } as NodeJS.ProcessEnv)).toBe(false); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: 'false' } as NodeJS.ProcessEnv)).toBe(false); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: 'off' } as NodeJS.ProcessEnv)).toBe(false); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: 'no' } as NodeJS.ProcessEnv)).toBe(false); + }); + + it('is true for any other truthy value', () => { + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: '1' } as NodeJS.ProcessEnv)).toBe(true); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: 'true' } as NodeJS.ProcessEnv)).toBe(true); + expect(claudeSdkBackendEnabled({ CLAUDE_SDK_BACKEND: 'on' } as NodeJS.ProcessEnv)).toBe(true); + }); +}); + +describe('shouldUseClaudeSdk', () => { + it('is always false while the env flag is off — production claude stays on PTY', () => { + expect(shouldUseClaudeSdk({ agent: 'claude', session_id: 's1', chat_id: 'c1' }, OFF)).toBe(false); + }); + + it('routes a chat-tab claude task to the SDK when the flag is on', () => { + expect(shouldUseClaudeSdk({ agent: 'claude', session_id: 's1', chat_id: 'c1' }, ON)).toBe(true); + }); + + it('only applies to the claude agent', () => { + expect(shouldUseClaudeSdk({ agent: 'qwen', session_id: 's1', chat_id: 'c1' }, ON)).toBe(false); + expect(shouldUseClaudeSdk({ agent: 'opencode', session_id: 's1', chat_id: 'c1' }, ON)).toBe(false); + expect(shouldUseClaudeSdk({ agent: null, session_id: 's1', chat_id: 'c1' }, ON)).toBe(false); + }); + + it('requires both session_id and chat_id (session-less creators stay one-shot)', () => { + 
expect(shouldUseClaudeSdk({ agent: 'claude', session_id: null, chat_id: null }, ON)).toBe(false); + expect(shouldUseClaudeSdk({ agent: 'claude', session_id: 's1', chat_id: null }, ON)).toBe(false); + expect(shouldUseClaudeSdk({ agent: 'claude', session_id: null, chat_id: 'c1' }, ON)).toBe(false); + }); +}); diff --git a/apps/coder/src/services/backends/__tests__/claude-session-store.test.ts b/apps/coder/src/services/backends/__tests__/claude-session-store.test.ts new file mode 100644 index 0000000..744fb44 --- /dev/null +++ b/apps/coder/src/services/backends/__tests__/claude-session-store.test.ts @@ -0,0 +1,135 @@ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { readFileSync } from 'node:fs'; +import { resolve } from 'node:path'; +import postgres from 'postgres'; +import { PostgresSessionStore } from '../claude-session-store.js'; +import type { SessionStoreEntry } from '@anthropic-ai/claude-agent-sdk'; + +/** + * claude-sdk-sessionstore #9 (Part 1) — PostgresSessionStore tests. + * + * DB-opt-in (DATABASE_URL), mirrors checkpoints.test.ts: skips cleanly when the + * var is unset; otherwise applies the server + coder schemas and exercises the + * real append/load/listSessions/delete/listSubkeys round trips against postgres. + * Rows are namespaced under a unique project_key so concurrent suites / leftover + * data can't collide, and afterAll deletes everything written. 
+ */ +describe.runIf(!!process.env.DATABASE_URL)('PostgresSessionStore (DB)', () => { + let sql: ReturnType; + let store: PostgresSessionStore; + const projectKey = `claude-store-test-${Date.now()}`; + + const entry = (type: string, extra: Record = {}): SessionStoreEntry => ({ + type, + ...extra, + }); + + beforeAll(async () => { + sql = postgres(process.env.DATABASE_URL!, { max: 3 }); + const serverSchema = resolve(__dirname, '../../../../../server/src/schema.sql'); + const coderSchema = resolve(__dirname, '../../../schema.sql'); + await sql.unsafe(readFileSync(serverSchema, 'utf8')); + await sql.unsafe(readFileSync(coderSchema, 'utf8')); + store = new PostgresSessionStore(sql); + }); + + afterAll(async () => { + if (sql) { + await sql`DELETE FROM claude_session_entries WHERE project_key = ${projectKey}`.catch(() => {}); + await sql.end({ timeout: 5 }); + } + }); + + it('append → load round-trips and preserves order across two appends', async () => { + const key = { projectKey, sessionId: 'sess-order' }; + await store.append(key, [entry('user', { uuid: 'u1' }), entry('assistant', { uuid: 'a1' })]); + await store.append(key, [entry('result', { uuid: 'r1' })]); + + const loaded = await store.load(key); + expect(loaded).not.toBeNull(); + expect(loaded!.map((e) => e.uuid)).toEqual(['u1', 'a1', 'r1']); + expect(loaded!.map((e) => e.type)).toEqual(['user', 'assistant', 'result']); + }); + + it('append with an empty batch is a no-op (load still null for an otherwise-unseen key)', async () => { + const key = { projectKey, sessionId: 'sess-empty' }; + await store.append(key, []); + expect(await store.load(key)).toBeNull(); + }); + + it('load of a key that was never written returns null', async () => { + expect(await store.load({ projectKey, sessionId: 'never-seen' })).toBeNull(); + }); + + it('isolates the main transcript from a subpath (load each independently)', async () => { + const sessionId = 'sess-subpath'; + const mainKey = { projectKey, sessionId }; + const subKey 
= { projectKey, sessionId, subpath: 'subagents/x' }; + + await store.append(mainKey, [entry('user', { uuid: 'main-1' })]); + await store.append(subKey, [entry('assistant', { uuid: 'sub-1' })]); + + const main = await store.load(mainKey); + const sub = await store.load(subKey); + expect(main!.map((e) => e.uuid)).toEqual(['main-1']); + expect(sub!.map((e) => e.uuid)).toEqual(['sub-1']); + }); + + it('listSessions returns the session with a numeric mtime (main transcripts only)', async () => { + const sessionId = 'sess-list'; + await store.append({ projectKey, sessionId }, [entry('user', { uuid: 'l1' })]); + // A subagent-only session must NOT surface as a main-transcript session. + await store.append( + { projectKey, sessionId: 'sess-sub-only', subpath: 'subagents/y' }, + [entry('user', { uuid: 's1' })], + ); + + const sessions = await store.listSessions(projectKey); + const ids = sessions.map((s) => s.sessionId); + expect(ids).toContain(sessionId); + expect(ids).not.toContain('sess-sub-only'); + + const row = sessions.find((s) => s.sessionId === sessionId)!; + expect(typeof row.mtime).toBe('number'); + expect(Number.isFinite(row.mtime)).toBe(true); + expect(row.mtime).toBeGreaterThan(0); + }); + + it('delete with a subpath removes only that subpath', async () => { + const sessionId = 'sess-del-subpath'; + const mainKey = { projectKey, sessionId }; + const subKey = { projectKey, sessionId, subpath: 'subagents/z' }; + await store.append(mainKey, [entry('user', { uuid: 'keep-1' })]); + await store.append(subKey, [entry('assistant', { uuid: 'drop-1' })]); + + await store.delete(subKey); + + expect(await store.load(subKey)).toBeNull(); + expect((await store.load(mainKey))!.map((e) => e.uuid)).toEqual(['keep-1']); + }); + + it('delete without a subpath removes the whole session (all subpaths)', async () => { + const sessionId = 'sess-del-all'; + const mainKey = { projectKey, sessionId }; + const subKey = { projectKey, sessionId, subpath: 'subagents/w' }; + await 
store.append(mainKey, [entry('user', { uuid: 'm' })]); + await store.append(subKey, [entry('assistant', { uuid: 's' })]); + + await store.delete({ projectKey, sessionId }); + + expect(await store.load(mainKey)).toBeNull(); + expect(await store.load(subKey)).toBeNull(); + expect(await store.listSubkeys({ projectKey, sessionId })).toEqual([]); + }); + + it('listSubkeys returns the distinct non-main subpaths', async () => { + const sessionId = 'sess-subkeys'; + await store.append({ projectKey, sessionId }, [entry('user', { uuid: 'main' })]); + await store.append({ projectKey, sessionId, subpath: 'subagents/a' }, [entry('user', { uuid: 'a1' })]); + await store.append({ projectKey, sessionId, subpath: 'subagents/a' }, [entry('user', { uuid: 'a2' })]); + await store.append({ projectKey, sessionId, subpath: 'subagents/b' }, [entry('user', { uuid: 'b1' })]); + + const subkeys = await store.listSubkeys({ projectKey, sessionId }); + expect(subkeys.sort()).toEqual(['subagents/a', 'subagents/b']); + }); +}); diff --git a/apps/coder/src/services/backends/__tests__/pushable-iterable.test.ts b/apps/coder/src/services/backends/__tests__/pushable-iterable.test.ts new file mode 100644 index 0000000..c81f88a --- /dev/null +++ b/apps/coder/src/services/backends/__tests__/pushable-iterable.test.ts @@ -0,0 +1,96 @@ +import { describe, it, expect } from 'vitest'; +import { createPushable } from '../pushable-iterable.js'; + +/** + * The pushable async-iterable that feeds the Claude SDK's streaming-input query() + * one message per turn while staying open across turns. Tests cover the ordering + * contract (push/close/async-iterate) without any SDK shape. 
+ */ +describe('createPushable — push/iterate ordering', () => { + it('yields buffered values in FIFO order then parks', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + + p.push(1); + p.push(2); + expect(await it.next()).toEqual({ value: 1, done: false }); + expect(await it.next()).toEqual({ value: 2, done: false }); + + // No more buffered → next() parks; resolve it by pushing. + const parked = it.next(); + p.push(3); + expect(await parked).toEqual({ value: 3, done: false }); + }); + + it('hands a value directly to a parked consumer (push after await)', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + const pending = it.next(); // parks immediately (empty buffer) + p.push('hello'); + expect(await pending).toEqual({ value: 'hello', done: false }); + }); + + it('close() resolves a parked consumer as done and reports done thereafter', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + const pending = it.next(); + p.close(); + expect(await pending).toEqual({ value: undefined, done: true }); + expect(await it.next()).toEqual({ value: undefined, done: true }); + expect(p.closed).toBe(true); + }); + + it('still drains values buffered BEFORE close', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + p.push(10); + p.push(20); + p.close(); + expect(await it.next()).toEqual({ value: 10, done: false }); + expect(await it.next()).toEqual({ value: 20, done: false }); + expect(await it.next()).toEqual({ value: undefined, done: true }); + }); + + it('drops values pushed after close', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + p.close(); + p.push(99); // no-op + expect(await it.next()).toEqual({ value: undefined, done: true }); + }); + + it('close() is idempotent', () => { + const p = createPushable(); + p.close(); + expect(() => 
p.close()).not.toThrow(); + expect(p.closed).toBe(true); + }); + + it('works with a for-await loop driven by interleaved pushes', async () => { + const p = createPushable(); + const seen: number[] = []; + const consumer = (async () => { + for await (const v of p.iterable) seen.push(v); + })(); + + p.push(1); + await Promise.resolve(); + p.push(2); + await Promise.resolve(); + p.close(); + await consumer; + expect(seen).toEqual([1, 2]); + }); + + it('return() on the iterator closes the queue (for-await break)', async () => { + const p = createPushable(); + const it = p.iterable[Symbol.asyncIterator](); + p.push(1); + expect(await it.next()).toEqual({ value: 1, done: false }); + // Simulate a `break` in for-await: the runtime calls return(). + expect(await it.return!()).toEqual({ value: undefined, done: true }); + expect(p.closed).toBe(true); + p.push(2); // dropped — queue is closed + expect(await it.next()).toEqual({ value: undefined, done: true }); + }); +}); diff --git a/apps/coder/src/services/backends/claude-sdk-map.ts b/apps/coder/src/services/backends/claude-sdk-map.ts new file mode 100644 index 0000000..55e637b --- /dev/null +++ b/apps/coder/src/services/backends/claude-sdk-map.ts @@ -0,0 +1,192 @@ +/** + * claude-sdk-sessionstore #9 (Part 2) — PURE Claude-SDK message → AgentEvent mapper. + * + * `ClaudeSdkBackend` drives one `query()` per (chat, agent) session and feeds each + * `SDKMessage` it yields through this function, forwarding the returned + * `AgentEvent[]` to the dispatcher's `onEvent` (which maps them to WS frames + + * persists). Kept PURE (one message + a caller-owned accumulator → events) so it's + * unit-testable without a live `claude` binary — the whole point of Part 2's + * typecheck-and-unit-test gate (the live pump needs a host smoke). 
+ *
+ * SDK shapes (verified against @anthropic-ai/claude-agent-sdk@0.3.159 sdk.d.ts +
+ * @anthropic-ai/sdk beta messages d.ts):
+ *  - `SDKPartialAssistantMessage` (`type:'stream_event'`) carries a
+ *    `BetaRawMessageStreamEvent` — the LIVE delta stream (only emitted when
+ *    `options.includePartialMessages` is set, which the backend sets). We map:
+ *      · content_block_delta + text_delta       → { text }
+ *      · content_block_delta + thinking_delta   → { reasoning }
+ *      · content_block_start + tool_use block   → { tool_call } (in_progress)
+ *      · content_block_delta + input_json_delta → buffered into the tool's args
+ *        (no event; the assembled input rides the terminal tool_update)
+ *  - `SDKAssistantMessage` (`type:'assistant'`) carries the FINAL `message.content`
+ *    blocks. Text/thinking there are post-hoc repeats of what the partials already
+ *    streamed, so we DROP them (dedup) and only emit a terminal `tool_update`
+ *    (status completed) per `tool_use` block, with its now-complete `input`.
+ *  - All other `SDKMessage` variants (system/init, status, result, hooks, task
+ *    notifications, …) carry no renderable turn content → return [].
+ *
+ * Tool assembly spans messages: a tool_use block opens in a partial
+ * `content_block_start`, its args stream as `input_json_delta` frames keyed by the
+ * block `index`, and the final assistant message restates the complete block. The
+ * caller owns a `ClaudeSdkMapState` (snapshot map + per-index tool tracking) that
+ * threads this across calls, mirroring the `Map<string, AcpToolSnapshot>` the other
+ * backends pass into `mapSessionUpdate`. The result frames carry the SAME
+ * `AcpToolSnapshot` shape, so `persistExternalAgentTurn` / `snapshotToWireToolCall`
+ * are reused unchanged.
+ */
+import type { SDKMessage } from '@anthropic-ai/claude-agent-sdk';
+import type { AgentEvent } from '../agent-backend.js';
+import type { AcpToolSnapshot } from '../acp-tool-snapshot.js';
+
+/**
+ * The underlying `@anthropic-ai/sdk` Beta message types (`BetaRawMessageStreamEvent`,
+ * `BetaContentBlock`) are a TRANSITIVE dep of `@anthropic-ai/claude-agent-sdk` — not
+ * a direct dependency of apps/coder — so a `@anthropic-ai/sdk/...` import does NOT
+ * resolve here under pnpm's strict node_modules. We instead DERIVE both shapes from
+ * the SDK's own exported message types, which is also more correct (it tracks the
+ * exact `event` / `content` shapes the SDK yields, not a hand-picked import path).
+ *
+ * `Extract` needs both type arguments: the union to search (`SDKMessage`) and the
+ * discriminant to match (`type`), which is the same tag `mapSdkMessage` switches on.
+ */
+type StreamEvent = Extract<SDKMessage, { type: 'stream_event' }>['event'];
+type AssistantContent = Extract<SDKMessage, { type: 'assistant' }>['message']['content'];
+type ContentBlock = AssistantContent extends readonly (infer B)[] ? B : never;
+
+/**
+ * Caller-owned accumulator threaded across `mapSdkMessage` calls within ONE turn.
+ * The backend creates a fresh one per turn and clears it at turn end.
+ */
+export interface ClaudeSdkMapState {
+  /** Stable tool-call snapshots by tool_use id, merged across start/delta/stop. */
+  snapshots: Map<string, AcpToolSnapshot>;
+  /**
+   * Partial-stream block index → in-flight tool assembly. Anthropic's stream keys
+   * blocks by a numeric `index`; tool_use args arrive as `input_json_delta`s under
+   * that index with no id, so we map index→id to route them and buffer the raw
+   * JSON fragments until the block closes (or the final assistant message lands).
+   */
+  toolByIndex: Map<number, { id: string; name: string; jsonBuf: string }>;
+}
+
+/** Construct a fresh per-turn accumulator (both maps start empty). */
+export function createClaudeSdkMapState(): ClaudeSdkMapState {
+  return { snapshots: new Map(), toolByIndex: new Map() };
+}
+
+/**
+ * Map one `SDKMessage` → zero or more `AgentEvent`s, mutating `state` for
+ * cross-message tool assembly + dedup. Pure w.r.t. its inputs otherwise.
+ */
+export function mapSdkMessage(msg: SDKMessage, state: ClaudeSdkMapState): AgentEvent[] {
+  // Only two message kinds carry renderable turn content.
+  if (msg.type === 'stream_event') {
+    return mapStreamEvent(msg.event, state);
+  }
+  if (msg.type === 'assistant') {
+    return mapFinalAssistant(msg.message.content, state);
+  }
+  // system/init, status, result, hooks, task_*, etc. — no turn content here.
+  // (The backend reads session_id off the init message and usage/cost off the
+  // result message directly; neither produces a renderable AgentEvent.)
+  return [];
+}
+
+/**
+ * Translate one live partial-stream event into AgentEvent(s).
+ *
+ * Tool blocks are tracked in `state` by their stream `index` from
+ * `content_block_start` until `content_block_stop`; text and thinking deltas are
+ * forwarded immediately. Event types not handled below yield no events.
+ */
+function mapStreamEvent(event: StreamEvent, state: ClaudeSdkMapState): AgentEvent[] {
+  if (event.type === 'content_block_start') {
+    const opened = event.content_block;
+    if (opened.type !== 'tool_use') return [];
+    const started: AcpToolSnapshot = {
+      toolCallId: opened.id,
+      title: opened.name,
+      kind: null,
+      status: 'in_progress',
+      rawInput: opened.input ?? undefined,
+      rawOutput: undefined,
+    };
+    state.snapshots.set(opened.id, started);
+    state.toolByIndex.set(event.index, { id: opened.id, name: opened.name, jsonBuf: '' });
+    return [{ type: 'tool_call', toolCall: started }];
+  }
+
+  if (event.type === 'content_block_delta') {
+    const delta = event.delta;
+    switch (delta.type) {
+      case 'text_delta':
+        // Empty fragments are not worth an event.
+        return delta.text ? [{ type: 'text', text: delta.text }] : [];
+      case 'thinking_delta':
+        return delta.thinking ? [{ type: 'reasoning', text: delta.thinking }] : [];
+      case 'input_json_delta': {
+        // Buffer the tool's streamed args under its block index; no event yet —
+        // the assembled input rides the terminal tool_update (or the final block).
+        const inFlight = state.toolByIndex.get(event.index);
+        if (inFlight) inFlight.jsonBuf += delta.partial_json ?? '';
+        return [];
+      }
+      default:
+        // signature_delta / citations_delta / compaction_delta — nothing to render.
+        return [];
+    }
+  }
+
+  if (event.type === 'content_block_stop') {
+    // Close out a streamed tool block: parse its buffered JSON args and emit a
+    // tool_update carrying the assembled input. The final assistant message will
+    // restate the same block, but its snapshot is dedup-merged (same id) so this
+    // is harmless — we emit here so a tool's input renders even if the assistant
+    // message is delayed/dropped.
+    const inFlight = state.toolByIndex.get(event.index);
+    if (!inFlight) return []; // the closed block was not a tracked tool_use
+    state.toolByIndex.delete(event.index);
+    const known = state.snapshots.get(inFlight.id);
+    const assembled: AcpToolSnapshot = {
+      toolCallId: inFlight.id,
+      title: known?.title ?? inFlight.name,
+      kind: null,
+      status: 'in_progress',
+      rawInput: parseJsonOr(inFlight.jsonBuf, known?.rawInput),
+      rawOutput: undefined,
+    };
+    state.snapshots.set(inFlight.id, assembled);
+    return [{ type: 'tool_update', toolCall: assembled }];
+  }
+
+  // message_start / message_delta / message_stop — turn framing, no content.
+  return [];
+}
+
+/**
+ * Map the content blocks of the final assistant message.
+ *
+ * Text/thinking blocks repeat what the partial stream already delivered, so they
+ * are skipped. Each `tool_use` block produces one `tool_update` with status
+ * `completed` and the block's full `input` (falling back to the input recorded
+ * in `state` when the block carries none).
+ */
+function mapFinalAssistant(content: ContentBlock[], state: ClaudeSdkMapState): AgentEvent[] {
+  const events: AgentEvent[] = [];
+  for (const block of content) {
+    // text / thinking / redacted_thinking blocks: already streamed via partials.
+    if (block.type !== 'tool_use') continue;
+    const known = state.snapshots.get(block.id);
+    const finished: AcpToolSnapshot = {
+      toolCallId: block.id,
+      title: known?.title ?? block.name,
+      kind: null,
+      status: 'completed',
+      rawInput: block.input ?? known?.rawInput,
+      rawOutput: undefined,
+    };
+    state.snapshots.set(block.id, finished);
+    events.push({ type: 'tool_update', toolCall: finished });
+  }
+  return events;
+}
+
+/** Parse a buffered JSON string; fall back to a prior value on empty/invalid.
*/ +function parseJsonOr(buf: string, fallback: unknown): unknown { + const s = buf.trim(); + if (!s) return fallback; + try { + return JSON.parse(s); + } catch { + return fallback; + } +} diff --git a/apps/coder/src/services/backends/claude-sdk-routing.ts b/apps/coder/src/services/backends/claude-sdk-routing.ts new file mode 100644 index 0000000..db39a65 --- /dev/null +++ b/apps/coder/src/services/backends/claude-sdk-routing.ts @@ -0,0 +1,38 @@ +/** + * claude-sdk-sessionstore #9 (Part 2) — claude-SDK-vs-PTY routing predicate. + * + * Sibling to `shouldUseWarmBackend` (warm-acp-routing.ts). The warm Claude-SDK + * backend keys its persistent `query()` on (chat_id, agent) — exactly like the + * warm-ACP / opencode-server backends — so a task only routes to it when it carries + * BOTH a `session_id` and a `chat_id` (a real chat tab). + * + * CRUCIALLY this is ALSO gated behind the `CLAUDE_SDK_BACKEND` env flag (default + * OFF). While off — the production default — claude always falls through to the + * existing one-shot PTY `runExternalAgent` path, UNCHANGED. The live SDK streaming + * pump + cross-turn resume need a host smoke against the real `claude` binary, so + * we keep the working PTY path as the default until that lands. Flip the env var + * on a host (any truthy value) to opt a deployment into the SDK backend. + * + * Pure (env read injected) so it's unit-testable; the dispatcher consumes it. + */ + +/** True iff the `CLAUDE_SDK_BACKEND` env flag is set to a truthy value. 
*/ +export function claudeSdkBackendEnabled(env: NodeJS.ProcessEnv = process.env): boolean { + const v = env.CLAUDE_SDK_BACKEND; + if (v == null) return false; + const s = v.trim().toLowerCase(); + return s !== '' && s !== '0' && s !== 'false' && s !== 'off' && s !== 'no'; +} + +export function shouldUseClaudeSdk( + task: { + agent: string | null; + session_id: string | null; + chat_id: string | null; + }, + env: NodeJS.ProcessEnv = process.env, +): boolean { + if (!claudeSdkBackendEnabled(env)) return false; + if (task.agent !== 'claude') return false; + return task.session_id != null && task.chat_id != null; +} diff --git a/apps/coder/src/services/backends/claude-sdk.ts b/apps/coder/src/services/backends/claude-sdk.ts new file mode 100644 index 0000000..9191c19 --- /dev/null +++ b/apps/coder/src/services/backends/claude-sdk.ts @@ -0,0 +1,364 @@ +/** + * claude-sdk-sessionstore #9 (Part 2) — ClaudeSdkBackend. + * + * A warm, resumable backend for the `claude` agent built on the Claude Agent SDK + * (`@anthropic-ai/claude-agent-sdk`), implementing the Phase-0 `AgentBackend` + * contract (same shape as `WarmAcpBackend` / `OpenCodeServerBackend`). One + * persistent `query()` per (chat, agent) session, driven in STREAMING-INPUT mode: + * the `prompt` is a pushable `AsyncIterable` that stays open across + * turns, so the SDK subprocess + conversation stay warm between `prompt()` calls + * until `closeSession`/`dispose`. + * + * ⚠ LIVE PUMP IS HOST-ONLY. The actual streaming turn needs the real `claude` + * binary + ANTHROPIC auth on a host — it CANNOT run in the dev container. This file + * is written against the REAL SDK types so it TYPECHECKS, and the PURE pieces (the + * `mapSdkMessage` mapper + the `createPushable` queue) are unit-tested. Routing to + * this backend is gated behind `CLAUDE_SDK_BACKEND` (default OFF) so production + * claude stays on the working PTY path until a host smoke validates the pump + + * cross-turn resume. 
+ *
+ * Lifecycle (mirrors warm-acp.ts / opencode-server.ts):
+ *  - `ensureSession`: resolve the resume id from `agent_sessions(chat_id,'claude')`
+ *    and (re)build the single `query()` if not already live. The SDK's own
+ *    `sessionStore` (Part 1 PostgresSessionStore) materializes the transcript on
+ *    resume; `options.resume` carries the provider session id.
+ *  - `prompt`: push ONE user message onto the open queue, iterate the generator,
+ *    map each `SDKMessage` → `AgentEvent`s via `mapSdkMessage`, forward to
+ *    `ctx.onEvent`, and resolve when the turn's `result` message lands. Capture the
+ *    `session_id` from the `init` message and persist it to `agent_sessions`;
+ *    accumulate `result.usage` / `total_cost_usd` onto the row (mirrors opencode U.6).
+ *  - `closeSession` / `dispose`: close the queue + dispose the query generator.
+ *  - A thrown error or `result.subtype==='error*'` marks `agent_sessions.status='crashed'`.
+ *
+ * Turn serialization: like warm-acp, exactly one turn is in flight at a time on a
+ * given backend (the dispatcher's per-session `inflight` map enforces this upstream;
+ * `isBusy()` reports it so the pool never evicts mid-turn).
+ */
+import { query, type Query, type SDKMessage, type SDKUserMessage, type Options } from '@anthropic-ai/claude-agent-sdk';
+import type { FastifyBaseLogger } from 'fastify';
+import type { Sql } from '../../db.js';
+import { PostgresSessionStore } from './claude-session-store.js';
+import { createPushable, type Pushable } from './pushable-iterable.js';
+import { mapSdkMessage, createClaudeSdkMapState, type ClaudeSdkMapState } from './claude-sdk-map.js';
+import type {
+  AgentBackend,
+  AgentSessionHandle,
+  EnsureSessionOpts,
+  PromptCtx,
+  TurnResult,
+} from '../agent-backend.js';
+
+export interface ClaudeSdkBackendDeps {
+  sql: Sql;
+  log: FastifyBaseLogger;
+  /** The (chat, agent) this backend serves — its pool identity + DB key. */
+  chatId: string;
+  /** Always 'claude' today; kept explicit so the pool key + DB writes stay honest. */
+  agent: string;
+  /** Resolved `claude` binary path (available_agents.install_path); null → SDK default. */
+  installPath: string | null;
+}
+
+export class ClaudeSdkBackend implements AgentBackend {
+  readonly backend = 'claude_sdk' as const;
+
+  private readonly sql: Sql;
+  private readonly log: FastifyBaseLogger;
+  private readonly chatId: string;
+  private readonly agent: string;
+  private readonly installPath: string | null;
+  private readonly sessionStore: PostgresSessionStore;
+
+  /** The single persistent query() generator; null until the first turn builds it. */
+  private query: Query | null = null;
+  /** The open input queue feeding the generator one SDKUserMessage per turn. */
+  private input: Pushable<SDKUserMessage> | null = null;
+  /** The provider's own session id (resume token), captured from the init message. */
+  private agentSessionId: string | null = null;
+  /** Resolved model the live query() was built with; a change forces a rebuild. */
+  private builtModel: string | null = null;
+  /** True between prompt() start and settle. */
+  private busy = false;
+  private up = false;
+
+  constructor(deps: ClaudeSdkBackendDeps) {
+    this.sql = deps.sql;
+    this.log = deps.log;
+    this.chatId = deps.chatId;
+    this.agent = deps.agent;
+    this.installPath = deps.installPath;
+    this.sessionStore = new PostgresSessionStore(deps.sql);
+  }
+
+  /** §2: liveness for the health endpoint + dispatcher fallback decision. */
+  health(): 'up' | 'down' {
+    return this.up ? 'up' : 'down';
+  }
+
+  /** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */
+  isBusy(): boolean {
+    return this.busy;
+  }
+
+  // ─── ensureSession: resolve resume id + (re)build the warm query ──────────────
+
+  /**
+   * Make sure a warm query exists for this backend and return the session handle.
+   *
+   * Reads the resume token from `agent_sessions`, rebuilds the query when there is
+   * none or the model changed, and upserts the `agent_sessions` row. A failed
+   * upsert is logged and does not fail the call.
+   */
+  async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
+    // Resolve the resume token from the (chat_id, agent) row. A crashed row is not
+    // resumed (the SDK would fail to load a dead session); we create fresh.
+    const [row] = await this.sql<{ agent_session_id: string | null; status: string }[]>`
+      SELECT agent_session_id, status FROM agent_sessions
+      WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
+    `;
+    const resumeId = row && row.status !== 'crashed' ? row.agent_session_id : null;
+
+    // (Re)build the warm query if there is none, or the model changed (the SDK can
+    // change model mid-session via setModel, but a fresh build is simplest + matches
+    // opencode's config-drift → fresh-session rule). The query stays alive across
+    // turns; only closeSession/dispose tears it down.
+    if (!this.query || this.builtModel !== opts.model) {
+      await this.teardownQuery();
+      this.buildQuery(opts.worktreePath, opts.model, resumeId);
+    }
+
+    // Seed the in-memory resume id from the DB so a handle built before the first
+    // turn's init message still carries the last-known token. The init message
+    // overwrites it with the authoritative current id during the turn.
+    if (this.agentSessionId == null) this.agentSessionId = resumeId;
+
+    // Upsert the agent_sessions row (backend='claude_sdk'). agent_session_id may be
+    // null until the first turn captures it from the init message; prompt() updates it.
+    await this.sql`
+      INSERT INTO agent_sessions
+        (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
+      VALUES
+        (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'claude_sdk', ${this.agentSessionId}, NULL, 'active', clock_timestamp())
+      ON CONFLICT (chat_id, agent) DO UPDATE SET
+        session_id = EXCLUDED.session_id,
+        worktree_id = EXCLUDED.worktree_id,
+        backend = 'claude_sdk',
+        agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id),
+        server_port = NULL,
+        status = 'active',
+        last_active_at = clock_timestamp()
+    `.catch((err) => {
+      this.log.warn({ err: errMsg(err), chatId: opts.chatId, agent: opts.agent }, 'claude-sdk: agent_sessions upsert failed (non-fatal)');
+    });
+
+    return {
+      sessionId,
+      agent: opts.agent,
+      backend: 'claude_sdk',
+      chatId: opts.chatId,
+      worktreeId: opts.worktreeId,
+      agentSessionId: this.agentSessionId,
+      serverPort: null,
+    };
+  }
+
+  /** Build the persistent query() in streaming-input mode. Lazy — no subprocess
+   *  work happens until the generator is first iterated in prompt(). */
+  private buildQuery(worktreePath: string, model: string, resumeId: string | null): void {
+    const input = createPushable<SDKUserMessage>();
+    const options: Options = {
+      sessionStore: this.sessionStore,
+      cwd: worktreePath,
+      // Stream partial assistant messages so text/thinking/tool deltas arrive live
+      // (the mapper reads them; without this only terminal messages land).
+      includePartialMessages: true,
+      ...(model ? { model } : {}),
+      ...(resumeId ? { resume: resumeId } : {}),
+      ...(this.installPath ? { pathToClaudeCodeExecutable: this.installPath } : {}),
+      // ANTHROPIC auth/env must reach the child; inherit the process env (host concern).
+      // NOTE(review): the cast's type arguments are reconstructed — confirm against
+      // the SDK's declared `Options['env']` type.
+      env: process.env as Record<string, string>,
+    };
+    this.input = input;
+    this.query = query({ prompt: input.iterable, options });
+    this.builtModel = model;
+    this.up = true;
+    this.log.info({ chatId: this.chatId, agent: this.agent, model, resume: resumeId ?? null }, 'claude-sdk: warm query built');
+  }
+
+  // ─── prompt: push one user message + drain the generator until result ─────────
+
+  /**
+   * Run ONE turn on the warm query.
+   *
+   * Pushes `input` as a user message, forwards mapped events to `ctx.onEvent`, and
+   * settles when the turn's `result` message arrives. Never throws: failures are
+   * reported as `{ ok: false, error }`, and an aborted turn as `error: 'aborted'`.
+   */
+  async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
+    if (!this.query || !this.input) {
+      // ensureSession should have built it; rebuild defensively (e.g. evicted/raced).
+      this.buildQuery(ctx.worktreePath, ctx.model, handle.agentSessionId);
+    }
+    const gen = this.query!;
+    const queue = this.input!;
+
+    if (ctx.signal.aborted) return { ok: false, error: 'aborted' };
+
+    this.busy = true;
+    const state: ClaudeSdkMapState = createClaudeSdkMapState();
+    // Per-turn abort: interrupt the in-flight query on the SAME generator (never
+    // tear down the warm query — that's the pool's lifetime). The generator then
+    // emits its terminal result and the drain loop exits.
+    let aborted = false;
+    const onAbort = () => {
+      if (aborted) return;
+      aborted = true;
+      void gen.interrupt().catch(() => {});
+    };
+    ctx.signal.addEventListener('abort', onAbort, { once: true });
+
+    // Push the turn's user message onto the open queue. session_id is optional on
+    // the wire; the SDK manages it via resume + the init message.
+    const userMsg: SDKUserMessage = {
+      type: 'user',
+      message: { role: 'user', content: input },
+      parent_tool_use_id: null,
+      ...(handle.agentSessionId ? { session_id: handle.agentSessionId } : {}),
+    };
+    queue.push(userMsg);
+
+    try {
+      // Drive the generator by hand with next(). A `for await (… of gen)` loop must
+      // NOT be used here: leaving such a loop early (our `return` on the result
+      // message) makes the runtime call `gen.return()`, which would close the warm
+      // query after the first turn and leave every later turn with a dead generator.
+      for (;;) {
+        const step = await gen.next();
+        if (step.done) break;
+        const msg = step.value;
+        // Capture the provider session id from the init message (authoritative).
+        if (msg.type === 'system' && msg.subtype === 'init' && msg.session_id) {
+          if (this.agentSessionId !== msg.session_id) {
+            this.agentSessionId = msg.session_id;
+            await this.persistAgentSessionId(msg.session_id);
+          }
+        }
+        // The result message ends THIS turn (it does not close the generator —
+        // streaming-input keeps it alive for the next pushed message).
+        if (msg.type === 'result') {
+          await this.accumulateUsage(msg);
+          const ok = msg.subtype === 'success' && !aborted;
+          if (!ok) {
+            // error_during_execution / error_max_turns / aborted → crashed row.
+            await this.markCrashed();
+          } else {
+            await this.markIdle();
+          }
+          if (aborted) return { ok: false, error: 'aborted' };
+          return ok
+            ? { ok: true }
+            : { ok: false, error: resultErrorMessage(msg) };
+        }
+        // Map renderable content → AgentEvents for the dispatcher's onEvent.
+        for (const ev of mapSdkMessage(msg, state)) {
+          ctx.onEvent(ev);
+        }
+      }
+      // Generator ended without a result message (e.g. it was disposed) — treat as
+      // a non-fatal incomplete turn so the dispatcher still finalizes the row.
+      // A finished generator can never yield again, so drop it (unless teardown
+      // already replaced it) and let the next turn build a fresh query.
+      if (this.query === gen) await this.teardownQuery();
+      if (aborted) return { ok: false, error: 'aborted' };
+      return { ok: false, error: 'claude-sdk: query ended before result' };
+    } catch (err) {
+      // A generator that threw is finished as well; do not keep it as the warm query.
+      if (this.query === gen) await this.teardownQuery();
+      if (aborted) return { ok: false, error: 'aborted' };
+      await this.markCrashed();
+      return { ok: false, error: errMsg(err) };
+    } finally {
+      ctx.signal.removeEventListener('abort', onAbort);
+      this.busy = false;
+    }
+  }
+
+  // ─── persistence helpers ──────────────────────────────────────────────────────
+
+  /** Store the provider session id on the (chat_id, agent) row; failures are logged only. */
+  private async persistAgentSessionId(id: string): Promise<void> {
+    await this.sql`
+      UPDATE agent_sessions
+      SET agent_session_id = ${id}, last_active_at = clock_timestamp()
+      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
+    `.catch((err) => {
+      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to persist agent_session_id (non-fatal)');
+    });
+  }
+
+  /**
+   * Accumulate the turn's usage/cost onto the (chat_id, agent) row — mirrors the
+   * opencode U.6 running-total pattern. The SDK reports usage once per turn on the
+   * result message (not per step), so this fires once per prompt(). Cache read/write
+   * input tokens fold into `input_tokens`; usage telemetry never fails a turn.
+   */
+  private async accumulateUsage(result: Extract<SDKMessage, { type: 'result' }>): Promise<void> {
+    const u = result.usage;
+    const input = num(u?.input_tokens) + num(u?.cache_read_input_tokens) + num(u?.cache_creation_input_tokens);
+    const output = num(u?.output_tokens);
+    const cost = numF(result.total_cost_usd);
+    // Nothing to add → skip the UPDATE entirely.
+    if (input === 0 && output === 0 && cost === 0) return;
+    await this.sql`
+      UPDATE agent_sessions SET
+        input_tokens = input_tokens + ${input},
+        output_tokens = output_tokens + ${output},
+        cost = cost + ${cost}
+      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
+    `.catch((err) => {
+      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to persist usage (non-fatal)');
+    });
+  }
+
+  /** Best-effort: mark the row idle after a successful turn. */
+  private async markIdle(): Promise<void> {
+    await this.sql`
+      UPDATE agent_sessions SET status = 'idle', last_active_at = clock_timestamp()
+      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
+    `.catch(() => {});
+  }
+
+  /** Best-effort: mark the row crashed so ensureSession will not resume it. */
+  private async markCrashed(): Promise<void> {
+    await this.sql`
+      UPDATE agent_sessions SET status = 'crashed'
+      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
+    `.catch(() => {});
+  }
+
+  // ─── teardown ────────────────────────────────────────────────────────────────
+
+  /** Tear down the warm query and mark the handle's row closed (best-effort). */
+  async closeSession(handle: AgentSessionHandle): Promise<void> {
+    await this.teardownQuery();
+    await this.sql`
+      UPDATE agent_sessions SET status = 'closed'
+      WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
+    `.catch(() => {});
+  }
+
+  /** Tear down the warm query without touching the database row. */
+  async dispose(): Promise<void> {
+    await this.teardownQuery();
+  }
+
+  /** Close the input queue + dispose the generator. Idempotent. */
+  private async teardownQuery(): Promise<void> {
+    this.up = false;
+    this.busy = false;
+    const q = this.query;
+    const queue = this.input;
+    this.query = null;
+    this.input = null;
+    this.builtModel = null;
+    queue?.close();
+    if (q) {
+      // return() ends the AsyncGenerator and lets the SDK clean up its subprocess.
+      await q.return(undefined).catch(() => {});
+    }
+  }
+}
+
+// ─── helpers ──────────────────────────────────────────────────────────────────
+
+/** Coerce to a non-negative finite integer (tokens). */
+function num(v: unknown): number {
+  const x = typeof v === 'number' ? v : Number(v);
+  return Number.isFinite(x) && x > 0 ? Math.round(x) : 0;
+}
+
+/** Coerce to a non-negative finite float (cost USD). */
+function numF(v: unknown): number {
+  const x = typeof v === 'number' ? v : Number(v);
+  return Number.isFinite(x) && x > 0 ? x : 0;
+}
+
+/** Build a human-readable error from an SDK error-result message. */
+function resultErrorMessage(result: Extract<SDKMessage, { type: 'result' }>): string {
+  if (result.subtype === 'success') return 'ok';
+  const errs = (result as { errors?: string[] }).errors;
+  if (Array.isArray(errs) && errs.length > 0) return `${result.subtype}: ${errs.join('; ')}`;
+  return result.subtype;
+}
+
+/** Message text of a thrown value (`Error.message`, otherwise its string form). */
+function errMsg(e: unknown): string {
+  return e instanceof Error ? e.message : String(e);
+}
diff --git a/apps/coder/src/services/backends/claude-session-store.ts b/apps/coder/src/services/backends/claude-session-store.ts
new file mode 100644
index 0000000..8752c8f
--- /dev/null
+++ b/apps/coder/src/services/backends/claude-session-store.ts
@@ -0,0 +1,117 @@
+import type { SessionStore, SessionKey, SessionStoreEntry } from '@anthropic-ai/claude-agent-sdk';
+import type { Sql } from '../../db.js';
+
+/**
+ * claude-sdk-sessionstore #9 (Part 1) — clean-room PostgresSessionStore.
+ *
+ * A Postgres-backed implementation of the Claude Agent SDK's `SessionStore`
+ * adapter type. The SDK mirrors each transcript line (a JSON-safe POJO with a
+ * `type` discriminant) to this store via `append`; on resume it calls `load`
+ * to materialize the full transcript back. We treat entries as opaque blobs and
+ * preserve append order via a BIGSERIAL `id` — `load` replays `ORDER BY id`.
+ *
+ * Storage shape: one row per entry in `claude_session_entries`, keyed by the
+ * SDK's `SessionKey` (project_key, session_id, subpath). The SDK uses an
+ * *undefined* subpath for the main transcript and disallows the empty string;
+ * we collapse `undefined → ''` so the main transcript and subagent files share
+ * one table, distinguished by the `subpath` column (`'' = main`).
+ *
+ * Clean-room: written against the SDK's published `SessionStore` type contract
+ * and BooCode's existing SQL conventions (porsager tagged templates, `sql.json`
+ * for JSONB). No SDK example/reference code was consulted.
+ */
+export class PostgresSessionStore implements SessionStore {
+  constructor(private readonly sql: Sql) {}
+
+  /**
+   * Mirror a batch of transcript entries. No-op on an empty batch; otherwise a
+   * single multi-row INSERT writes them in array order. Because `id` is a
+   * monotonically-increasing BIGSERIAL, the insert order is the replay order
+   * `load` reconstructs — entries within one call land in the order given.
+   */
+  async append(key: SessionKey, entries: SessionStoreEntry[]): Promise<void> {
+    if (entries.length === 0) return;
+    // The main transcript has no subpath; it is stored under ''.
+    const subpath = key.subpath ?? '';
+    const rows = entries.map((entry) => ({
+      project_key: key.projectKey,
+      session_id: key.sessionId,
+      subpath,
+      entry: this.sql.json(entry as never),
+    }));
+    await this.sql`
+      INSERT INTO claude_session_entries ${this.sql(rows, 'project_key', 'session_id', 'subpath', 'entry')}
+    `;
+  }
+
+  /**
+   * Load a full transcript for resume. Returns the entries in append order, or
+   * `null` for a (project_key, session_id, subpath) key that was never written.
+   */
+  async load(key: SessionKey): Promise<SessionStoreEntry[] | null> {
+    const subpath = key.subpath ?? '';
+    const rows = await this.sql<{ entry: SessionStoreEntry }[]>`
+      SELECT entry
+      FROM claude_session_entries
+      WHERE project_key = ${key.projectKey}
+        AND session_id = ${key.sessionId}
+        AND subpath = ${subpath}
+      ORDER BY id
+    `;
+    if (rows.length === 0) return null;
+    return rows.map((r) => r.entry);
+  }
+
+  /**
+   * List the main transcripts for a project. `mtime` is the storage write time
+   * (latest `created_at` for the session) in Unix epoch milliseconds; the SDK
+   * sorts the result by mtime descending.
+   */
+  async listSessions(projectKey: string): Promise<Array<{ sessionId: string; mtime: number }>> {
+    // `mtime` is typed as a string in the row and converted with Number() below.
+    const rows = await this.sql<{ session_id: string; mtime: string }[]>`
+      SELECT session_id, extract(epoch FROM max(created_at)) * 1000 AS mtime
+      FROM claude_session_entries
+      WHERE project_key = ${projectKey}
+        AND subpath = ''
+      GROUP BY session_id
+    `;
+    return rows.map((r) => ({ sessionId: r.session_id, mtime: Number(r.mtime) }));
+  }
+
+  /**
+   * Delete a session. With a `subpath` set, only that subpath's rows are
+   * removed; with `subpath` omitted, every row for the session is removed
+   * (all subpaths, including the main transcript).
+   */
+  async delete(key: SessionKey): Promise<void> {
+    if (key.subpath !== undefined) {
+      await this.sql`
+        DELETE FROM claude_session_entries
+        WHERE project_key = ${key.projectKey}
+          AND session_id = ${key.sessionId}
+          AND subpath = ${key.subpath}
+      `;
+      return;
+    }
+    await this.sql`
+      DELETE FROM claude_session_entries
+      WHERE project_key = ${key.projectKey}
+        AND session_id = ${key.sessionId}
+    `;
+  }
+
+  /**
+   * List the distinct non-main subpaths under a session (e.g. subagent files).
+   * Used during resume to discover and materialize subagent transcripts; the
+   * main transcript (`subpath = ''`) is excluded.
+   */
+  async listSubkeys(key: { projectKey: string; sessionId: string }): Promise<string[]> {
+    const rows = await this.sql<{ subpath: string }[]>`
+      SELECT DISTINCT subpath
+      FROM claude_session_entries
+      WHERE project_key = ${key.projectKey}
+        AND session_id = ${key.sessionId}
+        AND subpath <> ''
+    `;
+    return rows.map((r) => r.subpath);
+  }
+}
diff --git a/apps/coder/src/services/backends/pushable-iterable.ts b/apps/coder/src/services/backends/pushable-iterable.ts
new file mode 100644
index 0000000..7119978
--- /dev/null
+++ b/apps/coder/src/services/backends/pushable-iterable.ts
@@ -0,0 +1,96 @@
+/**
+ * claude-sdk-sessionstore #9 (Part 2) — a tiny PURE pushable async-iterable.
+ *
+ * The Claude Agent SDK's streaming-input mode wants `query({ prompt })` where
+ * `prompt` is an `AsyncIterable<SDKUserMessage>`. To keep ONE `query()` generator
+ * alive across many turns (the "warm" property), the backend feeds it ONE user
+ * message per `prompt()` turn through a queue that stays open between turns and is
+ * only closed at `closeSession`/`dispose`. This is that queue.
+ *
+ * Semantics (the bit worth unit-testing — push/close/iterate ordering):
+ *  - `push(v)` enqueues a value. If a consumer is parked in `await next()`, it's
+ *    handed the value immediately; otherwise the value buffers in FIFO order.
+ *  - The async iterator yields buffered/pushed values in push order, and PARKS
+ *    (never busy-loops) when the buffer is empty — so the SDK generator waits for
+ *    the next turn's message instead of seeing end-of-input.
+ *  - `close()` ends the iterable: any parked consumer resolves `{done:true}` and
+ *    all future `next()`s return done. Values pushed after close are dropped.
+ *  - It's single-consumer (one `query()` reads it); concurrent consumers are not a
+ *    supported shape and not needed here.
+ *
+ * No SDK import — generic over the pushed value `T` — so the pure push/close/iterate
+ * ordering is testable without the `SDKUserMessage` shape or a live binary.
+ */
+export interface Pushable<T> {
+  /** Enqueue a value (or hand it to a parked consumer). No-op after close. */
+  push(value: T): void;
+  /** End the iterable. Idempotent; a parked consumer resolves done. */
+  close(): void;
+  /** True once `close()` has been called. */
+  readonly closed: boolean;
+  /** The async-iterable the consumer (the SDK `query`) drives. */
+  readonly iterable: AsyncIterable<T>;
+}
+
+/**
+ * Create an open, empty pushable queue of `T`.
+ *
+ * The returned `iterable` always hands out the SAME iterator, so the queue has one
+ * consumption position. Values pushed before `close()` are still delivered after
+ * it; values pushed after `close()` are dropped.
+ */
+export function createPushable<T>(): Pushable<T> {
+  const buffer: T[] = [];
+  // A waiting consumer's resolver (null when none is parked). Single-consumer.
+  let pendingResolve: ((res: IteratorResult<T>) => void) | null = null;
+  let closed = false;
+
+  function push(value: T): void {
+    if (closed) return;
+    if (pendingResolve) {
+      // Clear the slot before resolving so the parked consumer is woken only once.
+      const resolve = pendingResolve;
+      pendingResolve = null;
+      resolve({ value, done: false });
+      return;
+    }
+    buffer.push(value);
+  }
+
+  function close(): void {
+    if (closed) return;
+    closed = true;
+    if (pendingResolve) {
+      const resolve = pendingResolve;
+      pendingResolve = null;
+      resolve({ value: undefined, done: true });
+    }
+  }
+
+  const iterator: AsyncIterator<T> = {
+    next(): Promise<IteratorResult<T>> {
+      // Drain the buffer first (FIFO), regardless of close — buffered values
+      // pushed before close are still delivered.
+      if (buffer.length > 0) {
+        return Promise.resolve({ value: buffer.shift() as T, done: false });
+      }
+      if (closed) {
+        return Promise.resolve({ value: undefined, done: true });
+      }
+      // Park until the next push/close. Single-consumer: only one waiter at a time.
+      return new Promise<IteratorResult<T>>((resolve) => {
+        pendingResolve = resolve;
+      });
+    },
+    return(): Promise<IteratorResult<T>> {
+      // Consumer abandoned the loop (e.g. `break`) → close so a later push no-ops.
+      close();
+      return Promise.resolve({ value: undefined, done: true });
+    },
+  };
+
+  return {
+    push,
+    close,
+    get closed() {
+      return closed;
+    },
+    iterable: {
+      [Symbol.asyncIterator]() {
+        return iterator;
+      },
+    },
+  };
+}
diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts
index 8fb1558..b61e787 100644
--- a/apps/coder/src/services/dispatcher.ts
+++ b/apps/coder/src/services/dispatcher.ts
@@ -16,7 +16,9 @@ import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapsho
 import { agentPool, OPENCODE_POOL_KEY } from './agent-pool.js';
 import { OpenCodeServerBackend } from './backends/opencode-server.js';
 import { WarmAcpBackend } from './backends/warm-acp.js';
+import { ClaudeSdkBackend } from './backends/claude-sdk.js';
 import { shouldUseWarmBackend } from './backends/warm-acp-routing.js';
+import { shouldUseClaudeSdk } from './backends/claude-sdk-routing.js';
 import type { AgentBackend, AgentEvent } from './agent-backend.js';
 
 interface InferenceRunner {
@@ -131,6 +133,12 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise {
+    const taskId = task.id;
+    const agent = task.agent!;
+    // shouldUseClaudeSdk guarantees both non-null before we get here.
+ const sessionId = task.session_id!; + const chatId = task.chat_id!; + log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — claude SDK)'); + + const [project] = await sql<{ path: string | null }[]>` + SELECT path FROM projects WHERE id = ${task.project_id} + `; + const projectPath = project?.path; + if (!projectPath) { + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree' + WHERE id = ${taskId} + `; + return; + } + + const ac = new AbortController(); + + try { + await sql` + UPDATE tasks + SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp' + WHERE id = ${taskId} + `; + + // Persistent, session-keyed worktree (shared across turns + agents; NOT torn + // down per turn — Phase 3 reaps it). Same as the opencode/warm-ACP paths so a + // chat that switches agents shares one worktree. + const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, { + signal: ac.signal, + }); + log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (claude SDK)'); + + const [assistantMsg] = await sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + const assistantId = assistantMsg!.id; + + // write-edit-robustness #4: pre-turn checkpoint of the persistent session + // worktree (best-effort; never breaks dispatch). 
+ await createCheckpoint( + sql, + { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId }, + { signal: ac.signal, log }, + ).catch(() => null); + + broker.publishFrame(sessionId, { + type: 'message_started', + message_id: assistantId, + chat_id: chatId, + role: 'assistant', + } as WsFrame); + + const manifestCommands = getManifestCommands(agent); + if (manifestCommands.length > 0) { + setTaskCommands(taskId, manifestCommands); + broker.publishFrame(sessionId, { + type: 'agent_commands', + task_id: taskId, + session_id: sessionId, + commands: manifestCommands, + } as WsFrame); + } + + // Accumulate the turn's stream for persistence + the final message content. + const textChunks: string[] = []; + const reasoningChunks: string[] = []; + const toolSnaps = new Map(); + + // Map transport-agnostic AgentEvents → the SAME WS frames the warm-ACP / + // opencode paths emit. This boundary attaches message_id/chat_id. + const onEvent = (e: AgentEvent): void => { + switch (e.type) { + case 'text': + textChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'reasoning': + reasoningChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'reasoning_delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'tool_call': + case 'tool_update': + toolSnaps.set(e.toolCall.toolCallId, e.toolCall); + broker.publishFrame(sessionId, { + type: 'tool_call', + message_id: assistantId, + chat_id: chatId, + tool_call: snapshotToWireToolCall(e.toolCall), + } as WsFrame); + break; + case 'commands': + if (e.commands.length > 0) { + setTaskCommands(taskId, e.commands); + broker.publishFrame(sessionId, { + type: 'agent_commands', + task_id: taskId, + session_id: sessionId, + commands: e.commands, + } as WsFrame); + } + break; + } + }; + + const model = task.model ?? 
undefined; + const backend = getClaudeSdkBackend(chatId, agent, installPath); + const handle = await backend.ensureSession(sessionId, { + agent, + model: model ?? '', + chatId, + worktreePath, + worktreeId, + projectId: task.project_id, + }); + const result = await backend.prompt(handle, task.input, { + worktreePath, + model: model ?? '', + signal: ac.signal, + onEvent, + taskId, + modeId: task.mode_id ?? undefined, + }); + // Phase 3: keep the pooled (chat,agent) backend warm across the turn. + agentPool.touch(chatId, agent); + + const assistantContent = textChunks.join('').slice(0, 50_000); + const reasoningText = reasoningChunks.join('').slice(0, 200_000); + const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'claude SDK turn failed').slice(0, 500); + + await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText); + + await sql` + UPDATE messages + SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp() + WHERE id = ${assistantId} + `; + broker.publishFrame(sessionId, { + type: 'message_complete', + message_id: assistantId, + chat_id: chatId, + } as WsFrame); + + if (stopping) { + await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`; + return; // worktree persists (no cleanup); backend stays warm + } + + // Diff the persistent worktree against its captured baseline and SUPERSEDE + // the session's prior pending row (latest-wins) — identical to opencode/ACP. + const diff = await diffWorktree(worktreePath, projectPath, { + signal: ac.signal, + baseRef: baseCommit ?? 
'HEAD', + }); + if (diff) { + await sql` + DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending' + `; + await sql` + INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent) + VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent}) + `; + log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (claude SDK)'); + } else { + log.info({ taskId }, 'dispatcher: no changes detected in session worktree (claude SDK)'); + } + + // NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm. + + const [extCostRow] = await sql<{ total: number | null }[]>` + SELECT SUM(tokens_used)::int AS total + FROM messages + WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL + `; + const extCostTokens = extCostRow?.total ?? null; + + const finalState = result.ok ? 'completed' : 'failed'; + await sql` + UPDATE tasks + SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} + WHERE id = ${taskId} + `; + log.info({ taskId, agent, finalState }, 'dispatcher: task finished (claude SDK)'); + clearTaskCommands(taskId); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.error({ taskId, agent, err: errMsg }, 'dispatcher: claude SDK error'); + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} + WHERE id = ${taskId} + `.catch(() => {}); + clearTaskCommands(taskId); + // No worktree cleanup (persistent); backend stays warm for the next turn. 
+ } + } + // ─── Helpers ──────────────────────────────────────────────────────────────── async function waitForCompletion(assistantId: string): Promise { diff --git a/apps/coder/src/services/provider-registry.ts b/apps/coder/src/services/provider-registry.ts index 262f7ce..816a6a5 100644 --- a/apps/coder/src/services/provider-registry.ts +++ b/apps/coder/src/services/provider-registry.ts @@ -38,6 +38,12 @@ export const PROVIDERS: ProviderDef[] = [ }, { name: 'claude', + // transport stays 'pty' — the DEFAULT dispatch path (one-shot `claude + // --output-format stream-json`). claude-sdk-sessionstore #9 (Part 2) adds a warm + // Claude-Agent-SDK backend (services/backends/claude-sdk.ts) routed ONLY when the + // `CLAUDE_SDK_BACKEND` env flag is truthy AND the task is a chat tab; with the flag + // off (production default) claude always uses this PTY path, so the transport label + // is left unchanged. Flip the env var on a host (after a live smoke) to opt in. label: 'Claude Code', transport: 'pty', modelSource: 'static', diff --git a/apps/coder/vitest.config.ts b/apps/coder/vitest.config.ts index 5802658..14258ce 100644 --- a/apps/coder/vitest.config.ts +++ b/apps/coder/vitest.config.ts @@ -5,5 +5,11 @@ export default defineConfig({ environment: 'node', globals: false, include: ['src/**/__tests__/**/*.test.ts'], + // DB-integration suites (checkpoints, claude-session-store, reconnect, etc.) + // each apply the full schema in beforeAll against the one shared dev DB; running + // test files in parallel makes those concurrent DDL applies deadlock under + // DATABASE_URL. Serialize file execution — the suites are fast, so the cost is + // negligible and the default (no-DATABASE_URL) run is unaffected. 
+ fileParallelism: false, }, }); diff --git a/openspec/changes/claude-sdk-sessionstore/proposal.md b/openspec/changes/claude-sdk-sessionstore/proposal.md new file mode 100644 index 0000000..a5b3d65 --- /dev/null +++ b/openspec/changes/claude-sdk-sessionstore/proposal.md @@ -0,0 +1,68 @@ +# Claude Agent SDK backend + clean-room PostgresSessionStore (#9) + +**Status:** in progress (started 2026-06-01) +**Source:** `boocode_code_review_v2.md` §1 #9, §5h/§5i (happy + SDK `.d.ts`). Decision §6.2: lean SDK. +**SDK:** `@anthropic-ai/claude-agent-sdk@0.3.159` (installed, Commercial Terms — runtime dep OK, code +reference-only; the store is **clean-room** from the real interface, not vendored). + +Replace BooCoder's one-shot PTY claude dispatch with a warm, resumable Claude-SDK backend. Two parts: +the clean-room session store (fully testable here) and the backend + wiring (live pump needs a host +smoke against real `claude`). + +## Ground-truth SDK API (from the installed `sdk.d.ts`) +- `query({ prompt: string | AsyncIterable<SDKUserMessage>, options?: Options }): Query` where + `Query extends AsyncGenerator<SDKMessage, void>`. +- `Options`: `sessionStore?: SessionStore`, `resume?: string`, `model?`, `cwd?`, + `pathToClaudeCodeExecutable?`, `canUseTool?`, `permissionMode?`, `env?`, `allowedTools?`. +- `SessionStore = { append(key, entries): Promise<void>; load(key): Promise<SessionStoreEntry[] | null>; + listSessions?(projectKey): Promise<{sessionId,mtime}[]>; delete?(key): Promise<void>; + listSubkeys?({projectKey,sessionId}): Promise<string[]> }`. +- `SessionKey = { projectKey: string; sessionId: string; subpath?: string }` (undefined subpath = main + transcript; empty string invalid — store maps undefined→'' internally). +- `SessionStoreEntry = { type: string; uuid?: string; timestamp?: string; [k]: unknown }` (opaque JSONL). 
+- Messages: `SDKSystemMessage{subtype:'init'}` carries `session_id` (+ model/tools); `SDKResultMessage` + (success/error) ends a turn with `result`, `usage`, `total_cost_usd`; `SDKPartialAssistantMessage` / + `SDKAssistantMessage` carry text/thinking/tool blocks. + +## Part 1 — Clean-room PostgresSessionStore (testable now) +- Schema (`apps/coder/src/schema.sql`): a generic append-only entry table + `claude_session_entries(id BIGSERIAL PK, project_key TEXT, session_id TEXT, subpath TEXT DEFAULT '', + entry JSONB, created_at TIMESTAMPTZ DEFAULT clock_timestamp())` + index `(project_key, session_id, + subpath, id)`. (The store is generic per the SDK's key; the chat↔session ownership lives in + `agent_sessions`, not here.) +- `apps/coder/src/services/backends/claude-session-store.ts`: `PostgresSessionStore` implementing the + real `SessionStore` type over `Sql`. `append` = ordered multi-INSERT (id = order); `load` = SELECT + ORDER BY id → array or null; `listSessions` = group main-transcript rows, mtime = max(created_at) ms; + `delete` = scoped delete (subpath given → that subpath; omitted → whole session); `listSubkeys` = + DISTINCT non-'' subpaths. Pure SQL, no SDK import needed beyond the `SessionStore` type. +- Tests `__tests__/claude-session-store.test.ts` (DB-opt-in, mirror `checkpoints.test.ts`): append→load + round-trip + order, null on unseen key, subpath isolation (main vs subagent), listSessions mtime, + delete scoping, listSubkeys. + +## Part 2 — ClaudeSdkBackend + wiring (live pump needs host smoke) +- `agent_sessions.backend` CHECK adds `'claude_sdk'`. +- `apps/coder/src/services/backends/claude-sdk.ts`: a `ClaudeSdkBackend` implementing `AgentBackend` + (mirror `warm-acp.ts`/`opencode-server.ts`). 
`ensureSession` resolves the resume id from + `agent_sessions(chat_id,'claude').agent_session_id`; `prompt` drives one persistent `query()` in + streaming-input mode (a pushable `AsyncIterable<SDKUserMessage>` fed per turn) with + `{ sessionStore, resume, model, cwd: worktreePath, pathToClaudeCodeExecutable: installPath }`, + reads the `AsyncGenerator<SDKMessage, void>` until `result`, captures `session_id` from the `init` message + and persists it to `agent_sessions`. A pure `mapSdkMessage(msg): AgentEvent[]` (unit-tested) maps + partial/assistant/tool/thinking → the existing `AgentEvent` union; `result.usage`/`total_cost_usd` + accumulate onto `agent_sessions` (like opencode U.6). `isBusy`/`closeSession`/crash mirror the ACP + backend. +- Routing: add `claude` to the warm path (`warm-acp-routing.ts` or a sibling `shouldUseClaudeSdk`), + with the existing PTY `runExternalAgent` kept as the **fallback** (session-less creators + if the SDK + backend fails to start). Provider registry: claude stays selectable; transport reflects the SDK path. +- Frames + persistence identical to the warm-ACP path (`persistExternalAgentTurn`, broker frames). + +## Verify +- Part 1: `pnpm -C apps/coder test` + DB-opt-in store tests against dev postgres; build clean. +- Part 2: `pnpm -C apps/coder build` + `npx tsc -p apps/coder/tsconfig.json --noEmit` (typechecks + against the REAL SDK types) + pure-mapper unit tests. **Live pump + resume across turns: host smoke + against real `claude` (auth required) — cannot run from the dev container.** + +## Open flags +- SDK peer-deps want `zod@^4`; workspace is `zod@3.25.76` (installed with a warning) — watch at runtime. +- `pathToClaudeCodeExecutable` from `available_agents.install_path`; the SDK spawns the same `claude` + binary the PTY path uses. ANTHROPIC auth/env must reach the child (host concern). 
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6995347..e08718a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -51,6 +51,9 @@ importers: '@agentclientprotocol/sdk': specifier: ^0.22.1 version: 0.22.1(zod@3.25.76) + '@anthropic-ai/claude-agent-sdk': + specifier: ^0.3.159 + version: 0.3.159(@anthropic-ai/sdk@0.100.1(zod@3.25.76))(@modelcontextprotocol/sdk@1.29.0(zod@3.25.76))(zod@3.25.76) '@boocode/server': specifier: workspace:* version: link:../server @@ -317,6 +320,63 @@ packages: resolution: {integrity: sha512-UrcABB+4bUrFABwbluTIBErXwvbsU/V7TZWfmbgJfbkwiBuziS9gxdODUyuiecfdGQ85jglMW6juS3+z5TsKLw==} engines: {node: '>=10'} + '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.159': + resolution: {integrity: sha512-3nnH4yUNJVSyaU5DBlGw2yxc4zlnVvAnc9UOe+La47QVG7/dN+rWAgn4zCbqKk9bWFLDQ1Ek0r56EZE1Qo4UKQ==} + cpu: [arm64] + os: [darwin] + + '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.159': + resolution: {integrity: sha512-iv+NRjz+t4Q1R2+kLdDbccSo3b0wedVJ9jMT3noznOVojZHzgkxVpTvt36/XkXV0rqIDQ5H18bBYIZZzOXu7mg==} + cpu: [x64] + os: [darwin] + + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.159': + resolution: {integrity: sha512-WvwiQBWt3tdu5EwqjpDZszI6p2uetYsw4Cxc6ptO/SmLIYXcDienP8nmirZdsZrS+Gzk6imgY0IY5mmNaRhelQ==} + cpu: [arm64] + os: [linux] + + '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.159': + resolution: {integrity: sha512-FlsS5M4GCpzsQVaNDFF8dRgFGR3QwyAHZFl/xM/2Y2BqVBH+NH17RpKQSJxr1qr41QnsNkinMnu2iSKoc33hKg==} + cpu: [arm64] + os: [linux] + + '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.159': + resolution: {integrity: sha512-kFH6RC2YJbPc8XWRNy/wL4YU7LzdJjSwAdH488sVzIif3q+TrvVrV5y/IW0+MLmta+CKIqtFYpGaucsJYvj7Eg==} + cpu: [x64] + os: [linux] + + '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.159': + resolution: {integrity: sha512-uNPEC/iRzVb4bEdzs0KAz1zV7i1PVGEZZnJTQyi1OtgVa81sAoH/H0CbbzDiTsquKdaESf+1DSSEkUlfZmMUEw==} + cpu: [x64] + os: [linux] + + '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.159': + resolution: {integrity: 
sha512-WN1QEZGgWXz9GMl61QU6j9E+LEF5plki87bL2xsGwuCPzK+OeVPQU55pabuP8P+vFBFHUo3Y9OlTVyZHnUzmAQ==} + cpu: [arm64] + os: [win32] + + '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.159': + resolution: {integrity: sha512-Ty4seccD+dTDX5hhj89IUELZd/LkxO5O43Uiz5Mo8ZJktoX38SK4XMZlBS935QdqTFLRvPL0hvK4Lt4dTOqzPw==} + cpu: [x64] + os: [win32] + + '@anthropic-ai/claude-agent-sdk@0.3.159': + resolution: {integrity: sha512-Xh1oVMIK6N3KsiNIhqNH8ZK90zjRmAEL9d1Md8ZlGdHJE+HhdMYdBadujc3KEkV0uufsEUvYp+A3fDenfypGSA==} + engines: {node: '>=18.0.0'} + peerDependencies: + '@anthropic-ai/sdk': '>=0.93.0' + '@modelcontextprotocol/sdk': ^1.29.0 + zod: ^4.0.0 + + '@anthropic-ai/sdk@0.100.1': + resolution: {integrity: sha512-RANcEe7LpiLczkKGOwoXOTuFdPhuubS0i4xaAKOMpcqc55YO0mukgxppV7eygx3DXNjxWT6RYOLPyOy0aIAmwg==} + hasBin: true + peerDependencies: + zod: ^3.25.0 || ^4.0.0 + peerDependenciesMeta: + zod: + optional: true + '@babel/code-frame@7.29.0': resolution: {integrity: sha512-9NhCeYjq9+3uxgdtp20LSiJXJvN0FeCtNGpJxuMFZ1Kv3cWUNb6DOhJwUvcVCzKGR66cw4njwM6hrJLqgOwbcw==} engines: {node: '>=6.9.0'} @@ -446,6 +506,10 @@ packages: peerDependencies: '@babel/core': ^7.0.0-0 + '@babel/runtime@7.29.7': + resolution: {integrity: sha512-Nq8OhGWiZIZGV6hLHoyAKLLcJihP/xFeBMGJoUrxTX2psI8dCifzLhZISFb+VWS3wFMRDmCGw5R+dOySCqPLhw==} + engines: {node: '>=6.9.0'} + '@babel/template@7.28.6': resolution: {integrity: sha512-YA6Ma2KsCdGb+WC6UpBVFJGXL58MDA6oyONbjyF/+5sBgxY/dwkhLogbMT2GXXyU84/IhRw/2D1Os1B/giz+BQ==} engines: {node: '>=6.9.0'} @@ -1787,6 +1851,9 @@ packages: resolution: {integrity: sha512-tlqY9xq5ukxTUZBmoOp+m61cqwQD5pHJtFY3Mn8CA8ps6yghLH/Hw8UPdqg4OLmFW3IFlcXnQNmo/dh8HzXYIQ==} engines: {node: '>=18'} + '@stablelib/base64@1.0.1': + resolution: {integrity: sha512-1bnPQqSxSuc3Ii6MhBysoWCg58j97aUjuCSZrGSmDxNqtytIi0k8utUenAwTZN4V5mXXYGsVUI9zeBqy+jBOSQ==} + '@standard-schema/spec@1.1.0': resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==} @@ 
-2528,6 +2595,9 @@ packages: fast-querystring@1.1.2: resolution: {integrity: sha512-g6KuKWmFXc0fID8WWH0jit4g0AGBoJhCkJMb1RmbsSEUNvQ+ZC8D6CUZ+GtF8nMzSPXnhiePyyqqipzNNEnHjg==} + fast-sha256@1.3.0: + resolution: {integrity: sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==} + fast-string-truncated-width@3.0.3: resolution: {integrity: sha512-0jjjIEL6+0jag3l2XWWizO64/aZVtpiGE3t0Zgqxv0DPuxiMjvB3M24fCyhZUO4KomJQPj3LTSUnDP3GpdwC0g==} @@ -2873,6 +2943,10 @@ packages: json-schema-ref-resolver@1.0.1: resolution: {integrity: sha512-EJAj1pgHc1hxF6vo2Z3s69fMjO1INq6eGHXZ8Z6wCQeldCuwxGK9Sxf4/cScGn3FZubCVUehfWtcDM/PLteCQw==} + json-schema-to-ts@3.1.1: + resolution: {integrity: sha512-+DWg8jCJG2TEnpy7kOm/7/AxaYoaRbjVB4LFZLySZlWn8exGs3A4OLJR966cVvU26N7X9TWxl+Jsw7dzAqKT6g==} + engines: {node: '>=16'} + json-schema-traverse@1.0.0: resolution: {integrity: sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==} @@ -3756,6 +3830,9 @@ packages: stackback@0.0.2: resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} + standardwebhooks@1.0.0: + resolution: {integrity: sha512-BbHGOQK9olHPMvQNHWul6MYlrRTAOKn03rOe4A8O3CLWhNf4YHBqq2HJKKC+sfqpxiBY52pNeesD6jIiLDz8jg==} + statuses@2.0.1: resolution: {integrity: sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==} engines: {node: '>= 0.8'} @@ -3899,6 +3976,9 @@ packages: trough@2.2.0: resolution: {integrity: sha512-tmMpK00BjZiUyVyvrBK7knerNgmgvcV/KLVyuma/SC+TQN167GrMRciANTz09+k3zW8L8t60jWO1GpfkZdjTaw==} + ts-algebra@2.0.0: + resolution: {integrity: sha512-FPAhNPFMrkwz76P7cdjdmiShwMynZYN6SgOujD1urY4oNm80Ou9oMdmbR45LotcKOXoy7wSmHkRFE6Mxbrhefw==} + ts-morph@26.0.0: resolution: {integrity: sha512-ztMO++owQnz8c/gIENcM9XfCEzgoGphTv+nKpYNM1bgsdOVC/jRZuEBf6N+mLLDNg68Kl+GgUZfOySaRiG1/Ug==} @@ -4194,6 +4274,52 @@ snapshots: '@alloc/quick-lru@5.2.0': {} + 
'@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.159': + optional: true + + '@anthropic-ai/claude-agent-sdk@0.3.159(@anthropic-ai/sdk@0.100.1(zod@3.25.76))(@modelcontextprotocol/sdk@1.29.0(zod@3.25.76))(zod@3.25.76)': + dependencies: + '@anthropic-ai/sdk': 0.100.1(zod@3.25.76) + '@modelcontextprotocol/sdk': 1.29.0(zod@3.25.76) + zod: 3.25.76 + optionalDependencies: + '@anthropic-ai/claude-agent-sdk-darwin-arm64': 0.3.159 + '@anthropic-ai/claude-agent-sdk-darwin-x64': 0.3.159 + '@anthropic-ai/claude-agent-sdk-linux-arm64': 0.3.159 + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl': 0.3.159 + '@anthropic-ai/claude-agent-sdk-linux-x64': 0.3.159 + '@anthropic-ai/claude-agent-sdk-linux-x64-musl': 0.3.159 + '@anthropic-ai/claude-agent-sdk-win32-arm64': 0.3.159 + '@anthropic-ai/claude-agent-sdk-win32-x64': 0.3.159 + + '@anthropic-ai/sdk@0.100.1(zod@3.25.76)': + dependencies: + json-schema-to-ts: 3.1.1 + standardwebhooks: 1.0.0 + optionalDependencies: + zod: 3.25.76 + '@babel/code-frame@7.29.0': dependencies: '@babel/helper-validator-identifier': 7.28.5 @@ -4367,6 +4493,8 @@ snapshots: transitivePeerDependencies: - supports-color + '@babel/runtime@7.29.7': {} + '@babel/template@7.28.6': dependencies: '@babel/code-frame': 7.29.0 @@ -5618,6 +5746,8 @@ snapshots: '@sindresorhus/merge-streams@4.0.0': {} + '@stablelib/base64@1.0.1': {} + '@standard-schema/spec@1.1.0': {} '@tailwindcss/node@4.3.0': @@ -6386,6 +6516,8 @@ snapshots: dependencies: 
fast-decode-uri-component: 1.0.1 + fast-sha256@1.3.0: {} + fast-string-truncated-width@3.0.3: {} fast-string-width@3.0.2: @@ -6727,6 +6859,11 @@ snapshots: dependencies: fast-deep-equal: 3.1.3 + json-schema-to-ts@3.1.1: + dependencies: + '@babel/runtime': 7.29.7 + ts-algebra: 2.0.0 + json-schema-traverse@1.0.0: {} json-schema-typed@8.0.2: {} @@ -7939,6 +8076,11 @@ snapshots: stackback@0.0.2: {} + standardwebhooks@1.0.0: + dependencies: + '@stablelib/base64': 1.0.1 + fast-sha256: 1.3.0 + statuses@2.0.1: {} statuses@2.0.2: {} @@ -8061,6 +8203,8 @@ snapshots: trough@2.2.0: {} + ts-algebra@2.0.0: {} + ts-morph@26.0.0: dependencies: '@ts-morph/common': 0.27.0