Phase 1: Trace System + Observability - tool_traces DB table + insert/update service - tool_trace_start/tool_trace_finish WS frames (contracts + FE types) - Instrumented tool-phase.ts with timing around every tool call - GET /api/chats/:id/traces paginated endpoint - Trace viewer frontend (collapsible panel with timing bars + token breakdown) Phase 2: Session Persistence + Resume - agent_snapshots table (UPSERT per chat, persisted on turn boundaries) - save/load/delete service functions - Agent snapshot sent on WS reconnect - Session timeline view (vertical timeline with scroll-to + restore) Tooling: - run_command tool (execFile, 30s timeout, 32KB cap, path-guarded) - Auto-fix loop: after write tools, runs pnpm build, injects errors into next turn
582 lines
23 KiB
TypeScript
582 lines
23 KiB
TypeScript
import type {
|
|
Agent,
|
|
Message,
|
|
Project,
|
|
Session,
|
|
UserStreamFrame,
|
|
} from '../../types/api.js';
|
|
import { resolveProjectRoot } from '../path_guard.js';
|
|
import { maybeAutoNameChat } from '../auto_name.js';
|
|
import { rewriteSearchQuery } from '../task-search-rewrite.js';
|
|
import { getAgentById } from '../agents.js';
|
|
import * as compaction from '../compaction.js';
|
|
import { resolveTurnConfig } from './turn-config.js';
|
|
import { decideStep, decidePostToolAction } from './step-decision.js';
|
|
import {
|
|
freshMistakeState,
|
|
recordStep,
|
|
MISTAKE_RECOVERY_NOTE,
|
|
} from './mistake-tracker.js';
|
|
import {
|
|
buildMessagesPayload,
|
|
loadContext,
|
|
} from './payload.js';
|
|
import { toDcpMessages, transformMessages, fromDcpMessages } from './dcp/index.js';
|
|
import {
|
|
finalizeCompletion,
|
|
finalizeEmpty,
|
|
handleAbortOrError,
|
|
} from './error-handler.js';
|
|
import {
|
|
executeStreamPhase,
|
|
} from './stream-phase.js';
|
|
import { executeToolPhase, type ToolPhaseResult } from './tool-phase.js';
|
|
import type {
|
|
InferenceContext,
|
|
StreamPhaseState,
|
|
StreamResult,
|
|
TurnArgs,
|
|
} from './types.js';
|
|
import { saveAgentSnapshot } from '../session-snapshots.js';
|
|
// vWhale: auto-fix loop — after write tools, build the project and inject
|
|
// errors. Uses execFile (no shell) against the project root.
|
|
import { execFile } from 'node:child_process';
|
|
import { readFileSync, existsSync } from 'node:fs';
|
|
import { join } from 'node:path';
|
|
import {
|
|
runCapHitSummary,
|
|
runDoomLoopSummary,
|
|
runStepCapSummary,
|
|
insertMistakeRecoverySentinel,
|
|
} from './sentinel-summaries.js';
|
|
|
|
// vWhale: auto-fix — detect build command from package.json, run it, return
// error text for injection into next iteration. Best-effort, never throws.
const BUILD_TIMEOUT_MS = 60_000;
const BUILD_OUTPUT_CAP = 8_000;
// Raw capture limit handed to execFile. Kept well above BUILD_OUTPUT_CAP:
// execFile terminates the child when maxBuffer is exceeded, so a tight limit
// would kill a chatty-but-healthy build. The note itself is still truncated
// to BUILD_OUTPUT_CAP characters below.
const BUILD_MAX_BUFFER = 1024 * 1024;

/**
 * Runs the project's build-like script and returns its output as a note for
 * the model when (and only when) the build fails.
 *
 * Script selection: the first of `build`, `compile`, `typecheck` present in
 * `package.json`'s `scripts`. Package manager: pnpm if `pnpm-lock.yaml`
 * exists, else yarn if `yarn.lock` exists, else npm. The script is run via
 * execFile (no shell) with `projectRoot` as the working directory.
 *
 * @param ctx          Inference context; only `ctx.log.info` is used here.
 * @param projectRoot  Directory containing `package.json`.
 * @param sessionId    Included in the failure log line.
 * @param chatId       Included in the failure log line.
 * @param model        Model id; anything not starting with `deepseek-` is skipped.
 * @param existingNote A note already queued for the next payload. When set,
 *                     the build output is appended to it rather than replacing it.
 * @returns The (possibly combined) note text, or `undefined` when the build
 *          was skipped, succeeded, produced no output, or could not be run.
 */
async function detectAndRunBuild(
  ctx: InferenceContext,
  projectRoot: string,
  sessionId: string,
  chatId: string,
  model: string,
  existingNote: string | undefined,
): Promise<string | undefined> {
  // Only run for DeepSeek models (local Qwen models don't benefit from build loop).
  if (!model.startsWith('deepseek-')) return undefined;

  // Detect build command from package.json in project root.
  const pkgPath = join(projectRoot, 'package.json');
  if (!existsSync(pkgPath)) return undefined;

  let detected: string | undefined;
  try {
    const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record<string, string> };
    detected = ['build', 'compile', 'typecheck'].find((name) => pkg.scripts?.[name]);
  } catch {
    // Unreadable or malformed package.json — nothing to build.
    return undefined;
  }
  if (!detected) return undefined;
  // Re-bind as a const so the narrowed type survives into the callback below.
  const buildCmd: string = detected;

  // Detect package manager.
  const hasPnpm = existsSync(join(projectRoot, 'pnpm-lock.yaml'));
  const hasYarn = existsSync(join(projectRoot, 'yarn.lock'));
  const pm = hasPnpm ? 'pnpm' : hasYarn ? 'yarn' : 'npm';

  // Run the build.
  try {
    const out = await new Promise<string>((resolve) => {
      execFile(
        pm,
        ['run', buildCmd],
        { cwd: projectRoot, timeout: BUILD_TIMEOUT_MS, maxBuffer: BUILD_MAX_BUFFER },
        (err, stdout, stderr) => {
          // A null error means the script exited with code 0. Package managers
          // print a banner even on success, so output alone must never be
          // read as a failure.
          if (!err) {
            resolve('');
            return;
          }
          if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
            resolve(''); // package manager not found — skip
            return;
          }
          const merged = (stdout + '\n' + stderr).trim();
          resolve(merged.slice(0, BUILD_OUTPUT_CAP));
        },
      );
    });

    if (!out) return undefined; // build succeeded, was skipped, or failed silently
    ctx.log.info({ sessionId, chatId, buildCmd, outputLen: out.length }, 'auto-fix: build failed');

    const header = '--- Build error ---\n';
    if (!existingNote) return header + out;

    // Share the cap with the existing note. Clamp at zero: a negative end
    // index would make slice() count from the END of the output instead of
    // dropping it.
    const room = Math.max(0, BUILD_OUTPUT_CAP - existingNote.length);
    return existingNote + '\n\n' + header + out.slice(0, room);
  } catch {
    return undefined;
  }
}
|
|
|
|
// P5: MAX_STEPS moved to ./turn-config.ts (with resolveTurnConfig). Re-exported
|
|
// here so the public surface (index.ts → './turn.js') is unchanged.
|
|
export { MAX_STEPS } from './turn-config.js';
|
|
|
|
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
|
// importing from services/inference.js as the public surface.
|
|
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
|
export { buildMessagesPayload } from './payload.js';
|
|
|
|
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
|
// import buildMessagesPayload from this module, so the re-export above
|
|
// preserves the public surface). Stream + tool phases extracted to
|
|
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
|
//
|
|
// P5: the shared pipeline types (InferenceFrame / FramePublisher /
|
|
// InferenceContext / StreamResult / TurnArgs) moved to ./types.js to break the
|
|
// turn.ts type-hub-and-leaf near-cycle. They are re-exported from there via
|
|
// inference/index.ts for the public surface.
|
|
|
|
|
|
/**
 * Drives one user-message turn: a bounded loop of "stream a reply, run the
 * tool calls it requested, repeat" that ends when the model answers without
 * tool calls, a gate stops the run (doom-loop, budget, step cap, mistake
 * escalation), the tool phase asks to stop, or streaming errors/aborts.
 *
 * Per iteration: gate decision → optional compaction → reload context →
 * DCP prune → build payload → stream → finalize (no tool calls) or tool
 * phase → mistake-tracker decision → optional auto-fix build.
 *
 * After the loop: fires the Stop hook if it has not fired already, persists
 * an agent snapshot (best-effort), and runs the step-cap summary when the
 * loop ended because `stepNumber` reached `effectiveCap`.
 *
 * @param ctx  Inference context (uses `sql`, `log`, `config`, `broker`,
 *             `hooks`, `publishUser`).
 * @param args Turn state: ids, abort signal, tool counters, the mistake
 *             tracker (mutated in place) and an optional pending recovery note.
 */
export async function runAssistantTurn(
  ctx: InferenceContext,
  args: TurnArgs,
): Promise<void> {
  const { sessionId, chatId, signal } = args;

  // v1.14.0: resolve agent once at the top. The agent stays fixed for the
  // duration of this user-message turn — PATCH agent_id mid-conversation
  // takes effect on the next runInference, not mid-loop.
  const initialLoaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!initialLoaded) {
    ctx.log.warn({ sessionId }, 'inference: session or project missing');
    return;
  }
  const { session, project } = initialLoaded;
  const agent = session.agent_id
    ? await getAgentById(project.path, session.agent_id)
    : null;
  // P5: pure per-turn config (budget + cap math + text-only flag).
  const { effectiveCap, budget, isTextOnly } = resolveTurnConfig(agent);

  // steps: 0 special case — model responds text-only. The while loop would
  // never enter (effectiveCap === 0), so we handle it explicitly before the
  // loop. The model always gets at least one chance to respond with text.
  if (isTextOnly) {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      await runTextOnlyTurn(ctx, args, loaded.session, loaded.project, loaded.history, agent);
    }
    return;
  }

  let stepNumber = 0;
  let toolsUsed = args.toolsUsed;
  let recentToolCalls = args.recentToolCalls;
  let assistantMessageId = args.assistantMessageId;
  // v#12 MistakeTracker: the tracker state is carried on `args` (mutated in
  // place by recordStep). pendingRecoveryNote is a loop-local because it is a
  // single-step transient — set when a nudge fires or an auto-fix build
  // fails, consumed (injected into the next payload) and cleared on the
  // following iteration.
  const mistakeTracker = args.mistakeTracker;
  let pendingRecoveryNote: string | undefined = args.pendingRecoveryNote;

  // vWhale: tools whose use triggers the auto-fix build. Loop-invariant.
  const writeTools = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']);
  // Set when the in-loop Stop hook has run, so the post-loop hook does not
  // fire a second time for the same turn.
  let stopHookFired = false;

  while (stepNumber < effectiveCap) {
    // ---- top-of-loop gate: doom-loop, then budget (pure decision) ----
    const decision = decideStep({ recentToolCalls, toolsUsed, budget });
    if (decision.kind === 'doom') {
      // Need fresh history for the summary.
      const loaded = await loadContext(ctx.sql, sessionId, chatId);
      if (loaded) {
        const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal };
        await runDoomLoopSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, decision.loop);
      }
      break;
    }
    if (decision.kind === 'budget') {
      const loaded = await loadContext(ctx.sql, sessionId, chatId);
      if (loaded) {
        const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal };
        await runCapHitSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, budget);
      }
      break;
    }
    // decision.kind === 'stream' → proceed with compaction + stream + tools.

    // ---- compaction check ----
    // v1.11: if the prior turn flagged this chat for compaction, run it
    // before loadContext so we read post-compaction history. Swallow
    // failures and proceed with un-compacted history.
    const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
      SELECT needs_compaction FROM chats WHERE id = ${chatId}
    `;
    if (chatFlag[0]?.needs_compaction) {
      try {
        await compaction.process({
          sql: ctx.sql,
          config: ctx.config,
          log: ctx.log,
          broker: ctx.broker,
          chatId,
          hooks: ctx.hooks,
        });
      } catch (err) {
        ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
        await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
      }
    }

    // ---- load context (must re-load each iteration — new messages since last step) ----
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (!loaded) {
      ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop');
      break;
    }
    let { session: iterSession, project: iterProject, history } = loaded;
    const projectRoot = await resolveProjectRoot(iterProject.path);

    // DCP prune is best-effort: on any failure the un-pruned history is used.
    try {
      const dcpMsgs = toDcpMessages(history);
      const { messages: pruned, stats } = transformMessages(chatId, dcpMsgs);
      if (stats.removedCount > 0) {
        ctx.log.info({ chatId, ...stats }, 'dcp: transform removed messages');
        history = fromDcpMessages(pruned) as typeof history;
      }
    } catch (err) {
      ctx.log.warn({ err: err instanceof Error ? err.message : String(err), chatId }, 'dcp: transform skipped');
    }

    // v1.14.0: log step boundary for instrumentation. step_start parts are in
    // the schema CHECK but not emitted here — writing to the assistant message
    // before the stream phase creates a sequence-0 collision with
    // partsFromAssistantMessage. A WS frame or structured log is sufficient
    // since the frontend doesn't render step boundaries in v1.14.
    ctx.log.info({ sessionId, chatId, step: stepNumber, assistantMessageId }, 'step_start');

    // ---- build messages + stream phase ----
    const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log);
    const webToolsEnabled =
      iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false;

    // First step only: append a rewritten search-intent hint to the system
    // message when web tools are on.
    if (stepNumber === 0 && webToolsEnabled && messages.length >= 2) {
      const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user');
      if (lastUserMsg?.content) {
        const hint = await rewriteSearchQuery(lastUserMsg.content);
        if (hint && messages[0]?.role === 'system' && messages[0].content) {
          messages[0].content += `\n\nThe user's search intent can be summarized as: "${hint}"`;
        }
      }
    }

    // v#12 MistakeTracker / vWhale auto-fix: if the prior iteration queued a
    // note (nudge and/or build error), append it to THIS payload (consumed
    // exactly once, then cleared). Never persisted — same lifecycle as the
    // cap-hit/doom-loop summary notes, which live only inside the in-memory
    // messages array.
    if (pendingRecoveryNote) {
      messages.push({ role: 'system', content: pendingRecoveryNote });
      pendingRecoveryNote = undefined;
    }

    const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal };
    const state: StreamPhaseState = { accumulated: '', startedAt: null };
    let result: StreamResult;
    try {
      result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, state, agent, webToolsEnabled);
    } catch (err) {
      await handleAbortOrError(ctx, iterArgs, state.accumulated, err);
      break;
    }

    // ---- non-tool finish → finalize and exit ----
    if (result.toolCalls.length === 0) {
      // vWhale: Stop hook (best-effort, non-blocking).
      if (ctx.hooks) {
        stopHookFired = true;
        ctx.hooks.run('Stop', {
          event: 'Stop',
          session_id: sessionId,
          chat_id: chatId,
          last_assistant_text: result.content.slice(0, 500),
          turn: stepNumber,
        }).catch(() => {});
      }
      await finalizeCompletion(ctx, iterArgs, result, state.startedAt, iterSession);
      break;
    }

    // Note on step counting: `steps` counts loop iterations, so with
    // effectiveCap === 1 the tool calls emitted on step 0 ARE executed.
    // (effectiveCap === 0 never reaches this point — the text-only path
    // above returns before the loop.)

    // ---- tool phase ----
    let toolPhaseResult: ToolPhaseResult;
    try {
      toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent, stepNumber);
    } catch (err) {
      // Tool phase errors are unexpected (individual tool failures are
      // caught inside executeToolPhase). Log and break.
      ctx.log.error({ err, sessionId, chatId, step: stepNumber }, 'tool phase threw unexpectedly');
      break;
    }

    // ---- update loop locals ----
    toolsUsed += toolPhaseResult.toolCallCount;
    recentToolCalls = [...recentToolCalls, ...toolPhaseResult.toolCalls];
    stepNumber++;

    // v#12 MistakeTracker: fold this iteration's tool outcomes into the
    // tracker, in order. recordStep mutates `mistakeTracker` in place (it is
    // the same object referenced by args). A 'success' clears the streak.
    for (const o of toolPhaseResult.outcomes) {
      recordStep(mistakeTracker, o);
    }

    const wroteFiles = toolPhaseResult.toolCalls.some((tc) => writeTools.has(tc.name));

    // v#12 MistakeTracker: post-tool decision (pure). 'stop' = the tool phase
    // returned a non-'continue' action ('paused' for user input, or
    // 'synthesis_done') — neither a nudge nor an escalate would change the
    // control flow, so the mistake check is skipped. On 'continue' the
    // heterogeneous-failure pattern gates nudge/escalate/continue. Complements
    // the doom-loop gate above, which only catches *identical* repeats.
    const post = decidePostToolAction(toolPhaseResult.action, mistakeTracker);
    if (post === 'stop') {
      break;
    }
    if (post === 'escalate') {
      // The nudge didn't break the failure run — stop the turn (cap-hit-style)
      // to avoid burning the whole step budget on heterogeneous failures. The
      // next assistant row is still 'streaming'; finalize it as an empty
      // complete row so the slot doesn't dangle, then drop the escalate
      // sentinel.
      const failureKinds = [...mistakeTracker.run];
      assistantMessageId = toolPhaseResult.nextAssistantId!;
      const escalateArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal };
      await finalizeEmpty(ctx, escalateArgs);
      await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
        failureKinds,
        count: failureKinds.length,
        escalated: true,
        canContinue: true,
      });
      ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
      ctx.log.info(
        { sessionId, chatId, step: stepNumber, failureKinds },
        'mistake_recovery escalate — stopping turn',
      );
      break;
    }
    if (post === 'nudge') {
      // Soft intervention: inject model-facing recovery guidance into the NEXT
      // step's payload, drop a UI sentinel, bump nudges, reset the streak, and
      // continue. The note is consumed (and cleared) at the top of the next
      // iteration's payload build.
      pendingRecoveryNote = MISTAKE_RECOVERY_NOTE;
      const failureKinds = [...mistakeTracker.run];
      await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
        failureKinds,
        count: failureKinds.length,
        escalated: false,
        canContinue: true,
      });
      mistakeTracker.nudges += 1;
      mistakeTracker.run = [];
      ctx.log.info(
        { sessionId, chatId, step: stepNumber, nudges: mistakeTracker.nudges, failureKinds },
        'mistake_recovery nudge',
      );
    }

    // 'continue' or 'nudge' — advance to next assistant message.
    assistantMessageId = toolPhaseResult.nextAssistantId!;

    // vWhale: auto-fix — after write tools, build the project and queue any
    // build errors for the next payload. This must be awaited: the note is
    // consumed near the top of the next iteration, so a fire-and-forget build
    // would resolve too late (or overwrite a nudge note set above). Skipped
    // when no further step will run or the turn has been cancelled.
    if (wroteFiles && stepNumber < effectiveCap && !signal?.aborted) {
      try {
        const buildNote = await detectAndRunBuild(
          ctx, projectRoot, sessionId, chatId, iterSession.model, pendingRecoveryNote,
        );
        // detectAndRunBuild already folds the existing note into its result.
        if (buildNote) pendingRecoveryNote = buildNote;
      } catch (err) {
        ctx.log.warn({ err, sessionId, chatId, step: stepNumber }, 'auto-fix: build check failed unexpectedly');
      }
    }
  }

  // vWhale: Stop hook at post-loop exit (best-effort, non-blocking). Skipped
  // when the in-loop hook already fired for this turn.
  if (ctx.hooks && !stopHookFired) {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    const lastAssistant = loaded?.history?.slice().reverse().find(
      (m: import('../../types/api.js').Message) => m.role === 'assistant',
    );
    const content = lastAssistant?.content ?? '';
    ctx.hooks.run('Stop', {
      event: 'Stop',
      session_id: sessionId,
      chat_id: chatId,
      last_assistant_text: content.slice(0, 500),
      turn: stepNumber,
    }).catch(() => {});
  }

  // ---- persist agent snapshot (best-effort, never blocks inference) ----
  const snapLoaded = await loadContext(ctx.sql, sessionId, chatId).catch(() => null);
  if (snapLoaded) {
    await saveAgentSnapshot(ctx.sql, chatId, {
      session_id: sessionId,
      model: snapLoaded.session.model,
      agent: agent?.name ?? null,
      mode: null,
      turn_number: stepNumber,
      messages: snapLoaded.history.map((m) => ({ role: m.role, content: m.content })),
    }).catch(() => {});
  }

  // ---- post-loop: step-cap sentinel ----
  // When the loop exits because stepNumber reached effectiveCap, the last
  // iteration's tool phase returned 'continue' with a nextAssistantId that
  // is still in 'streaming' status (unfilled). Use it for the wrap-up.
  if (stepNumber >= effectiveCap && effectiveCap < Infinity) {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      const capArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal };
      await runStepCapSummary(ctx, capArgs, loaded.session, loaded.project, loaded.history, agent, stepNumber, effectiveCap);
    }
  }
}
|
|
|
|
/**
 * v1.14.0: text-only turn for agents configured with `steps: 0`.
 *
 * The main step loop never runs for such agents (effectiveCap === 0), so this
 * helper performs exactly one stream phase and finalizes the reply. Any tool
 * calls the model emits are logged and dropped so the result is finalized as
 * plain text.
 *
 * @param ctx     Inference context (only `ctx.log` is used directly here).
 * @param args    Turn arguments, forwarded to the stream/finalize helpers.
 * @param session Session row for this turn.
 * @param project Project row for this turn.
 * @param history Message history used to build the payload.
 * @param agent   Resolved agent, or null when the session has none.
 */
async function runTextOnlyTurn(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  project: Project,
  history: Message[],
  agent: Agent | null,
): Promise<void> {
  const payload = await buildMessagesPayload(session, project, history, agent, ctx.log);

  // No tools are executed on this path; the flag is resolved only because
  // executeStreamPhase's signature requires it.
  const webToolsEnabled =
    session.web_search_enabled ?? project.default_web_search_enabled ?? false;

  const streamState: StreamPhaseState = { accumulated: '', startedAt: null };

  let streamed: StreamResult;
  try {
    streamed = await executeStreamPhase(ctx, args, session, payload, streamState, agent, webToolsEnabled);
  } catch (err) {
    // Abort or provider failure: hand over whatever text was accumulated.
    await handleAbortOrError(ctx, args, streamState.accumulated, err);
    return;
  }

  const strayToolCalls = streamed.toolCalls.length;
  if (strayToolCalls > 0) {
    ctx.log.warn(
      { chatId: args.chatId, toolCallCount: strayToolCalls },
      'steps: 0 agent emitted tool calls; ignoring and finalizing as text-only',
    );
  }

  // Strip any tool calls so finalizeCompletion treats the reply as text-only.
  const textOnlyResult: StreamResult =
    strayToolCalls > 0 ? { ...streamed, toolCalls: [] } : streamed;

  await finalizeCompletion(ctx, args, textOnlyResult, streamState.startedAt, session);
}
|
|
|
|
/**
 * Starts a fresh inference run for one assistant message slot.
 *
 * Every invocation (initial send, regenerate, force_send, continue) begins
 * with zeroed per-turn state and delegates to runAssistantTurn.
 *
 * @param ctx                Inference context.
 * @param sessionId          Session the chat belongs to.
 * @param chatId             Chat being answered.
 * @param assistantMessageId Assistant message row to stream into.
 * @param signal             Optional abort signal for cancellation.
 */
export async function runInference(
  ctx: InferenceContext,
  sessionId: string,
  chatId: string,
  assistantMessageId: string,
  signal?: AbortSignal
): Promise<void> {
  const freshTurn: TurnArgs = {
    sessionId,
    chatId,
    assistantMessageId,
    // v1.8.2: clean budget per invocation. Tool-call accumulation across
    // Continue invocations is what the hard ceiling guards against, not the
    // per-call budget.
    toolsUsed: 0,
    // v1.11.6: doom-loop detection is scoped to a single user-message turn,
    // so a Continue starts with no history.
    recentToolCalls: [],
    // v#12 MistakeTracker: fresh per user-message turn, like recentToolCalls.
    // Tracks consecutive heterogeneous tool failures across the loop's
    // stream-and-tool iterations within this turn.
    mistakeTracker: freshMistakeState(),
    signal,
  };
  return runAssistantTurn(ctx, freshTurn);
}
|
|
|
|
// v1.8.2: cap-hit summary flow (runCapHitSummary, imported at the top of this
// file from ./sentinel-summaries.js). Called instead of erroring when the
// loop hits its budget. Reuses the in-flight assistant message slot to stream
// a short wrap-up reply with the synthetic note prepended and tools disabled,
// then always inserts a cap_hit sentinel afterward (regardless of summary
// outcome) so the UI can show a Continue affordance.
// NOTE: this paragraph does not describe the declaration that follows it.
|
|
/** Bookkeeping for one in-flight inference run, keyed by chat id in the runner. */
interface InferenceRegistration {
  /** Aborting this controller cancels the run via its signal. */
  controller: AbortController;
  /** Resolves once the run's try/catch/finally has finished. */
  completed: Promise<void>;
}

/**
 * Builds the inference runner: a small registry of in-flight runs with
 * `enqueue`, `cancel` and `hasActive` operations.
 *
 * @param ctx           Inference context without `publishUser`; a per-call
 *                      `publishUser` bound to the enqueueing user is added.
 * @param publishUserFn Publishes a frame on the given user's channel.
 * @returns The runner object.
 */
export function createInferenceRunner(
  ctx: Omit<InferenceContext, 'publishUser'>,
  publishUserFn: (user: string, frame: UserStreamFrame) => void
) {
  const activeRuns = new Map<string, InferenceRegistration>();

  return {
    /** Starts a run in the background and registers it under `chatId`. */
    enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
      const callCtx: InferenceContext = {
        ...ctx,
        publishUser: (frame) => publishUserFn(user, frame),
        // v1.11: broker comes in via ctx (set at registration time). Repeated
        // here so the destructure carries it onto the per-call ctx without
        // having to add it to every enqueue/cancel signature individually.
        broker: ctx.broker,
      };

      // v1.8 mobile-tabs: announce working before the async loop starts so
      // every device subscribed to the user channel sees the amber dot.
      callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() });

      const controller = new AbortController();
      let markCompleted!: () => void;
      const completed = new Promise<void>((done) => {
        markCompleted = done;
      });
      const registration: InferenceRegistration = { controller, completed };
      activeRuns.set(chatId, registration);

      const drive = async (): Promise<void> => {
        try {
          await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
          // Auto-naming is deferred to a later tick and never fails the run.
          setImmediate(() => {
            void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
              callCtx.log.warn({ err, chatId }, 'auto-name failed');
            });
          });
        } catch (err) {
          callCtx.log.error({ err }, 'unhandled inference error');
        } finally {
          markCompleted();
          // Only clear our own registration; a force-send may have replaced it.
          const current = activeRuns.get(chatId);
          if (current === registration) {
            activeRuns.delete(chatId);
          }
        }
      };
      void drive();
    },

    /**
     * Aborts the run registered for `chatId` and waits for it to settle.
     * Returns false when no run is registered.
     */
    async cancel(_sessionId: string, chatId: string): Promise<boolean> {
      const inFlight = activeRuns.get(chatId);
      if (!inFlight) return false;
      inFlight.controller.abort();
      // Swallow — we just need to wait for the catch/finally to persist state.
      await inFlight.completed.catch(() => {});
      return true;
    },

    /** True while a run is registered for `chatId`. */
    hasActive(chatId: string): boolean {
      return activeRuns.has(chatId);
    },
  };
}
|
|
|