The CoderPane runs its own inference runner and broker on the boocoder service. The AskUserInputCard was calling /api/chats/:id/answer_user_input on the main BooChat server, which has a different inference runner: the answer was accepted, but the next turn was enqueued on the wrong runner, so nothing happened. Fix: register the same answer_user_input endpoint on the boocoder service, and add an apiPrefix prop to AskUserInputCard so that the CoderPane routes through /api/coder/chats/:id/answer_user_input. BooChat's MessageList continues to use the default (no-prefix) path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
403 lines
14 KiB
TypeScript
403 lines
14 KiB
TypeScript
import type { FastifyInstance } from 'fastify';
|
|
import { z } from 'zod';
|
|
import type { Sql } from '../db.js';
|
|
import type { Broker } from '@boocode/server/broker';
|
|
import type { WsFrame } from '@boocode/server/ws-frames';
|
|
import { resolveChatId } from './chat-resolve.js';
|
|
|
|
/**
 * One submitted answer: the question text, the options the user picked,
 * and an optional free-text reply (`null` when none was given).
 */
const UserInputAnswer = z.object({
  question: z.string(),
  selected_options: z.array(z.string()),
  free_text: z.string().nullable(),
});

/**
 * Request body accepted by POST /api/chats/:id/answer_user_input.
 * Carries the id of the tool call being answered plus one to three answers.
 */
const AnswerUserInputBody = z.object({
  tool_call_id: z.string().min(1),
  answers: z.array(UserInputAnswer).min(1).max(3),
});
|
|
|
|
/**
 * One question of an `ask_user_input` tool call: its text, whether a single
 * or several options may be selected, and the non-empty list of options.
 */
const AskUserInputQuestion = z.object({
  question: z.string(),
  type: z.enum(['single_select', 'multi_select']),
  options: z.array(z.string()).min(1),
});

/**
 * Arguments stored on an `ask_user_input` tool call (one to three questions).
 * Used to re-validate the stored call before checking submitted answers against it.
 */
const AskUserInputArgs = z.object({
  questions: z.array(AskUserInputQuestion).min(1).max(3),
});
|
|
|
|
/** Optional string capped at 200 characters; shared by several SendBody fields. */
const OptionalShortText = z.string().max(200).optional();

/**
 * Request body accepted by POST /api/sessions/:sessionId/messages.
 * `content` and `pane_id` are required; the remaining fields are optional
 * routing hints (explicit chat, provider, model, mode, thinking option).
 */
const SendBody = z.object({
  content: z.string().min(1).max(64_000),
  pane_id: z.string().min(1).max(200),
  chat_id: z.string().uuid().optional(),
  provider: z.string().max(100).optional(),
  model: OptionalShortText,
  mode_id: OptionalShortText,
  thinking_option_id: OptionalShortText,
});
|
|
|
|
/**
 * The slice of the inference runner that the message routes depend on.
 */
interface InferenceApi {
  /**
   * Queue an inference turn that will fill the given assistant message.
   * The routes in this file always pass `'default'` as `user`.
   */
  enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;

  /** Request cancellation for a chat; the resolved boolean is returned to the client as `cancelled`. */
  cancel: (sessionId: string, chatId: string) => Promise<boolean>;

  /** Whether the runner currently reports an active inference for the chat. */
  hasActive: (chatId: string) => boolean;
}
|
|
|
|
/** A tool invocation as stored on a message row. */
type StoredToolCall = { id: string; name: string; args?: Record<string, unknown> };

/** A tool outcome as stored on a message row. */
type StoredToolResult = {
  tool_call_id: string;
  output: unknown;
  truncated?: boolean;
  error?: string;
};

/**
 * Row shape selected from `messages_with_parts` when hydrating a chat.
 * Every column except `id` and `role` may be null.
 */
interface MessageRow {
  id: string;
  role: string;
  content: string | null;
  status: string | null;
  tool_calls: StoredToolCall[] | null;
  tool_results: StoredToolResult | null;
  reasoning_parts: Array<{ text?: string }> | null;
}
|
|
|
|
/**
 * Convert a stored message row into the shape sent to the CoderPane.
 *
 * - `tool` rows are kept only when they carry a `tool_call_id`; they are
 *   returned with just `id`, `role` and `tool_results`.
 * - `user`, `assistant` and `system` rows get `content` (defaulting to `''`),
 *   `status` (defaulting to `'complete'`), and — only when non-empty —
 *   `reasoning_text` and `tool_calls`.
 * - Any other role yields `null`, so callers can filter it out.
 *
 * @param row A row selected from the message store.
 * @returns The client-facing message, or `null` when the row should be skipped.
 */
function mapCoderMessageRow(row: MessageRow) {
  switch (row.role) {
    case 'tool': {
      const results = row.tool_results;
      return results?.tool_call_id
        ? { id: row.id, role: 'tool' as const, tool_results: results }
        : null;
    }

    case 'user':
    case 'assistant':
    case 'system': {
      // Stored calls are reshaped to `{ id, function: { name, arguments } }`
      // with the arguments serialized as a JSON string.
      const calls = (row.tool_calls ?? []).map((call) => ({
        id: call.id,
        function: {
          name: call.name,
          arguments: JSON.stringify(call.args ?? {}),
        },
      }));

      // Reasoning parts are concatenated in order; parts without text add nothing.
      const reasoning = (row.reasoning_parts ?? []).reduce(
        (acc, part) => acc + (part.text ?? ''),
        '',
      );

      return {
        id: row.id,
        role: row.role as 'user' | 'assistant' | 'system',
        content: row.content ?? '',
        status: (row.status ?? 'complete') as 'streaming' | 'complete' | 'failed',
        ...(reasoning ? { reasoning_text: reasoning } : {}),
        ...(calls.length > 0 ? { tool_calls: calls } : {}),
      };
    }

    default:
      return null;
  }
}
|
|
|
|
/** One question of a stored `ask_user_input` tool call (same shape AskUserInputArgs validates). */
interface AskedQuestion {
  question: string;
  type: 'single_select' | 'multi_select';
  options: string[];
}

/** One answer submitted by the client (same shape AnswerUserInputBody validates). */
interface SubmittedAnswer {
  question: string;
  selected_options: string[];
  free_text: string | null;
}

/**
 * Check submitted answers against the questions they are meant to answer.
 *
 * Answers are matched to questions by position. Checks run in this order and
 * the first failure wins: answer count, then per answer — every selected
 * option exists on the question, a `single_select` has at most one selection,
 * and the answer has either a selection or non-blank free text.
 *
 * @returns A human-readable description of the first problem, or `null` when the answers fit.
 */
function findAnswerShapeProblem(
  questions: readonly AskedQuestion[],
  answers: readonly SubmittedAnswer[],
): string | null {
  if (answers.length !== questions.length) {
    return `expected ${questions.length} answer(s), got ${answers.length}`;
  }
  for (let i = 0; i < questions.length; i++) {
    const q = questions[i]!;
    const a = answers[i]!;
    const stray = a.selected_options.find((sel) => !q.options.includes(sel));
    if (stray !== undefined) {
      return `answer ${i + 1} option not in question: ${stray}`;
    }
    if (q.type === 'single_select' && a.selected_options.length > 1) {
      return `answer ${i + 1} multi on single_select`;
    }
    if (a.selected_options.length === 0 && (!a.free_text || !a.free_text.trim())) {
      return `answer ${i + 1} is empty`;
    }
  }
  return null;
}

/** Publish the started / delta / complete frame triple for a freshly stored user message. */
function publishUserMessageFrames(
  broker: Broker,
  sessionId: string,
  chatId: string,
  messageId: string,
  content: string,
): void {
  broker.publishFrame(sessionId, {
    type: 'message_started',
    message_id: messageId,
    chat_id: chatId,
    role: 'user',
  } as unknown as WsFrame);
  broker.publishFrame(sessionId, {
    type: 'delta',
    message_id: messageId,
    chat_id: chatId,
    content,
  } as unknown as WsFrame);
  broker.publishFrame(sessionId, {
    type: 'message_complete',
    message_id: messageId,
    chat_id: chatId,
  } as unknown as WsFrame);
}

/**
 * Register the coder message routes on the given Fastify instance:
 *
 * - GET  /api/sessions/:sessionId/messages   — list a session's (or one chat's) messages
 * - POST /api/sessions/:sessionId/messages   — store a user message and start a turn
 * - POST /api/chats/:id/answer_user_input    — answer a pending `ask_user_input` tool call
 * - POST /api/sessions/:sessionId/stop       — cancel an active inference in the session
 *
 * @param app       Fastify instance the routes are attached to.
 * @param sql       Tagged-template SQL client used for all queries.
 * @param broker    Frame publisher used to push message / tool-result frames to a session.
 * @param inference Runner facade used to enqueue, cancel and probe inference turns.
 */
export function registerMessageRoutes(
  app: FastifyInstance,
  sql: Sql,
  broker: Broker,
  inference: InferenceApi,
): void {
  // GET /api/sessions/:sessionId/messages — hydrate CoderPane on load / reconnect
  app.get<{ Params: { sessionId: string }; Querystring: { chat_id?: string } }>(
    '/api/sessions/:sessionId/messages',
    async (req, reply) => {
      const sessionId = req.params.sessionId;
      const chatId = req.query.chat_id;
      const sessionRows = await sql<{ id: string }[]>`
        SELECT id FROM sessions WHERE id = ${sessionId}
      `;
      if (sessionRows.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }

      // When a chat filter is supplied it must be an open chat of this session.
      if (chatId) {
        const chatRows = await sql<{ id: string }[]>`
          SELECT id FROM chats
          WHERE id = ${chatId} AND session_id = ${sessionId} AND status = 'open'
        `;
        if (chatRows.length === 0) {
          reply.code(404);
          return { error: 'chat not found or not open in this session' };
        }
      }

      const rows = chatId
        ? await sql<MessageRow[]>`
          SELECT id, role, content, status, tool_calls, tool_results, reasoning_parts
          FROM messages_with_parts
          WHERE session_id = ${sessionId} AND chat_id = ${chatId}
          ORDER BY created_at ASC, id ASC
        `
        : await sql<MessageRow[]>`
          SELECT id, role, content, status, tool_calls, tool_results, reasoning_parts
          FROM messages_with_parts
          WHERE session_id = ${sessionId}
          ORDER BY created_at ASC, id ASC
        `;

      // Rows the mapper rejects (unknown roles, tool rows without a call id) are dropped.
      return rows.map(mapCoderMessageRow).filter((m) => m !== null);
    },
  );

  // POST /api/sessions/:sessionId/messages — send a user message + kick off inference
  app.post<{ Params: { sessionId: string } }>(
    '/api/sessions/:sessionId/messages',
    async (req, reply) => {
      const parsed = SendBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid body', details: parsed.error.flatten() };
      }

      const sessionId = req.params.sessionId;
      const { content, pane_id, chat_id: explicitChatId, provider, model, mode_id, thinking_option_id } =
        parsed.data;
      // Any non-empty provider other than 'boocode' is handed to the task dispatcher.
      const isExternal = provider && provider !== 'boocode';

      // Validate session exists
      const sessionRows = await sql<{ id: string; project_id: string }[]>`
        SELECT id, project_id FROM sessions WHERE id = ${sessionId}
      `;
      if (sessionRows.length === 0) {
        reply.code(404);
        return { error: 'session not found' };
      }

      const resolved = await resolveChatId(sql, sessionId, pane_id);
      if (!resolved) {
        reply.code(404);
        return { error: 'pane not found' };
      }

      // An explicit chat id overrides the pane's chat, but only if it is open in this session.
      let chatId = resolved;
      if (explicitChatId) {
        const chatRows = await sql<{ id: string }[]>`
          SELECT id FROM chats WHERE id = ${explicitChatId} AND session_id = ${sessionId} AND status = 'open'
        `;
        if (chatRows.length === 0) {
          reply.code(404);
          return { error: 'chat not found or not open in this session' };
        }
        chatId = explicitChatId;
      }

      if (!isExternal) {
        // Reject if inference is already running on this chat
        if (inference.hasActive(chatId)) {
          reply.code(409);
          return { error: 'inference already running on this chat' };
        }
      }

      // Create user message
      const [userMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${content}, 'complete', clock_timestamp())
        RETURNING id
      `;
      await sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
      await sql`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;

      // Publish user message frames
      publishUserMessageFrames(broker, sessionId, chatId, userMsg!.id, content);

      if (isExternal) {
        // External provider: create a task for the dispatcher
        const projectId = sessionRows[0]!.project_id;
        const [task] = await sql<{ id: string; state: string }[]>`
          INSERT INTO tasks (project_id, input, agent, model, mode_id, thinking_option_id, session_id)
          VALUES (${projectId}, ${content}, ${provider}, ${model ?? null}, ${mode_id ?? null}, ${thinking_option_id ?? null}, ${sessionId})
          RETURNING id, state
        `;
        reply.code(202);
        return { user_message_id: userMsg!.id, task_id: task!.id, dispatched: true };
      }

      // Native provider: create streaming assistant row + enqueue inference
      const [assistantMsg] = await sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;

      inference.enqueue(sessionId, chatId, assistantMsg!.id, 'default');

      reply.code(202);
      return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
    },
  );

  // POST /api/chats/:id/answer_user_input — answer a pending ask_user_input
  app.post<{ Params: { id: string } }>(
    '/api/chats/:id/answer_user_input',
    async (req, reply) => {
      const parsed = AnswerUserInputBody.safeParse(req.body);
      if (!parsed.success) {
        reply.code(400);
        return { error: 'invalid_body', details: parsed.error.flatten() };
      }
      const { tool_call_id, answers } = parsed.data;

      const chatRows = await sql<{ id: string; session_id: string }[]>`
        SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
      `;
      if (chatRows.length === 0) {
        reply.code(404);
        return { error: 'chat_not_found' };
      }
      const chat = chatRows[0]!;
      const sessionId = chat.session_id;

      // Locate the assistant-side tool call the client claims to be answering.
      const callerRows = await sql<{
        message_id: string;
        payload: { id: string; name: string; args: Record<string, unknown> };
      }[]>`
        SELECT p.message_id, p.payload
        FROM message_parts p
        JOIN messages m ON m.id = p.message_id
        WHERE m.chat_id = ${chat.id}
          AND m.role = 'assistant'
          AND p.kind = 'tool_call'
          AND p.payload->>'id' = ${tool_call_id}
        ORDER BY m.created_at DESC
        LIMIT 1
      `;
      if (!callerRows[0]) {
        reply.code(404);
        return { error: 'unknown_tool_call_id' };
      }
      const foundCall = callerRows[0].payload;
      if (foundCall.name !== 'ask_user_input') {
        reply.code(400);
        return { error: 'tool_call_not_ask_user_input' };
      }

      // The stored args must themselves be well-formed before answers can be compared to them.
      const argsParsed = AskUserInputArgs.safeParse(foundCall.args);
      if (!argsParsed.success) {
        reply.code(400);
        return { error: 'mismatched_answer_shape', detail: 'tool_call args invalid' };
      }
      const shapeProblem = findAnswerShapeProblem(argsParsed.data.questions, answers);
      if (shapeProblem !== null) {
        reply.code(400);
        return { error: 'mismatched_answer_shape', detail: shapeProblem };
      }

      // Locate the tool message holding the (still empty) result for this call.
      const toolRows = await sql<{
        message_id: string;
        payload: { tool_call_id: string; output: unknown };
      }[]>`
        SELECT p.message_id, p.payload
        FROM message_parts p
        JOIN messages m ON m.id = p.message_id
        WHERE m.chat_id = ${chat.id}
          AND m.role = 'tool'
          AND p.kind = 'tool_result'
          AND p.payload->>'tool_call_id' = ${tool_call_id}
        ORDER BY m.created_at DESC
        LIMIT 1
      `;
      if (!toolRows[0]) {
        reply.code(404);
        return { error: 'unknown_tool_call_id', detail: 'tool message not found' };
      }
      // Only an explicit `output: null` counts as pending.
      if (toolRows[0].payload?.output !== null) {
        reply.code(409);
        return { error: 'tool_call_already_answered' };
      }

      const answerSet = { answers };
      const newToolResults = { tool_call_id, output: answerSet, truncated: false };
      const toolMessageId = toolRows[0].message_id;

      const result = await sql.begin(async (tx) => {
        // The pending check above ran outside this transaction, so two concurrent
        // answers could both pass it. Lock the part row and re-check here: a
        // competing request either waits on the lock and then finds the row gone,
        // or sees it already filled. `null` signals "lost the race" to the caller.
        const locked = await tx<{ payload: { output?: unknown } | null }[]>`
          SELECT payload FROM message_parts
          WHERE message_id = ${toolMessageId}
            AND kind = 'tool_result'
            AND payload->>'tool_call_id' = ${tool_call_id}
          FOR UPDATE
        `;
        const stillPending =
          locked.length > 0 && locked.every((part) => part.payload?.output === null);
        if (!stillPending) {
          return null;
        }

        // Replace the placeholder result with the user's answers.
        await tx`DELETE FROM message_parts WHERE message_id = ${toolMessageId} AND kind = 'tool_result'`;
        await tx`
          INSERT INTO message_parts (message_id, sequence, kind, payload)
          VALUES (${toolMessageId}, 0, 'tool_result', ${tx.json(newToolResults as never)})
        `;
        // Streaming assistant row that the next inference turn will fill.
        const [assistantMsg] = await tx<{ id: string }[]>`
          INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
          VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
          RETURNING id
        `;
        await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
        await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
        return { tool_message_id: toolMessageId, assistant_message_id: assistantMsg!.id };
      });

      if (result === null) {
        reply.code(409);
        return { error: 'tool_call_already_answered' };
      }

      // Publish and enqueue only after the transaction has committed.
      broker.publishFrame(sessionId, {
        type: 'tool_result',
        tool_message_id: result.tool_message_id,
        tool_call_id,
        chat_id: chat.id,
        output: answerSet,
        truncated: false,
      } as unknown as WsFrame);
      inference.enqueue(sessionId, chat.id, result.assistant_message_id, 'default');

      reply.code(202);
      return result;
    },
  );

  // POST /api/sessions/:sessionId/stop — cancel active inference
  app.post<{ Params: { sessionId: string } }>(
    '/api/sessions/:sessionId/stop',
    async (req, reply) => {
      const sessionId = req.params.sessionId;

      // Find active chats in this session
      const chats = await sql<{ id: string }[]>`
        SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open'
      `;
      let cancelled = false;
      // Only the first chat reporting an active inference is cancelled.
      // NOTE(review): if several chats of one session can run concurrently, the
      // others keep running — confirm this is intended.
      for (const chat of chats) {
        if (inference.hasActive(chat.id)) {
          cancelled = await inference.cancel(sessionId, chat.id);
          break;
        }
      }

      return { cancelled };
    },
  );
}
|