/**
 * Claude-Code-compatible stream-json NDJSON parser (feature #7,
 * openspec `sampling-streamjson-tokens`).
 *
 * qwen (`--output-format stream-json`) and claude (`--output-format stream-json`)
 * both emit Claude-Code's stream-json NDJSON on stdout: one JSON object per line.
 * This module turns that stream into the same transport-agnostic `AgentEvent`s the
 * ACP / opencode-server backends emit, so the PTY dispatch path can publish live
 * broker frames + persist structured parts instead of slicing stdout opaquely.
 *
 * Two surfaces:
 *  - `parseStreamJsonLine(line, state)` — per-line mapping with no I/O; its only
 *    side effect is mutating `state`, the caller-owned accumulator (open tool
 *    blocks + usage/session_id).
 *  - `makeStreamJsonParser()` — a thin stateful wrapper holding the state, with a
 *    `push(line)` that returns the events for that line and getters for the final
 *    `usage` / `sessionId`.
 *
 * Defensive by contract: a non-JSON / partial / garbage line yields `[]` and never
 * throws. Tool args (`input_json_delta`) arrive fragmented across many lines; we
 * accumulate the partial JSON string per content-block index and only surface the
 * parsed `rawInput` once the block stops (or, as a fallback, off the terminal
 * `assistant` message which carries the fully-assembled `tool_use` blocks).
 *
 * De-duplication: when incremental `stream_event`s are present, the assembled
 * `assistant` message repeats text/thinking that the deltas already emitted, so
 * those blocks are skipped there (tool calls are still re-emitted; the caller
 * dedups them by toolCallId).
 *
 * Schema (keyed on top-level `type`):
 *  - `system`       — init: { session_id, tools, ... }
 *  - `assistant`    — { message: { content: [ {type:'text'|'thinking'|'tool_use', ...} ], usage? } }
 *  - `user`         — tool results (ignored — diffing the worktree captures effects)
 *  - `result`       — final: { usage: { input_tokens, output_tokens }, session_id? }
 *  - `stream_event` — { event: { type, index?, content_block?, delta?, usage? } }
 *      event.type:
 *        content_block_start — { index, content_block: {type, id?, name?} }
 *        content_block_delta — { index, delta: {type, text?|thinking?|partial_json?} }
 *        content_block_stop  — { index }
 *        message_delta       — { usage: { output_tokens } }
 *        message_start       — { message: { usage } }
 */
import type { AgentEvent } from './agent-backend.js';
import type { AcpToolSnapshot } from './acp-tool-snapshot.js';

/** Convenience alias for the per-line return value. */
export type AgentEventList = AgentEvent[];

export interface StreamJsonUsage {
  inputTokens?: number;
  outputTokens?: number;
}

/** Per-open-content-block accumulation for tool args assembled across deltas. */
interface OpenToolBlock {
  toolCallId: string;
  name: string;
  /** Concatenated `input_json_delta.partial_json` fragments. */
  partialJson: string;
}

export interface StreamJsonState {
  /** content-block index → open tool block (only `tool_use` blocks are tracked). */
  toolBlocks: Map<number, OpenToolBlock>;
  sessionId: string | null;
  usage: StreamJsonUsage;
  /**
   * Set once any `content_block_*` stream event has been seen. After that, the
   * text/thinking of an assembled `assistant` message duplicates what the deltas
   * already emitted, so it is skipped. Optional so states built by hand (without
   * `makeStreamJsonState`) still type-check; absent is treated as `false`.
   */
  streamedContent?: boolean;
}

export function makeStreamJsonState(): StreamJsonState {
  return { toolBlocks: new Map<number, OpenToolBlock>(), sessionId: null, usage: {}, streamedContent: false };
}

function asRecord(value: unknown): Record<string, unknown> | null {
  if (value && typeof value === 'object' && !Array.isArray(value)) {
    return value as Record<string, unknown>;
  }
  return null;
}

function asString(value: unknown): string | undefined {
  return typeof value === 'string' ? value : undefined;
}

function asNumber(value: unknown): number | undefined {
  return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}

/** Pull token counts out of an Anthropic-shape `usage` object, mutating state. */
function captureUsage(usage: Record<string, unknown> | null, state: StreamJsonState): void {
  if (!usage) return;
  const input = asNumber(usage.input_tokens);
  const output = asNumber(usage.output_tokens);
  if (input !== undefined) state.usage.inputTokens = input;
  // output_tokens is reported incrementally on message_delta; keep the latest.
  if (output !== undefined) state.usage.outputTokens = output;
}

/** Parse the accumulated tool-arg JSON; tolerate an unparseable/partial body. */
function parseToolInput(partialJson: string): unknown {
  const trimmed = partialJson.trim();
  if (!trimmed) return {};
  try {
    return JSON.parse(trimmed);
  } catch {
    // Keep the raw text so nothing is lost when the args never became valid JSON.
    return { _raw: partialJson };
  }
}

function toolSnapshot(
  block: OpenToolBlock,
  rawInput: unknown,
  status: AcpToolSnapshot['status'],
): AcpToolSnapshot {
  return {
    toolCallId: block.toolCallId,
    title: block.name,
    kind: null,
    status,
    rawInput,
  };
}

/**
 * Map one stream-event sub-object (the `event` field of a `stream_event` line) to
 * AgentEvents, mutating `state` for open tool blocks, usage and the
 * `streamedContent` flag.
 */
function handleStreamEvent(event: Record<string, unknown>, state: StreamJsonState): AgentEvent[] {
  const eventType = asString(event.type);
  if (!eventType) return [];

  // Any content-block event means content is being streamed incrementally, so
  // the later assembled `assistant` message must not repeat its text/thinking.
  if (eventType.startsWith('content_block_')) state.streamedContent = true;

  switch (eventType) {
    case 'content_block_start': {
      const index = asNumber(event.index);
      const block = asRecord(event.content_block);
      if (index === undefined || !block) return [];
      if (asString(block.type) !== 'tool_use') return [];
      const toolCallId = asString(block.id) ?? `tool_${index}`;
      const name = asString(block.name) ?? 'tool';
      const open: OpenToolBlock = { toolCallId, name, partialJson: '' };
      state.toolBlocks.set(index, open);
      // Surface the tool start immediately (running, no args yet) so the UI shows
      // the call before the args finish streaming.
      return [{ type: 'tool_call', toolCall: toolSnapshot(open, {}, 'in_progress') }];
    }
    case 'content_block_delta': {
      const index = asNumber(event.index);
      const delta = asRecord(event.delta);
      if (delta === null) return [];
      const deltaType = asString(delta.type);
      if (deltaType === 'text_delta') {
        const text = asString(delta.text);
        return text ? [{ type: 'text', text }] : [];
      }
      if (deltaType === 'thinking_delta') {
        const text = asString(delta.thinking);
        return text ? [{ type: 'reasoning', text }] : [];
      }
      if (deltaType === 'input_json_delta') {
        // Accumulate tool args; no event until the block stops.
        const fragment = asString(delta.partial_json);
        if (index !== undefined && fragment) {
          const open = state.toolBlocks.get(index);
          if (open) open.partialJson += fragment;
        }
        return [];
      }
      return [];
    }
    case 'content_block_stop': {
      const index = asNumber(event.index);
      if (index === undefined) return [];
      const open = state.toolBlocks.get(index);
      if (!open) return [];
      state.toolBlocks.delete(index);
      const rawInput = parseToolInput(open.partialJson);
      return [{ type: 'tool_update', toolCall: toolSnapshot(open, rawInput, 'completed') }];
    }
    case 'message_start': {
      const message = asRecord(event.message);
      captureUsage(asRecord(message?.usage), state);
      return [];
    }
    case 'message_delta': {
      captureUsage(asRecord(event.usage), state);
      return [];
    }
    default:
      return [];
  }
}

/**
 * Map the terminal `assistant` message (post-hoc full message) to AgentEvents.
 *
 * For transports that emit only the assembled `assistant` line (no incremental
 * `stream_event`s) this is the sole source of text, reasoning and tool calls.
 * When content was already streamed (`state.streamedContent`), text/thinking
 * blocks are skipped to avoid publishing them twice. Tool calls are always
 * re-emitted with their fully-assembled input; the caller dedups by toolCallId.
 * The fallback tool id `tool_<index>` uses the content-block index, matching the
 * fallback used by the streaming `content_block_start` path.
 */
function handleAssistantMessage(message: Record<string, unknown>, state: StreamJsonState): AgentEvent[] {
  captureUsage(asRecord(message.usage), state);
  const content = message.content;
  if (!Array.isArray(content)) return [];

  const emitProse = state.streamedContent !== true;
  const out: AgentEvent[] = [];
  for (const [index, rawBlock] of content.entries()) {
    const block = asRecord(rawBlock);
    if (!block) continue;
    const blockType = asString(block.type);
    if (blockType === 'text') {
      const text = asString(block.text);
      if (emitProse && text) out.push({ type: 'text', text });
    } else if (blockType === 'thinking') {
      const text = asString(block.thinking);
      if (emitProse && text) out.push({ type: 'reasoning', text });
    } else if (blockType === 'tool_use') {
      const toolCallId = asString(block.id) ?? `tool_${index}`;
      const name = asString(block.name) ?? 'tool';
      const rawInput = 'input' in block ? block.input : {};
      out.push({
        type: 'tool_update',
        toolCall: toolSnapshot({ toolCallId, name, partialJson: '' }, rawInput, 'completed'),
      });
    }
  }
  return out;
}

/**
 * Per-line mapping. `line` is a single complete NDJSON line (no trailing newline
 * required; surrounding whitespace tolerated). Returns the AgentEvents the line
 * produces and mutates `state` (open tool blocks, usage, session_id). Any line
 * carrying a string `session_id` updates `state.sessionId` (latest wins). A
 * blank, non-JSON, non-object, or unrecognized line yields no events and never
 * throws.
 */
export function parseStreamJsonLine(line: string, state: StreamJsonState): AgentEvent[] {
  const trimmed = line.trim();
  if (!trimmed) return [];

  let obj: Record<string, unknown> | null;
  try {
    obj = asRecord(JSON.parse(trimmed));
  } catch {
    return [];
  }
  if (!obj) return [];

  const sid = asString(obj.session_id);
  if (sid) state.sessionId = sid;

  switch (asString(obj.type)) {
    case 'system':
      return [];
    case 'stream_event': {
      const event = asRecord(obj.event);
      return event ? handleStreamEvent(event, state) : [];
    }
    case 'assistant': {
      const message = asRecord(obj.message);
      return message ? handleAssistantMessage(message, state) : [];
    }
    case 'result':
      captureUsage(asRecord(obj.usage), state);
      return [];
    default:
      // `user` (tool results) and any unknown line type — ignore.
      return [];
  }
}

export interface StreamJsonParser {
  /** Feed one complete NDJSON line; returns its AgentEvents (never throws). */
  push(line: string): AgentEvent[];
  /** Final usage (input/output tokens) accumulated so far (a copy). */
  usage(): StreamJsonUsage;
  /** Provider session id from the latest line that carried one (init `system`, `result`, ...), if seen. */
  sessionId(): string | null;
}

/**
 * Stateful wrapper around `parseStreamJsonLine`. Holds per-tool-block accumulation
 * + usage/session_id across the turn. Line-buffering (splitting stdout on `\n` and
 * holding the partial tail) is the caller's job — see `pty-dispatch.ts`.
 */
export function makeStreamJsonParser(): StreamJsonParser {
  const state = makeStreamJsonState();
  return {
    push: (line: string) => parseStreamJsonLine(line, state),
    usage: () => ({ ...state.usage }),
    sessionId: () => state.sessionId,
  };
}