import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../db.js';
import type { Config } from '../config.js';
import type {
  Agent,
  ErrorReason,
  Message,
  MessageMetadata,
  Project,
  Session,
  ToolCall,
  UserStreamFrame,
} from '../types/api.js';
import {
  ALL_TOOLS,
  READ_ONLY_TOOL_NAMES,
  TOOLS_BY_NAME,
  toolJsonSchemas,
  type ToolJsonSchema,
} from './tools.js';
import { PathScopeError, resolveProjectRoot } from './path_guard.js';
import { maybeAutoNameChat } from './auto_name.js';
import { getAgentById } from './agents.js';

/** Base system prompt for every turn, parameterized by the project's path. */
const BASE_SYSTEM_PROMPT = (projectPath: string) =>
  `You are BooCode Chat, a code investigation assistant. The user is working on a project located at ${projectPath}. Use the file-read tools (view_file, list_dir, grep, find_files) to investigate code when needed. Be concise. Cite file paths and line numbers when discussing code. Do not hallucinate file contents — read the file first. Tool results may be truncated; if so, narrow your query rather than guessing.`;

/** Debounce interval for persisting partially streamed content to the DB. */
const DB_FLUSH_INTERVAL_MS = 500;

// v1.8.2: tool-call budget defaults. Resolved per-turn by resolveToolBudget.
//   - Agent with explicit max_tool_calls: that value.
//   - Agent with read-only-only tools: BUDGET_READ_ONLY (30).
//   - Agent with any non-read-only tool: BUDGET_NON_READ_ONLY (10).
//   - No agent (raw chat): BUDGET_NO_AGENT (15).
const BUDGET_READ_ONLY = 30;
const BUDGET_NON_READ_ONLY = 10;
const BUDGET_NO_AGENT = 15;

const READ_ONLY_SET: ReadonlySet<string> = new Set<string>(READ_ONLY_TOOL_NAMES);

/**
 * Resolves the tool-call budget for one inference run, following the budget
 * table above.
 *
 * @param agent The session's agent, or null for a raw chat.
 * @returns Maximum cumulative tool calls allowed before the cap-hit summary.
 */
function resolveToolBudget(agent: Agent | null): number {
  if (agent?.max_tool_calls != null) return agent.max_tool_calls;
  if (!agent) return BUDGET_NO_AGENT;
  const allReadOnly = agent.tools.every((t) => READ_ONLY_SET.has(t));
  return allReadOnly ? BUDGET_READ_ONLY : BUDGET_NON_READ_ONLY;
}

// Synthetic system note appended to the cap-hit summary call. Verbatim from
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
// instruction is short, declarative, and identical across calls.
const CAP_HIT_SUMMARY_NOTE = (limit: number) =>
  `You've reached the tool budget (${limit} calls). Produce the best answer you can with what you have. Do not call more tools.`;

/** True for the UI-only system message persisted when a run hits its tool budget. */
function isCapHitSentinel(m: Message): boolean {
  return (
    m.role === 'system' &&
    m.metadata !== null &&
    typeof m.metadata === 'object' &&
    (m.metadata as { kind?: unknown }).kind === 'cap_hit'
  );
}

export interface InferenceFrame {
  type:
    | 'message_started'
    | 'delta'
    | 'tool_call'
    | 'tool_result'
    | 'message_complete'
    | 'messages_deleted'
    | 'session_renamed'
    | 'chat_renamed'
    | 'error';
  message_id?: string;
  message_ids?: string[];
  chat_id?: string;
  tool_message_id?: string;
  tool_call_id?: string;
  // v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
  // through the normal message_started → delta → message_complete sequence.
  role?: 'assistant' | 'tool' | 'user' | 'system';
  content?: string;
  tool_call?: ToolCall;
  output?: unknown;
  truncated?: boolean;
  error?: string;
  // v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
  // surface a specific message; `error` stays the human-readable text.
  reason?: ErrorReason;
  // v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
  // messages can carry their persisted metadata to the live stream without a
  // refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
  // { kind: 'error', ... }).
  metadata?: MessageMetadata | null;
  tokens_used?: number | null;
  ctx_used?: number | null;
  ctx_max?: number | null;
  started_at?: string | null;
  finished_at?: string | null;
  model?: string;
  session_id?: string;
  name?: string;
}

export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;

interface OpenAiMessage {
  role: 'system' | 'user' | 'assistant' | 'tool';
  content: string | null;
  tool_calls?: Array<{
    id: string;
    type: 'function';
    function: { name: string; arguments: string };
  }>;
  tool_call_id?: string;
}

interface ChatCompletionDelta {
  role?: string;
  content?: string | null;
  tool_calls?: Array<{
    index: number;
    id?: string;
    type?: 'function';
    function?: { name?: string; arguments?: string };
  }>;
}

interface ChatCompletionChunk {
  choices?: Array<{
    delta: ChatCompletionDelta;
    finish_reason: string | null;
  }>;
  usage?: {
    prompt_tokens?: number;
    completion_tokens?: number;
    total_tokens?: number;
  };
  timings?: {
    n_ctx?: number;
  };
}

export interface InferenceContext {
  sql: Sql;
  config: Config;
  log: FastifyBaseLogger;
  publish: FramePublisher;
  publishUser: (frame: UserStreamFrame) => void;
}

// Resolution order: base prompt < agent.system_prompt < user prompt, where
// user prompt = session.system_prompt if non-empty, else project's
// default_system_prompt if non-empty, else nothing. Empty/whitespace-only
// counts as "no override" for both layers (v1.9 inherit semantics — keeps
// the column non-nullable so the existing key/value store stays put).
export function buildSystemPrompt(
  project: Project,
  session: Session,
  agent: Agent | null
): string {
  let out = BASE_SYSTEM_PROMPT(project.path);
  if (agent && agent.system_prompt.trim().length > 0) {
    out += '\n\n' + agent.system_prompt.trim();
  }
  const sessionPrompt = session.system_prompt?.trim() ?? '';
  const projectPrompt = project.default_system_prompt?.trim() ?? '';
  const userPrompt = sessionPrompt || projectPrompt;
  if (userPrompt.length > 0) {
    out += '\n\n' + userPrompt;
  }
  return out;
}

/**
 * Converts persisted chat history into an OpenAI-style messages array.
 *
 * Only messages from the latest *completed* compact marker onwards are sent.
 * A compact row that failed or is still streaming holds partial or empty
 * content. Treating it as a marker would silently discard all earlier context,
 * so such rows are skipped and the search continues further back.
 * Cap-hit sentinels and streaming or cancelled assistant messages are omitted.
 */
export function buildMessagesPayload(
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null = null
): OpenAiMessage[] {
  const out: OpenAiMessage[] = [];
  const systemPrompt = buildSystemPrompt(project, session, agent);
  out.push({ role: 'system', content: systemPrompt });

  const isUsableCompact = (m: Message) => m.kind === 'compact' && m.status === 'complete';

  // Find the latest completed compact marker — only send messages from that point onwards.
  let startIdx = 0;
  for (let i = history.length - 1; i >= 0; i--) {
    if (isUsableCompact(history[i]!)) {
      startIdx = i;
      break;
    }
  }

  for (let i = startIdx; i < history.length; i++) {
    const m = history[i]!;
    if (m.kind === 'compact') {
      if (isUsableCompact(m)) out.push({ role: 'system', content: m.content });
      continue;
    }
    // v1.8.2: cap-hit sentinels are UI-only — never send them to the LLM. The
    // synthetic "you've reached the tool budget" note lives only inside the
    // summary call's messages array and is never persisted, so on Continue
    // the model resumes with a clean context.
    if (isCapHitSentinel(m)) continue;
    if (m.role === 'assistant' && m.status === 'streaming') continue;
    if (m.role === 'assistant' && m.status === 'cancelled') continue;
    if (m.role === 'tool') {
      const tr = m.tool_results;
      if (!tr) continue;
      const outputText = tr.error
        ? `error: ${tr.error}`
        : typeof tr.output === 'string'
          ? tr.output
          : JSON.stringify(tr.output);
      out.push({
        role: 'tool',
        content: outputText,
        tool_call_id: tr.tool_call_id,
      });
      continue;
    }
    if (m.role === 'assistant') {
      const msg: OpenAiMessage = {
        role: 'assistant',
        content: m.content && m.content.length > 0 ? m.content : null,
      };
      if (m.tool_calls && m.tool_calls.length > 0) {
        msg.tool_calls = m.tool_calls.map((tc) => ({
          id: tc.id,
          type: 'function' as const,
          function: { name: tc.name, arguments: JSON.stringify(tc.args) },
        }));
      }
      out.push(msg);
      continue;
    }
    out.push({ role: 'user', content: m.content });
  }
  return out;
}

/**
 * Loads the session, its project, and the chat's full message history.
 *
 * @returns null when the session or its project row no longer exists.
 */
async function loadContext(
  sql: Sql,
  sessionId: string,
  chatId: string
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
  const sessionRows = await sql<Session[]>`
    SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at,
           agent_id, web_search_enabled
    FROM sessions WHERE id = ${sessionId}
  `;
  if (sessionRows.length === 0) return null;
  const session = sessionRows[0]!;

  const projectRows = await sql<Project[]>`
    SELECT id, name, path, added_at, last_session_id, status, gitea_remote,
           default_system_prompt, default_web_search_enabled
    FROM projects WHERE id = ${session.project_id}
  `;
  if (projectRows.length === 0) return null;
  const project = projectRows[0]!;

  const history = await sql<Message[]>`
    SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status,
           last_seq, tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata
    FROM messages WHERE chat_id = ${chatId}
    ORDER BY created_at ASC, id ASC
  `;
  return { session, project, history };
}

/**
 * Splits a byte stream into non-empty text lines (LF or CRLF terminated).
 *
 * If the consumer stops early (e.g. `break` on `[DONE]`) or reading throws,
 * the underlying stream is cancelled rather than just unlocked. Otherwise the
 * HTTP response body would be left unconsumed and its connection held open.
 */
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder('utf-8');
  let buffer = '';
  let drained = false;
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      let idx: number;
      while ((idx = buffer.indexOf('\n')) >= 0) {
        const line = buffer.slice(0, idx).replace(/\r$/, '');
        buffer = buffer.slice(idx + 1);
        if (line.length === 0) continue;
        yield line;
      }
    }
    drained = true;
    // Flush any bytes the streaming decoder was still holding.
    buffer += decoder.decode();
    if (buffer.length > 0) yield buffer;
  } finally {
    if (!drained) {
      await reader.cancel().catch(() => {});
    }
    reader.releaseLock();
  }
}

interface StreamResult {
  finishReason: string | null;
  content: string;
  toolCalls: ToolCall[];
  promptTokens: number | null;
  completionTokens: number | null;
  nCtx: number | null;
}

interface StreamOptions {
  // null = omit tools entirely (compact phase); [] = caller stripped all tools
  // (rare; we still omit from the request body to avoid OpenAI 400).
  tools: ToolJsonSchema[] | null;
  temperature?: number;
}

/**
 * Streams one chat completion from llama-swap, invoking `onDelta` for each
 * content chunk and reassembling streamed tool-call fragments by index.
 *
 * @throws Error when llama-swap returns a non-2xx status or no body; the fetch's
 *   AbortError propagates when `signal` fires.
 */
async function streamCompletion(
  ctx: InferenceContext,
  model: string,
  messages: OpenAiMessage[],
  opts: StreamOptions,
  onDelta: (content: string) => void,
  signal?: AbortSignal
): Promise<StreamResult> {
  const body: Record<string, unknown> = {
    model,
    messages,
    stream: true,
    stream_options: { include_usage: true },
  };
  if (opts.tools && opts.tools.length > 0) {
    body['tools'] = opts.tools;
    body['tool_choice'] = 'auto';
  }
  if (typeof opts.temperature === 'number') {
    body['temperature'] = opts.temperature;
  }

  const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
    signal,
  });

  if (!res.ok || !res.body) {
    const text = await res.text().catch(() => '');
    throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
  }

  let content = '';
  let finishReason: string | null = null;
  let promptTokens: number | null = null;
  let completionTokens: number | null = null;
  let nCtx: number | null = null;
  // Tool calls arrive as fragments keyed by their index in the response.
  const toolCallsBuffer = new Map<number, { id: string; name: string; argsText: string }>();

  for await (const line of sseLines(res.body)) {
    if (!line.startsWith('data:')) continue;
    const payload = line.slice(5).trim();
    if (payload === '[DONE]') break;
    let parsed: ChatCompletionChunk;
    try {
      parsed = JSON.parse(payload);
    } catch {
      continue;
    }
    if (parsed.usage) {
      if (typeof parsed.usage.prompt_tokens === 'number') {
        promptTokens = parsed.usage.prompt_tokens;
      }
      if (typeof parsed.usage.completion_tokens === 'number') {
        completionTokens = parsed.usage.completion_tokens;
      }
    }
    if (parsed.timings && typeof parsed.timings.n_ctx === 'number') {
      nCtx = parsed.timings.n_ctx;
    }
    const choice = parsed.choices?.[0];
    if (!choice) continue;
    const delta = choice.delta ?? {};
    if (typeof delta.content === 'string' && delta.content.length > 0) {
      content += delta.content;
      onDelta(delta.content);
    }
    if (Array.isArray(delta.tool_calls)) {
      for (const tc of delta.tool_calls) {
        const idx = tc.index;
        const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' };
        if (tc.id) existing.id = tc.id;
        if (tc.function?.name) existing.name = tc.function.name;
        if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments;
        toolCallsBuffer.set(idx, existing);
      }
    }
    if (choice.finish_reason) finishReason = choice.finish_reason;
  }

  const toolCalls: ToolCall[] = [];
  for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) {
    let args: Record<string, unknown> = {};
    if (t.argsText.length > 0) {
      try {
        args = JSON.parse(t.argsText);
      } catch {
        // Preserve malformed argument text so tool validation can report it.
        args = { _raw: t.argsText };
      }
    }
    toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args });
  }

  return { finishReason, content, toolCalls, promptTokens, completionTokens, nCtx };
}

/**
 * Validates and runs one tool call against the project root. Never throws:
 * unknown tools, invalid input, and execution errors are returned in `error`.
 */
async function executeToolCall(
  projectRoot: string,
  toolCall: ToolCall
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
  const tool = TOOLS_BY_NAME[toolCall.name];
  if (!tool) {
    return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
  }
  const parsed = tool.inputSchema.safeParse(toolCall.args);
  if (!parsed.success) {
    return {
      output: null,
      truncated: false,
      error: `invalid input: ${JSON.stringify(parsed.error.flatten())}`,
    };
  }
  try {
    const output = await tool.execute(parsed.data, projectRoot);
    const truncated =
      typeof output === 'object' && output !== null && 'truncated' in output
        ? Boolean((output as { truncated: unknown }).truncated)
        : false;
    return { output, truncated };
  } catch (err) {
    if (err instanceof PathScopeError) {
      return { output: null, truncated: false, error: err.message };
    }
    return {
      output: null,
      truncated: false,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}

interface TurnArgs {
  sessionId: string;
  chatId: string;
  assistantMessageId: string;
  // v1.8.2: cumulative tool calls executed this run. Compared against the
  // resolved budget at the top of each turn. Replaces the older `depth`
  // counter (which counted iterations, not invocations).
  toolsUsed: number;
  signal: AbortSignal | undefined;
}

interface StreamPhaseState {
  accumulated: string;
  startedAt: string | null;
}

/**
 * Bumps `sessions.updated_at` and announces it on the user channel. If the
 * session row no longer exists, no frame is published (rather than throwing
 * on a missing RETURNING row).
 */
async function touchSession(ctx: InferenceContext, sessionId: string): Promise<void> {
  const [row] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  if (!row) return;
  ctx.publishUser({
    type: 'session_updated',
    session_id: sessionId,
    project_id: row.project_id,
    name: row.name,
    updated_at: row.updated_at,
  });
}

/**
 * Streams the assistant reply for one turn. It stamps `started_at`, publishes
 * deltas, and periodically persists partial content.
 * `state` is mutated so callers can recover the partial text on error.
 */
async function executeStreamPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  messages: OpenAiMessage[],
  state: StreamPhaseState,
  agent: Agent | null
): Promise<StreamResult> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  state.startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let pendingFlushTimer: NodeJS.Timeout | null = null;
  let flushPromise: Promise<unknown> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = state.accumulated;
    // Partial-content flushes are best-effort; the terminal UPDATE writes the
    // full content. Catching keeps the chain from ever rejecting. A rejected
    // flushPromise would otherwise stay unhandled until the stream ends,
    // raising 'unhandledRejection', which terminates Node >= 15 by default.
    // It would also mask the real stream outcome when awaited in `finally`.
    flushPromise = flushPromise
      .then(() => ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`)
      .catch((err: unknown) => {
        ctx.log.warn({ err, assistantMessageId }, 'inference: partial content flush failed');
      });
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  // Tool whitelist: if an agent is set, filter the global tool list to only the
  // tool names it allows. Unknown names in agent.tools are dropped silently
  // (handled here by intersection). When no agent: send all tools.
  const effectiveTools: ToolJsonSchema[] = agent
    ? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
    : toolJsonSchemas();
  const effectiveTemperature = agent?.temperature;

  try {
    return await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: effectiveTools, temperature: effectiveTemperature },
      (delta) => {
        state.accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        ctx.log.debug({ sessionId, delta }, 'inference delta');
        scheduleFlush();
      },
      signal
    );
  } finally {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    await flushPromise;
  }
}

/**
 * Persists the terminal state of a turn that threw: 'cancelled' for an
 * AbortError, otherwise 'failed' plus structured error metadata. It then
 * publishes the matching status and stream frames.
 */
async function handleAbortOrError(
  ctx: InferenceContext,
  args: TurnArgs,
  accumulated: string,
  err: unknown
): Promise<void> {
  const { sessionId, chatId, assistantMessageId } = args;
  const isAbort = err instanceof Error && err.name === 'AbortError';
  const finalStatus = isAbort ? 'cancelled' : 'failed';
  const errMsg = err instanceof Error ? err.message : String(err);

  // v1.8.2: persist a structured error metadata blob on genuine failures so
  // the bubble can render the reason on reload without re-deriving from the
  // (one-shot) WS error frame. User-initiated abort skips this — there's no
  // "reason" to surface for a stop the user already explicitly chose.
  const errorMetadata: MessageMetadata | null = isAbort
    ? null
    : { kind: 'error', error_reason: 'llm_provider_error', error_text: errMsg };

  if (errorMetadata) {
    await ctx.sql`
      UPDATE messages
      SET status = ${finalStatus}, content = ${accumulated}, finished_at = clock_timestamp(),
          metadata = ${ctx.sql.json(errorMetadata as never)}
      WHERE id = ${assistantMessageId}
    `;
  } else {
    await ctx.sql`
      UPDATE messages
      SET status = ${finalStatus}, content = ${accumulated}, finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
    `;
  }

  await touchSession(ctx, sessionId);

  // v1.8 mobile-tabs: cancellation is a user-initiated stop, treat as idle;
  // genuine errors flip the dot red. v1.8.2: error path also carries a
  // machine-readable `reason` so the UI can render specifics inline.
  if (isAbort) {
    ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
    });
    ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
  } else {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'error',
      at: new Date().toISOString(),
      reason: 'llm_provider_error',
    });
    ctx.publish(sessionId, {
      type: 'error',
      message_id: assistantMessageId,
      chat_id: chatId,
      error: errMsg,
      reason: 'llm_provider_error',
    });
    ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
  }
}

/**
 * Completes an assistant message that requested tools. It runs every tool
 * call concurrently, stores each result as a 'tool' message, then recurses
 * into the next assistant turn with the updated tool-call count.
 */
async function executeToolPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  result: StreamResult,
  startedAt: string | null,
  session: Session,
  projectRoot: string
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
  const { content, toolCalls, promptTokens, completionTokens, nCtx } = result;

  const [updated] = await ctx.sql<
    { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
  >`
    UPDATE messages
    SET content = ${content}, status = 'complete',
        tool_calls = ${ctx.sql.json(toolCalls as never)},
        tokens_used = ${completionTokens}, ctx_used = ${promptTokens}, ctx_max = ${nCtx},
        finished_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING tokens_used, ctx_used, ctx_max, finished_at
  `;

  await touchSession(ctx, sessionId);

  for (const tc of toolCalls) {
    ctx.publish(sessionId, {
      type: 'tool_call',
      message_id: assistantMessageId,
      chat_id: chatId,
      tool_call: tc,
    });
  }
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: assistantMessageId,
    chat_id: chatId,
    tokens_used: updated?.tokens_used ?? null,
    ctx_used: updated?.ctx_used ?? null,
    ctx_max: updated?.ctx_max ?? null,
    started_at: startedAt,
    finished_at: updated?.finished_at ?? null,
    model: session.model,
  });

  await Promise.all(
    toolCalls.map(async (tc) => {
      const [toolRow] = await ctx.sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
        RETURNING id
      `;
      const toolMessageId = toolRow!.id;
      const tres = await executeToolCall(projectRoot, tc);
      const stored = {
        tool_call_id: tc.id,
        output: tres.output,
        truncated: tres.truncated,
        ...(tres.error ? { error: tres.error } : {}),
      };
      await ctx.sql`
        UPDATE messages SET tool_results = ${ctx.sql.json(stored as never)}
        WHERE id = ${toolMessageId}
      `;
      ctx.publish(sessionId, {
        type: 'tool_result',
        tool_message_id: toolMessageId,
        chat_id: chatId,
        tool_call_id: tc.id,
        output: tres.output,
        truncated: tres.truncated,
        ...(tres.error ? { error: tres.error } : {}),
      });
    })
  );

  const [nextAssistant] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
    VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
    RETURNING id
  `;

  await runAssistantTurn(ctx, {
    sessionId,
    chatId,
    assistantMessageId: nextAssistant!.id,
    // v1.8.2: charge this turn's actual tool invocations against the budget.
    // One assistant message can emit multiple tool_calls, so we add the run
    // count, not 1. The next turn's budget check sees the cumulative total.
    toolsUsed: toolsUsed + result.toolCalls.length,
    signal,
  });
}

/** Persists a final (tool-free) assistant reply and publishes completion frames. */
async function finalizeCompletion(
  ctx: InferenceContext,
  args: TurnArgs,
  result: StreamResult,
  startedAt: string | null,
  session: Session
): Promise<void> {
  const { sessionId, chatId, assistantMessageId } = args;
  const { content, finishReason, promptTokens, completionTokens, nCtx } = result;

  const [updated] = await ctx.sql<
    { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
  >`
    UPDATE messages
    SET content = ${content}, status = 'complete',
        tokens_used = ${completionTokens}, ctx_used = ${promptTokens}, ctx_max = ${nCtx},
        finished_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING tokens_used, ctx_used, ctx_max, finished_at
  `;

  await touchSession(ctx, sessionId);
  ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: assistantMessageId,
    chat_id: chatId,
    tokens_used: updated?.tokens_used ?? null,
    ctx_used: updated?.ctx_used ?? null,
    ctx_max: updated?.ctx_max ?? null,
    started_at: startedAt,
    finished_at: updated?.finished_at ?? null,
    model: session.model,
  });
  ctx.log.info(
    {
      sessionId,
      chatId,
      assistantMessageId,
      finishReason,
      chars: content.length,
      tokens_used: updated?.tokens_used,
      ctx_used: updated?.ctx_used,
    },
    'inference complete'
  );
}

/**
 * Runs one assistant turn: reload context, check the tool budget, stream the
 * reply, then either execute tools (recursing) or finalize.
 */
async function runAssistantTurn(
  ctx: InferenceContext,
  args: TurnArgs,
): Promise<void> {
  const { sessionId, chatId } = args;
  const loaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!loaded) {
    ctx.log.warn({ sessionId }, 'inference: session or project missing');
    return;
  }
  const { session, project, history } = loaded;

  // Setup failures (unresolvable project root, agent lookup error) used to
  // escape without touching the in-flight assistant row. That left the row
  // 'streaming' and the chat status stuck on 'working'. They now go through
  // the same terminal-state path as stream errors.
  let projectRoot: string;
  let agent: Agent | null;
  try {
    projectRoot = await resolveProjectRoot(project.path);
    // Agent resolution is per-turn so PATCH agent_id mid-conversation takes
    // effect on the next message. Unknown agent_id returns null silently —
    // session falls back to base prompt + all tools + default temperature.
    agent = session.agent_id ? await getAgentById(project.path, session.agent_id) : null;
  } catch (err) {
    await handleAbortOrError(ctx, args, '', err);
    return;
  }

  // v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure.
  // When we've already burned the budget *before* this turn even runs, we
  // skip straight to the summary flow — the in-flight assistant message slot
  // gets reused for the wrap-up reply instead of being marked failed.
  const budget = resolveToolBudget(agent);
  if (args.toolsUsed >= budget) {
    await runCapHitSummary(ctx, args, session, project, history, agent, budget);
    return;
  }

  const messages = buildMessagesPayload(session, project, history, agent);
  const state: StreamPhaseState = { accumulated: '', startedAt: null };

  let result: StreamResult;
  try {
    result = await executeStreamPhase(ctx, args, session, messages, state, agent);
  } catch (err) {
    await handleAbortOrError(ctx, args, state.accumulated, err);
    return;
  }

  if (result.toolCalls.length > 0) {
    await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
    return;
  }

  await finalizeCompletion(ctx, args, result, state.startedAt, session);
}

/** Entry point for a fresh inference run on an already-inserted assistant message. */
export async function runInference(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  assistantMessageId: string,
  signal?: AbortSignal
): Promise<void> {
  // v1.8.2: every fresh inference (initial send, regenerate, force_send,
  // continue) starts with a clean budget. Tool-call accumulation across
  // Continue invocations is what the hard ceiling guards against, not the
  // per-call budget.
  return runAssistantTurn(ctx, { sessionId, chatId, assistantMessageId, toolsUsed: 0, signal });
}

// v1.8.2: cap-hit summary flow. Called instead of erroring when the loop
// hits its budget. Reuses the in-flight assistant message slot to stream a
// short wrap-up reply with the synthetic note prepended and tools disabled,
// then always inserts a cap_hit sentinel afterward (regardless of summary
// outcome) so the UI can show a Continue affordance.
/**
 * Streams a tool-free wrap-up reply into the in-flight assistant message once
 * the tool budget is exhausted. It finalizes that message as complete,
 * cancelled, or failed. A cap_hit sentinel is then always inserted.
 */
async function runCapHitSummary(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null,
  budget: number,
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const messages = buildMessagesPayload(session, project, history, agent);
  messages.push({ role: 'system', content: CAP_HIT_SUMMARY_NOTE(budget) });

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  const startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let accumulated = '';
  let pendingFlushTimer: NodeJS.Timeout | null = null;
  let flushPromise: Promise<unknown> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = accumulated;
    // Best-effort partial flush; the terminal UPDATE below rewrites content.
    // Catching keeps the chain from ever rejecting. An unhandled rejection
    // here would terminate Node >= 15 by default. It would also replace the
    // summary outcome when awaited in `finally`.
    flushPromise = flushPromise
      .then(() => ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`)
      .catch((err: unknown) => {
        ctx.log.warn({ err, assistantMessageId }, 'inference: cap-hit summary flush failed');
      });
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  let summaryOk = false;
  let summarySoftCancelled = false;
  let summaryError: string | null = null;
  let result: StreamResult | null = null;

  try {
    result = await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: null, temperature: agent?.temperature },
      (delta) => {
        accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        scheduleFlush();
      },
      signal,
    );
    summaryOk = true;
  } catch (err) {
    if (err instanceof Error && err.name === 'AbortError') {
      summarySoftCancelled = true;
    } else {
      summaryError = err instanceof Error ? err.message : String(err);
    }
  } finally {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    await flushPromise;
  }

  // Finalize the summary message based on the three outcomes. The sentinel
  // is inserted regardless so the user always has the Continue affordance —
  // even on a partial / failed summary the chat history shows where the
  // budget was hit.
  if (summaryOk && result) {
    const [updated] = await ctx.sql<
      { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
    >`
      UPDATE messages
      SET content = ${result.content}, status = 'complete',
          tokens_used = ${result.completionTokens}, ctx_used = ${result.promptTokens},
          ctx_max = ${result.nCtx}, finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
      RETURNING tokens_used, ctx_used, ctx_max, finished_at
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
      tokens_used: updated?.tokens_used ?? null,
      ctx_used: updated?.ctx_used ?? null,
      ctx_max: updated?.ctx_max ?? null,
      started_at: startedAt,
      finished_at: updated?.finished_at ?? null,
      model: session.model,
    });
  } else if (summarySoftCancelled) {
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated}, status = 'cancelled', finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
    });
  } else {
    const errMeta: MessageMetadata = {
      kind: 'error',
      error_reason: 'summary_after_cap_failed',
      error_text: summaryError ?? 'summary failed',
    };
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated}, status = 'failed', finished_at = clock_timestamp(),
          metadata = ${ctx.sql.json(errMeta as never)}
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'error',
      message_id: assistantMessageId,
      chat_id: chatId,
      error: summaryError ?? 'summary failed',
      reason: 'summary_after_cap_failed',
    });
  }

  // Bump session/chat updated_at exactly once for this turn. A vanished
  // session row yields no RETURNING row; skip the frame instead of throwing.
  const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  if (sessRow) {
    ctx.publishUser({
      type: 'session_updated',
      session_id: sessionId,
      project_id: sessRow.project_id,
      name: sessRow.name,
      updated_at: sessRow.updated_at,
    });
  }

  // Report the real cumulative count: one turn may emit several tool calls,
  // so toolsUsed can exceed the budget.
  await insertCapHitSentinel(ctx, sessionId, chatId, agent, budget, args.toolsUsed);

  // Status frame fires last so the dot color reflects the terminal state.
  // Success → idle, abort → idle (user-driven stop), error → error+reason.
  if (summaryOk || summarySoftCancelled) {
    ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
  } else {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'error',
      at: new Date().toISOString(),
      reason: 'summary_after_cap_failed',
    });
  }

  ctx.log.info(
    { sessionId, chatId, assistantMessageId, budget, summaryOk, summaryCancelled: summarySoftCancelled },
    'inference cap-hit summary finished',
  );
}

/**
 * Inserts and announces the UI-only cap_hit system message.
 *
 * @param used Tool calls actually executed this run; defaults to `budget`
 *   for callers that don't track it.
 */
async function insertCapHitSentinel(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  agent: Agent | null,
  budget: number,
  used: number = budget,
): Promise<void> {
  // Hard ceiling: count prior cap_hit sentinels in this chat. After two
  // continues (sentinel count of 2), the next sentinel reports can_continue
  // false and the UI disables the Continue button.
  const priorRows = await ctx.sql<{ count: number }[]>`
    SELECT COUNT(*)::int AS count FROM messages
    WHERE chat_id = ${chatId} AND role = 'system' AND metadata->>'kind' = 'cap_hit'
  `;
  const priorCount = priorRows[0]?.count ?? 0;
  const canContinue = priorCount < 2;

  const metadata: MessageMetadata = {
    kind: 'cap_hit',
    used,
    limit: budget,
    agent_name: agent?.name ?? null,
    can_continue: canContinue,
  };
  const content = `Reached tool budget (${used}/${budget}). Continue to extend.`;

  const [row] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata)
    VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(),
            ${ctx.sql.json(metadata as never)})
    RETURNING id
  `;
  const sentinelId = row!.id;

  // The sentinel content is static, but we still walk the standard frame
  // sequence (started → delta → complete) so useSessionStream's reducer
  // appends it via the same path it uses for streaming assistant messages.
  // The delta carries the full text in one chunk.
  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: sentinelId,
    chat_id: chatId,
    role: 'system',
  });
  ctx.publish(sessionId, {
    type: 'delta',
    message_id: sentinelId,
    chat_id: chatId,
    content,
  });
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: sentinelId,
    chat_id: chatId,
    metadata,
  });
}

const COMPACT_SYSTEM_PROMPT =
  'Summarize the preceding conversation into a dense but complete context paragraph. Preserve all key facts, decisions, file paths, code patterns, and action items. Do not add any new information. Output only the summary paragraph.';

/**
 * Streams a tool-free summary of the chat into the given compact message.
 * On success the message is stored as 'complete', with a header counting the
 * summarized messages. On failure it is stored as 'failed' with any partial text.
 */
async function runCompact(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  compactMessageId: string
): Promise<void> {
  const loaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!loaded) return;
  const { session, project, history } = loaded;

  const messagesForSummary = buildMessagesPayload(
    session,
    project,
    history.filter((m) => m.id !== compactMessageId)
  );
  messagesForSummary.push({
    role: 'system',
    content: COMPACT_SYSTEM_PROMPT,
  });

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: compactMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let content = '';
  try {
    const result = await streamCompletion(
      ctx,
      session.model,
      messagesForSummary,
      { tools: null },
      (delta) => {
        content += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: compactMessageId,
          chat_id: chatId,
          content: delta,
        });
      }
    );
    content = result.content;
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    await ctx.sql`
      UPDATE messages
      SET status = 'failed', content = ${content}, finished_at = clock_timestamp()
      WHERE id = ${compactMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'error',
      message_id: compactMessageId,
      chat_id: chatId,
      error: errMsg,
    });
    return;
  }

  const preCompactCount = history.filter((m) => m.id !== compactMessageId && m.kind !== 'compact').length;
  const summary = `[Context compacted — ${preCompactCount} messages summarized]\n\n${content}`;

  await ctx.sql`
    UPDATE messages
    SET content = ${summary}, status = 'complete', finished_at = clock_timestamp()
    WHERE id = ${compactMessageId}
  `;

  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: compactMessageId,
    chat_id: chatId,
  });
}

interface InferenceRegistration {
  controller: AbortController;
  completed: Promise<void>;
}

/**
 * Creates the per-process inference runner. It tracks one active inference per
 * chat so it can be cancelled. Compaction is fire-and-forget and not tracked.
 */
export function createInferenceRunner(
  ctx: Omit<InferenceContext, 'publishUser'>,
  publishUserFn: (user: string, frame: UserStreamFrame) => void
) {
  const registry = new Map<string, InferenceRegistration>();

  return {
    enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
      const callCtx: InferenceContext = {
        ...ctx,
        publishUser: (frame) => publishUserFn(user, frame),
      };
      // v1.8 mobile-tabs: announce working before the async loop starts so
      // every device subscribed to the user channel sees the amber dot.
      callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'working', at: new Date().toISOString() });

      const controller = new AbortController();
      let resolveCompleted!: () => void;
      const completed = new Promise<void>((res) => {
        resolveCompleted = res;
      });
      const registration: InferenceRegistration = { controller, completed };
      registry.set(chatId, registration);

      void (async () => {
        try {
          await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
          setImmediate(() => {
            void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
              callCtx.log.warn({ err, chatId }, 'auto-name failed');
            });
          });
        } catch (err) {
          callCtx.log.error({ err }, 'unhandled inference error');
        } finally {
          resolveCompleted();
          // Only clear our own registration; a force-send may have replaced it.
          if (registry.get(chatId) === registration) {
            registry.delete(chatId);
          }
        }
      })();
    },

    enqueueCompact(sessionId: string, chatId: string, compactMessageId: string, user: string) {
      const callCtx: InferenceContext = {
        ...ctx,
        publishUser: (frame) => publishUserFn(user, frame),
      };
      void (async () => {
        try {
          await runCompact(callCtx, sessionId, chatId, compactMessageId);
        } catch (err) {
          callCtx.log.error({ err }, 'unhandled compact error');
        }
      })();
    },

    async cancel(_sessionId: string, chatId: string): Promise<boolean> {
      const reg = registry.get(chatId);
      if (!reg) return false;
      reg.controller.abort();
      // Swallow — we just need to wait for the catch/finally to persist state.
      await reg.completed.catch(() => {});
      return true;
    },

    hasActive(chatId: string): boolean {
      return registry.has(chatId);
    },
  };
}

export const _toolNames = ALL_TOOLS.map((t) => t.name);