- Add ai@^6 and @ai-sdk/openai-compatible@^2 to apps/server.
- New services/inference/provider.ts: createOpenAICompatible against
llama-swap (baseURL threaded from config.LLAMA_SWAP_URL, cached per
baseURL). No apiKey — Authelia + Tailscale gate llama-swap, not keys.
- streamCompletion rewritten as an adapter over streamText. AI SDK
fullStream parts (text-delta, tool-call, finish, error) map back to
the legacy {content?, tool_calls?, finishReason} StreamResult shape
that executeStreamPhase already consumes. No layer above
streamCompletion changes.
- toModelMessages converts BooCode's OpenAI-shaped history to AI SDK
ModelMessage[]; tool messages need toolName which we look up by
scanning earlier assistant tool_calls for the matching id.
- buildAiTools wraps BooCode's JSON-schema tool defs via
tool({ inputSchema: jsonSchema(parameters) }) with NO execute —
BooCode dispatches tools in tool-phase.ts, not the AI SDK loop.
- XML fallback parser preserved as-is — qwen3.6 still emits XML tool
calls in text content that the structured tool-call layer misses.
- reasoning-delta parts dropped with a debug-level counter — captured
properly in v1.13.1-C.
- Abort path: streamText({ abortSignal }) wires ctx.signal through, but
AI SDK v6 swallows the abort (the fullStream iterator exits cleanly
rather than throwing). A post-iteration `if (signal?.aborted) throw` was
added so handleAbortOrError owns the row and writes status='cancelled'.
Caught by smoke D; a stop would otherwise have shipped as status='complete'.
- Usage frame reads result.usage (inputTokens / outputTokens v6 names)
AFTER stream drain. Single trailing publish through the existing 500ms
throttle. Known regression: ChatThroughput's live mid-stream tick
(v1.12.2) is gone — it now shows a single value at stream end.
TODO(v1.13.1-followup): interpolate outputTokens during streaming
via a delta-cadence counter (e.g. part.text.length/4 token proxy)
and publish every 500ms; reconcile against result.usage at finish.
- Write-path dual-write from v1.13.0 unaffected.
Read path stays on JSON columns. v1.13.1-B flips reads to message_parts.
Smoke verified end-to-end against running container:
- A. Plain text: status='complete', 1 text part.
- B. Single tool prompt → multi-tool chain (4 calls): every assistant
with tool_calls has 2 parts (text+tool_call), every tool row has
1 part (tool_result).
- C. Multi-step covered by B's chain.
- D. Stop mid-stream: status='cancelled' written via handleAbortOrError
after the post-iteration abort throw.
431 lines
16 KiB
TypeScript
431 lines
16 KiB
TypeScript
import type {
|
|
Agent,
|
|
Session,
|
|
ToolCall,
|
|
} from '../../types/api.js';
|
|
import * as modelContext from '../model-context.js';
|
|
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
|
|
import type { OpenAiMessage } from './payload.js';
|
|
import {
|
|
XML_TOOL_CLOSE,
|
|
XML_TOOL_OPEN,
|
|
parseXmlToolCall,
|
|
partialXmlOpenerStart,
|
|
} from './xml-parser.js';
|
|
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
|
|
import type {
|
|
InferenceContext,
|
|
StreamResult,
|
|
TurnArgs,
|
|
} from './turn.js';
|
|
import { upstreamModel } from './provider.js';
|
|
import { jsonSchema, streamText, tool, type JSONValue, type ModelMessage } from 'ai';
|
|
|
|
/** Per-request options accepted by `streamCompletion`. */
interface StreamOptions {
  /**
   * Tool definitions to offer the model.
   *
   * - `null`: omit tools entirely (compact phase).
   * - `[]`: the caller stripped every tool (rare); tools are still omitted
   *   from the request body to avoid an OpenAI 400.
   */
  tools: ToolJsonSchema[] | null;

  /** Sampling temperature; only forwarded when it is a number. */
  temperature?: number;
}
|
|
|
|
/**
 * Parse the JSON `arguments` string of a stored tool call.
 *
 * An empty string yields `{}`. Malformed args from a prior turn are passed
 * through as `{ _raw: <string> }` so the model sees the same shape it
 * emitted, matching the buildMessagesPayload upstream convention.
 */
function parseStoredToolArguments(rawArguments: string): unknown {
  try {
    return rawArguments.length > 0 ? JSON.parse(rawArguments) : {};
  } catch {
    return { _raw: rawArguments };
  }
}

/**
 * Wrap stored tool-result content as an AI SDK tool-result output: `json`
 * when the content parses as JSON, `text` otherwise (including '').
 */
function toStoredToolResultOutput(
  raw: string,
): { type: 'text'; value: string } | { type: 'json'; value: JSONValue } {
  try {
    // JSON.parse returns `any`; cast to JSONValue since the upstream
    // tool_results column is already JSON-serializable by construction.
    return { type: 'json', value: JSON.parse(raw) as JSONValue };
  } catch {
    return { type: 'text', value: raw };
  }
}

/**
 * v1.13.1-A: convert BooCode's OpenAI-shaped history into AI SDK
 * `ModelMessage[]`.
 *
 * - `system` / `user` messages are forwarded with string content
 *   (null content becomes '').
 * - `assistant` messages without tool calls are forwarded as plain text;
 *   with tool calls they become a parts array (optional text part followed
 *   by one `tool-call` part per call).
 * - `tool` messages become a single `tool-result` part. The OpenAI shape
 *   carries no tool name, so it is looked up from the assistant `tool_calls`
 *   entry with the matching id.
 * - Messages with any other role are dropped.
 *
 * Tool-call ids are not unique across a history: the XML fallback in
 * `streamCompletion` restarts its synthetic ids at `xml_call_0` on every
 * call. The lookup therefore prefers the most recent assistant tool call
 * that PRECEDES the tool message, and only falls back to a whole-history
 * scan when no preceding call matches (out-of-order history).
 *
 * @param messages OpenAI-shaped chat history, oldest first.
 * @returns The equivalent AI SDK message list.
 */
function toModelMessages(messages: OpenAiMessage[]): ModelMessage[] {
  // Whole-history fallback map (last writer wins); only consulted when a tool
  // message has no matching assistant call before it.
  const nameAnywhereById = new Map<string, string>();
  for (const m of messages) {
    if (m.role !== 'assistant') continue;
    for (const tc of m.tool_calls ?? []) {
      nameAnywhereById.set(tc.id, tc.function.name);
    }
  }

  // Filled in as assistant messages are visited, so at any tool message it
  // holds the latest preceding call for each id.
  const precedingNameById = new Map<string, string>();
  const out: ModelMessage[] = [];

  for (const m of messages) {
    if (m.role === 'system' || m.role === 'user') {
      out.push({ role: m.role, content: m.content ?? '' });
      continue;
    }

    if (m.role === 'assistant') {
      const calls = m.tool_calls ?? [];
      if (calls.length === 0) {
        // Bare text assistant (string content). null content + no tool_calls
        // is degenerate but harmless to forward.
        out.push({ role: 'assistant', content: m.content ?? '' });
        continue;
      }

      const parts: Array<
        | { type: 'text'; text: string }
        | { type: 'tool-call'; toolCallId: string; toolName: string; input: unknown }
      > = [];
      if (m.content && m.content.length > 0) {
        parts.push({ type: 'text', text: m.content });
      }
      for (const tc of calls) {
        precedingNameById.set(tc.id, tc.function.name);
        parts.push({
          type: 'tool-call',
          toolCallId: tc.id,
          toolName: tc.function.name,
          input: parseStoredToolArguments(tc.function.arguments),
        });
      }
      out.push({ role: 'assistant', content: parts });
      continue;
    }

    if (m.role === 'tool') {
      const toolCallId = m.tool_call_id ?? '';
      const toolName =
        precedingNameById.get(toolCallId) ?? nameAnywhereById.get(toolCallId) ?? 'unknown';
      const output = toStoredToolResultOutput(m.content ?? '');
      out.push({
        role: 'tool',
        content: [{ type: 'tool-result', toolCallId, toolName, output }],
      });
      continue;
    }
  }

  return out;
}
|
|
|
|
/**
 * Build the AI SDK tools record, keyed by function name, from BooCode's
 * JSON-schema tool definitions.
 *
 * No `execute` field is supplied: BooCode runs tools itself in
 * tool-phase.ts; streamText surfaces the tool-call parts via fullStream and
 * they are captured for the outer loop to dispatch.
 *
 * @param schemas Tool definitions to expose to the model.
 * @returns One AI SDK tool per schema.
 */
function buildAiTools(schemas: ToolJsonSchema[]): Record<string, ReturnType<typeof tool>> {
  return schemas.reduce<Record<string, ReturnType<typeof tool>>>((registry, { function: fn }) => {
    registry[fn.name] = tool({
      description: fn.description,
      inputSchema: jsonSchema(fn.parameters),
    });
    return registry;
  }, {});
}
|
|
|
|
/**
 * Stream one completion through AI SDK `streamText` and adapt the result to
 * the legacy `StreamResult` shape.
 *
 * v1.10.5 Qwen-coder XML fallback: some local models (notably qwen3-coder via
 * llama-swap) emit tool calls as inline XML inside the text rather than as
 * structured tool calls. Those blocks are extracted from the streamed text
 * before it is flushed to the client, mirroring the pre-AI-SDK behavior.
 *
 * XML shape:
 *   <tool_call>
 *   <function=NAME>
 *   <parameter=KEY>VALUE</parameter>
 *   ...
 *   </function>
 *   </tool_call>
 * Multiple <tool_call> blocks may appear back-to-back; they never nest.
 *
 * @param ctx Inference context (config and logger are used here).
 * @param model Model name handed to `upstreamModel`.
 * @param messages OpenAI-shaped history, converted via `toModelMessages`.
 * @param opts Tools (null or empty means none are sent) and optional temperature.
 * @param onDelta Called with each chunk of visible text, XML blocks removed.
 * @param onUsage Called once after the stream drains when at least one of
 *   the token counts is known.
 * @param signal Optional abort signal forwarded to `streamText`.
 * @returns Finish reason, visible content, collected tool calls and usage.
 * @throws An `Error` named 'AbortError' when `signal` is aborted once
 *   iteration ends, and whatever an `error` stream part carries.
 */
export async function streamCompletion(
  ctx: InferenceContext,
  model: string,
  messages: OpenAiMessage[],
  opts: StreamOptions,
  onDelta: (content: string) => void,
  onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
  signal?: AbortSignal
): Promise<StreamResult> {
  const aiMessages = toModelMessages(messages);
  const offeredTools = opts.tools !== null && opts.tools.length > 0 ? opts.tools : null;
  const aiTools = offeredTools ? buildAiTools(offeredTools) : undefined;

  const startedAt = Date.now();
  let reasoningDeltaCount = 0;

  const result = streamText({
    model: upstreamModel(ctx.config.LLAMA_SWAP_URL, model),
    messages: aiMessages,
    ...(aiTools ? { tools: aiTools, toolChoice: 'auto' as const } : {}),
    ...(typeof opts.temperature === 'number' ? { temperature: opts.temperature } : {}),
    abortSignal: signal,
  });

  let content = '';
  let pendingBuffer = '';
  let finishReason: string | null = null;
  // v1.13.1-A: AI SDK emits one `tool-call` part per fully-aggregated call,
  // so the OpenAI-index reassembly map of the manual SSE parser is gone. XML
  // tool calls extracted from text go into the same flat list and keep the
  // v1.10.5 synthetic id convention.
  const toolCalls: ToolCall[] = [];

  // Append text to the visible content and forward it to the caller.
  const emitVisible = (text: string): void => {
    content += text;
    onDelta(text);
  };

  // Feed one text delta through the XML hold-back buffer.
  const absorbTextDelta = (text: string): void => {
    pendingBuffer += text;

    // Pull out every complete <tool_call>...</tool_call> block first.
    for (;;) {
      const openAt = pendingBuffer.indexOf(XML_TOOL_OPEN);
      if (openAt === -1) break;
      const closeAt = pendingBuffer.indexOf(XML_TOOL_CLOSE, openAt);
      if (closeAt === -1) break;

      const blockEnd = closeAt + XML_TOOL_CLOSE.length;
      const block = pendingBuffer.slice(openAt, blockEnd);
      if (openAt > 0) {
        emitVisible(pendingBuffer.slice(0, openAt));
      }

      const parsedCall = parseXmlToolCall(block);
      if (parsedCall) {
        toolCalls.push({
          id: `xml_call_${toolCalls.length}`,
          name: parsedCall.name,
          args: parsedCall.args,
        });
      }
      // Parse failures still drop the block — leaking <tool_call> XML to the
      // chat would look worse than silently swallowing the bad block.
      pendingBuffer = pendingBuffer.slice(blockEnd);
    }

    // Hold back any (partial or full) unclosed opener; flush the rest.
    const heldFrom = partialXmlOpenerStart(pendingBuffer);
    if (heldFrom < 0) {
      if (pendingBuffer.length > 0) {
        emitVisible(pendingBuffer);
        pendingBuffer = '';
      }
      return;
    }
    if (heldFrom > 0) {
      emitVisible(pendingBuffer.slice(0, heldFrom));
    }
    pendingBuffer = pendingBuffer.slice(heldFrom);
  };

  // Only text-delta, tool-call, reasoning-delta, finish and error matter.
  // Every other part type (start, start-step, text-start, text-end,
  // reasoning-start, reasoning-end, source, file, tool-input-*, tool-result,
  // tool-error, finish-step, raw) is an intentional no-op: lifecycle
  // breadcrumbs that don't change BooCode's persistence or WS contract.
  for await (const part of result.fullStream) {
    if (part.type === 'text-delta') {
      absorbTextDelta(part.text);
    } else if (part.type === 'tool-call') {
      // AI SDK has already parsed the input into an object. Match the
      // ToolCall shape BooCode passes around in toolCallsBuffer downstream.
      toolCalls.push({
        id: part.toolCallId,
        name: part.toolName,
        args: (part.input ?? {}) as Record<string, unknown>,
      });
    } else if (part.type === 'reasoning-delta') {
      // v1.13.1-A: reasoning parts are dropped for now; v1.13.1-C will
      // persist them. The counter is logged at the end.
      reasoningDeltaCount += 1;
    } else if (part.type === 'finish') {
      if (typeof part.finishReason === 'string') {
        finishReason = part.finishReason;
      }
    } else if (part.type === 'error') {
      const failure = part.error;
      throw failure instanceof Error ? failure : new Error(String(failure));
    }
  }

  // v1.13.1-A: drain any buffered partial XML opener as plain text — better
  // to leak `<tool_c` than vanish the text.
  if (pendingBuffer.length > 0) {
    emitVisible(pendingBuffer);
    pendingBuffer = '';
  }

  // v1.13.1-A: AI SDK v6 swallows the abort signal — the fullStream iterator
  // exits cleanly, which would otherwise return a successful StreamResult and
  // let executeStreamPhase's caller write status='complete'. Throw an
  // AbortError instead so handleAbortOrError owns the row, matching v1.12.x
  // stop-button behavior.
  if (signal?.aborted) {
    throw Object.assign(new Error('aborted'), { name: 'AbortError' });
  }

  // Usage lands as a promise on the result; awaiting after fullStream is
  // drained is safe. AI SDK v6 names: `inputTokens` / `outputTokens`.
  let promptTokens: number | null = null;
  let completionTokens: number | null = null;
  try {
    const usage = await result.usage;
    promptTokens = typeof usage.inputTokens === 'number' ? usage.inputTokens : null;
    completionTokens = typeof usage.outputTokens === 'number' ? usage.outputTokens : null;
  } catch {
    // Some providers omit usage on partial streams; leave both null.
  }

  const usageKnown = !(promptTokens === null && completionTokens === null);
  if (onUsage && usageKnown) {
    onUsage(promptTokens, completionTokens);
  }

  if (reasoningDeltaCount > 0) {
    ctx.log.debug(
      { reasoningDeltaCount, model, elapsed_ms: Date.now() - startedAt },
      'streamCompletion: reasoning deltas dropped (captured in v1.13.1-C)',
    );
  }

  return { finishReason, content, toolCalls, promptTokens, completionTokens };
}
|
|
|
|
/**
 * Run the streaming phase of one assistant turn.
 *
 * Stamps `started_at` on the assistant row, publishes `message_started`,
 * then drives `streamCompletion`. Each visible delta is appended to
 * `state.accumulated`, published as a `delta` frame, and scheduled for a
 * throttled snapshot write of `messages.content`. Usage is published as a
 * throttled `usage` frame.
 *
 * Snapshot writes are best-effort: a failed write is logged and does not
 * break the write chain, reject unobserved, or replace the stream's own
 * result or error when the function unwinds.
 *
 * @param ctx Inference context (sql, publish, log).
 * @param args Turn identifiers plus the abort signal.
 * @param session Session whose `model` is streamed against.
 * @param messages OpenAI-shaped history sent to the model.
 * @param state Mutable phase state; `startedAt` and `accumulated` are written.
 * @param agent Optional agent whose `tools` whitelist and `temperature` apply.
 * @param webToolsEnabled v1.11.8: when false, web_search and web_fetch are
 *   stripped from the tool list sent to the LLM, so the model can't even
 *   attempt them.
 * @returns The `StreamResult` produced by `streamCompletion`.
 * @throws Whatever `streamCompletion` throws (including its AbortError).
 */
export async function executeStreamPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  messages: OpenAiMessage[],
  state: StreamPhaseState,
  agent: Agent | null,
  webToolsEnabled: boolean,
): Promise<StreamResult> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages
    SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  state.startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let pendingFlushTimer: NodeJS.Timeout | null = null;
  // Serializes snapshot writes so they land in order. Every link handles its
  // own failure, so this promise never rejects.
  let flushPromise: Promise<unknown> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = state.accumulated;
    flushPromise = flushPromise
      .then(
        () => ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`,
      )
      .catch((err: unknown) => {
        // A failed snapshot must not poison later writes, surface as an
        // unhandled rejection mid-stream, or mask the stream outcome.
        // NOTE(review): assumes ctx.log exposes a pino-style `warn`.
        ctx.log.warn(
          { err, sessionId, assistantMessageId },
          'inference: streamed content snapshot flush failed',
        );
      });
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  // Tool whitelist: if an agent is set, filter the global tool list to only the
  // tool names it allows. Unknown names in agent.tools are dropped silently
  // (handled here by intersection). When no agent: send all tools.
  // v1.11.8: a second filter strips web_search + web_fetch unless the chat
  // has them explicitly enabled. Counts as an opt-in security boundary: the
  // model can't summon a tool that wasn't offered to it.
  const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
  const effectiveTools: ToolJsonSchema[] = (agent
    ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
    : toolJsonSchemas()
  ).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
  const effectiveTemperature = agent?.temperature;

  // v1.12.2: ctx_max lookup is cached after the first hit per model, so this
  // is a Map probe in steady state. nCtx is captured once at the top of the
  // stream so the throttled usage publish doesn't refetch each tick.
  const mctxForStream = await modelContext.getModelContext(session.model);
  const nCtxForStream = mctxForStream?.n_ctx ?? null;

  // v1.12.2 → v1.13.1-A: live usage publishes were throttled to ~500ms when
  // the manual SSE parser saw `parsed.usage` per chunk. AI SDK v6 surfaces
  // usage only at stream end (result.usage promise), so the throttle is
  // effectively a single trailing publish. ChatThroughput will tick once at
  // stream completion rather than mid-stream — known regression vs v1.12.2,
  // recovered if a future dispatch interpolates from delta cadence.
  const USAGE_THROTTLE_MS = 500;
  let lastUsageAt = 0;
  let pendingUsage: { p: number | null; c: number | null } | null = null;
  let usageTimer: NodeJS.Timeout | null = null;

  // Publish the most recent usage sample, if one is waiting.
  const flushUsage = () => {
    if (!pendingUsage) return;
    const { p, c } = pendingUsage;
    pendingUsage = null;
    lastUsageAt = Date.now();
    ctx.publish(sessionId, {
      type: 'usage',
      message_id: assistantMessageId,
      chat_id: chatId,
      completion_tokens: c,
      ctx_used: p,
      ctx_max: nCtxForStream,
    });
  };

  try {
    return await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: effectiveTools, temperature: effectiveTemperature },
      (delta) => {
        state.accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        ctx.log.debug({ sessionId, delta }, 'inference delta');
        scheduleFlush();
      },
      (prompt, completion) => {
        pendingUsage = { p: prompt, c: completion };
        const elapsed = Date.now() - lastUsageAt;
        if (elapsed >= USAGE_THROTTLE_MS) {
          flushUsage();
        } else if (!usageTimer) {
          usageTimer = setTimeout(() => {
            usageTimer = null;
            flushUsage();
          }, USAGE_THROTTLE_MS - elapsed);
        }
      },
      signal
    );
  } finally {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    if (usageTimer) {
      clearTimeout(usageTimer);
      usageTimer = null;
    }
    // A sample still waiting on the throttle timer would be lost when the
    // timer is cancelled; publish it now (no-op when nothing is pending).
    flushUsage();
    // Wait for in-flight snapshot writes; flushPromise never rejects.
    await flushPromise;
  }
}
|