import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';

/** Request body accepted by `POST /api/sessions/:sessionId/messages`. */
const SendBody = z.object({
  content: z.string().min(1).max(64_000),
  chat_id: z.string().uuid(),
});

/**
 * The subset of the inference runner that the message routes rely on.
 * Only the call shapes are known here; the implementation lives elsewhere.
 */
interface InferenceApi {
  /**
   * Schedule an inference run that fills in the assistant message `assistantId`.
   * The send route currently always passes `'default'` as `user`.
   */
  enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
  /**
   * Cancel the run on `chatId`. The resolved boolean is surfaced to the client
   * as `cancelled` by the stop route.
   */
  cancel: (sessionId: string, chatId: string) => Promise<boolean>;
  /** Whether an inference run is currently active on `chatId`. */
  hasActive: (chatId: string) => boolean;
}

/**
 * Publish the three frames (`message_started`, `delta`, `message_complete`)
 * that describe an already-complete user message, in that order.
 */
function publishUserMessageFrames(
  broker: Broker,
  sessionId: string,
  chatId: string,
  messageId: string,
  content: string,
): void {
  const frames = [
    { type: 'message_started', message_id: messageId, chat_id: chatId, role: 'user' },
    { type: 'delta', message_id: messageId, chat_id: chatId, content },
    { type: 'message_complete', message_id: messageId, chat_id: chatId },
  ];
  for (const frame of frames) {
    // NOTE(review): the WsFrame definition is not visible from this file, so the
    // original double cast is kept; replace it with `satisfies WsFrame` once the
    // frame union is confirmed to cover these shapes.
    broker.publishFrame(sessionId, frame as unknown as WsFrame);
  }
}

/**
 * Register the message-related HTTP routes on `app`:
 *
 * - `POST /api/sessions/:sessionId/messages` stores a user message plus an
 *   empty `streaming` assistant message, publishes the user message frames and
 *   enqueues inference. Responds 202 with both message ids, 400 on an invalid
 *   body, 404 when the session or open chat is missing, and 409 when inference
 *   is already active on the chat.
 * - `POST /api/sessions/:sessionId/stop` cancels inference on every open chat
 *   of the session that has an active run and responds `{ cancelled }`.
 *
 * @param app - Fastify instance the routes are attached to.
 * @param sql - Tagged-template SQL client used for all queries.
 * @param broker - Frame broker used to publish user message frames.
 * @param inference - Inference runner facade (enqueue / cancel / hasActive).
 */
export function registerMessageRoutes(
  app: FastifyInstance,
  sql: Sql,
  broker: Broker,
  inference: InferenceApi,
): void {
  // POST /api/sessions/:sessionId/messages — send a user message + kick off inference
  app.post<{ Params: { sessionId: string } }>(
    '/api/sessions/:sessionId/messages',
    async (req, reply) => {
      const parsed = SendBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }

      const sessionId = req.params.sessionId;
      const { content, chat_id: chatId } = parsed.data;

      // Validate session exists
      const sessionRows = await sql<{ id: string }[]>`
        SELECT id FROM sessions WHERE id = ${sessionId}
      `;
      if (sessionRows.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }

      // Validate chat belongs to session and is open
      const chatRows = await sql<{ id: string; session_id: string }[]>`
        SELECT id, session_id FROM chats
        WHERE id = ${chatId} AND session_id = ${sessionId} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat not found or not open in this session' };
      }

      // Reject if inference is already running on this chat.
      // NOTE(review): this check and the enqueue below are not atomic; two
      // concurrent requests could both pass it — confirm the runner guards this.
      if (inference.hasActive(chatId)) {
        reply.code(409);
        return { error: 'inference already running on this chat' };
      }

      // Create user message + streaming assistant row in a transaction
      const result = await sql.begin(async (tx) => {
        const [userMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages
            (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chatId}, 'user', ${content}, 'complete', clock_timestamp())
          RETURNING id
        `;
        if (userMsg === undefined) {
          // Throwing inside the callback aborts the transaction.
          throw new Error('user message insert returned no row');
        }

        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages
            (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        if (assistantMsg === undefined) {
          throw new Error('assistant message insert returned no row');
        }

        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;

        return { user_message_id: userMsg.id, assistant_message_id: assistantMsg.id };
      });

      // Publish user message frames so WS subscribers see it immediately
      publishUserMessageFrames(broker, sessionId, chatId, result.user_message_id, content);

      // Enqueue inference — the runner will stream assistant deltas via broker.
      // NOTE(review): if enqueue throws, the assistant row stays 'streaming';
      // confirm whether the runner or a sweeper cleans that up.
      inference.enqueue(sessionId, chatId, result.assistant_message_id, 'default');

      reply.code(202);
      return result;
    },
  );

  // POST /api/sessions/:sessionId/stop — cancel active inference
  app.post<{ Params: { sessionId: string } }>(
    '/api/sessions/:sessionId/stop',
    async (req, reply) => {
      const sessionId = req.params.sessionId;

      // Find open chats in this session
      const chats = await sql<{ id: string }[]>`
        SELECT id FROM chats
        WHERE session_id = ${sessionId} AND status = 'open'
      `;

      // Cancel every chat with an active run. Stopping after the first match
      // would leave other runs going, and the query has no ORDER BY, so which
      // chat came first would be arbitrary.
      let cancelled = false;
      for (const chat of chats) {
        if (!inference.hasActive(chat.id)) continue;
        if (await inference.cancel(sessionId, chat.id)) {
          cancelled = true;
        }
      }

      return { cancelled };
    },
  );
}