/** * claude-sdk-sessionstore #9 (Part 2) — PURE Claude-SDK message → AgentEvent mapper. * * `ClaudeSdkBackend` drives one `query()` per (chat, agent) session and feeds each * `SDKMessage` it yields through this function, forwarding the returned * `AgentEvent[]` to the dispatcher's `onEvent` (which maps them to WS frames + * persists). Kept PURE (one message + a caller-owned accumulator → events) so it's * unit-testable without a live `claude` binary — the whole point of Part 2's * typecheck-and-unit-test gate (the live pump needs a host smoke). * * SDK shapes (verified against @anthropic-ai/claude-agent-sdk@0.3.159 sdk.d.ts + * @anthropic-ai/sdk beta messages d.ts): * - `SDKPartialAssistantMessage` (`type:'stream_event'`) carries a * `BetaRawMessageStreamEvent` — the LIVE delta stream (only emitted when * `options.includePartialMessages` is set, which the backend sets). We map: * · content_block_delta + text_delta → { text } * · content_block_delta + thinking_delta → { reasoning } * · content_block_start + tool_use block → { tool_call } (in_progress) * · content_block_delta + input_json_delta → buffered into the tool's args * (no event; the assembled input rides the terminal tool_update) * - `SDKAssistantMessage` (`type:'assistant'`) carries the FINAL `message.content` * blocks. Text/thinking there are post-hoc repeats of what the partials already * streamed, so we DROP them (dedup) and only emit a terminal `tool_update` * (status completed) per `tool_use` block, with its now-complete `input`. * - All other `SDKMessage` variants (system/init, status, result, hooks, task * notifications, …) carry no renderable turn content → return []. * * Tool assembly spans messages: a tool_use block opens in a partial * `content_block_start`, its args stream as `input_json_delta` frames keyed by the * block `index`, and the final assistant message restates the complete block. The * caller owns a `ClaudeSdkMapState` (snapshot map + per-index tool tracking) that * threads this across calls, mirroring the `Map` the other * backends pass into `mapSessionUpdate`. The result frames carry the SAME * `AcpToolSnapshot` shape, so `persistExternalAgentTurn` / `snapshotToWireToolCall` * are reused unchanged. */ import type { SDKMessage } from '@anthropic-ai/claude-agent-sdk'; import type { AgentEvent } from '../agent-backend.js'; import type { AcpToolSnapshot } from '../acp-tool-snapshot.js'; /** * The underlying `@anthropic-ai/sdk` Beta message types (`BetaRawMessageStreamEvent`, * `BetaContentBlock`) are a TRANSITIVE dep of `@anthropic-ai/claude-agent-sdk` — not * a direct dependency of apps/coder — so a `@anthropic-ai/sdk/...` import does NOT * resolve here under pnpm's strict node_modules. We instead DERIVE both shapes from * the SDK's own exported message types, which is also more correct (it tracks the * exact `event` / `content` shapes the SDK yields, not a hand-picked import path). */ type StreamEvent = Extract['event']; type AssistantContent = Extract['message']['content']; type ContentBlock = AssistantContent extends readonly (infer B)[] ? B : never; type UserContent = Extract['message']['content']; /** * Caller-owned accumulator threaded across `mapSdkMessage` calls within ONE turn. * The backend creates a fresh one per turn and clears it at turn end. */ export interface ClaudeSdkMapState { /** Stable tool-call snapshots by tool_use id, merged across start/delta/stop. */ snapshots: Map; /** * Partial-stream block index → in-flight tool assembly. Anthropic's stream keys * blocks by a numeric `index`; tool_use args arrive as `input_json_delta`s under * that index with no id, so we map index→id to route them and buffer the raw * JSON fragments until the block closes (or the final assistant message lands). */ toolByIndex: Map; } /** Construct a fresh per-turn accumulator. */ export function createClaudeSdkMapState(): ClaudeSdkMapState { return { snapshots: new Map(), toolByIndex: new Map() }; } /** * Map one `SDKMessage` → zero or more `AgentEvent`s, mutating `state` for * cross-message tool assembly + dedup. Pure w.r.t. its inputs otherwise. */ export function mapSdkMessage(msg: SDKMessage, state: ClaudeSdkMapState): AgentEvent[] { switch (msg.type) { case 'stream_event': return mapStreamEvent(msg.event, state); case 'assistant': return mapFinalAssistant(msg.message.content, state); case 'user': // Tool RESULTS ride in as user messages (tool_result blocks): the SDK ran // the tool and feeds its output back. Without mapping these, the tool_call // never reaches a terminal snapshot — it persists as status:'running' with // no output and the UI spinner never stops (the bug this fixes). return mapUserToolResults(msg.message.content, state); default: // system/init, status, result, hooks, task_*, etc. — no turn content here. // (The backend reads session_id off the init message and usage/cost off the // result message directly; neither produces a renderable AgentEvent.) return []; } } /** Live partial-stream delta → AgentEvent(s). */ function mapStreamEvent(event: StreamEvent, state: ClaudeSdkMapState): AgentEvent[] { switch (event.type) { case 'content_block_start': { const block = event.content_block; if (block.type === 'tool_use') { const snap: AcpToolSnapshot = { toolCallId: block.id, title: block.name, kind: null, status: 'in_progress', rawInput: block.input ?? undefined, rawOutput: undefined, }; state.snapshots.set(block.id, snap); state.toolByIndex.set(event.index, { id: block.id, name: block.name, jsonBuf: '' }); return [{ type: 'tool_call', toolCall: snap }]; } return []; } case 'content_block_delta': { const delta = event.delta; if (delta.type === 'text_delta') { return delta.text ? [{ type: 'text', text: delta.text }] : []; } if (delta.type === 'thinking_delta') { return delta.thinking ? [{ type: 'reasoning', text: delta.thinking }] : []; } if (delta.type === 'input_json_delta') { // Buffer the tool's streamed args under its block index; no event yet — // the assembled input rides the terminal tool_update (or the final block). const t = state.toolByIndex.get(event.index); if (t) t.jsonBuf += delta.partial_json ?? ''; return []; } // signature_delta / citations_delta / compaction_delta — nothing to render. return []; } case 'content_block_stop': { // Close out a streamed tool block: parse its buffered JSON args and emit a // tool_update carrying the assembled input. The final assistant message will // restate the same block, but its snapshot is dedup-merged (same id) so this // is harmless — we emit here so a tool's input renders even if the assistant // message is delayed/dropped. const t = state.toolByIndex.get(event.index); if (!t) return []; state.toolByIndex.delete(event.index); const prev = state.snapshots.get(t.id); const snap: AcpToolSnapshot = { toolCallId: t.id, title: prev?.title ?? t.name, kind: null, status: 'in_progress', rawInput: parseJsonOr(t.jsonBuf, prev?.rawInput), rawOutput: undefined, }; state.snapshots.set(t.id, snap); return [{ type: 'tool_update', toolCall: snap }]; } default: // message_start / message_delta / message_stop — turn framing, no content. return []; } } /** * Final assistant message content blocks. Text/thinking are post-hoc repeats of * the partial stream → dropped (dedup). Only tool_use blocks emit a terminal * tool_update carrying the complete `input`. */ function mapFinalAssistant(content: ContentBlock[], state: ClaudeSdkMapState): AgentEvent[] { const out: AgentEvent[] = []; for (const block of content) { if (block.type === 'tool_use') { const prev = state.snapshots.get(block.id); const snap: AcpToolSnapshot = { toolCallId: block.id, title: prev?.title ?? block.name, kind: null, status: 'completed', rawInput: block.input ?? prev?.rawInput, rawOutput: undefined, }; state.snapshots.set(block.id, snap); out.push({ type: 'tool_update', toolCall: snap }); } // text / thinking / redacted_thinking blocks: already streamed via partials. } return out; } /** * User-message tool_result blocks → terminal tool_update events. The SDK runs * each tool and feeds the output back in a `user` message; we mark the matching * snapshot completed (or failed, on is_error) WITH its output so the snapshot * persists/renders as resolved instead of spinning. Unknown ids (no prior * snapshot) are still surfaced so a stray result isn't silently lost. */ function mapUserToolResults(content: UserContent, state: ClaudeSdkMapState): AgentEvent[] { if (!Array.isArray(content)) return []; const out: AgentEvent[] = []; for (const raw of content) { const block = raw as { type?: string; tool_use_id?: string; content?: unknown; is_error?: boolean }; if (block.type !== 'tool_result' || !block.tool_use_id) continue; const prev = state.snapshots.get(block.tool_use_id); const snap: AcpToolSnapshot = { toolCallId: block.tool_use_id, title: prev?.title ?? block.tool_use_id, kind: prev?.kind ?? null, status: block.is_error ? 'failed' : 'completed', rawInput: prev?.rawInput, rawOutput: toolResultText(block.content), }; state.snapshots.set(block.tool_use_id, snap); out.push({ type: 'tool_update', toolCall: snap }); } return out; } /** tool_result content is a string OR an array of content blocks (text/image). * Flatten text blocks; fall back to the raw value so nothing is lost. */ function toolResultText(content: unknown): unknown { if (typeof content === 'string') return content; if (Array.isArray(content)) { const text = content .map((c) => c && typeof c === 'object' && (c as { type?: string }).type === 'text' ? String((c as { text?: unknown }).text ?? '') : '', ) .filter(Boolean) .join('\n'); return text || content; } return content ?? ''; } /** Parse a buffered JSON string; fall back to a prior value on empty/invalid. */ function parseJsonOr(buf: string, fallback: unknown): unknown { const s = buf.trim(); if (!s) return fallback; try { return JSON.parse(s); } catch { return fallback; } }