// Source listing: boocode/apps/server/src/routes/messages.ts (TypeScript; viewer reported 598 lines, 21 KiB)

import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
import type { Chat, Message, Session, ToolCall } from '../types/api.js';
// Request body shared by POST /api/chats/:id/messages and
// POST /api/chats/:id/force_send: the user's message text, which must be
// non-empty and at most 64,000 characters long.
const SendBody = z.object({ content: z.string().min(1).max(64000) });
// v1.8.2: body for POST /api/chats/:id/continue, which resumes an inference
// loop that stopped at the tool budget. The client names the cap-hit sentinel
// message it is continuing from; the route then verifies that sentinel's shape
// and the per-chat hard ceiling before enqueueing another run.
const ContinueBody = z.object({ sentinel_message_id: z.string().uuid() });
// Batch 9.7: body for the ask_user_input answer submission. The shape is kept
// deliberately defensive: each answer echoes its question text back for
// traceability, but the server never trusts that text — the authoritative
// questions are read from the assistant message's tool_calls.args.questions.

// One answer per question: the chosen option labels plus optional free text.
const AnswerUserInputEntry = z.object({
  question: z.string(),
  selected_options: z.array(z.string()),
  free_text: z.string().nullable(),
});

const AnswerUserInputBody = z.object({
  tool_call_id: z.string().min(1),
  answers: z.array(AnswerUserInputEntry).min(1).max(3),
});
// Mirror of the ask_user_input tool's zod input as declared to the model.
// It is re-derived here, rather than imported from services/tools.ts, so the
// route can validate stored tool-call args without pulling in fs/path_guard.

// A single question: its prompt, selection mode, and at least one option.
const AskUserInputQuestion = z.object({
  question: z.string(),
  type: z.enum(['single_select', 'multi_select']),
  options: z.array(z.string()).min(1),
});

const AskUserInputArgs = z.object({
  questions: z.array(AskUserInputQuestion).min(1).max(3),
});
/**
 * Side-effect hooks injected into the message routes. The routes call these
 * after their own database writes, and never talk to the broker or work
 * queue directly.
 */
interface MessageHandlers {
  // ---- run control -------------------------------------------------------

  /**
   * Queue inference into the placeholder assistant row `assistantMessageId`,
   * which callers insert with status 'streaming'. Every route in this file
   * passes 'default' as `user`.
   */
  enqueueInference: (
    sessionId: string,
    chatId: string,
    assistantMessageId: string,
    user: string,
  ) => void;

  /** Queue a compaction that fills the placeholder system row `compactMessageId`. */
  enqueueCompact: (
    sessionId: string,
    chatId: string,
    compactMessageId: string,
    user: string,
  ) => void;

  /**
   * Cancel the chat's active run. A `false` result means there was nothing
   * to cancel; the stop route maps that to 409.
   */
  cancelInference: (sessionId: string, chatId: string) => Promise<boolean>;

  /** Whether the chat currently has a run in flight. */
  hasActiveInference: (chatId: string) => boolean;

  // ---- broker frames -----------------------------------------------------

  /** Announce a freshly persisted user message. */
  publishUserMessage: (
    sessionId: string,
    chatId: string,
    userMessageId: string,
    content: string,
  ) => void;

  /** Announce that the listed message rows were removed from the chat. */
  publishMessagesDeleted: (sessionId: string, chatId: string, messageIds: string[]) => void;

  /**
   * Batch 9.7: generic session frame publisher. The answer endpoint uses it
   * to emit the tool_result frame that the pause path intentionally skipped.
   * Its shape matches SkillInvokeHandlers in routes/skills.ts, so index.ts can
   * pass the same broker.publish adapter to both.
   */
  publishSessionFrame: (
    sessionId: string,
    frame: Record<string, unknown> & { type: string },
  ) => void;
}
/**
 * Registers the chat/message HTTP routes on `app`.
 *
 * Mutating routes write their rows first — multi-row writes inside a single
 * `sql.begin` transaction — and only afterwards call into `handlers` to
 * publish broker frames or enqueue background work. Those side effects
 * therefore always refer to rows that have already been written.
 *
 * Routes:
 *   GET    /api/sessions/:id/messages                      list a session's messages
 *   POST   /api/chats/:id/messages                         send a user message            (202)
 *   POST   /api/chats/:id/messages/:message_id/regenerate  regenerate an assistant reply  (202)
 *   DELETE /api/chats/:id/messages/:message_id             truncate chat from a message   (204)
 *   POST   /api/chats/:id/compact                          start a compaction             (202)
 *   POST   /api/chats/:id/stop                             cancel the active generation   (200)
 *   POST   /api/chats/:id/continue                         resume after a cap-hit stop    (202)
 *   POST   /api/chats/:id/force_send                       cancel, then send              (202)
 *   POST   /api/chats/:id/answer_user_input                answer an ask_user_input pause (202)
 *
 * @param app - Fastify instance the routes are registered on.
 * @param sql - Database client used for every query and transaction.
 * @param handlers - Broker/queue hooks invoked after the DB writes.
 */
export function registerMessageRoutes(
  app: FastifyInstance,
  sql: Sql,
  handlers: MessageHandlers,
): void {
  // List every message of a session, across all of its chats, oldest first.
  app.get<{ Params: { id: string } }>(
    '/api/sessions/:id/messages',
    async (req, reply) => {
      const session = await sql<Session[]>`SELECT id FROM sessions WHERE id = ${req.params.id}`;
      if (session.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }
      // id breaks ties between rows that share a created_at timestamp.
      const rows = await sql<Message[]>`
        SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
               tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata
        FROM messages
        WHERE session_id = ${req.params.id}
        ORDER BY created_at ASC, id ASC
      `;
      return rows;
    },
  );

  // Send a user message. Persists it together with an empty 'streaming'
  // assistant placeholder, then queues inference into that placeholder.
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/messages',
    async (req, reply) => {
      const parsed = SendBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      const result = await sql.begin(async (tx) => {
        // clock_timestamp() (not now()) advances within the transaction, so the
        // user row sorts strictly before the assistant row.
        const [userMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
          RETURNING id
        `;
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
        return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
      });
      handlers.publishUserMessage(
        sessionId,
        chat.id,
        result.user_message_id,
        parsed.data.content,
      );
      handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');
      reply.code(202);
      return result;
    },
  );

  // Regenerate an assistant reply: deletes the target and every later message
  // in the chat, inserts a fresh 'streaming' placeholder, and queues inference.
  app.post<{ Params: { id: string; message_id: string } }>(
    '/api/chats/:id/messages/:message_id/regenerate',
    async (req, reply) => {
      const { id: chatId, message_id: targetId } = req.params;
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${chatId}
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      const target = await sql<{ id: string; role: string; status: string }[]>`
        SELECT id, role, status
        FROM messages
        WHERE chat_id = ${chatId} AND id = ${targetId}
      `;
      if (target.length === 0) {
        reply.code(404);
        return { error: 'message not found' };
      }
      const targetRow = target[0]!;
      if (targetRow.role !== 'assistant') {
        reply.code(400);
        return { error: 'only assistant messages can be regenerated' };
      }
      if (targetRow.status === 'streaming') {
        reply.code(409);
        return { error: 'message is still streaming' };
      }
      // The truncation below also removes every LATER message, which may be
      // the row an in-flight run is still writing to. Refuse (as the DELETE
      // route does) instead of deleting under that run and starting a second.
      if (handlers.hasActiveInference(chatId)) {
        reply.code(409);
        return { error: 'chat is currently streaming; stop it first' };
      }
      const { newAssistantId, deletedIds } = await sql.begin(async (tx) => {
        // Subquery scoped by chat_id, matching the DELETE route.
        const deletedRows = await tx<{ id: string }[]>`
          DELETE FROM messages
          WHERE chat_id = ${chatId}
            AND created_at >= (
              SELECT created_at FROM messages
              WHERE id = ${targetId} AND chat_id = ${chatId}
            )
          RETURNING id
        `;
        const [row] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;
        return {
          newAssistantId: row!.id,
          deletedIds: deletedRows.map((r) => r.id),
        };
      });
      handlers.publishMessagesDeleted(sessionId, chatId, deletedIds);
      handlers.enqueueInference(sessionId, chatId, newAssistantId, 'default');
      reply.code(202);
      return { assistant_message_id: newAssistantId };
    },
  );

  // Delete a message and everything after it in the same chat.
  app.delete<{ Params: { id: string; message_id: string } }>(
    '/api/chats/:id/messages/:message_id',
    async (req, reply) => {
      const { id: chatId, message_id: messageId } = req.params;
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${chatId}
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      if (handlers.hasActiveInference(chatId)) {
        reply.code(409);
        return { error: 'chat is currently streaming; stop it first' };
      }
      const deletedIds = await sql.begin(async (tx) => {
        // If messageId is not in this chat, the subquery yields NULL, the
        // comparison is never true, and nothing is deleted (404 below).
        const deletedRows = await tx<{ id: string }[]>`
          DELETE FROM messages
          WHERE chat_id = ${chatId}
            AND created_at >= (
              SELECT created_at FROM messages
              WHERE id = ${messageId} AND chat_id = ${chatId}
            )
          RETURNING id
        `;
        if (deletedRows.length > 0) {
          await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;
        }
        return deletedRows.map((r) => r.id);
      });
      if (deletedIds.length === 0) {
        reply.code(404);
        return { error: 'message not found' };
      }
      handlers.publishMessagesDeleted(chat.session_id, chatId, deletedIds);
      reply.code(204);
      return null;
    },
  );

  // Start a compaction: insert a 'streaming' system row of kind 'compact' and
  // queue the compaction job that fills it.
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/compact',
    async (req, reply) => {
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      const [compactMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, kind, status, created_at)
        VALUES (${sessionId}, ${chat.id}, 'system', '', 'compact', 'streaming', clock_timestamp())
        RETURNING id
      `;
      handlers.enqueueCompact(sessionId, chat.id, compactMsg!.id, 'default');
      reply.code(202);
      return { compact_message_id: compactMsg!.id };
    },
  );

  // Stop the chat's active generation; 409 when nothing was running.
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/stop',
    async (req, reply) => {
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id}
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const cancelled = await handlers.cancelInference(chat.session_id, chat.id);
      if (!cancelled) {
        reply.code(409);
        return { error: 'no active generation to stop' };
      }
      reply.code(200);
      return { stopped: true };
    },
  );

  // v1.8.2: continue an inference loop that stopped at the tool budget, given
  // the cap-hit sentinel message it stopped on.
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/continue',
    async (req, reply) => {
      const parsed = ContinueBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      // Cap-hit sentinels are only ever inserted after a turn completes, so
      // there must not be an active inference at this moment. If there is,
      // the client is racing the cap-hit summary that just emitted the
      // sentinel — bail rather than enqueue a parallel run.
      if (handlers.hasActiveInference(chat.id)) {
        reply.code(409);
        return { error: 'chat is currently streaming' };
      }
      const sentinel = await sql<{ metadata: { kind?: unknown; can_continue?: unknown } | null }[]>`
        SELECT metadata
        FROM messages
        WHERE id = ${parsed.data.sentinel_message_id}
          AND chat_id = ${chat.id}
          AND role = 'system'
      `;
      if (sentinel.length === 0) {
        reply.code(404);
        return { error: 'sentinel not found' };
      }
      const meta = sentinel[0]!.metadata;
      if (!meta || meta.kind !== 'cap_hit') {
        reply.code(400);
        return { error: 'message is not a cap-hit sentinel' };
      }
      // Server-side hard ceiling check. The UI already disables the button
      // when can_continue is false; this guards against a stale tab or a
      // direct API call.
      if (meta.can_continue !== true) {
        reply.code(409);
        return { error: 'hard limit reached for this chat' };
      }
      const result = await sql.begin(async (tx) => {
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
        return { assistant_message_id: assistantMsg!.id };
      });
      handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');
      reply.code(202);
      return result;
    },
  );

  // Cancel any running generation, then send like POST /messages.
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/force_send',
    async (req, reply) => {
      const parsed = SendBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      // Await actual cancellation completion (the cancelled run's catch block
      // persists its state). A 5s timeout guards against llama-swap stalls;
      // if it fires we log and proceed anyway.
      let cancelTimer: ReturnType<typeof setTimeout> | undefined;
      try {
        await Promise.race([
          handlers.cancelInference(sessionId, chat.id).then(() => undefined),
          new Promise<void>((_, rej) => {
            cancelTimer = setTimeout(() => rej(new Error('cancel-timeout')), 5000);
          }),
        ]);
      } catch (e: unknown) {
        if (!(e instanceof Error) || e.message !== 'cancel-timeout') throw e;
        req.log.warn({ chatId: chat.id }, 'cancel timeout exceeded, proceeding with force-send');
      } finally {
        // Without this, a cancel that wins the race leaves a pending 5s timer behind.
        clearTimeout(cancelTimer);
      }
      const result = await sql.begin(async (tx) => {
        const [userMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
          RETURNING id
        `;
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
        return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
      });
      handlers.publishUserMessage(
        sessionId,
        chat.id,
        result.user_message_id,
        parsed.data.content,
      );
      handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');
      reply.code(202);
      return result;
    },
  );

  // Batch 9.7: resume an ask_user_input pause. The route:
  //   - validates the body against the question shape the model declared;
  //   - atomically UPDATEs the pending tool row's tool_results to the AnswerSet;
  //   - publishes the deferred tool_result frame;
  //   - enqueues the next assistant turn.
  // Error codes:
  //   400 invalid_body / tool_call_not_ask_user_input / mismatched_answer_shape
  //   404 chat_not_found / unknown_tool_call_id
  //   409 tool_call_already_answered (including losing a concurrent-submit race)
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/answer_user_input',
    async (req, reply) => {
      const parsed = AnswerUserInputBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid_body', details: parsed.error.flatten() };
      }
      const { tool_call_id, answers } = parsed.data;
      const chatRows = await sql<Chat[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat_not_found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;
      // Find the assistant message that emitted this tool_call. The lookup is
      // scoped by chat_id + role to avoid cross-chat matches, and ordered by
      // created_at DESC so the most recent issuance wins when an LLM reuses
      // call IDs across turns. The older, already-answered call is a different
      // row whose tool_results are populated downstream.
      const callerRows = await sql<{ id: string; tool_calls: ToolCall[] | null }[]>`
        SELECT id, tool_calls FROM messages
        WHERE chat_id = ${chat.id}
          AND role = 'assistant'
          AND tool_calls IS NOT NULL
        ORDER BY created_at DESC
      `;
      let foundCall: ToolCall | null = null;
      for (const row of callerRows) {
        const match = row.tool_calls?.find((tc) => tc.id === tool_call_id);
        if (match) {
          foundCall = match;
          break;
        }
      }
      if (!foundCall) {
        reply.code(404);
        return { error: 'unknown_tool_call_id' };
      }
      if (foundCall.name !== 'ask_user_input') {
        reply.code(400);
        return { error: 'tool_call_not_ask_user_input' };
      }
      // Validate the args themselves — the LLM could have emitted bad JSON.
      const argsParsed = AskUserInputArgs.safeParse(foundCall.args);
      if (!argsParsed.success) {
        reply.code(400);
        return { error: 'mismatched_answer_shape', detail: 'tool_call args invalid' };
      }
      const questions = argsParsed.data.questions;
      if (answers.length !== questions.length) {
        reply.code(400);
        return {
          error: 'mismatched_answer_shape',
          detail: `expected ${questions.length} answer(s), got ${answers.length}`,
        };
      }
      // Answers are matched to questions by position; the echoed question
      // text is ignored.
      for (let i = 0; i < questions.length; i++) {
        const q = questions[i]!;
        const a = answers[i]!;
        for (const sel of a.selected_options) {
          if (!q.options.includes(sel)) {
            reply.code(400);
            return {
              error: 'mismatched_answer_shape',
              detail: `answer ${i + 1} contains option not in question: ${sel}`,
            };
          }
        }
        if (q.type === 'single_select' && a.selected_options.length > 1) {
          reply.code(400);
          return {
            error: 'mismatched_answer_shape',
            detail: `answer ${i + 1} has multiple selections on single_select`,
          };
        }
        const hasOpt = a.selected_options.length > 0;
        const hasText = a.free_text !== null && a.free_text.trim().length > 0;
        if (!hasOpt && !hasText) {
          reply.code(400);
          return { error: 'mismatched_answer_shape', detail: `answer ${i + 1} is empty` };
        }
      }
      // Find the pending tool row. ORDER BY created_at DESC + LIMIT 1 picks
      // the most recent row with this tool_call_id.
      const toolRows = await sql<{
        id: string;
        tool_results: { tool_call_id: string; output: unknown } | null;
      }[]>`
        SELECT id, tool_results FROM messages
        WHERE chat_id = ${chat.id}
          AND role = 'tool'
          AND tool_results->>'tool_call_id' = ${tool_call_id}
        ORDER BY created_at DESC
        LIMIT 1
      `;
      const toolRow = toolRows[0];
      if (!toolRow) {
        reply.code(404);
        return { error: 'unknown_tool_call_id', detail: 'tool message not found' };
      }
      // Fast-path rejection. The conditional UPDATE below is what actually
      // makes answering at-most-once under concurrent submissions.
      if (toolRow.tool_results && toolRow.tool_results.output !== null) {
        reply.code(409);
        return { error: 'tool_call_already_answered' };
      }
      const answerSet = { answers };
      const newToolResults = {
        tool_call_id,
        output: answerSet,
        truncated: false,
      };
      const result = await sql.begin(async (tx) => {
        // Only claim the row while its output is still JSON null. Concurrent
        // UPDATEs serialize on the row lock, so a racing submission re-checks
        // this predicate against the committed answer, matches zero rows, and
        // neither writes nor enqueues a second assistant turn.
        const claimed = await tx<{ id: string }[]>`
          UPDATE messages
          SET tool_results = ${tx.json(newToolResults as never)}
          WHERE id = ${toolRow.id}
            AND tool_results->>'output' IS NULL
          RETURNING id
        `;
        if (claimed.length === 0) return null;
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
        return {
          tool_message_id: toolRow.id,
          assistant_message_id: assistantMsg!.id,
        };
      });
      if (result === null) {
        reply.code(409);
        return { error: 'tool_call_already_answered' };
      }
      // Publish the deferred tool_result frame. useSessionStream's reducer
      // updates the matching tool_run.result so AskUserInputCard flips into
      // its read-only "answered" mode without a refetch.
      handlers.publishSessionFrame(sessionId, {
        type: 'tool_result',
        tool_message_id: result.tool_message_id,
        tool_call_id,
        chat_id: chat.id,
        output: answerSet,
        truncated: false,
      });
      handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');
      reply.code(202);
      return result;
    },
  );
}