import type {
  Agent,
  Session,
  ToolCall,
} from '../../types/api.js';
import * as modelContext from '../model-context.js';
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
import type { OpenAiMessage } from './payload.js';
import {
  XML_TOOL_CLOSE,
  XML_TOOL_OPEN,
  parseXmlToolCall,
  partialXmlOpenerStart,
} from './xml-parser.js';
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
import type {
  InferenceContext,
  StreamResult,
  TurnArgs,
} from './turn.js';

/** Incremental message fragment carried by one streamed chat-completion chunk. */
interface ChatCompletionDelta {
  role?: string;
  content?: string | null;
  tool_calls?: Array<{
    index: number;
    id?: string;
    type?: 'function';
    function?: { name?: string; arguments?: string };
  }>;
}

/** One decoded `data:` payload of the streaming chat-completions response. */
interface ChatCompletionChunk {
  choices?: Array<{
    delta: ChatCompletionDelta;
    finish_reason: string | null;
  }>;
  usage?: {
    prompt_tokens?: number;
    completion_tokens?: number;
    total_tokens?: number;
  };
}

/** Per-request knobs accepted by streamCompletion. */
interface StreamOptions {
  // null = omit tools entirely (compact phase); [] = caller stripped all tools
  // (rare; we still omit from the request body to avoid OpenAI 400).
  tools: ToolJsonSchema[] | null;
  temperature?: number;
}

/**
 * Decodes a UTF-8 byte stream and yields it one non-empty line at a time.
 *
 * Lines are split on `\n`; a trailing `\r` is removed from every line,
 * including a final line that has no terminating newline. Empty lines are
 * skipped. Multi-byte characters split across chunks are reassembled by the
 * streaming decoder.
 *
 * If the consumer stops iterating early, or a read fails, the underlying
 * stream is cancelled so the producer is not left with an unread body; the
 * reader lock is always released.
 *
 * @param stream Byte stream to read (for example a fetch response body).
 * @returns An async generator of decoded, non-empty lines.
 */
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder('utf-8');
  let buffer = '';
  // True once the source reported `done`; cancelling after that is pointless.
  let drained = false;
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      for (let nl = buffer.indexOf('\n'); nl >= 0; nl = buffer.indexOf('\n')) {
        const line = buffer.slice(0, nl).replace(/\r$/, '');
        buffer = buffer.slice(nl + 1);
        if (line.length > 0) yield line;
      }
    }
    drained = true;
    // Flush any bytes the streaming decoder was still holding, then apply the
    // same CR-stripping to the unterminated tail as to every other line.
    buffer += decoder.decode();
    const tail = buffer.replace(/\r$/, '');
    if (tail.length > 0) yield tail;
  } finally {
    if (!drained) {
      // Early exit (consumer `break`/`return`) or read error: tell the source
      // to stop. cancel() rejects on an already-errored stream; that error has
      // been (or will be) reported through read(), so it is ignored here.
      await reader.cancel().catch(() => undefined);
    }
    reader.releaseLock();
  }
}

// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
// llama-swap) emit tool calls as inline XML inside delta.content rather than
// the structured delta.tool_calls field. The XML shape is:
//
//   (opening tags — the literal markup is missing from this comment; the
//    block opener is XML_TOOL_OPEN from ./xml-parser.js. TODO restore.)
//   VALUE
//   (tag closing the parameter — literal markup missing)
//   ...more parameters...
//   (closing tags — the literal markup is missing from this comment; the
//    block terminator is XML_TOOL_CLOSE from ./xml-parser.js. TODO restore.)
//
// Multiple blocks may appear back-to-back; they never nest.
// streamCompletion buffers delta.content, extracts complete blocks, parses
// them via parseXmlToolCall, and records them as synthetic tool calls
// alongside any native JSON-format tool calls.

/** Accumulator for one tool call while its fragments stream in. */
interface BufferedToolCall {
  id: string;
  name: string;
  argsText: string;
}

/**
 * Decodes the accumulated argument text of a tool call.
 *
 * Returns `{}` for empty text, the parsed value when it is a plain JSON
 * object, and `{ _raw: argsText }` when the text is not valid JSON or parses
 * to something that is not an object (array, string, number, null).
 */
function decodeToolArgs(argsText: string): Record<string, unknown> {
  if (argsText.length === 0) return {};
  try {
    const parsed: unknown = JSON.parse(argsText);
    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, unknown>;
    }
  } catch {
    // Not JSON — fall through and hand the raw text to the caller.
  }
  return { _raw: argsText };
}

/**
 * Sends one streaming chat-completion request and folds the SSE response into
 * a single result.
 *
 * Visible text is reported incrementally through `onDelta` and also returned
 * in full as `content`. Tool calls are collected from the structured
 * `delta.tool_calls` field and, as a fallback, from inline XML blocks found in
 * `delta.content` (those blocks are removed from the visible text).
 *
 * @param ctx Supplies `config.LLAMA_SWAP_URL`, the base URL of the backend.
 * @param model Model name placed in the request body.
 * @param messages Conversation sent as the request's `messages`.
 * @param opts Tool list (omitted from the request when null or empty) and
 *   optional temperature.
 * @param onDelta Called with each piece of visible content as soon as it is
 *   known not to be part of an XML tool-call block.
 * @param onUsage Called whenever a chunk carries a `usage` object, with the
 *   latest known prompt/completion token counts (null until reported).
 * @param signal Optional abort signal forwarded to `fetch`.
 * @returns Finish reason, full visible content, decoded tool calls (native
 *   calls ordered by index, then XML calls in arrival order) and token counts.
 * @throws Error when the response is not OK or has no body; failures from
 *   `fetch` or from reading the body propagate unchanged.
 */
export async function streamCompletion(
  ctx: InferenceContext,
  model: string,
  messages: OpenAiMessage[],
  opts: StreamOptions,
  onDelta: (content: string) => void,
  onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
  signal?: AbortSignal
): Promise<StreamResult> {
  const body: Record<string, unknown> = {
    model,
    messages,
    stream: true,
    stream_options: { include_usage: true },
  };
  if (opts.tools && opts.tools.length > 0) {
    body['tools'] = opts.tools;
    body['tool_choice'] = 'auto';
  }
  if (typeof opts.temperature === 'number') {
    body['temperature'] = opts.temperature;
  }

  const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
    signal,
  });
  if (!res.ok || !res.body) {
    const text = await res.text().catch(() => '');
    throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
  }

  let content = '';
  // v1.10.5: holds delta.content text that may contain a partial XML tool
  // call. Anything not part of a (possibly forming) XML_TOOL_OPEN …
  // XML_TOOL_CLOSE pair is flushed to content + onDelta as soon as we know
  // it's safe.
  let pendingBuffer = '';
  let finishReason: string | null = null;
  let promptTokens: number | null = null;
  let completionTokens: number | null = null;
  // Native calls are keyed by the server-assigned `index`. XML calls live in
  // their own list so a synthetic entry can never share a key with — and be
  // merged into — a native call that arrives with the same index.
  const nativeCalls = new Map<number, BufferedToolCall>();
  const xmlCalls: BufferedToolCall[] = [];

  // Appends visible text to the result and notifies the caller.
  const emit = (text: string): void => {
    content += text;
    onDelta(text);
  };

  for await (const line of sseLines(res.body)) {
    if (!line.startsWith('data:')) continue;
    const payload = line.slice(5).trim();
    if (payload === '[DONE]') break;

    let parsed: ChatCompletionChunk | null;
    try {
      parsed = JSON.parse(payload);
    } catch {
      continue;
    }
    // Valid JSON that is not an object (e.g. `null`) carries nothing usable.
    if (parsed === null || typeof parsed !== 'object') continue;

    if (parsed.usage) {
      if (typeof parsed.usage.prompt_tokens === 'number') {
        promptTokens = parsed.usage.prompt_tokens;
      }
      if (typeof parsed.usage.completion_tokens === 'number') {
        completionTokens = parsed.usage.completion_tokens;
      }
      onUsage?.(promptTokens, completionTokens);
    }
    // v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's
    // streaming completion does NOT emit n_ctx in timings (verified
    // empirically); the authoritative source is llama-swap's per-model
    // /upstream/…/props endpoint, fetched via
    // model-context.getModelContext().

    const choice = parsed.choices?.[0];
    if (!choice) continue;
    const delta: ChatCompletionDelta = choice.delta ?? {};

    if (typeof delta.content === 'string' && delta.content.length > 0) {
      // v1.10.5 XML fallback. Append, then extract any complete tool-call
      // blocks before deciding what's safe to flush as visible content.
      pendingBuffer += delta.content;
      while (true) {
        const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN);
        if (startIdx === -1) break;
        const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx);
        if (closeIdx === -1) break;
        const blockEnd = closeIdx + XML_TOOL_CLOSE.length;
        const block = pendingBuffer.slice(startIdx, blockEnd);
        // Any text before the opener is plain content — flush it now.
        if (startIdx > 0) emit(pendingBuffer.slice(0, startIdx));
        const parsedCall = parseXmlToolCall(block);
        if (parsedCall) {
          xmlCalls.push({
            id: `xml_call_${xmlCalls.length}`,
            name: parsedCall.name,
            argsText: JSON.stringify(parsedCall.args),
          });
        }
        // If parsing failed we still drop the block — emitting unparseable
        // XML to the chat would look worse than silently swallowing it.
        pendingBuffer = pendingBuffer.slice(blockEnd);
      }
      // After all complete blocks are out, hold back any (partial or full)
      // unclosed opener; flush the rest.
      const partialIdx = partialXmlOpenerStart(pendingBuffer);
      if (partialIdx >= 0) {
        if (partialIdx > 0) emit(pendingBuffer.slice(0, partialIdx));
        pendingBuffer = pendingBuffer.slice(partialIdx);
      } else if (pendingBuffer.length > 0) {
        emit(pendingBuffer);
        pendingBuffer = '';
      }
    }

    if (Array.isArray(delta.tool_calls)) {
      // Native tool calls arrive in fragments: id and name usually once, the
      // JSON argument text split across many chunks. Merge them by index.
      for (const tc of delta.tool_calls) {
        const existing = nativeCalls.get(tc.index) ?? { id: '', name: '', argsText: '' };
        if (tc.id) existing.id = tc.id;
        if (tc.function?.name) existing.name = tc.function.name;
        if (typeof tc.function?.arguments === 'string') {
          existing.argsText += tc.function.arguments;
        }
        nativeCalls.set(tc.index, existing);
      }
    }

    if (choice.finish_reason) finishReason = choice.finish_reason;
  }

  // v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer
  // ever arrived), flush whatever was buffered as plain content so it isn't
  // silently dropped. Better to show a stray opening tag than to lose text.
  if (pendingBuffer.length > 0) {
    emit(pendingBuffer);
    pendingBuffer = '';
  }

  const orderedCalls: BufferedToolCall[] = [
    ...[...nativeCalls.entries()].sort(([a], [b]) => a - b).map(([, call]) => call),
    ...xmlCalls,
  ];
  const toolCalls: ToolCall[] = orderedCalls.map((call, position) => ({
    // Servers that omit the id still get a stable, position-based one.
    id: call.id || `call_${position}`,
    name: call.name,
    args: decodeToolArgs(call.argsText),
  }));

  return { finishReason, content, toolCalls, promptTokens, completionTokens };
}

/**
 * Runs the streaming phase of one assistant turn.
 *
 * Stamps `started_at` on the assistant message row, publishes
 * `message_started`, then streams the completion. While streaming it
 * accumulates text in `state.accumulated`, publishes a `delta` event per
 * piece of content, periodically writes the accumulated content to the
 * message row, and publishes throttled `usage` events.
 *
 * On exit (success or failure) pending timers are cancelled and any DB flush
 * already in flight is awaited. A content flush that was only scheduled, or a
 * usage update that was only pending, is NOT performed here.
 * NOTE(review): presumably the caller persists the final content and usage —
 * confirm.
 *
 * @param ctx Provides `sql`, `publish`, `log` and the backend config.
 * @param args Identifiers for the turn plus the abort signal.
 * @param session Supplies the model name.
 * @param messages Conversation to send to the model.
 * @param state Mutable per-turn state; `startedAt` and `accumulated` are
 *   written here.
 * @param agent Optional agent whose `tools` whitelist and `temperature` apply.
 * @param webToolsEnabled When false, web_search and web_fetch are not offered.
 * @returns The result of streamCompletion.
 * @throws Whatever streamCompletion throws; also the error of a failed
 *   content flush, surfaced when the flush chain is awaited on exit.
 */
export async function executeStreamPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  messages: OpenAiMessage[],
  state: StreamPhaseState,
  agent: Agent | null,
  // v1.11.8: when false, web_search and web_fetch are stripped from the
  // tool list sent to the LLM, so the model can't even attempt them.
  webToolsEnabled: boolean,
): Promise<StreamResult> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const startedRow = await ctx.sql<{ started_at: string }[]>` UPDATE messages SET started_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING started_at `;
  state.startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let pendingFlushTimer: NodeJS.Timeout | null = null;
  // Serialises content writes so an older snapshot can never land after a
  // newer one.
  let flushPromise: Promise<unknown> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = state.accumulated;
    flushPromise = flushPromise.then(() =>
      ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
    );
    // Mark the chain as observed. Without a handler, a failed UPDATE would be
    // an unhandled rejection for the rest of the stream; the failure is still
    // reported when `flushPromise` itself is awaited in the finally block.
    flushPromise.catch(() => undefined);
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  // Tool whitelist: if an agent is set, filter the global tool list to only the
  // tool names it allows. Unknown names in agent.tools are dropped silently
  // (handled here by intersection). When no agent: send all tools.
  // v1.11.8: a second filter strips web_search + web_fetch unless the chat
  // has them explicitly enabled. Counts as an opt-in security boundary: the
  // model can't summon a tool that wasn't offered to it.
  const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
  const effectiveTools: ToolJsonSchema[] = (
    agent
      ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
      : toolJsonSchemas()
  ).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
  const effectiveTemperature = agent?.temperature;

  // v1.12.2: ctx_max lookup is cached after the first hit per model, so this
  // is a Map probe in steady state. We capture nCtx once at the top of the
  // stream so the throttled usage publish doesn't refetch each tick.
  const mctxForStream = await modelContext.getModelContext(session.model);
  const nCtxForStream = mctxForStream?.n_ctx ?? null;

  // v1.12.2: throttle live usage publishes to ~500ms. The model can land
  // dozens of usage frames per second; without a throttle the WS turns into
  // a firehose for a few KB savings on each render.
  const USAGE_THROTTLE_MS = 500;
  let lastUsageAt = 0;
  let pendingUsage: { p: number | null; c: number | null } | null = null;
  let usageTimer: NodeJS.Timeout | null = null;

  const flushUsage = () => {
    if (!pendingUsage) return;
    const { p, c } = pendingUsage;
    pendingUsage = null;
    lastUsageAt = Date.now();
    ctx.publish(sessionId, {
      type: 'usage',
      message_id: assistantMessageId,
      chat_id: chatId,
      completion_tokens: c,
      ctx_used: p,
      ctx_max: nCtxForStream,
    });
  };

  try {
    return await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: effectiveTools, temperature: effectiveTemperature },
      (delta) => {
        state.accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        ctx.log.debug({ sessionId, delta }, 'inference delta');
        scheduleFlush();
      },
      (prompt, completion) => {
        // Always keep the newest numbers; publish now if the window has
        // elapsed, otherwise arm a single timer for the remainder of it.
        pendingUsage = { p: prompt, c: completion };
        const elapsed = Date.now() - lastUsageAt;
        if (elapsed >= USAGE_THROTTLE_MS) {
          flushUsage();
        } else if (!usageTimer) {
          usageTimer = setTimeout(() => {
            usageTimer = null;
            flushUsage();
          }, USAGE_THROTTLE_MS - elapsed);
        }
      },
      signal
    );
  } finally {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    if (usageTimer) {
      clearTimeout(usageTimer);
      usageTimer = null;
    }
    // Wait for any write already queued so no UPDATE is still in flight when
    // this function returns; rejects if one of those writes failed.
    await flushPromise;
  }
}