import type {
  Agent,
  Message,
  MessageMetadata,
  Project,
  Session,
} from '../../types/api.js';
import * as modelContext from '../model-context.js';
import { buildMessagesPayload } from './payload.js';
import { DOOM_LOOP_THRESHOLD } from './sentinels.js';
import { streamCompletion } from './stream-phase.js';
import { DB_FLUSH_INTERVAL_MS } from './types.js';
import type {
  InferenceContext,
  StreamResult,
  TurnArgs,
} from './turn.js';

// Synthetic system note appended to the cap-hit summary call. Verbatim from
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
// instruction is short, declarative, and identical across calls.
const CAP_HIT_SUMMARY_NOTE = (limit: number) =>
  `You've reached the tool budget (${limit} calls). Produce the best answer you can with what you have. Do not call more tools.`;

// Synthetic system note appended to the doom-loop summary call; names the
// tool that was called repeatedly with identical arguments.
const DOOM_LOOP_NOTE = (name: string) =>
  `You called ${name} with the same arguments ${DOOM_LOOP_THRESHOLD} times in a row. Stop calling it. Produce the best answer you can with what you have.`;

/**
 * Runs the tools-disabled wrap-up completion after the tool budget is hit.
 *
 * Streams the summary into the in-flight assistant message (publishing a
 * `delta` frame per chunk and periodically persisting the accumulated text),
 * finalizes that message as `complete`, `cancelled`, or `failed`, bumps the
 * session's `updated_at`, inserts the cap-hit sentinel, and finally publishes
 * the terminal `chat_status` frame.
 *
 * Errors thrown by the streaming call are captured and reflected in the
 * message status rather than rethrown. Errors from the finalize / session /
 * sentinel SQL statements are not caught here and propagate to the caller.
 *
 * @param ctx - Inference context; this function uses `sql`, `publish`,
 *   `publishUser`, and `log`.
 * @param args - Supplies `sessionId`, `chatId`, `assistantMessageId`, and the
 *   abort `signal` forwarded to the streaming call.
 * @param session - Session whose `model` serves the summary call.
 * @param project - Passed through to `buildMessagesPayload`.
 * @param history - Passed through to `buildMessagesPayload`.
 * @param agent - Active agent or `null`; supplies `temperature` and the
 *   sentinel's `agent_name`.
 * @param budget - The tool-call budget that was reached; echoed in the
 *   synthetic note and in the sentinel.
 */
export async function runCapHitSummary(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null,
  budget: number,
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const messages = await buildMessagesPayload(session, project, history, agent, ctx.log);
  messages.push({ role: 'system', content: CAP_HIT_SUMMARY_NOTE(budget) });

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  const startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let accumulated = '';
  let pendingFlushTimer: NodeJS.Timeout | null = null;
  // Flushes are chained so the partial-content UPDATEs run one at a time, in
  // the order they were scheduled.
  let flushPromise: Promise<void> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = accumulated;
    flushPromise = flushPromise
      .then(async () => {
        await ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`;
      })
      .catch((err: unknown) => {
        // Incremental flushes are best-effort: the finalize UPDATE below
        // always rewrites the full content. Swallowing here keeps a failed
        // mid-stream write from surfacing as an unhandled rejection or from
        // aborting finalization via the `await flushPromise` in `finally`.
        ctx.log.warn(
          { err, sessionId, chatId, assistantMessageId },
          'inference cap-hit summary incremental flush failed',
        );
      });
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  let summaryOk = false;
  let summarySoftCancelled = false;
  let summaryError: string | null = null;
  let result: StreamResult | null = null;

  try {
    result = await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: null, temperature: agent?.temperature },
      (delta) => {
        accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        scheduleFlush();
      },
      undefined,
      signal,
    );
    summaryOk = true;
  } catch (err) {
    if (err instanceof Error && err.name === 'AbortError') {
      summarySoftCancelled = true;
    } else {
      summaryError = err instanceof Error ? err.message : String(err);
    }
  } finally {
    // Drop any not-yet-fired flush (finalize writes the content anyway) and
    // wait for in-flight flushes so they cannot land after the final UPDATE.
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    await flushPromise;
  }

  // Finalize the summary message based on the three outcomes. The sentinel
  // is inserted regardless so the user always has the Continue affordance —
  // even on a partial / failed summary the chat history shows where the
  // budget was hit.
  if (summaryOk && result) {
    // v1.11.3: see executeToolPhase for the rationale.
    const mctx = await modelContext.getModelContext(session.model);
    const nCtx = mctx?.n_ctx ?? null;
    const [updated] = await ctx.sql<
      { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
    >`
      UPDATE messages
      SET content = ${result.content},
          status = 'complete',
          tokens_used = ${result.completionTokens},
          ctx_used = ${result.promptTokens},
          ctx_max = ${nCtx},
          finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
      RETURNING tokens_used, ctx_used, ctx_max, finished_at
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
      tokens_used: updated?.tokens_used ?? null,
      ctx_used: updated?.ctx_used ?? null,
      ctx_max: updated?.ctx_max ?? null,
      started_at: startedAt,
      finished_at: updated?.finished_at ?? null,
      model: session.model,
    });
  } else if (summarySoftCancelled) {
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated},
          status = 'cancelled',
          finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
    });
  } else {
    const errMeta: MessageMetadata = {
      kind: 'error',
      error_reason: 'summary_after_cap_failed',
      error_text: summaryError ?? 'summary failed',
    };
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated},
          status = 'failed',
          finished_at = clock_timestamp(),
          metadata = ${ctx.sql.json(errMeta as never)}
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'error',
      message_id: assistantMessageId,
      chat_id: chatId,
      error: summaryError ?? 'summary failed',
      reason: 'summary_after_cap_failed',
    });
  }

  // Bump session/chat updated_at exactly once for this turn.
  const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions SET updated_at = clock_timestamp()
    WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  if (sessRow) {
    ctx.publishUser({
      type: 'session_updated',
      session_id: sessionId,
      project_id: sessRow.project_id,
      name: sessRow.name,
      updated_at: sessRow.updated_at,
    });
  } else {
    // The UPDATE matched no row; there is nothing to announce. Log instead
    // of dereferencing undefined.
    ctx.log.warn(
      { sessionId, chatId },
      'inference cap-hit summary: session row not found, session_updated not published',
    );
  }

  await insertCapHitSentinel(ctx, sessionId, chatId, agent, budget);

  // Status frame fires last so the dot color reflects the terminal state.
  // Success → idle, abort → idle (user-driven stop), error → error+reason.
  if (summaryOk || summarySoftCancelled) {
    ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
  } else {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'error',
      at: new Date().toISOString(),
      reason: 'summary_after_cap_failed',
    });
  }

  ctx.log.info(
    { sessionId, chatId, assistantMessageId, budget, summaryOk, summaryCancelled: summarySoftCancelled },
    'inference cap-hit summary finished',
  );
}

/**
 * Inserts the `cap_hit` system sentinel message for a chat and publishes it
 * as a started → delta → complete frame sequence.
 *
 * @param ctx - Inference context; uses `sql` and `publish`.
 * @param sessionId - Session the sentinel row belongs to and the publish target.
 * @param chatId - Chat the sentinel row belongs to.
 * @param agent - Active agent or `null`; its `name` is recorded in the metadata.
 * @param budget - Tool budget that was reached; stored as both `used` and `limit`.
 */
async function insertCapHitSentinel(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  agent: Agent | null,
  budget: number,
): Promise<void> {
  // Hard ceiling: count prior cap_hit sentinels in this chat. After two
  // continues (sentinel count of 2), the next sentinel reports can_continue
  // false and the UI disables the Continue button.
  const priorRows = await ctx.sql<{ count: number }[]>`
    SELECT COUNT(*)::int AS count
    FROM messages
    WHERE chat_id = ${chatId}
      AND role = 'system'
      AND metadata->>'kind' = 'cap_hit'
  `;
  const priorCount = priorRows[0]?.count ?? 0;
  const canContinue = priorCount < 2;

  const metadata: MessageMetadata = {
    kind: 'cap_hit',
    used: budget,
    limit: budget,
    agent_name: agent?.name ?? null,
    can_continue: canContinue,
  };
  const content = `Reached tool budget (${budget}/${budget}). Continue to extend.`;

  const [row] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata)
    VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(), ${ctx.sql.json(metadata as never)})
    RETURNING id
  `;
  if (!row) {
    // INSERT ... RETURNING should always yield the new row; fail with a
    // clear message rather than a TypeError on `row.id`.
    throw new Error('cap_hit sentinel insert returned no row');
  }

  // The sentinel content is static, but we still walk the standard frame
  // sequence (started → delta → complete) so useSessionStream's reducer
  // appends it via the same path it uses for streaming assistant messages.
  // The delta carries the full text in one chunk.
  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: row.id,
    chat_id: chatId,
    role: 'system',
  });
  ctx.publish(sessionId, {
    type: 'delta',
    message_id: row.id,
    chat_id: chatId,
    content,
  });
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: row.id,
    chat_id: chatId,
    metadata,
  });
}

// v1.11.6: doom-loop wrap-up. Mirrors runCapHitSummary structurally — same
// in-flight-slot reuse, same tools-disabled streaming-summary call, same
// post-finalize sentinel insert + chat_status drop. Differences:
//   - synthetic note text comes from DOOM_LOOP_NOTE (names the looping tool)
//   - sentinel metadata is { kind: 'doom_loop', tool_name, args, threshold }
//     and has no Continue affordance (manual retry would just re-loop)
//   - fallback error text is 'doom-loop summary failed'; the error reason
//     itself is still 'summary_after_cap_failed' (shared with cap-hit — there
//     is no dedicated doom-loop reason code)
// Kept as a clone rather than refactored into a shared helper because the
// two summary paths still differ in sentinel shape and wording; a third
// sentinel would justify factoring out runWrapUpSummary(opts).
/**
 * Runs the tools-disabled wrap-up completion after a doom loop (the same
 * tool called with identical arguments `DOOM_LOOP_THRESHOLD` times in a row).
 *
 * Streams the summary into the in-flight assistant message (publishing a
 * `delta` frame per chunk and periodically persisting the accumulated text),
 * finalizes that message as `complete`, `cancelled`, or `failed`, bumps the
 * session's `updated_at`, inserts the doom-loop sentinel, and finally
 * publishes the terminal `chat_status` frame.
 *
 * Errors thrown by the streaming call are captured and reflected in the
 * message status rather than rethrown. Errors from the finalize / session /
 * sentinel SQL statements are not caught here and propagate to the caller.
 *
 * @param ctx - Inference context; this function uses `sql`, `publish`,
 *   `publishUser`, and `log`.
 * @param args - Supplies `sessionId`, `chatId`, `assistantMessageId`, and the
 *   abort `signal` forwarded to the streaming call.
 * @param session - Session whose `model` serves the summary call.
 * @param project - Passed through to `buildMessagesPayload`.
 * @param history - Passed through to `buildMessagesPayload`.
 * @param agent - Active agent or `null`; supplies `temperature`.
 * @param loop - The looping tool's `name` and the repeated `args`; both are
 *   recorded in the sentinel metadata, and `name` appears in the synthetic note.
 */
export async function runDoomLoopSummary(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null,
  loop: { name: string; args: Record<string, unknown> },
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  const messages = await buildMessagesPayload(session, project, history, agent, ctx.log);
  messages.push({ role: 'system', content: DOOM_LOOP_NOTE(loop.name) });

  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  const startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
  });

  let accumulated = '';
  let pendingFlushTimer: NodeJS.Timeout | null = null;
  // Flushes are chained so the partial-content UPDATEs run one at a time, in
  // the order they were scheduled.
  let flushPromise: Promise<void> = Promise.resolve();

  const flushNow = () => {
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    const snapshot = accumulated;
    flushPromise = flushPromise
      .then(async () => {
        await ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`;
      })
      .catch((err: unknown) => {
        // Incremental flushes are best-effort: the finalize UPDATE below
        // always rewrites the full content. Swallowing here keeps a failed
        // mid-stream write from surfacing as an unhandled rejection or from
        // aborting finalization via the `await flushPromise` in `finally`.
        ctx.log.warn(
          { err, sessionId, chatId, assistantMessageId },
          'inference doom-loop summary incremental flush failed',
        );
      });
  };

  const scheduleFlush = () => {
    if (pendingFlushTimer) return;
    pendingFlushTimer = setTimeout(() => {
      pendingFlushTimer = null;
      flushNow();
    }, DB_FLUSH_INTERVAL_MS);
  };

  let summaryOk = false;
  let summarySoftCancelled = false;
  let summaryError: string | null = null;
  let result: StreamResult | null = null;

  try {
    result = await streamCompletion(
      ctx,
      session.model,
      messages,
      { tools: null, temperature: agent?.temperature },
      (delta) => {
        accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
        });
        scheduleFlush();
      },
      undefined,
      signal,
    );
    summaryOk = true;
  } catch (err) {
    if (err instanceof Error && err.name === 'AbortError') {
      summarySoftCancelled = true;
    } else {
      summaryError = err instanceof Error ? err.message : String(err);
    }
  } finally {
    // Drop any not-yet-fired flush (finalize writes the content anyway) and
    // wait for in-flight flushes so they cannot land after the final UPDATE.
    if (pendingFlushTimer) {
      clearTimeout(pendingFlushTimer);
      pendingFlushTimer = null;
    }
    await flushPromise;
  }

  if (summaryOk && result) {
    const mctx = await modelContext.getModelContext(session.model);
    const nCtx = mctx?.n_ctx ?? null;
    const [updated] = await ctx.sql<
      { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
    >`
      UPDATE messages
      SET content = ${result.content},
          status = 'complete',
          tokens_used = ${result.completionTokens},
          ctx_used = ${result.promptTokens},
          ctx_max = ${nCtx},
          finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
      RETURNING tokens_used, ctx_used, ctx_max, finished_at
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
      tokens_used: updated?.tokens_used ?? null,
      ctx_used: updated?.ctx_used ?? null,
      ctx_max: updated?.ctx_max ?? null,
      started_at: startedAt,
      finished_at: updated?.finished_at ?? null,
      model: session.model,
    });
  } else if (summarySoftCancelled) {
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated},
          status = 'cancelled',
          finished_at = clock_timestamp()
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'message_complete',
      message_id: assistantMessageId,
      chat_id: chatId,
    });
  } else {
    // Doom-loop summary failure reuses the existing summary_after_cap_failed
    // error reason — the ErrorReason union is shared between sentinel paths
    // and the UI surfaces a generic "summary failed" line for both. We don't
    // add a new reason code because the user-visible failure mode is the
    // same (model gave up mid-summary). Sentinel below still fires.
    const errMeta: MessageMetadata = {
      kind: 'error',
      error_reason: 'summary_after_cap_failed',
      error_text: summaryError ?? 'doom-loop summary failed',
    };
    await ctx.sql`
      UPDATE messages
      SET content = ${accumulated},
          status = 'failed',
          finished_at = clock_timestamp(),
          metadata = ${ctx.sql.json(errMeta as never)}
      WHERE id = ${assistantMessageId}
    `;
    ctx.publish(sessionId, {
      type: 'error',
      message_id: assistantMessageId,
      chat_id: chatId,
      error: summaryError ?? 'doom-loop summary failed',
      reason: 'summary_after_cap_failed',
    });
  }

  // Bump session updated_at once for this turn and announce it.
  const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions SET updated_at = clock_timestamp()
    WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  if (sessRow) {
    ctx.publishUser({
      type: 'session_updated',
      session_id: sessionId,
      project_id: sessRow.project_id,
      name: sessRow.name,
      updated_at: sessRow.updated_at,
    });
  } else {
    // The UPDATE matched no row; there is nothing to announce. Log instead
    // of dereferencing undefined.
    ctx.log.warn(
      { sessionId, chatId },
      'inference doom-loop summary: session row not found, session_updated not published',
    );
  }

  await insertDoomLoopSentinel(ctx, sessionId, chatId, loop);

  // Status frame is published last: idle on success or user abort, error
  // (with the shared reason code) otherwise.
  if (summaryOk || summarySoftCancelled) {
    ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
  } else {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'error',
      at: new Date().toISOString(),
      reason: 'summary_after_cap_failed',
    });
  }

  ctx.log.info(
    { sessionId, chatId, assistantMessageId, loopedTool: loop.name, summaryOk, summaryCancelled: summarySoftCancelled },
    'inference doom-loop summary finished',
  );
}

/**
 * Inserts the `doom_loop` system sentinel message for a chat and publishes it
 * as a started → delta → complete frame sequence.
 *
 * @param ctx - Inference context; uses `sql` and `publish`.
 * @param sessionId - Session the sentinel row belongs to and the publish target.
 * @param chatId - Chat the sentinel row belongs to.
 * @param loop - The looping tool's `name` and repeated `args`, stored in the
 *   sentinel metadata alongside `DOOM_LOOP_THRESHOLD`.
 */
async function insertDoomLoopSentinel(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  loop: { name: string; args: Record<string, unknown> },
): Promise<void> {
  // No hard-ceiling / can-continue logic here — doom-loop is a different
  // failure mode from cap-hit. Continuing would re-trigger the loop with
  // the same tools available; the user needs to restate their question
  // or switch agents instead.
  const metadata: MessageMetadata = {
    kind: 'doom_loop',
    tool_name: loop.name,
    args: loop.args,
    threshold: DOOM_LOOP_THRESHOLD,
  };
  const content = `Detected ${DOOM_LOOP_THRESHOLD} identical calls to ${loop.name}. Stopping the tool-call loop. Produce the best answer you can with what you have.`;

  const [row] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata)
    VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(), ${ctx.sql.json(metadata as never)})
    RETURNING id
  `;
  if (!row) {
    // INSERT ... RETURNING should always yield the new row; fail with a
    // clear message rather than a TypeError on `row.id`.
    throw new Error('doom_loop sentinel insert returned no row');
  }

  // Standard frame sequence — same as cap-hit sentinel — so
  // useSessionStream's reducer appends the row via the existing path.
  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: row.id,
    chat_id: chatId,
    role: 'system',
  });
  ctx.publish(sessionId, {
    type: 'delta',
    message_id: row.id,
    chat_id: chatId,
    content,
  });
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: row.id,
    chat_id: chatId,
    metadata,
  });
}