v1.11: opencode-style compaction port
- compaction.ts: usable/isOverflow/estimate/turns/select/buildPrompt/process
- compaction-prompt.ts: SUMMARY_TEMPLATE verbatim from opencode
- schema: messages.{compacted_at,summary,tail_start_id} + chats.needs_compaction
- inference: auto-trigger on overflow, pre-fetch compaction before next turn
- /compact slash command rewired to new path
- WS: chat_status working/idle around compaction + compacted frame
- frontend: SummaryCard + sonner toast on compacted
- 24 unit tests for pure functions
This commit is contained in:
503
apps/server/src/services/compaction.ts
Normal file
503
apps/server/src/services/compaction.ts
Normal file
@@ -0,0 +1,503 @@
|
||||
// v1.11: anchored rolling compaction. Ported algorithms (not Effect-TS code)
|
||||
// from opencode (packages/opencode/src/session/{compaction,overflow}.ts).
|
||||
//
|
||||
// What's different from BooCode's legacy /compact:
|
||||
// - Operates per-chat (chats have N:1 to sessions; history is per-chat).
|
||||
// - Detects overflow automatically after each inference completion using
|
||||
// llama-swap's reported n_ctx; flags chats.needs_compaction=true.
|
||||
// - On the next turn (or manual /compact) we summarize the *head* (messages
|
||||
// prior to a preserved tail of N user-turns) into a single
|
||||
// summary=true assistant row. Older messages get compacted_at-stamped so
|
||||
// inference assembly filters them out; the GET endpoint still returns
|
||||
// them so the UI can show history with the summary card inline.
|
||||
// - The summary is *anchored rolling* — exactly one live summary=true row
|
||||
// per chat. Subsequent compactions read the prior summary as
|
||||
// previousSummary, ask the LLM to update-merge it, then mark the prior
|
||||
// summary row compacted_at too (it stays in the UI but isn't sent to the
|
||||
// LLM again).
|
||||
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Config } from '../config.js';
|
||||
import type { Broker } from './broker.js';
|
||||
import { SUMMARY_TEMPLATE } from './compaction-prompt.js';
|
||||
|
||||
const COMPACTION_BUFFER = 20_000;
|
||||
const MIN_PRESERVE_RECENT_TOKENS = 2_000;
|
||||
const MAX_PRESERVE_RECENT_TOKENS = 8_000;
|
||||
const DEFAULT_TAIL_TURNS = 2;
|
||||
|
||||
// Subset of Message fields compaction touches. Selecting only what's needed
|
||||
// keeps process() independent of api.ts mutations and reduces DB egress.
|
||||
export interface CompactionMessage {
|
||||
id: string;
|
||||
role: 'user' | 'assistant' | 'system' | 'tool';
|
||||
content: string;
|
||||
kind: 'message' | 'compact';
|
||||
summary: boolean;
|
||||
status: 'streaming' | 'complete' | 'failed' | 'cancelled';
|
||||
tool_calls: Array<{ id: string; name: string; args: Record<string, unknown> }> | null;
|
||||
tool_results: { tool_call_id: string; output: unknown; truncated: boolean; error?: string } | null;
|
||||
metadata: { kind?: string } | null;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
// === overflow ===
|
||||
|
||||
// Tokens we hold in reserve for the model's response so a near-full context
|
||||
// can still produce a useful turn. Mirrors opencode's COMPACTION_BUFFER.
|
||||
// Returns 0 when the context limit is unknown (caller treats 0 as "do not
|
||||
// trigger overflow"); avoids dividing-by-zero downstream.
|
||||
export function usable(contextLimit: number): number {
|
||||
if (!contextLimit || contextLimit <= 0) return 0;
|
||||
return Math.max(0, contextLimit - COMPACTION_BUFFER);
|
||||
}
|
||||
|
||||
export interface Usage {
|
||||
prompt_tokens: number;
|
||||
completion_tokens: number;
|
||||
}
|
||||
|
||||
// True when the assistant just used >= usable() tokens. Unknown limit → false
|
||||
// (we never auto-trigger compaction without a budget — better to keep
|
||||
// inference flowing than to fall into a compaction we can't size properly).
|
||||
export function isOverflow(usage: Usage, contextLimit: number): boolean {
|
||||
const budget = usable(contextLimit);
|
||||
if (budget <= 0) return false;
|
||||
return (usage.prompt_tokens + usage.completion_tokens) >= budget;
|
||||
}
|
||||
|
||||
// === selection ===
|
||||
|
||||
interface Turn {
|
||||
start: number;
|
||||
end: number;
|
||||
id: string;
|
||||
}
|
||||
|
||||
// Char-count / 4 token estimate. Matches opencode's Token.estimate (which
|
||||
// also goes through JSON.stringify). Adequate for tail-fitting math; we
|
||||
// don't need a real tokenizer here — the 20k buffer absorbs the slop.
|
||||
export function estimate(messages: CompactionMessage[]): number {
|
||||
return Math.ceil(JSON.stringify(messages).length / 4);
|
||||
}
|
||||
|
||||
// Walk messages, return one Turn per user message that is NOT a summary row.
|
||||
// end = next-user-start; final turn ends at messages.length.
|
||||
export function turns(messages: CompactionMessage[]): Turn[] {
|
||||
const result: Turn[] = [];
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
const m = messages[i]!;
|
||||
if (m.role !== 'user') continue;
|
||||
if (m.summary) continue;
|
||||
result.push({ start: i, end: messages.length, id: m.id });
|
||||
}
|
||||
for (let i = 0; i < result.length - 1; i++) {
|
||||
result[i]!.end = result[i + 1]!.start;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Inside a turn that doesn't fit whole, walk forward from start+1 looking for
|
||||
// the largest suffix that fits the remaining budget. Returns the keep-start
|
||||
// index (the first preserved message) or undefined if no suffix fits.
|
||||
function splitTurn(
|
||||
messages: CompactionMessage[],
|
||||
turn: Turn,
|
||||
budget: number,
|
||||
): { start: number; id: string } | undefined {
|
||||
if (budget <= 0) return undefined;
|
||||
if (turn.end - turn.start <= 1) return undefined;
|
||||
for (let start = turn.start + 1; start < turn.end; start++) {
|
||||
const size = estimate(messages.slice(start, turn.end));
|
||||
if (size > budget) continue;
|
||||
return { start, id: messages[start]!.id };
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export interface SelectResult {
|
||||
head: CompactionMessage[];
|
||||
tail_start_id: string | undefined;
|
||||
}
|
||||
|
||||
// Choose the boundary between the "head" (to be summarized) and the "tail"
|
||||
// (preserved verbatim). Strategy:
|
||||
// 1. Reserve a budget for the recent tail. Default ranges [2k, 8k] tokens
|
||||
// with 25% of usable() as the target.
|
||||
// 2. Take the last `tail_turns` user-turns; greedily fit from newest back.
|
||||
// 3. If the next-older turn doesn't fit whole, split it mid-turn.
|
||||
// 4. If we couldn't keep anything OR everything fit (keep.start === 0),
|
||||
// return full-preserve (no compaction this round).
|
||||
export function select(
|
||||
messages: CompactionMessage[],
|
||||
contextLimit: number,
|
||||
tailTurns: number = DEFAULT_TAIL_TURNS,
|
||||
): SelectResult {
|
||||
if (tailTurns <= 0) return { head: messages, tail_start_id: undefined };
|
||||
const budget = Math.min(
|
||||
MAX_PRESERVE_RECENT_TOKENS,
|
||||
Math.max(MIN_PRESERVE_RECENT_TOKENS, Math.floor(usable(contextLimit) * 0.25)),
|
||||
);
|
||||
|
||||
const all = turns(messages);
|
||||
if (all.length === 0) return { head: messages, tail_start_id: undefined };
|
||||
const recent = all.slice(-tailTurns);
|
||||
|
||||
let total = 0;
|
||||
let keep: { start: number; id: string } | undefined;
|
||||
for (let i = recent.length - 1; i >= 0; i--) {
|
||||
const turn = recent[i]!;
|
||||
const size = estimate(messages.slice(turn.start, turn.end));
|
||||
if (total + size <= budget) {
|
||||
total += size;
|
||||
keep = { start: turn.start, id: turn.id };
|
||||
continue;
|
||||
}
|
||||
const remaining = budget - total;
|
||||
const split = splitTurn(messages, turn, remaining);
|
||||
if (split) keep = split;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!keep || keep.start === 0) {
|
||||
return { head: messages, tail_start_id: undefined };
|
||||
}
|
||||
return {
|
||||
head: messages.slice(0, keep.start),
|
||||
tail_start_id: keep.id,
|
||||
};
|
||||
}
|
||||
|
||||
// === prompt assembly ===
|
||||
|
||||
// Build the final user message that asks the model to (re)produce the
|
||||
// anchored summary. `context` is reserved for future plugin injection;
|
||||
// callers pass [] today.
|
||||
export function buildPrompt(
|
||||
previousSummary: string | undefined,
|
||||
context: string[],
|
||||
): string {
|
||||
const anchor = previousSummary
|
||||
? [
|
||||
'Update the anchored summary below using the conversation history above.',
|
||||
'Preserve still-true details, remove stale details, and merge in the new facts.',
|
||||
'<previous-summary>',
|
||||
previousSummary,
|
||||
'</previous-summary>',
|
||||
].join('\n')
|
||||
: 'Create a new anchored summary from the conversation history above.';
|
||||
return [anchor, SUMMARY_TEMPLATE, ...context].join('\n\n');
|
||||
}
|
||||
|
||||
// === OpenAI conversion (compaction-local; intentionally does NOT call
|
||||
// inference.ts buildMessagesPayload because that uses the legacy "find latest
|
||||
// kind='compact' marker and skip everything before it" shortcircuit, which
|
||||
// would silently drop pre-legacy-compact history before the LLM sees it.
|
||||
// Compaction wants to send the entire head, full stop.) ===
|
||||
|
||||
interface OpenAiMessage {
|
||||
role: 'system' | 'user' | 'assistant' | 'tool';
|
||||
content: string | null;
|
||||
tool_calls?: Array<{
|
||||
id: string;
|
||||
type: 'function';
|
||||
function: { name: string; arguments: string };
|
||||
}>;
|
||||
tool_call_id?: string;
|
||||
}
|
||||
|
||||
function isCapHitSentinel(m: CompactionMessage): boolean {
|
||||
return m.role === 'system' && m.metadata != null && m.metadata.kind === 'cap_hit';
|
||||
}
|
||||
|
||||
function buildHeadPayload(head: CompactionMessage[]): OpenAiMessage[] {
|
||||
const out: OpenAiMessage[] = [];
|
||||
for (const m of head) {
|
||||
if (isCapHitSentinel(m)) continue;
|
||||
if (m.role === 'assistant' && (m.status === 'streaming' || m.status === 'cancelled')) continue;
|
||||
if (m.kind === 'compact') {
|
||||
// Legacy compact row — pass through as system context. The new
|
||||
// anchored summary will subsume it, but the LLM should see it during
|
||||
// the bridging round so it can carry forward the still-true bits.
|
||||
out.push({ role: 'system', content: m.content });
|
||||
continue;
|
||||
}
|
||||
if (m.summary) {
|
||||
// Defense in depth: process() filters these out of the select-input
|
||||
// already. If one slips through, render it as assistant content so we
|
||||
// never crash here.
|
||||
out.push({ role: 'assistant', content: m.content });
|
||||
continue;
|
||||
}
|
||||
if (m.role === 'tool') {
|
||||
const tr = m.tool_results;
|
||||
if (!tr) continue;
|
||||
const outputText = tr.error
|
||||
? `error: ${tr.error}`
|
||||
: typeof tr.output === 'string'
|
||||
? tr.output
|
||||
: JSON.stringify(tr.output);
|
||||
out.push({ role: 'tool', content: outputText, tool_call_id: tr.tool_call_id });
|
||||
continue;
|
||||
}
|
||||
if (m.role === 'assistant') {
|
||||
const msg: OpenAiMessage = {
|
||||
role: 'assistant',
|
||||
content: m.content && m.content.length > 0 ? m.content : null,
|
||||
};
|
||||
if (m.tool_calls && m.tool_calls.length > 0) {
|
||||
msg.tool_calls = m.tool_calls.map((tc) => ({
|
||||
id: tc.id,
|
||||
type: 'function' as const,
|
||||
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
||||
}));
|
||||
}
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
out.push({ role: 'user', content: m.content });
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// === llama-swap call ===
|
||||
|
||||
// Non-streaming completion. Opencode streams; for a one-shot summary call a
|
||||
// single POST is less code and the latency hit is acceptable (the user
|
||||
// doesn't see this directly — useSessionStream emits the toast + refetches
|
||||
// on the 'compacted' frame).
|
||||
interface CompletionResult {
|
||||
content: string;
|
||||
promptTokens: number;
|
||||
completionTokens: number;
|
||||
nCtx: number | null;
|
||||
}
|
||||
|
||||
async function callLlamaSwap(
|
||||
config: Config,
|
||||
model: string,
|
||||
messages: OpenAiMessage[],
|
||||
log: FastifyBaseLogger,
|
||||
): Promise<CompletionResult> {
|
||||
const res = await fetch(`${config.LLAMA_SWAP_URL}/v1/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ model, messages, stream: false }),
|
||||
});
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '');
|
||||
throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
|
||||
}
|
||||
const json = (await res.json()) as {
|
||||
choices?: Array<{ message?: { content?: string } }>;
|
||||
usage?: { prompt_tokens?: number; completion_tokens?: number };
|
||||
timings?: { n_ctx?: number };
|
||||
};
|
||||
const content = json.choices?.[0]?.message?.content ?? '';
|
||||
const promptTokens = json.usage?.prompt_tokens ?? 0;
|
||||
const completionTokens = json.usage?.completion_tokens ?? 0;
|
||||
const nCtx = typeof json.timings?.n_ctx === 'number' ? json.timings.n_ctx : null;
|
||||
log.debug({ promptTokens, completionTokens, nCtx, chars: content.length }, 'compaction llm complete');
|
||||
return { content, promptTokens, completionTokens, nCtx };
|
||||
}
|
||||
|
||||
// === entry point ===
|
||||
|
||||
export interface ProcessInput {
|
||||
sql: Sql;
|
||||
config: Config;
|
||||
log: FastifyBaseLogger;
|
||||
broker: Broker;
|
||||
chatId: string;
|
||||
}
|
||||
|
||||
// Runs one round of anchored rolling compaction on `chatId`. No-ops cleanly
|
||||
// (clearing needs_compaction) when there's nothing reasonable to compact.
|
||||
// Throws on LLM failure — callers decide whether to log+swallow or surface.
|
||||
export async function process(input: ProcessInput): Promise<void> {
|
||||
const { sql, config, log, broker, chatId } = input;
|
||||
|
||||
// 1. Resolve chat → session for model + WS publish channel.
|
||||
const chatRows = await sql<{ id: string; session_id: string }[]>`
|
||||
SELECT id, session_id FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
if (chatRows.length === 0) {
|
||||
log.warn({ chatId }, 'compaction: chat not found');
|
||||
return;
|
||||
}
|
||||
const chat = chatRows[0]!;
|
||||
const sessionId = chat.session_id;
|
||||
|
||||
const sessRows = await sql<{ id: string; model: string }[]>`
|
||||
SELECT id, model FROM sessions WHERE id = ${sessionId}
|
||||
`;
|
||||
if (sessRows.length === 0) {
|
||||
log.warn({ chatId, sessionId }, 'compaction: session not found');
|
||||
return;
|
||||
}
|
||||
const session = sessRows[0]!;
|
||||
|
||||
// 2. All currently-active messages in this chat (compacted_at IS NULL).
|
||||
// ORDER BY (created_at, id) matches loadContext in inference.ts so the
|
||||
// turns() boundary logic sees the same sequence the LLM will.
|
||||
const messages = await sql<CompactionMessage[]>`
|
||||
SELECT id, role, content, kind, summary, status, tool_calls, tool_results, metadata, created_at
|
||||
FROM messages
|
||||
WHERE chat_id = ${chatId} AND compacted_at IS NULL
|
||||
ORDER BY created_at ASC, id ASC
|
||||
`;
|
||||
if (messages.length === 0) {
|
||||
await sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. Find the prior anchored summary (newest summary=true row). Its content
|
||||
// becomes previousSummary — the anchor in the prompt. Filter it out of the
|
||||
// select-input so we don't double-encode (it's already in the anchor text).
|
||||
const previousSummary = messages.filter((m) => m.summary).at(-1)?.content;
|
||||
const forSelect = messages.filter((m) => !m.summary);
|
||||
|
||||
// 4. Resolve a recent context limit. llama-swap reports timings.n_ctx per
|
||||
// completion; we cache it on messages.ctx_max. Use the most recent value
|
||||
// from any message in this chat (oldest assumption is the same model is
|
||||
// still running). When unknown, fall back to model.context_limit-less
|
||||
// defaults via the buffer-only path (see usable()).
|
||||
const ctxRows = await sql<{ ctx_max: number | null }[]>`
|
||||
SELECT ctx_max FROM messages
|
||||
WHERE chat_id = ${chatId} AND ctx_max IS NOT NULL
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
`;
|
||||
const contextLimit = ctxRows[0]?.ctx_max ?? 0;
|
||||
|
||||
// 5. Decide head / tail.
|
||||
const sel = select(forSelect, contextLimit);
|
||||
if (!sel.tail_start_id || sel.head.length === 0) {
|
||||
// Full preserve — nothing to compact this round. Clear the flag so we
|
||||
// don't loop. (Could happen when the chat is short or the budget swung
|
||||
// wider after a model context bump.)
|
||||
await sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
|
||||
log.info({ chatId, contextLimit, msgCount: messages.length }, 'compaction: nothing to compact');
|
||||
return;
|
||||
}
|
||||
|
||||
// 6. Build the OpenAI request: head as user/assistant/tool turns + a final
|
||||
// user message carrying buildPrompt(previousSummary, []). No system prompt
|
||||
// — matches opencode (`system: []`); the template + anchor are sufficient.
|
||||
const headPayload = buildHeadPayload(sel.head);
|
||||
const finalUser: OpenAiMessage = { role: 'user', content: buildPrompt(previousSummary, []) };
|
||||
const payload = [...headPayload, finalUser];
|
||||
|
||||
log.info(
|
||||
{
|
||||
chatId,
|
||||
contextLimit,
|
||||
headLen: sel.head.length,
|
||||
tailStartId: sel.tail_start_id,
|
||||
hadPrevSummary: previousSummary !== undefined,
|
||||
},
|
||||
'compaction: invoking model',
|
||||
);
|
||||
|
||||
// 6a. Flip the chat dot amber for the duration of the LLM call + DB writes.
|
||||
// Same { type: 'chat_status', status: 'working', at } shape inference.ts
|
||||
// emits at runner enqueue. publishUser → broadcasts on the per-user channel
|
||||
// (all devices / tabs see it) since chat_status is a user-channel frame in
|
||||
// BooCode (see useChatStatus.ts, which is the consumer).
|
||||
broker.publishUser('default', {
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'working',
|
||||
at: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// try/finally so the dot ALWAYS drops back to idle, even if the LLM call
|
||||
// throws or a downstream DB write fails. The succeeded flag gates the
|
||||
// 'compacted' frame + final log: we only signal completion to the UI when
|
||||
// the new summary row actually landed.
|
||||
let succeeded = false;
|
||||
let newId = '';
|
||||
let result: CompletionResult | undefined;
|
||||
try {
|
||||
// 7. Single completion (no tools). Throws on llama-swap failure.
|
||||
result = await callLlamaSwap(config, session.model, payload, log);
|
||||
|
||||
// 8. Insert the new anchored summary row. role='assistant' per spec; the
|
||||
// UI distinguishes via summary=true. tail_start_id points at the first
|
||||
// preserved tail message so debug surfaces / future tools can reason
|
||||
// about the boundary without re-deriving from compacted_at.
|
||||
const insertRows = await sql<{ id: string }[]>`
|
||||
INSERT INTO messages (
|
||||
session_id, chat_id, role, content, kind, status,
|
||||
summary, tail_start_id,
|
||||
tokens_used, ctx_used, ctx_max,
|
||||
created_at, finished_at
|
||||
)
|
||||
VALUES (
|
||||
${sessionId}, ${chatId}, 'assistant', ${result.content}, 'message', 'complete',
|
||||
true, ${sel.tail_start_id},
|
||||
${result.completionTokens}, ${result.promptTokens}, ${result.nCtx},
|
||||
clock_timestamp(), clock_timestamp()
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
newId = insertRows[0]!.id;
|
||||
|
||||
// 9. Mark every prior live message (head + prior summary) as compacted.
|
||||
// Bound by "created_at strictly less than tail_start_id's created_at" so
|
||||
// the preserved tail stays compacted_at=NULL. Exclude the new summary
|
||||
// row we just inserted (it's "now", which is >= tail_start_id's
|
||||
// created_at anyway, but defensive).
|
||||
await sql`
|
||||
UPDATE messages
|
||||
SET compacted_at = clock_timestamp()
|
||||
WHERE chat_id = ${chatId}
|
||||
AND compacted_at IS NULL
|
||||
AND id != ${newId}
|
||||
AND created_at < (SELECT created_at FROM messages WHERE id = ${sel.tail_start_id})
|
||||
`;
|
||||
|
||||
// 10. Clear the flag and bump the chat's updated_at so the sidebar
|
||||
// reflects recent activity.
|
||||
await sql`
|
||||
UPDATE chats
|
||||
SET needs_compaction = false, updated_at = clock_timestamp()
|
||||
WHERE id = ${chatId}
|
||||
`;
|
||||
|
||||
succeeded = true;
|
||||
} finally {
|
||||
// Always restore the dot. Status='idle' (not 'error') even on failure —
|
||||
// the caller logs/re-surfaces the error separately; the dot doesn't
|
||||
// need to stay red across reloads for a transient compaction blip.
|
||||
broker.publishUser('default', {
|
||||
type: 'chat_status',
|
||||
chat_id: chatId,
|
||||
status: 'idle',
|
||||
at: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// 11. Tell the client. useSessionStream subscribes to the per-session WS
|
||||
// channel; the handler refetches messages (so the new summary row + the
|
||||
// compacted_at-stamped older rows render correctly) and fires a sonner
|
||||
// toast. Order matters: idle must precede 'compacted' so the dot is
|
||||
// already green by the time the refetch toast appears.
|
||||
if (succeeded) {
|
||||
broker.publish(sessionId, {
|
||||
type: 'compacted',
|
||||
session_id: sessionId,
|
||||
chat_id: chatId,
|
||||
summary_message_id: newId,
|
||||
});
|
||||
log.info(
|
||||
{
|
||||
chatId,
|
||||
newId,
|
||||
completionTokens: result?.completionTokens,
|
||||
promptTokens: result?.promptTokens,
|
||||
},
|
||||
'compaction: complete',
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user