import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
import type { Message, Session } from '../types/api.js';

/** Body accepted by `POST /api/sessions/:id/messages`: non-empty text of at most 64 000 characters. */
const SendBody = z.object({
  content: z.string().min(1).max(64_000),
});

/**
 * Callbacks the routes invoke after their database transaction has committed.
 * What each callback does with the ids is defined by the caller of
 * `registerMessageRoutes`; this module only forwards the values.
 */
interface MessageHandlers {
  /** Called with the session id and the id of the newly inserted `'streaming'` assistant row. */
  enqueueInference: (sessionId: string, assistantMessageId: string) => void;
  /** Called with the session id, the new user message id and its content. */
  publishUserMessage: (
    sessionId: string,
    userMessageId: string,
    content: string
  ) => void;
  /** Called with the session id and the ids of every row removed by a regenerate. */
  publishMessagesDeleted: (sessionId: string, messageIds: string[]) => void;
}

/** Result of the regenerate transaction: either an HTTP rejection or the ids produced. */
type RegenerateOutcome =
  | { kind: 'rejected'; status: 400 | 404 | 409; error: string }
  | { kind: 'regenerated'; newAssistantId: string; deletedIds: string[] };

/**
 * Returns `row`, or throws when an `INSERT ... RETURNING` unexpectedly yielded
 * nothing. Replaces scattered non-null assertions with one explicit guard.
 */
function requireRow<T>(row: T | undefined, what: string): T {
  if (row === undefined) {
    throw new Error(`expected a row from ${what}`);
  }
  return row;
}

/**
 * Registers the message routes on `app`:
 *
 * - `GET  /api/sessions/:id/messages` — lists a session's messages, oldest first.
 * - `POST /api/sessions/:id/messages` — stores a user message plus an empty
 *   `'streaming'` assistant placeholder, then notifies `handlers`.
 * - `POST /api/sessions/:id/messages/:message_id/regenerate` — deletes an
 *   assistant message and everything created at or after it, inserts a fresh
 *   `'streaming'` assistant placeholder, then notifies `handlers`.
 *
 * @param app      Fastify instance to attach the routes to.
 * @param sql      Database client used for all queries and transactions.
 * @param handlers Callbacks invoked after each successful write transaction.
 */
export function registerMessageRoutes(
  app: FastifyInstance,
  sql: Sql,
  handlers: MessageHandlers
): void {
  app.get<{ Params: { id: string } }>(
    '/api/sessions/:id/messages',
    async (req, reply) => {
      const session = await sql`SELECT id FROM sessions WHERE id = ${req.params.id}`;
      if (session.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }

      // `id` is the tie-breaker so rows sharing a created_at keep a stable order.
      const rows = await sql`
        SELECT id, session_id, role, content, tool_calls, tool_results,
               status, last_seq, tokens_used, ctx_used, ctx_max,
               started_at, finished_at, created_at
        FROM messages
        WHERE session_id = ${req.params.id}
        ORDER BY created_at ASC, id ASC
      `;
      return rows;
    }
  );

  app.post<{ Params: { id: string } }>(
    '/api/sessions/:id/messages',
    async (req, reply) => {
      const parsed = SendBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }

      const session = await sql`SELECT id FROM sessions WHERE id = ${req.params.id}`;
      if (session.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }

      const result = await sql.begin(async (tx) => {
        // clock_timestamp() advances inside a transaction (NOW() is frozen at
        // its start), so the user row sorts strictly before the assistant row.
        const [userMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, role, content, status, created_at)
          VALUES (${req.params.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
          RETURNING id
        `;
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, role, content, status, created_at)
          VALUES (${req.params.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = NOW() WHERE id = ${req.params.id}`;
        return {
          user_message_id: requireRow(userMsg, 'user message insert').id,
          assistant_message_id: requireRow(assistantMsg, 'assistant message insert').id,
        };
      });

      // Notify only after commit so handlers never see ids of rolled-back rows.
      handlers.publishUserMessage(
        req.params.id,
        result.user_message_id,
        parsed.data.content
      );
      handlers.enqueueInference(req.params.id, result.assistant_message_id);

      reply.code(202);
      return result;
    }
  );

  app.post<{ Params: { id: string; message_id: string } }>(
    '/api/sessions/:id/messages/:message_id/regenerate',
    async (req, reply) => {
      const { id: sessionId, message_id: targetId } = req.params;

      const outcome = await sql.begin(async (tx): Promise<RegenerateOutcome> => {
        // Validate inside the transaction and lock the target row. Checking
        // before BEGIN let two concurrent regenerates both pass: the second
        // then found the target already deleted, its created_at subquery
        // yielded NULL (so nothing was deleted), and it still inserted a
        // second streaming assistant row. With FOR UPDATE the second request
        // waits for the first to commit and then sees no row, i.e. 404.
        const [targetRow] = await tx<{ id: string; role: string; status: string }[]>`
          SELECT id, role, status
          FROM messages
          WHERE session_id = ${sessionId} AND id = ${targetId}
          FOR UPDATE
        `;
        if (targetRow === undefined) {
          return { kind: 'rejected', status: 404, error: 'message not found' };
        }
        if (targetRow.role !== 'assistant') {
          return {
            kind: 'rejected',
            status: 400,
            error: 'only assistant messages can be regenerated',
          };
        }
        if (targetRow.status === 'streaming') {
          return { kind: 'rejected', status: 409, error: 'message is still streaming' };
        }

        // Subquery keeps created_at in postgres at TIMESTAMPTZ µs precision.
        // Round-tripping through JS Date loses sub-ms precision and can pull
        // earlier rows (e.g. the triggering user message) into the >= bound.
        const deletedRows = await tx<{ id: string }[]>`
          DELETE FROM messages
          WHERE session_id = ${sessionId}
            AND created_at >= (
              SELECT created_at FROM messages WHERE id = ${targetId}
            )
          RETURNING id
        `;
        const [row] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, role, content, status, created_at)
          VALUES (${sessionId}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = NOW() WHERE id = ${sessionId}`;
        return {
          kind: 'regenerated',
          newAssistantId: requireRow(row, 'assistant message insert').id,
          deletedIds: deletedRows.map((r) => r.id),
        };
      });

      if (outcome.kind === 'rejected') {
        reply.code(outcome.status);
        return { error: outcome.error };
      }

      // Notify only after commit so handlers never see ids of rolled-back rows.
      handlers.publishMessagesDeleted(sessionId, outcome.deletedIds);
      handlers.enqueueInference(sessionId, outcome.newAssistantId);

      reply.code(202);
      return { assistant_message_id: outcome.newAssistantId };
    }
  );
}