import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../../db.js';
import type { Config } from '../../config.js';
import type {
  Agent,
  ErrorReason,
  Message,
  MessageMetadata,
  Project,
  Session,
  ToolCall,
  UserStreamFrame,
} from '../../types/api.js';
import { ALL_TOOLS } from '../tools.js';
import { resolveProjectRoot } from '../path_guard.js';
import { maybeAutoNameChat } from '../auto_name.js';
import { getAgentById } from '../agents.js';
import * as compaction from '../compaction.js';
import type { Broker } from '../broker.js';
import { resolveToolBudget } from './budget.js';
import {
  detectDoomLoop,
} from './sentinels.js';
import {
  buildMessagesPayload,
  loadContext,
} from './payload.js';
import {
  finalizeCompletion,
  handleAbortOrError,
} from './error-handler.js';
import {
  executeStreamPhase,
} from './stream-phase.js';
import { executeToolPhase, type ToolPhaseResult } from './tool-phase.js';
import type { StreamPhaseState } from './types.js';
import {
  runCapHitSummary,
  runDoomLoopSummary,
  runStepCapSummary,
} from './sentinel-summaries.js';

/**
 * v1.14.0: hard ceiling on the number of stream-and-tool iterations per
 * user-message turn. Per-agent cap via agent.steps is the primary knob;
 * MAX_STEPS is the safety ceiling. 200 is 4x the effective budget ceiling
 * (50 tool calls) — in practice budget fires first unless the model makes
 * many 0-tool-call iterations (which exit the loop via the non-tool finish
 * path anyway).
 */
export const MAX_STEPS = 200;

// v1.12.4: re-exported so external callers (tests, future consumers) keep
// importing from services/inference.js as the public surface.
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
export { buildMessagesPayload } from './payload.js';

/**
 * A single frame handed to a {@link FramePublisher}. `type` selects the
 * event; every other field is optional and only meaningful for some types.
 */
export interface InferenceFrame {
  type:
    | 'message_started'
    | 'delta'
    | 'tool_call'
    | 'tool_result'
    | 'message_complete'
    | 'usage'
    | 'messages_deleted'
    | 'session_renamed'
    | 'chat_renamed'
    | 'error';
  message_id?: string;
  message_ids?: string[];
  chat_id?: string;
  tool_message_id?: string;
  tool_call_id?: string;
  // v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
  // through the normal message_started → delta → message_complete sequence.
  role?: 'assistant' | 'tool' | 'user' | 'system';
  content?: string;
  tool_call?: ToolCall;
  output?: unknown;
  truncated?: boolean;
  error?: string;
  // v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
  // surface a specific message; `error` stays the human-readable text.
  reason?: ErrorReason;
  // v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
  // messages can carry their persisted metadata to the live stream without a
  // refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
  // { kind: 'error', ... }).
  metadata?: MessageMetadata | null;
  tokens_used?: number | null;
  ctx_used?: number | null;
  ctx_max?: number | null;
  completion_tokens?: number | null;
  started_at?: string | null;
  finished_at?: string | null;
  model?: string;
  session_id?: string;
  name?: string;
}

/** Callback that delivers an {@link InferenceFrame} for the given session id. */
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;

/** Dependencies threaded through every inference call in this module. */
export interface InferenceContext {
  sql: Sql;
  config: Config;
  log: FastifyBaseLogger;
  publish: FramePublisher;
  publishUser: (frame: UserStreamFrame) => void;
  // v1.11: passed through so compaction.process can publish 'compacted'
  // frames on the same session WS channel useSessionStream subscribes to.
  // Compaction is the only path that needs the raw broker handle (regular
  // inference goes through `publish`); keeping a separate field avoids
  // tempting other code paths into bypassing the session-id binding.
  broker: Broker;
}

// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
// import buildMessagesPayload from this module, so the re-export above
// preserves the public surface). Stream + tool phases extracted to
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.

/** Outcome of one stream phase, as returned by executeStreamPhase. */
export interface StreamResult {
  finishReason: string | null;
  content: string;
  toolCalls: ToolCall[];
  promptTokens: number | null;
  completionTokens: number | null;
  // v1.13.1-C: reasoning text accumulated across reasoning-delta parts.
  // Empty string when the model doesn't emit reasoning (most cases).
  reasoning: string;
}

/** Per-step arguments passed to the stream/tool phases and summary helpers. */
export interface TurnArgs {
  sessionId: string;
  chatId: string;
  assistantMessageId: string;
  // v1.8.2: cumulative tool calls executed this run. Compared against the
  // resolved budget at the top of each step. Replaces the older `depth`
  // counter (which counted iterations, not invocations).
  toolsUsed: number;
  // v1.11.6: ordered tool calls executed in this user-message turn (across
  // all steps of the runAssistantTurn loop). Reset to [] at user-message
  // boundaries by runInference, same as toolsUsed. The doom-loop check at
  // the top of each step inspects this list via detectDoomLoop.
  recentToolCalls: ToolCall[];
  signal: AbortSignal | undefined;
}

/**
 * Drives one user-message turn: repeatedly streams an assistant message and
 * executes the tool calls it produced until the model finishes without tool
 * calls, the tool phase asks to stop, or a guard (doom loop, tool budget,
 * step cap) trips.
 *
 * @param ctx  Shared dependencies (db, logger, publishers, broker).
 * @param args Identifiers plus the starting tool counters for this turn.
 * @returns Resolves when the turn has been finalized or abandoned. Stream
 *          errors are routed to handleAbortOrError; an unexpected tool-phase
 *          throw is logged and ends the loop.
 */
export async function runAssistantTurn(
  ctx: InferenceContext,
  args: TurnArgs,
): Promise<void> {
  const { sessionId, chatId, signal } = args;

  // v1.14.0: resolve agent once at the top. The agent stays fixed for the
  // duration of this user-message turn — PATCH agent_id mid-conversation
  // takes effect on the next runInference, not mid-loop.
  const initialLoaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!initialLoaded) {
    ctx.log.warn({ sessionId }, 'inference: session or project missing');
    return;
  }
  const { session, project } = initialLoaded;
  const agent = session.agent_id
    ? await getAgentById(project.path, session.agent_id)
    : null;
  const budget = resolveToolBudget(agent);

  // v1.14.0: effectiveCap = min(agent.steps ?? Infinity, MAX_STEPS), clamped
  // at 0 so a negative `steps` behaves like `steps: 0` instead of skipping
  // the loop without ever giving the model a chance to answer.
  const effectiveCap = Math.max(0, Math.min(agent?.steps ?? Infinity, MAX_STEPS));

  // steps: 0 special case — model responds text-only. The while loop would
  // never enter (effectiveCap === 0), so we handle it explicitly before the
  // loop. The model always gets at least one chance to respond with text.
  if (effectiveCap === 0) {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      await runTextOnlyTurn(ctx, args, loaded.session, loaded.project, loaded.history, agent);
    }
    return;
  }

  let stepNumber = 0;
  let toolsUsed = args.toolsUsed;
  let recentToolCalls = args.recentToolCalls;
  let assistantMessageId = args.assistantMessageId;
  // True only when the last permitted step ended with action 'continue',
  // i.e. `assistantMessageId` names a freshly created message that still
  // needs content. Any `break` below leaves this false.
  let stepCapReached = false;

  while (stepNumber < effectiveCap) {
    // ---- doom-loop check (moved from top-of-function) ----
    const loop = detectDoomLoop(recentToolCalls);
    if (loop) {
      // Need fresh history for the summary.
      const loaded = await loadContext(ctx.sql, sessionId, chatId);
      if (loaded) {
        const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
        await runDoomLoopSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, loop);
      }
      break;
    }

    // ---- budget check (moved from top-of-function) ----
    if (toolsUsed >= budget) {
      const loaded = await loadContext(ctx.sql, sessionId, chatId);
      if (loaded) {
        const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
        await runCapHitSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, budget);
      }
      break;
    }

    // ---- compaction check ----
    // v1.11: if the prior turn flagged this chat for compaction, run it
    // before loadContext so we read post-compaction history. Swallow
    // failures and proceed with un-compacted history.
    const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
      SELECT needs_compaction FROM chats WHERE id = ${chatId}
    `;
    if (chatFlag[0]?.needs_compaction) {
      try {
        await compaction.process({
          sql: ctx.sql,
          config: ctx.config,
          log: ctx.log,
          broker: ctx.broker,
          chatId,
        });
      } catch (err) {
        ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
        await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
      }
    }

    // ---- load context (must re-load each iteration — new messages since last step) ----
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (!loaded) {
      ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop');
      break;
    }
    const { session: iterSession, project: iterProject, history } = loaded;
    const projectRoot = await resolveProjectRoot(iterProject.path);

    // v1.14.0: log step boundary for instrumentation. step_start parts are in
    // the schema CHECK but not emitted here — writing to the assistant message
    // before the stream phase creates a sequence-0 collision with
    // partsFromAssistantMessage. A WS frame or structured log is sufficient
    // since the frontend doesn't render step boundaries in v1.14.
    ctx.log.info({ sessionId, chatId, step: stepNumber, assistantMessageId }, 'step_start');

    // ---- build messages + stream phase ----
    const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log);
    const webToolsEnabled =
      iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false;

    const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
    const state: StreamPhaseState = { accumulated: '', startedAt: null };
    let result: StreamResult;
    try {
      result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, state, agent, webToolsEnabled);
    } catch (err) {
      await handleAbortOrError(ctx, iterArgs, state.accumulated, err);
      break;
    }

    // ---- non-tool finish → finalize and exit ----
    if (result.toolCalls.length === 0) {
      await finalizeCompletion(ctx, iterArgs, result, state.startedAt, iterSession);
      break;
    }

    // Note on steps: `steps` counts loop iterations, so with effectiveCap === 1
    // the tool calls emitted on step 0 ARE executed. The steps: 0 case never
    // reaches this point — it is handled by runTextOnlyTurn before the loop.

    // ---- tool phase ----
    let toolPhaseResult: ToolPhaseResult;
    try {
      toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot);
    } catch (err) {
      // Tool phase errors are unexpected (individual tool failures are
      // caught inside executeToolPhase). Log and break.
      ctx.log.error({ err, sessionId, chatId, step: stepNumber }, 'tool phase threw unexpectedly');
      break;
    }

    // ---- update loop locals ----
    toolsUsed += toolPhaseResult.toolCallCount;
    recentToolCalls = [...recentToolCalls, ...toolPhaseResult.toolCalls];
    stepNumber++;

    if (toolPhaseResult.action !== 'continue') {
      // 'paused' (user input) or 'synthesis_done' — stop the loop. This must
      // NOT trigger the step-cap summary even if it was the last allowed step.
      break;
    }

    // 'continue' — advance to next assistant message.
    const nextAssistantId = toolPhaseResult.nextAssistantId;
    if (!nextAssistantId) {
      ctx.log.error(
        { sessionId, chatId, step: stepNumber },
        'tool phase returned continue without nextAssistantId',
      );
      break;
    }
    assistantMessageId = nextAssistantId;
    stepCapReached = stepNumber >= effectiveCap;
  }

  // ---- post-loop: step-cap sentinel ----
  // Only when the loop ran out of steps: the last iteration's tool phase
  // returned 'continue' with a nextAssistantId that is still unfilled, so
  // use it for the wrap-up. Early exits (finish, pause, errors, doom loop,
  // budget) have already resolved their message and are skipped here.
  if (stepCapReached) {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      const capArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
      await runStepCapSummary(ctx, capArgs, loaded.session, loaded.project, loaded.history, agent, stepNumber, effectiveCap);
    }
  }
}

/**
 * v1.14.0: special handling for steps: 0 — the model responds text-only.
 * Runs a single stream phase and finalizes it; no tool phase is ever
 * executed. If the stream result contains tool calls they are logged,
 * dropped, and the message is finalized as plain text.
 *
 * NOTE(review): whether tools are actually withheld from the model depends
 * on executeStreamPhase, which is not visible here — confirm.
 */
async function runTextOnlyTurn(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null,
): Promise<void> {
  const messages = await buildMessagesPayload(session, project, history, agent, ctx.log);
  // Web tools are irrelevant when steps: 0 (no tool execution), but we
  // still need to resolve the flag for executeStreamPhase's signature.
  const webToolsEnabled =
    session.web_search_enabled ?? project.default_web_search_enabled ?? false;

  const state: StreamPhaseState = { accumulated: '', startedAt: null };
  let result: StreamResult;
  try {
    result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
  } catch (err) {
    await handleAbortOrError(ctx, args, state.accumulated, err);
    return;
  }

  if (result.toolCalls.length > 0) {
    ctx.log.warn(
      { chatId: args.chatId, toolCallCount: result.toolCalls.length },
      'steps: 0 agent emitted tool calls; ignoring and finalizing as text-only',
    );
    // Override: strip tool calls so finalizeCompletion treats it as text-only.
    result = { ...result, toolCalls: [] };
  }

  await finalizeCompletion(ctx, args, result, state.startedAt, session);
}

/**
 * Public entry point for one inference run. Delegates to runAssistantTurn
 * with fresh counters.
 *
 * @param signal Optional abort signal forwarded to the turn.
 */
export async function runInference(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  assistantMessageId: string,
  signal?: AbortSignal
): Promise<void> {
  // v1.8.2: every fresh inference (initial send, regenerate, force_send,
  // continue) starts with a clean budget. Tool-call accumulation across
  // Continue invocations is what the hard ceiling guards against, not the
  // per-call budget.
  // v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
  // to a single user-message turn, so a Continue starts with no history.
  return runAssistantTurn(ctx, {
    sessionId,
    chatId,
    assistantMessageId,
    toolsUsed: 0,
    recentToolCalls: [],
    signal,
  });
}

// NOTE: the v1.8.2 cap-hit summary flow is no longer defined in this file;
// runCapHitSummary is imported from ./sentinel-summaries.js. As originally
// described: it is called instead of erroring when the loop hits its budget,
// reuses the in-flight assistant message slot to stream a short wrap-up
// reply with the synthetic note prepended and tools disabled, then always
// inserts a cap_hit sentinel afterward (regardless of summary outcome) so
// the UI can show a Continue affordance.
/** Bookkeeping for one in-flight inference, keyed by chat id in the runner. */
interface InferenceRegistration {
  /** Aborting this controller signals the running inference to stop. */
  controller: AbortController;
  /** Resolves (never rejects) once the background run has finished. */
  completed: Promise<void>;
}

/**
 * Builds the inference runner: an object that starts inference runs in the
 * background and lets callers cancel or query them by chat id.
 *
 * @param ctx           Shared context without `publishUser`; a per-call
 *                      `publishUser` is derived from `publishUserFn`.
 * @param publishUserFn Publishes a user-stream frame for the given user.
 * @returns `enqueue`, `cancel` and `hasActive` operating on a private registry.
 */
export function createInferenceRunner(
  ctx: Omit<InferenceContext, 'publishUser'>,
  publishUserFn: (user: string, frame: UserStreamFrame) => void
) {
  const registry = new Map<string, InferenceRegistration>();

  return {
    /**
     * Starts an inference run for `chatId` without awaiting it. A previous
     * registration for the same chat is replaced in the registry, not aborted.
     */
    enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
      const callCtx: InferenceContext = {
        ...ctx,
        publishUser: (frame) => publishUserFn(user, frame),
        // v1.11: broker comes in via ctx (set at registration time). Repeated
        // here so the destructure carries it onto the per-call ctx without
        // having to add it to every enqueue/cancel signature individually.
        broker: ctx.broker,
      };
      // v1.8 mobile-tabs: announce working before the async loop starts so
      // every device subscribed to the user channel sees the amber dot.
      callCtx.publishUser({
        type: 'chat_status',
        chat_id: chatId,
        status: 'streaming',
        at: new Date().toISOString(),
      });

      const controller = new AbortController();
      let resolveCompleted!: () => void;
      const completed = new Promise<void>((res) => {
        resolveCompleted = res;
      });
      const registration: InferenceRegistration = { controller, completed };
      registry.set(chatId, registration);

      void (async () => {
        try {
          await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
          // Auto-naming is best-effort and deferred so it never delays completion.
          setImmediate(() => {
            void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: unknown) => {
              callCtx.log.warn({ err, chatId }, 'auto-name failed');
            });
          });
        } catch (err) {
          callCtx.log.error({ err }, 'unhandled inference error');
        } finally {
          resolveCompleted();
          // Only clear our own registration; a force-send may have replaced it.
          if (registry.get(chatId) === registration) {
            registry.delete(chatId);
          }
        }
      })();
    },

    /**
     * Aborts the active run for `chatId` and waits for it to finish.
     *
     * @returns `false` when nothing was registered for the chat, else `true`.
     */
    async cancel(_sessionId: string, chatId: string): Promise<boolean> {
      const reg = registry.get(chatId);
      if (!reg) return false;
      reg.controller.abort();
      // Swallow — we just need to wait for the catch/finally to persist state.
      await reg.completed.catch(() => {});
      return true;
    },

    /** Whether a run is currently registered for `chatId`. */
    hasActive(chatId: string): boolean {
      return registry.has(chatId);
    },
  };
}

/** The `name` of every entry in ALL_TOOLS, in order. */
export const _toolNames = ALL_TOOLS.map((t) => t.name);