import Fastify from 'fastify';
import fastifyStatic from '@fastify/static';
import fastifyWebsocket from '@fastify/websocket';
import { existsSync } from 'node:fs';
import { resolve } from 'node:path';
import { loadConfig } from './config.js';
import { getSql, applySchema, pingDb, closeDb } from './db.js';
import { registerProjectRoutes } from './routes/projects.js';
import { registerSessionRoutes } from './routes/sessions.js';
import { registerSettingsRoutes } from './routes/settings.js';
import { registerMessageRoutes } from './routes/messages.js';
import { registerArtifactRoutes } from './routes/artifacts.js';
import { registerChatRoutes } from './routes/chats.js';
import { registerSidebarRoutes } from './routes/sidebar.js';
import { registerWebSocket } from './routes/ws.js';
import { registerModelRoutes } from './routes/models.js';
import { registerAgentRoutes } from './routes/agents.js';
import { registerSkillsRoutes } from './routes/skills.js';
import { registerToolsRoutes } from './routes/tools.js';
import { createInferenceRunner } from './services/inference/index.js';
import { createBroker } from './services/broker.js';
import { listSkills } from './services/skills.js';
import * as compaction from './services/compaction.js';
import { configureModelContext } from './services/model-context.js';
import { cleanupTruncations } from './services/truncate.js';

/**
 * Server entry point.
 *
 * Loads config, applies the DB schema, fails any stale `streaming` message
 * rows left by a previous run, registers every route/WebSocket handler,
 * optionally serves the built web frontend, starts the periodic maintenance
 * loop, installs SIGINT/SIGTERM shutdown handlers, and finally listens on
 * `config.HOST:config.PORT`.
 */
async function main() {
  /** Frame type accepted by `broker.publishFrame` / `broker.publishUserFrame`. */
  type WsFrame = import('./types/ws-frames.js').WsFrame;

  const config = loadConfig();

  const app = Fastify({
    logger: { level: config.LOG_LEVEL },
  });

  // Allow empty JSON bodies on POSTs that don't take a body (archive, unarchive, stop, etc.).
  // Fastify's stock parser throws FST_ERR_CTP_EMPTY_JSON_BODY on an empty string, so blank
  // bodies are short-circuited to {} here. Every non-blank body is delegated to the stock
  // parser rather than a bare JSON.parse: that keeps Fastify's prototype-poisoning checks
  // (`__proto__` / `constructor.prototype` keys) and its 400 status on malformed JSON.
  // A raw JSON.parse SyntaxError carries no statusCode and would be reported as a 500.
  const defaultJsonParser = app.getDefaultJsonParser('error', 'error');
  app.removeContentTypeParser(['application/json']);
  app.addContentTypeParser('application/json', { parseAs: 'string' }, (req, body, done) => {
    const str = (body as string) ?? '';
    if (str.trim().length === 0) {
      done(null, {});
      return;
    }
    void defaultJsonParser(req, str, done);
  });

  const sql = getSql(config);
  await applySchema(sql);
  app.log.info('database schema applied');

  // Boot sweep: fail 'streaming' rows older than 5 minutes. finished_at is
  // stamped the same way the periodic sweeper below stamps it, so a failed
  // row looks identical regardless of which sweep caught it.
  const swept = await sql<{ count: string }[]>`
    WITH swept AS (
      UPDATE messages
      SET status = 'failed', finished_at = clock_timestamp()
      WHERE status = 'streaming'
        AND created_at < NOW() - INTERVAL '5 minutes'
      RETURNING id
    )
    SELECT count(*)::text AS count FROM swept
  `;
  const sweptCount = Number(swept[0]?.count ?? 0);
  if (sweptCount > 0) {
    app.log.info({ sweptCount }, 'swept stale streaming messages to failed');
  }

  // v1.11.3: tell the model-context cache where llama-swap lives. Cache
  // lookups go to ${LLAMA_SWAP_URL}/upstream/<model>/props to read
  // default_generation_settings.n_ctx — the value persisted as messages.ctx_max.
  configureModelContext({ llamaSwapUrl: config.LLAMA_SWAP_URL });

  await app.register(fastifyWebsocket);

  app.get('/api/health', async () => {
    const dbOk = await pingDb(sql);
    return { status: dbOk ? 'ok' : 'degraded', db: dbOk };
  });

  const broker = createBroker(app.log);

  registerProjectRoutes(app, sql, config, broker);
  registerSessionRoutes(app, sql, config, broker);
  registerSettingsRoutes(app, sql);
  registerModelRoutes(app, config);
  registerAgentRoutes(app, sql);
  registerSidebarRoutes(app, sql);
  registerChatRoutes(app, sql, broker);
  registerToolsRoutes(app, sql);

  // Batch 9.6: warm the skills cache at boot and surface the count. Empty or
  // missing /data/skills is non-fatal — the skill tools just return empty.
  try {
    const skills = await listSkills();
    app.log.info(`skills loaded: ${skills.length}`);
  } catch (err) {
    app.log.warn({ err }, 'skills boot walk failed');
  }

  const inference = createInferenceRunner(
    {
      sql,
      config,
      log: app.log,
      publish: (sessionId, frame) => {
        // v1.13.11-b: route through the typed publishFrame so the broker's
        // Zod gate validates every inference frame before delivery.
        broker.publishFrame(sessionId, frame as unknown as WsFrame);
      },
      // v1.11: broker handle for compaction.process to publish 'compacted'
      // frames on the per-session channel. Inference's regular publish path
      // is bound to (sessionId, InferenceFrame); compaction publishes a
      // different frame shape, so it goes through the raw broker.
      broker,
    },
    (user, frame) => {
      broker.publishUserFrame(user, frame as unknown as WsFrame);
    },
  );

  registerMessageRoutes(app, sql, config, broker, {
    enqueueInference: (sessionId, chatId, assistantId, user) => {
      inference.enqueue(sessionId, chatId, assistantId, user);
    },
    // v1.11: synchronous compaction. Awaits the LLM call inside the route's
    // request lifecycle; the new summary row arrives via the WS 'compacted'
    // frame published from inside compaction.process. We let the error
    // bubble up so the route can reply 500 — manual /compact failures
    // should be loud (the user just clicked a button).
    runCompaction: (chatId) => compaction.process({ sql, config, log: app.log, broker, chatId }),
    cancelInference: async (sessionId, chatId) => {
      return inference.cancel(sessionId, chatId);
    },
    hasActiveInference: (chatId) => inference.hasActive(chatId),
    publishUserMessage: (sessionId, chatId, userMessageId, content) => {
      broker.publishFrame(sessionId, {
        type: 'message_started',
        message_id: userMessageId,
        chat_id: chatId,
        role: 'user',
      });
      broker.publishFrame(sessionId, {
        type: 'delta',
        message_id: userMessageId,
        chat_id: chatId,
        content,
      });
      broker.publishFrame(sessionId, {
        type: 'message_complete',
        message_id: userMessageId,
        chat_id: chatId,
      });
    },
    publishMessagesDeleted: (sessionId, chatId, messageIds) => {
      broker.publishFrame(sessionId, {
        type: 'messages_deleted',
        message_ids: messageIds,
        chat_id: chatId,
      });
    },
    publishSessionFrame: (sessionId, frame) => {
      broker.publishFrame(sessionId, frame as WsFrame);
    },
  });

  registerArtifactRoutes(app, sql);

  registerSkillsRoutes(app, sql, {
    enqueueInference: (sessionId, chatId, assistantId, user) => {
      inference.enqueue(sessionId, chatId, assistantId, user);
    },
    publishUserMessage: (sessionId, chatId, userMessageId, content) => {
      broker.publishFrame(sessionId, {
        type: 'message_started',
        message_id: userMessageId,
        chat_id: chatId,
        role: 'user',
      });
      broker.publishFrame(sessionId, {
        type: 'delta',
        message_id: userMessageId,
        chat_id: chatId,
        content,
      });
      broker.publishFrame(sessionId, {
        type: 'message_complete',
        message_id: userMessageId,
        chat_id: chatId,
      });
    },
    publishSessionFrame: (sessionId, frame) => {
      broker.publishFrame(sessionId, frame as WsFrame);
    },
  });

  registerWebSocket(app, sql, broker);

  // Serve the built SPA when present; unknown non-/api paths fall back to
  // index.html so client-side routing works, /api misses stay JSON 404s.
  const webDist = process.env.WEB_DIST_PATH ?? resolve(process.cwd(), '../web/dist');
  if (existsSync(webDist)) {
    await app.register(fastifyStatic, {
      root: webDist,
      prefix: '/',
      wildcard: false,
    });
    app.setNotFoundHandler((req, reply) => {
      if (req.url.startsWith('/api')) {
        reply.code(404).send({ error: 'not found' });
        return;
      }
      reply.sendFile('index.html');
    });
    app.log.info(`serving static frontend from ${webDist}`);
  }

  // v1.13.3: periodic in-process sweeper for streaming rows orphaned by a
  // mid-session crash. The boot sweep (above) only fires once at startup;
  // this loop catches the in-flight case. 60s cadence + 5-min threshold
  // matches the boot sweep so behavior is consistent. Publishes
  // chat_status='idle' on the user channel so the UI dot drops without a
  // refresh — same pattern as handleAbortOrError.
  // NOTE(review): the age check is on created_at only, so a generation that is
  // still legitimately streaming after 5 minutes is also marked failed. Consider
  // excluding chats for which inference.hasActive(chat_id) is true.
  const SWEEP_INTERVAL_MS = 60_000;
  const sweepStaleStreaming = async (): Promise<void> => {
    try {
      const rows = await sql<{ id: string; chat_id: string }[]>`
        UPDATE messages
        SET status = 'failed', finished_at = clock_timestamp()
        WHERE status = 'streaming'
          AND created_at < NOW() - INTERVAL '5 minutes'
        RETURNING id, chat_id
      `;
      if (rows.length === 0) return;
      app.log.warn(
        { swept: rows.length, ids: rows.map((r) => r.id) },
        'swept stale streaming rows',
      );
      // One chat_status frame per chat, even if several of its rows were swept.
      const seenChats = new Set<string>();
      const now = new Date().toISOString();
      for (const row of rows) {
        if (seenChats.has(row.chat_id)) continue;
        seenChats.add(row.chat_id);
        broker.publishUserFrame('default', {
          type: 'chat_status',
          chat_id: row.chat_id,
          status: 'idle',
          at: now,
        });
      }
    } catch (err) {
      app.log.error({ err }, 'stuck-row sweeper failed');
    }
  };

  // Best-effort wrapper: a failure (sync throw or rejected promise) inside
  // cleanupTruncations is logged instead of escaping the timer callback as an
  // unhandled rejection, which can terminate the Node process.
  const runTruncationCleanup = async (): Promise<void> => {
    try {
      await cleanupTruncations({ sql, log: app.log });
    } catch (err) {
      app.log.error({ err }, 'truncation cleanup failed');
    }
  };

  // v1.13.5: truncation cleanup rides the same cadence — 60s tick reaps
  // tmpfs files past the 7-day TTL plus any orphans whose owning part has
  // been pruned (v1.13.4) or deleted. No-op when the dir is empty.
  const sweepTimer = setInterval(() => {
    void sweepStaleStreaming();
    void runTruncationCleanup();
  }, SWEEP_INTERVAL_MS);
  app.addHook('onClose', async () => {
    clearInterval(sweepTimer);
  });

  // Close Fastify (which also stops the sweep timer via onClose), then the DB.
  const shutdown = async (signal: string) => {
    app.log.info(`received ${signal}, shutting down`);
    try {
      await app.close();
      await closeDb();
      process.exit(0);
    } catch (err) {
      app.log.error(err);
      process.exit(1);
    }
  };
  process.on('SIGINT', () => void shutdown('SIGINT'));
  process.on('SIGTERM', () => void shutdown('SIGTERM'));

  // The listen host comes from config.HOST and is meant to be 0.0.0.0. Public
  // access goes through Caddy → Authelia. Direct Tailscale access
  // (100.114.205.53:9500) is unauthenticated by design; the threat model treats
  // Tailnet membership as the trust boundary.
  await app.listen({ port: config.PORT, host: config.HOST });
  app.log.info(`boocode server listening on http://${config.HOST}:${config.PORT}`);
}

main().catch((err) => {
  console.error('Fatal startup error:', err);
  process.exit(1);
});