v1.12.4-rc2: extract payload + error-handler from inference.ts
- payload.ts: buildMessagesPayload (re-exported), loadContext, maybeFlagForCompaction
- error-handler.ts: handleAbortOrError, finalizeCompletion

Both new files type-import InferenceContext/StreamResult/TurnArgs from inference.ts; TypeScript erases `import type` statements at compile time, so there's no runtime import cycle. handleAbortOrError turned out not to call the summary functions, so no back-edge is needed. inference.ts shrinks from ~1676 to ~1401 LoC.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,15 +23,10 @@ import { getAgentById } from './agents.js';
|
|||||||
import * as compaction from './compaction.js';
|
import * as compaction from './compaction.js';
|
||||||
import * as modelContext from './model-context.js';
|
import * as modelContext from './model-context.js';
|
||||||
import type { Broker } from './broker.js';
|
import type { Broker } from './broker.js';
|
||||||
// v1.12: prompt assembly extracted to its own module. buildSystemPrompt is
|
|
||||||
// async (awaits the container-guidance loader) — buildMessagesPayload below
|
|
||||||
// is therefore async too, and its three call sites in this file await it.
|
|
||||||
import { buildSystemPrompt } from './system-prompt.js';
|
|
||||||
import { resolveToolBudget } from './inference/budget.js';
|
import { resolveToolBudget } from './inference/budget.js';
|
||||||
import {
|
import {
|
||||||
DOOM_LOOP_THRESHOLD,
|
DOOM_LOOP_THRESHOLD,
|
||||||
detectDoomLoop,
|
detectDoomLoop,
|
||||||
isAnySentinel,
|
|
||||||
} from './inference/sentinels.js';
|
} from './inference/sentinels.js';
|
||||||
import {
|
import {
|
||||||
XML_TOOL_CLOSE,
|
XML_TOOL_CLOSE,
|
||||||
@@ -39,10 +34,21 @@ import {
|
|||||||
parseXmlToolCall,
|
parseXmlToolCall,
|
||||||
partialXmlOpenerStart,
|
partialXmlOpenerStart,
|
||||||
} from './inference/xml-parser.js';
|
} from './inference/xml-parser.js';
|
||||||
|
import {
|
||||||
|
buildMessagesPayload,
|
||||||
|
loadContext,
|
||||||
|
maybeFlagForCompaction,
|
||||||
|
type OpenAiMessage,
|
||||||
|
} from './inference/payload.js';
|
||||||
|
import {
|
||||||
|
finalizeCompletion,
|
||||||
|
handleAbortOrError,
|
||||||
|
} from './inference/error-handler.js';
|
||||||
|
|
||||||
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
// v1.12.4: re-exported so external callers (tests, future consumers) keep
|
||||||
// importing from services/inference.js as the public surface.
|
// importing from services/inference.js as the public surface.
|
||||||
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js';
|
export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './inference/sentinels.js';
|
||||||
|
export { buildMessagesPayload } from './inference/payload.js';
|
||||||
|
|
||||||
const DB_FLUSH_INTERVAL_MS = 500;
|
const DB_FLUSH_INTERVAL_MS = 500;
|
||||||
|
|
||||||
@@ -101,17 +107,6 @@ export interface InferenceFrame {
|
|||||||
|
|
||||||
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
||||||
|
|
||||||
interface OpenAiMessage {
|
|
||||||
role: 'system' | 'user' | 'assistant' | 'tool';
|
|
||||||
content: string | null;
|
|
||||||
tool_calls?: Array<{
|
|
||||||
id: string;
|
|
||||||
type: 'function';
|
|
||||||
function: { name: string; arguments: string };
|
|
||||||
}>;
|
|
||||||
tool_call_id?: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface ChatCompletionDelta {
|
interface ChatCompletionDelta {
|
||||||
role?: string;
|
role?: string;
|
||||||
content?: string | null;
|
content?: string | null;
|
||||||
@@ -149,140 +144,14 @@ export interface InferenceContext {
|
|||||||
broker: Broker;
|
broker: Broker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v1.12.4: payload assembly extracted to ./inference/payload.ts —
|
||||||
|
// buildMessagesPayload, loadContext, maybeFlagForCompaction, and the
|
||||||
|
// OpenAiMessage shape live there now. buildMessagesPayload is re-exported above to preserve the
|
||||||
|
// public surface (tests import buildMessagesPayload from this module).
|
||||||
// v1.12: buildSystemPrompt moved to services/system-prompt.ts. See that
|
// v1.12: buildSystemPrompt moved to services/system-prompt.ts. See that
|
||||||
// module for the resolution order doc and the container-guidance layer.
|
// module for the resolution order doc and the container-guidance layer.
|
||||||
// buildMessagesPayload is async now because buildSystemPrompt awaits the
|
// buildMessagesPayload is async now because buildSystemPrompt awaits the
|
||||||
// guidance cache lookup.
|
// guidance cache lookup.
|
||||||
export async function buildMessagesPayload(
|
|
||||||
session: Session,
|
|
||||||
project: Project,
|
|
||||||
history: Message[],
|
|
||||||
agent: Agent | null = null
|
|
||||||
): Promise<OpenAiMessage[]> {
|
|
||||||
const out: OpenAiMessage[] = [];
|
|
||||||
const systemPrompt = await buildSystemPrompt(project, session, agent);
|
|
||||||
out.push({ role: 'system', content: systemPrompt });
|
|
||||||
|
|
||||||
// Find the latest compact marker — only send messages from that point onwards
|
|
||||||
let startIdx = 0;
|
|
||||||
for (let i = history.length - 1; i >= 0; i--) {
|
|
||||||
if (history[i]!.kind === 'compact') {
|
|
||||||
startIdx = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (let i = startIdx; i < history.length; i++) {
|
|
||||||
const m = history[i]!;
|
|
||||||
if (m.kind === 'compact') {
|
|
||||||
out.push({ role: 'system', content: m.content });
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// v1.8.2 / v1.11.6: cap-hit and doom-loop sentinels are UI-only — never
|
|
||||||
// send them to the LLM. The synthetic instruction note lives only inside
|
|
||||||
// the summary call's messages array and is never persisted, so on a
|
|
||||||
// follow-up turn the model resumes with a clean context.
|
|
||||||
if (isAnySentinel(m)) continue;
|
|
||||||
if (m.role === 'assistant' && m.status === 'streaming') continue;
|
|
||||||
if (m.role === 'assistant' && m.status === 'cancelled') continue;
|
|
||||||
if (m.role === 'tool') {
|
|
||||||
const tr = m.tool_results;
|
|
||||||
if (!tr) continue;
|
|
||||||
const outputText = tr.error
|
|
||||||
? `error: ${tr.error}`
|
|
||||||
: typeof tr.output === 'string'
|
|
||||||
? tr.output
|
|
||||||
: JSON.stringify(tr.output);
|
|
||||||
out.push({
|
|
||||||
role: 'tool',
|
|
||||||
content: outputText,
|
|
||||||
tool_call_id: tr.tool_call_id,
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (m.role === 'assistant') {
|
|
||||||
const msg: OpenAiMessage = {
|
|
||||||
role: 'assistant',
|
|
||||||
content: m.content && m.content.length > 0 ? m.content : null,
|
|
||||||
};
|
|
||||||
if (m.tool_calls && m.tool_calls.length > 0) {
|
|
||||||
msg.tool_calls = m.tool_calls.map((tc) => ({
|
|
||||||
id: tc.id,
|
|
||||||
type: 'function' as const,
|
|
||||||
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
out.push(msg);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
out.push({ role: 'user', content: m.content });
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function loadContext(
|
|
||||||
sql: Sql,
|
|
||||||
sessionId: string,
|
|
||||||
chatId: string
|
|
||||||
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
|
|
||||||
const sessionRows = await sql<Session[]>`
|
|
||||||
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at,
|
|
||||||
agent_id, web_search_enabled
|
|
||||||
FROM sessions WHERE id = ${sessionId}
|
|
||||||
`;
|
|
||||||
if (sessionRows.length === 0) return null;
|
|
||||||
const session = sessionRows[0]!;
|
|
||||||
|
|
||||||
const projectRows = await sql<Project[]>`
|
|
||||||
SELECT id, name, path, added_at, last_session_id, status, gitea_remote,
|
|
||||||
default_system_prompt, default_web_search_enabled
|
|
||||||
FROM projects WHERE id = ${session.project_id}
|
|
||||||
`;
|
|
||||||
if (projectRows.length === 0) return null;
|
|
||||||
const project = projectRows[0]!;
|
|
||||||
|
|
||||||
// v1.11: filter compacted messages out of the inference assembly. The GET
|
|
||||||
// /api/sessions/:id/messages endpoint still returns everything (so the UI
|
|
||||||
// can show history with the summary card inline); only LLM payloads skip
|
|
||||||
// compacted rows. compacted_at IS NULL keeps the active summary + tail.
|
|
||||||
const history = await sql<Message[]>`
|
|
||||||
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
|
||||||
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata
|
|
||||||
FROM messages
|
|
||||||
WHERE chat_id = ${chatId} AND compacted_at IS NULL
|
|
||||||
ORDER BY created_at ASC, id ASC
|
|
||||||
`;
|
|
||||||
|
|
||||||
return { session, project, history };
|
|
||||||
}
|
|
||||||
|
|
||||||
// v1.11: shared helper used after both finalizeCompletion and executeToolPhase
|
|
||||||
// persist their token counts. Reads tokens off the just-UPDATEd row (which
|
|
||||||
// the caller returns from RETURNING), runs compaction.isOverflow, and flips
|
|
||||||
// chats.needs_compaction. The next runAssistantTurn invocation acts on it.
|
|
||||||
// Silent on missing tokens — llama-swap occasionally omits usage on truncated
|
|
||||||
// streams, and we'd rather miss one overflow than crash the inference path.
|
|
||||||
async function maybeFlagForCompaction(
|
|
||||||
ctx: InferenceContext,
|
|
||||||
chatId: string,
|
|
||||||
updated: { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null } | undefined,
|
|
||||||
): Promise<void> {
|
|
||||||
if (!updated) return;
|
|
||||||
const promptTokens = updated.ctx_used;
|
|
||||||
const completionTokens = updated.tokens_used;
|
|
||||||
const contextLimit = updated.ctx_max;
|
|
||||||
if (typeof promptTokens !== 'number') return;
|
|
||||||
if (typeof completionTokens !== 'number') return;
|
|
||||||
if (typeof contextLimit !== 'number') return;
|
|
||||||
const overflow = compaction.isOverflow(
|
|
||||||
{ prompt_tokens: promptTokens, completion_tokens: completionTokens },
|
|
||||||
contextLimit,
|
|
||||||
);
|
|
||||||
if (!overflow) return;
|
|
||||||
await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`;
|
|
||||||
ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction');
|
|
||||||
}
|
|
||||||
|
|
||||||
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
||||||
const reader = stream.getReader();
|
const reader = stream.getReader();
|
||||||
const decoder = new TextDecoder('utf-8');
|
const decoder = new TextDecoder('utf-8');
|
||||||
@@ -306,7 +175,7 @@ async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface StreamResult {
|
export interface StreamResult {
|
||||||
finishReason: string | null;
|
finishReason: string | null;
|
||||||
content: string;
|
content: string;
|
||||||
toolCalls: ToolCall[];
|
toolCalls: ToolCall[];
|
||||||
@@ -544,7 +413,7 @@ async function executeToolCall(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface TurnArgs {
|
export interface TurnArgs {
|
||||||
sessionId: string;
|
sessionId: string;
|
||||||
chatId: string;
|
chatId: string;
|
||||||
assistantMessageId: string;
|
assistantMessageId: string;
|
||||||
@@ -700,88 +569,6 @@ async function executeStreamPhase(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function handleAbortOrError(
|
|
||||||
ctx: InferenceContext,
|
|
||||||
args: TurnArgs,
|
|
||||||
accumulated: string,
|
|
||||||
err: unknown
|
|
||||||
): Promise<void> {
|
|
||||||
const { sessionId, chatId, assistantMessageId } = args;
|
|
||||||
const isAbort = err instanceof Error && err.name === 'AbortError';
|
|
||||||
const finalStatus = isAbort ? 'cancelled' : 'failed';
|
|
||||||
const errMsg = err instanceof Error ? err.message : String(err);
|
|
||||||
// v1.8.2: persist a structured error metadata blob on genuine failures so
|
|
||||||
// the bubble can render the reason on reload without re-deriving from the
|
|
||||||
// (one-shot) WS error frame. User-initiated abort skips this — there's no
|
|
||||||
// "reason" to surface for a stop the user already explicitly chose.
|
|
||||||
const errorMetadata: MessageMetadata | null = isAbort
|
|
||||||
? null
|
|
||||||
: { kind: 'error', error_reason: 'llm_provider_error', error_text: errMsg };
|
|
||||||
if (errorMetadata) {
|
|
||||||
await ctx.sql`
|
|
||||||
UPDATE messages
|
|
||||||
SET status = ${finalStatus},
|
|
||||||
content = ${accumulated},
|
|
||||||
finished_at = clock_timestamp(),
|
|
||||||
metadata = ${ctx.sql.json(errorMetadata as never)}
|
|
||||||
WHERE id = ${assistantMessageId}
|
|
||||||
`;
|
|
||||||
} else {
|
|
||||||
await ctx.sql`
|
|
||||||
UPDATE messages
|
|
||||||
SET status = ${finalStatus},
|
|
||||||
content = ${accumulated},
|
|
||||||
finished_at = clock_timestamp()
|
|
||||||
WHERE id = ${assistantMessageId}
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
||||||
UPDATE sessions SET updated_at = clock_timestamp()
|
|
||||||
WHERE id = ${sessionId}
|
|
||||||
RETURNING project_id, name, updated_at
|
|
||||||
`;
|
|
||||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
|
|
||||||
// v1.8 mobile-tabs: cancellation is a user-initiated stop, treat as idle;
|
|
||||||
// genuine errors flip the dot red. v1.8.2: error path also carries a
|
|
||||||
// machine-readable `reason` so the UI can render specifics inline.
|
|
||||||
if (isAbort) {
|
|
||||||
// v1.12.1: defensive cancellation write. The status=${finalStatus} UPDATE
|
|
||||||
// above already sets 'cancelled' for the AbortError case, but a row can
|
|
||||||
// leak as 'streaming' when the abort fires between the post-tool-phase
|
|
||||||
// INSERT (executeToolPhase) and the next runAssistantTurn's stream setup,
|
|
||||||
// bypassing the try/catch around executeStreamPhase. The status guard
|
|
||||||
// makes this a no-op when the earlier write already landed.
|
|
||||||
await ctx.sql`
|
|
||||||
UPDATE messages
|
|
||||||
SET status = 'cancelled', content = ${accumulated}, finished_at = clock_timestamp()
|
|
||||||
WHERE id = ${args.assistantMessageId} AND status = 'streaming'
|
|
||||||
`;
|
|
||||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'message_complete',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
});
|
|
||||||
ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
|
|
||||||
} else {
|
|
||||||
ctx.publishUser({
|
|
||||||
type: 'chat_status',
|
|
||||||
chat_id: chatId,
|
|
||||||
status: 'error',
|
|
||||||
at: new Date().toISOString(),
|
|
||||||
reason: 'llm_provider_error',
|
|
||||||
});
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'error',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
error: errMsg,
|
|
||||||
reason: 'llm_provider_error',
|
|
||||||
});
|
|
||||||
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeToolPhase(
|
async function executeToolPhase(
|
||||||
ctx: InferenceContext,
|
ctx: InferenceContext,
|
||||||
args: TurnArgs,
|
args: TurnArgs,
|
||||||
@@ -929,68 +716,6 @@ async function executeToolPhase(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function finalizeCompletion(
|
|
||||||
ctx: InferenceContext,
|
|
||||||
args: TurnArgs,
|
|
||||||
result: StreamResult,
|
|
||||||
startedAt: string | null,
|
|
||||||
session: Session
|
|
||||||
): Promise<void> {
|
|
||||||
const { sessionId, chatId, assistantMessageId } = args;
|
|
||||||
const { content, finishReason, promptTokens, completionTokens } = result;
|
|
||||||
|
|
||||||
// v1.11.3: see executeToolPhase for the rationale.
|
|
||||||
const mctx = await modelContext.getModelContext(session.model);
|
|
||||||
const nCtx = mctx?.n_ctx ?? null;
|
|
||||||
|
|
||||||
const [updated] = await ctx.sql<
|
|
||||||
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
|
||||||
>`
|
|
||||||
UPDATE messages
|
|
||||||
SET content = ${content},
|
|
||||||
status = 'complete',
|
|
||||||
tokens_used = ${completionTokens},
|
|
||||||
ctx_used = ${promptTokens},
|
|
||||||
ctx_max = ${nCtx},
|
|
||||||
finished_at = clock_timestamp()
|
|
||||||
WHERE id = ${assistantMessageId}
|
|
||||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
|
||||||
`;
|
|
||||||
// v1.11: flag for compaction on the terminal turn too. Catches the common
|
|
||||||
// case of a turn that hit the limit without invoking tools.
|
|
||||||
await maybeFlagForCompaction(ctx, chatId, updated);
|
|
||||||
const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
||||||
UPDATE sessions SET updated_at = clock_timestamp()
|
|
||||||
WHERE id = ${sessionId}
|
|
||||||
RETURNING project_id, name, updated_at
|
|
||||||
`;
|
|
||||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at });
|
|
||||||
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
|
||||||
ctx.publish(sessionId, {
|
|
||||||
type: 'message_complete',
|
|
||||||
message_id: assistantMessageId,
|
|
||||||
chat_id: chatId,
|
|
||||||
tokens_used: updated?.tokens_used ?? null,
|
|
||||||
ctx_used: updated?.ctx_used ?? null,
|
|
||||||
ctx_max: updated?.ctx_max ?? null,
|
|
||||||
started_at: startedAt,
|
|
||||||
finished_at: updated?.finished_at ?? null,
|
|
||||||
model: session.model,
|
|
||||||
});
|
|
||||||
ctx.log.info(
|
|
||||||
{
|
|
||||||
sessionId,
|
|
||||||
chatId,
|
|
||||||
assistantMessageId,
|
|
||||||
finishReason,
|
|
||||||
chars: content.length,
|
|
||||||
tokens_used: updated?.tokens_used,
|
|
||||||
ctx_used: updated?.ctx_used,
|
|
||||||
},
|
|
||||||
'inference complete'
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function runAssistantTurn(
|
async function runAssistantTurn(
|
||||||
ctx: InferenceContext,
|
ctx: InferenceContext,
|
||||||
args: TurnArgs,
|
args: TurnArgs,
|
||||||
|
|||||||
148
apps/server/src/services/inference/error-handler.ts
Normal file
148
apps/server/src/services/inference/error-handler.ts
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
import type { MessageMetadata, Session } from '../../types/api.js';
|
||||||
|
import * as modelContext from '../model-context.js';
|
||||||
|
import { maybeFlagForCompaction } from './payload.js';
|
||||||
|
import type { InferenceContext, StreamResult, TurnArgs } from '../inference.js';
|
||||||
|
|
||||||
|
export async function handleAbortOrError(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
args: TurnArgs,
|
||||||
|
accumulated: string,
|
||||||
|
err: unknown
|
||||||
|
): Promise<void> {
|
||||||
|
const { sessionId, chatId, assistantMessageId } = args;
|
||||||
|
const isAbort = err instanceof Error && err.name === 'AbortError';
|
||||||
|
const finalStatus = isAbort ? 'cancelled' : 'failed';
|
||||||
|
const errMsg = err instanceof Error ? err.message : String(err);
|
||||||
|
// v1.8.2: persist a structured error metadata blob on genuine failures so
|
||||||
|
// the bubble can render the reason on reload without re-deriving from the
|
||||||
|
// (one-shot) WS error frame. User-initiated abort skips this — there's no
|
||||||
|
// "reason" to surface for a stop the user already explicitly chose.
|
||||||
|
const errorMetadata: MessageMetadata | null = isAbort
|
||||||
|
? null
|
||||||
|
: { kind: 'error', error_reason: 'llm_provider_error', error_text: errMsg };
|
||||||
|
if (errorMetadata) {
|
||||||
|
await ctx.sql`
|
||||||
|
UPDATE messages
|
||||||
|
SET status = ${finalStatus},
|
||||||
|
content = ${accumulated},
|
||||||
|
finished_at = clock_timestamp(),
|
||||||
|
metadata = ${ctx.sql.json(errorMetadata as never)}
|
||||||
|
WHERE id = ${assistantMessageId}
|
||||||
|
`;
|
||||||
|
} else {
|
||||||
|
await ctx.sql`
|
||||||
|
UPDATE messages
|
||||||
|
SET status = ${finalStatus},
|
||||||
|
content = ${accumulated},
|
||||||
|
finished_at = clock_timestamp()
|
||||||
|
WHERE id = ${assistantMessageId}
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||||
|
UPDATE sessions SET updated_at = clock_timestamp()
|
||||||
|
WHERE id = ${sessionId}
|
||||||
|
RETURNING project_id, name, updated_at
|
||||||
|
`;
|
||||||
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
|
||||||
|
// v1.8 mobile-tabs: cancellation is a user-initiated stop, treat as idle;
|
||||||
|
// genuine errors flip the dot red. v1.8.2: error path also carries a
|
||||||
|
// machine-readable `reason` so the UI can render specifics inline.
|
||||||
|
if (isAbort) {
|
||||||
|
// v1.12.1: defensive cancellation write. The status=${finalStatus} UPDATE
|
||||||
|
// above already sets 'cancelled' for the AbortError case, but a row can
|
||||||
|
// leak as 'streaming' when the abort fires between the post-tool-phase
|
||||||
|
// INSERT (executeToolPhase) and the next runAssistantTurn's stream setup,
|
||||||
|
// bypassing the try/catch around executeStreamPhase. The status guard
|
||||||
|
// makes this a no-op when the earlier write already landed.
|
||||||
|
await ctx.sql`
|
||||||
|
UPDATE messages
|
||||||
|
SET status = 'cancelled', content = ${accumulated}, finished_at = clock_timestamp()
|
||||||
|
WHERE id = ${args.assistantMessageId} AND status = 'streaming'
|
||||||
|
`;
|
||||||
|
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'message_complete',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
});
|
||||||
|
ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
|
||||||
|
} else {
|
||||||
|
ctx.publishUser({
|
||||||
|
type: 'chat_status',
|
||||||
|
chat_id: chatId,
|
||||||
|
status: 'error',
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
reason: 'llm_provider_error',
|
||||||
|
});
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'error',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
error: errMsg,
|
||||||
|
reason: 'llm_provider_error',
|
||||||
|
});
|
||||||
|
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function finalizeCompletion(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
args: TurnArgs,
|
||||||
|
result: StreamResult,
|
||||||
|
startedAt: string | null,
|
||||||
|
session: Session
|
||||||
|
): Promise<void> {
|
||||||
|
const { sessionId, chatId, assistantMessageId } = args;
|
||||||
|
const { content, finishReason, promptTokens, completionTokens } = result;
|
||||||
|
|
||||||
|
// v1.11.3: see executeToolPhase in ../inference.ts for the rationale.
|
||||||
|
const mctx = await modelContext.getModelContext(session.model);
|
||||||
|
const nCtx = mctx?.n_ctx ?? null;
|
||||||
|
|
||||||
|
const [updated] = await ctx.sql<
|
||||||
|
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
||||||
|
>`
|
||||||
|
UPDATE messages
|
||||||
|
SET content = ${content},
|
||||||
|
status = 'complete',
|
||||||
|
tokens_used = ${completionTokens},
|
||||||
|
ctx_used = ${promptTokens},
|
||||||
|
ctx_max = ${nCtx},
|
||||||
|
finished_at = clock_timestamp()
|
||||||
|
WHERE id = ${assistantMessageId}
|
||||||
|
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||||
|
`;
|
||||||
|
// v1.11: flag for compaction on the terminal turn too. Catches the common
|
||||||
|
// case of a turn that hit the limit without invoking tools.
|
||||||
|
await maybeFlagForCompaction(ctx, chatId, updated);
|
||||||
|
const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
||||||
|
UPDATE sessions SET updated_at = clock_timestamp()
|
||||||
|
WHERE id = ${sessionId}
|
||||||
|
RETURNING project_id, name, updated_at
|
||||||
|
`;
|
||||||
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at });
|
||||||
|
ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
|
||||||
|
ctx.publish(sessionId, {
|
||||||
|
type: 'message_complete',
|
||||||
|
message_id: assistantMessageId,
|
||||||
|
chat_id: chatId,
|
||||||
|
tokens_used: updated?.tokens_used ?? null,
|
||||||
|
ctx_used: updated?.ctx_used ?? null,
|
||||||
|
ctx_max: updated?.ctx_max ?? null,
|
||||||
|
started_at: startedAt,
|
||||||
|
finished_at: updated?.finished_at ?? null,
|
||||||
|
model: session.model,
|
||||||
|
});
|
||||||
|
ctx.log.info(
|
||||||
|
{
|
||||||
|
sessionId,
|
||||||
|
chatId,
|
||||||
|
assistantMessageId,
|
||||||
|
finishReason,
|
||||||
|
chars: content.length,
|
||||||
|
tokens_used: updated?.tokens_used,
|
||||||
|
ctx_used: updated?.ctx_used,
|
||||||
|
},
|
||||||
|
'inference complete'
|
||||||
|
);
|
||||||
|
}
|
||||||
155
apps/server/src/services/inference/payload.ts
Normal file
155
apps/server/src/services/inference/payload.ts
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
import type { Sql } from '../../db.js';
|
||||||
|
import type {
|
||||||
|
Agent,
|
||||||
|
Message,
|
||||||
|
Project,
|
||||||
|
Session,
|
||||||
|
} from '../../types/api.js';
|
||||||
|
import * as compaction from '../compaction.js';
|
||||||
|
import { buildSystemPrompt } from '../system-prompt.js';
|
||||||
|
import { isAnySentinel } from './sentinels.js';
|
||||||
|
import type { InferenceContext } from '../inference.js';
|
||||||
|
|
||||||
|
export interface OpenAiMessage {
|
||||||
|
role: 'system' | 'user' | 'assistant' | 'tool';
|
||||||
|
content: string | null;
|
||||||
|
tool_calls?: Array<{
|
||||||
|
id: string;
|
||||||
|
type: 'function';
|
||||||
|
function: { name: string; arguments: string };
|
||||||
|
}>;
|
||||||
|
tool_call_id?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
// v1.12: buildSystemPrompt lives in services/system-prompt.ts. It awaits the
|
||||||
|
// container-guidance loader, so this function is async too and every call
|
||||||
|
// site in inference.ts awaits the result.
|
||||||
|
export async function buildMessagesPayload(
|
||||||
|
session: Session,
|
||||||
|
project: Project,
|
||||||
|
history: Message[],
|
||||||
|
agent: Agent | null = null
|
||||||
|
): Promise<OpenAiMessage[]> {
|
||||||
|
const out: OpenAiMessage[] = [];
|
||||||
|
const systemPrompt = await buildSystemPrompt(project, session, agent);
|
||||||
|
out.push({ role: 'system', content: systemPrompt });
|
||||||
|
|
||||||
|
// Find the latest compact marker — only send messages from that point onwards
|
||||||
|
let startIdx = 0;
|
||||||
|
for (let i = history.length - 1; i >= 0; i--) {
|
||||||
|
if (history[i]!.kind === 'compact') {
|
||||||
|
startIdx = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let i = startIdx; i < history.length; i++) {
|
||||||
|
const m = history[i]!;
|
||||||
|
if (m.kind === 'compact') {
|
||||||
|
out.push({ role: 'system', content: m.content });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// v1.8.2 / v1.11.6: cap-hit and doom-loop sentinels are UI-only — never
|
||||||
|
// send them to the LLM. The synthetic instruction note lives only inside
|
||||||
|
// the summary call's messages array and is never persisted, so on a
|
||||||
|
// follow-up turn the model resumes with a clean context.
|
||||||
|
if (isAnySentinel(m)) continue;
|
||||||
|
if (m.role === 'assistant' && m.status === 'streaming') continue;
|
||||||
|
if (m.role === 'assistant' && m.status === 'cancelled') continue;
|
||||||
|
if (m.role === 'tool') {
|
||||||
|
const tr = m.tool_results;
|
||||||
|
if (!tr) continue;
|
||||||
|
const outputText = tr.error
|
||||||
|
? `error: ${tr.error}`
|
||||||
|
: typeof tr.output === 'string'
|
||||||
|
? tr.output
|
||||||
|
: JSON.stringify(tr.output);
|
||||||
|
out.push({
|
||||||
|
role: 'tool',
|
||||||
|
content: outputText,
|
||||||
|
tool_call_id: tr.tool_call_id,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (m.role === 'assistant') {
|
||||||
|
const msg: OpenAiMessage = {
|
||||||
|
role: 'assistant',
|
||||||
|
content: m.content && m.content.length > 0 ? m.content : null,
|
||||||
|
};
|
||||||
|
if (m.tool_calls && m.tool_calls.length > 0) {
|
||||||
|
msg.tool_calls = m.tool_calls.map((tc) => ({
|
||||||
|
id: tc.id,
|
||||||
|
type: 'function' as const,
|
||||||
|
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
out.push(msg);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
out.push({ role: 'user', content: m.content });
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function loadContext(
|
||||||
|
sql: Sql,
|
||||||
|
sessionId: string,
|
||||||
|
chatId: string
|
||||||
|
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
|
||||||
|
const sessionRows = await sql<Session[]>`
|
||||||
|
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at,
|
||||||
|
agent_id, web_search_enabled
|
||||||
|
FROM sessions WHERE id = ${sessionId}
|
||||||
|
`;
|
||||||
|
if (sessionRows.length === 0) return null;
|
||||||
|
const session = sessionRows[0]!;
|
||||||
|
|
||||||
|
const projectRows = await sql<Project[]>`
|
||||||
|
SELECT id, name, path, added_at, last_session_id, status, gitea_remote,
|
||||||
|
default_system_prompt, default_web_search_enabled
|
||||||
|
FROM projects WHERE id = ${session.project_id}
|
||||||
|
`;
|
||||||
|
if (projectRows.length === 0) return null;
|
||||||
|
const project = projectRows[0]!;
|
||||||
|
|
||||||
|
// v1.11: filter compacted messages out of the inference assembly. The GET
|
||||||
|
// /api/sessions/:id/messages endpoint still returns everything (so the UI
|
||||||
|
// can show history with the summary card inline); only LLM payloads skip
|
||||||
|
// compacted rows. compacted_at IS NULL keeps the active summary + tail.
|
||||||
|
const history = await sql<Message[]>`
|
||||||
|
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
||||||
|
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata
|
||||||
|
FROM messages
|
||||||
|
WHERE chat_id = ${chatId} AND compacted_at IS NULL
|
||||||
|
ORDER BY created_at ASC, id ASC
|
||||||
|
`;
|
||||||
|
|
||||||
|
return { session, project, history };
|
||||||
|
}
|
||||||
|
|
||||||
|
// v1.11: shared helper used after both finalizeCompletion and executeToolPhase
|
||||||
|
// persist their token counts. Reads tokens off the just-UPDATEd row (which
|
||||||
|
// the caller returns from RETURNING), runs compaction.isOverflow, and flips
|
||||||
|
// chats.needs_compaction. The next runAssistantTurn invocation acts on it.
|
||||||
|
// Silent on missing tokens — llama-swap occasionally omits usage on truncated
|
||||||
|
// streams, and we'd rather miss one overflow than crash the inference path.
|
||||||
|
export async function maybeFlagForCompaction(
|
||||||
|
ctx: InferenceContext,
|
||||||
|
chatId: string,
|
||||||
|
updated: { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null } | undefined,
|
||||||
|
): Promise<void> {
|
||||||
|
if (!updated) return;
|
||||||
|
const promptTokens = updated.ctx_used;
|
||||||
|
const completionTokens = updated.tokens_used;
|
||||||
|
const contextLimit = updated.ctx_max;
|
||||||
|
if (typeof promptTokens !== 'number') return;
|
||||||
|
if (typeof completionTokens !== 'number') return;
|
||||||
|
if (typeof contextLimit !== 'number') return;
|
||||||
|
const overflow = compaction.isOverflow(
|
||||||
|
{ prompt_tokens: promptTokens, completion_tokens: completionTokens },
|
||||||
|
contextLimit,
|
||||||
|
);
|
||||||
|
if (!overflow) return;
|
||||||
|
await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`;
|
||||||
|
ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction');
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user