v1.12.4: complete inference.ts split into services/inference/
- sentinel-summaries.ts: runCapHitSummary, insertCapHitSentinel, runDoomLoopSummary, insertDoomLoopSentinel - inference.ts → inference/turn.ts: residue is runAssistantTurn, runInference, createInferenceRunner orchestration only - inference/index.ts: re-export shim preserves the public surface (createInferenceRunner, runInference, runAssistantTurn, detectDoomLoop, DOOM_LOOP_THRESHOLD, buildMessagesPayload, plus type-side InferenceContext/InferenceFrame/StreamResult/TurnArgs/ FramePublisher) - src/index.ts + auto_name.ts + the two vitest test files updated to import from ./services/inference/index.js explicitly (NodeNext ESM doesn't honor directory-index resolution) Final tally: 11 files under services/inference/, the largest being sentinel-summaries.ts at 523 LoC (two near-clone summary paths kept side-by-side until a third sentinel justifies factoring out a shared runWrapUpSummary). turn.ts is now 326 LoC, the next-largest is stream-phase.ts at 380. Public import surface unchanged. tool-phase.ts → turn.ts back-edge for runAssistantTurn remains (cycle is safe; resolved at call time). Prepares the file structure for v1.13 AI SDK migration — streamText swap targets stream-phase.ts only. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -16,7 +16,7 @@ import { registerWebSocket } from './routes/ws.js';
|
||||
import { registerModelRoutes } from './routes/models.js';
|
||||
import { registerAgentRoutes } from './routes/agents.js';
|
||||
import { registerSkillsRoutes } from './routes/skills.js';
|
||||
import { createInferenceRunner } from './services/inference.js';
|
||||
import { createInferenceRunner } from './services/inference/index.js';
|
||||
import { createBroker } from './services/broker.js';
|
||||
import { listSkills } from './services/skills.js';
|
||||
import * as compaction from './services/compaction.js';
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { DOOM_LOOP_THRESHOLD, detectDoomLoop } from '../inference.js';
|
||||
import { DOOM_LOOP_THRESHOLD, detectDoomLoop } from '../inference/index.js';
|
||||
import type { ToolCall } from '../../types/api.js';
|
||||
|
||||
// ---- fixture ----------------------------------------------------------------
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { buildMessagesPayload } from '../inference.js';
|
||||
import { buildMessagesPayload } from '../inference/index.js';
|
||||
import type {
|
||||
Message,
|
||||
MessageRole,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { InferenceContext } from './inference.js';
|
||||
import type { InferenceContext } from './inference/index.js';
|
||||
|
||||
const NAMING_SYSTEM_PROMPT =
|
||||
'You name chat sessions. Reply directly with no thinking, reasoning, or explanation. Output ONLY the title, 4 words max, no quotes, no punctuation, no prefix like "Title:".';
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { MessageMetadata, Session } from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { maybeFlagForCompaction } from './payload.js';
|
||||
import type { InferenceContext, StreamResult, TurnArgs } from '../inference.js';
|
||||
import type { InferenceContext, StreamResult, TurnArgs } from './turn.js';
|
||||
|
||||
export async function handleAbortOrError(
|
||||
ctx: InferenceContext,
|
||||
|
||||
20
apps/server/src/services/inference/index.ts
Normal file
20
apps/server/src/services/inference/index.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
// v1.12.4: re-export shim. Outside callers (apps/server/src/index.ts and the
|
||||
// vitest inference tests) import from './services/inference/index.js'. The
|
||||
// directory is now the public surface; turn.ts holds runAssistantTurn /
|
||||
// runInference / createInferenceRunner while the other inference/*.ts files
|
||||
// stay implementation-private.
|
||||
|
||||
export {
|
||||
createInferenceRunner,
|
||||
runAssistantTurn,
|
||||
runInference,
|
||||
} from './turn.js';
|
||||
export type {
|
||||
FramePublisher,
|
||||
InferenceContext,
|
||||
InferenceFrame,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
export { buildMessagesPayload } from './payload.js';
|
||||
@@ -8,7 +8,7 @@ import type {
|
||||
import * as compaction from '../compaction.js';
|
||||
import { buildSystemPrompt } from '../system-prompt.js';
|
||||
import { isAnySentinel } from './sentinels.js';
|
||||
import type { InferenceContext } from '../inference.js';
|
||||
import type { InferenceContext } from './turn.js';
|
||||
|
||||
export interface OpenAiMessage {
|
||||
role: 'system' | 'user' | 'assistant' | 'tool';
|
||||
|
||||
@@ -1,47 +1,20 @@
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Config } from '../config.js';
|
||||
import type {
|
||||
Agent,
|
||||
ErrorReason,
|
||||
Message,
|
||||
MessageMetadata,
|
||||
Project,
|
||||
Session,
|
||||
ToolCall,
|
||||
UserStreamFrame,
|
||||
} from '../types/api.js';
|
||||
import { ALL_TOOLS } from './tools.js';
|
||||
import { resolveProjectRoot } from './path_guard.js';
|
||||
import { maybeAutoNameChat } from './auto_name.js';
|
||||
import { getAgentById } from './agents.js';
|
||||
import * as compaction from './compaction.js';
|
||||
import * as modelContext from './model-context.js';
|
||||
import type { Broker } from './broker.js';
|
||||
import { resolveToolBudget } from './inference/budget.js';
|
||||
import {
|
||||
DOOM_LOOP_THRESHOLD,
|
||||
detectDoomLoop,
|
||||
} from './inference/sentinels.js';
|
||||
import {
|
||||
buildMessagesPayload,
|
||||
loadContext,
|
||||
} from './inference/payload.js';
|
||||
import {
|
||||
finalizeCompletion,
|
||||
handleAbortOrError,
|
||||
} from './inference/error-handler.js';
|
||||
import {
|
||||
executeStreamPhase,
|
||||
streamCompletion,
|
||||
} from './inference/stream-phase.js';
|
||||
import { executeToolPhase } from './inference/tool-phase.js';
|
||||
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './inference/types.js';
|
||||
|
||||
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
||||
// importing from services/inference.js as the public surface.
|
||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js';
|
||||
export { buildMessagesPayload } from './inference/payload.js';
|
||||
} from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { buildMessagesPayload } from './payload.js';
|
||||
import { DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
import { streamCompletion } from './stream-phase.js';
|
||||
import { DB_FLUSH_INTERVAL_MS } from './types.js';
|
||||
import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
|
||||
// Synthetic system note appended to the cap-hit summary call. Verbatim from
|
||||
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
|
||||
@@ -52,219 +25,7 @@ const CAP_HIT_SUMMARY_NOTE = (limit: number) =>
|
||||
const DOOM_LOOP_NOTE = (name: string) =>
|
||||
`You called ${name} with the same arguments ${DOOM_LOOP_THRESHOLD} times in a row. Stop calling it. Produce the best answer you can with what you have.`;
|
||||
|
||||
export interface InferenceFrame {
|
||||
type:
|
||||
| 'message_started'
|
||||
| 'delta'
|
||||
| 'tool_call'
|
||||
| 'tool_result'
|
||||
| 'message_complete'
|
||||
| 'usage'
|
||||
| 'messages_deleted'
|
||||
| 'session_renamed'
|
||||
| 'chat_renamed'
|
||||
| 'error';
|
||||
message_id?: string;
|
||||
message_ids?: string[];
|
||||
chat_id?: string;
|
||||
tool_message_id?: string;
|
||||
tool_call_id?: string;
|
||||
// v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
|
||||
// through the normal message_started → delta → message_complete sequence.
|
||||
role?: 'assistant' | 'tool' | 'user' | 'system';
|
||||
content?: string;
|
||||
tool_call?: ToolCall;
|
||||
output?: unknown;
|
||||
truncated?: boolean;
|
||||
error?: string;
|
||||
// v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
|
||||
// surface a specific message; `error` stays the human-readable text.
|
||||
reason?: ErrorReason;
|
||||
// v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
|
||||
// messages can carry their persisted metadata to the live stream without a
|
||||
// refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
|
||||
// { kind: 'error', ... }).
|
||||
metadata?: MessageMetadata | null;
|
||||
tokens_used?: number | null;
|
||||
ctx_used?: number | null;
|
||||
ctx_max?: number | null;
|
||||
completion_tokens?: number | null;
|
||||
started_at?: string | null;
|
||||
finished_at?: string | null;
|
||||
model?: string;
|
||||
session_id?: string;
|
||||
name?: string;
|
||||
}
|
||||
|
||||
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
||||
|
||||
export interface InferenceContext {
|
||||
sql: Sql;
|
||||
config: Config;
|
||||
log: FastifyBaseLogger;
|
||||
publish: FramePublisher;
|
||||
publishUser: (frame: UserStreamFrame) => void;
|
||||
// v1.11: passed through so compaction.process can publish 'compacted'
|
||||
// frames on the same session WS channel useSessionStream subscribes to.
|
||||
// Compaction is the only path that needs the raw broker handle (regular
|
||||
// inference goes through `publish`); keeping a separate field avoids
|
||||
// tempting other code paths into bypassing the session-id binding.
|
||||
broker: Broker;
|
||||
}
|
||||
|
||||
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
||||
// import buildMessagesPayload from this module, so a re-export below
|
||||
// preserves the public surface). Stream + tool phases extracted to
|
||||
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
||||
|
||||
export interface StreamResult {
|
||||
finishReason: string | null;
|
||||
content: string;
|
||||
toolCalls: ToolCall[];
|
||||
promptTokens: number | null;
|
||||
completionTokens: number | null;
|
||||
}
|
||||
|
||||
|
||||
export interface TurnArgs {
|
||||
sessionId: string;
|
||||
chatId: string;
|
||||
assistantMessageId: string;
|
||||
// v1.8.2: cumulative tool calls executed this run. Compared against the
|
||||
// resolved budget at the top of each turn. Replaces the older `depth`
|
||||
// counter (which counted iterations, not invocations).
|
||||
toolsUsed: number;
|
||||
// v1.11.6: ordered tool calls executed in this user-message turn (across
|
||||
// recursive runAssistantTurn invocations). Reset to [] at user-message
|
||||
// boundaries by runInference, same as toolsUsed. Doom-loop check at the
|
||||
// top of runAssistantTurn slices the last DOOM_LOOP_THRESHOLD entries.
|
||||
recentToolCalls: ToolCall[];
|
||||
signal: AbortSignal | undefined;
|
||||
}
|
||||
|
||||
|
||||
export async function runAssistantTurn(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId } = args;
|
||||
|
||||
// v1.11: if the prior turn flagged this chat for compaction, run it first
|
||||
// so loadContext below reads the post-compaction history. We swallow
|
||||
// compaction failures (clearing the flag so we don't loop) and proceed
|
||||
// with the un-compacted history — a slow turn that hits the model's
|
||||
// hard limit is recoverable; a dead session is not.
|
||||
const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
|
||||
SELECT needs_compaction FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
if (chatFlag[0]?.needs_compaction) {
|
||||
try {
|
||||
await compaction.process({
|
||||
sql: ctx.sql,
|
||||
config: ctx.config,
|
||||
log: ctx.log,
|
||||
broker: ctx.broker,
|
||||
chatId,
|
||||
});
|
||||
} catch (err) {
|
||||
ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
|
||||
await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
||||
}
|
||||
}
|
||||
|
||||
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
||||
if (!loaded) {
|
||||
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
||||
return;
|
||||
}
|
||||
const { session, project, history } = loaded;
|
||||
const projectRoot = await resolveProjectRoot(project.path);
|
||||
// Agent resolution is per-turn so PATCH agent_id mid-conversation takes
|
||||
// effect on the next message. Unknown agent_id returns null silently —
|
||||
// session falls back to base prompt + all tools + default temperature.
|
||||
const agent = session.agent_id
|
||||
? await getAgentById(project.path, session.agent_id)
|
||||
: null;
|
||||
|
||||
// v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure.
|
||||
// When we've already burned the budget *before* this turn even runs, we
|
||||
// skip straight to the summary flow — the in-flight assistant message slot
|
||||
// gets reused for the wrap-up reply instead of being marked failed.
|
||||
const budget = resolveToolBudget(agent);
|
||||
if (args.toolsUsed >= budget) {
|
||||
await runCapHitSummary(ctx, args, session, project, history, agent, budget);
|
||||
return;
|
||||
}
|
||||
|
||||
// v1.11.6: doom-loop guard. Detected BEFORE the budget cap (the model can
|
||||
// burn through 3 identical calls long before the 15-call budget fires).
|
||||
// Same in-flight-slot-reuse pattern as runCapHitSummary — wrap-up reply
|
||||
// lands in args.assistantMessageId, then a doom_loop sentinel is inserted
|
||||
// to make the abort visible in the chat history.
|
||||
const loop = detectDoomLoop(args.recentToolCalls);
|
||||
if (loop) {
|
||||
await runDoomLoopSummary(ctx, args, session, project, history, agent, loop);
|
||||
return;
|
||||
}
|
||||
|
||||
const messages = await buildMessagesPayload(session, project, history, agent);
|
||||
|
||||
// v1.11.8: resolve per-chat web-tools opt-in. Tri-state on the wire:
|
||||
// - session.web_search_enabled = null → inherit project default
|
||||
// - session.web_search_enabled = true/false → explicit
|
||||
// Both web_search and web_fetch are gated by this single flag (the UI
|
||||
// label is "Enable web search and fetch" — same store, both tools).
|
||||
// Default is false unless explicitly opted in, matching the v1.9
|
||||
// plumbing intent ("inert until Batch 8 ships the actual tools").
|
||||
const webToolsEnabled =
|
||||
session.web_search_enabled ?? project.default_web_search_enabled ?? false;
|
||||
|
||||
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
||||
let result: StreamResult;
|
||||
try {
|
||||
result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
|
||||
} catch (err) {
|
||||
await handleAbortOrError(ctx, args, state.accumulated, err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.toolCalls.length > 0) {
|
||||
await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
|
||||
return;
|
||||
}
|
||||
|
||||
await finalizeCompletion(ctx, args, result, state.startedAt, session);
|
||||
}
|
||||
|
||||
export async function runInference(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
assistantMessageId: string,
|
||||
signal?: AbortSignal
|
||||
): Promise<void> {
|
||||
// v1.8.2: every fresh inference (initial send, regenerate, force_send,
|
||||
// continue) starts with a clean budget. Tool-call accumulation across
|
||||
// Continue invocations is what the hard ceiling guards against, not the
|
||||
// per-call budget.
|
||||
// v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
|
||||
// to a single user-message turn, so a Continue starts with no history.
|
||||
return runAssistantTurn(ctx, {
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId,
|
||||
toolsUsed: 0,
|
||||
recentToolCalls: [],
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
// v1.8.2: cap-hit summary flow. Called instead of erroring when the loop
|
||||
// hits its budget. Reuses the in-flight assistant message slot to stream a
|
||||
// short wrap-up reply with the synthetic note prepended and tools disabled,
|
||||
// then always inserts a cap_hit sentinel afterward (regardless of summary
|
||||
// outcome) so the UI can show a Continue affordance.
|
||||
async function runCapHitSummary(
|
||||
export async function runCapHitSummary(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
session: Session,
|
||||
@@ -526,7 +287,7 @@ async function insertCapHitSentinel(
|
||||
// Kept as a clone rather than refactored into a shared helper because the
|
||||
// two summary paths still differ in error reason + sentinel shape; a third
|
||||
// sentinel would justify factoring out runWrapUpSummary(opts).
|
||||
async function runDoomLoopSummary(
|
||||
export async function runDoomLoopSummary(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
session: Session,
|
||||
@@ -760,69 +521,3 @@ async function insertDoomLoopSentinel(
|
||||
metadata,
|
||||
});
|
||||
}
|
||||
|
||||
interface InferenceRegistration {
|
||||
controller: AbortController;
|
||||
completed: Promise<void>;
|
||||
}
|
||||
|
||||
export function createInferenceRunner(
|
||||
ctx: Omit<InferenceContext, 'publishUser'>,
|
||||
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
||||
) {
|
||||
const registry = new Map<string, InferenceRegistration>();
|
||||
|
||||
return {
|
||||
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
||||
const callCtx: InferenceContext = {
|
||||
...ctx,
|
||||
publishUser: (frame) => publishUserFn(user, frame),
|
||||
// v1.11: broker comes in via ctx (set at registration time). Repeated
|
||||
// here so the destructure carries it onto the per-call ctx without
|
||||
// having to add it to every enqueue/cancel signature individually.
|
||||
broker: ctx.broker,
|
||||
};
|
||||
// v1.8 mobile-tabs: announce working before the async loop starts so
|
||||
// every device subscribed to the user channel sees the amber dot.
|
||||
callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() });
|
||||
const controller = new AbortController();
|
||||
let resolveCompleted!: () => void;
|
||||
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
||||
const registration: InferenceRegistration = { controller, completed };
|
||||
registry.set(chatId, registration);
|
||||
void (async () => {
|
||||
try {
|
||||
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
||||
setImmediate(() => {
|
||||
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
||||
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
callCtx.log.error({ err }, 'unhandled inference error');
|
||||
} finally {
|
||||
resolveCompleted();
|
||||
// Only clear our own registration; a force-send may have replaced it.
|
||||
if (registry.get(chatId) === registration) {
|
||||
registry.delete(chatId);
|
||||
}
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
||||
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
|
||||
const reg = registry.get(chatId);
|
||||
if (!reg) return false;
|
||||
reg.controller.abort();
|
||||
// Swallow — we just need to wait for the catch/finally to persist state.
|
||||
await reg.completed.catch(() => {});
|
||||
return true;
|
||||
},
|
||||
|
||||
hasActive(chatId: string): boolean {
|
||||
return registry.has(chatId);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export const _toolNames = ALL_TOOLS.map((t) => t.name);
|
||||
@@ -17,7 +17,7 @@ import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from '../inference.js';
|
||||
} from './turn.js';
|
||||
|
||||
interface ChatCompletionDelta {
|
||||
role?: string;
|
||||
|
||||
@@ -7,12 +7,12 @@ import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from '../inference.js';
|
||||
} from './turn.js';
|
||||
// v1.12.4: ESM value-import cycle. executeToolPhase recurses into
|
||||
// runAssistantTurn which lives in inference.ts. The cycle is safe because
|
||||
// the reference is read at call time (inside an async function body), not
|
||||
// at module top-level. Node + tsc resolve this cleanly.
|
||||
import { runAssistantTurn } from '../inference.js';
|
||||
import { runAssistantTurn } from './turn.js';
|
||||
|
||||
async function executeToolCall(
|
||||
projectRoot: string,
|
||||
|
||||
326
apps/server/src/services/inference/turn.ts
Normal file
326
apps/server/src/services/inference/turn.ts
Normal file
@@ -0,0 +1,326 @@
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Sql } from '../../db.js';
|
||||
import type { Config } from '../../config.js';
|
||||
import type {
|
||||
Agent,
|
||||
ErrorReason,
|
||||
Message,
|
||||
MessageMetadata,
|
||||
Project,
|
||||
Session,
|
||||
ToolCall,
|
||||
UserStreamFrame,
|
||||
} from '../../types/api.js';
|
||||
import { ALL_TOOLS } from '../tools.js';
|
||||
import { resolveProjectRoot } from '../path_guard.js';
|
||||
import { maybeAutoNameChat } from '../auto_name.js';
|
||||
import { getAgentById } from '../agents.js';
|
||||
import * as compaction from '../compaction.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import type { Broker } from '../broker.js';
|
||||
import { resolveToolBudget } from './budget.js';
|
||||
import {
|
||||
DOOM_LOOP_THRESHOLD,
|
||||
detectDoomLoop,
|
||||
} from './sentinels.js';
|
||||
import {
|
||||
buildMessagesPayload,
|
||||
loadContext,
|
||||
} from './payload.js';
|
||||
import {
|
||||
finalizeCompletion,
|
||||
handleAbortOrError,
|
||||
} from './error-handler.js';
|
||||
import {
|
||||
executeStreamPhase,
|
||||
streamCompletion,
|
||||
} from './stream-phase.js';
|
||||
import { executeToolPhase } from './tool-phase.js';
|
||||
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
|
||||
import {
|
||||
runCapHitSummary,
|
||||
runDoomLoopSummary,
|
||||
} from './sentinel-summaries.js';
|
||||
|
||||
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
||||
// importing from services/inference.js as the public surface.
|
||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
export { buildMessagesPayload } from './payload.js';
|
||||
|
||||
export interface InferenceFrame {
|
||||
type:
|
||||
| 'message_started'
|
||||
| 'delta'
|
||||
| 'tool_call'
|
||||
| 'tool_result'
|
||||
| 'message_complete'
|
||||
| 'usage'
|
||||
| 'messages_deleted'
|
||||
| 'session_renamed'
|
||||
| 'chat_renamed'
|
||||
| 'error';
|
||||
message_id?: string;
|
||||
message_ids?: string[];
|
||||
chat_id?: string;
|
||||
tool_message_id?: string;
|
||||
tool_call_id?: string;
|
||||
// v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
|
||||
// through the normal message_started → delta → message_complete sequence.
|
||||
role?: 'assistant' | 'tool' | 'user' | 'system';
|
||||
content?: string;
|
||||
tool_call?: ToolCall;
|
||||
output?: unknown;
|
||||
truncated?: boolean;
|
||||
error?: string;
|
||||
// v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
|
||||
// surface a specific message; `error` stays the human-readable text.
|
||||
reason?: ErrorReason;
|
||||
// v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
|
||||
// messages can carry their persisted metadata to the live stream without a
|
||||
// refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
|
||||
// { kind: 'error', ... }).
|
||||
metadata?: MessageMetadata | null;
|
||||
tokens_used?: number | null;
|
||||
ctx_used?: number | null;
|
||||
ctx_max?: number | null;
|
||||
completion_tokens?: number | null;
|
||||
started_at?: string | null;
|
||||
finished_at?: string | null;
|
||||
model?: string;
|
||||
session_id?: string;
|
||||
name?: string;
|
||||
}
|
||||
|
||||
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
||||
|
||||
export interface InferenceContext {
|
||||
sql: Sql;
|
||||
config: Config;
|
||||
log: FastifyBaseLogger;
|
||||
publish: FramePublisher;
|
||||
publishUser: (frame: UserStreamFrame) => void;
|
||||
// v1.11: passed through so compaction.process can publish 'compacted'
|
||||
// frames on the same session WS channel useSessionStream subscribes to.
|
||||
// Compaction is the only path that needs the raw broker handle (regular
|
||||
// inference goes through `publish`); keeping a separate field avoids
|
||||
// tempting other code paths into bypassing the session-id binding.
|
||||
broker: Broker;
|
||||
}
|
||||
|
||||
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
||||
// import buildMessagesPayload from this module, so a re-export below
|
||||
// preserves the public surface). Stream + tool phases extracted to
|
||||
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
||||
|
||||
export interface StreamResult {
|
||||
finishReason: string | null;
|
||||
content: string;
|
||||
toolCalls: ToolCall[];
|
||||
promptTokens: number | null;
|
||||
completionTokens: number | null;
|
||||
}
|
||||
|
||||
|
||||
export interface TurnArgs {
|
||||
sessionId: string;
|
||||
chatId: string;
|
||||
assistantMessageId: string;
|
||||
// v1.8.2: cumulative tool calls executed this run. Compared against the
|
||||
// resolved budget at the top of each turn. Replaces the older `depth`
|
||||
// counter (which counted iterations, not invocations).
|
||||
toolsUsed: number;
|
||||
// v1.11.6: ordered tool calls executed in this user-message turn (across
|
||||
// recursive runAssistantTurn invocations). Reset to [] at user-message
|
||||
// boundaries by runInference, same as toolsUsed. Doom-loop check at the
|
||||
// top of runAssistantTurn slices the last DOOM_LOOP_THRESHOLD entries.
|
||||
recentToolCalls: ToolCall[];
|
||||
signal: AbortSignal | undefined;
|
||||
}
|
||||
|
||||
|
||||
export async function runAssistantTurn(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId } = args;
|
||||
|
||||
// v1.11: if the prior turn flagged this chat for compaction, run it first
|
||||
// so loadContext below reads the post-compaction history. We swallow
|
||||
// compaction failures (clearing the flag so we don't loop) and proceed
|
||||
// with the un-compacted history — a slow turn that hits the model's
|
||||
// hard limit is recoverable; a dead session is not.
|
||||
const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
|
||||
SELECT needs_compaction FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
if (chatFlag[0]?.needs_compaction) {
|
||||
try {
|
||||
await compaction.process({
|
||||
sql: ctx.sql,
|
||||
config: ctx.config,
|
||||
log: ctx.log,
|
||||
broker: ctx.broker,
|
||||
chatId,
|
||||
});
|
||||
} catch (err) {
|
||||
ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
|
||||
await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
||||
}
|
||||
}
|
||||
|
||||
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
||||
if (!loaded) {
|
||||
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
||||
return;
|
||||
}
|
||||
const { session, project, history } = loaded;
|
||||
const projectRoot = await resolveProjectRoot(project.path);
|
||||
// Agent resolution is per-turn so PATCH agent_id mid-conversation takes
|
||||
// effect on the next message. Unknown agent_id returns null silently —
|
||||
// session falls back to base prompt + all tools + default temperature.
|
||||
const agent = session.agent_id
|
||||
? await getAgentById(project.path, session.agent_id)
|
||||
: null;
|
||||
|
||||
// v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure.
|
||||
// When we've already burned the budget *before* this turn even runs, we
|
||||
// skip straight to the summary flow — the in-flight assistant message slot
|
||||
// gets reused for the wrap-up reply instead of being marked failed.
|
||||
const budget = resolveToolBudget(agent);
|
||||
if (args.toolsUsed >= budget) {
|
||||
await runCapHitSummary(ctx, args, session, project, history, agent, budget);
|
||||
return;
|
||||
}
|
||||
|
||||
// v1.11.6: doom-loop guard. Detected BEFORE the budget cap (the model can
|
||||
// burn through 3 identical calls long before the 15-call budget fires).
|
||||
// Same in-flight-slot-reuse pattern as runCapHitSummary — wrap-up reply
|
||||
// lands in args.assistantMessageId, then a doom_loop sentinel is inserted
|
||||
// to make the abort visible in the chat history.
|
||||
const loop = detectDoomLoop(args.recentToolCalls);
|
||||
if (loop) {
|
||||
await runDoomLoopSummary(ctx, args, session, project, history, agent, loop);
|
||||
return;
|
||||
}
|
||||
|
||||
const messages = await buildMessagesPayload(session, project, history, agent);
|
||||
|
||||
// v1.11.8: resolve per-chat web-tools opt-in. Tri-state on the wire:
|
||||
// - session.web_search_enabled = null → inherit project default
|
||||
// - session.web_search_enabled = true/false → explicit
|
||||
// Both web_search and web_fetch are gated by this single flag (the UI
|
||||
// label is "Enable web search and fetch" — same store, both tools).
|
||||
// Default is false unless explicitly opted in, matching the v1.9
|
||||
// plumbing intent ("inert until Batch 8 ships the actual tools").
|
||||
const webToolsEnabled =
|
||||
session.web_search_enabled ?? project.default_web_search_enabled ?? false;
|
||||
|
||||
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
||||
let result: StreamResult;
|
||||
try {
|
||||
result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
|
||||
} catch (err) {
|
||||
await handleAbortOrError(ctx, args, state.accumulated, err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.toolCalls.length > 0) {
|
||||
await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
|
||||
return;
|
||||
}
|
||||
|
||||
await finalizeCompletion(ctx, args, result, state.startedAt, session);
|
||||
}
|
||||
|
||||
export async function runInference(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
assistantMessageId: string,
|
||||
signal?: AbortSignal
|
||||
): Promise<void> {
|
||||
// v1.8.2: every fresh inference (initial send, regenerate, force_send,
|
||||
// continue) starts with a clean budget. Tool-call accumulation across
|
||||
// Continue invocations is what the hard ceiling guards against, not the
|
||||
// per-call budget.
|
||||
// v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
|
||||
// to a single user-message turn, so a Continue starts with no history.
|
||||
return runAssistantTurn(ctx, {
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId,
|
||||
toolsUsed: 0,
|
||||
recentToolCalls: [],
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
// v1.8.2: cap-hit summary flow. Called instead of erroring when the loop
|
||||
// hits its budget. Reuses the in-flight assistant message slot to stream a
|
||||
// short wrap-up reply with the synthetic note prepended and tools disabled,
|
||||
// then always inserts a cap_hit sentinel afterward (regardless of summary
|
||||
// outcome) so the UI can show a Continue affordance.
|
||||
interface InferenceRegistration {
|
||||
controller: AbortController;
|
||||
completed: Promise<void>;
|
||||
}
|
||||
|
||||
export function createInferenceRunner(
|
||||
ctx: Omit<InferenceContext, 'publishUser'>,
|
||||
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
||||
) {
|
||||
const registry = new Map<string, InferenceRegistration>();
|
||||
|
||||
return {
|
||||
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
||||
const callCtx: InferenceContext = {
|
||||
...ctx,
|
||||
publishUser: (frame) => publishUserFn(user, frame),
|
||||
// v1.11: broker comes in via ctx (set at registration time). Repeated
|
||||
// here so the destructure carries it onto the per-call ctx without
|
||||
// having to add it to every enqueue/cancel signature individually.
|
||||
broker: ctx.broker,
|
||||
};
|
||||
// v1.8 mobile-tabs: announce working before the async loop starts so
|
||||
// every device subscribed to the user channel sees the amber dot.
|
||||
callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() });
|
||||
const controller = new AbortController();
|
||||
let resolveCompleted!: () => void;
|
||||
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
||||
const registration: InferenceRegistration = { controller, completed };
|
||||
registry.set(chatId, registration);
|
||||
void (async () => {
|
||||
try {
|
||||
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
||||
setImmediate(() => {
|
||||
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
||||
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
callCtx.log.error({ err }, 'unhandled inference error');
|
||||
} finally {
|
||||
resolveCompleted();
|
||||
// Only clear our own registration; a force-send may have replaced it.
|
||||
if (registry.get(chatId) === registration) {
|
||||
registry.delete(chatId);
|
||||
}
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
||||
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
|
||||
const reg = registry.get(chatId);
|
||||
if (!reg) return false;
|
||||
reg.controller.abort();
|
||||
// Swallow — we just need to wait for the catch/finally to persist state.
|
||||
await reg.completed.catch(() => {});
|
||||
return true;
|
||||
},
|
||||
|
||||
hasActive(chatId: string): boolean {
|
||||
return registry.has(chatId);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export const _toolNames = ALL_TOOLS.map((t) => t.name);
|
||||
Reference in New Issue
Block a user