diff --git a/apps/server/src/services/inference.ts b/apps/server/src/services/inference.ts index 662b17e..96d282b 100644 --- a/apps/server/src/services/inference.ts +++ b/apps/server/src/services/inference.ts @@ -11,13 +11,8 @@ import type { ToolCall, UserStreamFrame, } from '../types/api.js'; -import { - ALL_TOOLS, - TOOLS_BY_NAME, - toolJsonSchemas, - type ToolJsonSchema, -} from './tools.js'; -import { PathScopeError, resolveProjectRoot } from './path_guard.js'; +import { ALL_TOOLS } from './tools.js'; +import { resolveProjectRoot } from './path_guard.js'; import { maybeAutoNameChat } from './auto_name.js'; import { getAgentById } from './agents.js'; import * as compaction from './compaction.js'; @@ -28,30 +23,26 @@ import { DOOM_LOOP_THRESHOLD, detectDoomLoop, } from './inference/sentinels.js'; -import { - XML_TOOL_CLOSE, - XML_TOOL_OPEN, - parseXmlToolCall, - partialXmlOpenerStart, -} from './inference/xml-parser.js'; import { buildMessagesPayload, loadContext, - maybeFlagForCompaction, - type OpenAiMessage, } from './inference/payload.js'; import { finalizeCompletion, handleAbortOrError, } from './inference/error-handler.js'; +import { + executeStreamPhase, + streamCompletion, +} from './inference/stream-phase.js'; +import { executeToolPhase } from './inference/tool-phase.js'; +import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './inference/types.js'; // v1.12.4: re-exported so external callers (tests, future consumers) keep // importing from services/inference.js as the public surface. export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js'; export { buildMessagesPayload } from './inference/payload.js'; -const DB_FLUSH_INTERVAL_MS = 500; - // Synthetic system note appended to the cap-hit summary call. Verbatim from // the v1.8.2 spec — do not paraphrase: the model is more reliable when the // instruction is short, declarative, and identical across calls. @@ -107,29 +98,6 @@ export interface InferenceFrame { export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void; -interface ChatCompletionDelta { - role?: string; - content?: string | null; - tool_calls?: Array<{ - index: number; - id?: string; - type?: 'function'; - function?: { name?: string; arguments?: string }; - }>; -} - -interface ChatCompletionChunk { - choices?: Array<{ - delta: ChatCompletionDelta; - finish_reason: string | null; - }>; - usage?: { - prompt_tokens?: number; - completion_tokens?: number; - total_tokens?: number; - }; -} - export interface InferenceContext { sql: Sql; config: Config; @@ -144,36 +112,10 @@ export interface InferenceContext { broker: Broker; } -// v1.12.4: payload assembly extracted to ./inference/payload.ts — -// buildMessagesPayload, loadContext, maybeFlagForCompaction, and the -// OpenAiMessage shape live there now. Re-exported below to preserve the -// public surface (tests import buildMessagesPayload from this module). -// v1.12: buildSystemPrompt moved to services/system-prompt.ts. See that -// module for the resolution order doc and the container-guidance layer. -// buildMessagesPayload is async now because buildSystemPrompt awaits the -// guidance cache lookup. -async function* sseLines(stream: ReadableStream): AsyncGenerator { - const reader = stream.getReader(); - const decoder = new TextDecoder('utf-8'); - let buffer = ''; - try { - while (true) { - const { value, done } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); - let idx; - while ((idx = buffer.indexOf('\n')) >= 0) { - const line = buffer.slice(0, idx).replace(/\r$/, ''); - buffer = buffer.slice(idx + 1); - if (line.length === 0) continue; - yield line; - } - } - if (buffer.length > 0) yield buffer; - } finally { - reader.releaseLock(); - } -} +// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests +// import buildMessagesPayload from this module, so a re-export below +// preserves the public surface). Stream + tool phases extracted to +// ./inference/stream-phase.ts and ./inference/tool-phase.ts. export interface StreamResult { finishReason: string | null; @@ -183,235 +125,6 @@ export interface StreamResult { completionTokens: number | null; } -interface StreamOptions { - // null = omit tools entirely (compact phase); [] = caller stripped all tools - // (rare; we still omit from the request body to avoid OpenAI 400). - tools: ToolJsonSchema[] | null; - temperature?: number; -} - -// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via -// llama-swap) emit tool calls as inline XML inside delta.content rather than -// the structured delta.tool_calls field. The XML shape is: -// -// -// -// VALUE -// -// ...more parameters... -// -// -// Multiple blocks may appear back-to-back; they never nest. -// streamCompletion buffers delta.content, extracts complete blocks, parses -// them via parseXmlToolCall, and pushes synthetic entries into the existing -// toolCallsBuffer alongside any native JSON-format tool calls. -async function streamCompletion( - ctx: InferenceContext, - model: string, - messages: OpenAiMessage[], - opts: StreamOptions, - onDelta: (content: string) => void, - onUsage: ((prompt: number | null, completion: number | null) => void) | undefined, - signal?: AbortSignal -): Promise { - const body: Record = { - model, - messages, - stream: true, - stream_options: { include_usage: true }, - }; - if (opts.tools && opts.tools.length > 0) { - body['tools'] = opts.tools; - body['tool_choice'] = 'auto'; - } - if (typeof opts.temperature === 'number') { - body['temperature'] = opts.temperature; - } - - const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(body), - signal, - }); - if (!res.ok || !res.body) { - const text = await res.text().catch(() => ''); - throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`); - } - - let content = ''; - // v1.10.5: holds delta.content bytes that may contain a partial XML tool - // call. Anything not part of a (possibly forming) - // pair is flushed to content + onDelta as soon as we know it's safe. - let pendingBuffer = ''; - let finishReason: string | null = null; - let promptTokens: number | null = null; - let completionTokens: number | null = null; - const toolCallsBuffer = new Map(); - - for await (const line of sseLines(res.body)) { - if (!line.startsWith('data:')) continue; - const payload = line.slice(5).trim(); - if (payload === '[DONE]') break; - let parsed: ChatCompletionChunk; - try { - parsed = JSON.parse(payload); - } catch { - continue; - } - - if (parsed.usage) { - if (typeof parsed.usage.prompt_tokens === 'number') { - promptTokens = parsed.usage.prompt_tokens; - } - if (typeof parsed.usage.completion_tokens === 'number') { - completionTokens = parsed.usage.completion_tokens; - } - onUsage?.(promptTokens, completionTokens); - } - // v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's - // streaming completion does NOT emit n_ctx in timings (verified - // empirically); the authoritative source is llama-swap's - // /upstream//props endpoint, fetched per-turn via - // model-context.getModelContext() at the finalization sites below. - - const choice = parsed.choices?.[0]; - if (!choice) continue; - const delta = choice.delta ?? {}; - if (typeof delta.content === 'string' && delta.content.length > 0) { - // v1.10.5 XML fallback. Append, then extract any complete tool_call - // blocks before deciding what's safe to flush as visible content. - pendingBuffer += delta.content; - while (true) { - const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN); - if (startIdx === -1) break; - const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx); - if (closeIdx === -1) break; - const blockEnd = closeIdx + XML_TOOL_CLOSE.length; - const block = pendingBuffer.slice(startIdx, blockEnd); - // Any text before the opener is plain content — flush it now. - if (startIdx > 0) { - const before = pendingBuffer.slice(0, startIdx); - content += before; - onDelta(before); - } - const parsedCall = parseXmlToolCall(block); - if (parsedCall) { - const synthIdx = toolCallsBuffer.size; - toolCallsBuffer.set(synthIdx, { - id: `xml_call_${synthIdx}`, - name: parsedCall.name, - argsText: JSON.stringify(parsedCall.args), - }); - } - // If parsing failed we still drop the block — emitting unparseable - // XML to the chat would look worse than silently swallowing it. - pendingBuffer = pendingBuffer.slice(blockEnd); - } - // After all complete blocks are out, hold back any (partial or full) - // unclosed opener; flush the rest. - const partialIdx = partialXmlOpenerStart(pendingBuffer); - if (partialIdx >= 0) { - if (partialIdx > 0) { - const flush = pendingBuffer.slice(0, partialIdx); - content += flush; - onDelta(flush); - } - pendingBuffer = pendingBuffer.slice(partialIdx); - } else if (pendingBuffer.length > 0) { - content += pendingBuffer; - onDelta(pendingBuffer); - pendingBuffer = ''; - } - } - if (Array.isArray(delta.tool_calls)) { - for (const tc of delta.tool_calls) { - const idx = tc.index; - const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' }; - if (tc.id) existing.id = tc.id; - if (tc.function?.name) existing.name = tc.function.name; - if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments; - toolCallsBuffer.set(idx, existing); - } - } - if (choice.finish_reason) finishReason = choice.finish_reason; - } - - // v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer - // ever arrived), flush whatever was buffered as plain content so it isn't - // silently dropped. Better to show a stray `` than vanish text. - if (pendingBuffer.length > 0) { - content += pendingBuffer; - onDelta(pendingBuffer); - pendingBuffer = ''; - } - - const toolCalls: ToolCall[] = []; - for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) { - let args: Record = {}; - if (t.argsText.length > 0) { - try { - args = JSON.parse(t.argsText); - } catch { - args = { _raw: t.argsText }; - } - } - toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args }); - } - - return { finishReason, content, toolCalls, promptTokens, completionTokens }; -} - -async function executeToolCall( - projectRoot: string, - toolCall: ToolCall -): Promise<{ output: unknown; truncated: boolean; error?: string }> { - const tool = TOOLS_BY_NAME[toolCall.name]; - if (!tool) { - return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` }; - } - const parsed = tool.inputSchema.safeParse(toolCall.args); - if (!parsed.success) { - // v1.12 Track B.2: enrich the zod-reject path so the model sees a - // one-line, tool-named hint ("tool 'search_symbols' rejected — query: - // Required") instead of a JSON blob of flatten output. Higher recovery - // rate on the next turn; doom-loop guard still bounds infinite retries. - // The cast is because tool.inputSchema is ZodType, so zod can't - // statically narrow flatten()'s fieldErrors key set — but the runtime - // shape is the standard { formErrors: string[]; fieldErrors: Record<...> }. - const flatten = parsed.error.flatten() as { - formErrors: string[]; - fieldErrors: Record; - }; - const fieldErrors = Object.entries(flatten.fieldErrors) - .map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`) - .join('; '); - const formError = flatten.formErrors[0]; - const hint = fieldErrors || formError || 'unknown validation error'; - return { - output: null, - truncated: false, - error: `tool '${toolCall.name}' rejected — ${hint}`, - }; - } - try { - const output = await tool.execute(parsed.data, projectRoot); - const truncated = - typeof output === 'object' && output !== null && 'truncated' in output - ? Boolean((output as { truncated: unknown }).truncated) - : false; - return { output, truncated }; - } catch (err) { - if (err instanceof PathScopeError) { - return { output: null, truncated: false, error: err.message }; - } - return { - output: null, - truncated: false, - error: err instanceof Error ? err.message : String(err), - }; - } -} export interface TurnArgs { sessionId: string; @@ -429,294 +142,8 @@ export interface TurnArgs { signal: AbortSignal | undefined; } -interface StreamPhaseState { - accumulated: string; - startedAt: string | null; -} -async function executeStreamPhase( - ctx: InferenceContext, - args: TurnArgs, - session: Session, - messages: OpenAiMessage[], - state: StreamPhaseState, - agent: Agent | null, - // v1.11.8: when false, web_search and web_fetch are stripped from the - // tool list sent to the LLM, so the model can't even attempt them. - webToolsEnabled: boolean, -): Promise { - const { sessionId, chatId, assistantMessageId, signal } = args; - - const startedRow = await ctx.sql<{ started_at: string }[]>` - UPDATE messages - SET started_at = clock_timestamp() - WHERE id = ${assistantMessageId} - RETURNING started_at - `; - state.startedAt = startedRow[0]?.started_at ?? null; - - ctx.publish(sessionId, { - type: 'message_started', - message_id: assistantMessageId, - chat_id: chatId, - role: 'assistant', - }); - - let pendingFlushTimer: NodeJS.Timeout | null = null; - let flushPromise: Promise = Promise.resolve(); - - const flushNow = () => { - if (pendingFlushTimer) { - clearTimeout(pendingFlushTimer); - pendingFlushTimer = null; - } - const snapshot = state.accumulated; - flushPromise = flushPromise.then(() => - ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}` - ); - }; - - const scheduleFlush = () => { - if (pendingFlushTimer) return; - pendingFlushTimer = setTimeout(() => { - pendingFlushTimer = null; - flushNow(); - }, DB_FLUSH_INTERVAL_MS); - }; - - // Tool whitelist: if an agent is set, filter the global tool list to only the - // tool names it allows. Unknown names in agent.tools are dropped silently - // (handled here by intersection). When no agent: send all tools. - // v1.11.8: a second filter strips web_search + web_fetch unless the chat - // has them explicitly enabled. Counts as an opt-in security boundary: the - // model can't summon a tool that wasn't offered to it. - const WEB_TOOL_NAMES: ReadonlySet = new Set(['web_search', 'web_fetch']); - const effectiveTools: ToolJsonSchema[] = (agent - ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name)) - : toolJsonSchemas() - ).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name)); - const effectiveTemperature = agent?.temperature; - - // v1.12.2: ctx_max lookup is cached after the first hit per model, so this - // is a Map probe in steady state. We capture nCtx once at the top of the - // stream so the throttled usage publish doesn't refetch each tick. - const mctxForStream = await modelContext.getModelContext(session.model); - const nCtxForStream = mctxForStream?.n_ctx ?? null; - - // v1.12.2: throttle live usage publishes to ~500ms. The model can land - // dozens of usage frames per second; without a throttle the WS turns into - // a firehose for a few KB savings on each render. - const USAGE_THROTTLE_MS = 500; - let lastUsageAt = 0; - let pendingUsage: { p: number | null; c: number | null } | null = null; - let usageTimer: NodeJS.Timeout | null = null; - const flushUsage = () => { - if (!pendingUsage) return; - const { p, c } = pendingUsage; - pendingUsage = null; - lastUsageAt = Date.now(); - ctx.publish(sessionId, { - type: 'usage', - message_id: assistantMessageId, - chat_id: chatId, - completion_tokens: c, - ctx_used: p, - ctx_max: nCtxForStream, - }); - }; - - try { - return await streamCompletion( - ctx, - session.model, - messages, - { tools: effectiveTools, temperature: effectiveTemperature }, - (delta) => { - state.accumulated += delta; - ctx.publish(sessionId, { - type: 'delta', - message_id: assistantMessageId, - chat_id: chatId, - content: delta, - }); - ctx.log.debug({ sessionId, delta }, 'inference delta'); - scheduleFlush(); - }, - (prompt, completion) => { - pendingUsage = { p: prompt, c: completion }; - const elapsed = Date.now() - lastUsageAt; - if (elapsed >= USAGE_THROTTLE_MS) { - flushUsage(); - } else if (!usageTimer) { - usageTimer = setTimeout(() => { - usageTimer = null; - flushUsage(); - }, USAGE_THROTTLE_MS - elapsed); - } - }, - signal - ); - } finally { - if (pendingFlushTimer) { - clearTimeout(pendingFlushTimer); - pendingFlushTimer = null; - } - if (usageTimer) { - clearTimeout(usageTimer); - usageTimer = null; - } - await flushPromise; - } -} - -async function executeToolPhase( - ctx: InferenceContext, - args: TurnArgs, - result: StreamResult, - startedAt: string | null, - session: Session, - projectRoot: string -): Promise { - const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args; - const { content, toolCalls, promptTokens, completionTokens } = result; - - // v1.11.3: ctx_max comes from llama-swap /upstream//props, not the - // streaming completion (which doesn't emit n_ctx). getModelContext caches - // the positive lookup for the process lifetime, so this is a single Map - // hit after the first invocation per model. - const mctx = await modelContext.getModelContext(session.model); - const nCtx = mctx?.n_ctx ?? null; - - const [updated] = await ctx.sql< - { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] - >` - UPDATE messages - SET content = ${content}, - status = 'complete', - tool_calls = ${ctx.sql.json(toolCalls as never)}, - tokens_used = ${completionTokens}, - ctx_used = ${promptTokens}, - ctx_max = ${nCtx}, - finished_at = clock_timestamp() - WHERE id = ${assistantMessageId} - RETURNING tokens_used, ctx_used, ctx_max, finished_at - `; - // v1.11: flag for compaction if this turn pushed us over the usable budget. - // We never compact mid-loop (the recursive runAssistantTurn keeps tools - // flowing); the flag fires on the NEXT turn's pre-fetch hook above. - await maybeFlagForCompaction(ctx, chatId, updated); - const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` - UPDATE sessions SET updated_at = clock_timestamp() - WHERE id = ${sessionId} - RETURNING project_id, name, updated_at - `; - ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at }); - for (const tc of toolCalls) { - ctx.publish(sessionId, { - type: 'tool_call', - message_id: assistantMessageId, - chat_id: chatId, - tool_call: tc, - }); - } - ctx.publish(sessionId, { - type: 'message_complete', - message_id: assistantMessageId, - chat_id: chatId, - tokens_used: updated?.tokens_used ?? null, - ctx_used: updated?.ctx_used ?? null, - ctx_max: updated?.ctx_max ?? null, - started_at: startedAt, - finished_at: updated?.finished_at ?? null, - model: session.model, - }); - - // Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted - // (the answer endpoint needs a target row to UPDATE), but tool_results is - // pre-stamped with output=null as a "pending" sentinel and no tool_result - // frame goes out — the card renders from the tool_call frame alone. Mixed - // batches still execute the other tools normally. - ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() }); - let pausingForUserInput = false; - await Promise.all( - toolCalls.map(async (tc) => { - const [toolRow] = await ctx.sql<{ id: string }[]>` - INSERT INTO messages (session_id, chat_id, role, content, status, created_at) - VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp()) - RETURNING id - `; - const toolMessageId = toolRow!.id; - if (tc.name === 'ask_user_input') { - pausingForUserInput = true; - const sentinel = { tool_call_id: tc.id, output: null, truncated: false }; - await ctx.sql` - UPDATE messages - SET tool_results = ${ctx.sql.json(sentinel as never)} - WHERE id = ${toolMessageId} - `; - return; - } - const tres = await executeToolCall(projectRoot, tc); - const stored = { - tool_call_id: tc.id, - output: tres.output, - truncated: tres.truncated, - ...(tres.error ? { error: tres.error } : {}), - }; - await ctx.sql` - UPDATE messages - SET tool_results = ${ctx.sql.json(stored as never)} - WHERE id = ${toolMessageId} - `; - ctx.publish(sessionId, { - type: 'tool_result', - tool_message_id: toolMessageId, - chat_id: chatId, - tool_call_id: tc.id, - output: tres.output, - truncated: tres.truncated, - ...(tres.error ? { error: tres.error } : {}), - }); - }) - ); - - if (pausingForUserInput) { - ctx.publishUser({ - type: 'chat_status', - chat_id: chatId, - status: 'waiting_for_input', - at: new Date().toISOString(), - }); - ctx.log.info( - { sessionId, chatId, assistantMessageId }, - 'inference paused awaiting user input', - ); - return; - } - - const [nextAssistant] = await ctx.sql<{ id: string }[]>` - INSERT INTO messages (session_id, chat_id, role, content, status, created_at) - VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) - RETURNING id - `; - await runAssistantTurn(ctx, { - sessionId, - chatId, - assistantMessageId: nextAssistant!.id, - // v1.8.2: charge this turn's actual tool invocations against the budget. - // One assistant message can emit multiple tool_calls, so we add the run - // count, not 1. The next turn's budget check sees the cumulative total. - toolsUsed: toolsUsed + result.toolCalls.length, - // v1.11.6: append the just-executed tool calls to the per-turn history - // so the next runAssistantTurn's doom-loop check can see them. We don't - // cap the array length here — per-turn budgets keep it bounded - // (typically <30 entries), and slicing happens inside detectDoomLoop. - recentToolCalls: [...args.recentToolCalls, ...result.toolCalls], - signal, - }); -} - -async function runAssistantTurn( +export async function runAssistantTurn( ctx: InferenceContext, args: TurnArgs, ): Promise { diff --git a/apps/server/src/services/inference/stream-phase.ts b/apps/server/src/services/inference/stream-phase.ts new file mode 100644 index 0000000..c044f87 --- /dev/null +++ b/apps/server/src/services/inference/stream-phase.ts @@ -0,0 +1,380 @@ +import type { + Agent, + Session, + ToolCall, +} from '../../types/api.js'; +import * as modelContext from '../model-context.js'; +import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js'; +import type { OpenAiMessage } from './payload.js'; +import { + XML_TOOL_CLOSE, + XML_TOOL_OPEN, + parseXmlToolCall, + partialXmlOpenerStart, +} from './xml-parser.js'; +import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js'; +import type { + InferenceContext, + StreamResult, + TurnArgs, +} from '../inference.js'; + +interface ChatCompletionDelta { + role?: string; + content?: string | null; + tool_calls?: Array<{ + index: number; + id?: string; + type?: 'function'; + function?: { name?: string; arguments?: string }; + }>; +} + +interface ChatCompletionChunk { + choices?: Array<{ + delta: ChatCompletionDelta; + finish_reason: string | null; + }>; + usage?: { + prompt_tokens?: number; + completion_tokens?: number; + total_tokens?: number; + }; +} + +interface StreamOptions { + // null = omit tools entirely (compact phase); [] = caller stripped all tools + // (rare; we still omit from the request body to avoid OpenAI 400). + tools: ToolJsonSchema[] | null; + temperature?: number; +} + +async function* sseLines(stream: ReadableStream): AsyncGenerator { + const reader = stream.getReader(); + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let idx; + while ((idx = buffer.indexOf('\n')) >= 0) { + const line = buffer.slice(0, idx).replace(/\r$/, ''); + buffer = buffer.slice(idx + 1); + if (line.length === 0) continue; + yield line; + } + } + if (buffer.length > 0) yield buffer; + } finally { + reader.releaseLock(); + } +} + +// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via +// llama-swap) emit tool calls as inline XML inside delta.content rather than +// the structured delta.tool_calls field. The XML shape is: +// +// +// +// VALUE +// +// ...more parameters... +// +// +// Multiple blocks may appear back-to-back; they never nest. +// streamCompletion buffers delta.content, extracts complete blocks, parses +// them via parseXmlToolCall, and pushes synthetic entries into the existing +// toolCallsBuffer alongside any native JSON-format tool calls. +export async function streamCompletion( + ctx: InferenceContext, + model: string, + messages: OpenAiMessage[], + opts: StreamOptions, + onDelta: (content: string) => void, + onUsage: ((prompt: number | null, completion: number | null) => void) | undefined, + signal?: AbortSignal +): Promise { + const body: Record = { + model, + messages, + stream: true, + stream_options: { include_usage: true }, + }; + if (opts.tools && opts.tools.length > 0) { + body['tools'] = opts.tools; + body['tool_choice'] = 'auto'; + } + if (typeof opts.temperature === 'number') { + body['temperature'] = opts.temperature; + } + + const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + signal, + }); + if (!res.ok || !res.body) { + const text = await res.text().catch(() => ''); + throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`); + } + + let content = ''; + // v1.10.5: holds delta.content bytes that may contain a partial XML tool + // call. Anything not part of a (possibly forming) + // pair is flushed to content + onDelta as soon as we know it's safe. + let pendingBuffer = ''; + let finishReason: string | null = null; + let promptTokens: number | null = null; + let completionTokens: number | null = null; + const toolCallsBuffer = new Map(); + + for await (const line of sseLines(res.body)) { + if (!line.startsWith('data:')) continue; + const payload = line.slice(5).trim(); + if (payload === '[DONE]') break; + let parsed: ChatCompletionChunk; + try { + parsed = JSON.parse(payload); + } catch { + continue; + } + + if (parsed.usage) { + if (typeof parsed.usage.prompt_tokens === 'number') { + promptTokens = parsed.usage.prompt_tokens; + } + if (typeof parsed.usage.completion_tokens === 'number') { + completionTokens = parsed.usage.completion_tokens; + } + onUsage?.(promptTokens, completionTokens); + } + // v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's + // streaming completion does NOT emit n_ctx in timings (verified + // empirically); the authoritative source is llama-swap's + // /upstream//props endpoint, fetched per-turn via + // model-context.getModelContext() at the finalization sites below. + + const choice = parsed.choices?.[0]; + if (!choice) continue; + const delta = choice.delta ?? {}; + if (typeof delta.content === 'string' && delta.content.length > 0) { + // v1.10.5 XML fallback. Append, then extract any complete tool_call + // blocks before deciding what's safe to flush as visible content. + pendingBuffer += delta.content; + while (true) { + const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN); + if (startIdx === -1) break; + const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx); + if (closeIdx === -1) break; + const blockEnd = closeIdx + XML_TOOL_CLOSE.length; + const block = pendingBuffer.slice(startIdx, blockEnd); + // Any text before the opener is plain content — flush it now. + if (startIdx > 0) { + const before = pendingBuffer.slice(0, startIdx); + content += before; + onDelta(before); + } + const parsedCall = parseXmlToolCall(block); + if (parsedCall) { + const synthIdx = toolCallsBuffer.size; + toolCallsBuffer.set(synthIdx, { + id: `xml_call_${synthIdx}`, + name: parsedCall.name, + argsText: JSON.stringify(parsedCall.args), + }); + } + // If parsing failed we still drop the block — emitting unparseable + // XML to the chat would look worse than silently swallowing it. + pendingBuffer = pendingBuffer.slice(blockEnd); + } + // After all complete blocks are out, hold back any (partial or full) + // unclosed opener; flush the rest. + const partialIdx = partialXmlOpenerStart(pendingBuffer); + if (partialIdx >= 0) { + if (partialIdx > 0) { + const flush = pendingBuffer.slice(0, partialIdx); + content += flush; + onDelta(flush); + } + pendingBuffer = pendingBuffer.slice(partialIdx); + } else if (pendingBuffer.length > 0) { + content += pendingBuffer; + onDelta(pendingBuffer); + pendingBuffer = ''; + } + } + if (Array.isArray(delta.tool_calls)) { + for (const tc of delta.tool_calls) { + const idx = tc.index; + const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' }; + if (tc.id) existing.id = tc.id; + if (tc.function?.name) existing.name = tc.function.name; + if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments; + toolCallsBuffer.set(idx, existing); + } + } + if (choice.finish_reason) finishReason = choice.finish_reason; + } + + // v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer + // ever arrived), flush whatever was buffered as plain content so it isn't + // silently dropped. Better to show a stray `` than vanish text. + if (pendingBuffer.length > 0) { + content += pendingBuffer; + onDelta(pendingBuffer); + pendingBuffer = ''; + } + + const toolCalls: ToolCall[] = []; + for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) { + let args: Record = {}; + if (t.argsText.length > 0) { + try { + args = JSON.parse(t.argsText); + } catch { + args = { _raw: t.argsText }; + } + } + toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args }); + } + + return { finishReason, content, toolCalls, promptTokens, completionTokens }; +} + +export async function executeStreamPhase( + ctx: InferenceContext, + args: TurnArgs, + session: Session, + messages: OpenAiMessage[], + state: StreamPhaseState, + agent: Agent | null, + // v1.11.8: when false, web_search and web_fetch are stripped from the + // tool list sent to the LLM, so the model can't even attempt them. + webToolsEnabled: boolean, +): Promise { + const { sessionId, chatId, assistantMessageId, signal } = args; + + const startedRow = await ctx.sql<{ started_at: string }[]>` + UPDATE messages + SET started_at = clock_timestamp() + WHERE id = ${assistantMessageId} + RETURNING started_at + `; + state.startedAt = startedRow[0]?.started_at ?? null; + + ctx.publish(sessionId, { + type: 'message_started', + message_id: assistantMessageId, + chat_id: chatId, + role: 'assistant', + }); + + let pendingFlushTimer: NodeJS.Timeout | null = null; + let flushPromise: Promise = Promise.resolve(); + + const flushNow = () => { + if (pendingFlushTimer) { + clearTimeout(pendingFlushTimer); + pendingFlushTimer = null; + } + const snapshot = state.accumulated; + flushPromise = flushPromise.then(() => + ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}` + ); + }; + + const scheduleFlush = () => { + if (pendingFlushTimer) return; + pendingFlushTimer = setTimeout(() => { + pendingFlushTimer = null; + flushNow(); + }, DB_FLUSH_INTERVAL_MS); + }; + + // Tool whitelist: if an agent is set, filter the global tool list to only the + // tool names it allows. Unknown names in agent.tools are dropped silently + // (handled here by intersection). When no agent: send all tools. + // v1.11.8: a second filter strips web_search + web_fetch unless the chat + // has them explicitly enabled. Counts as an opt-in security boundary: the + // model can't summon a tool that wasn't offered to it. + const WEB_TOOL_NAMES: ReadonlySet = new Set(['web_search', 'web_fetch']); + const effectiveTools: ToolJsonSchema[] = (agent + ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name)) + : toolJsonSchemas() + ).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name)); + const effectiveTemperature = agent?.temperature; + + // v1.12.2: ctx_max lookup is cached after the first hit per model, so this + // is a Map probe in steady state. We capture nCtx once at the top of the + // stream so the throttled usage publish doesn't refetch each tick. + const mctxForStream = await modelContext.getModelContext(session.model); + const nCtxForStream = mctxForStream?.n_ctx ?? null; + + // v1.12.2: throttle live usage publishes to ~500ms. The model can land + // dozens of usage frames per second; without a throttle the WS turns into + // a firehose for a few KB savings on each render. + const USAGE_THROTTLE_MS = 500; + let lastUsageAt = 0; + let pendingUsage: { p: number | null; c: number | null } | null = null; + let usageTimer: NodeJS.Timeout | null = null; + const flushUsage = () => { + if (!pendingUsage) return; + const { p, c } = pendingUsage; + pendingUsage = null; + lastUsageAt = Date.now(); + ctx.publish(sessionId, { + type: 'usage', + message_id: assistantMessageId, + chat_id: chatId, + completion_tokens: c, + ctx_used: p, + ctx_max: nCtxForStream, + }); + }; + + try { + return await streamCompletion( + ctx, + session.model, + messages, + { tools: effectiveTools, temperature: effectiveTemperature }, + (delta) => { + state.accumulated += delta; + ctx.publish(sessionId, { + type: 'delta', + message_id: assistantMessageId, + chat_id: chatId, + content: delta, + }); + ctx.log.debug({ sessionId, delta }, 'inference delta'); + scheduleFlush(); + }, + (prompt, completion) => { + pendingUsage = { p: prompt, c: completion }; + const elapsed = Date.now() - lastUsageAt; + if (elapsed >= USAGE_THROTTLE_MS) { + flushUsage(); + } else if (!usageTimer) { + usageTimer = setTimeout(() => { + usageTimer = null; + flushUsage(); + }, USAGE_THROTTLE_MS - elapsed); + } + }, + signal + ); + } finally { + if (pendingFlushTimer) { + clearTimeout(pendingFlushTimer); + pendingFlushTimer = null; + } + if (usageTimer) { + clearTimeout(usageTimer); + usageTimer = null; + } + await flushPromise; + } +} diff --git a/apps/server/src/services/inference/tool-phase.ts b/apps/server/src/services/inference/tool-phase.ts new file mode 100644 index 0000000..880d7c2 --- /dev/null +++ b/apps/server/src/services/inference/tool-phase.ts @@ -0,0 +1,213 @@ +import type { Session, ToolCall } from '../../types/api.js'; +import * as modelContext from '../model-context.js'; +import { PathScopeError } from '../path_guard.js'; +import { TOOLS_BY_NAME } from '../tools.js'; +import { maybeFlagForCompaction } from './payload.js'; +import type { + InferenceContext, + StreamResult, + TurnArgs, +} from '../inference.js'; +// v1.12.4: ESM value-import cycle. executeToolPhase recurses into +// runAssistantTurn which lives in inference.ts. The cycle is safe because +// the reference is read at call time (inside an async function body), not +// at module top-level. Node + tsc resolve this cleanly. +import { runAssistantTurn } from '../inference.js'; + +async function executeToolCall( + projectRoot: string, + toolCall: ToolCall +): Promise<{ output: unknown; truncated: boolean; error?: string }> { + const tool = TOOLS_BY_NAME[toolCall.name]; + if (!tool) { + return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` }; + } + const parsed = tool.inputSchema.safeParse(toolCall.args); + if (!parsed.success) { + // v1.12 Track B.2: enrich the zod-reject path so the model sees a + // one-line, tool-named hint ("tool 'search_symbols' rejected — query: + // Required") instead of a JSON blob of flatten output. Higher recovery + // rate on the next turn; doom-loop guard still bounds infinite retries. + // The cast is because tool.inputSchema is ZodType, so zod can't + // statically narrow flatten()'s fieldErrors key set — but the runtime + // shape is the standard { formErrors: string[]; fieldErrors: Record<...> }. + const flatten = parsed.error.flatten() as { + formErrors: string[]; + fieldErrors: Record; + }; + const fieldErrors = Object.entries(flatten.fieldErrors) + .map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`) + .join('; '); + const formError = flatten.formErrors[0]; + const hint = fieldErrors || formError || 'unknown validation error'; + return { + output: null, + truncated: false, + error: `tool '${toolCall.name}' rejected — ${hint}`, + }; + } + try { + const output = await tool.execute(parsed.data, projectRoot); + const truncated = + typeof output === 'object' && output !== null && 'truncated' in output + ? Boolean((output as { truncated: unknown }).truncated) + : false; + return { output, truncated }; + } catch (err) { + if (err instanceof PathScopeError) { + return { output: null, truncated: false, error: err.message }; + } + return { + output: null, + truncated: false, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +export async function executeToolPhase( + ctx: InferenceContext, + args: TurnArgs, + result: StreamResult, + startedAt: string | null, + session: Session, + projectRoot: string +): Promise { + const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args; + const { content, toolCalls, promptTokens, completionTokens } = result; + + // v1.11.3: ctx_max comes from llama-swap /upstream//props, not the + // streaming completion (which doesn't emit n_ctx). getModelContext caches + // the positive lookup for the process lifetime, so this is a single Map + // hit after the first invocation per model. + const mctx = await modelContext.getModelContext(session.model); + const nCtx = mctx?.n_ctx ?? null; + + const [updated] = await ctx.sql< + { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] + >` + UPDATE messages + SET content = ${content}, + status = 'complete', + tool_calls = ${ctx.sql.json(toolCalls as never)}, + tokens_used = ${completionTokens}, + ctx_used = ${promptTokens}, + ctx_max = ${nCtx}, + finished_at = clock_timestamp() + WHERE id = ${assistantMessageId} + RETURNING tokens_used, ctx_used, ctx_max, finished_at + `; + // v1.11: flag for compaction if this turn pushed us over the usable budget. + // We never compact mid-loop (the recursive runAssistantTurn keeps tools + // flowing); the flag fires on the NEXT turn's pre-fetch hook above. + await maybeFlagForCompaction(ctx, chatId, updated); + const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` + UPDATE sessions SET updated_at = clock_timestamp() + WHERE id = ${sessionId} + RETURNING project_id, name, updated_at + `; + ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at }); + for (const tc of toolCalls) { + ctx.publish(sessionId, { + type: 'tool_call', + message_id: assistantMessageId, + chat_id: chatId, + tool_call: tc, + }); + } + ctx.publish(sessionId, { + type: 'message_complete', + message_id: assistantMessageId, + chat_id: chatId, + tokens_used: updated?.tokens_used ?? null, + ctx_used: updated?.ctx_used ?? null, + ctx_max: updated?.ctx_max ?? null, + started_at: startedAt, + finished_at: updated?.finished_at ?? null, + model: session.model, + }); + + // Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted + // (the answer endpoint needs a target row to UPDATE), but tool_results is + // pre-stamped with output=null as a "pending" sentinel and no tool_result + // frame goes out — the card renders from the tool_call frame alone. Mixed + // batches still execute the other tools normally. + ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() }); + let pausingForUserInput = false; + await Promise.all( + toolCalls.map(async (tc) => { + const [toolRow] = await ctx.sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp()) + RETURNING id + `; + const toolMessageId = toolRow!.id; + if (tc.name === 'ask_user_input') { + pausingForUserInput = true; + const sentinel = { tool_call_id: tc.id, output: null, truncated: false }; + await ctx.sql` + UPDATE messages + SET tool_results = ${ctx.sql.json(sentinel as never)} + WHERE id = ${toolMessageId} + `; + return; + } + const tres = await executeToolCall(projectRoot, tc); + const stored = { + tool_call_id: tc.id, + output: tres.output, + truncated: tres.truncated, + ...(tres.error ? { error: tres.error } : {}), + }; + await ctx.sql` + UPDATE messages + SET tool_results = ${ctx.sql.json(stored as never)} + WHERE id = ${toolMessageId} + `; + ctx.publish(sessionId, { + type: 'tool_result', + tool_message_id: toolMessageId, + chat_id: chatId, + tool_call_id: tc.id, + output: tres.output, + truncated: tres.truncated, + ...(tres.error ? { error: tres.error } : {}), + }); + }) + ); + + if (pausingForUserInput) { + ctx.publishUser({ + type: 'chat_status', + chat_id: chatId, + status: 'waiting_for_input', + at: new Date().toISOString(), + }); + ctx.log.info( + { sessionId, chatId, assistantMessageId }, + 'inference paused awaiting user input', + ); + return; + } + + const [nextAssistant] = await ctx.sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + await runAssistantTurn(ctx, { + sessionId, + chatId, + assistantMessageId: nextAssistant!.id, + // v1.8.2: charge this turn's actual tool invocations against the budget. + // One assistant message can emit multiple tool_calls, so we add the run + // count, not 1. The next turn's budget check sees the cumulative total. + toolsUsed: toolsUsed + result.toolCalls.length, + // v1.11.6: append the just-executed tool calls to the per-turn history + // so the next runAssistantTurn's doom-loop check can see them. We don't + // cap the array length here — per-turn budgets keep it bounded + // (typically <30 entries), and slicing happens inside detectDoomLoop. + recentToolCalls: [...args.recentToolCalls, ...result.toolCalls], + signal, + }); +} diff --git a/apps/server/src/services/inference/types.ts b/apps/server/src/services/inference/types.ts new file mode 100644 index 0000000..2b2a3b8 --- /dev/null +++ b/apps/server/src/services/inference/types.ts @@ -0,0 +1,13 @@ +// v1.12.4: shared inter-phase types/constants for the extracted phase files. +// Lives here so stream-phase, tool-phase, and the summary functions still in +// inference.ts can all reference the same definitions without circular imports. + +export interface StreamPhaseState { + accumulated: string; + startedAt: string | null; +} + +// 500ms keeps the DB UPDATE rate bounded under heavy streaming. Used by +// executeStreamPhase, runCapHitSummary, and runDoomLoopSummary — every site +// that does a debounced content flush during streaming. +export const DB_FLUSH_INTERVAL_MS = 500;