v2.0.0-beta: write tools, pending-changes queue, inference loop, API routes

Phase 2 of v2.0. BooCoder is now a functional write-capable chatbot.

Write-path guard: resolveWritePath() uses resolve() (no realpath — files may
not exist for creates) + prefix-check + secret-file deny list (.env, *.pem,
id_rsa*, etc.). 23 unit tests cover traversal attacks.

Pending-changes service: queueEdit/Create/Delete → applyOne/All →
rejectOne/All → rewindOne. Edit diffs stored as JSON {old, new}. All writes
queue before touching disk; apply re-validates the path guard.

5 write tools: edit_file, create_file, delete_file, apply_pending, rewind.
Registered alongside 25 read-only tools from BooChat (30 total, alpha-sorted).
Write tools use a module-level inference context for sql+sessionId injection.

Inference loop via workspace dependency: apps/coder imports
createInferenceRunner, createBroker, ALL_TOOLS from @boocode/server (dist/).
apps/server gains declaration: true + exports map with typed subpath entries.
No code duplication — one inference engine shared by both apps.

API routes: POST /api/sessions/:id/messages (user msg → inference), POST stop,
GET/POST pending-changes CRUD (5 endpoints), WebSocket session streaming.

Dockerfile updated to build apps/server first (coder depends on its .d.ts).
Health endpoint reports tool count: {"ok":true,"db":true,"tools":30}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 01:53:38 +00:00
parent 006226cce5
commit ce31577d1e
23 changed files with 1236 additions and 5 deletions

View File

@@ -0,0 +1,126 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';
const SendBody = z.object({
content: z.string().min(1).max(64_000),
chat_id: z.string().uuid(),
});
interface InferenceApi {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
hasActive: (chatId: string) => boolean;
}
export function registerMessageRoutes(
app: FastifyInstance,
sql: Sql,
broker: Broker,
inference: InferenceApi,
): void {
// POST /api/sessions/:sessionId/messages — send a user message + kick off inference
app.post<{ Params: { sessionId: string } }>(
'/api/sessions/:sessionId/messages',
async (req, reply) => {
const parsed = SendBody.safeParse(req.body);
if (!parsed.success) {
reply.code(400);
return { error: 'invalid body', details: parsed.error.flatten() };
}
const sessionId = req.params.sessionId;
const { content, chat_id: chatId } = parsed.data;
// Validate session exists
const sessionRows = await sql<{ id: string }[]>`
SELECT id FROM sessions WHERE id = ${sessionId}
`;
if (sessionRows.length === 0) {
reply.code(404);
return { error: 'session not found' };
}
// Validate chat belongs to session and is open
const chatRows = await sql<{ id: string; session_id: string }[]>`
SELECT id, session_id FROM chats WHERE id = ${chatId} AND session_id = ${sessionId} AND status = 'open'
`;
if (chatRows.length === 0) {
reply.code(404);
return { error: 'chat not found or not open in this session' };
}
// Reject if inference is already running on this chat
if (inference.hasActive(chatId)) {
reply.code(409);
return { error: 'inference already running on this chat' };
}
// Create user message + streaming assistant row in a transaction
const result = await sql.begin(async (tx) => {
const [userMsg] = await tx<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${content}, 'complete', clock_timestamp())
RETURNING id
`;
const [assistantMsg] = await tx<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;
return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
});
// Publish user message frames so WS subscribers see it immediately
broker.publishFrame(sessionId, {
type: 'message_started',
message_id: result.user_message_id,
chat_id: chatId,
role: 'user',
} as unknown as WsFrame);
broker.publishFrame(sessionId, {
type: 'delta',
message_id: result.user_message_id,
chat_id: chatId,
content,
} as unknown as WsFrame);
broker.publishFrame(sessionId, {
type: 'message_complete',
message_id: result.user_message_id,
chat_id: chatId,
} as unknown as WsFrame);
// Enqueue inference — the runner will stream assistant deltas via broker
inference.enqueue(sessionId, chatId, result.assistant_message_id, 'default');
reply.code(202);
return result;
},
);
// POST /api/sessions/:sessionId/stop — cancel active inference
app.post<{ Params: { sessionId: string } }>(
'/api/sessions/:sessionId/stop',
async (req, reply) => {
const sessionId = req.params.sessionId;
// Find active chats in this session
const chats = await sql<{ id: string }[]>`
SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open'
`;
let cancelled = false;
for (const chat of chats) {
if (inference.hasActive(chat.id)) {
cancelled = await inference.cancel(sessionId, chat.id);
break;
}
}
return { cancelled };
},
);
}

View File

@@ -0,0 +1,121 @@
import type { FastifyInstance } from 'fastify';
import type { Sql } from '../db.js';
import {
listPending,
applyOne,
applyAll,
rejectOne,
rewindOne,
} from '../services/pending_changes.js';
/**
* Resolve project root from a session's project path.
*/
async function resolveProjectRoot(sql: Sql, sessionId: string): Promise<string | null> {
const rows = await sql<{ path: string }[]>`
SELECT p.path FROM sessions s
JOIN projects p ON s.project_id = p.id
WHERE s.id = ${sessionId}
`;
return rows.length > 0 ? rows[0]!.path : null;
}
/**
* Resolve project root from a pending change's session.
*/
async function resolveProjectRootForChange(sql: Sql, changeId: string): Promise<string | null> {
const rows = await sql<{ path: string }[]>`
SELECT p.path FROM pending_changes pc
JOIN sessions s ON pc.session_id = s.id
JOIN projects p ON s.project_id = p.id
WHERE pc.id = ${changeId}
`;
return rows.length > 0 ? rows[0]!.path : null;
}
export function registerPendingRoutes(app: FastifyInstance, sql: Sql): void {
// GET /api/sessions/:sessionId/pending — list pending changes for a session
app.get<{ Params: { sessionId: string } }>(
'/api/sessions/:sessionId/pending',
async (req, reply) => {
const sessionId = req.params.sessionId;
const session = await sql<{ id: string }[]>`SELECT id FROM sessions WHERE id = ${sessionId}`;
if (session.length === 0) {
reply.code(404);
return { error: 'session not found' };
}
const pending = await listPending(sql, sessionId);
return pending;
},
);
// POST /api/sessions/:sessionId/pending/apply — apply all pending changes
app.post<{ Params: { sessionId: string } }>(
'/api/sessions/:sessionId/pending/apply',
async (req, reply) => {
const sessionId = req.params.sessionId;
const projectRoot = await resolveProjectRoot(sql, sessionId);
if (!projectRoot) {
reply.code(404);
return { error: 'session or project not found' };
}
const results = await applyAll(sql, sessionId, projectRoot);
return { results };
},
);
// POST /api/pending/:id/apply — apply a single pending change
app.post<{ Params: { id: string } }>(
'/api/pending/:id/apply',
async (req, reply) => {
const changeId = req.params.id;
const projectRoot = await resolveProjectRootForChange(sql, changeId);
if (!projectRoot) {
reply.code(404);
return { error: 'pending change or project not found' };
}
const result = await applyOne(sql, changeId, projectRoot);
if (!result.success) {
reply.code(422);
}
return result;
},
);
// POST /api/pending/:id/reject — reject a single pending change
app.post<{ Params: { id: string } }>(
'/api/pending/:id/reject',
async (req, reply) => {
const changeId = req.params.id;
await rejectOne(sql, changeId);
return { ok: true };
},
);
// POST /api/pending/:id/rewind — rewind (undo) an applied change
app.post<{ Params: { id: string } }>(
'/api/pending/:id/rewind',
async (req, reply) => {
const changeId = req.params.id;
const projectRoot = await resolveProjectRootForChange(sql, changeId);
if (!projectRoot) {
reply.code(404);
return { error: 'pending change or project not found' };
}
const result = await rewindOne(sql, changeId, projectRoot);
if (!result.success) {
reply.code(422);
}
return result;
},
);
}

View File

@@ -0,0 +1,51 @@
import type { FastifyInstance } from 'fastify';
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
export function registerWebSocket(
app: FastifyInstance,
sql: Sql,
broker: Broker,
): void {
// Per-session streaming WebSocket. Clients connect here to receive live
// inference frames (deltas, tool_calls, tool_results, message_complete).
app.get<{ Params: { sessionId: string } }>(
'/api/ws/sessions/:sessionId',
{ websocket: true },
async (socket, req) => {
const sessionId = req.params.sessionId;
// Validate session exists
const session = await sql<{ id: string }[]>`SELECT id FROM sessions WHERE id = ${sessionId}`;
if (session.length === 0) {
socket.send(JSON.stringify({ type: 'error', error: 'session not found' }));
socket.close(1008, 'session not found');
return;
}
// Send snapshot of existing messages so client can hydrate
const messages = await sql<Record<string, unknown>[]>`
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at, metadata,
summary, tail_start_id, compacted_at
FROM messages_with_parts
WHERE session_id = ${sessionId}
ORDER BY created_at ASC, id ASC
`;
socket.send(JSON.stringify({ type: 'snapshot', messages }));
// Subscribe to broker for live frames
const unsubscribe = broker.subscribe(sessionId, (frame) => {
if (socket.readyState !== socket.OPEN) return;
try {
socket.send(JSON.stringify(frame));
} catch (err) {
app.log.warn({ err, sessionId }, 'ws send failed');
}
});
socket.on('close', () => unsubscribe());
socket.on('error', () => unsubscribe());
},
);
}