/**
 * claude-sdk-sessionstore #9 (Part 2) — ClaudeSdkBackend.
 *
 * A warm, resumable backend for the `claude` agent built on the Claude Agent SDK
 * (`@anthropic-ai/claude-agent-sdk`), implementing the Phase-0 `AgentBackend`
 * contract (same shape as `WarmAcpBackend` / `OpenCodeServerBackend`). One
 * persistent `query()` per (chat, agent) session, driven in STREAMING-INPUT mode:
 * the `prompt` is a pushable `AsyncIterable<SDKUserMessage>` that stays open across
 * turns, so the SDK subprocess + conversation stay warm between `prompt()` calls
 * until `closeSession`/`dispose`.
 *
 * ⚠ LIVE PUMP IS HOST-ONLY. The actual streaming turn needs the real `claude`
 * binary + ANTHROPIC auth on a host — it CANNOT run in the dev container. This file
 * is written against the REAL SDK types so it TYPECHECKS, and the PURE pieces (the
 * `mapSdkMessage` mapper + the `createPushable` queue) are unit-tested. Routing to
 * this backend is gated behind `CLAUDE_SDK_BACKEND` (default OFF) so production
 * claude stays on the working PTY path until a host smoke validates the pump +
 * cross-turn resume.
 *
 * Lifecycle (mirrors warm-acp.ts / opencode-server.ts):
 *  - `ensureSession`: resolve the resume id from `agent_sessions(chat_id,'claude')`
 *    and (re)build the single `query()` if not already live. The SDK's own
 *    `sessionStore` (Part 1 PostgresSessionStore) materializes the transcript on
 *    resume; `options.resume` carries the provider session id.
 *  - `prompt`: push ONE user message onto the open queue, pull messages from the
 *    generator with explicit `next()` calls, map each `SDKMessage` → `AgentEvent`s
 *    via `mapSdkMessage`, forward to `ctx.onEvent`, and resolve when the turn's
 *    `result` message lands. `for await` is deliberately NOT used: leaving a
 *    `for await` loop early calls the generator's `return()`, which would close the
 *    warm query after every turn. Capture the `session_id` from the `init` message
 *    and persist it to `agent_sessions`; accumulate `result.usage` /
 *    `total_cost_usd` onto the row (mirrors opencode U.6).
 *  - `closeSession` / `dispose`: close the queue + dispose the query generator.
 *  - A thrown error or `result.subtype==='error*'` marks `agent_sessions.status='crashed'`.
 *    A generator that threw or ended on its own is finished, so it is also dropped
 *    and the next `ensureSession` builds a fresh one.
 *
 * Turn serialization: like warm-acp, exactly one turn is in flight at a time on a
 * given backend (the dispatcher's per-session `inflight` map enforces this upstream;
 * `isBusy()` reports it so the pool never evicts mid-turn).
 */
import { query, type Query, type SDKMessage, type SDKUserMessage, type Options } from '@anthropic-ai/claude-agent-sdk';
import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../../db.js';
import { PostgresSessionStore } from './claude-session-store.js';
import { createPushable, type Pushable } from './pushable-iterable.js';
import { mapSdkMessage, createClaudeSdkMapState, type ClaudeSdkMapState } from './claude-sdk-map.js';
import type {
  AgentBackend,
  AgentSessionHandle,
  EnsureSessionOpts,
  PromptCtx,
  TurnResult,
} from '../agent-backend.js';

/** The terminal per-turn message the SDK emits (success or error subtypes). */
type SdkResultMessage = Extract<SDKMessage, { type: 'result' }>;

export interface ClaudeSdkBackendDeps {
  sql: Sql;
  log: FastifyBaseLogger;
  /** The (chat, agent) this backend serves — its pool identity + DB key. */
  chatId: string;
  /** Always 'claude' today; kept explicit so the pool key + DB writes stay honest. */
  agent: string;
  /** Resolved `claude` binary path (available_agents.install_path); null → SDK default. */
  installPath: string | null;
}

export class ClaudeSdkBackend implements AgentBackend {
  readonly backend = 'claude_sdk' as const;

  private readonly sql: Sql;
  private readonly log: FastifyBaseLogger;
  private readonly chatId: string;
  private readonly agent: string;
  private readonly installPath: string | null;
  private readonly sessionStore: PostgresSessionStore;

  /** The single persistent query() generator; null until the first turn builds it. */
  private query: Query | null = null;
  /** The open input queue feeding the generator one SDKUserMessage per turn. */
  private input: Pushable<SDKUserMessage> | null = null;
  /** The provider's own session id (resume token), captured from the init message. */
  private agentSessionId: string | null = null;
  /** Resolved model the live query() was built with; a change forces a rebuild. */
  private builtModel: string | null = null;
  /** True between prompt() start and settle. */
  private busy = false;
  /** True while a query() is built and not yet torn down. */
  private up = false;

  constructor(deps: ClaudeSdkBackendDeps) {
    this.sql = deps.sql;
    this.log = deps.log;
    this.chatId = deps.chatId;
    this.agent = deps.agent;
    this.installPath = deps.installPath;
    this.sessionStore = new PostgresSessionStore(deps.sql);
  }

  /** §2: liveness for the health endpoint + dispatcher fallback decision. */
  health(): 'up' | 'down' {
    return this.up ? 'up' : 'down';
  }

  /** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */
  isBusy(): boolean {
    return this.busy;
  }

  // ─── ensureSession: resolve resume id + (re)build the warm query ──────────────

  async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
    // Resolve the resume token from the (chat_id, agent) row. A crashed row is not
    // resumed (the SDK would fail to load a dead session); we create fresh.
    const [row] = await this.sql<{ agent_session_id: string | null; status: string }[]>`
      SELECT agent_session_id, status FROM agent_sessions
      WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
    `;
    const resumeId = row && row.status !== 'crashed' ? row.agent_session_id : null;

    // (Re)build the warm query if there is none, or the model changed (the SDK can
    // change model mid-session via setModel, but a fresh build is simplest + matches
    // opencode's config-drift → fresh-session rule). The query stays alive across
    // turns; only closeSession/dispose (or a dead generator) tears it down.
    if (!this.query || this.builtModel !== opts.model) {
      await this.teardownQuery();
      this.buildQuery(opts.worktreePath, opts.model, resumeId);
    }

    // Seed the in-memory resume id from the DB so a handle built before the first
    // turn's init message still carries the last-known token. The init message
    // overwrites it with the authoritative current id during the turn.
    if (this.agentSessionId == null) this.agentSessionId = resumeId;

    // Upsert the agent_sessions row (backend='claude_sdk'). agent_session_id may be
    // null until the first turn captures it from the init message; prompt() updates it.
    await this.sql`
      INSERT INTO agent_sessions
        (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
      VALUES
        (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'claude_sdk',
         ${this.agentSessionId}, NULL, 'active', clock_timestamp())
      ON CONFLICT (chat_id, agent) DO UPDATE SET
        session_id = EXCLUDED.session_id,
        worktree_id = EXCLUDED.worktree_id,
        backend = 'claude_sdk',
        agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id),
        server_port = NULL,
        status = 'active',
        last_active_at = clock_timestamp()
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: opts.chatId, agent: opts.agent }, 'claude-sdk: agent_sessions upsert failed (non-fatal)');
    });

    return {
      sessionId,
      agent: opts.agent,
      backend: 'claude_sdk',
      chatId: opts.chatId,
      worktreeId: opts.worktreeId,
      agentSessionId: this.agentSessionId,
      serverPort: null,
    };
  }

  /**
   * Build the persistent query() in streaming-input mode. Lazy — no subprocess
   * work happens until the generator is first pulled in prompt().
   *
   * @param worktreePath cwd for the agent.
   * @param model        model id; empty → SDK default.
   * @param resumeId     provider session id to resume, or null for a fresh session.
   */
  private buildQuery(worktreePath: string, model: string, resumeId: string | null): void {
    const input = createPushable<SDKUserMessage>();
    const options: Options = {
      sessionStore: this.sessionStore,
      cwd: worktreePath,
      // Stream partial assistant messages so text/thinking/tool deltas arrive live
      // (the mapper reads them; without this only terminal messages land).
      includePartialMessages: true,
      ...(model ? { model } : {}),
      ...(resumeId ? { resume: resumeId } : {}),
      ...(this.installPath ? { pathToClaudeCodeExecutable: this.installPath } : {}),
      // ANTHROPIC auth/env must reach the child; inherit the process env (host concern).
      env: process.env as Record<string, string>,
    };
    this.input = input;
    this.query = query({ prompt: input.iterable, options });
    this.builtModel = model;
    this.up = true;
    this.log.info({ chatId: this.chatId, agent: this.agent, model, resume: resumeId ?? null }, 'claude-sdk: warm query built');
  }

  // ─── prompt: push one user message + pull the generator until result ──────────

  /**
   * Run one turn on the warm query.
   *
   * The generator is driven with explicit `next()` calls rather than `for await`:
   * returning from inside a `for await` loop invokes the iterator's `return()`,
   * which would finalize the persistent query (and its subprocess) after every
   * turn. Pulling manually lets the turn end on its `result` message while the
   * generator stays open for the next pushed message.
   *
   * @returns `{ ok: true }` on a successful result; `{ ok: false, error }` on abort,
   *          an error result, a thrown error, or a generator that ended early.
   */
  async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
    if (!this.query || !this.input) {
      // ensureSession should have built it; rebuild defensively (e.g. evicted/raced).
      this.buildQuery(ctx.worktreePath, ctx.model, handle.agentSessionId);
    }
    const gen = this.query;
    const queue = this.input;
    // buildQuery always sets both; this guard only narrows the types.
    if (!gen || !queue) return { ok: false, error: 'claude-sdk: query not built' };

    if (ctx.signal.aborted) return { ok: false, error: 'aborted' };

    this.busy = true;
    const state: ClaudeSdkMapState = createClaudeSdkMapState();

    // Per-turn abort: interrupt the in-flight query on the SAME generator (never
    // tear down the warm query — that's the pool's lifetime). The generator then
    // emits its terminal result and the pull loop exits.
    let aborted = false;
    const onAbort = (): void => {
      if (aborted) return;
      aborted = true;
      void gen.interrupt().catch(() => {});
    };
    ctx.signal.addEventListener('abort', onAbort, { once: true });

    // session_id is optional on the wire; the SDK manages it via resume + the init message.
    const userMsg: SDKUserMessage = {
      type: 'user',
      message: { role: 'user', content: input },
      parent_tool_use_id: null,
      ...(handle.agentSessionId ? { session_id: handle.agentSessionId } : {}),
    };

    try {
      // Inside try so a failing push still clears `busy` + the abort listener.
      queue.push(userMsg);

      for (;;) {
        const step = await gen.next();
        if (step.done) break;
        const msg = step.value;

        // Capture the provider session id from the init message (authoritative).
        if (msg.type === 'system' && msg.subtype === 'init' && msg.session_id) {
          if (this.agentSessionId !== msg.session_id) {
            this.agentSessionId = msg.session_id;
            await this.persistAgentSessionId(msg.session_id);
          }
        }

        // The result message ends THIS turn. Returning here does not touch the
        // generator (no for-await IteratorClose), so it stays alive for the next turn.
        if (msg.type === 'result') {
          await this.accumulateUsage(msg);
          const ok = msg.subtype === 'success' && !aborted;
          if (ok) {
            await this.markIdle();
          } else {
            // error_during_execution / error_max_turns / aborted → crashed row.
            await this.markCrashed();
          }
          if (aborted) return { ok: false, error: 'aborted' };
          return ok ? { ok: true } : { ok: false, error: resultErrorMessage(msg) };
        }

        // Map renderable content → AgentEvents for the dispatcher's onEvent.
        for (const ev of mapSdkMessage(msg, state)) {
          ctx.onEvent(ev);
        }
      }

      // Generator ended without a result message (e.g. disposed, or the subprocess
      // exited). It can never yield again, so drop it if it is still ours; the next
      // ensureSession then rebuilds. Treated as a non-fatal incomplete turn so the
      // dispatcher still finalizes the row.
      if (this.query === gen) await this.teardownQuery();
      if (aborted) return { ok: false, error: 'aborted' };
      return { ok: false, error: 'claude-sdk: query ended before result' };
    } catch (err) {
      // A generator that threw is finished. If it is still the current query, drop
      // it so ensureSession rebuilds. If closeSession/dispose already replaced it,
      // the throw is an expected side effect of that teardown and must not
      // overwrite the row's status.
      const stillCurrent = this.query === gen;
      if (stillCurrent) await this.teardownQuery();
      if (aborted) return { ok: false, error: 'aborted' };
      if (stillCurrent) {
        this.log.warn({ err: errMsg(err), chatId: this.chatId, agent: this.agent }, 'claude-sdk: turn failed; warm query dropped');
        // The crashed row will not be resumed, so forget its token in memory too.
        this.agentSessionId = null;
        await this.markCrashed();
      }
      return { ok: false, error: errMsg(err) };
    } finally {
      ctx.signal.removeEventListener('abort', onAbort);
      this.busy = false;
    }
  }

  // ─── persistence helpers ──────────────────────────────────────────────────────

  /** Persist the provider session id (resume token) onto the row; best-effort. */
  private async persistAgentSessionId(id: string): Promise<void> {
    await this.sql`
      UPDATE agent_sessions
      SET agent_session_id = ${id}, last_active_at = clock_timestamp()
      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to persist agent_session_id (non-fatal)');
    });
  }

  /**
   * Accumulate the turn's usage/cost onto the (chat_id, agent) row — mirrors the
   * opencode U.6 running-total pattern. The SDK reports usage once per turn on the
   * result message (not per step), so this fires once per prompt(). Cache read/write
   * input tokens fold into `input_tokens`; usage telemetry never fails a turn.
   */
  private async accumulateUsage(result: SdkResultMessage): Promise<void> {
    const u = result.usage;
    const input = num(u?.input_tokens) + num(u?.cache_read_input_tokens) + num(u?.cache_creation_input_tokens);
    const output = num(u?.output_tokens);
    const cost = numF(result.total_cost_usd);
    if (input === 0 && output === 0 && cost === 0) return;
    await this.sql`
      UPDATE agent_sessions
      SET input_tokens = input_tokens + ${input},
          output_tokens = output_tokens + ${output},
          cost = cost + ${cost}
      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to persist usage (non-fatal)');
    });
  }

  /** Mark the row idle after a successful turn; best-effort (logged, never thrown). */
  private async markIdle(): Promise<void> {
    await this.sql`
      UPDATE agent_sessions SET status = 'idle', last_active_at = clock_timestamp()
      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to mark session idle (non-fatal)');
    });
  }

  /** Mark the row crashed so it is not resumed; best-effort (logged, never thrown). */
  private async markCrashed(): Promise<void> {
    await this.sql`
      UPDATE agent_sessions SET status = 'crashed'
      WHERE chat_id = ${this.chatId} AND agent = ${this.agent}
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: this.chatId }, 'claude-sdk: failed to mark session crashed (non-fatal)');
    });
  }

  // ─── teardown ────────────────────────────────────────────────────────────────

  /** Tear down the warm query and mark the handle's row closed (status write is best-effort). */
  async closeSession(handle: AgentSessionHandle): Promise<void> {
    await this.teardownQuery();
    await this.sql`
      UPDATE agent_sessions SET status = 'closed'
      WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
    `.catch((err) => {
      this.log.warn({ err: errMsg(err), chatId: handle.chatId }, 'claude-sdk: failed to mark session closed (non-fatal)');
    });
  }

  async dispose(): Promise<void> {
    await this.teardownQuery();
  }

  /** Close the input queue + dispose the generator. Idempotent. */
  private async teardownQuery(): Promise<void> {
    this.up = false;
    this.busy = false;
    const q = this.query;
    const queue = this.input;
    this.query = null;
    this.input = null;
    this.builtModel = null;
    queue?.close();
    if (q) {
      // return() ends the AsyncGenerator and lets the SDK clean up its subprocess.
      await q.return(undefined).catch(() => {});
    }
  }
}

// ─── helpers ──────────────────────────────────────────────────────────────────

/** Coerce to a non-negative finite integer (tokens). */
function num(v: unknown): number {
  const x = typeof v === 'number' ? v : Number(v);
  return Number.isFinite(x) && x > 0 ? Math.round(x) : 0;
}

/** Coerce to a non-negative finite float (cost USD). */
function numF(v: unknown): number {
  const x = typeof v === 'number' ? v : Number(v);
  return Number.isFinite(x) && x > 0 ? x : 0;
}

/** Build a human-readable error from an SDK error-result message. */
function resultErrorMessage(result: SdkResultMessage): string {
  if (result.subtype === 'success') return 'ok';
  const errs = (result as { errors?: string[] }).errors;
  if (Array.isArray(errs) && errs.length > 0) return `${result.subtype}: ${errs.join('; ')}`;
  return result.subtype;
}

function errMsg(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}