diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 58d463d..7e88fdb 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -19,6 +19,7 @@ import { registerSkillsRoutes } from './routes/skills.js'; import { createInferenceRunner } from './services/inference.js'; import { createBroker } from './services/broker.js'; import { listSkills } from './services/skills.js'; +import * as compaction from './services/compaction.js'; async function main() { const config = loadConfig(); @@ -81,6 +82,11 @@ async function main() { publish: (sessionId, frame) => { broker.publish(sessionId, frame as unknown as Record & { type: string }); }, + // v1.11: broker handle for compaction.process to publish 'compacted' + // frames on the per-session channel. Inference's regular publish path + // is bound to (sessionId, InferenceFrame); compaction publishes a + // different frame shape, so it goes through the raw broker. + broker, }, (user, frame) => { broker.publishUser(user, frame as unknown as Record & { type: string }); @@ -90,9 +96,13 @@ async function main() { enqueueInference: (sessionId, chatId, assistantId, user) => { inference.enqueue(sessionId, chatId, assistantId, user); }, - enqueueCompact: (sessionId, chatId, compactId, user) => { - inference.enqueueCompact(sessionId, chatId, compactId, user); - }, + // v1.11: synchronous compaction. Awaits the LLM call inside the route's + // request lifecycle; the new summary row arrives via the WS 'compacted' + // frame published from inside compaction.process. We let the error + // bubble up so the route can reply 500 — manual /compact failures + // should be loud (the user just clicked a button). 
+ runCompaction: (chatId) => + compaction.process({ sql, config, log: app.log, broker, chatId }), cancelInference: async (sessionId, chatId) => { return inference.cancel(sessionId, chatId); }, diff --git a/apps/server/src/routes/chats.ts b/apps/server/src/routes/chats.ts index b845c48..5d0c45f 100644 --- a/apps/server/src/routes/chats.ts +++ b/apps/server/src/routes/chats.ts @@ -316,7 +316,8 @@ export function registerChatRoutes( } const rows = await sql` SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq, - tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata + tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata, + summary, tail_start_id, compacted_at FROM messages WHERE chat_id = ${req.params.id} ORDER BY created_at ASC, id ASC diff --git a/apps/server/src/routes/messages.ts b/apps/server/src/routes/messages.ts index 4c0e666..bfaf148 100644 --- a/apps/server/src/routes/messages.ts +++ b/apps/server/src/routes/messages.ts @@ -49,7 +49,12 @@ const AskUserInputArgs = z.object({ interface MessageHandlers { enqueueInference: (sessionId: string, chatId: string, assistantMessageId: string, user: string) => void; - enqueueCompact: (sessionId: string, chatId: string, compactMessageId: string, user: string) => void; + // v1.11: returns a promise that resolves after compaction.process finishes + // (await the LLM call). Throws on failure — the route surfaces a 500. + // Replaces the v1.10 enqueueCompact (which fired-and-forgot a kind='compact' + // streaming row). The new anchored-rolling strategy inserts a single + // summary=true assistant row only after the LLM responds. + runCompaction: (chatId: string) => Promise; publishUserMessage: ( sessionId: string, chatId: string, @@ -81,9 +86,15 @@ export function registerMessageRoutes( reply.code(404); return { error: 'session not found' }; } + // v1.11: returns ALL messages including compacted ones. 
The UI + // distinguishes via the new `summary` flag (renders an accordion + // SummaryCard) and shows compacted_at-stamped rows inline for context. + // Internal inference assembly filters compacted_at IS NULL separately — + // see services/inference.ts loadContext + services/compaction.ts. const rows = await sql` SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq, - tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata + tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata, + summary, tail_start_id, compacted_at FROM messages WHERE session_id = ${req.params.id} ORDER BY created_at ASC, id ASC @@ -251,29 +262,30 @@ export function registerMessageRoutes( } ); + // v1.11: manual /compact. Was a streaming kind='compact' row inserted by + // this handler; now delegates to the anchored-rolling compaction service. + // Synchronous (we await the LLM call) — callers either await or rely on + // the 'compacted' WS frame to refresh their view. The response carries + // no body of interest; the new summary row arrives via the WS frame. 
app.post<{ Params: { id: string } }>( '/api/chats/:id/compact', async (req, reply) => { - const chatRows = await sql` - SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open' + const chatRows = await sql<{ id: string }[]>` + SELECT id FROM chats WHERE id = ${req.params.id} AND status = 'open' `; if (chatRows.length === 0) { reply.code(404); return { error: 'chat not found' }; } - const chat = chatRows[0]!; - const sessionId = chat.session_id; - - const [compactMsg] = await sql<{ id: string }[]>` - INSERT INTO messages (session_id, chat_id, role, content, kind, status, created_at) - VALUES (${sessionId}, ${chat.id}, 'system', '', 'compact', 'streaming', clock_timestamp()) - RETURNING id - `; - - handlers.enqueueCompact(sessionId, chat.id, compactMsg!.id, 'default'); - - reply.code(202); - return { compact_message_id: compactMsg!.id }; + try { + await handlers.runCompaction(chatRows[0]!.id); + } catch (err) { + req.log.error({ err, chatId: chatRows[0]!.id }, 'manual compaction failed'); + reply.code(500); + return { error: err instanceof Error ? err.message : 'compaction failed' }; + } + reply.code(200); + return { ok: true }; } ); diff --git a/apps/server/src/routes/ws.ts b/apps/server/src/routes/ws.ts index 88c854e..e25efd2 100644 --- a/apps/server/src/routes/ws.ts +++ b/apps/server/src/routes/ws.ts @@ -21,9 +21,12 @@ export function registerWebSocket( return; } + // v1.11: snapshot includes compaction fields so MessageBubble can + // render the SummaryCard for summary=true rows on first connect. 
const messages = await sql` SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq, - tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata + tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata, + summary, tail_start_id, compacted_at FROM messages WHERE session_id = ${sessionId} ORDER BY created_at ASC, id ASC diff --git a/apps/server/src/schema.sql b/apps/server/src/schema.sql index 9be9395..801a487 100644 --- a/apps/server/src/schema.sql +++ b/apps/server/src/schema.sql @@ -179,3 +179,25 @@ INSERT INTO settings (key, value) VALUES ('theme_mode', '"dark"') ON CONFLICT (k ALTER TABLE projects ADD COLUMN IF NOT EXISTS default_system_prompt TEXT NOT NULL DEFAULT ''; ALTER TABLE projects ADD COLUMN IF NOT EXISTS default_web_search_enabled BOOLEAN NOT NULL DEFAULT false; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS web_search_enabled BOOLEAN; + +-- v1.11: anchored rolling compaction. +-- compacted_at — marks rows that are "behind the curtain" of the latest +-- summary. Inference assembly filters compacted_at IS NULL; +-- the API GET still returns all rows so the UI can show +-- history with the summary card inline. +-- summary — true on the assistant row that IS the anchored summary. +-- Exactly one row per chat is the "current" summary +-- (every prior summary row is itself compacted_at-stamped +-- when superseded, leaving one live anchor). +-- tail_start_id — points at the first preserved message that the summary +-- covers up to (exclusive). Lets the UI/debug reason about +-- the boundary without re-deriving from compacted_at. +-- needs_compaction — flag on chats (not sessions) because chat history is +-- per-chat; sessions have 1:N chats. Set true post-overflow, +-- cleared by compaction.process at the start of the next +-- inference turn. 
+ALTER TABLE messages ADD COLUMN IF NOT EXISTS compacted_at TIMESTAMPTZ; +ALTER TABLE messages ADD COLUMN IF NOT EXISTS summary BOOLEAN NOT NULL DEFAULT FALSE; +ALTER TABLE messages ADD COLUMN IF NOT EXISTS tail_start_id UUID REFERENCES messages(id) ON DELETE SET NULL; +ALTER TABLE chats ADD COLUMN IF NOT EXISTS needs_compaction BOOLEAN NOT NULL DEFAULT FALSE; +CREATE INDEX IF NOT EXISTS idx_messages_chat_compacted ON messages (chat_id, compacted_at); diff --git a/apps/server/src/services/__tests__/compaction.test.ts b/apps/server/src/services/__tests__/compaction.test.ts new file mode 100644 index 0000000..a239ee8 --- /dev/null +++ b/apps/server/src/services/__tests__/compaction.test.ts @@ -0,0 +1,258 @@ +import { describe, it, expect } from 'vitest'; +import { + usable, + isOverflow, + estimate, + turns, + select, + buildPrompt, + type CompactionMessage, +} from '../compaction.js'; +import { SUMMARY_TEMPLATE } from '../compaction-prompt.js'; + +// ---- fixture ---------------------------------------------------------------- +// Tiny constructor for the message shape `compaction.ts` consumes. Default +// values match the post-CP1 schema (summary=false, kind='message', complete). +// Tests that need a summary row pass `summary: true`. 
+ +let counter = 0; +function mkMsg( + role: CompactionMessage['role'], + content: string, + overrides: Partial = {}, +): CompactionMessage { + counter += 1; + return { + id: `m${counter}`, + role, + content, + kind: 'message', + summary: false, + status: 'complete', + tool_calls: null, + tool_results: null, + metadata: null, + created_at: new Date(counter * 1000).toISOString(), + ...overrides, + }; +} + +// ---- usable ----------------------------------------------------------------- + +describe('usable', () => { + it('returns 0 when contextLimit is 0', () => { + expect(usable(0)).toBe(0); + }); + + it('returns 0 when contextLimit is below the 20k buffer', () => { + // Math.max(0, x - 20000) clamps the subtraction so we never report + // negative headroom. A 10k-context model reports 0 usable, which makes + // isOverflow short-circuit to false (correct — we can't size the + // compaction with no headroom). + expect(usable(10_000)).toBe(0); + expect(usable(19_999)).toBe(0); + expect(usable(20_000)).toBe(0); + }); + + it('subtracts the 20k buffer from a normal-sized context window', () => { + expect(usable(100_000)).toBe(80_000); + expect(usable(32_768)).toBe(12_768); + }); +}); + +// ---- isOverflow ------------------------------------------------------------- + +describe('isOverflow', () => { + it('returns false when usable is 0 (unknown / sub-buffer context)', () => { + expect(isOverflow({ prompt_tokens: 999_999, completion_tokens: 0 }, 0)).toBe(false); + expect(isOverflow({ prompt_tokens: 0, completion_tokens: 999_999 }, 10_000)).toBe(false); + }); + + it('returns false at 50% of usable', () => { + // usable(100k) = 80k → 50% = 40k. 
+ expect(isOverflow({ prompt_tokens: 30_000, completion_tokens: 10_000 }, 100_000)).toBe(false); + }); + + it('returns false just under usable', () => { + expect(isOverflow({ prompt_tokens: 79_000, completion_tokens: 999 }, 100_000)).toBe(false); + }); + + it('returns true exactly at usable (>=, not strict >)', () => { + expect(isOverflow({ prompt_tokens: 80_000, completion_tokens: 0 }, 100_000)).toBe(true); + }); + + it('returns true above usable', () => { + expect(isOverflow({ prompt_tokens: 50_000, completion_tokens: 40_000 }, 100_000)).toBe(true); + }); +}); + +// ---- estimate --------------------------------------------------------------- + +describe('estimate', () => { + it('returns a tiny value for an empty array (JSON.stringify([]) is "[]")', () => { + // Math.ceil('[]'.length / 4) = 1. Documented here so the next reader + // doesn't think "0" is the expected baseline — char-count/4 will never + // be exactly 0 for any JSON-serializable input. + expect(estimate([])).toBe(1); + }); + + it('scales roughly with content length', () => { + const tiny = estimate([mkMsg('user', 'hi')]); + const big = estimate([mkMsg('user', 'x'.repeat(4000))]); + expect(big).toBeGreaterThan(tiny); + expect(big).toBeGreaterThanOrEqual(1000); // 4000 chars / 4 = 1000 floor + }); + + it('is deterministic across repeated calls', () => { + const msgs = [mkMsg('user', 'one'), mkMsg('assistant', 'two')]; + expect(estimate(msgs)).toBe(estimate(msgs)); + }); +}); + +// ---- turns ------------------------------------------------------------------ + +describe('turns', () => { + it('returns [] for an empty message list', () => { + expect(turns([])).toEqual([]); + }); + + it('returns one turn for a single user message', () => { + const u = mkMsg('user', 'hi'); + const result = turns([u]); + expect(result).toHaveLength(1); + expect(result[0]).toEqual({ start: 0, end: 1, id: u.id }); + }); + + it('returns two turns for user/assistant/user/assistant', () => { + const u1 = mkMsg('user', 'q1'); + 
const a1 = mkMsg('assistant', 'a1'); + const u2 = mkMsg('user', 'q2'); + const a2 = mkMsg('assistant', 'a2'); + const result = turns([u1, a1, u2, a2]); + expect(result).toEqual([ + { start: 0, end: 2, id: u1.id }, + { start: 2, end: 4, id: u2.id }, + ]); + }); + + it('extends the final turn end to include trailing non-user messages', () => { + // Spec wording: "user/assistant + trailing system → trailing included + // in last turn's range". Single-turn variant: [user, assistant, system] + // should produce one turn with end=3 (covers all three indices). + const u = mkMsg('user', 'q'); + const a = mkMsg('assistant', 'a'); + const s = mkMsg('system', 'note'); + const result = turns([u, a, s]); + expect(result).toEqual([{ start: 0, end: 3, id: u.id }]); + }); + + it('skips user rows flagged as summary (anchored-rolling rows)', () => { + // Defense-in-depth — process() pre-filters summary rows, but turns() + // also skips them so a misuse from another caller doesn't create a + // bogus turn boundary on the summary row itself. + const u1 = mkMsg('user', 'q1'); + const a1 = mkMsg('assistant', 'a1'); + const sum = mkMsg('user', 'rolled-up', { summary: true }); + const u2 = mkMsg('user', 'q2'); + const result = turns([u1, a1, sum, u2]); + expect(result.map((t) => t.id)).toEqual([u1.id, u2.id]); + }); +}); + +// ---- select ----------------------------------------------------------------- + +describe('select', () => { + it('returns empty head + undefined tail for an empty message list', () => { + const result = select([], 100_000); + expect(result.head).toEqual([]); + expect(result.tail_start_id).toBeUndefined(); + }); + + it('full-preserves when there are fewer turns than tail_turns', () => { + // 1 turn but tail_turns=2: keep === turn0 → keep.start === 0 → + // sentinel-return path that signals "no compaction this round". 
+ const u = mkMsg('user', 'only'); + const a = mkMsg('assistant', 'a'); + const result = select([u, a], 100_000, 2); + expect(result.head).toEqual([u, a]); + expect(result.tail_start_id).toBeUndefined(); + }); + + it('keeps the last tail_turns turns when they all fit the budget', () => { + // 3 turns, all small. tail_turns=2 means keep the last 2; head = + // messages[0..turn2.start] = just turn1's content. + const u1 = mkMsg('user', 'q1'); + const a1 = mkMsg('assistant', 'a1'); + const u2 = mkMsg('user', 'q2'); + const a2 = mkMsg('assistant', 'a2'); + const u3 = mkMsg('user', 'q3'); + const a3 = mkMsg('assistant', 'a3'); + const msgs = [u1, a1, u2, a2, u3, a3]; + const result = select(msgs, 100_000, 2); + // Turn boundaries: [0,2), [2,4), [4,6). slice(-2) = turns at 2 and 4. + // Walking backward: u3 fits, then u2 fits → keep={start:2, id:u2.id}. + expect(result.tail_start_id).toBe(u2.id); + expect(result.head).toEqual([u1, a1]); + }); + + it('splits a turn mid-stream when the whole turn would overflow the budget', () => { + // tail_turns=1 so we look only at the most recent turn. Stuff it past + // 8k of content (max preserve budget) and the splitter walks forward + // looking for the largest suffix that fits. + const u1 = mkMsg('user', 'q1'); + const a1 = mkMsg('assistant', 'a1'); + const u2 = mkMsg('user', 'q2 with a giant payload'); + const huge = mkMsg('assistant', 'X'.repeat(40_000)); // ~10k tokens + const smallTail = mkMsg('assistant', 'short answer'); + const msgs = [u1, a1, u2, huge, smallTail]; + const result = select(msgs, 100_000, 1); + // The split walks from turn.start+1 forward; the first index whose + // [i, end) slice fits the budget becomes the new keep. We don't assert + // a specific id (depends on character math), only that compaction was + // triggered (tail_start_id set, head non-empty) and that the head + // doesn't include the final small message. 
+ expect(result.tail_start_id).toBeDefined(); + expect(result.head.length).toBeGreaterThan(0); + expect(result.head).not.toContain(smallTail); + }); + + it('full-preserves when no split point fits', () => { + // Single oversized turn; splitTurn walks but each suffix is still too + // big. After the loop, keep is undefined → full-preserve sentinel. + // Force this with a small 30k context so the budget is only 2.5k + // tokens, and a single 40k-char message. + const u = mkMsg('user', 'oversized'); + const a = mkMsg('assistant', 'Y'.repeat(40_000)); + const result = select([u, a], 30_000, 1); + // usable(30k) = 10k → budget = min(8k, max(2k, floor(10k*0.25))) = + // min(8k, max(2k, 2500)) = 2500. 40k chars ≈ 10k tokens. Can't fit. + expect(result.tail_start_id).toBeUndefined(); + expect(result.head).toEqual([u, a]); + }); +}); + +// ---- buildPrompt ------------------------------------------------------------ + +describe('buildPrompt', () => { + it('opens with the "create new" anchor when previousSummary is undefined', () => { + const out = buildPrompt(undefined, []); + expect(out.startsWith('Create a new anchored summary')).toBe(true); + expect(out).toContain(SUMMARY_TEMPLATE); + expect(out).not.toContain(''); + }); + + it('opens with the "update" anchor and embeds previousSummary verbatim', () => { + const prev = '## Goal\n- finish v1.11 compaction'; + const out = buildPrompt(prev, []); + expect(out.startsWith('Update the anchored summary')).toBe(true); + expect(out).toContain(''); + expect(out).toContain(prev); + expect(out).toContain(''); + expect(out).toContain(SUMMARY_TEMPLATE); + }); + + it('appends extra context strings after the template (reserved for plugin injection)', () => { + const out = buildPrompt(undefined, ['extra-context-line']); + expect(out.endsWith('extra-context-line')).toBe(true); + }); +}); diff --git a/apps/server/src/services/compaction-prompt.ts b/apps/server/src/services/compaction-prompt.ts new file mode 100644 index 0000000..6bf96cb --- 
/dev/null +++ b/apps/server/src/services/compaction-prompt.ts @@ -0,0 +1,40 @@ +// v1.11: anchored rolling summary template. Verbatim port from opencode +// (packages/opencode/src/session/compaction.ts SUMMARY_TEMPLATE). Kept in a +// separate module so the long template literal doesn't bloat compaction.ts. + +export const SUMMARY_TEMPLATE = `Output exactly the Markdown structure shown inside