import type { Agent, Session, ToolCall } from '../../types/api.js';
import * as modelContext from '../model-context.js';
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
import { matchToolGlob } from '../agents.js';
import type { OpenAiMessage } from './payload.js';
import { extractToolCallBlocks } from './tool-call-parser.js';
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
import type { InferenceContext, StreamResult, TurnArgs } from './turn.js';
import { upstreamModel } from './provider.js';
import {
  jsonSchema,
  streamText,
  tool,
  type JSONValue,
  type ModelMessage,
  type ToolCallRepairFunction,
} from 'ai';

/** Per-request options for streamCompletion. */
interface StreamOptions {
  // null = omit tools entirely (compact phase); [] = caller stripped all tools
  // (rare; we still omit from the request body to avoid OpenAI 400).
  tools: ToolJsonSchema[] | null;
  temperature?: number;
  top_p?: number | null;
  top_k?: number | null;
  // NOTE: accepted for interface symmetry with the agent settings, but
  // streamCompletion does not currently forward min_p to streamText.
  min_p?: number | null;
  presence_penalty?: number | null;
}

/** Output payload of an AI SDK `tool-result` part as produced here. */
type ToolResultOutput = { type: 'text'; value: string } | { type: 'json'; value: JSONValue };

/**
 * Parse an assistant tool call's JSON argument string for replay to the model.
 *
 * An empty string becomes `{}`. Malformed JSON from a prior turn is passed
 * through as `{ _raw: <original string> }` so the model sees the same shape it
 * emitted (matches the buildMessagesPayload upstream convention).
 */
function parseToolCallInput(rawArgs: string): unknown {
  if (rawArgs.length === 0) return {};
  try {
    return JSON.parse(rawArgs) as unknown;
  } catch {
    return { _raw: rawArgs };
  }
}

/**
 * Wrap a stored tool result string as an AI SDK tool-result output: `json`
 * when the text parses as JSON, otherwise `text` carrying the raw string.
 */
function toToolResultOutput(raw: string): ToolResultOutput {
  try {
    // JSON.parse returns `any`; cast to JSONValue since the upstream
    // tool_results column is already JSON-serializable by construction.
    return { type: 'json', value: JSON.parse(raw) as JSONValue };
  } catch {
    return { type: 'text', value: raw };
  }
}

/**
 * v1.13.1-A: convert BooCode's OpenAI-shaped history into AI SDK
 * `ModelMessage[]`.
 *
 * - system/user messages pass through; `null` content becomes `''`.
 * - assistant messages with neither tool calls nor reasoning stay bare string
 *   content. Otherwise they become a part array ordered reasoning → text →
 *   tool-calls (v1.13.1-C: reasoning models such as qwen3.6 consume their
 *   prior reasoning context to resume mid-thought across tool boundaries).
 * - tool messages become a single `tool-result` part. The AI SDK needs a
 *   `toolName` that the OpenAI shape doesn't carry, so it is looked up from
 *   the assistant `tool_calls` entry with the same id.
 * - messages with any other role are dropped.
 *
 * Tool-call ids are not necessarily unique across the history. streamCompletion
 * numbers XML-extracted calls `xml_call_${n}` with n restarting at 0 on every
 * call, so (assuming those ids are persisted as-is) the same id can recur in
 * later turns. The name lookup therefore prefers the most recent assistant
 * message *before* the tool result. It falls back to any assistant message in
 * the history (last one wins), and then to `'unknown'`.
 *
 * @param messages - OpenAI-shaped chat history, oldest first.
 * @returns the equivalent AI SDK message list.
 */
function toModelMessages(messages: OpenAiMessage[]): ModelMessage[] {
  // Whole-history fallback index (last writer wins). Only consulted when no
  // earlier assistant message declared the id, e.g. out-of-order history.
  const anyNameById = new Map<string, string>();
  for (const m of messages) {
    if (m.role === 'assistant' && m.tool_calls) {
      for (const tc of m.tool_calls) {
        anyNameById.set(tc.id, tc.function.name);
      }
    }
  }

  // Names declared by assistant messages seen so far in the forward pass, so
  // a reused id resolves to the call that actually precedes its result.
  const precedingNameById = new Map<string, string>();
  const out: ModelMessage[] = [];

  for (const m of messages) {
    if (m.role === 'system' || m.role === 'user') {
      out.push({ role: m.role, content: m.content ?? '' });
      continue;
    }

    if (m.role === 'assistant') {
      const toolCalls = m.tool_calls ?? [];
      for (const tc of toolCalls) {
        precedingNameById.set(tc.id, tc.function.name);
      }
      const reasoning =
        typeof m.reasoning === 'string' && m.reasoning.length > 0 ? m.reasoning : null;

      if (toolCalls.length === 0 && reasoning === null) {
        // Bare text assistant (string content). null content + no tool_calls
        // is degenerate but harmless to forward.
        out.push({ role: 'assistant', content: m.content ?? '' });
        continue;
      }

      // v1.13.1-C: AI SDK ReasoningPart precedes text + tool-calls in the
      // assistant content array.
      const parts: Array<
        | { type: 'reasoning'; text: string }
        | { type: 'text'; text: string }
        | { type: 'tool-call'; toolCallId: string; toolName: string; input: unknown }
      > = [];
      if (reasoning !== null) {
        parts.push({ type: 'reasoning', text: reasoning });
      }
      if (m.content && m.content.length > 0) {
        parts.push({ type: 'text', text: m.content });
      }
      for (const tc of toolCalls) {
        parts.push({
          type: 'tool-call',
          toolCallId: tc.id,
          toolName: tc.function.name,
          input: parseToolCallInput(tc.function.arguments),
        });
      }
      out.push({ role: 'assistant', content: parts });
      continue;
    }

    if (m.role === 'tool') {
      const toolCallId = m.tool_call_id ?? '';
      const toolName =
        precedingNameById.get(toolCallId) ?? anyNameById.get(toolCallId) ?? 'unknown';
      out.push({
        role: 'tool',
        content: [
          { type: 'tool-result', toolCallId, toolName, output: toToolResultOutput(m.content ?? '') },
        ],
      });
    }
    // Any other role is intentionally dropped.
  }

  return out;
}

// Build the AI SDK tools record from BooCode's JSON-schema tool definitions.
// No `execute` field: BooCode runs tools itself in tool-phase.ts; streamText
// surfaces the tool-call parts via fullStream and we capture them for the
// outer loop to dispatch.
function buildAiTools(schemas: ToolJsonSchema[]): Record<string, ReturnType<typeof tool>> {
  const out: Record<string, ReturnType<typeof tool>> = {};
  for (const s of schemas) {
    out[s.function.name] = tool({
      description: s.function.description,
      inputSchema: jsonSchema(s.function.parameters),
    });
  }
  return out;
}

// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
// llama-swap) emit tool calls as inline XML inside delta.content rather than
// the structured tool_calls field. We extract them out of the streamed text
// before flushing it to the client.
//
// Qwen shape (illustrative; tool-call-parser.ts holds the authoritative
// grammar):
//   <tool_call>
//   <function=NAME>
//   <parameter=KEY>
//   VALUE
//   </parameter>
//   ...
//   </function>
//   </tool_call>
//
// v1.13.16: also recognize Anthropic markup that qwen3.6-35b-a3b-mxfp4
// drifts to (training-data residue from Claude Code documentation):
//   <function_calls>
//   <invoke name="NAME">
//   <parameter name="KEY">VALUE</parameter>
//   </invoke>
//   </function_calls>
//
// Both formats share one synthetic id space: each extracted call gets
// `xml_call_${n}`, where n is the number of tool calls (native or XML)
// already collected in this stream, so n restarts at 0 on every call.
// Multiple blocks may appear back-to-back in either format and they never
// nest.

/**
 * Stream one completion from the upstream model through the AI SDK.
 *
 * @param ctx - inference context (config for upstreamModel, logger).
 * @param model - upstream model id passed to upstreamModel.
 * @param messages - OpenAI-shaped history, converted via toModelMessages.
 * @param opts - tool list and sampling settings. `opts.min_p` is accepted but
 *   not forwarded: the AI SDK's standard call settings have no min-p knob.
 *   NOTE(review): it would need provider-specific options if required.
 * @param onDelta - called with each chunk of visible text. XML tool-call
 *   blocks are removed from it; a trailing partial opener is flushed as text
 *   once the stream ends.
 * @param onUsage - called at most once, after the stream drains, when the
 *   provider reported at least one of the two token counts.
 * @param signal - forwarded to streamText as `abortSignal`.
 * @param agent - forwarded to upstreamModel.
 * @returns the finish reason, visible content, collected tool calls (native
 *   and XML-extracted), token counts, and accumulated reasoning text.
 * @throws the stream's `error` part, wrapped in an Error if it isn't one.
 * @throws an Error named `AbortError` when `signal` is aborted.
 */
export async function streamCompletion(
  ctx: InferenceContext,
  model: string,
  messages: OpenAiMessage[],
  opts: StreamOptions,
  onDelta: (content: string) => void,
  onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
  signal?: AbortSignal,
  agent?: Agent | null,
): Promise<StreamResult> {
  const aiMessages = toModelMessages(messages);
  // Both null and [] omit tools from the request (see StreamOptions.tools).
  const aiTools =
    opts.tools !== null && opts.tools.length > 0 ? buildAiTools(opts.tools) : undefined;
  const startedAt = Date.now();

  // v1.13.1-C: accumulate reasoning text across reasoning-delta parts.
  // qwen3.6 emits these on a separate channel from text content; we capture
  // them per stream so finalizeCompletion can dual-write a 'reasoning' part.
  // Replaces the v1.13.1-A counter-only diagnostic.
  let reasoningAccumulated = '';

  // v1.13.3: experimental_repairToolCall keeps the stream alive when the
  // model emits a malformed tool call (bad JSON args, unknown name, etc.).
  // Without a repair function streamText throws and the WHOLE stream dies;
  // with one, the SDK invokes us and we route the bad call through normally.
  // Strategy: pass through unmodified. executeToolPhase's existing error
  // path (unknown tool name → "unknown tool: X" result; zod-reject → tool
  // 'X' rejected — fieldname: required) already gives the model a clean
  // recovery surface on the next turn. Logging gives us visibility into
  // how often qwen3.6 actually emits broken calls.
  const repairToolCall: ToolCallRepairFunction<ReturnType<typeof buildAiTools>> = async ({
    toolCall,
    error,
  }) => {
    ctx.log.warn(
      {
        toolCallId: toolCall.toolCallId,
        toolName: toolCall.toolName,
        error: error.message,
      },
      'malformed tool call surfaced via repairToolCall',
    );
    return toolCall;
  };

  const result = streamText({
    model: upstreamModel(ctx.config, model, agent ?? null),
    messages: aiMessages,
    ...(aiTools
      ? { tools: aiTools, toolChoice: 'auto' as const, experimental_repairToolCall: repairToolCall }
      : {}),
    ...(typeof opts.temperature === 'number' ? { temperature: opts.temperature } : {}),
    ...(typeof opts.top_p === 'number' ? { topP: opts.top_p } : {}),
    ...(typeof opts.top_k === 'number' ? { topK: opts.top_k } : {}),
    ...(typeof opts.presence_penalty === 'number'
      ? { presencePenalty: opts.presence_penalty }
      : {}),
    abortSignal: signal,
  });

  let content = '';
  // Text held back because it may be the start of an XML tool-call opener.
  let pendingBuffer = '';
  let finishReason: string | null = null;

  // v1.13.1-A: AI SDK emits one `tool-call` part per fully-aggregated call,
  // so we no longer need the OpenAI-index reassembly map the manual SSE
  // parser used. XML tool calls extracted from text content go into the
  // same flat list and keep the v1.10.5 synthetic id convention.
  const toolCalls: ToolCall[] = [];

  for await (const part of result.fullStream) {
    switch (part.type) {
      case 'text-delta': {
        pendingBuffer += part.text;
        // v1.13.16: unified extraction. The helper finds the earliest-opening
        // complete Qwen or Anthropic block, flushes prose between/around
        // them, holds any partial opener for the next chunk, and silently
        // drops blocks that fail to parse (matches pre-v1.13.16 behavior).
        const extracted = extractToolCallBlocks(pendingBuffer);
        if (extracted.flushed.length > 0) {
          content += extracted.flushed;
          onDelta(extracted.flushed);
        }
        for (const call of extracted.calls) {
          toolCalls.push({
            id: `xml_call_${toolCalls.length}`,
            name: call.name,
            args: call.args,
          });
        }
        pendingBuffer = extracted.remaining;
        break;
      }
      case 'tool-call': {
        // AI SDK has already parsed the input into an object. Match the
        // ToolCall shape BooCode passes around in toolCallsBuffer downstream.
        toolCalls.push({
          id: part.toolCallId,
          name: part.toolName,
          args: (part.input ?? {}) as Record<string, unknown>,
        });
        break;
      }
      case 'reasoning-delta': {
        // v1.13.1-C: accumulate; finalizeCompletion / executeToolPhase
        // dual-write the resulting text as a kind='reasoning' part.
        if (typeof part.text === 'string') {
          reasoningAccumulated += part.text;
        }
        break;
      }
      case 'finish': {
        if (typeof part.finishReason === 'string') {
          finishReason = part.finishReason;
        }
        break;
      }
      case 'error': {
        const err = part.error;
        throw err instanceof Error ? err : new Error(String(err));
      }
      // Intentional no-op: start, start-step, text-start, text-end,
      // reasoning-start, reasoning-end, source, file, tool-input-start,
      // tool-input-delta, tool-input-end, tool-result, tool-error,
      // finish-step, raw. We only care about the aggregated tool-call and
      // text-delta paths above; the rest are AI SDK lifecycle/streaming
      // breadcrumbs that don't change BooCode's persistence or WS contract.
      default:
        break;
    }
  }

  // v1.13.1-A: drain any buffered partial XML opener as plain text. The
  // pre-AI-SDK path did this on stream end too: it is better to leak a
  // half-open tag into the visible text than to silently drop it.
  if (pendingBuffer.length > 0) {
    content += pendingBuffer;
    onDelta(pendingBuffer);
  }

  // AI SDK v6 fullStream returns normally on abort; check signal explicitly.
  // Without this throw the row would land as status='complete' with partial
  // content instead of going through handleAbortOrError → status='cancelled'.
  // Smoke D caught this in v1.13.1-A — don't refactor it away.
  if (signal?.aborted) {
    const abortErr = new Error('aborted');
    abortErr.name = 'AbortError';
    throw abortErr;
  }

  // Usage lands as a promise on the result; awaiting after fullStream is
  // drained is safe. AI SDK v6 names: `inputTokens` / `outputTokens`.
  let promptTokens: number | null = null;
  let completionTokens: number | null = null;
  try {
    const usage = await result.usage;
    if (typeof usage.inputTokens === 'number') promptTokens = usage.inputTokens;
    if (typeof usage.outputTokens === 'number') completionTokens = usage.outputTokens;
  } catch {
    // Some providers omit usage on partial streams; leave both null.
  }
  if (onUsage && (promptTokens !== null || completionTokens !== null)) {
    onUsage(promptTokens, completionTokens);
  }

  if (reasoningAccumulated.length > 0) {
    ctx.log.debug(
      { reasoningChars: reasoningAccumulated.length, model, elapsed_ms: Date.now() - startedAt },
      'streamCompletion: captured reasoning',
    );
  }

  return {
    finishReason,
    content,
    toolCalls,
    promptTokens,
    completionTokens,
    reasoning: reasoningAccumulated,
  };
}

// v1.11.8: tools that are only offered when the chat opts in to web access.
const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);

/**
 * Compute the tool list offered to the LLM for one stream.
 *
 * Tool whitelist: if an agent is set, filter the global tool list down to the
 * names its patterns allow. v1.15.0-mcp-multi: matchToolGlob provides glob
 * support (e.g. `context7_*`, `!web_*`). With no agent, all tools are offered.
 * v1.11.8: a second filter strips web_search and web_fetch unless the chat
 * has them explicitly enabled. This acts as an opt-in security boundary: the
 * model can't summon a tool that wasn't offered to it. The result is always a
 * freshly filtered array.
 */
function selectEffectiveTools(agent: Agent | null, webToolsEnabled: boolean): ToolJsonSchema[] {
  const agentScoped = agent
    ? toolJsonSchemas().filter((t) => matchToolGlob(t.function.name, agent.tools))
    : toolJsonSchemas();
  return agentScoped.filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
}

/**
 * Run the streaming phase of an assistant turn.
 *
 * The phase proceeds in four steps:
 * 1. Stamp `messages.started_at` and publish `message_started`.
 * 2. Run streamCompletion, publishing each visible text chunk as a `delta`
 *    event and persisting `state.accumulated` to `messages.content` at most
 *    once per DB_FLUSH_INTERVAL_MS.
 * 3. Publish a `usage` event when token counts arrive.
 * 4. On exit (success or throw), cancel pending timers and wait for
 *    in-flight content writes.
 *
 * @param webToolsEnabled - v1.11.8: when false, web_search and web_fetch are
 *   stripped from the tool list sent to the LLM, so the model can't even
 *   attempt them.
 * @returns the StreamResult from streamCompletion.
 * @throws whatever streamCompletion throws (including the `AbortError` the
 *   cancel path relies on). Progressive content-write failures are logged,
 *   not rethrown, so they cannot mask that error.
 */
export async function executeStreamPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  messages: OpenAiMessage[],
  state: StreamPhaseState,
  agent: Agent | null,
  webToolsEnabled: boolean,
): Promise<StreamResult> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  state.startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let pendingFlushTimer: NodeJS.Timeout | null = null;
  // Serializes progressive content writes so they land in order. Each link
  // swallows (and logs) its own failure: otherwise one rejected UPDATE would
  // skip every later snapshot, and `await flushPromise` in `finally` would
  // replace the stream's own error (e.g. AbortError) with the DB error.
  let flushPromise: Promise<void> = Promise.resolve();
  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = state.accumulated;
    flushPromise = flushPromise
      .then(async () => {
        await ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`;
      })
      .catch((err: unknown) => {
        ctx.log.warn(
          { err, sessionId, messageId: assistantMessageId },
          'progressive content flush failed',
        );
      });
  };
  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  const effectiveTools = selectEffectiveTools(agent, webToolsEnabled);
  const effectiveTemperature = agent?.temperature;
  const effectiveTopP = agent?.top_p ?? undefined;
  const effectiveTopK = agent?.top_k ?? undefined;
  const effectiveMinP = agent?.min_p ?? undefined;
  const effectivePresencePenalty = agent?.presence_penalty ?? undefined;

  // v1.12.2: ctx_max lookup is cached after the first hit per model, so this
  // is a Map probe in steady state. We capture nCtx once at the top of the
  // stream so the throttled usage publish doesn't refetch each tick.
  const mctxForStream = await modelContext.getModelContext(session.model);
  const nCtxForStream = mctxForStream?.n_ctx ?? null;

  // v1.12.2 → v1.13.1-A: live usage publishes were throttled to ~500ms when
  // the manual SSE parser saw `parsed.usage` per chunk. AI SDK v6 surfaces
  // usage only at stream end (result.usage promise), so the throttle is
  // effectively a single trailing publish. ChatThroughput will tick once at
  // stream completion rather than mid-stream — known regression vs v1.12.2,
  // recovered if a future dispatch interpolates from delta cadence.
  const USAGE_THROTTLE_MS = 500;
  let lastUsageAt = 0;
  let pendingUsage: { p: number | null; c: number | null } | null = null;
  let usageTimer: NodeJS.Timeout | null = null;
  const flushUsage = () => {
    if (!pendingUsage) return;
    const { p, c } = pendingUsage;
    pendingUsage = null;
    lastUsageAt = Date.now();
    ctx.publish(sessionId, {
      type: 'usage',
      message_id: assistantMessageId,
      chat_id: chatId,
      completion_tokens: c,
      ctx_used: p,
      ctx_max: nCtxForStream,
    });
  };

  try {
    return await streamCompletion(
      ctx,
      session.model,
      messages,
      {
        tools: effectiveTools,
        temperature: effectiveTemperature,
        top_p: effectiveTopP,
        top_k: effectiveTopK,
        min_p: effectiveMinP,
        presence_penalty: effectivePresencePenalty,
      },
      (delta) => {
        state.accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        ctx.log.debug({ sessionId, delta }, 'inference delta');
        scheduleFlush();
      },
      (prompt, completion) => {
        pendingUsage = { p: prompt, c: completion };
        const elapsed = Date.now() - lastUsageAt;
        if (elapsed >= USAGE_THROTTLE_MS) {
          flushUsage();
        } else if (!usageTimer) {
          usageTimer = setTimeout(() => {
            usageTimer = null;
            flushUsage();
          }, USAGE_THROTTLE_MS - elapsed);
        }
      },
      signal,
      agent,
    );
  } finally {
    // NOTE(review): cancelling the timer means deltas from the last flush
    // interval are not written here; presumably the caller's finalize step
    // persists the full content. Confirm.
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    if (usageTimer) {
      clearTimeout(usageTimer);
      usageTimer = null;
      // A usage sample was waiting on the throttle timer. onUsage only fires
      // after the abort check in streamCompletion, so this is a completed
      // stream: publish the sample now instead of dropping it.
      flushUsage();
    }
    await flushPromise;
  }
}