Files
boocode/apps/server/src/services/inference/stream-phase.ts
indifferentketchup a08d809b73 v1.13.3: cleanup bundle — statement timeout + alpha ordering + stuck-row sweeper + repairToolCall
Four independent items, all owed from prior dispatches.

- statement_timeout at the database level via:
    ALTER DATABASE boocode SET statement_timeout = '30s';
  Applied operationally; documented as a comment at the top of schema.sql
  (ALTER DATABASE can't run inside a DO block, so it's not idempotent
  inside applySchema). Re-apply after a volume reset.

- Tool registry alpha-sorted at module load. llama.cpp's prompt cache
  hits on byte-identical prefixes; any reordering of the tool list near
  the top of the system prompt would invalidate every cached turn.
  Single-source sort at the ALL_TOOLS export so toolJsonSchemas() and
  TOOLS_BY_NAME inherit the order automatically. New tools.test.ts
  asserts the invariant; total tests 173 (was 172).

- Periodic in-process stuck-row sweeper. Runs every 60s, marks
  'streaming' rows older than 5 minutes as 'failed', and publishes
  chat_status='idle' on the user channel so the UI dot drops without a
  refresh. Closes the mid-session crash UX gap; the v1.12.1 boot sweep
  only fires once at startup, so sessions used to stay stuck until next
  reboot. setInterval cleaned up via app.addHook('onClose'). Mirrors
  handleAbortOrError's publish pattern.

- experimental_repairToolCall wired through AI SDK v6 streamText. Pass-
  through implementation: log + return the original toolCall so the
  stream keeps going. executeToolPhase's existing error paths (unknown
  tool name → 'unknown tool: X' result; zod-reject → 'tool X rejected
  — field: required') already surface bad calls to the model; the value
  here is preventing the AI SDK from THROWING on parse errors and
  killing the whole stream. Owed since v1.13.1-A.

Smoke verified:
- statement_timeout = '30s' confirmed via SHOW.
- Tool path normal flow intact (list_dir prompt → tool_call → result
  → final assistant). No malformed tool calls in the test run; repair
  log will surface them when qwen3.6 actually emits one.
- Alpha order verified at runtime via the dist bundle: match: true.
- Sweeper logic not traffic-tested (no stuck rows to find), but the
  SQL UPDATE + broker.publishUser pattern is identical to handleAbort
  and the boot sweep — synthesis-only verification.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 06:46:03 +00:00

483 lines
17 KiB
TypeScript

import type {
Agent,
Session,
ToolCall,
} from '../../types/api.js';
import * as modelContext from '../model-context.js';
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
import type { OpenAiMessage } from './payload.js';
import {
XML_TOOL_CLOSE,
XML_TOOL_OPEN,
parseXmlToolCall,
partialXmlOpenerStart,
} from './xml-parser.js';
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
import type {
InferenceContext,
StreamResult,
TurnArgs,
} from './turn.js';
import { upstreamModel } from './provider.js';
import {
jsonSchema,
streamText,
tool,
type JSONValue,
type ModelMessage,
type ToolCallRepairFunction,
} from 'ai';
interface StreamOptions {
// null = omit tools entirely (compact phase); [] = caller stripped all tools
// (rare; we still omit from the request body to avoid OpenAI 400).
tools: ToolJsonSchema[] | null;
temperature?: number;
}
// v1.13.1-A: convert BooCode's OpenAI-shaped history into AI SDK
// ModelMessage[]. Tool result messages need a `toolName` field that the
// OpenAI shape doesn't carry; we look it up by scanning earlier assistant
// `tool_calls` entries for a matching id.
function toModelMessages(messages: OpenAiMessage[]): ModelMessage[] {
const toolNameById = new Map<string, string>();
for (const m of messages) {
if (m.role === 'assistant' && m.tool_calls) {
for (const tc of m.tool_calls) {
toolNameById.set(tc.id, tc.function.name);
}
}
}
const out: ModelMessage[] = [];
for (const m of messages) {
if (m.role === 'system' || m.role === 'user') {
out.push({ role: m.role, content: m.content ?? '' });
continue;
}
if (m.role === 'assistant') {
const hasTools = m.tool_calls && m.tool_calls.length > 0;
const hasReasoning = typeof m.reasoning === 'string' && m.reasoning.length > 0;
if (!hasTools && !hasReasoning) {
// Bare text assistant (string content). null content + no tool_calls
// is degenerate but harmless to forward.
out.push({ role: 'assistant', content: m.content ?? '' });
continue;
}
// v1.13.1-C: AI SDK ReasoningPart precedes text + tool-calls in the
// assistant content array. Reasoning models (qwen3.6) consume their
// prior reasoning context to resume mid-thought across tool boundaries.
const parts: Array<
| { type: 'reasoning'; text: string }
| { type: 'text'; text: string }
| { type: 'tool-call'; toolCallId: string; toolName: string; input: unknown }
> = [];
if (hasReasoning) {
parts.push({ type: 'reasoning', text: m.reasoning! });
}
if (m.content && m.content.length > 0) {
parts.push({ type: 'text', text: m.content });
}
for (const tc of m.tool_calls ?? []) {
let input: unknown = {};
try {
input = tc.function.arguments.length > 0 ? JSON.parse(tc.function.arguments) : {};
} catch {
// Malformed args from a prior turn: pass through as a raw blob so
// the model sees the same shape it emitted. Wraps the string under
// _raw to match the buildMessagesPayload upstream convention.
input = { _raw: tc.function.arguments };
}
parts.push({ type: 'tool-call', toolCallId: tc.id, toolName: tc.function.name, input });
}
out.push({ role: 'assistant', content: parts });
continue;
}
if (m.role === 'tool') {
const toolCallId = m.tool_call_id ?? '';
const toolName = toolNameById.get(toolCallId) ?? 'unknown';
const raw = m.content ?? '';
let output: { type: 'text'; value: string } | { type: 'json'; value: JSONValue };
try {
// JSON.parse returns `any`; cast to JSONValue since the upstream
// tool_results column is already JSON-serializable by construction.
output = { type: 'json', value: JSON.parse(raw) as JSONValue };
} catch {
output = { type: 'text', value: raw };
}
out.push({
role: 'tool',
content: [{ type: 'tool-result', toolCallId, toolName, output }],
});
continue;
}
}
return out;
}
// Build the AI SDK tools record from BooCode's JSON-schema tool definitions.
// No `execute` field: BooCode runs tools itself in tool-phase.ts; streamText
// surfaces the tool-call parts via fullStream and we capture them for the
// outer loop to dispatch.
function buildAiTools(schemas: ToolJsonSchema[]): Record<string, ReturnType<typeof tool>> {
const out: Record<string, ReturnType<typeof tool>> = {};
for (const s of schemas) {
out[s.function.name] = tool({
description: s.function.description,
inputSchema: jsonSchema(s.function.parameters),
});
}
return out;
}
// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
// llama-swap) emit tool calls as inline XML inside delta.content rather than
// the structured tool_calls field. We extract them out of the streamed text
// before flushing it to the client, mirroring the pre-AI-SDK behavior.
//
// XML shape:
// <tool_call>
// <function=NAME>
// <parameter=KEY>VALUE</parameter>
// ...
// </function>
// </tool_call>
// Multiple <tool_call> blocks may appear back-to-back; they never nest.
export async function streamCompletion(
ctx: InferenceContext,
model: string,
messages: OpenAiMessage[],
opts: StreamOptions,
onDelta: (content: string) => void,
onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
signal?: AbortSignal
): Promise<StreamResult> {
const aiMessages = toModelMessages(messages);
const hasTools = opts.tools !== null && opts.tools.length > 0;
const aiTools = hasTools ? buildAiTools(opts.tools!) : undefined;
const startedAt = Date.now();
// v1.13.1-C: accumulate reasoning text across reasoning-delta parts.
// qwen3.6 emits these on a separate channel from text content; we capture
// them per stream so finalizeCompletion can dual-write a 'reasoning' part.
// Replaces the v1.13.1-A counter-only diagnostic.
let reasoningAccumulated = '';
// v1.13.3: experimental_repairToolCall keeps the stream alive when the
// model emits a malformed tool call (bad JSON args, unknown name, etc.).
// Without a repair function streamText throws and the WHOLE stream dies;
// with one, the SDK invokes us and we route the bad call through normally.
// Strategy: pass through unmodified. executeToolPhase's existing error
// path (unknown tool name → "unknown tool: X" result; zod-reject → tool
// 'X' rejected — fieldname: required) already gives the model a clean
// recovery surface on the next turn. Logging gives us visibility into
// how often qwen3.6 actually emits broken calls.
const repairToolCall: ToolCallRepairFunction<NonNullable<typeof aiTools>> = async ({
toolCall,
error,
}) => {
ctx.log.warn(
{
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
error: error.message,
},
'malformed tool call surfaced via repairToolCall',
);
return toolCall;
};
const result = streamText({
model: upstreamModel(ctx.config.LLAMA_SWAP_URL, model),
messages: aiMessages,
...(aiTools
? { tools: aiTools, toolChoice: 'auto' as const, experimental_repairToolCall: repairToolCall }
: {}),
...(typeof opts.temperature === 'number' ? { temperature: opts.temperature } : {}),
abortSignal: signal,
});
let content = '';
let pendingBuffer = '';
let finishReason: string | null = null;
// v1.13.1-A: AI SDK emits one `tool-call` part per fully-aggregated call,
// so we no longer need the OpenAI-index reassembly map the manual SSE
// parser used. XML tool calls extracted from text content go into the
// same flat list and keep the v1.10.5 synthetic id convention.
const toolCalls: ToolCall[] = [];
for await (const part of result.fullStream) {
switch (part.type) {
case 'text-delta': {
pendingBuffer += part.text;
// Extract any complete <tool_call>...</tool_call> blocks before
// flushing visible text.
while (true) {
const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN);
if (startIdx === -1) break;
const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx);
if (closeIdx === -1) break;
const blockEnd = closeIdx + XML_TOOL_CLOSE.length;
const block = pendingBuffer.slice(startIdx, blockEnd);
if (startIdx > 0) {
const before = pendingBuffer.slice(0, startIdx);
content += before;
onDelta(before);
}
const parsedCall = parseXmlToolCall(block);
if (parsedCall) {
const synthIdx = toolCalls.length;
toolCalls.push({
id: `xml_call_${synthIdx}`,
name: parsedCall.name,
args: parsedCall.args,
});
}
// Parse failures still drop the block — leaking <tool_call> XML to
// the chat would look worse than silently swallowing the bad block.
pendingBuffer = pendingBuffer.slice(blockEnd);
}
// Hold back any (partial or full) unclosed opener; flush the rest.
const partialIdx = partialXmlOpenerStart(pendingBuffer);
if (partialIdx >= 0) {
if (partialIdx > 0) {
const flush = pendingBuffer.slice(0, partialIdx);
content += flush;
onDelta(flush);
}
pendingBuffer = pendingBuffer.slice(partialIdx);
} else if (pendingBuffer.length > 0) {
content += pendingBuffer;
onDelta(pendingBuffer);
pendingBuffer = '';
}
break;
}
case 'tool-call': {
// AI SDK has already parsed the input into an object. Match the
// ToolCall shape BooCode passes around in toolCallsBuffer downstream.
toolCalls.push({
id: part.toolCallId,
name: part.toolName,
args: (part.input ?? {}) as Record<string, unknown>,
});
break;
}
case 'reasoning-delta': {
// v1.13.1-C: accumulate; finalizeCompletion / executeToolPhase
// dual-write the resulting text as a kind='reasoning' part.
if (typeof part.text === 'string') {
reasoningAccumulated += part.text;
}
break;
}
case 'finish': {
if (typeof part.finishReason === 'string') {
finishReason = part.finishReason;
}
break;
}
case 'error': {
const err = part.error;
throw err instanceof Error ? err : new Error(String(err));
}
// Intentional no-op: start, start-step, text-start, text-end,
// reasoning-start, reasoning-end, source, file, tool-input-start,
// tool-input-delta, tool-input-end, tool-result, tool-error,
// finish-step, raw. We only care about the aggregated tool-call and
// text-delta paths above; the rest are AI SDK lifecycle/streaming
// breadcrumbs that don't change BooCode's persistence or WS contract.
default:
break;
}
}
// v1.13.1-A: drain any buffered partial XML opener as plain text. The
// pre-AI-SDK path did this on stream end too — better to leak `<tool_c`
// than vanish the text.
if (pendingBuffer.length > 0) {
content += pendingBuffer;
onDelta(pendingBuffer);
pendingBuffer = '';
}
// AI SDK v6 fullStream returns normally on abort; check signal explicitly.
// Without this throw the row would land as status='complete' with partial
// content instead of going through handleAbortOrError → status='cancelled'.
// Smoke D caught this in v1.13.1-A — don't refactor it away.
if (signal?.aborted) {
const abortErr = new Error('aborted');
abortErr.name = 'AbortError';
throw abortErr;
}
// Usage lands as a promise on the result; awaiting after fullStream is
// drained is safe. AI SDK v6 names: `inputTokens` / `outputTokens`.
let promptTokens: number | null = null;
let completionTokens: number | null = null;
try {
const usage = await result.usage;
if (typeof usage.inputTokens === 'number') promptTokens = usage.inputTokens;
if (typeof usage.outputTokens === 'number') completionTokens = usage.outputTokens;
} catch {
// Some providers omit usage on partial streams; leave both null.
}
if (onUsage && (promptTokens !== null || completionTokens !== null)) {
onUsage(promptTokens, completionTokens);
}
if (reasoningAccumulated.length > 0) {
ctx.log.debug(
{ reasoningChars: reasoningAccumulated.length, model, elapsed_ms: Date.now() - startedAt },
'streamCompletion: captured reasoning',
);
}
return {
finishReason,
content,
toolCalls,
promptTokens,
completionTokens,
reasoning: reasoningAccumulated,
};
}
export async function executeStreamPhase(
ctx: InferenceContext,
args: TurnArgs,
session: Session,
messages: OpenAiMessage[],
state: StreamPhaseState,
agent: Agent | null,
// v1.11.8: when false, web_search and web_fetch are stripped from the
// tool list sent to the LLM, so the model can't even attempt them.
webToolsEnabled: boolean,
): Promise<StreamResult> {
const { sessionId, chatId, assistantMessageId, signal } = args;
const startedRow = await ctx.sql<{ started_at: string }[]>`
UPDATE messages
SET started_at = clock_timestamp()
WHERE id = ${assistantMessageId}
RETURNING started_at
`;
state.startedAt = startedRow[0]?.started_at ?? null;
ctx.publish(sessionId, {
type: 'message_started',
message_id: assistantMessageId,
chat_id: chatId,
role: 'assistant',
});
let pendingFlushTimer: NodeJS.Timeout | null = null;
let flushPromise: Promise<unknown> = Promise.resolve();
const flushNow = () => {
if (pendingFlushTimer) {
clearTimeout(pendingFlushTimer);
pendingFlushTimer = null;
}
const snapshot = state.accumulated;
flushPromise = flushPromise.then(() =>
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
);
};
const scheduleFlush = () => {
if (pendingFlushTimer) return;
pendingFlushTimer = setTimeout(() => {
pendingFlushTimer = null;
flushNow();
}, DB_FLUSH_INTERVAL_MS);
};
// Tool whitelist: if an agent is set, filter the global tool list to only the
// tool names it allows. Unknown names in agent.tools are dropped silently
// (handled here by intersection). When no agent: send all tools.
// v1.11.8: a second filter strips web_search + web_fetch unless the chat
// has them explicitly enabled. Counts as an opt-in security boundary: the
// model can't summon a tool that wasn't offered to it.
const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
const effectiveTools: ToolJsonSchema[] = (agent
? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
: toolJsonSchemas()
).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
const effectiveTemperature = agent?.temperature;
// v1.12.2: ctx_max lookup is cached after the first hit per model, so this
// is a Map probe in steady state. We capture nCtx once at the top of the
// stream so the throttled usage publish doesn't refetch each tick.
const mctxForStream = await modelContext.getModelContext(session.model);
const nCtxForStream = mctxForStream?.n_ctx ?? null;
// v1.12.2 → v1.13.1-A: live usage publishes were throttled to ~500ms when
// the manual SSE parser saw `parsed.usage` per chunk. AI SDK v6 surfaces
// usage only at stream end (result.usage promise), so the throttle is
// effectively a single trailing publish. ChatThroughput will tick once at
// stream completion rather than mid-stream — known regression vs v1.12.2,
// recovered if a future dispatch interpolates from delta cadence.
const USAGE_THROTTLE_MS = 500;
let lastUsageAt = 0;
let pendingUsage: { p: number | null; c: number | null } | null = null;
let usageTimer: NodeJS.Timeout | null = null;
const flushUsage = () => {
if (!pendingUsage) return;
const { p, c } = pendingUsage;
pendingUsage = null;
lastUsageAt = Date.now();
ctx.publish(sessionId, {
type: 'usage',
message_id: assistantMessageId,
chat_id: chatId,
completion_tokens: c,
ctx_used: p,
ctx_max: nCtxForStream,
});
};
try {
return await streamCompletion(
ctx,
session.model,
messages,
{ tools: effectiveTools, temperature: effectiveTemperature },
(delta) => {
state.accumulated += delta;
ctx.publish(sessionId, {
type: 'delta',
message_id: assistantMessageId,
chat_id: chatId,
content: delta,
});
ctx.log.debug({ sessionId, delta }, 'inference delta');
scheduleFlush();
},
(prompt, completion) => {
pendingUsage = { p: prompt, c: completion };
const elapsed = Date.now() - lastUsageAt;
if (elapsed >= USAGE_THROTTLE_MS) {
flushUsage();
} else if (!usageTimer) {
usageTimer = setTimeout(() => {
usageTimer = null;
flushUsage();
}, USAGE_THROTTLE_MS - elapsed);
}
},
signal
);
} finally {
if (pendingFlushTimer) {
clearTimeout(pendingFlushTimer);
pendingFlushTimer = null;
}
if (usageTimer) {
clearTimeout(usageTimer);
usageTimer = null;
}
await flushPromise;
}
}