feat: sampling knobs + live PTY stream-json + token UI (v2.7.3)

Three small wins from boocode_code_review_v2 §1 #11/#7/#8.

#11 sampling knobs: top_n_sigma + dry_* family as first-class Agent fields,
threaded into the request body via providerOptions.openaiCompatible. Fixes a
latent bug — top_k (rejected by the AI-SDK provider) and min_p (never passed to
streamText) were dead on the wire; both now route through the same channel.
--reasoning-budget documented in data/AGENTS.md.

#7 live PTY stream-json: new stream-json-parser.ts line-buffers qwen/claude
NDJSON and emits text/reasoning/tool frames live + persists, with a fallback to
the old opaque slice. claude gets --output-format stream-json --verbose.

#8 token UI: agent_sessions input/output_tokens/cost now flow through the route
+ type and render beside the AgentComposerBar session chip.

Built by 3 parallel agents. Server 523 + coder 245 tests passing; builds + web
tsc clean. Builds on v2.7.2. openspec sampling-streamjson-tokens.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-01 12:47:17 +00:00
parent 5651f56039
commit a584dd16b0
15 changed files with 945 additions and 22 deletions

View File

@@ -0,0 +1,296 @@
/**
* Claude-Code-compatible stream-json NDJSON parser (feature #7,
* openspec `sampling-streamjson-tokens`).
*
* qwen (`--output-format stream-json`) and claude (`--output-format stream-json`)
* both emit Claude-Code's stream-json NDJSON on stdout: one JSON object per line.
* This module turns that stream into the same transport-agnostic `AgentEvent`s the
* ACP / opencode-server backends emit, so the PTY dispatch path can publish live
* broker frames + persist structured parts instead of slicing stdout opaque.
*
* Two surfaces:
* - `parseStreamJsonLine(line, state)` — PURE per-line mapping (unit-testable).
* `state` is the caller-owned accumulator (open tool blocks + usage/session_id).
* - `makeStreamJsonParser()` — a thin stateful wrapper holding the state, with a
* `push(line)` that returns the events for that line and getters for the final
* `usage` / `sessionId`.
*
* Defensive by contract: a non-JSON / partial / garbage line yields `[]` and never
* throws. Tool args (`input_json_delta`) arrive fragmented across many lines; we
* accumulate the partial JSON string per content-block index and only surface the
* parsed `rawInput` once the block stops (or, as a fallback, off the terminal
* `assistant` message which carries the fully-assembled `tool_use` blocks).
*
* Schema (keyed on top-level `type`):
* - `system` — init: { session_id, tools, ... }
* - `assistant` — { message: { content: [ {type:'text'|'thinking'|'tool_use', ...} ], usage? } }
* - `user` — tool results (ignored — diffing the worktree captures effects)
* - `result` — final: { usage: { input_tokens, output_tokens }, session_id? }
* - `stream_event` — { event: { type, index?, content_block?, delta?, usage? } }
* event.type:
* content_block_start — { index, content_block: {type, id?, name?} }
* content_block_delta — { index, delta: {type, text?|thinking?|partial_json?} }
* content_block_stop — { index }
* message_delta — { usage: { output_tokens } }
* message_start — { message: { usage } }
*/
import type { AgentEvent } from './agent-backend.js';
import type { AcpToolSnapshot } from './acp-tool-snapshot.js';
/** Convenience alias for the per-line return value. */
export type AgentEventList = AgentEvent[];
export interface StreamJsonUsage {
inputTokens?: number;
outputTokens?: number;
}
/** Per-open-content-block accumulation for tool args assembled across deltas. */
interface OpenToolBlock {
toolCallId: string;
name: string;
/** Concatenated `input_json_delta.partial_json` fragments. */
partialJson: string;
}
export interface StreamJsonState {
/** content-block index → open tool block (only `tool_use` blocks are tracked). */
toolBlocks: Map<number, OpenToolBlock>;
sessionId: string | null;
usage: StreamJsonUsage;
}
export function makeStreamJsonState(): StreamJsonState {
return { toolBlocks: new Map(), sessionId: null, usage: {} };
}
function asRecord(value: unknown): Record<string, unknown> | null {
if (value && typeof value === 'object' && !Array.isArray(value)) {
return value as Record<string, unknown>;
}
return null;
}
function asString(value: unknown): string | undefined {
return typeof value === 'string' ? value : undefined;
}
function asNumber(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
/** Pull token counts out of an Anthropic-shape `usage` object, mutating state. */
function captureUsage(usage: Record<string, unknown> | null, state: StreamJsonState): void {
if (!usage) return;
const input = asNumber(usage.input_tokens);
const output = asNumber(usage.output_tokens);
if (input !== undefined) state.usage.inputTokens = input;
// output_tokens is reported incrementally on message_delta; keep the latest.
if (output !== undefined) state.usage.outputTokens = output;
}
/** Parse the accumulated tool-arg JSON; tolerate an unparseable/partial body. */
function parseToolInput(partialJson: string): unknown {
const trimmed = partialJson.trim();
if (!trimmed) return {};
try {
return JSON.parse(trimmed);
} catch {
return { _raw: partialJson };
}
}
function toolSnapshot(block: OpenToolBlock, rawInput: unknown, status: AcpToolSnapshot['status']): AcpToolSnapshot {
return {
toolCallId: block.toolCallId,
title: block.name,
kind: null,
status,
rawInput,
};
}
/**
* Map one stream-event sub-object (the `event` field of a `stream_event` line) to
* AgentEvents, mutating `state` for open tool blocks + usage.
*/
function handleStreamEvent(event: Record<string, unknown>, state: StreamJsonState): AgentEvent[] {
const eventType = asString(event.type);
if (!eventType) return [];
switch (eventType) {
case 'content_block_start': {
const index = asNumber(event.index);
const block = asRecord(event.content_block);
if (index === undefined || !block) return [];
if (asString(block.type) !== 'tool_use') return [];
const toolCallId = asString(block.id) ?? `tool_${index}`;
const name = asString(block.name) ?? 'tool';
const open: OpenToolBlock = { toolCallId, name, partialJson: '' };
state.toolBlocks.set(index, open);
// Surface the tool start immediately (running, no args yet) so the UI shows
// the call before the args finish streaming.
return [{ type: 'tool_call', toolCall: toolSnapshot(open, {}, 'in_progress') }];
}
case 'content_block_delta': {
const index = asNumber(event.index);
const delta = asRecord(event.delta);
if (delta === null) return [];
const deltaType = asString(delta.type);
if (deltaType === 'text_delta') {
const text = asString(delta.text);
return text ? [{ type: 'text', text }] : [];
}
if (deltaType === 'thinking_delta') {
const text = asString(delta.thinking);
return text ? [{ type: 'reasoning', text }] : [];
}
if (deltaType === 'input_json_delta') {
// Accumulate tool args; no event until the block stops.
const fragment = asString(delta.partial_json);
if (index !== undefined && fragment) {
const open = state.toolBlocks.get(index);
if (open) open.partialJson += fragment;
}
return [];
}
return [];
}
case 'content_block_stop': {
const index = asNumber(event.index);
if (index === undefined) return [];
const open = state.toolBlocks.get(index);
if (!open) return [];
state.toolBlocks.delete(index);
const rawInput = parseToolInput(open.partialJson);
return [{ type: 'tool_update', toolCall: toolSnapshot(open, rawInput, 'completed') }];
}
case 'message_start': {
const message = asRecord(event.message);
captureUsage(asRecord(message?.usage), state);
return [];
}
case 'message_delta': {
captureUsage(asRecord(event.usage), state);
return [];
}
default:
return [];
}
}
/**
* Map the terminal `assistant` message (post-hoc full message) to AgentEvents. Used
* as a fallback for transports that emit only the assembled `assistant` line and no
* incremental `stream_event`s. When stream_events already streamed a block, the
* caller dedups by toolCallId, so re-emitting the assembled tool_use is harmless.
*/
function handleAssistantMessage(message: Record<string, unknown>, state: StreamJsonState): AgentEvent[] {
captureUsage(asRecord(message.usage), state);
const content = message.content;
if (!Array.isArray(content)) return [];
const out: AgentEvent[] = [];
let toolIdx = 0;
for (const rawBlock of content) {
const block = asRecord(rawBlock);
if (!block) continue;
const blockType = asString(block.type);
if (blockType === 'text') {
const text = asString(block.text);
if (text) out.push({ type: 'text', text });
} else if (blockType === 'thinking') {
const text = asString(block.thinking);
if (text) out.push({ type: 'reasoning', text });
} else if (blockType === 'tool_use') {
const toolCallId = asString(block.id) ?? `tool_${toolIdx}`;
const name = asString(block.name) ?? 'tool';
const rawInput = 'input' in block ? block.input : {};
out.push({
type: 'tool_update',
toolCall: { toolCallId, title: name, kind: null, status: 'completed', rawInput },
});
}
toolIdx++;
}
return out;
}
/**
* Pure per-line mapping. `line` is a single complete NDJSON line (no trailing
* newline required; surrounding whitespace tolerated). Returns the AgentEvents the
* line produces and mutates `state` (open tool blocks, usage, session_id). A blank,
* non-JSON, or unrecognized line yields `[]` and never throws.
*/
export function parseStreamJsonLine(line: string, state: StreamJsonState): AgentEvent[] {
const trimmed = line.trim();
if (!trimmed) return [];
let obj: Record<string, unknown> | null;
try {
const parsed: unknown = JSON.parse(trimmed);
obj = asRecord(parsed);
} catch {
return [];
}
if (!obj) return [];
const type = asString(obj.type);
switch (type) {
case 'system': {
const sid = asString(obj.session_id);
if (sid) state.sessionId = sid;
return [];
}
case 'stream_event': {
const event = asRecord(obj.event);
return event ? handleStreamEvent(event, state) : [];
}
case 'assistant': {
const sid = asString(obj.session_id);
if (sid) state.sessionId = sid;
const message = asRecord(obj.message);
return message ? handleAssistantMessage(message, state) : [];
}
case 'result': {
const sid = asString(obj.session_id);
if (sid) state.sessionId = sid;
captureUsage(asRecord(obj.usage), state);
return [];
}
default:
// `user` (tool results) and any unknown line type — ignore.
return [];
}
}
export interface StreamJsonParser {
/** Feed one complete NDJSON line; returns its AgentEvents (never throws). */
push(line: string): AgentEvent[];
/** Final usage (input/output tokens) accumulated so far. */
usage(): StreamJsonUsage;
/** Provider session id from the init `system` line / `result`, if seen. */
sessionId(): string | null;
}
/**
* Stateful wrapper around `parseStreamJsonLine`. Holds per-tool-block accumulation
* + usage/session_id across the turn. Line-buffering (splitting stdout on `\n` and
* holding the partial tail) is the caller's job — see `pty-dispatch.ts`.
*/
export function makeStreamJsonParser(): StreamJsonParser {
const state = makeStreamJsonState();
return {
push: (line: string) => parseStreamJsonLine(line, state),
usage: () => ({ ...state.usage }),
sessionId: () => state.sessionId,
};
}