Checkpoint of in-progress backend work present in the tree, not authored this session: auto_name, inference tool-phase/turn, secret_guard, provider-registry, plus a new agent-allowlist test (7 tests, passing). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
461 lines
18 KiB
TypeScript
461 lines
18 KiB
TypeScript
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { Sql } from '../../db.js';
|
|
import type { Config } from '../../config.js';
|
|
import type {
|
|
Agent,
|
|
ErrorReason,
|
|
Message,
|
|
MessageMetadata,
|
|
Project,
|
|
Session,
|
|
ToolCall,
|
|
UserStreamFrame,
|
|
} from '../../types/api.js';
|
|
import { ALL_TOOLS } from '../tools.js';
|
|
import { resolveProjectRoot } from '../path_guard.js';
|
|
import { maybeAutoNameChat } from '../auto_name.js';
|
|
import { rewriteSearchQuery } from '../task-search-rewrite.js';
|
|
import { getAgentById } from '../agents.js';
|
|
import * as compaction from '../compaction.js';
|
|
import type { Broker } from '../broker.js';
|
|
import { resolveToolBudget } from './budget.js';
|
|
import {
|
|
detectDoomLoop,
|
|
} from './sentinels.js';
|
|
import {
|
|
buildMessagesPayload,
|
|
loadContext,
|
|
} from './payload.js';
|
|
import {
|
|
finalizeCompletion,
|
|
handleAbortOrError,
|
|
} from './error-handler.js';
|
|
import {
|
|
executeStreamPhase,
|
|
} from './stream-phase.js';
|
|
import { executeToolPhase, type ToolPhaseResult } from './tool-phase.js';
|
|
import type { StreamPhaseState } from './types.js';
|
|
import {
|
|
runCapHitSummary,
|
|
runDoomLoopSummary,
|
|
runStepCapSummary,
|
|
} from './sentinel-summaries.js';
|
|
|
|
// v1.14.0: hard ceiling on the number of stream-and-tool iterations per
|
|
// user-message turn. Per-agent cap via agent.steps is the primary knob;
|
|
// MAX_STEPS is the safety ceiling. 200 is 4x the effective budget ceiling
|
|
// (50 tool calls) — in practice budget fires first unless the model makes
|
|
// many 0-tool-call iterations (which exit the loop via the non-tool finish
|
|
// path anyway).
|
|
export const MAX_STEPS = 200;
|
|
|
|
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
|
// importing from services/inference.js as the public surface.
|
|
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
|
export { buildMessagesPayload } from './payload.js';
|
|
|
|
export interface InferenceFrame {
|
|
type:
|
|
| 'message_started'
|
|
| 'delta'
|
|
| 'tool_call'
|
|
| 'tool_result'
|
|
| 'message_complete'
|
|
| 'usage'
|
|
| 'messages_deleted'
|
|
| 'session_renamed'
|
|
| 'chat_renamed'
|
|
| 'error';
|
|
message_id?: string;
|
|
message_ids?: string[];
|
|
chat_id?: string;
|
|
tool_message_id?: string;
|
|
tool_call_id?: string;
|
|
// v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
|
|
// through the normal message_started → delta → message_complete sequence.
|
|
role?: 'assistant' | 'tool' | 'user' | 'system';
|
|
content?: string;
|
|
tool_call?: ToolCall;
|
|
output?: unknown;
|
|
truncated?: boolean;
|
|
error?: string;
|
|
// v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
|
|
// surface a specific message; `error` stays the human-readable text.
|
|
reason?: ErrorReason;
|
|
// v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
|
|
// messages can carry their persisted metadata to the live stream without a
|
|
// refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
|
|
// { kind: 'error', ... }).
|
|
metadata?: MessageMetadata | null;
|
|
tokens_used?: number | null;
|
|
ctx_used?: number | null;
|
|
ctx_max?: number | null;
|
|
completion_tokens?: number | null;
|
|
started_at?: string | null;
|
|
finished_at?: string | null;
|
|
model?: string;
|
|
session_id?: string;
|
|
name?: string;
|
|
}
|
|
|
|
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
|
|
|
export interface InferenceContext {
|
|
sql: Sql;
|
|
config: Config;
|
|
log: FastifyBaseLogger;
|
|
publish: FramePublisher;
|
|
publishUser: (frame: UserStreamFrame) => void;
|
|
// v1.11: passed through so compaction.process can publish 'compacted'
|
|
// frames on the same session WS channel useSessionStream subscribes to.
|
|
// Compaction is the only path that needs the raw broker handle (regular
|
|
// inference goes through `publish`); keeping a separate field avoids
|
|
// tempting other code paths into bypassing the session-id binding.
|
|
broker: Broker;
|
|
}
|
|
|
|
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
|
// import buildMessagesPayload from this module, so a re-export below
|
|
// preserves the public surface). Stream + tool phases extracted to
|
|
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
|
|
|
export interface StreamResult {
|
|
finishReason: string | null;
|
|
content: string;
|
|
toolCalls: ToolCall[];
|
|
promptTokens: number | null;
|
|
completionTokens: number | null;
|
|
// v1.13.1-C: reasoning text accumulated across reasoning-delta parts.
|
|
// Empty string when the model doesn't emit reasoning (most cases).
|
|
reasoning: string;
|
|
}
|
|
|
|
|
|
export interface TurnArgs {
|
|
sessionId: string;
|
|
chatId: string;
|
|
assistantMessageId: string;
|
|
// v1.8.2: cumulative tool calls executed this run. Compared against the
|
|
// resolved budget at the top of each turn. Replaces the older `depth`
|
|
// counter (which counted iterations, not invocations).
|
|
toolsUsed: number;
|
|
// v1.11.6: ordered tool calls executed in this user-message turn (across
|
|
// recursive runAssistantTurn invocations). Reset to [] at user-message
|
|
// boundaries by runInference, same as toolsUsed. Doom-loop check at the
|
|
// top of runAssistantTurn slices the last DOOM_LOOP_THRESHOLD entries.
|
|
recentToolCalls: ToolCall[];
|
|
signal: AbortSignal | undefined;
|
|
}
|
|
|
|
|
|
export async function runAssistantTurn(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
): Promise<void> {
|
|
const { sessionId, chatId, signal } = args;
|
|
|
|
// v1.14.0: resolve agent once at the top. The agent stays fixed for the
|
|
// duration of this user-message turn — PATCH agent_id mid-conversation
|
|
// takes effect on the next runInference, not mid-loop.
|
|
const initialLoaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (!initialLoaded) {
|
|
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
|
return;
|
|
}
|
|
const { session, project } = initialLoaded;
|
|
const agent = session.agent_id
|
|
? await getAgentById(project.path, session.agent_id)
|
|
: null;
|
|
const budget = resolveToolBudget(agent);
|
|
|
|
// v1.14.0: effectiveCap = min(agent.steps ?? Infinity, MAX_STEPS).
|
|
// steps: 0 means "no tool calls allowed" — the first stream phase runs
|
|
// but if it emits tool calls they are not executed (finalize as text-only).
|
|
const effectiveCap = Math.min(agent?.steps ?? Infinity, MAX_STEPS);
|
|
|
|
// steps: 0 special case — model responds text-only. The while loop would
|
|
// never enter (effectiveCap === 0), so we handle it explicitly before the
|
|
// loop. The model always gets at least one chance to respond with text.
|
|
if (effectiveCap === 0) {
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (loaded) {
|
|
await runTextOnlyTurn(ctx, args, loaded.session, loaded.project, loaded.history, agent);
|
|
}
|
|
return;
|
|
}
|
|
|
|
let stepNumber = 0;
|
|
let toolsUsed = args.toolsUsed;
|
|
let recentToolCalls = args.recentToolCalls;
|
|
let assistantMessageId = args.assistantMessageId;
|
|
|
|
while (stepNumber < effectiveCap) {
|
|
// ---- doom-loop check (moved from top-of-function) ----
|
|
const loop = detectDoomLoop(recentToolCalls);
|
|
if (loop) {
|
|
// Need fresh history for the summary.
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (loaded) {
|
|
const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
|
|
await runDoomLoopSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, loop);
|
|
}
|
|
break;
|
|
}
|
|
|
|
// ---- budget check (moved from top-of-function) ----
|
|
if (toolsUsed >= budget) {
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (loaded) {
|
|
const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
|
|
await runCapHitSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, budget);
|
|
}
|
|
break;
|
|
}
|
|
|
|
// ---- compaction check ----
|
|
// v1.11: if the prior turn flagged this chat for compaction, run it
|
|
// before loadContext so we read post-compaction history. Swallow
|
|
// failures and proceed with un-compacted history.
|
|
const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
|
|
SELECT needs_compaction FROM chats WHERE id = ${chatId}
|
|
`;
|
|
if (chatFlag[0]?.needs_compaction) {
|
|
try {
|
|
await compaction.process({
|
|
sql: ctx.sql,
|
|
config: ctx.config,
|
|
log: ctx.log,
|
|
broker: ctx.broker,
|
|
chatId,
|
|
});
|
|
} catch (err) {
|
|
ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
|
|
await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
|
}
|
|
}
|
|
|
|
// ---- load context (must re-load each iteration — new messages since last step) ----
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (!loaded) {
|
|
ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop');
|
|
break;
|
|
}
|
|
const { session: iterSession, project: iterProject, history } = loaded;
|
|
const projectRoot = await resolveProjectRoot(iterProject.path);
|
|
|
|
// v1.14.0: log step boundary for instrumentation. step_start parts are in
|
|
// the schema CHECK but not emitted here — writing to the assistant message
|
|
// before the stream phase creates a sequence-0 collision with
|
|
// partsFromAssistantMessage. A WS frame or structured log is sufficient
|
|
// since the frontend doesn't render step boundaries in v1.14.
|
|
ctx.log.info({ sessionId, chatId, step: stepNumber, assistantMessageId }, 'step_start');
|
|
|
|
// ---- build messages + stream phase ----
|
|
const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log);
|
|
const webToolsEnabled =
|
|
iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false;
|
|
|
|
if (stepNumber === 0 && webToolsEnabled && messages.length >= 2) {
|
|
const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user');
|
|
if (lastUserMsg?.content) {
|
|
const hint = await rewriteSearchQuery(lastUserMsg.content);
|
|
if (hint && messages[0]?.role === 'system' && messages[0].content) {
|
|
messages[0].content += `\n\nThe user's search intent can be summarized as: "${hint}"`;
|
|
}
|
|
}
|
|
}
|
|
|
|
const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
|
|
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
|
let result: StreamResult;
|
|
try {
|
|
result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, state, agent, webToolsEnabled);
|
|
} catch (err) {
|
|
await handleAbortOrError(ctx, iterArgs, state.accumulated, err);
|
|
break;
|
|
}
|
|
|
|
// ---- non-tool finish → finalize and exit ----
|
|
if (result.toolCalls.length === 0) {
|
|
await finalizeCompletion(ctx, iterArgs, result, state.startedAt, iterSession);
|
|
break;
|
|
}
|
|
|
|
// ---- steps: 0 edge case ----
|
|
// effectiveCap check above guarantees we're inside the loop, but this
|
|
// guard handles the theoretical case where the model emits tool calls
|
|
// on step 0 when effectiveCap would have been 0 (impossible since the
|
|
// while condition prevents entry, but kept for safety). If effectiveCap
|
|
// is 1 and we're on step 0, tool calls ARE executed — steps counts
|
|
// iterations, not post-first-stream.
|
|
|
|
// ---- tool phase ----
|
|
let toolPhaseResult: ToolPhaseResult;
|
|
try {
|
|
toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent);
|
|
} catch (err) {
|
|
// Tool phase errors are unexpected (individual tool failures are
|
|
// caught inside executeToolPhase). Log and break.
|
|
ctx.log.error({ err, sessionId, chatId, step: stepNumber }, 'tool phase threw unexpectedly');
|
|
break;
|
|
}
|
|
|
|
// ---- update loop locals ----
|
|
toolsUsed += toolPhaseResult.toolCallCount;
|
|
recentToolCalls = [...recentToolCalls, ...toolPhaseResult.toolCalls];
|
|
stepNumber++;
|
|
|
|
if (toolPhaseResult.action !== 'continue') {
|
|
// 'paused' (user input) or 'synthesis_done' — stop the loop.
|
|
break;
|
|
}
|
|
// 'continue' — advance to next assistant message.
|
|
assistantMessageId = toolPhaseResult.nextAssistantId!;
|
|
}
|
|
|
|
// ---- post-loop: step-cap sentinel ----
|
|
// When the loop exits because stepNumber reached effectiveCap, the last
|
|
// iteration's tool phase returned 'continue' with a nextAssistantId that
|
|
// is still in 'streaming' status (unfilled). Use it for the wrap-up.
|
|
if (stepNumber >= effectiveCap && effectiveCap < Infinity) {
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (loaded) {
|
|
const capArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, signal };
|
|
await runStepCapSummary(ctx, capArgs, loaded.session, loaded.project, loaded.history, agent, stepNumber, effectiveCap);
|
|
}
|
|
}
|
|
}
|
|
|
|
// v1.14.0: special handling for steps: 0 — the model responds text-only.
|
|
// The while loop never enters (effectiveCap === 0). We stream once with
|
|
// no tools, finalize, and return. If the model emits tool calls despite
|
|
// not being offered tools, they're ignored (finalize as text-only).
|
|
async function runTextOnlyTurn(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
session: Session,
|
|
project: Project,
|
|
history: Message[],
|
|
agent: Agent | null,
|
|
): Promise<void> {
|
|
const messages = await buildMessagesPayload(session, project, history, agent, ctx.log);
|
|
// Web tools are irrelevant when steps: 0 (no tool execution), but we
|
|
// still need to resolve the flag for executeStreamPhase's signature.
|
|
const webToolsEnabled =
|
|
session.web_search_enabled ?? project.default_web_search_enabled ?? false;
|
|
|
|
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
|
let result: StreamResult;
|
|
try {
|
|
result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
|
|
} catch (err) {
|
|
await handleAbortOrError(ctx, args, state.accumulated, err);
|
|
return;
|
|
}
|
|
|
|
if (result.toolCalls.length > 0) {
|
|
ctx.log.warn(
|
|
{ chatId: args.chatId, toolCallCount: result.toolCalls.length },
|
|
'steps: 0 agent emitted tool calls; ignoring and finalizing as text-only',
|
|
);
|
|
// Override: strip tool calls so finalizeCompletion treats it as text-only.
|
|
result = { ...result, toolCalls: [] };
|
|
}
|
|
|
|
await finalizeCompletion(ctx, args, result, state.startedAt, session);
|
|
}
|
|
|
|
export async function runInference(
|
|
ctx: InferenceContext,
|
|
sessionId: string,
|
|
chatId: string,
|
|
assistantMessageId: string,
|
|
signal?: AbortSignal
|
|
): Promise<void> {
|
|
// v1.8.2: every fresh inference (initial send, regenerate, force_send,
|
|
// continue) starts with a clean budget. Tool-call accumulation across
|
|
// Continue invocations is what the hard ceiling guards against, not the
|
|
// per-call budget.
|
|
// v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
|
|
// to a single user-message turn, so a Continue starts with no history.
|
|
return runAssistantTurn(ctx, {
|
|
sessionId,
|
|
chatId,
|
|
assistantMessageId,
|
|
toolsUsed: 0,
|
|
recentToolCalls: [],
|
|
signal,
|
|
});
|
|
}
|
|
|
|
// v1.8.2: cap-hit summary flow. Called instead of erroring when the loop
|
|
// hits its budget. Reuses the in-flight assistant message slot to stream a
|
|
// short wrap-up reply with the synthetic note prepended and tools disabled,
|
|
// then always inserts a cap_hit sentinel afterward (regardless of summary
|
|
// outcome) so the UI can show a Continue affordance.
|
|
interface InferenceRegistration {
|
|
controller: AbortController;
|
|
completed: Promise<void>;
|
|
}
|
|
|
|
export function createInferenceRunner(
|
|
ctx: Omit<InferenceContext, 'publishUser'>,
|
|
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
|
) {
|
|
const registry = new Map<string, InferenceRegistration>();
|
|
|
|
return {
|
|
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
|
const callCtx: InferenceContext = {
|
|
...ctx,
|
|
publishUser: (frame) => publishUserFn(user, frame),
|
|
// v1.11: broker comes in via ctx (set at registration time). Repeated
|
|
// here so the destructure carries it onto the per-call ctx without
|
|
// having to add it to every enqueue/cancel signature individually.
|
|
broker: ctx.broker,
|
|
};
|
|
// v1.8 mobile-tabs: announce working before the async loop starts so
|
|
// every device subscribed to the user channel sees the amber dot.
|
|
callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() });
|
|
const controller = new AbortController();
|
|
let resolveCompleted!: () => void;
|
|
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
|
const registration: InferenceRegistration = { controller, completed };
|
|
registry.set(chatId, registration);
|
|
void (async () => {
|
|
try {
|
|
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
|
setImmediate(() => {
|
|
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
|
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
|
});
|
|
});
|
|
} catch (err) {
|
|
callCtx.log.error({ err }, 'unhandled inference error');
|
|
} finally {
|
|
resolveCompleted();
|
|
// Only clear our own registration; a force-send may have replaced it.
|
|
if (registry.get(chatId) === registration) {
|
|
registry.delete(chatId);
|
|
}
|
|
}
|
|
})();
|
|
},
|
|
|
|
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
|
|
const reg = registry.get(chatId);
|
|
if (!reg) return false;
|
|
reg.controller.abort();
|
|
// Swallow — we just need to wait for the catch/finally to persist state.
|
|
await reg.completed.catch(() => {});
|
|
return true;
|
|
},
|
|
|
|
hasActive(chatId: string): boolean {
|
|
return registry.has(chatId);
|
|
},
|
|
};
|
|
}
|
|
|
|
export const _toolNames = ALL_TOOLS.map((t) => t.name);
|