import type { FastifyBaseLogger } from 'fastify'; import type { Sql } from '../db.js'; import type { Config } from '../config.js'; import type { Agent, ErrorReason, Message, MessageMetadata, Project, Session, ToolCall, UserStreamFrame, } from '../types/api.js'; import { ALL_TOOLS, READ_ONLY_TOOL_NAMES, TOOLS_BY_NAME, toolJsonSchemas, type ToolJsonSchema, } from './tools.js'; import { PathScopeError, resolveProjectRoot } from './path_guard.js'; import { maybeAutoNameChat } from './auto_name.js'; import { getAgentById } from './agents.js'; import * as compaction from './compaction.js'; import * as modelContext from './model-context.js'; import type { Broker } from './broker.js'; const BASE_SYSTEM_PROMPT = (projectPath: string) => `You are BooCode Chat, a code investigation assistant. The user is working on a project located at ${projectPath}. Use the file-read tools (view_file, list_dir, grep, find_files) to investigate code when needed. Be concise. Cite file paths and line numbers when discussing code. Do not hallucinate file contents — read the file first. Tool results may be truncated; if so, narrow your query rather than guessing.`; const DB_FLUSH_INTERVAL_MS = 500; // v1.8.2: tool-call budget defaults. Resolved per-turn by resolveToolBudget. // - Agent with explicit max_tool_calls: that value. // - Agent with read-only-only tools: BUDGET_READ_ONLY (30). // - Agent with any non-read-only tool: BUDGET_NON_READ_ONLY (10). // - No agent (raw chat): BUDGET_NO_AGENT (15). const BUDGET_READ_ONLY = 30; const BUDGET_NON_READ_ONLY = 10; const BUDGET_NO_AGENT = 15; const READ_ONLY_SET: ReadonlySet = new Set(READ_ONLY_TOOL_NAMES); function resolveToolBudget(agent: Agent | null): number { if (agent?.max_tool_calls != null) return agent.max_tool_calls; if (!agent) return BUDGET_NO_AGENT; const allReadOnly = agent.tools.every((t) => READ_ONLY_SET.has(t)); return allReadOnly ? BUDGET_READ_ONLY : BUDGET_NON_READ_ONLY; } // Synthetic system note appended to the cap-hit summary call. Verbatim from // the v1.8.2 spec — do not paraphrase: the model is more reliable when the // instruction is short, declarative, and identical across calls. const CAP_HIT_SUMMARY_NOTE = (limit: number) => `You've reached the tool budget (${limit} calls). Produce the best answer you can with what you have. Do not call more tools.`; function isCapHitSentinel(m: Message): boolean { return ( m.role === 'system' && m.metadata !== null && typeof m.metadata === 'object' && (m.metadata as { kind?: unknown }).kind === 'cap_hit' ); } export interface InferenceFrame { type: | 'message_started' | 'delta' | 'tool_call' | 'tool_result' | 'message_complete' | 'messages_deleted' | 'session_renamed' | 'chat_renamed' | 'error'; message_id?: string; message_ids?: string[]; chat_id?: string; tool_message_id?: string; tool_call_id?: string; // v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves // through the normal message_started → delta → message_complete sequence. role?: 'assistant' | 'tool' | 'user' | 'system'; content?: string; tool_call?: ToolCall; output?: unknown; truncated?: boolean; error?: string; // v1.8.2: structured error reason. Set on `type: 'error'` so the UI can // surface a specific message; `error` stays the human-readable text. reason?: ErrorReason; // v1.8.2: piggybacks on `message_complete` so static or terminally-resolved // messages can carry their persisted metadata to the live stream without a // refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry // { kind: 'error', ... }). metadata?: MessageMetadata | null; tokens_used?: number | null; ctx_used?: number | null; ctx_max?: number | null; started_at?: string | null; finished_at?: string | null; model?: string; session_id?: string; name?: string; } export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void; interface OpenAiMessage { role: 'system' | 'user' | 'assistant' | 'tool'; content: string | null; tool_calls?: Array<{ id: string; type: 'function'; function: { name: string; arguments: string }; }>; tool_call_id?: string; } interface ChatCompletionDelta { role?: string; content?: string | null; tool_calls?: Array<{ index: number; id?: string; type?: 'function'; function?: { name?: string; arguments?: string }; }>; } interface ChatCompletionChunk { choices?: Array<{ delta: ChatCompletionDelta; finish_reason: string | null; }>; usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number; }; } export interface InferenceContext { sql: Sql; config: Config; log: FastifyBaseLogger; publish: FramePublisher; publishUser: (frame: UserStreamFrame) => void; // v1.11: passed through so compaction.process can publish 'compacted' // frames on the same session WS channel useSessionStream subscribes to. // Compaction is the only path that needs the raw broker handle (regular // inference goes through `publish`); keeping a separate field avoids // tempting other code paths into bypassing the session-id binding. broker: Broker; } // Resolution order: base prompt < agent.system_prompt < user prompt, where // user prompt = session.system_prompt if non-empty, else project's // default_system_prompt if non-empty, else nothing. Empty/whitespace-only // counts as "no override" for both layers (v1.9 inherit semantics — keeps // the column non-nullable so the existing key/value store stays put). export function buildSystemPrompt( project: Project, session: Session, agent: Agent | null ): string { let out = BASE_SYSTEM_PROMPT(project.path); if (agent && agent.system_prompt.trim().length > 0) { out += '\n\n' + agent.system_prompt.trim(); } const sessionPrompt = session.system_prompt?.trim() ?? ''; const projectPrompt = project.default_system_prompt?.trim() ?? ''; const userPrompt = sessionPrompt || projectPrompt; if (userPrompt.length > 0) { out += '\n\n' + userPrompt; } return out; } export function buildMessagesPayload( session: Session, project: Project, history: Message[], agent: Agent | null = null ): OpenAiMessage[] { const out: OpenAiMessage[] = []; const systemPrompt = buildSystemPrompt(project, session, agent); out.push({ role: 'system', content: systemPrompt }); // Find the latest compact marker — only send messages from that point onwards let startIdx = 0; for (let i = history.length - 1; i >= 0; i--) { if (history[i]!.kind === 'compact') { startIdx = i; break; } } for (let i = startIdx; i < history.length; i++) { const m = history[i]!; if (m.kind === 'compact') { out.push({ role: 'system', content: m.content }); continue; } // v1.8.2: cap-hit sentinels are UI-only — never send them to the LLM. The // synthetic "you've reached the tool budget" note lives only inside the // summary call's messages array and is never persisted, so on Continue // the model resumes with a clean context. if (isCapHitSentinel(m)) continue; if (m.role === 'assistant' && m.status === 'streaming') continue; if (m.role === 'assistant' && m.status === 'cancelled') continue; if (m.role === 'tool') { const tr = m.tool_results; if (!tr) continue; const outputText = tr.error ? `error: ${tr.error}` : typeof tr.output === 'string' ? tr.output : JSON.stringify(tr.output); out.push({ role: 'tool', content: outputText, tool_call_id: tr.tool_call_id, }); continue; } if (m.role === 'assistant') { const msg: OpenAiMessage = { role: 'assistant', content: m.content && m.content.length > 0 ? m.content : null, }; if (m.tool_calls && m.tool_calls.length > 0) { msg.tool_calls = m.tool_calls.map((tc) => ({ id: tc.id, type: 'function' as const, function: { name: tc.name, arguments: JSON.stringify(tc.args) }, })); } out.push(msg); continue; } out.push({ role: 'user', content: m.content }); } return out; } async function loadContext( sql: Sql, sessionId: string, chatId: string ): Promise<{ session: Session; project: Project; history: Message[] } | null> { const sessionRows = await sql` SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, agent_id, web_search_enabled FROM sessions WHERE id = ${sessionId} `; if (sessionRows.length === 0) return null; const session = sessionRows[0]!; const projectRows = await sql` SELECT id, name, path, added_at, last_session_id, status, gitea_remote, default_system_prompt, default_web_search_enabled FROM projects WHERE id = ${session.project_id} `; if (projectRows.length === 0) return null; const project = projectRows[0]!; // v1.11: filter compacted messages out of the inference assembly. The GET // /api/sessions/:id/messages endpoint still returns everything (so the UI // can show history with the summary card inline); only LLM payloads skip // compacted rows. compacted_at IS NULL keeps the active summary + tail. const history = await sql` SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq, tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata FROM messages WHERE chat_id = ${chatId} AND compacted_at IS NULL ORDER BY created_at ASC, id ASC `; return { session, project, history }; } // v1.11: shared helper used after both finalizeCompletion and executeToolPhase // persist their token counts. Reads tokens off the just-UPDATEd row (which // the caller returns from RETURNING), runs compaction.isOverflow, and flips // chats.needs_compaction. The next runAssistantTurn invocation acts on it. // Silent on missing tokens — llama-swap occasionally omits usage on truncated // streams, and we'd rather miss one overflow than crash the inference path. async function maybeFlagForCompaction( ctx: InferenceContext, chatId: string, updated: { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null } | undefined, ): Promise { if (!updated) return; const promptTokens = updated.ctx_used; const completionTokens = updated.tokens_used; const contextLimit = updated.ctx_max; if (typeof promptTokens !== 'number') return; if (typeof completionTokens !== 'number') return; if (typeof contextLimit !== 'number') return; const overflow = compaction.isOverflow( { prompt_tokens: promptTokens, completion_tokens: completionTokens }, contextLimit, ); if (!overflow) return; await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`; ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction'); } async function* sseLines(stream: ReadableStream): AsyncGenerator { const reader = stream.getReader(); const decoder = new TextDecoder('utf-8'); let buffer = ''; try { while (true) { const { value, done } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); let idx; while ((idx = buffer.indexOf('\n')) >= 0) { const line = buffer.slice(0, idx).replace(/\r$/, ''); buffer = buffer.slice(idx + 1); if (line.length === 0) continue; yield line; } } if (buffer.length > 0) yield buffer; } finally { reader.releaseLock(); } } interface StreamResult { finishReason: string | null; content: string; toolCalls: ToolCall[]; promptTokens: number | null; completionTokens: number | null; } interface StreamOptions { // null = omit tools entirely (compact phase); [] = caller stripped all tools // (rare; we still omit from the request body to avoid OpenAI 400). tools: ToolJsonSchema[] | null; temperature?: number; } // v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via // llama-swap) emit tool calls as inline XML inside delta.content rather than // the structured delta.tool_calls field. The XML shape is: // // // // VALUE // // ...more parameters... // // // Multiple blocks may appear back-to-back; they never nest. // streamCompletion buffers delta.content, extracts complete blocks, parses // them via parseXmlToolCall, and pushes synthetic entries into the existing // toolCallsBuffer alongside any native JSON-format tool calls. const XML_TOOL_OPEN = ''; const XML_TOOL_CLOSE = ''; function parseXmlToolCall( block: string, ): { name: string; args: Record } | null { const nameMatch = block.match(/]+)>/); if (!nameMatch || !nameMatch[1]) return null; const name = nameMatch[1].trim(); if (!name) return null; const args: Record = {}; // Non-greedy body so each … pair is matched // independently even when multiple appear in the same block. const paramRe = /]+)>([\s\S]*?)<\/parameter>/g; for (const m of block.matchAll(paramRe)) { const key = (m[1] ?? '').trim(); if (!key) continue; const raw = (m[2] ?? '').trim(); try { args[key] = JSON.parse(raw); } catch { args[key] = raw; } } return { name, args }; } // Locate the first character that begins (or completely contains) an // unfinished opener in `s`. Returns -1 when `s` can be flushed // to the client in full without risking a partial tag leak. // Case 1: a full `` opener with no matching closer — caller // must keep everything from that index forward until the next // chunk arrives with the closer. // Case 2: `s` ends with a strict prefix of `` (e.g. ` pair before reaching this check. function partialXmlOpenerStart(s: string): number { const fullOpener = s.indexOf(XML_TOOL_OPEN); if (fullOpener !== -1) return fullOpener; const lastLt = s.lastIndexOf('<'); if (lastLt === -1) return -1; const suffix = s.slice(lastLt); if (XML_TOOL_OPEN.startsWith(suffix) && suffix.length < XML_TOOL_OPEN.length) { return lastLt; } return -1; } async function streamCompletion( ctx: InferenceContext, model: string, messages: OpenAiMessage[], opts: StreamOptions, onDelta: (content: string) => void, signal?: AbortSignal ): Promise { const body: Record = { model, messages, stream: true, stream_options: { include_usage: true }, }; if (opts.tools && opts.tools.length > 0) { body['tools'] = opts.tools; body['tool_choice'] = 'auto'; } if (typeof opts.temperature === 'number') { body['temperature'] = opts.temperature; } const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body), signal, }); if (!res.ok || !res.body) { const text = await res.text().catch(() => ''); throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`); } let content = ''; // v1.10.5: holds delta.content bytes that may contain a partial XML tool // call. Anything not part of a (possibly forming) // pair is flushed to content + onDelta as soon as we know it's safe. let pendingBuffer = ''; let finishReason: string | null = null; let promptTokens: number | null = null; let completionTokens: number | null = null; const toolCallsBuffer = new Map(); for await (const line of sseLines(res.body)) { if (!line.startsWith('data:')) continue; const payload = line.slice(5).trim(); if (payload === '[DONE]') break; let parsed: ChatCompletionChunk; try { parsed = JSON.parse(payload); } catch { continue; } if (parsed.usage) { if (typeof parsed.usage.prompt_tokens === 'number') { promptTokens = parsed.usage.prompt_tokens; } if (typeof parsed.usage.completion_tokens === 'number') { completionTokens = parsed.usage.completion_tokens; } } // v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's // streaming completion does NOT emit n_ctx in timings (verified // empirically); the authoritative source is llama-swap's // /upstream//props endpoint, fetched per-turn via // model-context.getModelContext() at the finalization sites below. const choice = parsed.choices?.[0]; if (!choice) continue; const delta = choice.delta ?? {}; if (typeof delta.content === 'string' && delta.content.length > 0) { // v1.10.5 XML fallback. Append, then extract any complete tool_call // blocks before deciding what's safe to flush as visible content. pendingBuffer += delta.content; while (true) { const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN); if (startIdx === -1) break; const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx); if (closeIdx === -1) break; const blockEnd = closeIdx + XML_TOOL_CLOSE.length; const block = pendingBuffer.slice(startIdx, blockEnd); // Any text before the opener is plain content — flush it now. if (startIdx > 0) { const before = pendingBuffer.slice(0, startIdx); content += before; onDelta(before); } const parsedCall = parseXmlToolCall(block); if (parsedCall) { const synthIdx = toolCallsBuffer.size; toolCallsBuffer.set(synthIdx, { id: `xml_call_${synthIdx}`, name: parsedCall.name, argsText: JSON.stringify(parsedCall.args), }); } // If parsing failed we still drop the block — emitting unparseable // XML to the chat would look worse than silently swallowing it. pendingBuffer = pendingBuffer.slice(blockEnd); } // After all complete blocks are out, hold back any (partial or full) // unclosed opener; flush the rest. const partialIdx = partialXmlOpenerStart(pendingBuffer); if (partialIdx >= 0) { if (partialIdx > 0) { const flush = pendingBuffer.slice(0, partialIdx); content += flush; onDelta(flush); } pendingBuffer = pendingBuffer.slice(partialIdx); } else if (pendingBuffer.length > 0) { content += pendingBuffer; onDelta(pendingBuffer); pendingBuffer = ''; } } if (Array.isArray(delta.tool_calls)) { for (const tc of delta.tool_calls) { const idx = tc.index; const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' }; if (tc.id) existing.id = tc.id; if (tc.function?.name) existing.name = tc.function.name; if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments; toolCallsBuffer.set(idx, existing); } } if (choice.finish_reason) finishReason = choice.finish_reason; } // v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer // ever arrived), flush whatever was buffered as plain content so it isn't // silently dropped. Better to show a stray `` than vanish text. if (pendingBuffer.length > 0) { content += pendingBuffer; onDelta(pendingBuffer); pendingBuffer = ''; } const toolCalls: ToolCall[] = []; for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) { let args: Record = {}; if (t.argsText.length > 0) { try { args = JSON.parse(t.argsText); } catch { args = { _raw: t.argsText }; } } toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args }); } return { finishReason, content, toolCalls, promptTokens, completionTokens }; } async function executeToolCall( projectRoot: string, toolCall: ToolCall ): Promise<{ output: unknown; truncated: boolean; error?: string }> { const tool = TOOLS_BY_NAME[toolCall.name]; if (!tool) { return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` }; } const parsed = tool.inputSchema.safeParse(toolCall.args); if (!parsed.success) { return { output: null, truncated: false, error: `invalid input: ${JSON.stringify(parsed.error.flatten())}`, }; } try { const output = await tool.execute(parsed.data, projectRoot); const truncated = typeof output === 'object' && output !== null && 'truncated' in output ? Boolean((output as { truncated: unknown }).truncated) : false; return { output, truncated }; } catch (err) { if (err instanceof PathScopeError) { return { output: null, truncated: false, error: err.message }; } return { output: null, truncated: false, error: err instanceof Error ? err.message : String(err), }; } } interface TurnArgs { sessionId: string; chatId: string; assistantMessageId: string; // v1.8.2: cumulative tool calls executed this run. Compared against the // resolved budget at the top of each turn. Replaces the older `depth` // counter (which counted iterations, not invocations). toolsUsed: number; signal: AbortSignal | undefined; } interface StreamPhaseState { accumulated: string; startedAt: string | null; } async function executeStreamPhase( ctx: InferenceContext, args: TurnArgs, session: Session, messages: OpenAiMessage[], state: StreamPhaseState, agent: Agent | null ): Promise { const { sessionId, chatId, assistantMessageId, signal } = args; const startedRow = await ctx.sql<{ started_at: string }[]>` UPDATE messages SET started_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING started_at `; state.startedAt = startedRow[0]?.started_at ?? null; ctx.publish(sessionId, { type: 'message_started', message_id: assistantMessageId, chat_id: chatId, role: 'assistant', }); let pendingFlushTimer: NodeJS.Timeout | null = null; let flushPromise: Promise = Promise.resolve(); const flushNow = () => { if (pendingFlushTimer) { clearTimeout(pendingFlushTimer); pendingFlushTimer = null; } const snapshot = state.accumulated; flushPromise = flushPromise.then(() => ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}` ); }; const scheduleFlush = () => { if (pendingFlushTimer) return; pendingFlushTimer = setTimeout(() => { pendingFlushTimer = null; flushNow(); }, DB_FLUSH_INTERVAL_MS); }; // Tool whitelist: if an agent is set, filter the global tool list to only the // tool names it allows. Unknown names in agent.tools are dropped silently // (handled here by intersection). When no agent: send all tools. const effectiveTools: ToolJsonSchema[] = agent ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name)) : toolJsonSchemas(); const effectiveTemperature = agent?.temperature; try { return await streamCompletion( ctx, session.model, messages, { tools: effectiveTools, temperature: effectiveTemperature }, (delta) => { state.accumulated += delta; ctx.publish(sessionId, { type: 'delta', message_id: assistantMessageId, chat_id: chatId, content: delta, }); ctx.log.debug({ sessionId, delta }, 'inference delta'); scheduleFlush(); }, signal ); } finally { if (pendingFlushTimer) { clearTimeout(pendingFlushTimer); pendingFlushTimer = null; } await flushPromise; } } async function handleAbortOrError( ctx: InferenceContext, args: TurnArgs, accumulated: string, err: unknown ): Promise { const { sessionId, chatId, assistantMessageId } = args; const isAbort = err instanceof Error && err.name === 'AbortError'; const finalStatus = isAbort ? 'cancelled' : 'failed'; const errMsg = err instanceof Error ? err.message : String(err); // v1.8.2: persist a structured error metadata blob on genuine failures so // the bubble can render the reason on reload without re-deriving from the // (one-shot) WS error frame. User-initiated abort skips this — there's no // "reason" to surface for a stop the user already explicitly chose. const errorMetadata: MessageMetadata | null = isAbort ? null : { kind: 'error', error_reason: 'llm_provider_error', error_text: errMsg }; if (errorMetadata) { await ctx.sql` UPDATE messages SET status = ${finalStatus}, content = ${accumulated}, finished_at = clock_timestamp(), metadata = ${ctx.sql.json(errorMetadata as never)} WHERE id = ${assistantMessageId} `; } else { await ctx.sql` UPDATE messages SET status = ${finalStatus}, content = ${accumulated}, finished_at = clock_timestamp() WHERE id = ${assistantMessageId} `; } const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId} RETURNING project_id, name, updated_at `; ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at }); // v1.8 mobile-tabs: cancellation is a user-initiated stop, treat as idle; // genuine errors flip the dot red. v1.8.2: error path also carries a // machine-readable `reason` so the UI can render specifics inline. if (isAbort) { ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, }); ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled'); } else { ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'error', at: new Date().toISOString(), reason: 'llm_provider_error', }); ctx.publish(sessionId, { type: 'error', message_id: assistantMessageId, chat_id: chatId, error: errMsg, reason: 'llm_provider_error', }); ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed'); } } async function executeToolPhase( ctx: InferenceContext, args: TurnArgs, result: StreamResult, startedAt: string | null, session: Session, projectRoot: string ): Promise { const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args; const { content, toolCalls, promptTokens, completionTokens } = result; // v1.11.3: ctx_max comes from llama-swap /upstream//props, not the // streaming completion (which doesn't emit n_ctx). getModelContext caches // the positive lookup for the process lifetime, so this is a single Map // hit after the first invocation per model. const mctx = await modelContext.getModelContext(session.model); const nCtx = mctx?.n_ctx ?? null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] >` UPDATE messages SET content = ${content}, status = 'complete', tool_calls = ${ctx.sql.json(toolCalls as never)}, tokens_used = ${completionTokens}, ctx_used = ${promptTokens}, ctx_max = ${nCtx}, finished_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; // v1.11: flag for compaction if this turn pushed us over the usable budget. // We never compact mid-loop (the recursive runAssistantTurn keeps tools // flowing); the flag fires on the NEXT turn's pre-fetch hook above. await maybeFlagForCompaction(ctx, chatId, updated); const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId} RETURNING project_id, name, updated_at `; ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at }); for (const tc of toolCalls) { ctx.publish(sessionId, { type: 'tool_call', message_id: assistantMessageId, chat_id: chatId, tool_call: tc, }); } ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, tokens_used: updated?.tokens_used ?? null, ctx_used: updated?.ctx_used ?? null, ctx_max: updated?.ctx_max ?? null, started_at: startedAt, finished_at: updated?.finished_at ?? null, model: session.model, }); // Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted // (the answer endpoint needs a target row to UPDATE), but tool_results is // pre-stamped with output=null as a "pending" sentinel and no tool_result // frame goes out — the card renders from the tool_call frame alone. Mixed // batches still execute the other tools normally. let pausingForUserInput = false; await Promise.all( toolCalls.map(async (tc) => { const [toolRow] = await ctx.sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp()) RETURNING id `; const toolMessageId = toolRow!.id; if (tc.name === 'ask_user_input') { pausingForUserInput = true; const sentinel = { tool_call_id: tc.id, output: null, truncated: false }; await ctx.sql` UPDATE messages SET tool_results = ${ctx.sql.json(sentinel as never)} WHERE id = ${toolMessageId} `; return; } const tres = await executeToolCall(projectRoot, tc); const stored = { tool_call_id: tc.id, output: tres.output, truncated: tres.truncated, ...(tres.error ? { error: tres.error } : {}), }; await ctx.sql` UPDATE messages SET tool_results = ${ctx.sql.json(stored as never)} WHERE id = ${toolMessageId} `; ctx.publish(sessionId, { type: 'tool_result', tool_message_id: toolMessageId, chat_id: chatId, tool_call_id: tc.id, output: tres.output, truncated: tres.truncated, ...(tres.error ? { error: tres.error } : {}), }); }) ); if (pausingForUserInput) { // Drop the dot back to idle — the card is the actionable surface now. // The next inference turn fires from POST /api/chats/:id/answer_user_input // once the user submits their answers. ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString(), }); ctx.log.info( { sessionId, chatId, assistantMessageId }, 'inference paused awaiting user input', ); return; } const [nextAssistant] = await ctx.sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) RETURNING id `; await runAssistantTurn(ctx, { sessionId, chatId, assistantMessageId: nextAssistant!.id, // v1.8.2: charge this turn's actual tool invocations against the budget. // One assistant message can emit multiple tool_calls, so we add the run // count, not 1. The next turn's budget check sees the cumulative total. toolsUsed: toolsUsed + result.toolCalls.length, signal, }); } async function finalizeCompletion( ctx: InferenceContext, args: TurnArgs, result: StreamResult, startedAt: string | null, session: Session ): Promise { const { sessionId, chatId, assistantMessageId } = args; const { content, finishReason, promptTokens, completionTokens } = result; // v1.11.3: see executeToolPhase for the rationale. const mctx = await modelContext.getModelContext(session.model); const nCtx = mctx?.n_ctx ?? null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] >` UPDATE messages SET content = ${content}, status = 'complete', tokens_used = ${completionTokens}, ctx_used = ${promptTokens}, ctx_max = ${nCtx}, finished_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; // v1.11: flag for compaction on the terminal turn too. Catches the common // case of a turn that hit the limit without invoking tools. await maybeFlagForCompaction(ctx, chatId, updated); const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId} RETURNING project_id, name, updated_at `; ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at }); ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, tokens_used: updated?.tokens_used ?? null, ctx_used: updated?.ctx_used ?? null, ctx_max: updated?.ctx_max ?? null, started_at: startedAt, finished_at: updated?.finished_at ?? null, model: session.model, }); ctx.log.info( { sessionId, chatId, assistantMessageId, finishReason, chars: content.length, tokens_used: updated?.tokens_used, ctx_used: updated?.ctx_used, }, 'inference complete' ); } async function runAssistantTurn( ctx: InferenceContext, args: TurnArgs, ): Promise { const { sessionId, chatId } = args; // v1.11: if the prior turn flagged this chat for compaction, run it first // so loadContext below reads the post-compaction history. We swallow // compaction failures (clearing the flag so we don't loop) and proceed // with the un-compacted history — a slow turn that hits the model's // hard limit is recoverable; a dead session is not. const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>` SELECT needs_compaction FROM chats WHERE id = ${chatId} `; if (chatFlag[0]?.needs_compaction) { try { await compaction.process({ sql: ctx.sql, config: ctx.config, log: ctx.log, broker: ctx.broker, chatId, }); } catch (err) { ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding'); await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`; } } const loaded = await loadContext(ctx.sql, sessionId, chatId); if (!loaded) { ctx.log.warn({ sessionId }, 'inference: session or project missing'); return; } const { session, project, history } = loaded; const projectRoot = await resolveProjectRoot(project.path); // Agent resolution is per-turn so PATCH agent_id mid-conversation takes // effect on the next message. Unknown agent_id returns null silently — // session falls back to base prompt + all tools + default temperature. const agent = session.agent_id ? await getAgentById(project.path, session.agent_id) : null; // v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure. // When we've already burned the budget *before* this turn even runs, we // skip straight to the summary flow — the in-flight assistant message slot // gets reused for the wrap-up reply instead of being marked failed. const budget = resolveToolBudget(agent); if (args.toolsUsed >= budget) { await runCapHitSummary(ctx, args, session, project, history, agent, budget); return; } const messages = buildMessagesPayload(session, project, history, agent); const state: StreamPhaseState = { accumulated: '', startedAt: null }; let result: StreamResult; try { result = await executeStreamPhase(ctx, args, session, messages, state, agent); } catch (err) { await handleAbortOrError(ctx, args, state.accumulated, err); return; } if (result.toolCalls.length > 0) { await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot); return; } await finalizeCompletion(ctx, args, result, state.startedAt, session); } export async function runInference( ctx: InferenceContext, sessionId: string, chatId: string, assistantMessageId: string, signal?: AbortSignal ): Promise { // v1.8.2: every fresh inference (initial send, regenerate, force_send, // continue) starts with a clean budget. Tool-call accumulation across // Continue invocations is what the hard ceiling guards against, not the // per-call budget. return runAssistantTurn(ctx, { sessionId, chatId, assistantMessageId, toolsUsed: 0, signal }); } // v1.8.2: cap-hit summary flow. Called instead of erroring when the loop // hits its budget. Reuses the in-flight assistant message slot to stream a // short wrap-up reply with the synthetic note prepended and tools disabled, // then always inserts a cap_hit sentinel afterward (regardless of summary // outcome) so the UI can show a Continue affordance. async function runCapHitSummary( ctx: InferenceContext, args: TurnArgs, session: Session, project: Project, history: Message[], agent: Agent | null, budget: number, ): Promise { const { sessionId, chatId, assistantMessageId, signal } = args; const messages = buildMessagesPayload(session, project, history, agent); messages.push({ role: 'system', content: CAP_HIT_SUMMARY_NOTE(budget) }); const startedRow = await ctx.sql<{ started_at: string }[]>` UPDATE messages SET started_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING started_at `; const startedAt = startedRow[0]?.started_at ?? null; ctx.publish(sessionId, { type: 'message_started', message_id: assistantMessageId, chat_id: chatId, role: 'assistant', }); let accumulated = ''; let pendingFlushTimer: NodeJS.Timeout | null = null; let flushPromise: Promise = Promise.resolve(); const flushNow = () => { if (pendingFlushTimer) { clearTimeout(pendingFlushTimer); pendingFlushTimer = null; } const snapshot = accumulated; flushPromise = flushPromise.then(() => ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}` ); }; const scheduleFlush = () => { if (pendingFlushTimer) return; pendingFlushTimer = setTimeout(() => { pendingFlushTimer = null; flushNow(); }, DB_FLUSH_INTERVAL_MS); }; let summaryOk = false; let summarySoftCancelled = false; let summaryError: string | null = null; let result: StreamResult | null = null; try { result = await streamCompletion( ctx, session.model, messages, { tools: null, temperature: agent?.temperature }, (delta) => { accumulated += delta; ctx.publish(sessionId, { type: 'delta', message_id: assistantMessageId, chat_id: chatId, content: delta, }); scheduleFlush(); }, signal, ); summaryOk = true; } catch (err) { if (err instanceof Error && err.name === 'AbortError') { summarySoftCancelled = true; } else { summaryError = err instanceof Error ? err.message : String(err); } } finally { if (pendingFlushTimer) { clearTimeout(pendingFlushTimer); pendingFlushTimer = null; } await flushPromise; } // Finalize the summary message based on the three outcomes. The sentinel // is inserted regardless so the user always has the Continue affordance — // even on a partial / failed summary the chat history shows where the // budget was hit. if (summaryOk && result) { // v1.11.3: see executeToolPhase for the rationale. const mctx = await modelContext.getModelContext(session.model); const nCtx = mctx?.n_ctx ?? null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] >` UPDATE messages SET content = ${result.content}, status = 'complete', tokens_used = ${result.completionTokens}, ctx_used = ${result.promptTokens}, ctx_max = ${nCtx}, finished_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, tokens_used: updated?.tokens_used ?? null, ctx_used: updated?.ctx_used ?? null, ctx_max: updated?.ctx_max ?? null, started_at: startedAt, finished_at: updated?.finished_at ?? null, model: session.model, }); } else if (summarySoftCancelled) { await ctx.sql` UPDATE messages SET content = ${accumulated}, status = 'cancelled', finished_at = clock_timestamp() WHERE id = ${assistantMessageId} `; ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, }); } else { const errMeta: MessageMetadata = { kind: 'error', error_reason: 'summary_after_cap_failed', error_text: summaryError ?? 'summary failed', }; await ctx.sql` UPDATE messages SET content = ${accumulated}, status = 'failed', finished_at = clock_timestamp(), metadata = ${ctx.sql.json(errMeta as never)} WHERE id = ${assistantMessageId} `; ctx.publish(sessionId, { type: 'error', message_id: assistantMessageId, chat_id: chatId, error: summaryError ?? 'summary failed', reason: 'summary_after_cap_failed', }); } // Bump session/chat updated_at exactly once for this turn. const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId} RETURNING project_id, name, updated_at `; ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: sessRow!.project_id, name: sessRow!.name, updated_at: sessRow!.updated_at, }); await insertCapHitSentinel(ctx, sessionId, chatId, agent, budget); // Status frame fires last so the dot color reflects the terminal state. // Success → idle, abort → idle (user-driven stop), error → error+reason. if (summaryOk) { ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); } else if (summarySoftCancelled) { ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); } else { ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'error', at: new Date().toISOString(), reason: 'summary_after_cap_failed', }); } ctx.log.info( { sessionId, chatId, assistantMessageId, budget, summaryOk, summaryCancelled: summarySoftCancelled }, 'inference cap-hit summary finished', ); } async function insertCapHitSentinel( ctx: InferenceContext, sessionId: string, chatId: string, agent: Agent | null, budget: number, ): Promise { // Hard ceiling: count prior cap_hit sentinels in this chat. After two // continues (sentinel count of 2), the next sentinel reports can_continue // false and the UI disables the Continue button. const priorRows = await ctx.sql<{ count: number }[]>` SELECT COUNT(*)::int AS count FROM messages WHERE chat_id = ${chatId} AND role = 'system' AND metadata->>'kind' = 'cap_hit' `; const priorCount = priorRows[0]?.count ?? 0; const canContinue = priorCount < 2; const metadata: MessageMetadata = { kind: 'cap_hit', used: budget, limit: budget, agent_name: agent?.name ?? null, can_continue: canContinue, }; const content = `Reached tool budget (${budget}/${budget}). Continue to extend.`; const [row] = await ctx.sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata) VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(), ${ctx.sql.json(metadata as never)}) RETURNING id `; // The sentinel content is static, but we still walk the standard frame // sequence (started → delta → complete) so useSessionStream's reducer // appends it via the same path it uses for streaming assistant messages. // The delta carries the full text in one chunk. ctx.publish(sessionId, { type: 'message_started', message_id: row!.id, chat_id: chatId, role: 'system', }); ctx.publish(sessionId, { type: 'delta', message_id: row!.id, chat_id: chatId, content, }); ctx.publish(sessionId, { type: 'message_complete', message_id: row!.id, chat_id: chatId, metadata, }); } interface InferenceRegistration { controller: AbortController; completed: Promise; } export function createInferenceRunner( ctx: Omit, publishUserFn: (user: string, frame: UserStreamFrame) => void ) { const registry = new Map(); return { enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) { const callCtx: InferenceContext = { ...ctx, publishUser: (frame) => publishUserFn(user, frame), // v1.11: broker comes in via ctx (set at registration time). Repeated // here so the destructure carries it onto the per-call ctx without // having to add it to every enqueue/cancel signature individually. broker: ctx.broker, }; // v1.8 mobile-tabs: announce working before the async loop starts so // every device subscribed to the user channel sees the amber dot. callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'working', at: new Date().toISOString() }); const controller = new AbortController(); let resolveCompleted!: () => void; const completed = new Promise((res) => { resolveCompleted = res; }); const registration: InferenceRegistration = { controller, completed }; registry.set(chatId, registration); void (async () => { try { await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal); setImmediate(() => { void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => { callCtx.log.warn({ err, chatId }, 'auto-name failed'); }); }); } catch (err) { callCtx.log.error({ err }, 'unhandled inference error'); } finally { resolveCompleted(); // Only clear our own registration; a force-send may have replaced it. if (registry.get(chatId) === registration) { registry.delete(chatId); } } })(); }, async cancel(_sessionId: string, chatId: string): Promise { const reg = registry.get(chatId); if (!reg) return false; reg.controller.abort(); // Swallow — we just need to wait for the catch/finally to persist state. await reg.completed.catch(() => {}); return true; }, hasActive(chatId: string): boolean { return registry.has(chatId); }, }; } export const _toolNames = ALL_TOOLS.map((t) => t.name);