diff --git a/CHANGELOG.md b/CHANGELOG.md index fcab010..5a86288 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes per release tag. Most recent on top, ordered by tag creation date (which matches the git history). Tag names follow `vMAJOR.MINOR.PATCH-slug` — the slug describes what shipped, so the tag name alone is enough to recall the batch. +## v2.7.3-sampling-streamjson-tokens — 2026-06-01 + +Three small BooCode wins from `boocode_code_review_v2.md` §1 #11/#7/#8. **Sampling knobs:** per-agent `top_n_sigma` + the `dry_*` repetition family (`dry_multiplier`/`dry_base`/`dry_allowed_length`/`dry_penalty_last_n`) are now first-class Agent frontmatter fields, parsed in `agents.ts` and threaded into the llama-swap chat-completion body via `providerOptions.openaiCompatible` (the `@ai-sdk/openai-compatible` extra-body channel). This surfaced and fixed a **latent bug**: `top_k` (rejected by the AI-SDK provider as unsupported) and `min_p` (never passed to `streamText` at all) had been dead on the wire — no agent's `top_k`/`min_p` ever affected sampling; both now route through the same channel, so agents that set them will start using them. `--reasoning-budget` is documented in `data/AGENTS.md` (already works via `llama_extra_args`, permitted by the deny-list validator). **Live PTY stream-json:** qwen/claude PTY dispatch previously captured stdout as one opaque slice; `pty-dispatch.ts` now line-buffers the Claude-Code-compatible NDJSON and a new `stream-json-parser.ts` maps each complete line to text/reasoning/tool events, which are published as live frames as they arrive (mirroring the ACP/opencode paths) and persisted as structured parts, with a clean fallback to the old opaque slice when output isn't NDJSON (claude now runs `--output-format stream-json --verbose`). **Token UI:** the per-`(chat,agent)` `agent_sessions.input_tokens`/`output_tokens`/`cost` columns (accumulated since `v2.6.8` but dropped by the read route + wire type) now flow through and render condensed beside the AgentComposerBar session chip. 
Built by three parallel agents over disjoint subsystems; server 523 + coder 245 tests passing (incl. 11 new stream-json-parser + new agent-parse tests), all builds + web tsc clean. Builds on `v2.7.2-checkpoint-idor`; openspec `sampling-streamjson-tokens`. The qwen-vs-claude `usage` field names in #7 are best-guess pending a live smoke. + ## v2.7.2-checkpoint-idor — 2026-06-01 Closes two IDOR authorization holes in the `v2.7.1-write-edit-robustness` checkpoint routes, flagged by the automated push security review. The `GET /api/sessions/:id/checkpoints?chat_id=` list route scoped its `chat_id` branch by `chat_id` alone — any session's `chat_id` would read its checkpoints; it now joins through `chats` and gates on `chats.session_id` (authoritative; `checkpoints.session_id` is a nullable denormalized hint). The `restoreCheckpoint` scope guard was fail-open — `cp.session_id && cp.session_id !== sessionId` fell through whenever the checkpoint's denormalized `session_id` was null, allowing a cross-session restore (worktree reset + transcript trim) — it now resolves the owning session via the checkpoint's chat and denies on any missing-or-mismatched row. A DB-integration regression covers the exact null-`session_id` cross-session case. Real-world blast radius is small (BooCoder is single-user behind Authelia on loopback), but both are genuine authorization bugs. Coder suite 234 passing (7/7 checkpoint tests incl. the regression against live postgres+git), typecheck clean. Hotfix on `v2.7.1-write-edit-robustness`. diff --git a/apps/coder/src/routes/agent-sessions.ts b/apps/coder/src/routes/agent-sessions.ts index 7f1e019..ea4017e 100644 --- a/apps/coder/src/routes/agent-sessions.ts +++ b/apps/coder/src/routes/agent-sessions.ts @@ -16,6 +16,11 @@ export interface AgentSessionRow { status: string; has_session: boolean; last_active_at: string | null; + // v2.6.8 per-(chat,agent) running token/cost totals (sampling-streamjson-tokens + // #8). 
BIGINT columns arrive as strings over the wire, so the `number` typings below are nominal for those columns; the frontend coerces. + input_tokens: number; + output_tokens: number; + cost: number; } export function registerAgentSessionRoutes(app: FastifyInstance, sql: Sql): void { @@ -39,7 +44,10 @@ export function registerAgentSessionRoutes(app: FastifyInstance, sql: Sql): void a.agent AS agent, a.status AS status, (a.agent_session_id IS NOT NULL) AS has_session, - a.last_active_at AS last_active_at + a.last_active_at AS last_active_at, + a.input_tokens AS input_tokens, + a.output_tokens AS output_tokens, + a.cost AS cost FROM agent_sessions a JOIN chats c ON c.id = a.chat_id WHERE c.session_id = ${sessionId} diff --git a/apps/coder/src/services/__tests__/stream-json-parser.test.ts b/apps/coder/src/services/__tests__/stream-json-parser.test.ts new file mode 100644 index 0000000..aed6877 --- /dev/null +++ b/apps/coder/src/services/__tests__/stream-json-parser.test.ts @@ -0,0 +1,189 @@ +import { describe, it, expect } from 'vitest'; +import { + makeStreamJsonParser, + makeStreamJsonState, + parseStreamJsonLine, + type AgentEventList, +} from '../stream-json-parser.js'; +import type { AgentEvent } from '../agent-backend.js'; +import type { AcpToolSnapshot } from '../acp-tool-snapshot.js'; + +// Helpers to JSON-encode the representative Claude-Code stream-json lines. 
+const sys = (sessionId: string) => + JSON.stringify({ type: 'system', subtype: 'init', session_id: sessionId, tools: ['read', 'edit'] }); + +const streamEvent = (event: unknown) => JSON.stringify({ type: 'stream_event', event }); + +const textDelta = (index: number, text: string) => + streamEvent({ type: 'content_block_delta', index, delta: { type: 'text_delta', text } }); + +const thinkingDelta = (index: number, thinking: string) => + streamEvent({ type: 'content_block_delta', index, delta: { type: 'thinking_delta', thinking } }); + +const toolStart = (index: number, id: string, name: string) => + streamEvent({ type: 'content_block_start', index, content_block: { type: 'tool_use', id, name } }); + +const inputJsonDelta = (index: number, partial: string) => + streamEvent({ type: 'content_block_delta', index, delta: { type: 'input_json_delta', partial_json: partial } }); + +const blockStop = (index: number) => streamEvent({ type: 'content_block_stop', index }); + +const resultLine = (input: number, output: number, sessionId?: string) => + JSON.stringify({ type: 'result', subtype: 'success', session_id: sessionId, usage: { input_tokens: input, output_tokens: output } }); + +describe('parseStreamJsonLine (pure per-line mapping)', () => { + it('captures session_id from the system init line and emits no events', () => { + const state = makeStreamJsonState(); + const events = parseStreamJsonLine(sys('sess-abc'), state); + expect(events).toEqual([]); + expect(state.sessionId).toBe('sess-abc'); + }); + + it('maps a text_delta stream_event → a text event', () => { + const state = makeStreamJsonState(); + expect(parseStreamJsonLine(textDelta(0, 'Hello'), state)).toEqual([{ type: 'text', text: 'Hello' }]); + }); + + it('maps a thinking_delta stream_event → a reasoning event', () => { + const state = makeStreamJsonState(); + expect(parseStreamJsonLine(thinkingDelta(0, 'pondering'), state)).toEqual([ + { type: 'reasoning', text: 'pondering' }, + ]); + }); + + it('tolerates a 
garbage / non-JSON line (returns [], no throw)', () => { + const state = makeStreamJsonState(); + expect(parseStreamJsonLine('not json at all {{{', state)).toEqual([]); + expect(parseStreamJsonLine('', state)).toEqual([]); + expect(parseStreamJsonLine(' ', state)).toEqual([]); + // A truncated/partial JSON object also yields [] rather than throwing. + expect(parseStreamJsonLine('{"type":"stream_event","eve', state)).toEqual([]); + }); + + it('ignores unknown top-level line types and the user (tool-result) line', () => { + const state = makeStreamJsonState(); + expect(parseStreamJsonLine(JSON.stringify({ type: 'user', message: {} }), state)).toEqual([]); + expect(parseStreamJsonLine(JSON.stringify({ type: 'whatever' }), state)).toEqual([]); + }); + + it('assembles a tool call across input_json_delta chunks (split across lines)', () => { + const state = makeStreamJsonState(); + // start → tool_call (running, empty args) + const start = parseStreamJsonLine(toolStart(1, 'toolu_1', 'edit_file'), state); + expect(start).toHaveLength(1); + expect(start[0]!.type).toBe('tool_call'); + const startSnap = (start[0] as { type: 'tool_call'; toolCall: AcpToolSnapshot }).toolCall; + expect(startSnap.toolCallId).toBe('toolu_1'); + expect(startSnap.title).toBe('edit_file'); + expect(startSnap.status).toBe('in_progress'); + expect(startSnap.rawInput).toEqual({}); + + // args streamed in fragments — no events until stop + expect(parseStreamJsonLine(inputJsonDelta(1, '{"path":"a'), state)).toEqual([]); + expect(parseStreamJsonLine(inputJsonDelta(1, '.ts","content":'), state)).toEqual([]); + expect(parseStreamJsonLine(inputJsonDelta(1, '"hi"}'), state)).toEqual([]); + + // stop → tool_update with the parsed, fully-assembled input + const stop = parseStreamJsonLine(blockStop(1), state); + expect(stop).toHaveLength(1); + expect(stop[0]!.type).toBe('tool_update'); + const stopSnap = (stop[0] as { type: 'tool_update'; toolCall: AcpToolSnapshot }).toolCall; + 
expect(stopSnap.toolCallId).toBe('toolu_1'); + expect(stopSnap.status).toBe('completed'); + expect(stopSnap.rawInput).toEqual({ path: 'a.ts', content: 'hi' }); + }); + + it('falls back to {_raw} when accumulated tool args are not valid JSON', () => { + const state = makeStreamJsonState(); + parseStreamJsonLine(toolStart(0, 'toolu_x', 'run'), state); + parseStreamJsonLine(inputJsonDelta(0, '{"broken'), state); + const stop = parseStreamJsonLine(blockStop(0), state); + const snap = (stop[0] as { type: 'tool_update'; toolCall: AcpToolSnapshot }).toolCall; + expect(snap.rawInput).toEqual({ _raw: '{"broken' }); + }); + + it('captures usage from message_delta and result lines', () => { + const state = makeStreamJsonState(); + parseStreamJsonLine(streamEvent({ type: 'message_delta', usage: { output_tokens: 42 } }), state); + expect(state.usage.outputTokens).toBe(42); + parseStreamJsonLine(resultLine(100, 250, 'sess-z'), state); + expect(state.usage.inputTokens).toBe(100); + expect(state.usage.outputTokens).toBe(250); + expect(state.sessionId).toBe('sess-z'); + }); + + it('maps a terminal assistant message (fallback) → text + reasoning + tool events', () => { + const state = makeStreamJsonState(); + const line = JSON.stringify({ + type: 'assistant', + session_id: 'sess-asst', + message: { + content: [ + { type: 'thinking', thinking: 'let me think' }, + { type: 'text', text: 'Here is the answer' }, + { type: 'tool_use', id: 'toolu_9', name: 'view_file', input: { path: 'x.ts' } }, + ], + usage: { input_tokens: 5, output_tokens: 7 }, + }, + }); + const events = parseStreamJsonLine(line, state); + expect(events).toEqual([ + { type: 'reasoning', text: 'let me think' }, + { type: 'text', text: 'Here is the answer' }, + { + type: 'tool_update', + toolCall: { toolCallId: 'toolu_9', title: 'view_file', kind: null, status: 'completed', rawInput: { path: 'x.ts' } }, + }, + ]); + expect(state.usage).toEqual({ inputTokens: 5, outputTokens: 7 }); + 
expect(state.sessionId).toBe('sess-asst'); + }); +}); + +describe('makeStreamJsonParser (stateful wrapper over a full turn)', () => { + it('streams a representative turn: init → text → thinking → tool → result', () => { + const parser = makeStreamJsonParser(); + const all: AgentEvent[] = []; + const feed = (line: string): AgentEventList => { + const evs = parser.push(line); + all.push(...evs); + return evs; + }; + + feed(sys('sess-1')); + feed(textDelta(0, 'Reading ')); + feed(textDelta(0, 'the file. ')); + feed(thinkingDelta(0, 'I should edit it')); + feed(toolStart(1, 'toolu_a', 'edit_file')); + feed(inputJsonDelta(1, '{"path":')); + feed(inputJsonDelta(1, '"main.ts"}')); + feed(blockStop(1)); + feed(textDelta(0, 'Done.')); + feed(resultLine(120, 80, 'sess-1')); + + expect(all).toEqual([ + { type: 'text', text: 'Reading ' }, + { type: 'text', text: 'the file. ' }, + { type: 'reasoning', text: 'I should edit it' }, + { + type: 'tool_call', + toolCall: { toolCallId: 'toolu_a', title: 'edit_file', kind: null, status: 'in_progress', rawInput: {} }, + }, + { + type: 'tool_update', + toolCall: { toolCallId: 'toolu_a', title: 'edit_file', kind: null, status: 'completed', rawInput: { path: 'main.ts' } }, + }, + { type: 'text', text: 'Done.' 
}, + ]); + + expect(parser.usage()).toEqual({ inputTokens: 120, outputTokens: 80 }); + expect(parser.sessionId()).toBe('sess-1'); + }); + + it('a garbage line interleaved mid-turn does not derail subsequent parsing', () => { + const parser = makeStreamJsonParser(); + expect(parser.push(textDelta(0, 'a'))).toEqual([{ type: 'text', text: 'a' }]); + expect(parser.push('>>> not json <<<')).toEqual([]); + expect(parser.push(textDelta(0, 'b'))).toEqual([{ type: 'text', text: 'b' }]); + }); +}); diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts index 9b194ac..8fb1558 100644 --- a/apps/coder/src/services/dispatcher.ts +++ b/apps/coder/src/services/dispatcher.ts @@ -410,6 +410,52 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise(); + + const onPtyEvent = (e: AgentEvent): void => { + switch (e.type) { + case 'text': + ptyTextChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'reasoning': + ptyReasoningChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'reasoning_delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'tool_call': + case 'tool_update': + ptyToolSnaps.set(e.toolCall.toolCallId, e.toolCall); + broker.publishFrame(sessionId, { + type: 'tool_call', + message_id: assistantId, + chat_id: chatId, + tool_call: snapshotToWireToolCall(e.toolCall), + } as WsFrame); + break; + case 'commands': + // stream-json carries no commands today; ignore if it ever does. 
+ break; + } + }; + const result = await dispatchViaPty({ agent, task: task.input, @@ -420,17 +466,33 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise void; } interface PtySpawnSpec { @@ -40,7 +60,9 @@ function buildPtySpawnSpec( switch (agent) { case 'claude': { - const args = ['-p']; + // stream-json on -p requires --verbose (Claude Code rejects stream-json + // print mode without it). qwen needs no such flag. + const args = ['-p', '--output-format', 'stream-json', '--verbose']; if (model) args.push('--model', model); if (modeId) args.push('--permission-mode', modeId); if (thinkingOptionId) args.push('--effort', thinkingOptionId); @@ -73,7 +95,7 @@ function buildPtySpawnSpec( } export async function dispatchViaPty(opts: PtyDispatchOpts): Promise { - const { agent, task, worktreePath, model, modeId, thinkingOptionId, installPath, signal, log } = opts; + const { agent, task, worktreePath, model, modeId, thinkingOptionId, installPath, signal, log, onEvent } = opts; const cmd = buildPtySpawnSpec(agent, task, model, modeId, thinkingOptionId, installPath); if (!cmd) { @@ -81,6 +103,7 @@ export async function dispatchViaPty(opts: PtyDispatchOpts): Promise { stdout += chunk.toString(); }); + // Live NDJSON parsing (only when a sink is supplied). Line-buffer: split on + // '\n', dispatch complete lines, hold the partial tail until the next chunk. + const parser = onEvent ? 
makeStreamJsonParser() : null; + let lineBuf = ''; + let streamed = false; + const feedLine = (line: string): void => { + if (!parser || !onEvent) return; + for (const e of parser.push(line)) { + streamed = true; + onEvent(e); + } + }; + + child.stdout!.on('data', (chunk: Buffer) => { + const text = chunk.toString(); + stdout += text; + if (!parser) return; + lineBuf += text; + let nl = lineBuf.indexOf('\n'); + while (nl !== -1) { + const line = lineBuf.slice(0, nl); + lineBuf = lineBuf.slice(nl + 1); + feedLine(line); + nl = lineBuf.indexOf('\n'); + } + }); child.stderr!.on('data', (chunk: Buffer) => { stderr += chunk.toString(); }); const cleanup = () => { @@ -116,7 +164,7 @@ export async function dispatchViaPty(opts: PtyDispatchOpts): Promise { if (signal) signal.removeEventListener('abort', cleanup); - log.info({ agent, exitCode: code }, 'pty-dispatch: completed'); - resolve({ exitCode: code ?? 1, stdout, stderr }); + // Flush any final line with no trailing newline. + if (lineBuf.trim()) feedLine(lineBuf); + lineBuf = ''; + log.info({ agent, exitCode: code, streamed }, 'pty-dispatch: completed'); + resolve({ + exitCode: code ?? 1, + stdout, + stderr, + streamed, + usage: parser?.usage(), + agentSessionId: parser?.sessionId() ?? null, + }); }); child.on('error', (err) => { diff --git a/apps/coder/src/services/stream-json-parser.ts b/apps/coder/src/services/stream-json-parser.ts new file mode 100644 index 0000000..c1dce27 --- /dev/null +++ b/apps/coder/src/services/stream-json-parser.ts @@ -0,0 +1,296 @@ +/** + * Claude-Code-compatible stream-json NDJSON parser (feature #7, + * openspec `sampling-streamjson-tokens`). + * + * qwen (`--output-format stream-json`) and claude (`--output-format stream-json`) + * both emit Claude-Code's stream-json NDJSON on stdout: one JSON object per line. 
+ * This module turns that stream into the same transport-agnostic `AgentEvent`s the + * ACP / opencode-server backends emit, so the PTY dispatch path can publish live + * broker frames + persist structured parts instead of slicing stdout opaque. + * + * Two surfaces: + * - `parseStreamJsonLine(line, state)` — PURE per-line mapping (unit-testable). + * `state` is the caller-owned accumulator (open tool blocks + usage/session_id). + * - `makeStreamJsonParser()` — a thin stateful wrapper holding the state, with a + * `push(line)` that returns the events for that line and getters for the final + * `usage` / `sessionId`. + * + * Defensive by contract: a non-JSON / partial / garbage line yields `[]` and never + * throws. Tool args (`input_json_delta`) arrive fragmented across many lines; we + * accumulate the partial JSON string per content-block index and only surface the + * parsed `rawInput` once the block stops (or, as a fallback, off the terminal + * `assistant` message which carries the fully-assembled `tool_use` blocks). + * + * Schema (keyed on top-level `type`): + * - `system` — init: { session_id, tools, ... } + * - `assistant` — { message: { content: [ {type:'text'|'thinking'|'tool_use', ...} ], usage? } } + * - `user` — tool results (ignored — diffing the worktree captures effects) + * - `result` — final: { usage: { input_tokens, output_tokens }, session_id? } + * - `stream_event` — { event: { type, index?, content_block?, delta?, usage? } } + * event.type: + * content_block_start — { index, content_block: {type, id?, name?} } + * content_block_delta — { index, delta: {type, text?|thinking?|partial_json?} } + * content_block_stop — { index } + * message_delta — { usage: { output_tokens } } + * message_start — { message: { usage } } + */ +import type { AgentEvent } from './agent-backend.js'; +import type { AcpToolSnapshot } from './acp-tool-snapshot.js'; + +/** Convenience alias for the per-line return value. 
*/ +export type AgentEventList = AgentEvent[]; + +export interface StreamJsonUsage { + inputTokens?: number; + outputTokens?: number; +} + +/** Per-open-content-block accumulation for tool args assembled across deltas. */ +interface OpenToolBlock { + toolCallId: string; + name: string; + /** Concatenated `input_json_delta.partial_json` fragments. */ + partialJson: string; +} + +export interface StreamJsonState { + /** content-block index → open tool block (only `tool_use` blocks are tracked). */ + toolBlocks: Map; + sessionId: string | null; + usage: StreamJsonUsage; +} + +export function makeStreamJsonState(): StreamJsonState { + return { toolBlocks: new Map(), sessionId: null, usage: {} }; +} + +function asRecord(value: unknown): Record | null { + if (value && typeof value === 'object' && !Array.isArray(value)) { + return value as Record; + } + return null; +} + +function asString(value: unknown): string | undefined { + return typeof value === 'string' ? value : undefined; +} + +function asNumber(value: unknown): number | undefined { + return typeof value === 'number' && Number.isFinite(value) ? value : undefined; +} + +/** Pull token counts out of an Anthropic-shape `usage` object, mutating state. */ +function captureUsage(usage: Record | null, state: StreamJsonState): void { + if (!usage) return; + const input = asNumber(usage.input_tokens); + const output = asNumber(usage.output_tokens); + if (input !== undefined) state.usage.inputTokens = input; + // output_tokens is reported incrementally on message_delta; keep the latest. + if (output !== undefined) state.usage.outputTokens = output; +} + +/** Parse the accumulated tool-arg JSON; tolerate an unparseable/partial body. 
*/ +function parseToolInput(partialJson: string): unknown { + const trimmed = partialJson.trim(); + if (!trimmed) return {}; + try { + return JSON.parse(trimmed); + } catch { + return { _raw: partialJson }; + } +} + +function toolSnapshot(block: OpenToolBlock, rawInput: unknown, status: AcpToolSnapshot['status']): AcpToolSnapshot { + return { + toolCallId: block.toolCallId, + title: block.name, + kind: null, + status, + rawInput, + }; +} + +/** + * Map one stream-event sub-object (the `event` field of a `stream_event` line) to + * AgentEvents, mutating `state` for open tool blocks + usage. + */ +function handleStreamEvent(event: Record, state: StreamJsonState): AgentEvent[] { + const eventType = asString(event.type); + if (!eventType) return []; + + switch (eventType) { + case 'content_block_start': { + const index = asNumber(event.index); + const block = asRecord(event.content_block); + if (index === undefined || !block) return []; + if (asString(block.type) !== 'tool_use') return []; + const toolCallId = asString(block.id) ?? `tool_${index}`; + const name = asString(block.name) ?? 'tool'; + const open: OpenToolBlock = { toolCallId, name, partialJson: '' }; + state.toolBlocks.set(index, open); + // Surface the tool start immediately (running, no args yet) so the UI shows + // the call before the args finish streaming. + return [{ type: 'tool_call', toolCall: toolSnapshot(open, {}, 'in_progress') }]; + } + + case 'content_block_delta': { + const index = asNumber(event.index); + const delta = asRecord(event.delta); + if (delta === null) return []; + const deltaType = asString(delta.type); + if (deltaType === 'text_delta') { + const text = asString(delta.text); + return text ? [{ type: 'text', text }] : []; + } + if (deltaType === 'thinking_delta') { + const text = asString(delta.thinking); + return text ? [{ type: 'reasoning', text }] : []; + } + if (deltaType === 'input_json_delta') { + // Accumulate tool args; no event until the block stops. 
+ const fragment = asString(delta.partial_json); + if (index !== undefined && fragment) { + const open = state.toolBlocks.get(index); + if (open) open.partialJson += fragment; + } + return []; + } + return []; + } + + case 'content_block_stop': { + const index = asNumber(event.index); + if (index === undefined) return []; + const open = state.toolBlocks.get(index); + if (!open) return []; + state.toolBlocks.delete(index); + const rawInput = parseToolInput(open.partialJson); + return [{ type: 'tool_update', toolCall: toolSnapshot(open, rawInput, 'completed') }]; + } + + case 'message_start': { + const message = asRecord(event.message); + captureUsage(asRecord(message?.usage), state); + return []; + } + + case 'message_delta': { + captureUsage(asRecord(event.usage), state); + return []; + } + + default: + return []; + } +} + +/** + * Map the terminal `assistant` message (post-hoc full message) to AgentEvents. Used + * as a fallback for transports that emit only the assembled `assistant` line and no + * incremental `stream_event`s. When stream_events already streamed a block, the + * caller dedups by toolCallId, so re-emitting the assembled tool_use is harmless; text/thinking blocks are re-emitted as well and are not deduped here. + */ +function handleAssistantMessage(message: Record, state: StreamJsonState): AgentEvent[] { + captureUsage(asRecord(message.usage), state); + const content = message.content; + if (!Array.isArray(content)) return []; + const out: AgentEvent[] = []; + let toolIdx = 0; + for (const rawBlock of content) { + const block = asRecord(rawBlock); + if (!block) continue; + const blockType = asString(block.type); + if (blockType === 'text') { + const text = asString(block.text); + if (text) out.push({ type: 'text', text }); + } else if (blockType === 'thinking') { + const text = asString(block.thinking); + if (text) out.push({ type: 'reasoning', text }); + } else if (blockType === 'tool_use') { + const toolCallId = asString(block.id) ?? `tool_${toolIdx}`; + const name = asString(block.name) ?? 
'tool'; + const rawInput = 'input' in block ? block.input : {}; + out.push({ + type: 'tool_update', + toolCall: { toolCallId, title: name, kind: null, status: 'completed', rawInput }, + }); + } + toolIdx++; + } + return out; +} + +/** + * Pure per-line mapping. `line` is a single complete NDJSON line (no trailing + * newline required; surrounding whitespace tolerated). Returns the AgentEvents the + * line produces and mutates `state` (open tool blocks, usage, session_id). A blank, + * non-JSON, or unrecognized line yields `[]` and never throws. + */ +export function parseStreamJsonLine(line: string, state: StreamJsonState): AgentEvent[] { + const trimmed = line.trim(); + if (!trimmed) return []; + + let obj: Record | null; + try { + const parsed: unknown = JSON.parse(trimmed); + obj = asRecord(parsed); + } catch { + return []; + } + if (!obj) return []; + + const type = asString(obj.type); + switch (type) { + case 'system': { + const sid = asString(obj.session_id); + if (sid) state.sessionId = sid; + return []; + } + + case 'stream_event': { + const event = asRecord(obj.event); + return event ? handleStreamEvent(event, state) : []; + } + + case 'assistant': { + const sid = asString(obj.session_id); + if (sid) state.sessionId = sid; + const message = asRecord(obj.message); + return message ? handleAssistantMessage(message, state) : []; + } + + case 'result': { + const sid = asString(obj.session_id); + if (sid) state.sessionId = sid; + captureUsage(asRecord(obj.usage), state); + return []; + } + + default: + // `user` (tool results) and any unknown line type — ignore. + return []; + } +} + +export interface StreamJsonParser { + /** Feed one complete NDJSON line; returns its AgentEvents (never throws). */ + push(line: string): AgentEvent[]; + /** Final usage (input/output tokens) accumulated so far. */ + usage(): StreamJsonUsage; + /** Provider session id from the init `system` line / `result`, if seen. 
*/ + sessionId(): string | null; +} + +/** + * Stateful wrapper around `parseStreamJsonLine`. Holds per-tool-block accumulation + * + usage/session_id across the turn. Line-buffering (splitting stdout on `\n` and + * holding the partial tail) is the caller's job — see `pty-dispatch.ts`. + */ +export function makeStreamJsonParser(): StreamJsonParser { + const state = makeStreamJsonState(); + return { + push: (line: string) => parseStreamJsonLine(line, state), + usage: () => ({ ...state.usage }), + sessionId: () => state.sessionId, + }; +} diff --git a/apps/server/src/services/__tests__/agents.test.ts b/apps/server/src/services/__tests__/agents.test.ts index 32aaac3..eb667ce 100644 --- a/apps/server/src/services/__tests__/agents.test.ts +++ b/apps/server/src/services/__tests__/agents.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi, afterEach } from 'vitest'; import { isAgentRegistryMarkdown, parseAgentsMd } from '../agents.js'; describe('isAgentRegistryMarkdown', () => { @@ -31,3 +31,87 @@ Start here expect(r.errors.length).toBeGreaterThan(0); }); }); + +// v2.6 sampling-streamjson-tokens (#11): per-agent llama.cpp sampler extensions. +describe('parseAgentsMd: v2.6 sampling knobs', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + const withFrontmatter = (lines: string) => `# Agents + +## Sampler +--- +temperature: 0.6 +${lines} +tools: [view_file] +description: test +--- +You sample. 
+`; + + it('parses top_n_sigma and the dry_* family from frontmatter', () => { + const md = withFrontmatter( + [ + 'top_n_sigma: 1.5', + 'dry_multiplier: 0.8', + 'dry_base: 1.75', + 'dry_allowed_length: 2', + 'dry_penalty_last_n: -1', + ].join('\n'), + ); + const { agents, errors } = parseAgentsMd(md); + expect(errors).toHaveLength(0); + expect(agents).toHaveLength(1); + const a = agents[0]!; + expect(a.top_n_sigma).toBe(1.5); + expect(a.dry_multiplier).toBe(0.8); + expect(a.dry_base).toBe(1.75); + expect(a.dry_allowed_length).toBe(2); + expect(a.dry_penalty_last_n).toBe(-1); + }); + + it('defaults the new sampler fields to null when omitted', () => { + const { agents } = parseAgentsMd(withFrontmatter('top_p: 0.95')); + const a = agents[0]!; + expect(a.top_n_sigma).toBeNull(); + expect(a.dry_multiplier).toBeNull(); + expect(a.dry_base).toBeNull(); + expect(a.dry_allowed_length).toBeNull(); + expect(a.dry_penalty_last_n).toBeNull(); + }); + + it('warns (does not error) on out-of-range top_n_sigma / dry_* values', () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const md = withFrontmatter( + [ + 'top_n_sigma: -1', + 'dry_multiplier: -0.5', + 'dry_base: -2', + 'dry_allowed_length: -3', + 'dry_penalty_last_n: -5', + ].join('\n'), + ); + const { agents, errors } = parseAgentsMd(md); + expect(errors).toHaveLength(0); + expect(agents).toHaveLength(1); + // Mirrors top_k/min_p: out-of-range still stored, with a warning. 
+ expect(warn).toHaveBeenCalled(); + const warnings = warn.mock.calls.map((c) => String(c[0])).join('\n'); + expect(warnings).toContain('top_n_sigma'); + expect(warnings).toContain('dry_multiplier'); + expect(warnings).toContain('dry_base'); + expect(warnings).toContain('dry_allowed_length'); + expect(warnings).toContain('dry_penalty_last_n'); + }); + + it('errors on non-numeric / non-integer sampler values', () => { + const md = withFrontmatter( + ['top_n_sigma: high', 'dry_allowed_length: 2.5'].join('\n'), + ); + const { errors } = parseAgentsMd(md); + const joined = errors.map((e) => e.reason).join('\n'); + expect(joined).toContain('top_n_sigma must be a number'); + expect(joined).toContain('dry_allowed_length must be an integer'); + }); +}); diff --git a/apps/server/src/services/agents.ts b/apps/server/src/services/agents.ts index 681ac01..e873991 100644 --- a/apps/server/src/services/agents.ts +++ b/apps/server/src/services/agents.ts @@ -88,6 +88,12 @@ interface ParsedFrontmatter { top_k?: number; min_p?: number; presence_penalty?: number; + // v2.6 sampling-streamjson-tokens (#11): llama.cpp sampler extensions. + top_n_sigma?: number; + dry_multiplier?: number; + dry_base?: number; + dry_allowed_length?: number; + dry_penalty_last_n?: number; tools?: string[]; description?: string; model?: string; @@ -178,6 +184,63 @@ function parseFrontmatter(yaml: string): { data: ParsedFrontmatter; errors: stri } else { errors.push(`presence_penalty must be a number (got "${valueRaw}")`); } + } else if (key === 'top_n_sigma') { + // v2.6 #11: llama.cpp top-n-sigma sampler. Float ≥ 0 (typical 0-3). + // Mirrors top_p/min_p: store then warn on out-of-range (non-numeric + // hard-fails the block). The value is kept even when out of range, so the warning's 'ignoring (falling back to default)' wording overstates what this parser does. 
+ const n = Number(valueRaw); + if (Number.isFinite(n)) { + data.top_n_sigma = n; + if (n < 0) { + console.warn(`agents: top_n_sigma ${n} out of range (≥0), ignoring (falling back to default)`); + } + } else { + errors.push(`top_n_sigma must be a number (got "${valueRaw}")`); + } + } else if (key === 'dry_multiplier') { + // v2.6 #11: DRY repetition-penalty multiplier. Float ≥ 0 (0 disables DRY). + const n = Number(valueRaw); + if (Number.isFinite(n)) { + data.dry_multiplier = n; + if (n < 0) { + console.warn(`agents: dry_multiplier ${n} out of range (≥0), ignoring (falling back to default)`); + } + } else { + errors.push(`dry_multiplier must be a number (got "${valueRaw}")`); + } + } else if (key === 'dry_base') { + // v2.6 #11: DRY penalty growth base. Float ≥ 0. + const n = Number(valueRaw); + if (Number.isFinite(n)) { + data.dry_base = n; + if (n < 0) { + console.warn(`agents: dry_base ${n} out of range (≥0), ignoring (falling back to default)`); + } + } else { + errors.push(`dry_base must be a number (got "${valueRaw}")`); + } + } else if (key === 'dry_allowed_length') { + // v2.6 #11: DRY max sequence length not penalized. Integer ≥ 0. + const n = Number(valueRaw); + if (Number.isInteger(n)) { + data.dry_allowed_length = n; + if (n < 0) { + console.warn(`agents: dry_allowed_length ${n} out of range (≥0), ignoring (falling back to default)`); + } + } else { + errors.push(`dry_allowed_length must be an integer (got "${valueRaw}")`); + } + } else if (key === 'dry_penalty_last_n') { + // v2.6 #11: DRY lookback window. Integer ≥ -1 (-1 = whole context, 0 = off). 
+ const n = Number(valueRaw); + if (Number.isInteger(n)) { + data.dry_penalty_last_n = n; + if (n < -1) { + console.warn(`agents: dry_penalty_last_n ${n} out of range (≥-1), ignoring (falling back to default)`); + } + } else { + errors.push(`dry_penalty_last_n must be an integer (got "${valueRaw}")`); + } } else if (key === 'tools') { if (valueRaw === '') { data.tools = []; @@ -354,6 +417,11 @@ function parseAgentSection(section: RawSection): Omit { top_k: typeof fm.top_k === 'number' ? fm.top_k : null, min_p: typeof fm.min_p === 'number' ? fm.min_p : null, presence_penalty: typeof fm.presence_penalty === 'number' ? fm.presence_penalty : null, + top_n_sigma: typeof fm.top_n_sigma === 'number' ? fm.top_n_sigma : null, + dry_multiplier: typeof fm.dry_multiplier === 'number' ? fm.dry_multiplier : null, + dry_base: typeof fm.dry_base === 'number' ? fm.dry_base : null, + dry_allowed_length: typeof fm.dry_allowed_length === 'number' ? fm.dry_allowed_length : null, + dry_penalty_last_n: typeof fm.dry_penalty_last_n === 'number' ? fm.dry_penalty_last_n : null, tools: filteredTools, model: typeof fm.model === 'string' && fm.model.length > 0 ? fm.model : null, max_tool_calls: typeof fm.max_tool_calls === 'number' ? fm.max_tool_calls : null, diff --git a/apps/server/src/services/inference/sentinel-summaries.ts b/apps/server/src/services/inference/sentinel-summaries.ts index 033a406..1bcfbd6 100644 --- a/apps/server/src/services/inference/sentinel-summaries.ts +++ b/apps/server/src/services/inference/sentinel-summaries.ts @@ -86,7 +86,7 @@ export async function runCapHitSummary( ctx, session.model, messages, - { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? undefined, presence_penalty: agent?.presence_penalty ?? undefined }, + { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? 
undefined, presence_penalty: agent?.presence_penalty ?? undefined, top_n_sigma: agent?.top_n_sigma ?? undefined, dry_multiplier: agent?.dry_multiplier ?? undefined, dry_base: agent?.dry_base ?? undefined, dry_allowed_length: agent?.dry_allowed_length ?? undefined, dry_penalty_last_n: agent?.dry_penalty_last_n ?? undefined }, (delta) => { accumulated += delta; ctx.publish(sessionId, { @@ -346,7 +346,7 @@ export async function runDoomLoopSummary( ctx, session.model, messages, - { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? undefined, presence_penalty: agent?.presence_penalty ?? undefined }, + { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? undefined, presence_penalty: agent?.presence_penalty ?? undefined, top_n_sigma: agent?.top_n_sigma ?? undefined, dry_multiplier: agent?.dry_multiplier ?? undefined, dry_base: agent?.dry_base ?? undefined, dry_allowed_length: agent?.dry_allowed_length ?? undefined, dry_penalty_last_n: agent?.dry_penalty_last_n ?? undefined }, (delta) => { accumulated += delta; ctx.publish(sessionId, { @@ -545,7 +545,7 @@ export async function runStepCapSummary( ctx, session.model, messages, - { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? undefined, presence_penalty: agent?.presence_penalty ?? undefined }, + { tools: null, temperature: agent?.temperature, top_p: agent?.top_p ?? undefined, top_k: agent?.top_k ?? undefined, min_p: agent?.min_p ?? undefined, presence_penalty: agent?.presence_penalty ?? undefined, top_n_sigma: agent?.top_n_sigma ?? undefined, dry_multiplier: agent?.dry_multiplier ?? undefined, dry_base: agent?.dry_base ?? undefined, dry_allowed_length: agent?.dry_allowed_length ?? undefined, dry_penalty_last_n: agent?.dry_penalty_last_n ?? 
undefined }, (delta) => { accumulated += delta; ctx.publish(sessionId, { diff --git a/apps/server/src/services/inference/stream-phase.ts b/apps/server/src/services/inference/stream-phase.ts index 95f2fa9..c2915c3 100644 --- a/apps/server/src/services/inference/stream-phase.ts +++ b/apps/server/src/services/inference/stream-phase.ts @@ -33,6 +33,39 @@ interface StreamOptions { top_k?: number | null; min_p?: number | null; presence_penalty?: number | null; + // v2.6 sampling-streamjson-tokens (#11): llama.cpp sampler extensions. These + // are NOT standard AI-SDK streamText options and are NOT serialized by the + // openai-compatible provider's standardized-settings path (topK is even + // explicitly dropped with an "unsupported feature: topK" warning). They reach + // llama-server only via providerOptions.openaiCompatible (see buildSamplerProviderOptions). + top_n_sigma?: number | null; + dry_multiplier?: number | null; + dry_base?: number | null; + dry_allowed_length?: number | null; + dry_penalty_last_n?: number | null; +} + +// v2.6 #11: build the providerOptions.openaiCompatible extraBody object for the +// llama.cpp sampler extensions. @ai-sdk/openai-compatible (2.0.47) merges every +// non-reserved key under providerOptions.openaiCompatible straight into the +// chat-completion request body (see its getArgs: the Object.fromEntries spread +// filtered against openaiCompatibleLanguageModelChatOptions.shape). This is the +// ONLY working passthrough for these params: +// - top_k / min_p were latently dropped before this: top_k was passed as the +// AI-SDK `topK` setting which the openai-compatible provider rejects as +// unsupported; min_p was never passed to streamText at all. +// - top_n_sigma + the dry_* family have no AI-SDK equivalent. +// Keys use llama-server's snake_case body names so they land verbatim. 
+function buildSamplerProviderOptions(opts: StreamOptions): Record<string, number> | undefined { + const body: Record<string, number> = {}; + if (typeof opts.top_k === 'number') body.top_k = opts.top_k; + if (typeof opts.min_p === 'number') body.min_p = opts.min_p; + if (typeof opts.top_n_sigma === 'number') body.top_n_sigma = opts.top_n_sigma; + if (typeof opts.dry_multiplier === 'number') body.dry_multiplier = opts.dry_multiplier; + if (typeof opts.dry_base === 'number') body.dry_base = opts.dry_base; + if (typeof opts.dry_allowed_length === 'number') body.dry_allowed_length = opts.dry_allowed_length; + if (typeof opts.dry_penalty_last_n === 'number') body.dry_penalty_last_n = opts.dry_penalty_last_n; + return Object.keys(body).length > 0 ? body : undefined; } // v1.13.1-A: convert BooCode's OpenAI-shaped history into AI SDK @@ -195,6 +228,14 @@ export async function streamCompletion( return toolCall; }; + // v2.6 #11: llama.cpp sampler extensions (top_k, min_p, top_n_sigma, dry_*) + // ride providerOptions.openaiCompatible — they are NOT standardized streamText + // settings. NB: top_k used to be passed below as the AI-SDK `topK` setting; + // the openai-compatible provider dropped it with an "unsupported feature: topK" + // warning and min_p was never wired at all, so both were dead on the wire + // before this. They now go through the same extraBody path as the new params. + const samplerBody = buildSamplerProviderOptions(opts); + const result = streamText({ model: upstreamModel(ctx.config, model, agent ?? null), messages: aiMessages, @@ -203,8 +244,8 @@ export async function streamCompletion( : {}), ...(typeof opts.temperature === 'number' ? { temperature: opts.temperature } : {}), ...(typeof opts.top_p === 'number' ? { topP: opts.top_p } : {}), - ...(typeof opts.top_k === 'number' ? { topK: opts.top_k } : {}), ...(typeof opts.presence_penalty === 'number' ? { presencePenalty: opts.presence_penalty } : {}), + ...(samplerBody ?
{ providerOptions: { openaiCompatible: samplerBody } } : {}), abortSignal: signal, }); @@ -398,6 +439,12 @@ export async function executeStreamPhase( const effectiveTopK = agent?.top_k ?? undefined; const effectiveMinP = agent?.min_p ?? undefined; const effectivePresencePenalty = agent?.presence_penalty ?? undefined; + // v2.6 #11: llama.cpp sampler extensions, threaded the same way as top_k/min_p. + const effectiveTopNSigma = agent?.top_n_sigma ?? undefined; + const effectiveDryMultiplier = agent?.dry_multiplier ?? undefined; + const effectiveDryBase = agent?.dry_base ?? undefined; + const effectiveDryAllowedLength = agent?.dry_allowed_length ?? undefined; + const effectiveDryPenaltyLastN = agent?.dry_penalty_last_n ?? undefined; // v1.12.2: ctx_max lookup is cached after the first hit per model, so this // is a Map probe in steady state. We capture nCtx once at the top of the @@ -435,7 +482,19 @@ export async function executeStreamPhase( ctx, session.model, messages, - { tools: effectiveTools, temperature: effectiveTemperature, top_p: effectiveTopP, top_k: effectiveTopK, min_p: effectiveMinP, presence_penalty: effectivePresencePenalty }, + { + tools: effectiveTools, + temperature: effectiveTemperature, + top_p: effectiveTopP, + top_k: effectiveTopK, + min_p: effectiveMinP, + presence_penalty: effectivePresencePenalty, + top_n_sigma: effectiveTopNSigma, + dry_multiplier: effectiveDryMultiplier, + dry_base: effectiveDryBase, + dry_allowed_length: effectiveDryAllowedLength, + dry_penalty_last_n: effectiveDryPenaltyLastN, + }, (delta) => { state.accumulated += delta; ctx.publish(sessionId, { diff --git a/apps/server/src/types/api.ts b/apps/server/src/types/api.ts index 257dc2b..0ae1e99 100644 --- a/apps/server/src/types/api.ts +++ b/apps/server/src/types/api.ts @@ -117,6 +117,15 @@ export interface Agent { top_k: number | null; // null means omit from request body min_p: number | null; // null means omit from request body presence_penalty: number | null; // null 
means omit from request body + // v2.6 sampling-streamjson-tokens (#11): llama.cpp sampler extensions. + // null = omit from request body. top_n_sigma + the DRY repetition family + // help the doom-loop-prone local model. All travel via the same + // providerOptions.openaiCompatible extraBody channel as top_k/min_p. + top_n_sigma: number | null; + dry_multiplier: number | null; + dry_base: number | null; + dry_allowed_length: number | null; + dry_penalty_last_n: number | null; tools: string[]; // whitelist of tool names; empty = no tools allowed model: string | null; // null means "session.model wins" source: AgentSource; diff --git a/apps/web/src/api/client.ts b/apps/web/src/api/client.ts index 26139c4..8a071d4 100644 --- a/apps/web/src/api/client.ts +++ b/apps/web/src/api/client.ts @@ -34,6 +34,12 @@ export interface AgentSessionInfo { status: string; has_session: boolean; last_active_at: string | null; + // v2.6.8 per-(chat,agent) running token/cost totals (sampling-streamjson-tokens + // #8). input_tokens/output_tokens are BIGINT and may arrive as strings; cost is + // DOUBLE. AgentComposerBar coerces with Number(...) before rendering. + input_tokens: number; + output_tokens: number; + cost: number; } // write-edit-robustness #4: a pre-turn worktree snapshot anchored to an diff --git a/apps/web/src/components/AgentComposerBar.tsx b/apps/web/src/components/AgentComposerBar.tsx index aa74d11..1514932 100644 --- a/apps/web/src/components/AgentComposerBar.tsx +++ b/apps/web/src/components/AgentComposerBar.tsx @@ -185,6 +185,14 @@ interface Props { hasPriorTurn?: boolean; } +// Condensed token count: 950 → "950", 12_400 → "12.4K", 3_200_000 → "3.2M". +// Sub-1000 stays exact; thousands/millions get one decimal, trailing .0 trimmed. 
+function abbrevTokens(n: number): string { + const v = Number.isFinite(n) ? Math.max(0, Math.round(n)) : 0; // NaN, ±Infinity and negatives all render as "0" + if (v < 1000) return String(v); + return v < 999_950 ? `${(v / 1000).toFixed(1).replace(/\.0$/, '')}K` : `${(v / 1_000_000).toFixed(1).replace(/\.0$/, '')}M`; // from 999_950 up toFixed(1) would yield "1000.0", so switch to M and show "1M" +} + // Relative-time formatter for the resumed-chip title (e.g. "3m ago"). function relativeTime(iso: string | null): string { if (!iso) return 'unknown'; @@ -353,6 +361,21 @@ export function AgentComposerBar({ projectPath, value, onChange, onProviderComma : { label: 'new session', title: `${value.provider} starts a fresh session this turn` } : null; + // sampling-streamjson-tokens #8: condensed per-(chat,agent) token/cost readout + // beside the session chip. Coerce — input/output are BIGINT (string over wire). + // Hidden when no session row or all totals are zero (e.g. native boocode, which + // holds no agent_sessions row, or a provider that hasn't run yet). + const usageReadout = (() => { + if (!sessionChip || !sessionRow) return null; + const inTok = Number(sessionRow.input_tokens) || 0; + const outTok = Number(sessionRow.output_tokens) || 0; + const cost = Number(sessionRow.cost) || 0; + if (inTok <= 0 && outTok <= 0 && cost <= 0) return null; + const parts = [`${abbrevTokens(inTok)} in`, `${abbrevTokens(outTok)} out`]; + if (cost > 0) parts.push(`$${cost.toFixed(2)}`); + return parts.join(' · '); + })(); + return (
)} + {usageReadout && ( + + {usageReadout} + + )}