v1.12.4-rc3: extract stream-phase + tool-phase from inference.ts
- stream-phase.ts: streamCompletion, executeStreamPhase (plus sseLines,
StreamOptions, ChatCompletionDelta/Chunk as private helpers)
- tool-phase.ts: executeToolPhase + private executeToolCall
- types.ts: shared StreamPhaseState + DB_FLUSH_INTERVAL_MS so the
summary functions still in inference.ts can reference them without
pulling from a phase file
Cycle: executeToolPhase recurses into runAssistantTurn, which stays in
inference.ts. Resolved by direct value back-edge — tool-phase.ts does
`import { runAssistantTurn } from '../inference.js'` and runAssistantTurn
is now exported. Safe because the dereference happens inside an async
function body, after both modules have fully evaluated. No
callback-through-args fallback needed.
inference.ts shrinks from ~1401 to ~828 LoC. Final Dispatch D moves the
sentinel summaries out and renames the residue to inference/turn.ts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,13 +11,8 @@ import type {
|
|||||||
ToolCall,
|
ToolCall,
|
||||||
UserStreamFrame,
|
UserStreamFrame,
|
||||||
} from '../types/api.js';
|
} from '../types/api.js';
|
||||||
import {
|
import { ALL_TOOLS } from './tools.js';
|
||||||
ALL_TOOLS,
|
import { resolveProjectRoot } from './path_guard.js';
|
||||||
TOOLS_BY_NAME,
|
|
||||||
toolJsonSchemas,
|
|
||||||
type ToolJsonSchema,
|
|
||||||
} from './tools.js';
|
|
||||||
import { PathScopeError, resolveProjectRoot } from './path_guard.js';
|
|
||||||
import { maybeAutoNameChat } from './auto_name.js';
|
import { maybeAutoNameChat } from './auto_name.js';
|
||||||
import { getAgentById } from './agents.js';
|
import { getAgentById } from './agents.js';
|
||||||
import * as compaction from './compaction.js';
|
import * as compaction from './compaction.js';
|
||||||
@@ -28,30 +23,26 @@ import {
|
|||||||
DOOM_LOOP_THRESHOLD,
|
DOOM_LOOP_THRESHOLD,
|
||||||
detectDoomLoop,
|
detectDoomLoop,
|
||||||
} from './inference/sentinels.js';
|
} from './inference/sentinels.js';
|
||||||
import {
|
|
||||||
XML_TOOL_CLOSE,
|
|
||||||
XML_TOOL_OPEN,
|
|
||||||
parseXmlToolCall,
|
|
||||||
partialXmlOpenerStart,
|
|
||||||
} from './inference/xml-parser.js';
|
|
||||||
import {
|
import {
|
||||||
buildMessagesPayload,
|
buildMessagesPayload,
|
||||||
loadContext,
|
loadContext,
|
||||||
maybeFlagForCompaction,
|
|
||||||
type OpenAiMessage,
|
|
||||||
} from './inference/payload.js';
|
} from './inference/payload.js';
|
||||||
import {
|
import {
|
||||||
finalizeCompletion,
|
finalizeCompletion,
|
||||||
handleAbortOrError,
|
handleAbortOrError,
|
||||||
} from './inference/error-handler.js';
|
} from './inference/error-handler.js';
|
||||||
|
import {
|
||||||
|
executeStreamPhase,
|
||||||
|
streamCompletion,
|
||||||
|
} from './inference/stream-phase.js';
|
||||||
|
import { executeToolPhase } from './inference/tool-phase.js';
|
||||||
|
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './inference/types.js';
|
||||||
|
|
||||||
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
||||||
// importing from services/inference.js as the public surface.
|
// importing from services/inference.js as the public surface.
|
||||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js';
|
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js';
|
||||||
export { buildMessagesPayload } from './inference/payload.js';
|
export { buildMessagesPayload } from './inference/payload.js';
|
||||||
|
|
||||||
const DB_FLUSH_INTERVAL_MS = 500;
|
|
||||||
|
|
||||||
// Synthetic system note appended to the cap-hit summary call. Verbatim from
|
// Synthetic system note appended to the cap-hit summary call. Verbatim from
|
||||||
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
|
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
|
||||||
// instruction is short, declarative, and identical across calls.
|
// instruction is short, declarative, and identical across calls.
|
||||||
@@ -107,29 +98,6 @@ export interface InferenceFrame {
|
|||||||
|
|
||||||
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
||||||
|
|
||||||
interface ChatCompletionDelta {
|
|
||||||
role?: string;
|
|
||||||
content?: string | null;
|
|
||||||
tool_calls?: Array<{
|
|
||||||
index: number;
|
|
||||||
id?: string;
|
|
||||||
type?: 'function';
|
|
||||||
function?: { name?: string; arguments?: string };
|
|
||||||
}>;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface ChatCompletionChunk {
|
|
||||||
choices?: Array<{
|
|
||||||
delta: ChatCompletionDelta;
|
|
||||||
finish_reason: string | null;
|
|
||||||
}>;
|
|
||||||
usage?: {
|
|
||||||
prompt_tokens?: number;
|
|
||||||
completion_tokens?: number;
|
|
||||||
total_tokens?: number;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface InferenceContext {
|
export interface InferenceContext {
|
||||||
sql: Sql;
|
sql: Sql;
|
||||||
config: Config;
|
config: Config;
|
||||||
@@ -144,36 +112,10 @@ export interface InferenceContext {
|
|||||||
broker: Broker;
|
broker: Broker;
|
||||||
}
|
}
|
||||||
|
|
||||||
// v1.12.4: payload assembly extracted to ./inference/payload.ts —
|
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
||||||
// buildMessagesPayload, loadContext, maybeFlagForCompaction, and the
|
// import buildMessagesPayload from this module, so a re-export below
|
||||||
// OpenAiMessage shape live there now. Re-exported below to preserve the
|
// preserves the public surface). Stream + tool phases extracted to
|
||||||
// public surface (tests import buildMessagesPayload from this module).
|
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
||||||
// v1.12: buildSystemPrompt moved to services/system-prompt.ts. See that
|
|
||||||
// module for the resolution order doc and the container-guidance layer.
|
|
||||||
// buildMessagesPayload is async now because buildSystemPrompt awaits the
|
|
||||||
// guidance cache lookup.
|
|
||||||
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
|
||||||
const reader = stream.getReader();
|
|
||||||
const decoder = new TextDecoder('utf-8');
|
|
||||||
let buffer = '';
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
const { value, done } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
buffer += decoder.decode(value, { stream: true });
|
|
||||||
let idx;
|
|
||||||
while ((idx = buffer.indexOf('\n')) >= 0) {
|
|
||||||
const line = buffer.slice(0, idx).replace(/\r$/, '');
|
|
||||||
buffer = buffer.slice(idx + 1);
|
|
||||||
if (line.length === 0) continue;
|
|
||||||
yield line;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (buffer.length > 0) yield buffer;
|
|
||||||
} finally {
|
|
||||||
reader.releaseLock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface StreamResult {
|
export interface StreamResult {
|
||||||
finishReason: string | null;
|
finishReason: string | null;
|
||||||
@@ -183,235 +125,6 @@ export interface StreamResult {
|
|||||||
completionTokens: number | null;
|
completionTokens: number | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface StreamOptions {
|
|
||||||
// null = omit tools entirely (compact phase); [] = caller stripped all tools
|
|
||||||
// (rare; we still omit from the request body to avoid OpenAI 400).
|
|
||||||
tools: ToolJsonSchema[] | null;
|
|
||||||
temperature?: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
|
|
||||||
// llama-swap) emit tool calls as inline XML inside delta.content rather than
|
|
||||||
// the structured delta.tool_calls field. The XML shape is:
|
|
||||||
// <tool_call>
|
|
||||||
// <function=NAME>
|
|
||||||
// <parameter=KEY>
|
|
||||||
// VALUE
|
|
||||||
// </parameter>
|
|
||||||
// ...more parameters...
|
|
||||||
// </function>
|
|
||||||
// </tool_call>
|
|
||||||
// Multiple <tool_call> blocks may appear back-to-back; they never nest.
|
|
||||||
// streamCompletion buffers delta.content, extracts complete blocks, parses
|
|
||||||
// them via parseXmlToolCall, and pushes synthetic entries into the existing
|
|
||||||
// toolCallsBuffer alongside any native JSON-format tool calls.
|
|
||||||
async function streamCompletion(
|
|
||||||
ctx: InferenceContext,
|
|
||||||
model: string,
|
|
||||||
messages: OpenAiMessage[],
|
|
||||||
opts: StreamOptions,
|
|
||||||
onDelta: (content: string) => void,
|
|
||||||
onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
|
|
||||||
signal?: AbortSignal
|
|
||||||
): Promise<StreamResult> {
|
|
||||||
const body: Record<string, unknown> = {
|
|
||||||
model,
|
|
||||||
messages,
|
|
||||||
stream: true,
|
|
||||||
stream_options: { include_usage: true },
|
|
||||||
};
|
|
||||||
if (opts.tools && opts.tools.length > 0) {
|
|
||||||
body['tools'] = opts.tools;
|
|
||||||
body['tool_choice'] = 'auto';
|
|
||||||
}
|
|
||||||
if (typeof opts.temperature === 'number') {
|
|
||||||
body['temperature'] = opts.temperature;
|
|
||||||
}
|
|
||||||
|
|
||||||
const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify(body),
|
|
||||||
signal,
|
|
||||||
});
|
|
||||||
if (!res.ok || !res.body) {
|
|
||||||
const text = await res.text().catch(() => '');
|
|
||||||
throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
let content = '';
|
|
||||||
// v1.10.5: holds delta.content bytes that may contain a partial XML tool
|
|
||||||
// call. Anything not part of a (possibly forming) <tool_call>…</tool_call>
|
|
||||||
// pair is flushed to content + onDelta as soon as we know it's safe.
|
|
||||||
let pendingBuffer = '';
|
|
||||||
let finishReason: string | null = null;
|
|
||||||
let promptTokens: number | null = null;
|
|
||||||
let completionTokens: number | null = null;
|
|
||||||
const toolCallsBuffer = new Map<number, { id: string; name: string; argsText: string }>();
|
|
||||||
|
|
||||||
for await (const line of sseLines(res.body)) {
|
|
||||||
if (!line.startsWith('data:')) continue;
|
|
||||||
const payload = line.slice(5).trim();
|
|
||||||
if (payload === '[DONE]') break;
|
|
||||||
let parsed: ChatCompletionChunk;
|
|
||||||
try {
|
|
||||||
parsed = JSON.parse(payload);
|
|
||||||
} catch {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (parsed.usage) {
|
|
||||||
if (typeof parsed.usage.prompt_tokens === 'number') {
|
|
||||||
promptTokens = parsed.usage.prompt_tokens;
|
|
||||||
}
|
|
||||||
if (typeof parsed.usage.completion_tokens === 'number') {
|
|
||||||
completionTokens = parsed.usage.completion_tokens;
|
|
||||||
}
|
|
||||||
onUsage?.(promptTokens, completionTokens);
|
|
||||||
}
|
|
||||||
// v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's
|
|
||||||
// streaming completion does NOT emit n_ctx in timings (verified
|
|
||||||
// empirically); the authoritative source is llama-swap's
|
|
||||||
// /upstream/<model>/props endpoint, fetched per-turn via
|
|
||||||
// model-context.getModelContext() at the finalization sites below.
|
|
||||||
|
|
||||||
const choice = parsed.choices?.[0];
|
|
||||||
if (!choice) continue;
|
|
||||||
const delta = choice.delta ?? {};
|
|
||||||
if (typeof delta.content === 'string' && delta.content.length > 0) {
|
|
||||||
// v1.10.5 XML fallback. Append, then extract any complete tool_call
|
|
||||||
// blocks before deciding what's safe to flush as visible content.
|
|
||||||
pendingBuffer += delta.content;
|
|
||||||
while (true) {
|
|
||||||
const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN);
|
|
||||||
if (startIdx === -1) break;
|
|
||||||
const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx);
|
|
||||||
if (closeIdx === -1) break;
|
|
||||||
const blockEnd = closeIdx + XML_TOOL_CLOSE.length;
|
|
||||||
const block = pendingBuffer.slice(startIdx, blockEnd);
|
|
||||||
// Any text before the opener is plain content — flush it now.
|
|
||||||
if (startIdx > 0) {
|
|
||||||
const before = pendingBuffer.slice(0, startIdx);
|
|
||||||
content += before;
|
|
||||||
onDelta(before);
|
|
||||||
}
|
|
||||||
const parsedCall = parseXmlToolCall(block);
|
|
||||||
if (parsedCall) {
|
|
||||||
const synthIdx = toolCallsBuffer.size;
|
|
||||||
toolCallsBuffer.set(synthIdx, {
|
|
||||||
id: `xml_call_${synthIdx}`,
|
|
||||||
name: parsedCall.name,
|
|
||||||
argsText: JSON.stringify(parsedCall.args),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// If parsing failed we still drop the block — emitting unparseable
|
|
||||||
// XML to the chat would look worse than silently swallowing it.
|
|
||||||
pendingBuffer = pendingBuffer.slice(blockEnd);
|
|
||||||
}
|
|
||||||
// After all complete blocks are out, hold back any (partial or full)
|
|
||||||
// unclosed opener; flush the rest.
|
|
||||||
const partialIdx = partialXmlOpenerStart(pendingBuffer);
|
|
||||||
if (partialIdx >= 0) {
|
|
||||||
if (partialIdx > 0) {
|
|
||||||
const flush = pendingBuffer.slice(0, partialIdx);
|
|
||||||
content += flush;
|
|
||||||
onDelta(flush);
|
|
||||||
}
|
|
||||||
pendingBuffer = pendingBuffer.slice(partialIdx);
|
|
||||||
} else if (pendingBuffer.length > 0) {
|
|
||||||
content += pendingBuffer;
|
|
||||||
onDelta(pendingBuffer);
|
|
||||||
pendingBuffer = '';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (Array.isArray(delta.tool_calls)) {
|
|
||||||
for (const tc of delta.tool_calls) {
|
|
||||||
const idx = tc.index;
|
|
||||||
const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' };
|
|
||||||
if (tc.id) existing.id = tc.id;
|
|
||||||
if (tc.function?.name) existing.name = tc.function.name;
|
|
||||||
if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments;
|
|
||||||
toolCallsBuffer.set(idx, existing);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (choice.finish_reason) finishReason = choice.finish_reason;
|
|
||||||
}
|
|
||||||
|
|
||||||
// v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer
|
|
||||||
// ever arrived), flush whatever was buffered as plain content so it isn't
|
|
||||||
// silently dropped. Better to show a stray `<tool_call>` than vanish text.
|
|
||||||
if (pendingBuffer.length > 0) {
|
|
||||||
content += pendingBuffer;
|
|
||||||
onDelta(pendingBuffer);
|
|
||||||
pendingBuffer = '';
|
|
||||||
}
|
|
||||||
|
|
||||||
const toolCalls: ToolCall[] = [];
|
|
||||||
for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) {
|
|
||||||
let args: Record<string, unknown> = {};
|
|
||||||
if (t.argsText.length > 0) {
|
|
||||||
try {
|
|
||||||
args = JSON.parse(t.argsText);
|
|
||||||
} catch {
|
|
||||||
args = { _raw: t.argsText };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args });
|
|
||||||
}
|
|
||||||
|
|
||||||
return { finishReason, content, toolCalls, promptTokens, completionTokens };
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeToolCall(
|
|
||||||
projectRoot: string,
|
|
||||||
toolCall: ToolCall
|
|
||||||
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
|
|
||||||
const tool = TOOLS_BY_NAME[toolCall.name];
|
|
||||||
if (!tool) {
|
|
||||||
return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
|
|
||||||
}
|
|
||||||
const parsed = tool.inputSchema.safeParse(toolCall.args);
|
|
||||||
if (!parsed.success) {
|
|
||||||
// v1.12 Track B.2: enrich the zod-reject path so the model sees a
|
|
||||||
// one-line, tool-named hint ("tool 'search_symbols' rejected — query:
|
|
||||||
// Required") instead of a JSON blob of flatten output. Higher recovery
|
|
||||||
// rate on the next turn; doom-loop guard still bounds infinite retries.
|
|
||||||
// The cast is because tool.inputSchema is ZodType<unknown>, so zod can't
|
|
||||||
// statically narrow flatten()'s fieldErrors key set — but the runtime
|
|
||||||
// shape is the standard { formErrors: string[]; fieldErrors: Record<...> }.
|
|
||||||
const flatten = parsed.error.flatten() as {
|
|
||||||
formErrors: string[];
|
|
||||||
fieldErrors: Record<string, string[] | undefined>;
|
|
||||||
};
|
|
||||||
const fieldErrors = Object.entries(flatten.fieldErrors)
|
|
||||||
.map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`)
|
|
||||||
.join('; ');
|
|
||||||
const formError = flatten.formErrors[0];
|
|
||||||
const hint = fieldErrors || formError || 'unknown validation error';
|
|
||||||
return {
|
|
||||||
output: null,
|
|
||||||
truncated: false,
|
|
||||||
error: `tool '${toolCall.name}' rejected — ${hint}`,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
const output = await tool.execute(parsed.data, projectRoot);
|
|
||||||
const truncated =
|
|
||||||
typeof output === 'object' && output !== null && 'truncated' in output
|
|
||||||
? Boolean((output as { truncated: unknown }).truncated)
|
|
||||||
: false;
|
|
||||||
return { output, truncated };
|
|
||||||
} catch (err) {
|
|
||||||
if (err instanceof PathScopeError) {
|
|
||||||
return { output: null, truncated: false, error: err.message };
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
output: null,
|
|
||||||
truncated: false,
|
|
||||||
error: err instanceof Error ? err.message : String(err),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface TurnArgs {
|
export interface TurnArgs {
|
||||||
sessionId: string;
|
sessionId: string;
|
||||||
@@ -429,294 +142,8 @@ export interface TurnArgs {
|
|||||||
signal: AbortSignal | undefined;
|
signal: AbortSignal | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface StreamPhaseState {
|
|
||||||
accumulated: string;
|
|
||||||
startedAt: string | null;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeStreamPhase(
|
export async function runAssistantTurn(
|
||||||
ctx: InferenceContext,
|
|
||||||
args: TurnArgs,
|
|
||||||
session: Session,
|
|
||||||
messages: OpenAiMessage[],
|
|
||||||
state: StreamPhaseState,
|
|
||||||
agent: Agent | null,
|
|
||||||
// v1.11.8: when false, web_search and web_fetch are stripped from the
|
|
||||||
// tool list sent to the LLM, so the model can't even attempt them.
|
|
||||||
webToolsEnabled: boolean,
|
|
||||||
): Promise<StreamResult> {
|
|
||||||
const { sessionId, chatId, assistantMessageId, signal } = args;
|
|
||||||
|
|
||||||
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
|
||||||
UPDATE messages
|
|
||||||
SET started_at = clock_timestamp()
|
|
||||||
WHERE id = ${assistantMessageId}
|
|
||||||
RETURNING started_at
|
|
||||||
`;
|
|
||||||
state.startedAt = startedRow[0]?.started_at ?? null;
|
|
||||||
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'message_started',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
role: 'assistant',
|
|
||||||
});
|
|
||||||
|
|
||||||
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
|
||||||
let flushPromise: Promise<unknown> = Promise.resolve();
|
|
||||||
|
|
||||||
const flushNow = () => {
|
|
||||||
if (pendingFlushTimer) {
|
|
||||||
clearTimeout(pendingFlushTimer);
|
|
||||||
pendingFlushTimer = null;
|
|
||||||
}
|
|
||||||
const snapshot = state.accumulated;
|
|
||||||
flushPromise = flushPromise.then(() =>
|
|
||||||
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
const scheduleFlush = () => {
|
|
||||||
if (pendingFlushTimer) return;
|
|
||||||
pendingFlushTimer = setTimeout(() => {
|
|
||||||
pendingFlushTimer = null;
|
|
||||||
flushNow();
|
|
||||||
}, DB_FLUSH_INTERVAL_MS);
|
|
||||||
};
|
|
||||||
|
|
||||||
// Tool whitelist: if an agent is set, filter the global tool list to only the
|
|
||||||
// tool names it allows. Unknown names in agent.tools are dropped silently
|
|
||||||
// (handled here by intersection). When no agent: send all tools.
|
|
||||||
// v1.11.8: a second filter strips web_search + web_fetch unless the chat
|
|
||||||
// has them explicitly enabled. Counts as an opt-in security boundary: the
|
|
||||||
// model can't summon a tool that wasn't offered to it.
|
|
||||||
const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
|
|
||||||
const effectiveTools: ToolJsonSchema[] = (agent
|
|
||||||
? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
|
|
||||||
: toolJsonSchemas()
|
|
||||||
).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
|
|
||||||
const effectiveTemperature = agent?.temperature;
|
|
||||||
|
|
||||||
// v1.12.2: ctx_max lookup is cached after the first hit per model, so this
|
|
||||||
// is a Map probe in steady state. We capture nCtx once at the top of the
|
|
||||||
// stream so the throttled usage publish doesn't refetch each tick.
|
|
||||||
const mctxForStream = await modelContext.getModelContext(session.model);
|
|
||||||
const nCtxForStream = mctxForStream?.n_ctx ?? null;
|
|
||||||
|
|
||||||
// v1.12.2: throttle live usage publishes to ~500ms. The model can land
|
|
||||||
// dozens of usage frames per second; without a throttle the WS turns into
|
|
||||||
// a firehose for a few KB savings on each render.
|
|
||||||
const USAGE_THROTTLE_MS = 500;
|
|
||||||
let lastUsageAt = 0;
|
|
||||||
let pendingUsage: { p: number | null; c: number | null } | null = null;
|
|
||||||
let usageTimer: NodeJS.Timeout | null = null;
|
|
||||||
const flushUsage = () => {
|
|
||||||
if (!pendingUsage) return;
|
|
||||||
const { p, c } = pendingUsage;
|
|
||||||
pendingUsage = null;
|
|
||||||
lastUsageAt = Date.now();
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'usage',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
completion_tokens: c,
|
|
||||||
ctx_used: p,
|
|
||||||
ctx_max: nCtxForStream,
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
try {
|
|
||||||
return await streamCompletion(
|
|
||||||
ctx,
|
|
||||||
session.model,
|
|
||||||
messages,
|
|
||||||
{ tools: effectiveTools, temperature: effectiveTemperature },
|
|
||||||
(delta) => {
|
|
||||||
state.accumulated += delta;
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'delta',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
content: delta,
|
|
||||||
});
|
|
||||||
ctx.log.debug({ sessionId, delta }, 'inference delta');
|
|
||||||
scheduleFlush();
|
|
||||||
},
|
|
||||||
(prompt, completion) => {
|
|
||||||
pendingUsage = { p: prompt, c: completion };
|
|
||||||
const elapsed = Date.now() - lastUsageAt;
|
|
||||||
if (elapsed >= USAGE_THROTTLE_MS) {
|
|
||||||
flushUsage();
|
|
||||||
} else if (!usageTimer) {
|
|
||||||
usageTimer = setTimeout(() => {
|
|
||||||
usageTimer = null;
|
|
||||||
flushUsage();
|
|
||||||
}, USAGE_THROTTLE_MS - elapsed);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
signal
|
|
||||||
);
|
|
||||||
} finally {
|
|
||||||
if (pendingFlushTimer) {
|
|
||||||
clearTimeout(pendingFlushTimer);
|
|
||||||
pendingFlushTimer = null;
|
|
||||||
}
|
|
||||||
if (usageTimer) {
|
|
||||||
clearTimeout(usageTimer);
|
|
||||||
usageTimer = null;
|
|
||||||
}
|
|
||||||
await flushPromise;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeToolPhase(
|
|
||||||
ctx: InferenceContext,
|
|
||||||
args: TurnArgs,
|
|
||||||
result: StreamResult,
|
|
||||||
startedAt: string | null,
|
|
||||||
session: Session,
|
|
||||||
projectRoot: string
|
|
||||||
): Promise<void> {
|
|
||||||
const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
|
|
||||||
const { content, toolCalls, promptTokens, completionTokens } = result;
|
|
||||||
|
|
||||||
// v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the
|
|
||||||
// streaming completion (which doesn't emit n_ctx). getModelContext caches
|
|
||||||
// the positive lookup for the process lifetime, so this is a single Map
|
|
||||||
// hit after the first invocation per model.
|
|
||||||
const mctx = await modelContext.getModelContext(session.model);
|
|
||||||
const nCtx = mctx?.n_ctx ?? null;
|
|
||||||
|
|
||||||
const [updated] = await ctx.sql<
|
|
||||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
|
||||||
>`
|
|
||||||
UPDATE messages
|
|
||||||
SET content = ${content},
|
|
||||||
status = 'complete',
|
|
||||||
tool_calls = ${ctx.sql.json(toolCalls as never)},
|
|
||||||
tokens_used = ${completionTokens},
|
|
||||||
ctx_used = ${promptTokens},
|
|
||||||
ctx_max = ${nCtx},
|
|
||||||
finished_at = clock_timestamp()
|
|
||||||
WHERE id = ${assistantMessageId}
|
|
||||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
|
||||||
`;
|
|
||||||
// v1.11: flag for compaction if this turn pushed us over the usable budget.
|
|
||||||
// We never compact mid-loop (the recursive runAssistantTurn keeps tools
|
|
||||||
// flowing); the flag fires on the NEXT turn's pre-fetch hook above.
|
|
||||||
await maybeFlagForCompaction(ctx, chatId, updated);
|
|
||||||
const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
||||||
UPDATE sessions SET updated_at = clock_timestamp()
|
|
||||||
WHERE id = ${sessionId}
|
|
||||||
RETURNING project_id, name, updated_at
|
|
||||||
`;
|
|
||||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at });
|
|
||||||
for (const tc of toolCalls) {
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'tool_call',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
tool_call: tc,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'message_complete',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
tokens_used: updated?.tokens_used ?? null,
|
|
||||||
ctx_used: updated?.ctx_used ?? null,
|
|
||||||
ctx_max: updated?.ctx_max ?? null,
|
|
||||||
started_at: startedAt,
|
|
||||||
finished_at: updated?.finished_at ?? null,
|
|
||||||
model: session.model,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted
|
|
||||||
// (the answer endpoint needs a target row to UPDATE), but tool_results is
|
|
||||||
// pre-stamped with output=null as a "pending" sentinel and no tool_result
|
|
||||||
// frame goes out — the card renders from the tool_call frame alone. Mixed
|
|
||||||
// batches still execute the other tools normally.
|
|
||||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() });
|
|
||||||
let pausingForUserInput = false;
|
|
||||||
await Promise.all(
|
|
||||||
toolCalls.map(async (tc) => {
|
|
||||||
const [toolRow] = await ctx.sql<{ id: string }[]>`
|
|
||||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
|
||||||
VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
|
|
||||||
RETURNING id
|
|
||||||
`;
|
|
||||||
const toolMessageId = toolRow!.id;
|
|
||||||
if (tc.name === 'ask_user_input') {
|
|
||||||
pausingForUserInput = true;
|
|
||||||
const sentinel = { tool_call_id: tc.id, output: null, truncated: false };
|
|
||||||
await ctx.sql`
|
|
||||||
UPDATE messages
|
|
||||||
SET tool_results = ${ctx.sql.json(sentinel as never)}
|
|
||||||
WHERE id = ${toolMessageId}
|
|
||||||
`;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const tres = await executeToolCall(projectRoot, tc);
|
|
||||||
const stored = {
|
|
||||||
tool_call_id: tc.id,
|
|
||||||
output: tres.output,
|
|
||||||
truncated: tres.truncated,
|
|
||||||
...(tres.error ? { error: tres.error } : {}),
|
|
||||||
};
|
|
||||||
await ctx.sql`
|
|
||||||
UPDATE messages
|
|
||||||
SET tool_results = ${ctx.sql.json(stored as never)}
|
|
||||||
WHERE id = ${toolMessageId}
|
|
||||||
`;
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'tool_result',
|
|
||||||
tool_message_id: toolMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
tool_call_id: tc.id,
|
|
||||||
output: tres.output,
|
|
||||||
truncated: tres.truncated,
|
|
||||||
...(tres.error ? { error: tres.error } : {}),
|
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
if (pausingForUserInput) {
|
|
||||||
ctx.publishUser({
|
|
||||||
type: 'chat_status',
|
|
||||||
chat_id: chatId,
|
|
||||||
status: 'waiting_for_input',
|
|
||||||
at: new Date().toISOString(),
|
|
||||||
});
|
|
||||||
ctx.log.info(
|
|
||||||
{ sessionId, chatId, assistantMessageId },
|
|
||||||
'inference paused awaiting user input',
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const [nextAssistant] = await ctx.sql<{ id: string }[]>`
|
|
||||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
|
||||||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
|
||||||
RETURNING id
|
|
||||||
`;
|
|
||||||
await runAssistantTurn(ctx, {
|
|
||||||
sessionId,
|
|
||||||
chatId,
|
|
||||||
assistantMessageId: nextAssistant!.id,
|
|
||||||
// v1.8.2: charge this turn's actual tool invocations against the budget.
|
|
||||||
// One assistant message can emit multiple tool_calls, so we add the run
|
|
||||||
// count, not 1. The next turn's budget check sees the cumulative total.
|
|
||||||
toolsUsed: toolsUsed + result.toolCalls.length,
|
|
||||||
// v1.11.6: append the just-executed tool calls to the per-turn history
|
|
||||||
// so the next runAssistantTurn's doom-loop check can see them. We don't
|
|
||||||
// cap the array length here — per-turn budgets keep it bounded
|
|
||||||
// (typically <30 entries), and slicing happens inside detectDoomLoop.
|
|
||||||
recentToolCalls: [...args.recentToolCalls, ...result.toolCalls],
|
|
||||||
signal,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function runAssistantTurn(
|
|
||||||
ctx: InferenceContext,
|
ctx: InferenceContext,
|
||||||
args: TurnArgs,
|
args: TurnArgs,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
|||||||
380
apps/server/src/services/inference/stream-phase.ts
Normal file
380
apps/server/src/services/inference/stream-phase.ts
Normal file
@@ -0,0 +1,380 @@
|
|||||||
|
import type {
|
||||||
|
Agent,
|
||||||
|
Session,
|
||||||
|
ToolCall,
|
||||||
|
} from '../../types/api.js';
|
||||||
|
import * as modelContext from '../model-context.js';
|
||||||
|
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
|
||||||
|
import type { OpenAiMessage } from './payload.js';
|
||||||
|
import {
|
||||||
|
XML_TOOL_CLOSE,
|
||||||
|
XML_TOOL_OPEN,
|
||||||
|
parseXmlToolCall,
|
||||||
|
partialXmlOpenerStart,
|
||||||
|
} from './xml-parser.js';
|
||||||
|
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
|
||||||
|
import type {
|
||||||
|
InferenceContext,
|
||||||
|
StreamResult,
|
||||||
|
TurnArgs,
|
||||||
|
} from '../inference.js';
|
||||||
|
|
||||||
|
interface ChatCompletionDelta {
|
||||||
|
role?: string;
|
||||||
|
content?: string | null;
|
||||||
|
tool_calls?: Array<{
|
||||||
|
index: number;
|
||||||
|
id?: string;
|
||||||
|
type?: 'function';
|
||||||
|
function?: { name?: string; arguments?: string };
|
||||||
|
}>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ChatCompletionChunk {
|
||||||
|
choices?: Array<{
|
||||||
|
delta: ChatCompletionDelta;
|
||||||
|
finish_reason: string | null;
|
||||||
|
}>;
|
||||||
|
usage?: {
|
||||||
|
prompt_tokens?: number;
|
||||||
|
completion_tokens?: number;
|
||||||
|
total_tokens?: number;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
interface StreamOptions {
|
||||||
|
// null = omit tools entirely (compact phase); [] = caller stripped all tools
|
||||||
|
// (rare; we still omit from the request body to avoid OpenAI 400).
|
||||||
|
tools: ToolJsonSchema[] | null;
|
||||||
|
temperature?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
||||||
|
const reader = stream.getReader();
|
||||||
|
const decoder = new TextDecoder('utf-8');
|
||||||
|
let buffer = '';
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
let idx;
|
||||||
|
while ((idx = buffer.indexOf('\n')) >= 0) {
|
||||||
|
const line = buffer.slice(0, idx).replace(/\r$/, '');
|
||||||
|
buffer = buffer.slice(idx + 1);
|
||||||
|
if (line.length === 0) continue;
|
||||||
|
yield line;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (buffer.length > 0) yield buffer;
|
||||||
|
} finally {
|
||||||
|
reader.releaseLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
|
||||||
|
// llama-swap) emit tool calls as inline XML inside delta.content rather than
|
||||||
|
// the structured delta.tool_calls field. The XML shape is:
|
||||||
|
// <tool_call>
|
||||||
|
// <function=NAME>
|
||||||
|
// <parameter=KEY>
|
||||||
|
// VALUE
|
||||||
|
// </parameter>
|
||||||
|
// ...more parameters...
|
||||||
|
// </function>
|
||||||
|
// </tool_call>
|
||||||
|
// Multiple <tool_call> blocks may appear back-to-back; they never nest.
|
||||||
|
// streamCompletion buffers delta.content, extracts complete blocks, parses
|
||||||
|
// them via parseXmlToolCall, and pushes synthetic entries into the existing
|
||||||
|
// toolCallsBuffer alongside any native JSON-format tool calls.
|
||||||
|
export async function streamCompletion(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
model: string,
|
||||||
|
messages: OpenAiMessage[],
|
||||||
|
opts: StreamOptions,
|
||||||
|
onDelta: (content: string) => void,
|
||||||
|
onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
|
||||||
|
signal?: AbortSignal
|
||||||
|
): Promise<StreamResult> {
|
||||||
|
const body: Record<string, unknown> = {
|
||||||
|
model,
|
||||||
|
messages,
|
||||||
|
stream: true,
|
||||||
|
stream_options: { include_usage: true },
|
||||||
|
};
|
||||||
|
if (opts.tools && opts.tools.length > 0) {
|
||||||
|
body['tools'] = opts.tools;
|
||||||
|
body['tool_choice'] = 'auto';
|
||||||
|
}
|
||||||
|
if (typeof opts.temperature === 'number') {
|
||||||
|
body['temperature'] = opts.temperature;
|
||||||
|
}
|
||||||
|
|
||||||
|
const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify(body),
|
||||||
|
signal,
|
||||||
|
});
|
||||||
|
if (!res.ok || !res.body) {
|
||||||
|
const text = await res.text().catch(() => '');
|
||||||
|
throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
let content = '';
|
||||||
|
// v1.10.5: holds delta.content bytes that may contain a partial XML tool
|
||||||
|
// call. Anything not part of a (possibly forming) <tool_call>…</tool_call>
|
||||||
|
// pair is flushed to content + onDelta as soon as we know it's safe.
|
||||||
|
let pendingBuffer = '';
|
||||||
|
let finishReason: string | null = null;
|
||||||
|
let promptTokens: number | null = null;
|
||||||
|
let completionTokens: number | null = null;
|
||||||
|
const toolCallsBuffer = new Map<number, { id: string; name: string; argsText: string }>();
|
||||||
|
|
||||||
|
for await (const line of sseLines(res.body)) {
|
||||||
|
if (!line.startsWith('data:')) continue;
|
||||||
|
const payload = line.slice(5).trim();
|
||||||
|
if (payload === '[DONE]') break;
|
||||||
|
let parsed: ChatCompletionChunk;
|
||||||
|
try {
|
||||||
|
parsed = JSON.parse(payload);
|
||||||
|
} catch {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parsed.usage) {
|
||||||
|
if (typeof parsed.usage.prompt_tokens === 'number') {
|
||||||
|
promptTokens = parsed.usage.prompt_tokens;
|
||||||
|
}
|
||||||
|
if (typeof parsed.usage.completion_tokens === 'number') {
|
||||||
|
completionTokens = parsed.usage.completion_tokens;
|
||||||
|
}
|
||||||
|
onUsage?.(promptTokens, completionTokens);
|
||||||
|
}
|
||||||
|
// v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's
|
||||||
|
// streaming completion does NOT emit n_ctx in timings (verified
|
||||||
|
// empirically); the authoritative source is llama-swap's
|
||||||
|
// /upstream/<model>/props endpoint, fetched per-turn via
|
||||||
|
// model-context.getModelContext() at the finalization sites below.
|
||||||
|
|
||||||
|
const choice = parsed.choices?.[0];
|
||||||
|
if (!choice) continue;
|
||||||
|
const delta = choice.delta ?? {};
|
||||||
|
if (typeof delta.content === 'string' && delta.content.length > 0) {
|
||||||
|
// v1.10.5 XML fallback. Append, then extract any complete tool_call
|
||||||
|
// blocks before deciding what's safe to flush as visible content.
|
||||||
|
pendingBuffer += delta.content;
|
||||||
|
while (true) {
|
||||||
|
const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN);
|
||||||
|
if (startIdx === -1) break;
|
||||||
|
const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx);
|
||||||
|
if (closeIdx === -1) break;
|
||||||
|
const blockEnd = closeIdx + XML_TOOL_CLOSE.length;
|
||||||
|
const block = pendingBuffer.slice(startIdx, blockEnd);
|
||||||
|
// Any text before the opener is plain content — flush it now.
|
||||||
|
if (startIdx > 0) {
|
||||||
|
const before = pendingBuffer.slice(0, startIdx);
|
||||||
|
content += before;
|
||||||
|
onDelta(before);
|
||||||
|
}
|
||||||
|
const parsedCall = parseXmlToolCall(block);
|
||||||
|
if (parsedCall) {
|
||||||
|
const synthIdx = toolCallsBuffer.size;
|
||||||
|
toolCallsBuffer.set(synthIdx, {
|
||||||
|
id: `xml_call_${synthIdx}`,
|
||||||
|
name: parsedCall.name,
|
||||||
|
argsText: JSON.stringify(parsedCall.args),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// If parsing failed we still drop the block — emitting unparseable
|
||||||
|
// XML to the chat would look worse than silently swallowing it.
|
||||||
|
pendingBuffer = pendingBuffer.slice(blockEnd);
|
||||||
|
}
|
||||||
|
// After all complete blocks are out, hold back any (partial or full)
|
||||||
|
// unclosed opener; flush the rest.
|
||||||
|
const partialIdx = partialXmlOpenerStart(pendingBuffer);
|
||||||
|
if (partialIdx >= 0) {
|
||||||
|
if (partialIdx > 0) {
|
||||||
|
const flush = pendingBuffer.slice(0, partialIdx);
|
||||||
|
content += flush;
|
||||||
|
onDelta(flush);
|
||||||
|
}
|
||||||
|
pendingBuffer = pendingBuffer.slice(partialIdx);
|
||||||
|
} else if (pendingBuffer.length > 0) {
|
||||||
|
content += pendingBuffer;
|
||||||
|
onDelta(pendingBuffer);
|
||||||
|
pendingBuffer = '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (Array.isArray(delta.tool_calls)) {
|
||||||
|
for (const tc of delta.tool_calls) {
|
||||||
|
const idx = tc.index;
|
||||||
|
const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' };
|
||||||
|
if (tc.id) existing.id = tc.id;
|
||||||
|
if (tc.function?.name) existing.name = tc.function.name;
|
||||||
|
if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments;
|
||||||
|
toolCallsBuffer.set(idx, existing);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (choice.finish_reason) finishReason = choice.finish_reason;
|
||||||
|
}
|
||||||
|
|
||||||
|
// v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer
|
||||||
|
// ever arrived), flush whatever was buffered as plain content so it isn't
|
||||||
|
// silently dropped. Better to show a stray `<tool_call>` than vanish text.
|
||||||
|
if (pendingBuffer.length > 0) {
|
||||||
|
content += pendingBuffer;
|
||||||
|
onDelta(pendingBuffer);
|
||||||
|
pendingBuffer = '';
|
||||||
|
}
|
||||||
|
|
||||||
|
const toolCalls: ToolCall[] = [];
|
||||||
|
for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) {
|
||||||
|
let args: Record<string, unknown> = {};
|
||||||
|
if (t.argsText.length > 0) {
|
||||||
|
try {
|
||||||
|
args = JSON.parse(t.argsText);
|
||||||
|
} catch {
|
||||||
|
args = { _raw: t.argsText };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args });
|
||||||
|
}
|
||||||
|
|
||||||
|
return { finishReason, content, toolCalls, promptTokens, completionTokens };
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function executeStreamPhase(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
args: TurnArgs,
|
||||||
|
session: Session,
|
||||||
|
messages: OpenAiMessage[],
|
||||||
|
state: StreamPhaseState,
|
||||||
|
agent: Agent | null,
|
||||||
|
// v1.11.8: when false, web_search and web_fetch are stripped from the
|
||||||
|
// tool list sent to the LLM, so the model can't even attempt them.
|
||||||
|
webToolsEnabled: boolean,
|
||||||
|
): Promise<StreamResult> {
|
||||||
|
const { sessionId, chatId, assistantMessageId, signal } = args;
|
||||||
|
|
||||||
|
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
||||||
|
UPDATE messages
|
||||||
|
SET started_at = clock_timestamp()
|
||||||
|
WHERE id = ${assistantMessageId}
|
||||||
|
RETURNING started_at
|
||||||
|
`;
|
||||||
|
state.startedAt = startedRow[0]?.started_at ?? null;
|
||||||
|
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'message_started',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
});
|
||||||
|
|
||||||
|
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
||||||
|
let flushPromise: Promise<unknown> = Promise.resolve();
|
||||||
|
|
||||||
|
const flushNow = () => {
|
||||||
|
if (pendingFlushTimer) {
|
||||||
|
clearTimeout(pendingFlushTimer);
|
||||||
|
pendingFlushTimer = null;
|
||||||
|
}
|
||||||
|
const snapshot = state.accumulated;
|
||||||
|
flushPromise = flushPromise.then(() =>
|
||||||
|
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
const scheduleFlush = () => {
|
||||||
|
if (pendingFlushTimer) return;
|
||||||
|
pendingFlushTimer = setTimeout(() => {
|
||||||
|
pendingFlushTimer = null;
|
||||||
|
flushNow();
|
||||||
|
}, DB_FLUSH_INTERVAL_MS);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Tool whitelist: if an agent is set, filter the global tool list to only the
|
||||||
|
// tool names it allows. Unknown names in agent.tools are dropped silently
|
||||||
|
// (handled here by intersection). When no agent: send all tools.
|
||||||
|
// v1.11.8: a second filter strips web_search + web_fetch unless the chat
|
||||||
|
// has them explicitly enabled. Counts as an opt-in security boundary: the
|
||||||
|
// model can't summon a tool that wasn't offered to it.
|
||||||
|
const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
|
||||||
|
const effectiveTools: ToolJsonSchema[] = (agent
|
||||||
|
? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
|
||||||
|
: toolJsonSchemas()
|
||||||
|
).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
|
||||||
|
const effectiveTemperature = agent?.temperature;
|
||||||
|
|
||||||
|
// v1.12.2: ctx_max lookup is cached after the first hit per model, so this
|
||||||
|
// is a Map probe in steady state. We capture nCtx once at the top of the
|
||||||
|
// stream so the throttled usage publish doesn't refetch each tick.
|
||||||
|
const mctxForStream = await modelContext.getModelContext(session.model);
|
||||||
|
const nCtxForStream = mctxForStream?.n_ctx ?? null;
|
||||||
|
|
||||||
|
// v1.12.2: throttle live usage publishes to ~500ms. The model can land
|
||||||
|
// dozens of usage frames per second; without a throttle the WS turns into
|
||||||
|
// a firehose for a few KB savings on each render.
|
||||||
|
const USAGE_THROTTLE_MS = 500;
|
||||||
|
let lastUsageAt = 0;
|
||||||
|
let pendingUsage: { p: number | null; c: number | null } | null = null;
|
||||||
|
let usageTimer: NodeJS.Timeout | null = null;
|
||||||
|
const flushUsage = () => {
|
||||||
|
if (!pendingUsage) return;
|
||||||
|
const { p, c } = pendingUsage;
|
||||||
|
pendingUsage = null;
|
||||||
|
lastUsageAt = Date.now();
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'usage',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
completion_tokens: c,
|
||||||
|
ctx_used: p,
|
||||||
|
ctx_max: nCtxForStream,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await streamCompletion(
|
||||||
|
ctx,
|
||||||
|
session.model,
|
||||||
|
messages,
|
||||||
|
{ tools: effectiveTools, temperature: effectiveTemperature },
|
||||||
|
(delta) => {
|
||||||
|
state.accumulated += delta;
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'delta',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
content: delta,
|
||||||
|
});
|
||||||
|
ctx.log.debug({ sessionId, delta }, 'inference delta');
|
||||||
|
scheduleFlush();
|
||||||
|
},
|
||||||
|
(prompt, completion) => {
|
||||||
|
pendingUsage = { p: prompt, c: completion };
|
||||||
|
const elapsed = Date.now() - lastUsageAt;
|
||||||
|
if (elapsed >= USAGE_THROTTLE_MS) {
|
||||||
|
flushUsage();
|
||||||
|
} else if (!usageTimer) {
|
||||||
|
usageTimer = setTimeout(() => {
|
||||||
|
usageTimer = null;
|
||||||
|
flushUsage();
|
||||||
|
}, USAGE_THROTTLE_MS - elapsed);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
signal
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
if (pendingFlushTimer) {
|
||||||
|
clearTimeout(pendingFlushTimer);
|
||||||
|
pendingFlushTimer = null;
|
||||||
|
}
|
||||||
|
if (usageTimer) {
|
||||||
|
clearTimeout(usageTimer);
|
||||||
|
usageTimer = null;
|
||||||
|
}
|
||||||
|
await flushPromise;
|
||||||
|
}
|
||||||
|
}
|
||||||
213
apps/server/src/services/inference/tool-phase.ts
Normal file
213
apps/server/src/services/inference/tool-phase.ts
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
import type { Session, ToolCall } from '../../types/api.js';
|
||||||
|
import * as modelContext from '../model-context.js';
|
||||||
|
import { PathScopeError } from '../path_guard.js';
|
||||||
|
import { TOOLS_BY_NAME } from '../tools.js';
|
||||||
|
import { maybeFlagForCompaction } from './payload.js';
|
||||||
|
import type {
|
||||||
|
InferenceContext,
|
||||||
|
StreamResult,
|
||||||
|
TurnArgs,
|
||||||
|
} from '../inference.js';
|
||||||
|
// v1.12.4: ESM value-import cycle. executeToolPhase recurses into
|
||||||
|
// runAssistantTurn which lives in inference.ts. The cycle is safe because
|
||||||
|
// the reference is read at call time (inside an async function body), not
|
||||||
|
// at module top-level. Node + tsc resolve this cleanly.
|
||||||
|
import { runAssistantTurn } from '../inference.js';
|
||||||
|
|
||||||
|
async function executeToolCall(
|
||||||
|
projectRoot: string,
|
||||||
|
toolCall: ToolCall
|
||||||
|
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
|
||||||
|
const tool = TOOLS_BY_NAME[toolCall.name];
|
||||||
|
if (!tool) {
|
||||||
|
return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
|
||||||
|
}
|
||||||
|
const parsed = tool.inputSchema.safeParse(toolCall.args);
|
||||||
|
if (!parsed.success) {
|
||||||
|
// v1.12 Track B.2: enrich the zod-reject path so the model sees a
|
||||||
|
// one-line, tool-named hint ("tool 'search_symbols' rejected — query:
|
||||||
|
// Required") instead of a JSON blob of flatten output. Higher recovery
|
||||||
|
// rate on the next turn; doom-loop guard still bounds infinite retries.
|
||||||
|
// The cast is because tool.inputSchema is ZodType<unknown>, so zod can't
|
||||||
|
// statically narrow flatten()'s fieldErrors key set — but the runtime
|
||||||
|
// shape is the standard { formErrors: string[]; fieldErrors: Record<...> }.
|
||||||
|
const flatten = parsed.error.flatten() as {
|
||||||
|
formErrors: string[];
|
||||||
|
fieldErrors: Record<string, string[] | undefined>;
|
||||||
|
};
|
||||||
|
const fieldErrors = Object.entries(flatten.fieldErrors)
|
||||||
|
.map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`)
|
||||||
|
.join('; ');
|
||||||
|
const formError = flatten.formErrors[0];
|
||||||
|
const hint = fieldErrors || formError || 'unknown validation error';
|
||||||
|
return {
|
||||||
|
output: null,
|
||||||
|
truncated: false,
|
||||||
|
error: `tool '${toolCall.name}' rejected — ${hint}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const output = await tool.execute(parsed.data, projectRoot);
|
||||||
|
const truncated =
|
||||||
|
typeof output === 'object' && output !== null && 'truncated' in output
|
||||||
|
? Boolean((output as { truncated: unknown }).truncated)
|
||||||
|
: false;
|
||||||
|
return { output, truncated };
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof PathScopeError) {
|
||||||
|
return { output: null, truncated: false, error: err.message };
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
output: null,
|
||||||
|
truncated: false,
|
||||||
|
error: err instanceof Error ? err.message : String(err),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function executeToolPhase(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
args: TurnArgs,
|
||||||
|
result: StreamResult,
|
||||||
|
startedAt: string | null,
|
||||||
|
session: Session,
|
||||||
|
projectRoot: string
|
||||||
|
): Promise<void> {
|
||||||
|
const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
|
||||||
|
const { content, toolCalls, promptTokens, completionTokens } = result;
|
||||||
|
|
||||||
|
// v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the
|
||||||
|
// streaming completion (which doesn't emit n_ctx). getModelContext caches
|
||||||
|
// the positive lookup for the process lifetime, so this is a single Map
|
||||||
|
// hit after the first invocation per model.
|
||||||
|
const mctx = await modelContext.getModelContext(session.model);
|
||||||
|
const nCtx = mctx?.n_ctx ?? null;
|
||||||
|
|
||||||
|
const [updated] = await ctx.sql<
|
||||||
|
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||||
|
>`
|
||||||
|
UPDATE messages
|
||||||
|
SET content = ${content},
|
||||||
|
status = 'complete',
|
||||||
|
tool_calls = ${ctx.sql.json(toolCalls as never)},
|
||||||
|
tokens_used = ${completionTokens},
|
||||||
|
ctx_used = ${promptTokens},
|
||||||
|
ctx_max = ${nCtx},
|
||||||
|
finished_at = clock_timestamp()
|
||||||
|
WHERE id = ${assistantMessageId}
|
||||||
|
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||||
|
`;
|
||||||
|
// v1.11: flag for compaction if this turn pushed us over the usable budget.
|
||||||
|
// We never compact mid-loop (the recursive runAssistantTurn keeps tools
|
||||||
|
// flowing); the flag fires on the NEXT turn's pre-fetch hook above.
|
||||||
|
await maybeFlagForCompaction(ctx, chatId, updated);
|
||||||
|
const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||||
|
UPDATE sessions SET updated_at = clock_timestamp()
|
||||||
|
WHERE id = ${sessionId}
|
||||||
|
RETURNING project_id, name, updated_at
|
||||||
|
`;
|
||||||
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at });
|
||||||
|
for (const tc of toolCalls) {
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'tool_call',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
tool_call: tc,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'message_complete',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
tokens_used: updated?.tokens_used ?? null,
|
||||||
|
ctx_used: updated?.ctx_used ?? null,
|
||||||
|
ctx_max: updated?.ctx_max ?? null,
|
||||||
|
started_at: startedAt,
|
||||||
|
finished_at: updated?.finished_at ?? null,
|
||||||
|
model: session.model,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted
|
||||||
|
// (the answer endpoint needs a target row to UPDATE), but tool_results is
|
||||||
|
// pre-stamped with output=null as a "pending" sentinel and no tool_result
|
||||||
|
// frame goes out — the card renders from the tool_call frame alone. Mixed
|
||||||
|
// batches still execute the other tools normally.
|
||||||
|
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() });
|
||||||
|
let pausingForUserInput = false;
|
||||||
|
await Promise.all(
|
||||||
|
toolCalls.map(async (tc) => {
|
||||||
|
const [toolRow] = await ctx.sql<{ id: string }[]>`
|
||||||
|
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||||
|
VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
|
||||||
|
RETURNING id
|
||||||
|
`;
|
||||||
|
const toolMessageId = toolRow!.id;
|
||||||
|
if (tc.name === 'ask_user_input') {
|
||||||
|
pausingForUserInput = true;
|
||||||
|
const sentinel = { tool_call_id: tc.id, output: null, truncated: false };
|
||||||
|
await ctx.sql`
|
||||||
|
UPDATE messages
|
||||||
|
SET tool_results = ${ctx.sql.json(sentinel as never)}
|
||||||
|
WHERE id = ${toolMessageId}
|
||||||
|
`;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const tres = await executeToolCall(projectRoot, tc);
|
||||||
|
const stored = {
|
||||||
|
tool_call_id: tc.id,
|
||||||
|
output: tres.output,
|
||||||
|
truncated: tres.truncated,
|
||||||
|
...(tres.error ? { error: tres.error } : {}),
|
||||||
|
};
|
||||||
|
await ctx.sql`
|
||||||
|
UPDATE messages
|
||||||
|
SET tool_results = ${ctx.sql.json(stored as never)}
|
||||||
|
WHERE id = ${toolMessageId}
|
||||||
|
`;
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'tool_result',
|
||||||
|
tool_message_id: toolMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
tool_call_id: tc.id,
|
||||||
|
output: tres.output,
|
||||||
|
truncated: tres.truncated,
|
||||||
|
...(tres.error ? { error: tres.error } : {}),
|
||||||
|
});
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
if (pausingForUserInput) {
|
||||||
|
ctx.publishUser({
|
||||||
|
type: 'chat_status',
|
||||||
|
chat_id: chatId,
|
||||||
|
status: 'waiting_for_input',
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
ctx.log.info(
|
||||||
|
{ sessionId, chatId, assistantMessageId },
|
||||||
|
'inference paused awaiting user input',
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [nextAssistant] = await ctx.sql<{ id: string }[]>`
|
||||||
|
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||||
|
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||||
|
RETURNING id
|
||||||
|
`;
|
||||||
|
await runAssistantTurn(ctx, {
|
||||||
|
sessionId,
|
||||||
|
chatId,
|
||||||
|
assistantMessageId: nextAssistant!.id,
|
||||||
|
// v1.8.2: charge this turn's actual tool invocations against the budget.
|
||||||
|
// One assistant message can emit multiple tool_calls, so we add the run
|
||||||
|
// count, not 1. The next turn's budget check sees the cumulative total.
|
||||||
|
toolsUsed: toolsUsed + result.toolCalls.length,
|
||||||
|
// v1.11.6: append the just-executed tool calls to the per-turn history
|
||||||
|
// so the next runAssistantTurn's doom-loop check can see them. We don't
|
||||||
|
// cap the array length here — per-turn budgets keep it bounded
|
||||||
|
// (typically <30 entries), and slicing happens inside detectDoomLoop.
|
||||||
|
recentToolCalls: [...args.recentToolCalls, ...result.toolCalls],
|
||||||
|
signal,
|
||||||
|
});
|
||||||
|
}
|
||||||
13
apps/server/src/services/inference/types.ts
Normal file
13
apps/server/src/services/inference/types.ts
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
// v1.12.4: shared inter-phase types/constants for the extracted phase files.
|
||||||
|
// Lives here so stream-phase, tool-phase, and the summary functions still in
|
||||||
|
// inference.ts can all reference the same definitions without circular imports.
|
||||||
|
|
||||||
|
export interface StreamPhaseState {
|
||||||
|
accumulated: string;
|
||||||
|
startedAt: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 500ms keeps the DB UPDATE rate bounded under heavy streaming. Used by
|
||||||
|
// executeStreamPhase, runCapHitSummary, and runDoomLoopSummary — every site
|
||||||
|
// that does a debounced content flush during streaming.
|
||||||
|
export const DB_FLUSH_INTERVAL_MS = 500;
|
||||||
Reference in New Issue
Block a user