Files
boocode/apps/coder/src/index.ts
indifferentketchup f32fd928b3 feat: post-review backlog hardening (cancel/parser/stall/history/9502)
Five independent items from the post-review backlog. F1: Stop on an external
agent task now aborts the running child via a per-task AbortController registry
reachable from the cancel route, and finalizes the assistant message as
cancelled (fixing two latent bugs — catch blocks left the message streaming,
and warm success-paths wrote complete on an aborted turn); warm pools/worktrees
are preserved and the native path is unchanged. F2/F3: prune the tool-call
parser to its two load-bearing exports (unexport eight zero-caller symbols, add
a gate test for the <invoke>-as-text fallback) and route placeholder-rejection
logging through pino. F6: a 90s per-chunk stall-timeout wraps native inference's
fullStream via AbortSignal.any so a hung stream finalizes the message instead of
hanging — no retry (a pure classifyStreamError helper is added). F7: a read-only
view_session_history MCP tool (newest-N, chronological). F9: retire the unused
apps/coder/web :9502 fallback SPA, keeping every API/WS/health/MCP route.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 02:23:11 +00:00

280 lines
10 KiB
TypeScript

import Fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';
import { loadConfig } from './config.js';
import { getSql, applySchema, pingDb, closeDb } from './db.js';
import { startMcpServer } from './services/mcp-server.js';
// v2.0.0 Phase 2B: workspace dependency on @boocode/server — reuse the
// inference loop, broker, and tool registry without duplication.
import { createInferenceRunner } from '@boocode/server/inference';
import { createBroker } from '@boocode/server/broker';
import { appendMcpTools, ALL_TOOLS } from '@boocode/server/tools';
import type { Config as ServerConfig } from '@boocode/server/config';
import type { WsFrame } from '@boocode/contracts/ws-frames';
// v2.0.0 Phase 2C: write tools + adapter for BooChat ToolDef compatibility.
import { WRITE_TOOLS } from './services/tools/index.js';
import { adaptWriteTool } from './services/tools/adapter.js';
import { setInferenceContext, clearInferenceContext } from './services/tools/inference_context.js';
// Routes
import { registerMessageRoutes } from './routes/messages.js';
import { registerSkillRoutes } from './routes/skills.js';
import { registerPendingRoutes } from './routes/pending.js';
import { registerCheckpointRoutes } from './routes/checkpoints.js';
import { registerAgentSessionRoutes } from './routes/agent-sessions.js';
import { registerTaskRoutes } from './routes/tasks.js';
import { registerInboxRoutes } from './routes/inbox.js';
import { registerStatsRoutes } from './routes/stats.js';
import { registerArenaRoutes } from './routes/arena.js';
import { registerProviderRoutes } from './routes/providers.js';
import { registerWorktreeSafetyRoutes } from './routes/worktree-safety.js';
import { registerLifecycleRoutes } from './routes/lifecycle.js';
import { registerWebSocket } from './routes/ws.js';
// Phase 4: dispatcher + agent probe
import { createDispatcher } from './services/dispatcher.js';
import { agentPool } from './services/agent-pool.js';
import { createOrphanWorktreeReaper } from './services/orphan-worktree-reaper.js';
import { probeAgents } from './services/agent-probe.js';
import { getProviderSnapshot, persistProbedModels } from './services/provider-snapshot.js';
import { setPermissionHooks } from './services/permission-waiter.js';
import { publishAgentStatus } from './services/agent-status-publish.js';
import { homedir } from 'node:os';
async function main() {
// MCP mode: stdio transport, no HTTP server
if (process.argv.includes('--mcp')) {
const config = loadConfig();
const sql = getSql(config);
await applySchema(sql);
await startMcpServer(sql);
return;
}
const config = loadConfig();
const app = Fastify({
logger: { level: config.LOG_LEVEL },
});
// Allow empty JSON bodies (same pattern as apps/server).
app.removeContentTypeParser(['application/json']);
app.addContentTypeParser('application/json', { parseAs: 'string' }, (_req, body, done) => {
const str = (body as string) ?? '';
if (str.trim().length === 0) {
done(null, {});
return;
}
try {
done(null, JSON.parse(str));
} catch (err) {
done(err as Error, undefined);
}
});
const sql = getSql(config);
await applySchema(sql);
app.log.info('database schema applied');
// Broker: in-memory pub/sub for session + user channel streaming.
const broker = createBroker(app.log);
// agent-status-normalize (#10): the permission hooks carry only taskId +
// sessionId, but the tasks row holds the (chat_id, agent) pair the status frame
// is keyed on. Resolve it best-effort so a blocked/working status accompanies
// every permission_requested/permission_resolved. Returns null when the task
// lacks a chat_id or agent (sessionless creators) — we simply skip the status.
const resolveChatAgent = async (
taskId: string,
): Promise<{ chatId: string; agent: string } | null> => {
const [row] = await sql<{ chat_id: string | null; agent: string | null }[]>`
SELECT chat_id, agent FROM tasks WHERE id = ${taskId}
`;
if (!row?.chat_id || !row.agent) return null;
return { chatId: row.chat_id, agent: row.agent };
};
setPermissionHooks({
onPrompt: async (prompt) => {
await sql`
UPDATE tasks SET state = 'blocked' WHERE id = ${prompt.taskId} AND state = 'running'
`;
broker.publishFrame(prompt.sessionId, {
type: 'permission_requested',
task_id: prompt.taskId,
session_id: prompt.sessionId,
kind: prompt.kind,
tool_title: prompt.toolTitle,
...(prompt.input ? { input: prompt.input } : {}),
options: prompt.options.map((o) => ({ option_id: o.optionId, label: o.label })),
} as WsFrame);
// #10: agent is blocked on a human decision.
const ca = await resolveChatAgent(prompt.taskId).catch(() => null);
if (ca) {
publishAgentStatus(
broker.publishFrame,
prompt.sessionId,
ca.chatId,
ca.agent,
'blocked',
'permission_request',
);
}
},
onResolved: async (taskId, sessionId) => {
await sql`
UPDATE tasks SET state = 'running' WHERE id = ${taskId} AND state = 'blocked'
`;
broker.publishFrame(sessionId, {
type: 'permission_resolved',
task_id: taskId,
session_id: sessionId,
} as WsFrame);
// #10: human responded — agent resumes work.
const ca = await resolveChatAgent(taskId).catch(() => null);
if (ca) {
publishAgentStatus(
broker.publishFrame,
sessionId,
ca.chatId,
ca.agent,
'working',
'permission_resolved',
);
}
},
});
// --- Tool registry extension ---
// Append BooCoder write tools (adapted to BooChat's ToolDef interface) to
// the shared ALL_TOOLS registry. appendMcpTools re-sorts and rebuilds
// TOOLS_BY_NAME so tool-phase.ts dispatch sees the full set.
const adaptedWriteTools = WRITE_TOOLS.map((t) => adaptWriteTool(t));
appendMcpTools(adaptedWriteTools);
app.log.info(`tool registry: ${ALL_TOOLS.length} tools loaded (${WRITE_TOOLS.length} write tools)`);
// Inference runner: same engine as BooChat, uses ALL_TOOLS (which includes
// the appended write tools) for tool dispatch.
const inference = createInferenceRunner(
{
sql,
config: config as unknown as ServerConfig,
log: app.log,
publish: (sessionId, frame) => {
broker.publishFrame(sessionId, frame as unknown as WsFrame);
},
broker,
},
(user, frame) => {
broker.publishUserFrame(user, frame as unknown as WsFrame);
}
);
// Wrap the inference runner to set/clear the write-tool context around each run.
// The inference runner calls enqueue() which fires asynchronously — we hook
// into the enqueue to set context before the run starts.
const inferenceApi = {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => {
// Set the inference context so write tools can access sql + sessionId.
// The context persists for the duration of the inference run. Since
// BooCoder is single-user and runs one inference at a time per session,
// this module-level state is safe.
setInferenceContext({ sql, sessionId, taskId: null });
inference.enqueue(sessionId, chatId, assistantId, user);
},
cancel: async (sessionId: string, chatId: string) => {
const result = await inference.cancel(sessionId, chatId);
clearInferenceContext();
return result;
},
hasActive: (chatId: string) => inference.hasActive(chatId),
};
// Register WebSocket support
await app.register(fastifyWebsocket);
// Health endpoint
app.get('/api/health', async (_req, reply) => {
const dbOk = await pingDb(sql);
const status = dbOk ? 200 : 503;
return reply.status(status).send({
ok: dbOk,
db: dbOk,
tools: ALL_TOOLS.length,
});
});
// Phase 4: probe available agents on startup
await probeAgents(sql, app.log);
// Warm provider snapshot in background (ACP cold probes + model merges)
void getProviderSnapshot(sql, config, homedir(), true)
.then((entries) => persistProbedModels(sql, entries, app.log))
.catch((err) => {
app.log.warn(
{ err: err instanceof Error ? err.message : String(err) },
'provider-snapshot: warm failed',
);
});
// Phase 4: dispatcher — polls tasks table and runs inference
const dispatcher = createDispatcher({ sql, inference: inferenceApi, broker, log: app.log, config });
dispatcher.start();
// v2.6 Phase 3: configure + start the agent-pool lifecycle sweep (idle-TTL +
// LRU-cap eviction of warm backends, plus each backend's proactive health probe)
// and the orphan-worktree reaper. Both run on the same periodic timer.
agentPool.configure({
idleTtlMs: config.AGENT_POOL_IDLE_TTL_MS,
maxLive: config.AGENT_POOL_MAX_LIVE,
sweepIntervalMs: config.LIFECYCLE_SWEEP_INTERVAL_MS,
log: app.log,
});
agentPool.startReaper(app.log);
const orphanReaper = createOrphanWorktreeReaper({
sql,
log: app.log,
intervalMs: config.LIFECYCLE_SWEEP_INTERVAL_MS,
graceMs: config.ORPHAN_WORKTREE_GRACE_MS,
});
orphanReaper.start();
app.addHook('onClose', async () => {
// stop() first so in-flight dispatcher turns settle, then stop the reapers and
// drain the pool (kills opencode server + warm ACP children).
await dispatcher.stop();
orphanReaper.stop();
await agentPool.dispose();
});
// Register routes
registerMessageRoutes(app, sql, broker, inferenceApi);
registerSkillRoutes(app, sql, broker, inferenceApi);
registerPendingRoutes(app, sql);
registerCheckpointRoutes(app, sql);
registerAgentSessionRoutes(app, sql);
registerTaskRoutes(app, sql, inferenceApi, dispatcher.cancelExternalTask);
registerInboxRoutes(app, sql);
registerStatsRoutes(app, sql);
registerArenaRoutes(app, sql);
registerProviderRoutes(app, sql, config);
registerWorktreeSafetyRoutes(app, sql);
registerLifecycleRoutes(app, sql);
registerWebSocket(app, sql, broker);
// Graceful shutdown
const shutdown = async () => {
app.log.info('shutting down');
await app.close();
await closeDb();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
await app.listen({ port: config.PORT, host: config.HOST });
app.log.info(`BooCoder listening on ${config.HOST}:${config.PORT}`);
}
main().catch((err) => {
console.error('fatal:', err);
process.exit(1);
});