diff --git a/apps/server/src/schema.sql b/apps/server/src/schema.sql
index 78e9ac5..9be69f6 100644
--- a/apps/server/src/schema.sql
+++ b/apps/server/src/schema.sql
@@ -56,6 +56,17 @@ CREATE TABLE IF NOT EXISTS message_parts (
 );
 CREATE INDEX IF NOT EXISTS message_parts_msg_seq_idx ON message_parts (message_id, sequence);
 
+-- v1.13.4: prune support. hidden_at marks parts that have been pruned out
+-- of the model payload by the two-tier compaction prune (services/inference/
+-- prune.ts). Rows stay in the DB so frontend can still display them with a
+-- "hidden" indicator (out of scope this dispatch). messages_with_parts
+-- view filters these out — see below. Partial index speeds the common
+-- "visible parts only" filter.
+-- ADD COLUMN IF NOT EXISTS keeps this idempotent, like the messages column add below.
+ALTER TABLE message_parts ADD COLUMN IF NOT EXISTS hidden_at timestamptz NULL;
+CREATE INDEX IF NOT EXISTS message_parts_hidden_idx
+  ON message_parts (message_id) WHERE hidden_at IS NULL;
+
 -- v1.13.1-B: read-path view. Read sites SELECT FROM messages_with_parts
 -- instead of messages so tool_calls / tool_results / reasoning_parts come
 -- from the granular message_parts table. The COALESCE means pre-v1.13.0
@@ -73,23 +84,32 @@ SELECT
   m.last_seq, m.tokens_used, m.ctx_used, m.ctx_max,
   m.started_at, m.finished_at, m.created_at, m.metadata,
   m.summary, m.tail_start_id, m.compacted_at,
-  COALESCE(
-    (SELECT jsonb_agg(p.payload ORDER BY p.sequence)
-     FROM message_parts p
-     WHERE p.message_id = m.id AND p.kind = 'tool_call'),
-    m.tool_calls
-  ) AS tool_calls,
-  COALESCE(
-    (SELECT p.payload
-     FROM message_parts p
-     WHERE p.message_id = m.id AND p.kind = 'tool_result'
-     ORDER BY p.sequence
-     LIMIT 1),
-    m.tool_results
-  ) AS tool_results,
+  -- v1.13.4: prune semantics need to distinguish "no parts row exists"
+  -- (pre-v1.13.0 fallback to legacy column) from "all parts hidden"
+  -- (prune intended — return NULL so the row drops from the model
+  -- payload). A naive COALESCE would fall back to the legacy column when
+  -- every part is hidden, undoing the prune. CASE on EXISTS (a part of that
+  -- kind, hidden or not) splits the two cases.
+  CASE
+    WHEN EXISTS (SELECT 1 FROM message_parts pp
+                 WHERE pp.message_id = m.id AND pp.kind = 'tool_call')
+    THEN (SELECT jsonb_agg(p.payload ORDER BY p.sequence)
+          FROM message_parts p
+          WHERE p.message_id = m.id AND p.kind = 'tool_call' AND p.hidden_at IS NULL)
+    ELSE m.tool_calls
+  END AS tool_calls,
+  CASE
+    WHEN EXISTS (SELECT 1 FROM message_parts pp
+                 WHERE pp.message_id = m.id AND pp.kind = 'tool_result')
+    THEN (SELECT p.payload
+          FROM message_parts p
+          WHERE p.message_id = m.id AND p.kind = 'tool_result' AND p.hidden_at IS NULL
+          ORDER BY p.sequence LIMIT 1)
+    ELSE m.tool_results
+  END AS tool_results,
   (SELECT jsonb_agg(p.payload ORDER BY p.sequence)
    FROM message_parts p
-   WHERE p.message_id = m.id AND p.kind = 'reasoning') AS reasoning_parts
+   WHERE p.message_id = m.id AND p.kind = 'reasoning' AND p.hidden_at IS NULL) AS reasoning_parts
 FROM messages m;
 
 ALTER TABLE messages ADD COLUMN IF NOT EXISTS tokens_used INTEGER;
diff --git a/apps/server/src/services/__tests__/prune.test.ts
b/apps/server/src/services/__tests__/prune.test.ts
new file mode 100644
index 0000000..c7ec365
--- /dev/null
+++ b/apps/server/src/services/__tests__/prune.test.ts
@@ -0,0 +1,96 @@
+import { describe, it, expect, beforeEach } from 'vitest';
+import {
+  selectPruneTargets,
+  PROTECTED_TOKENS,
+  PRUNE_TRIGGER_TOKENS,
+  type PartForPrune,
+} from '../inference/prune.js';
+
+// Test fixture: build a tool_result part whose payload size yields a known
+// token estimate (chars/4). The decision logic only cares about
+// JSON.stringify(payload).length, so a string payload whose stringified
+// form is `4n` chars produces exactly `n` tokens.
+let seq = 0;
+function part(tokens: number, createdAt: Date): PartForPrune {
+  seq += 1;
+  // JSON.stringify wraps a string payload in two quote characters, so use
+  // 4*tokens - 2 'x' characters: the stringified length is then exactly
+  // 4*tokens and Math.ceil(length / 4) is exactly `tokens`. 'x' needs no
+  // JSON escaping, so nothing else is added. Requires tokens >= 1
+  // (String.prototype.repeat throws on a negative count).
+  const text = 'x'.repeat(tokens * 4 - 2);
+  return { id: `p${seq}`, payload: text, created_at: createdAt };
+}
+
+const T_NOW = new Date('2026-05-22T12:00:00Z');
+function ago(secondsBack: number): Date {
+  return new Date(T_NOW.getTime() - secondsBack * 1000);
+}
+
+describe('selectPruneTargets', () => {
+  beforeEach(() => {
+    seq = 0;
+  });
+
+  it('returns nothing when there are no parts', () => {
+    expect(selectPruneTargets([], null)).toEqual({ ids: [], freedTokens: 0 });
+  });
+
+  it('returns nothing when total tokens are under the protection window', () => {
+    const parts: PartForPrune[] = [
+      part(10_000, ago(10)),
+      part(10_000, ago(20)),
+    ]; // 20k total, all protected
+    expect(selectPruneTargets(parts, null)).toEqual({ ids: [], freedTokens: 0 });
+  });
+
+  it('returns nothing when candidate total is below the prune trigger', () => {
+    // Protection fills with ~40k newest, candidates only ~5k. Below 20k trigger.
+    const parts: PartForPrune[] = [
+      part(20_000, ago(10)),
+      part(20_000, ago(20)),
+      // Past protection; total ~5k won't trigger.
+      part(5_000, ago(30)),
+    ];
+    const result = selectPruneTargets(parts, null);
+    expect(result.ids).toEqual([]);
+    expect(result.freedTokens).toBe(0);
+  });
+
+  it('hides candidates past protection when their total clears the trigger', () => {
+    // Newest 40k protected; older 30k cleanly above the 20k trigger.
+    const parts: PartForPrune[] = [
+      part(20_000, ago(10)),
+      part(20_000, ago(20)),
+      // Past protection, total ~30k freed.
+      part(15_000, ago(30)),
+      part(15_000, ago(40)),
+    ];
+    const result = selectPruneTargets(parts, null);
+    expect(result.ids).toEqual(['p3', 'p4']);
+    expect(result.freedTokens).toBeGreaterThanOrEqual(PRUNE_TRIGGER_TOKENS);
+  });
+
+  it('stops at the compaction summary boundary', () => {
+    // Newest 40k fill the protection window; then 30k of older parts. The
+    // boundary sits at ago(35), so only the ago(30) part is a candidate and
+    // its 15k alone is below the 20k trigger.
+    const parts: PartForPrune[] = [
+      part(20_000, ago(10)),
+      part(20_000, ago(20)), // reaches PROTECTED_TOKENS; still protected
+      part(15_000, ago(30)), // past protection; the only candidate
+      part(15_000, ago(40)), // beyond summary boundary; skipped
+    ];
+    const tailStart = ago(35);
+    const result = selectPruneTargets(parts, tailStart);
+    // Control: with no boundary the same parts are pruned, so it is the cutoff that stops it.
+    expect(selectPruneTargets(parts, null).ids).toEqual(['p3', 'p4']);
+    expect(result.ids).toEqual([]);
+  });
+
+  it('does not prune when only protected parts exist (no candidates)', () => {
+    // Exactly PROTECTED_TOKENS of newest parts; no older candidates.
+    const parts: PartForPrune[] = [part(PROTECTED_TOKENS, ago(10))];
+    expect(selectPruneTargets(parts, null)).toEqual({ ids: [], freedTokens: 0 });
+  });
+});
diff --git a/apps/server/src/services/inference/payload.ts b/apps/server/src/services/inference/payload.ts
index 84c579b..f5c11f9 100644
--- a/apps/server/src/services/inference/payload.ts
+++ b/apps/server/src/services/inference/payload.ts
@@ -8,6 +8,7 @@ import type {
 import * as compaction from '../compaction.js';
 import { buildSystemPrompt } from '../system-prompt.js';
 import { isAnySentinel } from './sentinels.js';
+import { PRUNE_TRIGGER_TOKENS, prune } from './prune.js';
 import type { InferenceContext } from './turn.js';
 
 export interface OpenAiMessage {
@@ -166,6 +167,26 @@ export async function maybeFlagForCompaction(
     contextLimit,
   );
   if (!overflow) return;
+
+  // v1.13.4: try the cheap prune first. If it freed at least the buffer
+  // worth of tokens (PRUNE_TRIGGER_TOKENS, identical to COMPACTION_BUFFER),
+  // we're below the threshold again — skip flagging summarize for the next
+  // turn. The next turn's overflow check will re-evaluate from scratch.
+  // Prune failures (DB errors etc.) propagate so the surrounding inference
+  // path sees them; the catch in finalizeCompletion / executeToolPhase
+  // doesn't shield this — by design, we want to know if prune is broken.
+  const pruned = await prune({ sql: ctx.sql, chatId });
+  if (pruned.hidden > 0) {
+    ctx.log.info(
+      { chatId, hidden: pruned.hidden, freedTokens: pruned.freedTokens },
+      'inference: prune freed context budget',
+    );
+  }
+  if (pruned.freedTokens >= PRUNE_TRIGGER_TOKENS) {
+    // Prune handled it; skip the (expensive) summarize path.
+    return;
+  }
+
   await ctx.sql`UPDATE chats SET needs_compaction = true WHERE id = ${chatId}`;
   ctx.log.info({ chatId, promptTokens, completionTokens, contextLimit }, 'inference: flagged for compaction');
 }
diff --git a/apps/server/src/services/inference/prune.ts b/apps/server/src/services/inference/prune.ts
new file mode 100644
index 0000000..06d67c3
--- /dev/null
+++ b/apps/server/src/services/inference/prune.ts
@@ -0,0 +1,127 @@
+import type { Sql } from '../../db.js';
+
+// v1.13.4: two-tier compaction prune. Opencode's prune half (the cheap one);
+// summarize half shipped in v1.11.0 as services/compaction.ts.
+//
+// Algorithm: scan tool_result parts newest-first. Protect the last
+// PROTECTED_TOKENS of content (the model recently saw these — pruning them
+// kills coherence). Older parts are candidates. Mark them hidden_at only
+// if the candidate pool would free at least PRUNE_TRIGGER_TOKENS — pruning
+// 3 small tool_results to recover 500 tokens isn't worth the loss of
+// fidelity for the model's next turn.
+//
+// Stops at the last compaction summary boundary (chats.tail_start_id). The
+// v1.11.0 summary already encodes everything before that point; pruning
+// across the boundary would double-erase.
+
+export const PROTECTED_TOKENS = 40_000;
+export const PRUNE_TRIGGER_TOKENS = 20_000;
+
+// Rough char-to-token estimate. Same heuristic compaction's usable() uses
+// implicitly via the buffer constant.
+function estimateTokens(text: string): number {
+  return Math.ceil(text.length / 4);
+}
+
+function payloadTokens(payload: unknown): number {
+  return estimateTokens(JSON.stringify(payload ?? ''));
+}
+
+export interface PruneResult {
+  hidden: number;
+  freedTokens: number;
+}
+
+// Pure algorithmic core, exported for unit-test access. Takes parts already
+// ordered newest-first, plus an optional cutoff (last compaction summary
+// boundary). Returns the part ids to hide and the total token estimate of
+// the candidates. Caller does the DB UPDATE.
+// Row shape the selection step needs; created_at is the owning message's time.
+export interface PartForPrune {
+  id: string;
+  payload: unknown;
+  created_at: Date;
+}
+
+// Protects the newest parts until their running estimate reaches
+// PROTECTED_TOKENS, collects the rest as candidates, and returns them only
+// when together they reach PRUNE_TRIGGER_TOKENS (otherwise an empty result).
+export function selectPruneTargets(
+  partsNewestFirst: ReadonlyArray<PartForPrune>,
+  tailStartCreatedAt: Date | null,
+): { ids: string[]; freedTokens: number } {
+  let protectedTokens = 0;
+  const candidates: { id: string; tokens: number }[] = [];
+  let crossedProtection = false;
+
+  for (const part of partsNewestFirst) {
+    if (tailStartCreatedAt && part.created_at < tailStartCreatedAt) {
+      // Past the last summary boundary; the v1.11.0 anchored summary already
+      // covers everything older. Bail rather than double-erase.
+      break;
+    }
+    const tokens = payloadTokens(part.payload);
+    if (!crossedProtection) {
+      // The part that reaches the threshold is itself still protected.
+      protectedTokens += tokens;
+      crossedProtection = protectedTokens >= PROTECTED_TOKENS;
+      continue;
+    }
+    candidates.push({ id: part.id, tokens });
+  }
+
+  const candidateTokens = candidates.reduce((s, c) => s + c.tokens, 0);
+  if (candidates.length === 0 || candidateTokens < PRUNE_TRIGGER_TOKENS) {
+    return { ids: [], freedTokens: 0 };
+  }
+  return { ids: candidates.map((c) => c.id), freedTokens: candidateTokens };
+}
+
+// Hides the selected tool_result parts of one chat by setting hidden_at.
+// Resolves to zero counts when nothing qualifies; query errors are not caught.
+export async function prune(args: {
+  sql: Sql;
+  chatId: string;
+}): Promise<PruneResult> {
+  const { sql, chatId } = args;
+
+  // Newest-first scan of visible tool_result parts in this chat. Pull
+  // chats.tail_start_id alongside so we know where the last summary boundary
+  // sits (don't prune across it).
+  const parts = await sql<(PartForPrune & { tail_start_id: string | null })[]>`
+    SELECT p.id, p.payload, m.created_at,
+           (SELECT c.tail_start_id FROM chats c WHERE c.id = ${chatId}) AS tail_start_id
+    FROM message_parts p
+    JOIN messages m ON m.id = p.message_id
+    WHERE m.chat_id = ${chatId}
+      AND p.kind = 'tool_result'
+      AND p.hidden_at IS NULL
+    ORDER BY m.created_at DESC, p.sequence DESC
+  `;
+
+  if (parts.length === 0) {
+    return { hidden: 0, freedTokens: 0 };
+  }
+
+  // Read the boundary cutoff timestamp once. Older messages are off-limits.
+  let tailStartCreatedAt: Date | null = null;
+  const firstTailId = parts[0]?.tail_start_id ?? null;
+  if (firstTailId) {
+    const tailRow = await sql<{ created_at: Date }[]>`
+      SELECT created_at FROM messages WHERE id = ${firstTailId}
+    `;
+    tailStartCreatedAt = tailRow[0]?.created_at ?? null;
+  }
+
+  const decision = selectPruneTargets(parts, tailStartCreatedAt);
+  if (decision.ids.length === 0) {
+    return { hidden: 0, freedTokens: 0 };
+  }
+
+  await sql`
+    UPDATE message_parts
+    SET hidden_at = clock_timestamp()
+    WHERE id = ANY(${decision.ids})
+  `;
+  return { hidden: decision.ids.length, freedTokens: decision.freedTokens };
+}