Compare commits
3 Commits
ea468ca7fb
...
v1.12.4-in
| Author | SHA1 | Date | |
|---|---|---|---|
| 9ef00c0268 | |||
| c87df6981a | |||
| 8fa7b7fce9 |
@@ -16,7 +16,7 @@ import { registerWebSocket } from './routes/ws.js';
|
||||
import { registerModelRoutes } from './routes/models.js';
|
||||
import { registerAgentRoutes } from './routes/agents.js';
|
||||
import { registerSkillsRoutes } from './routes/skills.js';
|
||||
import { createInferenceRunner } from './services/inference.js';
|
||||
import { createInferenceRunner } from './services/inference/index.js';
|
||||
import { createBroker } from './services/broker.js';
|
||||
import { listSkills } from './services/skills.js';
|
||||
import * as compaction from './services/compaction.js';
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { DOOM_LOOP_THRESHOLD, detectDoomLoop } from '../inference.js';
|
||||
import { DOOM_LOOP_THRESHOLD, detectDoomLoop } from '../inference/index.js';
|
||||
import type { ToolCall } from '../../types/api.js';
|
||||
|
||||
// ---- fixture ----------------------------------------------------------------
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { buildMessagesPayload } from '../inference.js';
|
||||
import { buildMessagesPayload } from '../inference/index.js';
|
||||
import type {
|
||||
Message,
|
||||
MessageRole,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { InferenceContext } from './inference.js';
|
||||
import type { InferenceContext } from './inference/index.js';
|
||||
|
||||
const NAMING_SYSTEM_PROMPT =
|
||||
'You name chat sessions. Reply directly with no thinking, reasoning, or explanation. Output ONLY the title, 4 words max, no quotes, no punctuation, no prefix like "Title:".';
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
148
apps/server/src/services/inference/error-handler.ts
Normal file
148
apps/server/src/services/inference/error-handler.ts
Normal file
@@ -0,0 +1,148 @@
|
||||
import type { MessageMetadata, Session } from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { maybeFlagForCompaction } from './payload.js';
|
||||
import type { InferenceContext, StreamResult, TurnArgs } from './turn.js';
|
||||
|
||||
export async function handleAbortOrError(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
accumulated: string,
|
||||
err: unknown
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId, assistantMessageId } = args;
|
||||
const isAbort = err instanceof Error && err.name === 'AbortError';
|
||||
const finalStatus = isAbort ? 'cancelled' : 'failed';
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
// v1.8.2: persist a structured error metadata blob on genuine failures so
|
||||
// the bubble can render the reason on reload without re-deriving from the
|
||||
// (one-shot) WS error frame. User-initiated abort skips this — there's no
|
||||
// "reason" to surface for a stop the user already explicitly chose.
|
||||
const errorMetadata: MessageMetadata | null = isAbort
|
||||
? null
|
||||
: { kind: 'error', error_reason: 'llm_provider_error', error_text: errMsg };
|
||||
if (errorMetadata) {
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET status = ${finalStatus},
|
||||
content = ${accumulated},
|
||||
finished_at = clock_timestamp(),
|
||||
metadata = ${ctx.sql.json(errorMetadata as never)}
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
} else {
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET status = ${finalStatus},
|
||||
content = ${accumulated},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
}
|
||||
const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||
UPDATE sessions SET updated_at = clock_timestamp()
|
||||
WHERE id = ${sessionId}
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
|
||||
// v1.8 mobile-tabs: cancellation is a user-initiated stop, treat as idle;
|
||||
// genuine errors flip the dot red. v1.8.2: error path also carries a
|
||||
// machine-readable `reason` so the UI can render specifics inline.
|
||||
if (isAbort) {
|
||||
// v1.12.1: defensive cancellation write. The status=${finalStatus} UPDATE
|
||||
// above already sets 'cancelled' for the AbortError case, but a row can
|
||||
// leak as 'streaming' when the abort fires between the post-tool-phase
|
||||
// INSERT (executeToolPhase) and the next runAssistantTurn's stream setup,
|
||||
// bypassing the try/catch around executeStreamPhase. The status guard
|
||||
// makes this a no-op when the earlier write already landed.
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET status = 'cancelled', content = ${accumulated}, finished_at = clock_timestamp()
|
||||
WHERE id = ${args.assistantMessageId} AND status = 'streaming'
|
||||
`;
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
|
||||
} else {
|
||||
ctx.publishUser({
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'error',
|
||||
at: new Date().toISOString(),
|
||||
reason: 'llm_provider_error',
|
||||
});
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
error: errMsg,
|
||||
reason: 'llm_provider_error',
|
||||
});
|
||||
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
||||
}
|
||||
}
|
||||
|
||||
export async function finalizeCompletion(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
result: StreamResult,
|
||||
startedAt: string | null,
|
||||
session: Session
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId, assistantMessageId } = args;
|
||||
const { content, finishReason, promptTokens, completionTokens } = result;
|
||||
|
||||
// v1.11.3: see executeToolPhase for the rationale.
|
||||
const mctx = await modelContext.getModelContext(session.model);
|
||||
const nCtx = mctx?.n_ctx ?? null;
|
||||
|
||||
const [updated] = await ctx.sql<
|
||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||
>`
|
||||
UPDATE messages
|
||||
SET content = ${content},
|
||||
status = 'complete',
|
||||
tokens_used = ${completionTokens},
|
||||
ctx_used = ${promptTokens},
|
||||
ctx_max = ${nCtx},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
// v1.11: flag for compaction on the terminal turn too. Catches the common
|
||||
// case of a turn that hit the limit without invoking tools.
|
||||
await maybeFlagForCompaction(ctx, chatId, updated);
|
||||
const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||
UPDATE sessions SET updated_at = clock_timestamp()
|
||||
WHERE id = ${sessionId}
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at });
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
started_at: startedAt,
|
||||
finished_at: updated?.finished_at ?? null,
|
||||
model: session.model,
|
||||
});
|
||||
ctx.log.info(
|
||||
{
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId,
|
||||
finishReason,
|
||||
chars: content.length,
|
||||
tokens_used: updated?.tokens_used,
|
||||
ctx_used: updated?.ctx_used,
|
||||
},
|
||||
'inference complete'
|
||||
);
|
||||
}
|
||||
20
apps/server/src/services/inference/index.ts
Normal file
20
apps/server/src/services/inference/index.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
// v1.12.4: re-export shim. Outside callers (apps/server/src/index.ts and the
|
||||
// vitest inference tests) import from './services/inference/index.js'. The
|
||||
// directory is now the public surface; turn.ts holds runAssistantTurn /
|
||||
// runInference / createInferenceRunner while the other inference/*.ts files
|
||||
// stay implementation-private.
|
||||
|
||||
export {
|
||||
createInferenceRunner,
|
||||
runAssistantTurn,
|
||||
runInference,
|
||||
} from './turn.js';
|
||||
export type {
|
||||
FramePublisher,
|
||||
InferenceContext,
|
||||
InferenceFrame,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
export { buildMessagesPayload } from './payload.js';
|
||||
155
apps/server/src/services/inference/payload.ts
Normal file
155
apps/server/src/services/inference/payload.ts
Normal file
@@ -0,0 +1,155 @@
|
||||
import type { Sql } from '../../db.js';
|
||||
import type {
|
||||
Agent,
|
||||
Message,
|
||||
Project,
|
||||
Session,
|
||||
} from '../../types/api.js';
|
||||
import * as compaction from '../compaction.js';
|
||||
import { buildSystemPrompt } from '../system-prompt.js';
|
||||
import { isAnySentinel } from './sentinels.js';
|
||||
import type { InferenceContext } from './turn.js';
|
||||
|
||||
export interface OpenAiMessage {
|
||||
role: 'system' | 'user' | 'assistant' | 'tool';
|
||||
content: string | null;
|
||||
tool_calls?: Array<{
|
||||
id: string;
|
||||
type: 'function';
|
||||
function: { name: string; arguments: string };
|
||||
}>;
|
||||
tool_call_id?: string;
|
||||
}
|
||||
|
||||
// v1.12: buildSystemPrompt lives in services/system-prompt.ts. It awaits the
|
||||
// container-guidance loader, so this function is async too and every call
|
||||
// site in inference.ts awaits the result.
|
||||
export async function buildMessagesPayload(
|
||||
session: Session,
|
||||
project: Project,
|
||||
history: Message[],
|
||||
agent: Agent | null = null
|
||||
): Promise<OpenAiMessage[]> {
|
||||
const out: OpenAiMessage[] = [];
|
||||
const systemPrompt = await buildSystemPrompt(project, session, agent);
|
||||
out.push({ role: 'system', content: systemPrompt });
|
||||
|
||||
// Find the latest compact marker — only send messages from that point onwards
|
||||
let startIdx = 0;
|
||||
for (let i = history.length - 1; i >= 0; i--) {
|
||||
if (history[i]!.kind === 'compact') {
|
||||
startIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (let i = startIdx; i < history.length; i++) {
|
||||
const m = history[i]!;
|
||||
if (m.kind === 'compact') {
|
||||
out.push({ role: 'system', content: m.content });
|
||||
continue;
|
||||
}
|
||||
// v1.8.2 / v1.11.6: cap-hit and doom-loop sentinels are UI-only — never
|
||||
// send them to the LLM. The synthetic instruction note lives only inside
|
||||
// the summary call's messages array and is never persisted, so on a
|
||||
// follow-up turn the model resumes with a clean context.
|
||||
if (isAnySentinel(m)) continue;
|
||||
if (m.role === 'assistant' && m.status === 'streaming') continue;
|
||||
if (m.role === 'assistant' && m.status === 'cancelled') continue;
|
||||
if (m.role === 'tool') {
|
||||
const tr = m.tool_results;
|
||||
if (!tr) continue;
|
||||
const outputText = tr.error
|
||||
? `error: ${tr.error}`
|
||||
: typeof tr.output === 'string'
|
||||
? tr.output
|
||||
: JSON.stringify(tr.output);
|
||||
out.push({
|
||||
role: 'tool',
|
||||
content: outputText,
|
||||
tool_call_id: tr.tool_call_id,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if (m.role === 'assistant') {
|
||||
const msg: OpenAiMessage = {
|
||||
role: 'assistant',
|
||||
content: m.content && m.content.length > 0 ? m.content : null,
|
||||
};
|
||||
if (m.tool_calls && m.tool_calls.length > 0) {
|
||||
msg.tool_calls = m.tool_calls.map((tc) => ({
|
||||
id: tc.id,
|
||||
type: 'function' as const,
|
||||
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
||||
}));
|
||||
}
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
out.push({ role: 'user', content: m.content });
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export async function loadContext(
|
||||
sql: Sql,
|
||||
sessionId: string,
|
||||
chatId: string
|
||||
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
|
||||
const sessionRows = await sql<Session[]>`
|
||||
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at,
|
||||
agent_id, web_search_enabled
|
||||
FROM sessions WHERE id = ${sessionId}
|
||||
`;
|
||||
if (sessionRows.length === 0) return null;
|
||||
const session = sessionRows[0]!;
|
||||
|
||||
const projectRows = await sql<Project[]>`
|
||||
SELECT id, name, path, added_at, last_session_id, status, gitea_remote,
|
||||
default_system_prompt, default_web_search_enabled
|
||||
FROM projects WHERE id = ${session.project_id}
|
||||
`;
|
||||
if (projectRows.length === 0) return null;
|
||||
const project = projectRows[0]!;
|
||||
|
||||
// v1.11: filter compacted messages out of the inference assembly. The GET
|
||||
// /api/sessions/:id/messages endpoint still returns everything (so the UI
|
||||
// can show history with the summary card inline); only LLM payloads skip
|
||||
// compacted rows. compacted_at IS NULL keeps the active summary + tail.
|
||||
const history = await sql<Message[]>`
|
||||
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
||||
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata
|
||||
FROM messages
|
||||
WHERE chat_id = ${chatId} AND compacted_at IS NULL
|
||||
ORDER BY created_at ASC, id ASC
|
||||
`;
|
||||
|
||||
return { session, project, history };
|
||||
}
|
||||
|
||||
// v1.11: shared helper used after both finalizeCompletion and executeToolPhase
|
||||
// persist their token counts. Reads tokens off the just-UPDATEd row (which
|
||||
// the caller returns from RETURNING), runs compaction.isOverflow, and flips
|
||||
// chats.needs_compaction. The next runAssistantTurn invocation acts on it.
|
||||
// Silent on missing tokens — llama-swap occasionally omits usage on truncated
|
||||
// streams, and we'd rather miss one overflow than crash the inference path.
|
||||
export async function maybeFlagForCompaction(
|
||||
ctx: InferenceContext,
|
||||
chatId: string,
|
||||
updated: { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null } | undefined,
|
||||
): Promise<void> {
|
||||
if (!updated) return;
|
||||
const promptTokens = updated.ctx_used;
|
||||
const completionTokens = updated.tokens_used;
|
||||
const contextLimit = updated.ctx_max;
|
||||
if (typeof promptTokens !== 'number') return;
|
||||
if (typeof completionTokens !== 'number') return;
|
||||
if (typeof contextLimit !== 'number') return;
|
||||
const overflow = compaction.isOverflow(
|
||||
{ prompt_tokens: promptTokens, completion_tokens: completionTokens },
|
||||
contextLimit,
|
||||
);
|
||||
if (!overflow) return;
|
||||
await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`;
|
||||
ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction');
|
||||
}
|
||||
523
apps/server/src/services/inference/sentinel-summaries.ts
Normal file
523
apps/server/src/services/inference/sentinel-summaries.ts
Normal file
@@ -0,0 +1,523 @@
|
||||
import type {
|
||||
Agent,
|
||||
Message,
|
||||
MessageMetadata,
|
||||
Project,
|
||||
Session,
|
||||
} from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { buildMessagesPayload } from './payload.js';
|
||||
import { DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
import { streamCompletion } from './stream-phase.js';
|
||||
import { DB_FLUSH_INTERVAL_MS } from './types.js';
|
||||
import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
|
||||
// Synthetic system note appended to the cap-hit summary call. Verbatim from
|
||||
// the v1.8.2 spec — do not paraphrase: the model is more reliable when the
|
||||
// instruction is short, declarative, and identical across calls.
|
||||
const CAP_HIT_SUMMARY_NOTE = (limit: number) =>
|
||||
`You've reached the tool budget (${limit} calls). Produce the best answer you can with what you have. Do not call more tools.`;
|
||||
|
||||
const DOOM_LOOP_NOTE = (name: string) =>
|
||||
`You called ${name} with the same arguments ${DOOM_LOOP_THRESHOLD} times in a row. Stop calling it. Produce the best answer you can with what you have.`;
|
||||
|
||||
export async function runCapHitSummary(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
session: Session,
|
||||
project: Project,
|
||||
history: Message[],
|
||||
agent: Agent | null,
|
||||
budget: number,
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId, assistantMessageId, signal } = args;
|
||||
|
||||
const messages = await buildMessagesPayload(session, project, history, agent);
|
||||
messages.push({ role: 'system', content: CAP_HIT_SUMMARY_NOTE(budget) });
|
||||
|
||||
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
||||
UPDATE messages
|
||||
SET started_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING started_at
|
||||
`;
|
||||
const startedAt = startedRow[0]?.started_at ?? null;
|
||||
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'assistant',
|
||||
});
|
||||
|
||||
let accumulated = '';
|
||||
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
||||
let flushPromise: Promise<unknown> = Promise.resolve();
|
||||
const flushNow = () => {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
const snapshot = accumulated;
|
||||
flushPromise = flushPromise.then(() =>
|
||||
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
||||
);
|
||||
};
|
||||
const scheduleFlush = () => {
|
||||
if (pendingFlushTimer) return;
|
||||
pendingFlushTimer = setTimeout(() => {
|
||||
pendingFlushTimer = null;
|
||||
flushNow();
|
||||
}, DB_FLUSH_INTERVAL_MS);
|
||||
};
|
||||
|
||||
let summaryOk = false;
|
||||
let summarySoftCancelled = false;
|
||||
let summaryError: string | null = null;
|
||||
let result: StreamResult | null = null;
|
||||
try {
|
||||
result = await streamCompletion(
|
||||
ctx,
|
||||
session.model,
|
||||
messages,
|
||||
{ tools: null, temperature: agent?.temperature },
|
||||
(delta) => {
|
||||
accumulated += delta;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
content: delta,
|
||||
});
|
||||
scheduleFlush();
|
||||
},
|
||||
undefined,
|
||||
signal,
|
||||
);
|
||||
summaryOk = true;
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.name === 'AbortError') {
|
||||
summarySoftCancelled = true;
|
||||
} else {
|
||||
summaryError = err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
} finally {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
await flushPromise;
|
||||
}
|
||||
|
||||
// Finalize the summary message based on the three outcomes. The sentinel
|
||||
// is inserted regardless so the user always has the Continue affordance —
|
||||
// even on a partial / failed summary the chat history shows where the
|
||||
// budget was hit.
|
||||
if (summaryOk && result) {
|
||||
// v1.11.3: see executeToolPhase for the rationale.
|
||||
const mctx = await modelContext.getModelContext(session.model);
|
||||
const nCtx = mctx?.n_ctx ?? null;
|
||||
const [updated] = await ctx.sql<
|
||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||
>`
|
||||
UPDATE messages
|
||||
SET content = ${result.content},
|
||||
status = 'complete',
|
||||
tokens_used = ${result.completionTokens},
|
||||
ctx_used = ${result.promptTokens},
|
||||
ctx_max = ${nCtx},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
started_at: startedAt,
|
||||
finished_at: updated?.finished_at ?? null,
|
||||
model: session.model,
|
||||
});
|
||||
} else if (summarySoftCancelled) {
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET content = ${accumulated},
|
||||
status = 'cancelled',
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
} else {
|
||||
const errMeta: MessageMetadata = {
|
||||
kind: 'error',
|
||||
error_reason: 'summary_after_cap_failed',
|
||||
error_text: summaryError ?? 'summary failed',
|
||||
};
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET content = ${accumulated},
|
||||
status = 'failed',
|
||||
finished_at = clock_timestamp(),
|
||||
metadata = ${ctx.sql.json(errMeta as never)}
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
error: summaryError ?? 'summary failed',
|
||||
reason: 'summary_after_cap_failed',
|
||||
});
|
||||
}
|
||||
|
||||
// Bump session/chat updated_at exactly once for this turn.
|
||||
const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||
UPDATE sessions SET updated_at = clock_timestamp()
|
||||
WHERE id = ${sessionId}
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({
|
||||
type: 'session_updated',
|
||||
session_id: sessionId,
|
||||
project_id: sessRow!.project_id,
|
||||
name: sessRow!.name,
|
||||
updated_at: sessRow!.updated_at,
|
||||
});
|
||||
|
||||
await insertCapHitSentinel(ctx, sessionId, chatId, agent, budget);
|
||||
|
||||
// Status frame fires last so the dot color reflects the terminal state.
|
||||
// Success → idle, abort → idle (user-driven stop), error → error+reason.
|
||||
if (summaryOk) {
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||
} else if (summarySoftCancelled) {
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||
} else {
|
||||
ctx.publishUser({
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'error',
|
||||
at: new Date().toISOString(),
|
||||
reason: 'summary_after_cap_failed',
|
||||
});
|
||||
}
|
||||
|
||||
ctx.log.info(
|
||||
{ sessionId, chatId, assistantMessageId, budget, summaryOk, summaryCancelled: summarySoftCancelled },
|
||||
'inference cap-hit summary finished',
|
||||
);
|
||||
}
|
||||
|
||||
async function insertCapHitSentinel(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
agent: Agent | null,
|
||||
budget: number,
|
||||
): Promise<void> {
|
||||
// Hard ceiling: count prior cap_hit sentinels in this chat. After two
|
||||
// continues (sentinel count of 2), the next sentinel reports can_continue
|
||||
// false and the UI disables the Continue button.
|
||||
const priorRows = await ctx.sql<{ count: number }[]>`
|
||||
SELECT COUNT(*)::int AS count
|
||||
FROM messages
|
||||
WHERE chat_id = ${chatId}
|
||||
AND role = 'system'
|
||||
AND metadata->>'kind' = 'cap_hit'
|
||||
`;
|
||||
const priorCount = priorRows[0]?.count ?? 0;
|
||||
const canContinue = priorCount < 2;
|
||||
const metadata: MessageMetadata = {
|
||||
kind: 'cap_hit',
|
||||
used: budget,
|
||||
limit: budget,
|
||||
agent_name: agent?.name ?? null,
|
||||
can_continue: canContinue,
|
||||
};
|
||||
const content = `Reached tool budget (${budget}/${budget}). Continue to extend.`;
|
||||
|
||||
const [row] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata)
|
||||
VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(), ${ctx.sql.json(metadata as never)})
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
// The sentinel content is static, but we still walk the standard frame
|
||||
// sequence (started → delta → complete) so useSessionStream's reducer
|
||||
// appends it via the same path it uses for streaming assistant messages.
|
||||
// The delta carries the full text in one chunk.
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
role: 'system',
|
||||
});
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
content,
|
||||
});
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
metadata,
|
||||
});
|
||||
}
|
||||
|
||||
// v1.11.6: doom-loop wrap-up. Mirrors runCapHitSummary structurally — same
|
||||
// in-flight-slot reuse, same tools-disabled streaming-summary call, same
|
||||
// post-finalize sentinel insert + chat_status drop. Differences:
|
||||
// - synthetic note text comes from DOOM_LOOP_NOTE (names the looping tool)
|
||||
// - sentinel metadata is { kind: 'doom_loop', tool_name, args, threshold }
|
||||
// and has no Continue affordance (manual retry would just re-loop)
|
||||
// - chat_status error path uses reason: 'doom_loop_summary_failed'
|
||||
// Kept as a clone rather than refactored into a shared helper because the
|
||||
// two summary paths still differ in error reason + sentinel shape; a third
|
||||
// sentinel would justify factoring out runWrapUpSummary(opts).
|
||||
export async function runDoomLoopSummary(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
session: Session,
|
||||
project: Project,
|
||||
history: Message[],
|
||||
agent: Agent | null,
|
||||
loop: { name: string; args: Record<string, unknown> },
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId, assistantMessageId, signal } = args;
|
||||
|
||||
const messages = await buildMessagesPayload(session, project, history, agent);
|
||||
messages.push({ role: 'system', content: DOOM_LOOP_NOTE(loop.name) });
|
||||
|
||||
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
||||
UPDATE messages
|
||||
SET started_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING started_at
|
||||
`;
|
||||
const startedAt = startedRow[0]?.started_at ?? null;
|
||||
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'assistant',
|
||||
});
|
||||
|
||||
let accumulated = '';
|
||||
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
||||
let flushPromise: Promise<unknown> = Promise.resolve();
|
||||
const flushNow = () => {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
const snapshot = accumulated;
|
||||
flushPromise = flushPromise.then(() =>
|
||||
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
||||
);
|
||||
};
|
||||
const scheduleFlush = () => {
|
||||
if (pendingFlushTimer) return;
|
||||
pendingFlushTimer = setTimeout(() => {
|
||||
pendingFlushTimer = null;
|
||||
flushNow();
|
||||
}, DB_FLUSH_INTERVAL_MS);
|
||||
};
|
||||
|
||||
let summaryOk = false;
|
||||
let summarySoftCancelled = false;
|
||||
let summaryError: string | null = null;
|
||||
let result: StreamResult | null = null;
|
||||
try {
|
||||
result = await streamCompletion(
|
||||
ctx,
|
||||
session.model,
|
||||
messages,
|
||||
{ tools: null, temperature: agent?.temperature },
|
||||
(delta) => {
|
||||
accumulated += delta;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
content: delta,
|
||||
});
|
||||
scheduleFlush();
|
||||
},
|
||||
undefined,
|
||||
signal,
|
||||
);
|
||||
summaryOk = true;
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.name === 'AbortError') {
|
||||
summarySoftCancelled = true;
|
||||
} else {
|
||||
summaryError = err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
} finally {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
await flushPromise;
|
||||
}
|
||||
|
||||
if (summaryOk && result) {
|
||||
const mctx = await modelContext.getModelContext(session.model);
|
||||
const nCtx = mctx?.n_ctx ?? null;
|
||||
const [updated] = await ctx.sql<
|
||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||
>`
|
||||
UPDATE messages
|
||||
SET content = ${result.content},
|
||||
status = 'complete',
|
||||
tokens_used = ${result.completionTokens},
|
||||
ctx_used = ${result.promptTokens},
|
||||
ctx_max = ${nCtx},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
started_at: startedAt,
|
||||
finished_at: updated?.finished_at ?? null,
|
||||
model: session.model,
|
||||
});
|
||||
} else if (summarySoftCancelled) {
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET content = ${accumulated},
|
||||
status = 'cancelled',
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
} else {
|
||||
// Doom-loop summary failure reuses the existing summary_after_cap_failed
|
||||
// error reason — the ErrorReason union is shared between sentinel paths
|
||||
// and the UI surfaces a generic "summary failed" line for both. We don't
|
||||
// add a new reason code because the user-visible failure mode is the
|
||||
// same (model gave up mid-summary). Sentinel below still fires.
|
||||
const errMeta: MessageMetadata = {
|
||||
kind: 'error',
|
||||
error_reason: 'summary_after_cap_failed',
|
||||
error_text: summaryError ?? 'doom-loop summary failed',
|
||||
};
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET content = ${accumulated},
|
||||
status = 'failed',
|
||||
finished_at = clock_timestamp(),
|
||||
metadata = ${ctx.sql.json(errMeta as never)}
|
||||
WHERE id = ${assistantMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
error: summaryError ?? 'doom-loop summary failed',
|
||||
reason: 'summary_after_cap_failed',
|
||||
});
|
||||
}
|
||||
|
||||
const [sessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||
UPDATE sessions SET updated_at = clock_timestamp()
|
||||
WHERE id = ${sessionId}
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({
|
||||
type: 'session_updated',
|
||||
session_id: sessionId,
|
||||
project_id: sessRow!.project_id,
|
||||
name: sessRow!.name,
|
||||
updated_at: sessRow!.updated_at,
|
||||
});
|
||||
|
||||
await insertDoomLoopSentinel(ctx, sessionId, chatId, loop);
|
||||
|
||||
if (summaryOk || summarySoftCancelled) {
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||
} else {
|
||||
ctx.publishUser({
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'error',
|
||||
at: new Date().toISOString(),
|
||||
reason: 'summary_after_cap_failed',
|
||||
});
|
||||
}
|
||||
|
||||
ctx.log.info(
|
||||
{ sessionId, chatId, assistantMessageId, loopedTool: loop.name, summaryOk, summaryCancelled: summarySoftCancelled },
|
||||
'inference doom-loop summary finished',
|
||||
);
|
||||
}
|
||||
|
||||
async function insertDoomLoopSentinel(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
loop: { name: string; args: Record<string, unknown> },
|
||||
): Promise<void> {
|
||||
// No hard-ceiling / can-continue logic here — doom-loop is a different
|
||||
// failure mode from cap-hit. Continuing would re-trigger the loop with
|
||||
// the same tools available; the user needs to restate their question
|
||||
// or switch agents instead.
|
||||
const metadata: MessageMetadata = {
|
||||
kind: 'doom_loop',
|
||||
tool_name: loop.name,
|
||||
args: loop.args,
|
||||
threshold: DOOM_LOOP_THRESHOLD,
|
||||
};
|
||||
const content = `Detected ${DOOM_LOOP_THRESHOLD} identical calls to ${loop.name}. Stopping the tool-call loop. Produce the best answer you can with what you have.`;
|
||||
|
||||
const [row] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata)
|
||||
VALUES (${sessionId}, ${chatId}, 'system', ${content}, 'complete', clock_timestamp(), ${ctx.sql.json(metadata as never)})
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
// Standard frame sequence — same as cap-hit sentinel — so
|
||||
// useSessionStream's reducer appends the row via the existing path.
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
role: 'system',
|
||||
});
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
content,
|
||||
});
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: row!.id,
|
||||
chat_id: chatId,
|
||||
metadata,
|
||||
});
|
||||
}
|
||||
380
apps/server/src/services/inference/stream-phase.ts
Normal file
380
apps/server/src/services/inference/stream-phase.ts
Normal file
@@ -0,0 +1,380 @@
|
||||
import type {
|
||||
Agent,
|
||||
Session,
|
||||
ToolCall,
|
||||
} from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
|
||||
import type { OpenAiMessage } from './payload.js';
|
||||
import {
|
||||
XML_TOOL_CLOSE,
|
||||
XML_TOOL_OPEN,
|
||||
parseXmlToolCall,
|
||||
partialXmlOpenerStart,
|
||||
} from './xml-parser.js';
|
||||
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
|
||||
import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
|
||||
interface ChatCompletionDelta {
|
||||
role?: string;
|
||||
content?: string | null;
|
||||
tool_calls?: Array<{
|
||||
index: number;
|
||||
id?: string;
|
||||
type?: 'function';
|
||||
function?: { name?: string; arguments?: string };
|
||||
}>;
|
||||
}
|
||||
|
||||
interface ChatCompletionChunk {
|
||||
choices?: Array<{
|
||||
delta: ChatCompletionDelta;
|
||||
finish_reason: string | null;
|
||||
}>;
|
||||
usage?: {
|
||||
prompt_tokens?: number;
|
||||
completion_tokens?: number;
|
||||
total_tokens?: number;
|
||||
};
|
||||
}
|
||||
|
||||
interface StreamOptions {
|
||||
// null = omit tools entirely (compact phase); [] = caller stripped all tools
|
||||
// (rare; we still omit from the request body to avoid OpenAI 400).
|
||||
tools: ToolJsonSchema[] | null;
|
||||
temperature?: number;
|
||||
}
|
||||
|
||||
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
||||
const reader = stream.getReader();
|
||||
const decoder = new TextDecoder('utf-8');
|
||||
let buffer = '';
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
let idx;
|
||||
while ((idx = buffer.indexOf('\n')) >= 0) {
|
||||
const line = buffer.slice(0, idx).replace(/\r$/, '');
|
||||
buffer = buffer.slice(idx + 1);
|
||||
if (line.length === 0) continue;
|
||||
yield line;
|
||||
}
|
||||
}
|
||||
if (buffer.length > 0) yield buffer;
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
// v1.10.5 Qwen-coder XML fallback. Some local models (notably qwen3-coder via
|
||||
// llama-swap) emit tool calls as inline XML inside delta.content rather than
|
||||
// the structured delta.tool_calls field. The XML shape is:
|
||||
// <tool_call>
|
||||
// <function=NAME>
|
||||
// <parameter=KEY>
|
||||
// VALUE
|
||||
// </parameter>
|
||||
// ...more parameters...
|
||||
// </function>
|
||||
// </tool_call>
|
||||
// Multiple <tool_call> blocks may appear back-to-back; they never nest.
|
||||
// streamCompletion buffers delta.content, extracts complete blocks, parses
|
||||
// them via parseXmlToolCall, and pushes synthetic entries into the existing
|
||||
// toolCallsBuffer alongside any native JSON-format tool calls.
|
||||
export async function streamCompletion(
|
||||
ctx: InferenceContext,
|
||||
model: string,
|
||||
messages: OpenAiMessage[],
|
||||
opts: StreamOptions,
|
||||
onDelta: (content: string) => void,
|
||||
onUsage: ((prompt: number | null, completion: number | null) => void) | undefined,
|
||||
signal?: AbortSignal
|
||||
): Promise<StreamResult> {
|
||||
const body: Record<string, unknown> = {
|
||||
model,
|
||||
messages,
|
||||
stream: true,
|
||||
stream_options: { include_usage: true },
|
||||
};
|
||||
if (opts.tools && opts.tools.length > 0) {
|
||||
body['tools'] = opts.tools;
|
||||
body['tool_choice'] = 'auto';
|
||||
}
|
||||
if (typeof opts.temperature === 'number') {
|
||||
body['temperature'] = opts.temperature;
|
||||
}
|
||||
|
||||
const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
});
|
||||
if (!res.ok || !res.body) {
|
||||
const text = await res.text().catch(() => '');
|
||||
throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
|
||||
}
|
||||
|
||||
let content = '';
|
||||
// v1.10.5: holds delta.content bytes that may contain a partial XML tool
|
||||
// call. Anything not part of a (possibly forming) <tool_call>…</tool_call>
|
||||
// pair is flushed to content + onDelta as soon as we know it's safe.
|
||||
let pendingBuffer = '';
|
||||
let finishReason: string | null = null;
|
||||
let promptTokens: number | null = null;
|
||||
let completionTokens: number | null = null;
|
||||
const toolCallsBuffer = new Map<number, { id: string; name: string; argsText: string }>();
|
||||
|
||||
for await (const line of sseLines(res.body)) {
|
||||
if (!line.startsWith('data:')) continue;
|
||||
const payload = line.slice(5).trim();
|
||||
if (payload === '[DONE]') break;
|
||||
let parsed: ChatCompletionChunk;
|
||||
try {
|
||||
parsed = JSON.parse(payload);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (parsed.usage) {
|
||||
if (typeof parsed.usage.prompt_tokens === 'number') {
|
||||
promptTokens = parsed.usage.prompt_tokens;
|
||||
}
|
||||
if (typeof parsed.usage.completion_tokens === 'number') {
|
||||
completionTokens = parsed.usage.completion_tokens;
|
||||
}
|
||||
onUsage?.(promptTokens, completionTokens);
|
||||
}
|
||||
// v1.11.3: removed dead `parsed.timings.n_ctx` read. llama-server's
|
||||
// streaming completion does NOT emit n_ctx in timings (verified
|
||||
// empirically); the authoritative source is llama-swap's
|
||||
// /upstream/<model>/props endpoint, fetched per-turn via
|
||||
// model-context.getModelContext() at the finalization sites below.
|
||||
|
||||
const choice = parsed.choices?.[0];
|
||||
if (!choice) continue;
|
||||
const delta = choice.delta ?? {};
|
||||
if (typeof delta.content === 'string' && delta.content.length > 0) {
|
||||
// v1.10.5 XML fallback. Append, then extract any complete tool_call
|
||||
// blocks before deciding what's safe to flush as visible content.
|
||||
pendingBuffer += delta.content;
|
||||
while (true) {
|
||||
const startIdx = pendingBuffer.indexOf(XML_TOOL_OPEN);
|
||||
if (startIdx === -1) break;
|
||||
const closeIdx = pendingBuffer.indexOf(XML_TOOL_CLOSE, startIdx);
|
||||
if (closeIdx === -1) break;
|
||||
const blockEnd = closeIdx + XML_TOOL_CLOSE.length;
|
||||
const block = pendingBuffer.slice(startIdx, blockEnd);
|
||||
// Any text before the opener is plain content — flush it now.
|
||||
if (startIdx > 0) {
|
||||
const before = pendingBuffer.slice(0, startIdx);
|
||||
content += before;
|
||||
onDelta(before);
|
||||
}
|
||||
const parsedCall = parseXmlToolCall(block);
|
||||
if (parsedCall) {
|
||||
const synthIdx = toolCallsBuffer.size;
|
||||
toolCallsBuffer.set(synthIdx, {
|
||||
id: `xml_call_${synthIdx}`,
|
||||
name: parsedCall.name,
|
||||
argsText: JSON.stringify(parsedCall.args),
|
||||
});
|
||||
}
|
||||
// If parsing failed we still drop the block — emitting unparseable
|
||||
// XML to the chat would look worse than silently swallowing it.
|
||||
pendingBuffer = pendingBuffer.slice(blockEnd);
|
||||
}
|
||||
// After all complete blocks are out, hold back any (partial or full)
|
||||
// unclosed opener; flush the rest.
|
||||
const partialIdx = partialXmlOpenerStart(pendingBuffer);
|
||||
if (partialIdx >= 0) {
|
||||
if (partialIdx > 0) {
|
||||
const flush = pendingBuffer.slice(0, partialIdx);
|
||||
content += flush;
|
||||
onDelta(flush);
|
||||
}
|
||||
pendingBuffer = pendingBuffer.slice(partialIdx);
|
||||
} else if (pendingBuffer.length > 0) {
|
||||
content += pendingBuffer;
|
||||
onDelta(pendingBuffer);
|
||||
pendingBuffer = '';
|
||||
}
|
||||
}
|
||||
if (Array.isArray(delta.tool_calls)) {
|
||||
for (const tc of delta.tool_calls) {
|
||||
const idx = tc.index;
|
||||
const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' };
|
||||
if (tc.id) existing.id = tc.id;
|
||||
if (tc.function?.name) existing.name = tc.function.name;
|
||||
if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments;
|
||||
toolCallsBuffer.set(idx, existing);
|
||||
}
|
||||
}
|
||||
if (choice.finish_reason) finishReason = choice.finish_reason;
|
||||
}
|
||||
|
||||
// v1.10.5: if the stream ended mid-XML (e.g. model truncated, no closer
|
||||
// ever arrived), flush whatever was buffered as plain content so it isn't
|
||||
// silently dropped. Better to show a stray `<tool_call>` than vanish text.
|
||||
if (pendingBuffer.length > 0) {
|
||||
content += pendingBuffer;
|
||||
onDelta(pendingBuffer);
|
||||
pendingBuffer = '';
|
||||
}
|
||||
|
||||
const toolCalls: ToolCall[] = [];
|
||||
for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) {
|
||||
let args: Record<string, unknown> = {};
|
||||
if (t.argsText.length > 0) {
|
||||
try {
|
||||
args = JSON.parse(t.argsText);
|
||||
} catch {
|
||||
args = { _raw: t.argsText };
|
||||
}
|
||||
}
|
||||
toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args });
|
||||
}
|
||||
|
||||
return { finishReason, content, toolCalls, promptTokens, completionTokens };
|
||||
}
|
||||
|
||||
export async function executeStreamPhase(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
session: Session,
|
||||
messages: OpenAiMessage[],
|
||||
state: StreamPhaseState,
|
||||
agent: Agent | null,
|
||||
// v1.11.8: when false, web_search and web_fetch are stripped from the
|
||||
// tool list sent to the LLM, so the model can't even attempt them.
|
||||
webToolsEnabled: boolean,
|
||||
): Promise<StreamResult> {
|
||||
const { sessionId, chatId, assistantMessageId, signal } = args;
|
||||
|
||||
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
||||
UPDATE messages
|
||||
SET started_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING started_at
|
||||
`;
|
||||
state.startedAt = startedRow[0]?.started_at ?? null;
|
||||
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'assistant',
|
||||
});
|
||||
|
||||
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
||||
let flushPromise: Promise<unknown> = Promise.resolve();
|
||||
|
||||
const flushNow = () => {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
const snapshot = state.accumulated;
|
||||
flushPromise = flushPromise.then(() =>
|
||||
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
||||
);
|
||||
};
|
||||
|
||||
const scheduleFlush = () => {
|
||||
if (pendingFlushTimer) return;
|
||||
pendingFlushTimer = setTimeout(() => {
|
||||
pendingFlushTimer = null;
|
||||
flushNow();
|
||||
}, DB_FLUSH_INTERVAL_MS);
|
||||
};
|
||||
|
||||
// Tool whitelist: if an agent is set, filter the global tool list to only the
|
||||
// tool names it allows. Unknown names in agent.tools are dropped silently
|
||||
// (handled here by intersection). When no agent: send all tools.
|
||||
// v1.11.8: a second filter strips web_search + web_fetch unless the chat
|
||||
// has them explicitly enabled. Counts as an opt-in security boundary: the
|
||||
// model can't summon a tool that wasn't offered to it.
|
||||
const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
|
||||
const effectiveTools: ToolJsonSchema[] = (agent
|
||||
? toolJsonSchemas().filter((t) => agent.tools.includes(t.function.name))
|
||||
: toolJsonSchemas()
|
||||
).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));
|
||||
const effectiveTemperature = agent?.temperature;
|
||||
|
||||
// v1.12.2: ctx_max lookup is cached after the first hit per model, so this
|
||||
// is a Map probe in steady state. We capture nCtx once at the top of the
|
||||
// stream so the throttled usage publish doesn't refetch each tick.
|
||||
const mctxForStream = await modelContext.getModelContext(session.model);
|
||||
const nCtxForStream = mctxForStream?.n_ctx ?? null;
|
||||
|
||||
// v1.12.2: throttle live usage publishes to ~500ms. The model can land
|
||||
// dozens of usage frames per second; without a throttle the WS turns into
|
||||
// a firehose for a few KB savings on each render.
|
||||
const USAGE_THROTTLE_MS = 500;
|
||||
let lastUsageAt = 0;
|
||||
let pendingUsage: { p: number | null; c: number | null } | null = null;
|
||||
let usageTimer: NodeJS.Timeout | null = null;
|
||||
const flushUsage = () => {
|
||||
if (!pendingUsage) return;
|
||||
const { p, c } = pendingUsage;
|
||||
pendingUsage = null;
|
||||
lastUsageAt = Date.now();
|
||||
ctx.publish(sessionId, {
|
||||
type: 'usage',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
completion_tokens: c,
|
||||
ctx_used: p,
|
||||
ctx_max: nCtxForStream,
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
return await streamCompletion(
|
||||
ctx,
|
||||
session.model,
|
||||
messages,
|
||||
{ tools: effectiveTools, temperature: effectiveTemperature },
|
||||
(delta) => {
|
||||
state.accumulated += delta;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
content: delta,
|
||||
});
|
||||
ctx.log.debug({ sessionId, delta }, 'inference delta');
|
||||
scheduleFlush();
|
||||
},
|
||||
(prompt, completion) => {
|
||||
pendingUsage = { p: prompt, c: completion };
|
||||
const elapsed = Date.now() - lastUsageAt;
|
||||
if (elapsed >= USAGE_THROTTLE_MS) {
|
||||
flushUsage();
|
||||
} else if (!usageTimer) {
|
||||
usageTimer = setTimeout(() => {
|
||||
usageTimer = null;
|
||||
flushUsage();
|
||||
}, USAGE_THROTTLE_MS - elapsed);
|
||||
}
|
||||
},
|
||||
signal
|
||||
);
|
||||
} finally {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
if (usageTimer) {
|
||||
clearTimeout(usageTimer);
|
||||
usageTimer = null;
|
||||
}
|
||||
await flushPromise;
|
||||
}
|
||||
}
|
||||
213
apps/server/src/services/inference/tool-phase.ts
Normal file
213
apps/server/src/services/inference/tool-phase.ts
Normal file
@@ -0,0 +1,213 @@
|
||||
import type { Session, ToolCall } from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { PathScopeError } from '../path_guard.js';
|
||||
import { TOOLS_BY_NAME } from '../tools.js';
|
||||
import { maybeFlagForCompaction } from './payload.js';
|
||||
import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
TurnArgs,
|
||||
} from './turn.js';
|
||||
// v1.12.4: ESM value-import cycle. executeToolPhase recurses into
|
||||
// runAssistantTurn which lives in inference.ts. The cycle is safe because
|
||||
// the reference is read at call time (inside an async function body), not
|
||||
// at module top-level. Node + tsc resolve this cleanly.
|
||||
import { runAssistantTurn } from './turn.js';
|
||||
|
||||
async function executeToolCall(
|
||||
projectRoot: string,
|
||||
toolCall: ToolCall
|
||||
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
|
||||
const tool = TOOLS_BY_NAME[toolCall.name];
|
||||
if (!tool) {
|
||||
return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
|
||||
}
|
||||
const parsed = tool.inputSchema.safeParse(toolCall.args);
|
||||
if (!parsed.success) {
|
||||
// v1.12 Track B.2: enrich the zod-reject path so the model sees a
|
||||
// one-line, tool-named hint ("tool 'search_symbols' rejected — query:
|
||||
// Required") instead of a JSON blob of flatten output. Higher recovery
|
||||
// rate on the next turn; doom-loop guard still bounds infinite retries.
|
||||
// The cast is because tool.inputSchema is ZodType<unknown>, so zod can't
|
||||
// statically narrow flatten()'s fieldErrors key set — but the runtime
|
||||
// shape is the standard { formErrors: string[]; fieldErrors: Record<...> }.
|
||||
const flatten = parsed.error.flatten() as {
|
||||
formErrors: string[];
|
||||
fieldErrors: Record<string, string[] | undefined>;
|
||||
};
|
||||
const fieldErrors = Object.entries(flatten.fieldErrors)
|
||||
.map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`)
|
||||
.join('; ');
|
||||
const formError = flatten.formErrors[0];
|
||||
const hint = fieldErrors || formError || 'unknown validation error';
|
||||
return {
|
||||
output: null,
|
||||
truncated: false,
|
||||
error: `tool '${toolCall.name}' rejected — ${hint}`,
|
||||
};
|
||||
}
|
||||
try {
|
||||
const output = await tool.execute(parsed.data, projectRoot);
|
||||
const truncated =
|
||||
typeof output === 'object' && output !== null && 'truncated' in output
|
||||
? Boolean((output as { truncated: unknown }).truncated)
|
||||
: false;
|
||||
return { output, truncated };
|
||||
} catch (err) {
|
||||
if (err instanceof PathScopeError) {
|
||||
return { output: null, truncated: false, error: err.message };
|
||||
}
|
||||
return {
|
||||
output: null,
|
||||
truncated: false,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export async function executeToolPhase(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
result: StreamResult,
|
||||
startedAt: string | null,
|
||||
session: Session,
|
||||
projectRoot: string
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
|
||||
const { content, toolCalls, promptTokens, completionTokens } = result;
|
||||
|
||||
// v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the
|
||||
// streaming completion (which doesn't emit n_ctx). getModelContext caches
|
||||
// the positive lookup for the process lifetime, so this is a single Map
|
||||
// hit after the first invocation per model.
|
||||
const mctx = await modelContext.getModelContext(session.model);
|
||||
const nCtx = mctx?.n_ctx ?? null;
|
||||
|
||||
const [updated] = await ctx.sql<
|
||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||
>`
|
||||
UPDATE messages
|
||||
SET content = ${content},
|
||||
status = 'complete',
|
||||
tool_calls = ${ctx.sql.json(toolCalls as never)},
|
||||
tokens_used = ${completionTokens},
|
||||
ctx_used = ${promptTokens},
|
||||
ctx_max = ${nCtx},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
// v1.11: flag for compaction if this turn pushed us over the usable budget.
|
||||
// We never compact mid-loop (the recursive runAssistantTurn keeps tools
|
||||
// flowing); the flag fires on the NEXT turn's pre-fetch hook above.
|
||||
await maybeFlagForCompaction(ctx, chatId, updated);
|
||||
const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||
UPDATE sessions SET updated_at = clock_timestamp()
|
||||
WHERE id = ${sessionId}
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at });
|
||||
for (const tc of toolCalls) {
|
||||
ctx.publish(sessionId, {
|
||||
type: 'tool_call',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tool_call: tc,
|
||||
});
|
||||
}
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
started_at: startedAt,
|
||||
finished_at: updated?.finished_at ?? null,
|
||||
model: session.model,
|
||||
});
|
||||
|
||||
// Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted
|
||||
// (the answer endpoint needs a target row to UPDATE), but tool_results is
|
||||
// pre-stamped with output=null as a "pending" sentinel and no tool_result
|
||||
// frame goes out — the card renders from the tool_call frame alone. Mixed
|
||||
// batches still execute the other tools normally.
|
||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() });
|
||||
let pausingForUserInput = false;
|
||||
await Promise.all(
|
||||
toolCalls.map(async (tc) => {
|
||||
const [toolRow] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
const toolMessageId = toolRow!.id;
|
||||
if (tc.name === 'ask_user_input') {
|
||||
pausingForUserInput = true;
|
||||
const sentinel = { tool_call_id: tc.id, output: null, truncated: false };
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET tool_results = ${ctx.sql.json(sentinel as never)}
|
||||
WHERE id = ${toolMessageId}
|
||||
`;
|
||||
return;
|
||||
}
|
||||
const tres = await executeToolCall(projectRoot, tc);
|
||||
const stored = {
|
||||
tool_call_id: tc.id,
|
||||
output: tres.output,
|
||||
truncated: tres.truncated,
|
||||
...(tres.error ? { error: tres.error } : {}),
|
||||
};
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET tool_results = ${ctx.sql.json(stored as never)}
|
||||
WHERE id = ${toolMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'tool_result',
|
||||
tool_message_id: toolMessageId,
|
||||
chat_id: chatId,
|
||||
tool_call_id: tc.id,
|
||||
output: tres.output,
|
||||
truncated: tres.truncated,
|
||||
...(tres.error ? { error: tres.error } : {}),
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
if (pausingForUserInput) {
|
||||
ctx.publishUser({
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'waiting_for_input',
|
||||
at: new Date().toISOString(),
|
||||
});
|
||||
ctx.log.info(
|
||||
{ sessionId, chatId, assistantMessageId },
|
||||
'inference paused awaiting user input',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const [nextAssistant] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
await runAssistantTurn(ctx, {
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId: nextAssistant!.id,
|
||||
// v1.8.2: charge this turn's actual tool invocations against the budget.
|
||||
// One assistant message can emit multiple tool_calls, so we add the run
|
||||
// count, not 1. The next turn's budget check sees the cumulative total.
|
||||
toolsUsed: toolsUsed + result.toolCalls.length,
|
||||
// v1.11.6: append the just-executed tool calls to the per-turn history
|
||||
// so the next runAssistantTurn's doom-loop check can see them. We don't
|
||||
// cap the array length here — per-turn budgets keep it bounded
|
||||
// (typically <30 entries), and slicing happens inside detectDoomLoop.
|
||||
recentToolCalls: [...args.recentToolCalls, ...result.toolCalls],
|
||||
signal,
|
||||
});
|
||||
}
|
||||
326
apps/server/src/services/inference/turn.ts
Normal file
326
apps/server/src/services/inference/turn.ts
Normal file
@@ -0,0 +1,326 @@
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Sql } from '../../db.js';
|
||||
import type { Config } from '../../config.js';
|
||||
import type {
|
||||
Agent,
|
||||
ErrorReason,
|
||||
Message,
|
||||
MessageMetadata,
|
||||
Project,
|
||||
Session,
|
||||
ToolCall,
|
||||
UserStreamFrame,
|
||||
} from '../../types/api.js';
|
||||
import { ALL_TOOLS } from '../tools.js';
|
||||
import { resolveProjectRoot } from '../path_guard.js';
|
||||
import { maybeAutoNameChat } from '../auto_name.js';
|
||||
import { getAgentById } from '../agents.js';
|
||||
import * as compaction from '../compaction.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import type { Broker } from '../broker.js';
|
||||
import { resolveToolBudget } from './budget.js';
|
||||
import {
|
||||
DOOM_LOOP_THRESHOLD,
|
||||
detectDoomLoop,
|
||||
} from './sentinels.js';
|
||||
import {
|
||||
buildMessagesPayload,
|
||||
loadContext,
|
||||
} from './payload.js';
|
||||
import {
|
||||
finalizeCompletion,
|
||||
handleAbortOrError,
|
||||
} from './error-handler.js';
|
||||
import {
|
||||
executeStreamPhase,
|
||||
streamCompletion,
|
||||
} from './stream-phase.js';
|
||||
import { executeToolPhase } from './tool-phase.js';
|
||||
import { DB_FLUSH_INTERVAL_MS, type StreamPhaseState } from './types.js';
|
||||
import {
|
||||
runCapHitSummary,
|
||||
runDoomLoopSummary,
|
||||
} from './sentinel-summaries.js';
|
||||
|
||||
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
||||
// importing from services/inference.js as the public surface.
|
||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js';
|
||||
export { buildMessagesPayload } from './payload.js';
|
||||
|
||||
export interface InferenceFrame {
|
||||
type:
|
||||
| 'message_started'
|
||||
| 'delta'
|
||||
| 'tool_call'
|
||||
| 'tool_result'
|
||||
| 'message_complete'
|
||||
| 'usage'
|
||||
| 'messages_deleted'
|
||||
| 'session_renamed'
|
||||
| 'chat_renamed'
|
||||
| 'error';
|
||||
message_id?: string;
|
||||
message_ids?: string[];
|
||||
chat_id?: string;
|
||||
tool_message_id?: string;
|
||||
tool_call_id?: string;
|
||||
// v1.8.2: 'system' added so cap-hit sentinel messages can announce themselves
|
||||
// through the normal message_started → delta → message_complete sequence.
|
||||
role?: 'assistant' | 'tool' | 'user' | 'system';
|
||||
content?: string;
|
||||
tool_call?: ToolCall;
|
||||
output?: unknown;
|
||||
truncated?: boolean;
|
||||
error?: string;
|
||||
// v1.8.2: structured error reason. Set on `type: 'error'` so the UI can
|
||||
// surface a specific message; `error` stays the human-readable text.
|
||||
reason?: ErrorReason;
|
||||
// v1.8.2: piggybacks on `message_complete` so static or terminally-resolved
|
||||
// messages can carry their persisted metadata to the live stream without a
|
||||
// refetch (sentinels carry { kind: 'cap_hit', ... }; failed messages carry
|
||||
// { kind: 'error', ... }).
|
||||
metadata?: MessageMetadata | null;
|
||||
tokens_used?: number | null;
|
||||
ctx_used?: number | null;
|
||||
ctx_max?: number | null;
|
||||
completion_tokens?: number | null;
|
||||
started_at?: string | null;
|
||||
finished_at?: string | null;
|
||||
model?: string;
|
||||
session_id?: string;
|
||||
name?: string;
|
||||
}
|
||||
|
||||
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
||||
|
||||
export interface InferenceContext {
|
||||
sql: Sql;
|
||||
config: Config;
|
||||
log: FastifyBaseLogger;
|
||||
publish: FramePublisher;
|
||||
publishUser: (frame: UserStreamFrame) => void;
|
||||
// v1.11: passed through so compaction.process can publish 'compacted'
|
||||
// frames on the same session WS channel useSessionStream subscribes to.
|
||||
// Compaction is the only path that needs the raw broker handle (regular
|
||||
// inference goes through `publish`); keeping a separate field avoids
|
||||
// tempting other code paths into bypassing the session-id binding.
|
||||
broker: Broker;
|
||||
}
|
||||
|
||||
// v1.12.4: payload assembly extracted to ./inference/payload.ts (tests
|
||||
// import buildMessagesPayload from this module, so a re-export below
|
||||
// preserves the public surface). Stream + tool phases extracted to
|
||||
// ./inference/stream-phase.ts and ./inference/tool-phase.ts.
|
||||
|
||||
export interface StreamResult {
|
||||
finishReason: string | null;
|
||||
content: string;
|
||||
toolCalls: ToolCall[];
|
||||
promptTokens: number | null;
|
||||
completionTokens: number | null;
|
||||
}
|
||||
|
||||
|
||||
export interface TurnArgs {
|
||||
sessionId: string;
|
||||
chatId: string;
|
||||
assistantMessageId: string;
|
||||
// v1.8.2: cumulative tool calls executed this run. Compared against the
|
||||
// resolved budget at the top of each turn. Replaces the older `depth`
|
||||
// counter (which counted iterations, not invocations).
|
||||
toolsUsed: number;
|
||||
// v1.11.6: ordered tool calls executed in this user-message turn (across
|
||||
// recursive runAssistantTurn invocations). Reset to [] at user-message
|
||||
// boundaries by runInference, same as toolsUsed. Doom-loop check at the
|
||||
// top of runAssistantTurn slices the last DOOM_LOOP_THRESHOLD entries.
|
||||
recentToolCalls: ToolCall[];
|
||||
signal: AbortSignal | undefined;
|
||||
}
|
||||
|
||||
|
||||
export async function runAssistantTurn(
|
||||
ctx: InferenceContext,
|
||||
args: TurnArgs,
|
||||
): Promise<void> {
|
||||
const { sessionId, chatId } = args;
|
||||
|
||||
// v1.11: if the prior turn flagged this chat for compaction, run it first
|
||||
// so loadContext below reads the post-compaction history. We swallow
|
||||
// compaction failures (clearing the flag so we don't loop) and proceed
|
||||
// with the un-compacted history — a slow turn that hits the model's
|
||||
// hard limit is recoverable; a dead session is not.
|
||||
const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
|
||||
SELECT needs_compaction FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
if (chatFlag[0]?.needs_compaction) {
|
||||
try {
|
||||
await compaction.process({
|
||||
sql: ctx.sql,
|
||||
config: ctx.config,
|
||||
log: ctx.log,
|
||||
broker: ctx.broker,
|
||||
chatId,
|
||||
});
|
||||
} catch (err) {
|
||||
ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
|
||||
await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
||||
}
|
||||
}
|
||||
|
||||
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
||||
if (!loaded) {
|
||||
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
||||
return;
|
||||
}
|
||||
const { session, project, history } = loaded;
|
||||
const projectRoot = await resolveProjectRoot(project.path);
|
||||
// Agent resolution is per-turn so PATCH agent_id mid-conversation takes
|
||||
// effect on the next message. Unknown agent_id returns null silently —
|
||||
// session falls back to base prompt + all tools + default temperature.
|
||||
const agent = session.agent_id
|
||||
? await getAgentById(project.path, session.agent_id)
|
||||
: null;
|
||||
|
||||
// v1.8.2: cap-hit replaces the older "tool loop depth exceeded" failure.
|
||||
// When we've already burned the budget *before* this turn even runs, we
|
||||
// skip straight to the summary flow — the in-flight assistant message slot
|
||||
// gets reused for the wrap-up reply instead of being marked failed.
|
||||
const budget = resolveToolBudget(agent);
|
||||
if (args.toolsUsed >= budget) {
|
||||
await runCapHitSummary(ctx, args, session, project, history, agent, budget);
|
||||
return;
|
||||
}
|
||||
|
||||
// v1.11.6: doom-loop guard. Detected BEFORE the budget cap (the model can
|
||||
// burn through 3 identical calls long before the 15-call budget fires).
|
||||
// Same in-flight-slot-reuse pattern as runCapHitSummary — wrap-up reply
|
||||
// lands in args.assistantMessageId, then a doom_loop sentinel is inserted
|
||||
// to make the abort visible in the chat history.
|
||||
const loop = detectDoomLoop(args.recentToolCalls);
|
||||
if (loop) {
|
||||
await runDoomLoopSummary(ctx, args, session, project, history, agent, loop);
|
||||
return;
|
||||
}
|
||||
|
||||
const messages = await buildMessagesPayload(session, project, history, agent);
|
||||
|
||||
// v1.11.8: resolve per-chat web-tools opt-in. Tri-state on the wire:
|
||||
// - session.web_search_enabled = null → inherit project default
|
||||
// - session.web_search_enabled = true/false → explicit
|
||||
// Both web_search and web_fetch are gated by this single flag (the UI
|
||||
// label is "Enable web search and fetch" — same store, both tools).
|
||||
// Default is false unless explicitly opted in, matching the v1.9
|
||||
// plumbing intent ("inert until Batch 8 ships the actual tools").
|
||||
const webToolsEnabled =
|
||||
session.web_search_enabled ?? project.default_web_search_enabled ?? false;
|
||||
|
||||
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
||||
let result: StreamResult;
|
||||
try {
|
||||
result = await executeStreamPhase(ctx, args, session, messages, state, agent, webToolsEnabled);
|
||||
} catch (err) {
|
||||
await handleAbortOrError(ctx, args, state.accumulated, err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.toolCalls.length > 0) {
|
||||
await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
|
||||
return;
|
||||
}
|
||||
|
||||
await finalizeCompletion(ctx, args, result, state.startedAt, session);
|
||||
}
|
||||
|
||||
export async function runInference(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
assistantMessageId: string,
|
||||
signal?: AbortSignal
|
||||
): Promise<void> {
|
||||
// v1.8.2: every fresh inference (initial send, regenerate, force_send,
|
||||
// continue) starts with a clean budget. Tool-call accumulation across
|
||||
// Continue invocations is what the hard ceiling guards against, not the
|
||||
// per-call budget.
|
||||
// v1.11.6: recentToolCalls also resets — doom-loop detection is scoped
|
||||
// to a single user-message turn, so a Continue starts with no history.
|
||||
return runAssistantTurn(ctx, {
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId,
|
||||
toolsUsed: 0,
|
||||
recentToolCalls: [],
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
// v1.8.2: cap-hit summary flow. Called instead of erroring when the loop
|
||||
// hits its budget. Reuses the in-flight assistant message slot to stream a
|
||||
// short wrap-up reply with the synthetic note prepended and tools disabled,
|
||||
// then always inserts a cap_hit sentinel afterward (regardless of summary
|
||||
// outcome) so the UI can show a Continue affordance.
|
||||
interface InferenceRegistration {
|
||||
controller: AbortController;
|
||||
completed: Promise<void>;
|
||||
}
|
||||
|
||||
export function createInferenceRunner(
|
||||
ctx: Omit<InferenceContext, 'publishUser'>,
|
||||
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
||||
) {
|
||||
const registry = new Map<string, InferenceRegistration>();
|
||||
|
||||
return {
|
||||
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
||||
const callCtx: InferenceContext = {
|
||||
...ctx,
|
||||
publishUser: (frame) => publishUserFn(user, frame),
|
||||
// v1.11: broker comes in via ctx (set at registration time). Repeated
|
||||
// here so the destructure carries it onto the per-call ctx without
|
||||
// having to add it to every enqueue/cancel signature individually.
|
||||
broker: ctx.broker,
|
||||
};
|
||||
// v1.8 mobile-tabs: announce working before the async loop starts so
|
||||
// every device subscribed to the user channel sees the amber dot.
|
||||
callCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() });
|
||||
const controller = new AbortController();
|
||||
let resolveCompleted!: () => void;
|
||||
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
||||
const registration: InferenceRegistration = { controller, completed };
|
||||
registry.set(chatId, registration);
|
||||
void (async () => {
|
||||
try {
|
||||
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
||||
setImmediate(() => {
|
||||
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
||||
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
callCtx.log.error({ err }, 'unhandled inference error');
|
||||
} finally {
|
||||
resolveCompleted();
|
||||
// Only clear our own registration; a force-send may have replaced it.
|
||||
if (registry.get(chatId) === registration) {
|
||||
registry.delete(chatId);
|
||||
}
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
||||
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
|
||||
const reg = registry.get(chatId);
|
||||
if (!reg) return false;
|
||||
reg.controller.abort();
|
||||
// Swallow — we just need to wait for the catch/finally to persist state.
|
||||
await reg.completed.catch(() => {});
|
||||
return true;
|
||||
},
|
||||
|
||||
hasActive(chatId: string): boolean {
|
||||
return registry.has(chatId);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export const _toolNames = ALL_TOOLS.map((t) => t.name);
|
||||
13
apps/server/src/services/inference/types.ts
Normal file
13
apps/server/src/services/inference/types.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
// v1.12.4: shared inter-phase types/constants for the extracted phase files.
|
||||
// Lives here so stream-phase, tool-phase, and the summary functions still in
|
||||
// inference.ts can all reference the same definitions without circular imports.
|
||||
|
||||
export interface StreamPhaseState {
|
||||
accumulated: string;
|
||||
startedAt: string | null;
|
||||
}
|
||||
|
||||
// 500ms keeps the DB UPDATE rate bounded under heavy streaming. Used by
|
||||
// executeStreamPhase, runCapHitSummary, and runDoomLoopSummary — every site
|
||||
// that does a debounced content flush during streaming.
|
||||
export const DB_FLUSH_INTERVAL_MS = 500;
|
||||
Reference in New Issue
Block a user