Files
boocode/apps/server/src/services/inference/payload.ts
indifferentketchup 93d3f86c2b v2.2-paseo-providers: Paseo provider stack + v2.2.1 pane-scoped chat fixes
Ship Paseo-equivalent provider snapshot, AgentComposerBar, ACP dispatch
rewrite with streaming/persist, permission prompts, and agent commands.
Follow-up: pane-scoped chat resolution, CoderMessageList tool timeline,
WS user-delta replace, and inference orphan tool_call stripping.
Archive openspec v2-2; update CHANGELOG and CURRENT.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-26 15:18:31 +00:00

268 lines
11 KiB
TypeScript

import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../../db.js';
import type {
Agent,
Message,
Project,
Session,
} from '../../types/api.js';
import * as compaction from '../compaction.js';
import { buildSystemPromptWithFingerprint } from '../system-prompt.js';
import { isAnySentinel } from './sentinels.js';
import { PRUNE_TRIGGER_TOKENS, prune } from './prune.js';
import type { InferenceContext } from './turn.js';
export interface OpenAiMessage {
role: 'system' | 'user' | 'assistant' | 'tool';
content: string | null;
tool_calls?: Array<{
id: string;
type: 'function';
function: { name: string; arguments: string };
}>;
tool_call_id?: string;
// v1.13.1-C: reasoning text from a prior assistant turn, sourced from
// message_parts kind='reasoning' rows joined in via reasoning_parts on
// the messages_with_parts view. stream-phase.ts/toModelMessages threads
// this into the AI SDK ReasoningPart when forwarding to the model so
// reasoning models can resume mid-thought across tool-call boundaries.
reasoning?: string;
}
// v1.12: buildSystemPrompt lives in services/system-prompt.ts. It awaits the
// container-guidance loader, so this function is async too and every call
// site in inference.ts awaits the result.
// v1.13.8: optional log argument. When provided, emit prefix-fingerprint
// per call + prefix-drift when the same session sees a hash change. Tests
// omit it and exercise the byte-stability surface directly through
// buildSystemPromptWithFingerprint. The observer Map in system-prompt.ts
// updates regardless of whether log is passed.
function toolResultIdsFollowing(history: Message[], assistantIdx: number): Set<string> {
const ids = new Set<string>();
for (let j = assistantIdx + 1; j < history.length; j++) {
const row = history[j]!;
if (row.role === 'user' || row.role === 'assistant') break;
if (row.role === 'tool' && row.tool_results?.tool_call_id) {
ids.add(row.tool_results.tool_call_id);
}
}
return ids;
}
function findAssistantOwnerForToolCall(history: Message[], toolIdx: number, callId: string): number | null {
for (let k = toolIdx - 1; k >= 0; k--) {
const row = history[k]!;
if (row.role === 'user') break;
if (row.role === 'assistant' && row.tool_calls?.some((tc) => tc.id === callId)) return k;
}
return null;
}
function assistantToolCallsArePayloadComplete(history: Message[], assistantIdx: number): boolean {
const assistant = history[assistantIdx]!;
if (!assistant.tool_calls?.length) return false;
const fulfilled = toolResultIdsFollowing(history, assistantIdx);
return assistant.tool_calls.every((tc) => fulfilled.has(tc.id));
}
export async function buildMessagesPayload(
session: Session,
project: Project,
history: Message[],
agent: Agent | null = null,
log?: FastifyBaseLogger,
): Promise<OpenAiMessage[]> {
const out: OpenAiMessage[] = [];
const { prompt: systemPrompt, fingerprint, drift } =
await buildSystemPromptWithFingerprint(project, session, agent);
if (log) {
log.info(fingerprint);
if (drift) log.warn(drift);
}
out.push({ role: 'system', content: systemPrompt });
// Find the latest compact marker — only send messages from that point onwards
let startIdx = 0;
for (let i = history.length - 1; i >= 0; i--) {
if (history[i]!.kind === 'compact') {
startIdx = i;
break;
}
}
for (let i = startIdx; i < history.length; i++) {
const m = history[i]!;
if (m.kind === 'compact') {
out.push({ role: 'system', content: m.content });
continue;
}
// v1.8.2 / v1.11.6: cap-hit and doom-loop sentinels are UI-only — never
// send them to the LLM. The synthetic instruction note lives only inside
// the summary call's messages array and is never persisted, so on a
// follow-up turn the model resumes with a clean context.
if (isAnySentinel(m)) continue;
if (m.role === 'assistant' && m.status === 'streaming') continue;
if (m.role === 'assistant' && m.status === 'cancelled') continue;
// v1.13.7: skip failed assistant turns. A failed row carries no usable
// content for the model, and leaving it in the payload alongside any
// following assistant message produces "Cannot have 2 or more assistant
// messages at the end of the list" from the OpenAI-compatible upstream.
if (m.role === 'assistant' && m.status === 'failed') continue;
// v1.13.7: skip "empty" completed assistants — clen=0 + no tool_calls.
// These can land when an upstream stream returns finishReason='stop' with
// no text/tool output (network blip, rate limit recovery, model quirk).
// Same risk as the failed-status case: a trailing empty assistant plus
// the next attempt's assistant placeholder = two trailing assistants and
// the API rejects the whole payload.
if (
m.role === 'assistant' &&
m.status === 'complete' &&
(m.content == null || m.content.trim().length === 0) &&
(m.tool_calls == null || m.tool_calls.length === 0)
) {
continue;
}
if (m.role === 'tool') {
const tr = m.tool_results;
if (!tr) continue;
const ownerIdx = findAssistantOwnerForToolCall(history, i, tr.tool_call_id);
if (ownerIdx == null || !assistantToolCallsArePayloadComplete(history, ownerIdx)) {
continue;
}
const outputText = tr.error
? `error: ${tr.error}`
: typeof tr.output === 'string'
? tr.output
: JSON.stringify(tr.output);
out.push({
role: 'tool',
content: outputText,
tool_call_id: tr.tool_call_id,
});
continue;
}
if (m.role === 'assistant') {
const msg: OpenAiMessage = {
role: 'assistant',
content: m.content && m.content.length > 0 ? m.content : null,
};
if (m.tool_calls && m.tool_calls.length > 0) {
if (assistantToolCallsArePayloadComplete(history, i)) {
msg.tool_calls = m.tool_calls.map((tc) => ({
id: tc.id,
type: 'function' as const,
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
}));
}
// Orphaned tool_calls (no matching tool rows) are stripped so the
// upstream API does not reject the payload on the next user turn.
}
// v1.13.1-C: collapse reasoning_parts into a single string. The view
// returns them ordered by sequence; multiple reasoning parts on one
// message are rare but concat preserves ordering. Skip when absent.
if (m.reasoning_parts && m.reasoning_parts.length > 0) {
msg.reasoning = m.reasoning_parts.map((p) => p.text ?? '').join('');
}
const hasPayload =
(msg.content != null && msg.content.trim().length > 0) ||
(msg.tool_calls != null && msg.tool_calls.length > 0) ||
(msg.reasoning != null && msg.reasoning.length > 0);
if (!hasPayload) continue;
out.push(msg);
continue;
}
out.push({ role: 'user', content: m.content });
}
return out;
}
export async function loadContext(
sql: Sql,
sessionId: string,
chatId: string
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
const sessionRows = await sql<Session[]>`
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at,
agent_id, web_search_enabled, allowed_read_paths
FROM sessions WHERE id = ${sessionId}
`;
if (sessionRows.length === 0) return null;
const session = sessionRows[0]!;
const projectRows = await sql<Project[]>`
SELECT id, name, path, added_at, last_session_id, status, gitea_remote,
default_system_prompt, default_web_search_enabled
FROM projects WHERE id = ${session.project_id}
`;
if (projectRows.length === 0) return null;
const project = projectRows[0]!;
// v1.11: filter compacted messages out of the inference assembly. The GET
// /api/sessions/:id/messages endpoint still returns everything (so the UI
// can show history with the summary card inline); only LLM payloads skip
// compacted rows. compacted_at IS NULL keeps the active summary + tail.
// v1.13.1-B: reads tool_calls/tool_results via the parts-merged view.
// v1.13.1-C: also pull reasoning_parts so assistant messages from
// reasoning models can be replayed with their reasoning context preserved.
const history = await sql<Message[]>`
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata,
reasoning_parts
FROM messages_with_parts
WHERE chat_id = ${chatId} AND compacted_at IS NULL
ORDER BY created_at ASC, id ASC
`;
return { session, project, history };
}
// v1.11: shared helper used after both finalizeCompletion and executeToolPhase
// persist their token counts. Reads tokens off the just-UPDATEd row (which
// the caller returns from RETURNING), runs compaction.isOverflow, and flips
// chats.needs_compaction. The next runAssistantTurn invocation acts on it.
// Silent on missing tokens — llama-swap occasionally omits usage on truncated
// streams, and we'd rather miss one overflow than crash the inference path.
export async function maybeFlagForCompaction(
ctx: InferenceContext,
chatId: string,
updated: { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null } | undefined,
): Promise<void> {
if (!updated) return;
const promptTokens = updated.ctx_used;
const completionTokens = updated.tokens_used;
const contextLimit = updated.ctx_max;
if (typeof promptTokens !== 'number') return;
if (typeof completionTokens !== 'number') return;
if (typeof contextLimit !== 'number') return;
const overflow = compaction.isOverflow(
{ prompt_tokens: promptTokens, completion_tokens: completionTokens },
contextLimit,
);
if (!overflow) return;
// v1.13.4: try the cheap prune first. If it freed at least
// PRUNE_TRIGGER_TOKENS (20k) worth of context, we're below the threshold
// again — skip flagging summarize for the next turn. The next turn's
// overflow check will re-evaluate from scratch.
// v1.13.9: the overflow trigger above is now 85% of ctx_max (was
// ctx_max - 20k). PRUNE_TRIGGER_TOKENS stays at 20k as the prune-freed
// threshold — independent of the overflow formula.
// Prune failures (DB errors etc.) propagate so the surrounding inference
// path sees them; the catch in finalizeCompletion / executeToolPhase
// doesn't shield this — by design, we want to know if prune is broken.
const pruned = await prune({ sql: ctx.sql, chatId });
if (pruned.hidden > 0) {
ctx.log.info(
{ chatId, hidden: pruned.hidden, freedTokens: pruned.freedTokens },
'inference: prune freed context budget',
);
}
if (pruned.freedTokens >= PRUNE_TRIGGER_TOKENS) {
// Prune handled it; skip the (expensive) summarize path.
return;
}
await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`;
ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction');
}