v1.13.0: message_parts table + dual-write at every tool_calls/tool_results site
Adds a granular message_parts table (one row per text/tool_call/tool_result
chunk) without changing any read path. Old messages.content / tool_calls /
tool_results columns remain authoritative for v1.13.0; this dispatch is
write-only mirroring so the AI SDK migration in v1.13.1 can flip read
authority without a backfill window.
Schema:
CREATE TABLE message_parts (id, message_id FK ON DELETE CASCADE,
sequence int, kind text CHECK (text|tool_call|tool_result|reasoning|step_start),
payload jsonb, created_at, UNIQUE (message_id, sequence))
New module services/inference/parts.ts with two pure derive helpers
(partsFromAssistantMessage, partsFromToolMessage) and insertParts that
fan-outs a multi-row INSERT via postgres-js.
Wired dual-write at every site that writes tool_calls or tool_results:
- tool-phase.ts: assistant finalize UPDATE, executed-tool UPDATE,
ask_user_input sentinel UPDATE
- messages.ts answer flow: DELETE pending tool_result part + INSERT
answered one inside the existing sql.begin
- skills.ts: synthetic assistant + tool INSERTs both inside existing tx
- chats.ts fork: CTE clones parts via ROW_NUMBER pairing (source→dest
message id mapping in one statement, no N+1)
- error-handler.ts finalizeCompletion: text part for plain text-only
assistant turns
Deviation: tool-phase.ts finalize UPDATEs and finalizeCompletion text-part
write are not wrapped in fresh sql.begin transactions. Safe in v1.13.0
because JSON columns are authoritative for reads. v1.13.1 must wrap these
sites before flipping read authority — TODO comments added at each
unwrapped site referencing v1.13.1.
Tests: 8 new unit tests for the derive helpers in
services/__tests__/parts.test.ts. Existing 162 tests untouched. 170 total.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -313,6 +313,28 @@ export function registerChatRoutes(
|
||||
AND created_at <= ${target.created_at}::timestamptz
|
||||
AND status = 'complete'
|
||||
`;
|
||||
// v1.13.0: clone message_parts for the forked messages. Source and
|
||||
// destination preserve ordering (the INSERT above orders by created_at,
|
||||
// id) so a ROW_NUMBER pairing maps source.id → dest.id deterministically.
|
||||
await tx`
|
||||
WITH src AS (
|
||||
SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC, id ASC) AS rn
|
||||
FROM messages
|
||||
WHERE chat_id = ${source.id}
|
||||
AND created_at <= ${target.created_at}::timestamptz
|
||||
AND status = 'complete'
|
||||
),
|
||||
dst AS (
|
||||
SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC, id ASC) AS rn
|
||||
FROM messages
|
||||
WHERE chat_id = ${chat!.id}
|
||||
)
|
||||
INSERT INTO message_parts (message_id, sequence, kind, payload)
|
||||
SELECT dst.id, p.sequence, p.kind, p.payload
|
||||
FROM message_parts p
|
||||
JOIN src ON p.message_id = src.id
|
||||
JOIN dst ON dst.rn = src.rn
|
||||
`;
|
||||
return chat!;
|
||||
});
|
||||
|
||||
|
||||
@@ -576,6 +576,15 @@ export function registerMessageRoutes(
|
||||
SET tool_results = ${tx.json(newToolResults as never)}
|
||||
WHERE id = ${toolRow.id}
|
||||
`;
|
||||
// v1.13.0: replace the pending tool_result part inserted at message
|
||||
// creation (tool-phase.ts) with the answered one. Delete-then-insert
|
||||
// is simpler than UPDATE because parts are append-style elsewhere;
|
||||
// the UNIQUE (message_id, sequence) constraint blocks plain insert.
|
||||
await tx`DELETE FROM message_parts WHERE message_id = ${toolRow.id} AND kind = 'tool_result'`;
|
||||
await tx`
|
||||
INSERT INTO message_parts (message_id, sequence, kind, payload)
|
||||
VALUES (${toolRow.id}, 0, 'tool_result', ${tx.json(newToolResults as never)})
|
||||
`;
|
||||
const [assistantMsg] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
|
||||
@@ -90,11 +90,26 @@ export function registerSkillsRoutes(
|
||||
VALUES (${sessionId}, ${chat.id}, 'assistant', '', ${sql.json(toolCalls as never)}, 'complete', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
// v1.13.0: dual-write the synthetic assistant message's tool_call.
|
||||
// Single skill_use tool_call, no text content, so one part at seq 0.
|
||||
await tx`
|
||||
INSERT INTO message_parts (message_id, sequence, kind, payload)
|
||||
VALUES (${synthAssistant!.id}, 0, 'tool_call', ${tx.json({
|
||||
id: toolCallId,
|
||||
name: 'skill_use',
|
||||
args: { name: skill_name },
|
||||
} as never)})
|
||||
`;
|
||||
const [toolMsg] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, tool_results, status, created_at)
|
||||
VALUES (${sessionId}, ${chat.id}, 'tool', '', ${sql.json(toolResults as never)}, 'complete', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
// v1.13.0: dual-write the synthetic tool result (the skill body).
|
||||
await tx`
|
||||
INSERT INTO message_parts (message_id, sequence, kind, payload)
|
||||
VALUES (${toolMsg!.id}, 0, 'tool_result', ${tx.json(toolResults as never)})
|
||||
`;
|
||||
const [userMsg] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chat.id}, 'user', ${userText}, 'complete', clock_timestamp())
|
||||
|
||||
@@ -32,6 +32,23 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, created_at);
|
||||
|
||||
-- v1.13.0: granular message parts table for AI SDK migration. Old
|
||||
-- messages.content / tool_calls / tool_results columns stay authoritative
|
||||
-- for reads in v1.13.0; this table is dual-written so the swap can happen
|
||||
-- in a later dispatch without a backfill window. ON DELETE CASCADE means
|
||||
-- removing a message removes its parts in one go.
|
||||
CREATE TABLE IF NOT EXISTS message_parts (
|
||||
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
message_id uuid NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
|
||||
sequence int NOT NULL,
|
||||
kind text NOT NULL,
|
||||
payload jsonb NOT NULL,
|
||||
created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
|
||||
CONSTRAINT message_parts_kind_chk CHECK (kind IN ('text', 'tool_call', 'tool_result', 'reasoning', 'step_start')),
|
||||
CONSTRAINT message_parts_seq_uniq UNIQUE (message_id, sequence)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS message_parts_msg_seq_idx ON message_parts (message_id, sequence);
|
||||
|
||||
ALTER TABLE messages ADD COLUMN IF NOT EXISTS tokens_used INTEGER;
|
||||
ALTER TABLE messages ADD COLUMN IF NOT EXISTS ctx_used INTEGER;
|
||||
ALTER TABLE messages ADD COLUMN IF NOT EXISTS ctx_max INTEGER;
|
||||
|
||||
91
apps/server/src/services/__tests__/parts.test.ts
Normal file
91
apps/server/src/services/__tests__/parts.test.ts
Normal file
@@ -0,0 +1,91 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { partsFromAssistantMessage, partsFromToolMessage } from '../inference/parts.js';
|
||||
import type { ToolCall, ToolResult } from '../../types/api.js';
|
||||
|
||||
describe('partsFromAssistantMessage', () => {
|
||||
it('emits one text part for content-only assistant', () => {
|
||||
const parts = partsFromAssistantMessage({ content: 'hello world', tool_calls: null });
|
||||
expect(parts).toHaveLength(1);
|
||||
expect(parts[0]).toEqual({
|
||||
sequence: 0,
|
||||
kind: 'text',
|
||||
payload: { text: 'hello world' },
|
||||
});
|
||||
});
|
||||
|
||||
it('emits one tool_call part for empty-content + single tool_call', () => {
|
||||
const tc: ToolCall = { id: 'call_1', name: 'view_file', args: { path: 'src/a.ts' } };
|
||||
const parts = partsFromAssistantMessage({ content: '', tool_calls: [tc] });
|
||||
expect(parts).toHaveLength(1);
|
||||
expect(parts[0]).toEqual({
|
||||
sequence: 0,
|
||||
kind: 'tool_call',
|
||||
payload: { id: 'call_1', name: 'view_file', args: { path: 'src/a.ts' } },
|
||||
});
|
||||
});
|
||||
|
||||
it('emits text then tool_call parts in order when both present', () => {
|
||||
const tc: ToolCall = { id: 'call_2', name: 'grep', args: { pattern: 'foo' } };
|
||||
const parts = partsFromAssistantMessage({ content: 'let me search', tool_calls: [tc] });
|
||||
expect(parts.map((p) => [p.sequence, p.kind])).toEqual([
|
||||
[0, 'text'],
|
||||
[1, 'tool_call'],
|
||||
]);
|
||||
});
|
||||
|
||||
it('preserves tool_call order with multiple calls', () => {
|
||||
const calls: ToolCall[] = [
|
||||
{ id: 'a', name: 'list_dir', args: { path: '.' } },
|
||||
{ id: 'b', name: 'view_file', args: { path: 'x.ts' } },
|
||||
{ id: 'c', name: 'grep', args: { pattern: 'y' } },
|
||||
];
|
||||
const parts = partsFromAssistantMessage({ content: '', tool_calls: calls });
|
||||
expect(parts).toHaveLength(3);
|
||||
expect(parts.map((p) => p.payload)).toEqual([
|
||||
{ id: 'a', name: 'list_dir', args: { path: '.' } },
|
||||
{ id: 'b', name: 'view_file', args: { path: 'x.ts' } },
|
||||
{ id: 'c', name: 'grep', args: { pattern: 'y' } },
|
||||
]);
|
||||
expect(parts.map((p) => p.sequence)).toEqual([0, 1, 2]);
|
||||
});
|
||||
|
||||
it('returns empty array for empty content + null tool_calls', () => {
|
||||
expect(partsFromAssistantMessage({ content: '', tool_calls: null })).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('partsFromToolMessage', () => {
|
||||
it('emits a single tool_result part at sequence 0', () => {
|
||||
const tr: ToolResult = {
|
||||
tool_call_id: 'call_1',
|
||||
output: { contents: 'console.log(1)' },
|
||||
truncated: false,
|
||||
};
|
||||
const parts = partsFromToolMessage({ tool_results: tr });
|
||||
expect(parts).toHaveLength(1);
|
||||
expect(parts[0]).toEqual({
|
||||
sequence: 0,
|
||||
kind: 'tool_result',
|
||||
payload: {
|
||||
tool_call_id: 'call_1',
|
||||
output: { contents: 'console.log(1)' },
|
||||
truncated: false,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('includes error in payload when present', () => {
|
||||
const tr: ToolResult = {
|
||||
tool_call_id: 'call_2',
|
||||
output: null,
|
||||
truncated: false,
|
||||
error: 'permission denied',
|
||||
};
|
||||
const parts = partsFromToolMessage({ tool_results: tr });
|
||||
expect(parts[0]!.payload).toMatchObject({ error: 'permission denied' });
|
||||
});
|
||||
|
||||
it('returns empty array when tool_results is null', () => {
|
||||
expect(partsFromToolMessage({ tool_results: null })).toEqual([]);
|
||||
});
|
||||
});
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { MessageMetadata, Session } from '../../types/api.js';
|
||||
import * as modelContext from '../model-context.js';
|
||||
import { maybeFlagForCompaction } from './payload.js';
|
||||
import { insertParts, partsFromAssistantMessage } from './parts.js';
|
||||
import type { InferenceContext, StreamResult, TurnArgs } from './turn.js';
|
||||
|
||||
export async function handleAbortOrError(
|
||||
@@ -112,6 +113,18 @@ export async function finalizeCompletion(
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
// v1.13.0: dual-write the text part. finalizeCompletion is the terminal
|
||||
// path for text-only assistant turns (no tool calls); tool_calls are null
|
||||
// here by construction (the tool-bearing path goes through executeToolPhase).
|
||||
// TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single
|
||||
// sql.begin before flipping read authority to message_parts.
|
||||
await insertParts(
|
||||
ctx.sql,
|
||||
partsFromAssistantMessage({ content, tool_calls: null }).map((p) => ({
|
||||
...p,
|
||||
message_id: assistantMessageId,
|
||||
})),
|
||||
);
|
||||
// v1.11: flag for compaction on the terminal turn too. Catches the common
|
||||
// case of a turn that hit the limit without invoking tools.
|
||||
await maybeFlagForCompaction(ctx, chatId, updated);
|
||||
|
||||
86
apps/server/src/services/inference/parts.ts
Normal file
86
apps/server/src/services/inference/parts.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import type { Sql } from '../../db.js';
|
||||
import type { ToolCall, ToolResult } from '../../types/api.js';
|
||||
|
||||
// v1.13.0: dual-write helper. Every site that writes the legacy
|
||||
// messages.tool_calls / messages.tool_results JSON columns calls into here
|
||||
// to mirror the same data into message_parts rows. Reads still go to the
|
||||
// JSON columns; the swap to parts-as-source-of-truth happens in a later
|
||||
// v1.13 dispatch alongside the AI SDK streamText migration.
|
||||
|
||||
export type PartKind = 'text' | 'tool_call' | 'tool_result' | 'reasoning' | 'step_start';
|
||||
|
||||
export interface PartInsert {
|
||||
message_id: string;
|
||||
sequence: number;
|
||||
kind: PartKind;
|
||||
payload: unknown;
|
||||
}
|
||||
|
||||
export async function insertParts(sql: Sql, parts: PartInsert[]): Promise<void> {
|
||||
if (parts.length === 0) return;
|
||||
// postgres-js fans out an array of objects to a multi-row INSERT. Each
|
||||
// payload field needs sql.json() so jsonb storage receives a JSON value
|
||||
// rather than a quoted string.
|
||||
await sql`
|
||||
INSERT INTO message_parts ${sql(
|
||||
parts.map((p) => ({
|
||||
message_id: p.message_id,
|
||||
sequence: p.sequence,
|
||||
kind: p.kind,
|
||||
payload: sql.json(p.payload as never),
|
||||
})),
|
||||
'message_id',
|
||||
'sequence',
|
||||
'kind',
|
||||
'payload',
|
||||
)}
|
||||
`;
|
||||
}
|
||||
|
||||
// Derive parts from the canonical messages row for an assistant message.
|
||||
// content (when non-empty) becomes a 'text' part at sequence 0; each tool_call
|
||||
// becomes a 'tool_call' part with payload { id, name, args } where args is
|
||||
// the parsed object (we use the in-memory ToolCall shape, not the OpenAI
|
||||
// stringified one).
|
||||
export function partsFromAssistantMessage(args: {
|
||||
content: string;
|
||||
tool_calls: ToolCall[] | null;
|
||||
}): Omit<PartInsert, 'message_id'>[] {
|
||||
const out: Omit<PartInsert, 'message_id'>[] = [];
|
||||
let seq = 0;
|
||||
if (args.content && args.content.length > 0) {
|
||||
out.push({ sequence: seq, kind: 'text', payload: { text: args.content } });
|
||||
seq += 1;
|
||||
}
|
||||
for (const tc of args.tool_calls ?? []) {
|
||||
out.push({
|
||||
sequence: seq,
|
||||
kind: 'tool_call',
|
||||
payload: { id: tc.id, name: tc.name, args: tc.args },
|
||||
});
|
||||
seq += 1;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Derive a single tool_result part from a tool message's tool_results JSON.
|
||||
// The payload includes the same shape that buildMessagesPayload reads from
|
||||
// later: tool_call_id, output, optional error/truncated metadata.
|
||||
export function partsFromToolMessage(args: {
|
||||
tool_results: ToolResult | null;
|
||||
}): Omit<PartInsert, 'message_id'>[] {
|
||||
if (!args.tool_results) return [];
|
||||
const tr = args.tool_results;
|
||||
return [
|
||||
{
|
||||
sequence: 0,
|
||||
kind: 'tool_result',
|
||||
payload: {
|
||||
tool_call_id: tr.tool_call_id,
|
||||
output: tr.output,
|
||||
truncated: tr.truncated,
|
||||
...(tr.error ? { error: tr.error } : {}),
|
||||
},
|
||||
},
|
||||
];
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import * as modelContext from '../model-context.js';
|
||||
import { PathScopeError } from '../path_guard.js';
|
||||
import { TOOLS_BY_NAME } from '../tools.js';
|
||||
import { maybeFlagForCompaction } from './payload.js';
|
||||
import { insertParts, partsFromAssistantMessage, partsFromToolMessage } from './parts.js';
|
||||
import type {
|
||||
InferenceContext,
|
||||
StreamResult,
|
||||
@@ -97,6 +98,20 @@ export async function executeToolPhase(
|
||||
WHERE id = ${assistantMessageId}
|
||||
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
||||
`;
|
||||
// v1.13.0: dual-write to message_parts. Reads still consume the JSON
|
||||
// columns above; this mirrors the same data into the granular table so
|
||||
// the AI SDK migration can swap reads later without a backfill window.
|
||||
// TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single
|
||||
// sql.begin before flipping read authority to message_parts. Without the
|
||||
// transaction, a crash between the two leaves an orphan message that
|
||||
// becomes invisible in the parts-authoritative read path.
|
||||
await insertParts(
|
||||
ctx.sql,
|
||||
partsFromAssistantMessage({ content, tool_calls: toolCalls }).map((p) => ({
|
||||
...p,
|
||||
message_id: assistantMessageId,
|
||||
})),
|
||||
);
|
||||
// v1.11: flag for compaction if this turn pushed us over the usable budget.
|
||||
// We never compact mid-loop (the recursive runAssistantTurn keeps tools
|
||||
// flowing); the flag fires on the NEXT turn's pre-fetch hook above.
|
||||
@@ -150,6 +165,18 @@ export async function executeToolPhase(
|
||||
SET tool_results = ${ctx.sql.json(sentinel as never)}
|
||||
WHERE id = ${toolMessageId}
|
||||
`;
|
||||
// v1.13.0: mirror the pending sentinel into message_parts. The
|
||||
// answer-endpoint UPDATE later (messages.ts:576) will delete and
|
||||
// re-insert this part when the user submits their answer.
|
||||
// TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in
|
||||
// a per-iteration sql.begin before flipping read authority.
|
||||
await insertParts(
|
||||
ctx.sql,
|
||||
partsFromToolMessage({ tool_results: sentinel }).map((p) => ({
|
||||
...p,
|
||||
message_id: toolMessageId,
|
||||
})),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const tres = await executeToolCall(projectRoot, tc);
|
||||
@@ -164,6 +191,16 @@ export async function executeToolPhase(
|
||||
SET tool_results = ${ctx.sql.json(stored as never)}
|
||||
WHERE id = ${toolMessageId}
|
||||
`;
|
||||
// v1.13.0: dual-write the tool_result part.
|
||||
// TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in a
|
||||
// per-iteration sql.begin before flipping read authority.
|
||||
await insertParts(
|
||||
ctx.sql,
|
||||
partsFromToolMessage({ tool_results: stored }).map((p) => ({
|
||||
...p,
|
||||
message_id: toolMessageId,
|
||||
})),
|
||||
);
|
||||
ctx.publish(sessionId, {
|
||||
type: 'tool_result',
|
||||
tool_message_id: toolMessageId,
|
||||
|
||||
Reference in New Issue
Block a user