import type { FastifyInstance } from 'fastify'; import { z } from 'zod'; import type { Sql } from '../db.js'; import type { Broker } from '@boocode/server/broker'; import type { WsFrame } from '@boocode/server/ws-frames'; import { resolveChatId } from './chat-resolve.js'; const AnswerUserInputBody = z.object({ tool_call_id: z.string().min(1), answers: z .array( z.object({ question: z.string(), selected_options: z.array(z.string()), free_text: z.string().nullable(), }), ) .min(1) .max(3), }); const AskUserInputArgs = z.object({ questions: z .array( z.object({ question: z.string(), type: z.enum(['single_select', 'multi_select']), options: z.array(z.string()).min(1), }), ) .min(1) .max(3), }); const SendBody = z.object({ content: z.string().min(1).max(64_000), pane_id: z.string().min(1).max(200), chat_id: z.string().uuid().optional(), provider: z.string().max(100).optional(), model: z.string().max(200).optional(), mode_id: z.string().max(200).optional(), thinking_option_id: z.string().max(200).optional(), }); interface InferenceApi { enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void; cancel: (sessionId: string, chatId: string) => Promise; hasActive: (chatId: string) => boolean; } interface MessageRow { id: string; role: string; content: string | null; status: string | null; tool_calls: Array<{ id: string; name: string; args?: Record }> | null; tool_results: { tool_call_id: string; output: unknown; truncated?: boolean; error?: string; } | null; reasoning_parts: Array<{ text?: string }> | null; } function mapCoderMessageRow(row: MessageRow) { if (row.role === 'tool') { if (!row.tool_results?.tool_call_id) return null; return { id: row.id, role: 'tool' as const, tool_results: row.tool_results, }; } if (row.role !== 'user' && row.role !== 'assistant' && row.role !== 'system') { return null; } const tool_calls = row.tool_calls?.map((tc) => ({ id: tc.id, function: { name: tc.name, arguments: JSON.stringify(tc.args ?? {}), }, })); const reasoningText = row.reasoning_parts?.map((p) => p.text ?? '').join('') ?? ''; return { id: row.id, role: row.role as 'user' | 'assistant' | 'system', content: row.content ?? '', status: (row.status ?? 'complete') as 'streaming' | 'complete' | 'failed', ...(reasoningText ? { reasoning_text: reasoningText } : {}), ...(tool_calls?.length ? { tool_calls } : {}), }; } export function registerMessageRoutes( app: FastifyInstance, sql: Sql, broker: Broker, inference: InferenceApi, ): void { // GET /api/sessions/:sessionId/messages — hydrate CoderPane on load / reconnect app.get<{ Params: { sessionId: string }; Querystring: { chat_id?: string } }>( '/api/sessions/:sessionId/messages', async (req, reply) => { const sessionId = req.params.sessionId; const chatId = req.query.chat_id; const sessionRows = await sql<{ id: string }[]>` SELECT id FROM sessions WHERE id = ${sessionId} `; if (sessionRows.length === 0) { reply.code(404); return { error: 'session not found' }; } if (chatId) { const chatRows = await sql<{ id: string }[]>` SELECT id FROM chats WHERE id = ${chatId} AND session_id = ${sessionId} AND status = 'open' `; if (chatRows.length === 0) { reply.code(404); return { error: 'chat not found or not open in this session' }; } } const rows = chatId ? await sql` SELECT id, role, content, status, tool_calls, tool_results, reasoning_parts FROM messages_with_parts WHERE session_id = ${sessionId} AND chat_id = ${chatId} ORDER BY created_at ASC, id ASC ` : await sql` SELECT id, role, content, status, tool_calls, tool_results, reasoning_parts FROM messages_with_parts WHERE session_id = ${sessionId} ORDER BY created_at ASC, id ASC `; return rows.map(mapCoderMessageRow).filter((m) => m !== null); }, ); // POST /api/sessions/:sessionId/messages — send a user message + kick off inference app.post<{ Params: { sessionId: string } }>( '/api/sessions/:sessionId/messages', async (req, reply) => { const parsed = SendBody.safeParse(req.body); if (!parsed.success) { reply.code(400); return { error: 'invalid body', details: parsed.error.flatten() }; } const sessionId = req.params.sessionId; const { content, pane_id, chat_id: explicitChatId, provider, model, mode_id, thinking_option_id } = parsed.data; const isExternal = provider && provider !== 'boocode'; // Validate session exists const sessionRows = await sql<{ id: string; project_id: string }[]>` SELECT id, project_id FROM sessions WHERE id = ${sessionId} `; if (sessionRows.length === 0) { reply.code(404); return { error: 'session not found' }; } const resolved = await resolveChatId(sql, sessionId, pane_id); if (!resolved) { reply.code(404); return { error: 'pane not found' }; } let chatId = resolved; if (explicitChatId) { const chatRows = await sql<{ id: string }[]>` SELECT id FROM chats WHERE id = ${explicitChatId} AND session_id = ${sessionId} AND status = 'open' `; if (chatRows.length === 0) { reply.code(404); return { error: 'chat not found or not open in this session' }; } chatId = explicitChatId; } if (!isExternal) { // Reject if inference is already running on this chat if (inference.hasActive(chatId)) { reply.code(409); return { error: 'inference already running on this chat' }; } } // Create user message const [userMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'user', ${content}, 'complete', clock_timestamp()) RETURNING id `; await sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; await sql`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`; // Publish user message frames broker.publishFrame(sessionId, { type: 'message_started', message_id: userMsg!.id, chat_id: chatId, role: 'user', } as unknown as WsFrame); broker.publishFrame(sessionId, { type: 'delta', message_id: userMsg!.id, chat_id: chatId, content, } as unknown as WsFrame); broker.publishFrame(sessionId, { type: 'message_complete', message_id: userMsg!.id, chat_id: chatId, } as unknown as WsFrame); if (isExternal) { // External provider: create a task for the dispatcher const projectId = sessionRows[0]!.project_id; const [task] = await sql<{ id: string; state: string }[]>` INSERT INTO tasks (project_id, input, agent, model, mode_id, thinking_option_id, session_id) VALUES (${projectId}, ${content}, ${provider}, ${model ?? null}, ${mode_id ?? null}, ${thinking_option_id ?? null}, ${sessionId}) RETURNING id, state `; reply.code(202); return { user_message_id: userMsg!.id, task_id: task!.id, dispatched: true }; } // Native provider: create streaming assistant row + enqueue inference const [assistantMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) RETURNING id `; inference.enqueue(sessionId, chatId, assistantMsg!.id, 'default'); reply.code(202); return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id }; }, ); // POST /api/chats/:id/answer_user_input — answer a pending ask_user_input app.post<{ Params: { id: string } }>( '/api/chats/:id/answer_user_input', async (req, reply) => { const parsed = AnswerUserInputBody.safeParse(req.body); if (!parsed.success) { reply.code(400); return { error: 'invalid_body', details: parsed.error.flatten() }; } const { tool_call_id, answers } = parsed.data; const chatRows = await sql<{ id: string; session_id: string }[]>` SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open' `; if (chatRows.length === 0) { reply.code(404); return { error: 'chat_not_found' }; } const chat = chatRows[0]!; const sessionId = chat.session_id; const callerRows = await sql<{ message_id: string; payload: { id: string; name: string; args: Record }; }[]>` SELECT p.message_id, p.payload FROM message_parts p JOIN messages m ON m.id = p.message_id WHERE m.chat_id = ${chat.id} AND m.role = 'assistant' AND p.kind = 'tool_call' AND p.payload->>'id' = ${tool_call_id} ORDER BY m.created_at DESC LIMIT 1 `; if (!callerRows[0]) { reply.code(404); return { error: 'unknown_tool_call_id' }; } const foundCall = callerRows[0].payload; if (foundCall.name !== 'ask_user_input') { reply.code(400); return { error: 'tool_call_not_ask_user_input' }; } const argsParsed = AskUserInputArgs.safeParse(foundCall.args); if (!argsParsed.success) { reply.code(400); return { error: 'mismatched_answer_shape', detail: 'tool_call args invalid' }; } const questions = argsParsed.data.questions; if (answers.length !== questions.length) { reply.code(400); return { error: 'mismatched_answer_shape', detail: `expected ${questions.length} answer(s), got ${answers.length}` }; } for (let i = 0; i < questions.length; i++) { const q = questions[i]!; const a = answers[i]!; for (const sel of a.selected_options) { if (!q.options.includes(sel)) { reply.code(400); return { error: 'mismatched_answer_shape', detail: `answer ${i + 1} option not in question: ${sel}` }; } } if (q.type === 'single_select' && a.selected_options.length > 1) { reply.code(400); return { error: 'mismatched_answer_shape', detail: `answer ${i + 1} multi on single_select` }; } if (a.selected_options.length === 0 && (!a.free_text || !a.free_text.trim())) { reply.code(400); return { error: 'mismatched_answer_shape', detail: `answer ${i + 1} is empty` }; } } const toolRows = await sql<{ message_id: string; payload: { tool_call_id: string; output: unknown }; }[]>` SELECT p.message_id, p.payload FROM message_parts p JOIN messages m ON m.id = p.message_id WHERE m.chat_id = ${chat.id} AND m.role = 'tool' AND p.kind = 'tool_result' AND p.payload->>'tool_call_id' = ${tool_call_id} ORDER BY m.created_at DESC LIMIT 1 `; if (!toolRows[0]) { reply.code(404); return { error: 'unknown_tool_call_id', detail: 'tool message not found' }; } if (toolRows[0].payload?.output !== null) { reply.code(409); return { error: 'tool_call_already_answered' }; } const answerSet = { answers }; const newToolResults = { tool_call_id, output: answerSet, truncated: false }; const toolMessageId = toolRows[0].message_id; const result = await sql.begin(async (tx) => { await tx`DELETE FROM message_parts WHERE message_id = ${toolMessageId} AND kind = 'tool_result'`; await tx` INSERT INTO message_parts (message_id, sequence, kind, payload) VALUES (${toolMessageId}, 0, 'tool_result', ${tx.json(newToolResults as never)}) `; const [assistantMsg] = await tx<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp()) RETURNING id `; await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`; return { tool_message_id: toolMessageId, assistant_message_id: assistantMsg!.id }; }); broker.publishFrame(sessionId, { type: 'tool_result', tool_message_id: result.tool_message_id, tool_call_id, chat_id: chat.id, output: answerSet, truncated: false, } as unknown as WsFrame); inference.enqueue(sessionId, chat.id, result.assistant_message_id, 'default'); reply.code(202); return result; }, ); // POST /api/sessions/:sessionId/stop — cancel active inference app.post<{ Params: { sessionId: string } }>( '/api/sessions/:sessionId/stop', async (req, reply) => { const sessionId = req.params.sessionId; // Find active chats in this session const chats = await sql<{ id: string }[]>` SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' `; let cancelled = false; for (const chat of chats) { if (inference.hasActive(chat.id)) { cancelled = await inference.cancel(sessionId, chat.id); break; } } return { cancelled }; }, ); }