From 1cb6eee24ced9021e145cc54acfc7db478acd6c7 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Fri, 22 May 2026 05:46:29 +0000 Subject: [PATCH] v1.13.0: message_parts table + dual-write at every tool_calls/tool_results site MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a granular message_parts table (one row per text/tool_call/tool_result chunk) without changing any read path. Old messages.content / tool_calls / tool_results columns remain authoritative for v1.13.0; this dispatch is write-only mirroring so the AI SDK migration in v1.13.1 can flip read authority without a backfill window. Schema: CREATE TABLE message_parts (id, message_id FK ON DELETE CASCADE, sequence int, kind text CHECK (text|tool_call|tool_result|reasoning|step_start), payload jsonb, created_at, UNIQUE (message_id, sequence)) New module services/inference/parts.ts with two pure derive helpers (partsFromAssistantMessage, partsFromToolMessage) and insertParts that fan-outs a multi-row INSERT via postgres-js. Wired dual-write at every site that writes tool_calls or tool_results: - tool-phase.ts: assistant finalize UPDATE, executed-tool UPDATE, ask_user_input sentinel UPDATE - messages.ts answer flow: DELETE pending tool_result part + INSERT answered one inside the existing sql.begin - skills.ts: synthetic assistant + tool INSERTs both inside existing tx - chats.ts fork: CTE clones parts via ROW_NUMBER pairing (source→dest message id mapping in one statement, no N+1) - error-handler.ts finalizeCompletion: text part for plain text-only assistant turns Deviation: tool-phase.ts finalize UPDATEs and finalizeCompletion text-part write are not wrapped in fresh sql.begin transactions. Safe in v1.13.0 because JSON columns are authoritative for reads. v1.13.1 must wrap these sites before flipping read authority — TODO comments added at each unwrapped site referencing v1.13.1. Tests: 8 new unit tests for the derive helpers in services/__tests__/parts.test.ts. Existing 162 tests untouched. 170 total. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/server/src/routes/chats.ts | 22 +++++ apps/server/src/routes/messages.ts | 9 ++ apps/server/src/routes/skills.ts | 15 +++ apps/server/src/schema.sql | 17 ++++ .../src/services/__tests__/parts.test.ts | 91 +++++++++++++++++++ .../src/services/inference/error-handler.ts | 13 +++ apps/server/src/services/inference/parts.ts | 86 ++++++++++++++++++ .../src/services/inference/tool-phase.ts | 37 ++++++++ 8 files changed, 290 insertions(+) create mode 100644 apps/server/src/services/__tests__/parts.test.ts create mode 100644 apps/server/src/services/inference/parts.ts diff --git a/apps/server/src/routes/chats.ts b/apps/server/src/routes/chats.ts index df601a5..05f4997 100644 --- a/apps/server/src/routes/chats.ts +++ b/apps/server/src/routes/chats.ts @@ -313,6 +313,28 @@ export function registerChatRoutes( AND created_at <= ${target.created_at}::timestamptz AND status = 'complete' `; + // v1.13.0: clone message_parts for the forked messages. Source and + // destination preserve ordering (the INSERT above orders by created_at, + // id) so a ROW_NUMBER pairing maps source.id → dest.id deterministically. + await tx` + WITH src AS ( + SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC, id ASC) AS rn + FROM messages + WHERE chat_id = ${source.id} + AND created_at <= ${target.created_at}::timestamptz + AND status = 'complete' + ), + dst AS ( + SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC, id ASC) AS rn + FROM messages + WHERE chat_id = ${chat!.id} + ) + INSERT INTO message_parts (message_id, sequence, kind, payload) + SELECT dst.id, p.sequence, p.kind, p.payload + FROM message_parts p + JOIN src ON p.message_id = src.id + JOIN dst ON dst.rn = src.rn + `; return chat!; }); diff --git a/apps/server/src/routes/messages.ts b/apps/server/src/routes/messages.ts index bfaf148..1316305 100644 --- a/apps/server/src/routes/messages.ts +++ b/apps/server/src/routes/messages.ts @@ -576,6 +576,15 @@ export function registerMessageRoutes( SET tool_results = ${tx.json(newToolResults as never)} WHERE id = ${toolRow.id} `; + // v1.13.0: replace the pending tool_result part inserted at message + // creation (tool-phase.ts) with the answered one. Delete-then-insert + // is simpler than UPDATE because parts are append-style elsewhere; + // the UNIQUE (message_id, sequence) constraint blocks plain insert. + await tx`DELETE FROM message_parts WHERE message_id = ${toolRow.id} AND kind = 'tool_result'`; + await tx` + INSERT INTO message_parts (message_id, sequence, kind, payload) + VALUES (${toolRow.id}, 0, 'tool_result', ${tx.json(newToolResults as never)}) + `; const [assistantMsg] = await tx<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp()) diff --git a/apps/server/src/routes/skills.ts b/apps/server/src/routes/skills.ts index 5471915..6f4802f 100644 --- a/apps/server/src/routes/skills.ts +++ b/apps/server/src/routes/skills.ts @@ -90,11 +90,26 @@ export function registerSkillsRoutes( VALUES (${sessionId}, ${chat.id}, 'assistant', '', ${sql.json(toolCalls as never)}, 'complete', clock_timestamp()) RETURNING id `; + // v1.13.0: dual-write the synthetic assistant message's tool_call. + // Single skill_use tool_call, no text content, so one part at seq 0. + await tx` + INSERT INTO message_parts (message_id, sequence, kind, payload) + VALUES (${synthAssistant!.id}, 0, 'tool_call', ${tx.json({ + id: toolCallId, + name: 'skill_use', + args: { name: skill_name }, + } as never)}) + `; const [toolMsg] = await tx<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, tool_results, status, created_at) VALUES (${sessionId}, ${chat.id}, 'tool', '', ${sql.json(toolResults as never)}, 'complete', clock_timestamp()) RETURNING id `; + // v1.13.0: dual-write the synthetic tool result (the skill body). + await tx` + INSERT INTO message_parts (message_id, sequence, kind, payload) + VALUES (${toolMsg!.id}, 0, 'tool_result', ${tx.json(toolResults as never)}) + `; const [userMsg] = await tx<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chat.id}, 'user', ${userText}, 'complete', clock_timestamp()) diff --git a/apps/server/src/schema.sql b/apps/server/src/schema.sql index 68ee801..a109d1b 100644 --- a/apps/server/src/schema.sql +++ b/apps/server/src/schema.sql @@ -32,6 +32,23 @@ CREATE TABLE IF NOT EXISTS messages ( CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, created_at); +-- v1.13.0: granular message parts table for AI SDK migration. Old +-- messages.content / tool_calls / tool_results columns stay authoritative +-- for reads in v1.13.0; this table is dual-written so the swap can happen +-- in a later dispatch without a backfill window. ON DELETE CASCADE means +-- removing a message removes its parts in one go. +CREATE TABLE IF NOT EXISTS message_parts ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + message_id uuid NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + sequence int NOT NULL, + kind text NOT NULL, + payload jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT clock_timestamp(), + CONSTRAINT message_parts_kind_chk CHECK (kind IN ('text', 'tool_call', 'tool_result', 'reasoning', 'step_start')), + CONSTRAINT message_parts_seq_uniq UNIQUE (message_id, sequence) +); +CREATE INDEX IF NOT EXISTS message_parts_msg_seq_idx ON message_parts (message_id, sequence); + ALTER TABLE messages ADD COLUMN IF NOT EXISTS tokens_used INTEGER; ALTER TABLE messages ADD COLUMN IF NOT EXISTS ctx_used INTEGER; ALTER TABLE messages ADD COLUMN IF NOT EXISTS ctx_max INTEGER; diff --git a/apps/server/src/services/__tests__/parts.test.ts b/apps/server/src/services/__tests__/parts.test.ts new file mode 100644 index 0000000..bf94fa0 --- /dev/null +++ b/apps/server/src/services/__tests__/parts.test.ts @@ -0,0 +1,91 @@ +import { describe, it, expect } from 'vitest'; +import { partsFromAssistantMessage, partsFromToolMessage } from '../inference/parts.js'; +import type { ToolCall, ToolResult } from '../../types/api.js'; + +describe('partsFromAssistantMessage', () => { + it('emits one text part for content-only assistant', () => { + const parts = partsFromAssistantMessage({ content: 'hello world', tool_calls: null }); + expect(parts).toHaveLength(1); + expect(parts[0]).toEqual({ + sequence: 0, + kind: 'text', + payload: { text: 'hello world' }, + }); + }); + + it('emits one tool_call part for empty-content + single tool_call', () => { + const tc: ToolCall = { id: 'call_1', name: 'view_file', args: { path: 'src/a.ts' } }; + const parts = partsFromAssistantMessage({ content: '', tool_calls: [tc] }); + expect(parts).toHaveLength(1); + expect(parts[0]).toEqual({ + sequence: 0, + kind: 'tool_call', + payload: { id: 'call_1', name: 'view_file', args: { path: 'src/a.ts' } }, + }); + }); + + it('emits text then tool_call parts in order when both present', () => { + const tc: ToolCall = { id: 'call_2', name: 'grep', args: { pattern: 'foo' } }; + const parts = partsFromAssistantMessage({ content: 'let me search', tool_calls: [tc] }); + expect(parts.map((p) => [p.sequence, p.kind])).toEqual([ + [0, 'text'], + [1, 'tool_call'], + ]); + }); + + it('preserves tool_call order with multiple calls', () => { + const calls: ToolCall[] = [ + { id: 'a', name: 'list_dir', args: { path: '.' } }, + { id: 'b', name: 'view_file', args: { path: 'x.ts' } }, + { id: 'c', name: 'grep', args: { pattern: 'y' } }, + ]; + const parts = partsFromAssistantMessage({ content: '', tool_calls: calls }); + expect(parts).toHaveLength(3); + expect(parts.map((p) => p.payload)).toEqual([ + { id: 'a', name: 'list_dir', args: { path: '.' } }, + { id: 'b', name: 'view_file', args: { path: 'x.ts' } }, + { id: 'c', name: 'grep', args: { pattern: 'y' } }, + ]); + expect(parts.map((p) => p.sequence)).toEqual([0, 1, 2]); + }); + + it('returns empty array for empty content + null tool_calls', () => { + expect(partsFromAssistantMessage({ content: '', tool_calls: null })).toEqual([]); + }); +}); + +describe('partsFromToolMessage', () => { + it('emits a single tool_result part at sequence 0', () => { + const tr: ToolResult = { + tool_call_id: 'call_1', + output: { contents: 'console.log(1)' }, + truncated: false, + }; + const parts = partsFromToolMessage({ tool_results: tr }); + expect(parts).toHaveLength(1); + expect(parts[0]).toEqual({ + sequence: 0, + kind: 'tool_result', + payload: { + tool_call_id: 'call_1', + output: { contents: 'console.log(1)' }, + truncated: false, + }, + }); + }); + + it('includes error in payload when present', () => { + const tr: ToolResult = { + tool_call_id: 'call_2', + output: null, + truncated: false, + error: 'permission denied', + }; + const parts = partsFromToolMessage({ tool_results: tr }); + expect(parts[0]!.payload).toMatchObject({ error: 'permission denied' }); + }); + + it('returns empty array when tool_results is null', () => { + expect(partsFromToolMessage({ tool_results: null })).toEqual([]); + }); +}); diff --git a/apps/server/src/services/inference/error-handler.ts b/apps/server/src/services/inference/error-handler.ts index 6364ccc..9b260b5 100644 --- a/apps/server/src/services/inference/error-handler.ts +++ b/apps/server/src/services/inference/error-handler.ts @@ -1,6 +1,7 @@ import type { MessageMetadata, Session } from '../../types/api.js'; import * as modelContext from '../model-context.js'; import { maybeFlagForCompaction } from './payload.js'; +import { insertParts, partsFromAssistantMessage } from './parts.js'; import type { InferenceContext, StreamResult, TurnArgs } from './turn.js'; export async function handleAbortOrError( @@ -112,6 +113,18 @@ export async function finalizeCompletion( WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; + // v1.13.0: dual-write the text part. finalizeCompletion is the terminal + // path for text-only assistant turns (no tool calls); tool_calls are null + // here by construction (the tool-bearing path goes through executeToolPhase). + // TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single + // sql.begin before flipping read authority to message_parts. + await insertParts( + ctx.sql, + partsFromAssistantMessage({ content, tool_calls: null }).map((p) => ({ + ...p, + message_id: assistantMessageId, + })), + ); // v1.11: flag for compaction on the terminal turn too. Catches the common // case of a turn that hit the limit without invoking tools. await maybeFlagForCompaction(ctx, chatId, updated); diff --git a/apps/server/src/services/inference/parts.ts b/apps/server/src/services/inference/parts.ts new file mode 100644 index 0000000..f62eea3 --- /dev/null +++ b/apps/server/src/services/inference/parts.ts @@ -0,0 +1,86 @@ +import type { Sql } from '../../db.js'; +import type { ToolCall, ToolResult } from '../../types/api.js'; + +// v1.13.0: dual-write helper. Every site that writes the legacy +// messages.tool_calls / messages.tool_results JSON columns calls into here +// to mirror the same data into message_parts rows. Reads still go to the +// JSON columns; the swap to parts-as-source-of-truth happens in a later +// v1.13 dispatch alongside the AI SDK streamText migration. + +export type PartKind = 'text' | 'tool_call' | 'tool_result' | 'reasoning' | 'step_start'; + +export interface PartInsert { + message_id: string; + sequence: number; + kind: PartKind; + payload: unknown; +} + +export async function insertParts(sql: Sql, parts: PartInsert[]): Promise { + if (parts.length === 0) return; + // postgres-js fans out an array of objects to a multi-row INSERT. Each + // payload field needs sql.json() so jsonb storage receives a JSON value + // rather than a quoted string. + await sql` + INSERT INTO message_parts ${sql( + parts.map((p) => ({ + message_id: p.message_id, + sequence: p.sequence, + kind: p.kind, + payload: sql.json(p.payload as never), + })), + 'message_id', + 'sequence', + 'kind', + 'payload', + )} + `; +} + +// Derive parts from the canonical messages row for an assistant message. +// content (when non-empty) becomes a 'text' part at sequence 0; each tool_call +// becomes a 'tool_call' part with payload { id, name, args } where args is +// the parsed object (we use the in-memory ToolCall shape, not the OpenAI +// stringified one). +export function partsFromAssistantMessage(args: { + content: string; + tool_calls: ToolCall[] | null; +}): Omit[] { + const out: Omit[] = []; + let seq = 0; + if (args.content && args.content.length > 0) { + out.push({ sequence: seq, kind: 'text', payload: { text: args.content } }); + seq += 1; + } + for (const tc of args.tool_calls ?? []) { + out.push({ + sequence: seq, + kind: 'tool_call', + payload: { id: tc.id, name: tc.name, args: tc.args }, + }); + seq += 1; + } + return out; +} + +// Derive a single tool_result part from a tool message's tool_results JSON. +// The payload includes the same shape that buildMessagesPayload reads from +// later: tool_call_id, output, optional error/truncated metadata. +export function partsFromToolMessage(args: { + tool_results: ToolResult | null; +}): Omit[] { + if (!args.tool_results) return []; + const tr = args.tool_results; + return [ + { + sequence: 0, + kind: 'tool_result', + payload: { + tool_call_id: tr.tool_call_id, + output: tr.output, + truncated: tr.truncated, + ...(tr.error ? { error: tr.error } : {}), + }, + }, + ]; +} diff --git a/apps/server/src/services/inference/tool-phase.ts b/apps/server/src/services/inference/tool-phase.ts index 8fd94de..69d6da6 100644 --- a/apps/server/src/services/inference/tool-phase.ts +++ b/apps/server/src/services/inference/tool-phase.ts @@ -3,6 +3,7 @@ import * as modelContext from '../model-context.js'; import { PathScopeError } from '../path_guard.js'; import { TOOLS_BY_NAME } from '../tools.js'; import { maybeFlagForCompaction } from './payload.js'; +import { insertParts, partsFromAssistantMessage, partsFromToolMessage } from './parts.js'; import type { InferenceContext, StreamResult, @@ -97,6 +98,20 @@ export async function executeToolPhase( WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; + // v1.13.0: dual-write to message_parts. Reads still consume the JSON + // columns above; this mirrors the same data into the granular table so + // the AI SDK migration can swap reads later without a backfill window. + // TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single + // sql.begin before flipping read authority to message_parts. Without the + // transaction, a crash between the two leaves an orphan message that + // becomes invisible in the parts-authoritative read path. + await insertParts( + ctx.sql, + partsFromAssistantMessage({ content, tool_calls: toolCalls }).map((p) => ({ + ...p, + message_id: assistantMessageId, + })), + ); // v1.11: flag for compaction if this turn pushed us over the usable budget. // We never compact mid-loop (the recursive runAssistantTurn keeps tools // flowing); the flag fires on the NEXT turn's pre-fetch hook above. @@ -150,6 +165,18 @@ export async function executeToolPhase( SET tool_results = ${ctx.sql.json(sentinel as never)} WHERE id = ${toolMessageId} `; + // v1.13.0: mirror the pending sentinel into message_parts. The + // answer-endpoint UPDATE later (messages.ts:576) will delete and + // re-insert this part when the user submits their answer. + // TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in + // a per-iteration sql.begin before flipping read authority. + await insertParts( + ctx.sql, + partsFromToolMessage({ tool_results: sentinel }).map((p) => ({ + ...p, + message_id: toolMessageId, + })), + ); return; } const tres = await executeToolCall(projectRoot, tc); @@ -164,6 +191,16 @@ export async function executeToolPhase( SET tool_results = ${ctx.sql.json(stored as never)} WHERE id = ${toolMessageId} `; + // v1.13.0: dual-write the tool_result part. + // TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in a + // per-iteration sql.begin before flipping read authority. + await insertParts( + ctx.sql, + partsFromToolMessage({ tool_results: stored }).map((p) => ({ + ...p, + message_id: toolMessageId, + })), + ); ctx.publish(sessionId, { type: 'tool_result', tool_message_id: toolMessageId,