import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../../db.js';
import type { Config } from '../../config.js';
import type {
  Agent,
  ErrorReason,
  Message,
  MessageMetadata,
  Project,
  Session,
  ToolCall,
  UserStreamFrame,
} from '../../types/api.js';
import { ALL_TOOLS } from '../tools.js';
import { resolveProjectRoot } from '../path_guard.js';
import { maybeAutoNameChat } from '../auto_name.js';
import { getAgentById } from '../agents.js';
import * as compaction from '../compaction.js';
import * as modelContext from '../model-context.js';
import type { Broker } from '../broker.js';
import { resolveToolBudget } from './budget.js';
import {
  DOOM_LOOP_THRESHOLD,
  detectDoomLoop,
} from './sentinels.js';
import {
  buildMessagesPayload,
  loadContext,
} from './payload.js';
import {
  finalizeCompletion,
  handleAbortOrError,
} from './error-handler.js';
import {
  executeStreamPhase,
  streamCompletion,
} from './stream-phase.js';
import { executeToolPhase } from './tool-phase.js';
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
import {
  runCapHitSummary,
  runDoomLoopSummary,
} from './sentinel-summaries.js';

// v1.12.4: re-exported so external callers (tests, future consumers) keep
// importing from services/inference.js as the public surface.
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
export { buildMessagesPayload } from './payload.js';

/**
 * A single frame published on a session's stream. `type` selects which of
 * the optional fields are meaningful for a given frame.
 */
export interface InferenceFrame {
  type:
    | 'message_started'
    | 'delta'
    | 'tool_call'
    | 'tool_result'
    | 'message_complete'
    | 'usage'
    | 'messages_deleted'
    | 'session_renamed'
    | 'chat_renamed'
    | 'error';
  message_id?: string;
  message_ids?: string[];
  chat_id?: string;
  tool_message_id?: string;
  tool_call_id?: string;
  // v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
  // through the normal message_started → delta → message_complete sequence.
  role?: 'assistant' | 'tool' | 'user' | 'system';
  content?: string;
  tool_call?: ToolCall;
  output?: unknown;
  truncated?: boolean;
  error?: string;
  // v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
  // surface a specific message; `error` stays the human-readable text.
  reason?: ErrorReason;
  // v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
  // messages can carry their persisted metadata to the live stream without a
  // refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
  // { kind: 'error', ... }).
  metadata?: MessageMetadata | null;
  tokens_used?: number | null;
  ctx_used?: number | null;
  ctx_max?: number | null;
  completion_tokens?: number | null;
  started_at?: string | null;
  finished_at?: string | null;
  model?: string;
  session_id?: string;
  name?: string;
}

/** Publishes an inference frame on the stream of the given session. */
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;

/** Dependencies handed to every inference entry point in this module. */
export interface InferenceContext {
  sql: Sql;
  config: Config;
  log: FastifyBaseLogger;
  publish: FramePublisher;
  publishUser: (frame: UserStreamFrame) => void;
  // v1.11: passed through so compaction.process can publish 'compacted'
  // frames on the same session WS channel useSessionStream subscribes to.
  // Compaction is the only path that needs the raw broker handle (regular
  // inference goes through `publish`); keeping a separate field avoids
  // tempting other code paths into bypassing the session-id binding.
  broker: Broker;
}

// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
// import buildMessagesPayload from this module, so the re-export above
// preserves the public surface). Stream + tool phases extracted to
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.

/** Outcome of one streamed completion, as consumed by the tool/finalize phases. */
export interface StreamResult {
  finishReason: string | null;
  content: string;
  toolCalls: ToolCall[];
  promptTokens: number | null;
  completionTokens: number | null;
  // v1.13.1-C: reasoning text accumulated across reasoning-delta parts.
  // Empty string when the model doesn't emit reasoning (most cases).
  reasoning: string;
}

/** Per-turn arguments threaded through `runAssistantTurn`. */
export interface TurnArgs {
  sessionId: string;
  chatId: string;
  assistantMessageId: string;
  // v1.8.2: cumulative tool calls executed this run. Compared against the
  // resolved budget at the top of each turn. Replaces the older `depth`
  // counter (which counted iterations, not invocations).
  toolsUsed: number;
  // v1.11.6: ordered tool calls executed in this user-message turn (across
  // recursive runAssistantTurn invocations). Reset to [] at user-message
  // boundaries by runInference, same as toolsUsed. Doom-loop check at the
  // top of runAssistantTurn slices the last DOOM_LOOP_THRESHOLD entries.
  recentToolCalls: ToolCall[];
  signal: AbortSignal | undefined;
}

/**
 * Runs one assistant turn for a chat.
 *
 * Order of operations: optional auto-compaction, context load, cap-hit
 * check, doom-loop check, payload build, stream phase, then either the tool
 * phase (when the model requested tool calls) or completion finalization.
 *
 * @param ctx - Shared inference dependencies.
 * @param args - Identifiers and per-run counters for this turn.
 * @returns Resolves once the turn has been handed off or finalized. Errors
 *   thrown by the stream phase are routed to `handleAbortOrError` instead of
 *   being rethrown.
 */
export async function runAssistantTurn(
  ctx: InferenceContext,
  args: TurnArgs,
): Promise<void> {
  const { sessionId, chatId } = args;

  // v1.11: if the prior turn flagged this chat for compaction, run it first
  // so loadContext below reads the post-compaction history. We swallow
  // compaction failures (clearing the flag so we don't loop) and proceed
  // with the un-compacted history — a slow turn that hits the model's
  // hard limit is recoverable; a dead session is not.
  const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
    SELECT needs_compaction FROM chats WHERE id = ${chatId}
  `;
  if (chatFlag[0]?.needs_compaction) {
    try {
      await compaction.process({
        sql: ctx.sql,
        config: ctx.config,
        log: ctx.log,
        broker: ctx.broker,
        chatId,
      });
    } catch (err) {
      ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
      await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
    }
  }

  const loaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!loaded) {
    ctx.log.warn({ sessionId }, 'inference: session or project missing');
    return;
  }
  const { session, project, history } = loaded;
  const projectRoot = await resolveProjectRoot(project.path);

  // Agent resolution is per-turn so PATCH agent_id mid-conversation takes
  // effect on the next message. Unknown agent_id returns null silently —
  // session falls back to base prompt + all tools + default temperature.
  const agent = session.agent_id
    ? await getAgentById(project.path, session.agent_id)
    : null;

  // v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure.
  // When we've already burned the budget *before* this turn even runs, we
  // skip straight to the summary flow — the in-flight assistant message slot
  // gets reused for the wrap-up reply instead of being marked failed.
  const budget = resolveToolBudget(agent);
  if (args.toolsUsed >= budget) {
    await runCapHitSummary(ctx, args, session, project, history, agent, budget);
    return;
  }

  // v1.11.6: doom-loop guard. In practice this trips BEFORE the budget cap
  // does (the model can burn through 3 identical calls long before the
  // 15-call budget fires). Note the cap check above still runs first, so if
  // both conditions hold on the same turn the cap-hit flow wins.
  // Same in-flight-slot-reuse pattern as runCapHitSummary — wrap-up reply
  // lands in args.assistantMessageId, then a doom_loop sentinel is inserted
  // to make the abort visible in the chat history.
  const loop = detectDoomLoop(args.recentToolCalls);
  if (loop) {
    await runDoomLoopSummary(ctx, args, session, project, history, agent, loop);
    return;
  }

  const messages = await buildMessagesPayload(session, project, history, agent, ctx.log);

  // v1.11.8: resolve per-chat web-tools opt-in. Tri-state on the wire:
  //   - session.web_search_enabled = null → inherit project default
  //   - session.web_search_enabled = true/false → explicit
  // Both web_search and web_fetch are gated by this single flag (the UI
  // label is "Enable web search and fetch" — same store, both tools).
  // Default is false unless explicitly opted in, matching the v1.9
  // plumbing intent ("inert until Batch 8 ships the actual tools").
  const webToolsEnabled =
    session.web_search_enabled ?? project.default_web_search_enabled ?? false;

  // `state` is handed to the stream phase and read back here afterwards, so
  // the catch path can pass along whatever text it holds at failure time.
  const state: StreamPhaseState = { accumulated: '', startedAt: null };
  let result: StreamResult;
  try {
    result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
  } catch (err) {
    await handleAbortOrError(ctx, args, state.accumulated, err);
    return;
  }

  if (result.toolCalls.length > 0) {
    await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
    return;
  }

  await finalizeCompletion(ctx, args, result, state.startedAt, session);
}

/**
 * Entry point for a fresh inference run: delegates to `runAssistantTurn`
 * with the tool counters reset.
 *
 * @param ctx - Shared inference dependencies.
 * @param sessionId - Session the chat belongs to.
 * @param chatId - Chat being answered.
 * @param assistantMessageId - Message slot the reply is written to.
 * @param signal - Optional abort signal forwarded to the turn.
 */
export async function runInference(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  assistantMessageId: string,
  signal?: AbortSignal
): Promise<void> {
  // v1.8.2: every fresh inference (initial send, regenerate, force_send,
  // continue) starts with a clean budget. Tool-call accumulation across
  // Continue invocations is what the hard ceiling guards against, not the
  // per-call budget.
  // v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
  // to a single user-message turn, so a Continue starts with no history.
  return runAssistantTurn(ctx, {
    sessionId,
    chatId,
    assistantMessageId,
    toolsUsed: 0,
    recentToolCalls: [],
    signal,
  });
}

// v1.8.2: cap-hit summary flow (runCapHitSummary, imported from
// ./sentinel-summaries.js). Called instead of erroring when the loop
// hits its budget. Reuses the in-flight assistant message slot to stream a
// short wrap-up reply with the synthetic note prepended and tools disabled,
// then always inserts a cap_hit sentinel afterward (regardless of summary
// outcome) so the UI can show a Continue affordance.

/** Bookkeeping for one in-flight inference run, keyed by chat id. */
interface InferenceRegistration {
  // Aborting this cancels the run via the signal passed to runInference.
  controller: AbortController;
  // Resolves (never rejects) once the run's finally block has executed.
  completed: Promise<void>;
}

/**
 * Builds the runner that starts, tracks and cancels background inference
 * runs. At most one registration is kept per chat id.
 *
 * @param ctx - Inference dependencies minus `publishUser`, which is bound
 *   per call from `publishUserFn` and the enqueueing user.
 * @param publishUserFn - Publishes a frame on the given user's channel.
 * @returns An object with `enqueue`, `cancel` and `hasActive`.
 */
export function createInferenceRunner(
  ctx: Omit<InferenceContext, 'publishUser'>,
  publishUserFn: (user: string, frame: UserStreamFrame) => void
) {
  const registry = new Map<string, InferenceRegistration>();

  return {
    /**
     * Starts an inference run in the background and registers it under
     * `chatId`. Returns synchronously; an existing registration for the same
     * chat is overwritten (not aborted) by the new one.
     */
    enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
      const callCtx: InferenceContext = {
        ...ctx,
        publishUser: (frame) => publishUserFn(user, frame),
        // v1.11: broker comes in via ctx (set at registration time). Repeated
        // here so the destructure carries it onto the per-call ctx without
        // having to add it to every enqueue/cancel signature individually.
        broker: ctx.broker,
      };

      // v1.8 mobile-tabs: announce working before the async loop starts so
      // every device subscribed to the user channel sees the amber dot.
      callCtx.publishUser({
        type: 'chat_status',
        chat_id: chatId,
        status: 'streaming',
        at: new Date().toISOString(),
      });

      const controller = new AbortController();
      let resolveCompleted!: () => void;
      const completed = new Promise<void>((res) => {
        resolveCompleted = res;
      });
      const registration: InferenceRegistration = { controller, completed };
      registry.set(chatId, registration);

      void (async () => {
        try {
          await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
          // Auto-naming is scheduled only after a run that did not throw;
          // its failure is logged and never surfaces to the caller.
          setImmediate(() => {
            void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: unknown) => {
              callCtx.log.warn({ err, chatId }, 'auto-name failed');
            });
          });
        } catch (err) {
          callCtx.log.error({ err }, 'unhandled inference error');
        } finally {
          resolveCompleted();
          // Only clear our own registration; a force-send may have replaced it.
          if (registry.get(chatId) === registration) {
            registry.delete(chatId);
          }
        }
      })();
    },

    /**
     * Aborts the active run for `chatId` and waits for it to settle.
     *
     * @returns `false` when no run is registered for the chat, else `true`.
     */
    async cancel(_sessionId: string, chatId: string): Promise<boolean> {
      const reg = registry.get(chatId);
      if (!reg) return false;
      reg.controller.abort();
      // Swallow — we just need to wait for the catch/finally to persist state.
      await reg.completed.catch(() => {});
      return true;
    },

    /** Whether a run is currently registered for `chatId`. */
    hasActive(chatId: string): boolean {
      return registry.has(chatId);
    },
  };
}

/** Names of every tool in `ALL_TOOLS`, in declaration order. */
export const _toolNames = ALL_TOOLS.map((t) => t.name);