Four independent items, all owed from prior dispatches.
- statement_timeout at the database level via:
ALTER DATABASE boocode SET statement_timeout = '30s';
Applied operationally; documented as a comment at the top of schema.sql
(ALTER DATABASE can't run inside a DO block, so it's not idempotent
inside applySchema). Re-apply after a volume reset.
- Tool registry alpha-sorted at module load. llama.cpp's prompt cache
hits on byte-identical prefixes; any reordering of the tool list near
the top of the system prompt would invalidate every cached turn.
Single-source sort at the ALL_TOOLS export so toolJsonSchemas() and
TOOLS_BY_NAME inherit the order automatically. New tools.test.ts
asserts the invariant; total tests 173 (was 172).
- Periodic in-process stuck-row sweeper. Runs every 60s, marks
'streaming' rows older than 5 minutes as 'failed', and publishes
chat_status='idle' on the user channel so the UI dot drops without a
refresh. Closes the mid-session crash UX gap; the v1.12.1 boot sweep
only fires once at startup, so sessions used to stay stuck until next
reboot. setInterval cleaned up via app.addHook('onClose'). Mirrors
handleAbortOrError's publish pattern.
- experimental_repairToolCall wired through AI SDK v6 streamText. Pass-
through implementation: log + return the original toolCall so the
stream keeps going. executeToolPhase's existing error paths (unknown
tool name → 'unknown tool: X' result; zod-reject → 'tool X rejected
— field: required') already surface bad calls to the model; the value
here is preventing the AI SDK from THROWING on parse errors and
killing the whole stream. Owed since v1.13.1-A.
Smoke verified:
- statement_timeout = '30s' confirmed via SHOW.
- Tool path normal flow intact (list_dir prompt → tool_call → result
→ final assistant). No malformed tool calls in the test run; repair
log will surface them when qwen3.6 actually emits one.
- Alpha order verified at runtime via the dist bundle: match: true.
- Sweeper logic not traffic-tested (no stuck rows to find), but the
SQL UPDATE + broker.publishUser pattern is identical to
handleAbortOrError and the boot sweep — synthesis-only verification.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
270 lines
9.4 KiB
TypeScript
270 lines
9.4 KiB
TypeScript
import Fastify from 'fastify';
|
|
import fastifyStatic from '@fastify/static';
|
|
import fastifyWebsocket from '@fastify/websocket';
|
|
import { existsSync } from 'node:fs';
|
|
import { resolve } from 'node:path';
|
|
import { loadConfig } from './config.js';
|
|
import { getSql, applySchema, pingDb, closeDb } from './db.js';
|
|
|
|
import { registerProjectRoutes } from './routes/projects.js';
|
|
import { registerSessionRoutes } from './routes/sessions.js';
|
|
import { registerSettingsRoutes } from './routes/settings.js';
|
|
import { registerMessageRoutes } from './routes/messages.js';
|
|
import { registerChatRoutes } from './routes/chats.js';
|
|
import { registerSidebarRoutes } from './routes/sidebar.js';
|
|
import { registerWebSocket } from './routes/ws.js';
|
|
import { registerModelRoutes } from './routes/models.js';
|
|
import { registerAgentRoutes } from './routes/agents.js';
|
|
import { registerSkillsRoutes } from './routes/skills.js';
|
|
import { createInferenceRunner } from './services/inference/index.js';
|
|
import { createBroker } from './services/broker.js';
|
|
import { listSkills } from './services/skills.js';
|
|
import * as compaction from './services/compaction.js';
|
|
import { configureModelContext } from './services/model-context.js';
|
|
|
|
/**
 * Server entry point.
 *
 * In order: loads config, builds the Fastify app (with a JSON body parser
 * that tolerates empty bodies), applies the database schema, sweeps stale
 * 'streaming' message rows, registers HTTP/WebSocket routes, wires the
 * inference runner and broker into the message/skills routes, optionally
 * serves the built frontend, starts the periodic stuck-row sweeper, installs
 * SIGINT/SIGTERM handlers, and finally starts listening.
 *
 * @returns Resolves once the server is listening. Rejects if any startup
 *   step (schema apply, boot sweep, plugin registration, listen) fails.
 */
async function main(): Promise<void> {
  const config = loadConfig();

  const app = Fastify({
    logger: { level: config.LOG_LEVEL },
  });

  // Allow empty JSON bodies on POSTs that don't take a body (archive, unarchive, stop, etc.).
  // Default Fastify parser throws FST_ERR_CTP_EMPTY_JSON_BODY on empty string.
  app.removeContentTypeParser(['application/json']);
  app.addContentTypeParser('application/json', { parseAs: 'string' }, (_req, body, done) => {
    const raw = typeof body === 'string' ? body : '';
    if (raw.trim().length === 0) {
      done(null, {});
      return;
    }

    // Parse first, then hand off: keeping done() outside the try means an
    // exception thrown downstream of done() can never trigger a second call.
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch (err) {
      // Malformed JSON is a client error. A bare SyntaxError carries no
      // statusCode, so Fastify would answer 500; tag it as 400 instead.
      const parseError = err instanceof Error ? err : new Error(String(err));
      done(Object.assign(parseError, { statusCode: 400 }), undefined);
      return;
    }
    done(null, parsed);
  });

  const sql = getSql(config);
  await applySchema(sql);
  app.log.info('database schema applied');

  // Boot sweep: any 'streaming' row older than 5 minutes is marked 'failed'.
  // finished_at is stamped here exactly as the periodic sweeper below does,
  // so rows failed at boot and rows failed at runtime look the same.
  const swept = await sql<{ count: string }[]>`
    WITH swept AS (
      UPDATE messages
        SET status = 'failed', finished_at = clock_timestamp()
        WHERE status = 'streaming' AND created_at < NOW() - INTERVAL '5 minutes'
        RETURNING id
    ) SELECT count(*)::text AS count FROM swept
  `;
  const sweptCount = Number(swept[0]?.count ?? 0);
  if (sweptCount > 0) {
    app.log.info({ sweptCount }, 'swept stale streaming messages to failed');
  }

  // v1.11.3: tell the model-context cache where llama-swap lives. Cache
  // lookups go to ${LLAMA_SWAP_URL}/upstream/<model>/props to read
  // default_generation_settings.n_ctx — the value persisted as messages.ctx_max.
  configureModelContext({ llamaSwapUrl: config.LLAMA_SWAP_URL });

  await app.register(fastifyWebsocket);

  // Health probe: reports 'degraded' (not an HTTP error) when the DB ping fails.
  app.get('/api/health', async () => {
    const dbOk = await pingDb(sql);
    return { status: dbOk ? 'ok' : 'degraded', db: dbOk };
  });

  const broker = createBroker();

  registerProjectRoutes(app, sql, config, broker);
  registerSessionRoutes(app, sql, config, broker);
  registerSettingsRoutes(app, sql);
  registerModelRoutes(app, config);
  registerAgentRoutes(app, sql);
  registerSidebarRoutes(app, sql);
  registerChatRoutes(app, sql, broker);

  // Batch 9.6: warm the skills cache at boot and surface the count. Empty or
  // missing /data/skills is non-fatal — the skill tools just return empty.
  try {
    const skills = await listSkills();
    app.log.info(`skills loaded: ${skills.length}`);
  } catch (err) {
    app.log.warn({ err }, 'skills boot walk failed');
  }

  const inference = createInferenceRunner(
    {
      sql,
      config,
      log: app.log,
      publish: (sessionId, frame) => {
        broker.publish(sessionId, frame as unknown as Record<string, unknown> & { type: string });
      },
      // v1.11: broker handle for compaction.process to publish 'compacted'
      // frames on the per-session channel. Inference's regular publish path
      // is bound to (sessionId, InferenceFrame); compaction publishes a
      // different frame shape, so it goes through the raw broker.
      broker,
    },
    (user, frame) => {
      broker.publishUser(user, frame as unknown as Record<string, unknown> & { type: string });
    }
  );

  // Publishes a user message on the session channel as three frames:
  // message_started → delta (carrying the full content) → message_complete.
  // Shared by the message routes and the skills routes so both emit the
  // same sequence.
  const publishUserMessage = (
    sessionId: string,
    chatId: string,
    userMessageId: string,
    content: string,
  ): void => {
    broker.publish(sessionId, {
      type: 'message_started',
      message_id: userMessageId,
      chat_id: chatId,
      role: 'user',
    });
    broker.publish(sessionId, {
      type: 'delta',
      message_id: userMessageId,
      chat_id: chatId,
      content,
    });
    broker.publish(sessionId, {
      type: 'message_complete',
      message_id: userMessageId,
      chat_id: chatId,
    });
  };

  registerMessageRoutes(app, sql, {
    enqueueInference: (sessionId, chatId, assistantId, user) => {
      inference.enqueue(sessionId, chatId, assistantId, user);
    },
    // v1.11: synchronous compaction. Awaits the LLM call inside the route's
    // request lifecycle; the new summary row arrives via the WS 'compacted'
    // frame published from inside compaction.process. We let the error
    // bubble up so the route can reply 500 — manual /compact failures
    // should be loud (the user just clicked a button).
    runCompaction: (chatId) =>
      compaction.process({ sql, config, log: app.log, broker, chatId }),
    cancelInference: async (sessionId, chatId) => {
      return inference.cancel(sessionId, chatId);
    },
    hasActiveInference: (chatId) => inference.hasActive(chatId),
    publishUserMessage,
    publishMessagesDeleted: (sessionId, chatId, messageIds) => {
      broker.publish(sessionId, {
        type: 'messages_deleted',
        message_ids: messageIds,
        chat_id: chatId,
      });
    },
    publishSessionFrame: (sessionId, frame) => {
      broker.publish(sessionId, frame);
    },
  });
  registerSkillsRoutes(app, sql, {
    enqueueInference: (sessionId, chatId, assistantId, user) => {
      inference.enqueue(sessionId, chatId, assistantId, user);
    },
    publishUserMessage,
    publishSessionFrame: (sessionId, frame) => {
      broker.publish(sessionId, frame);
    },
  });
  registerWebSocket(app, sql, broker);

  // Serve the built frontend only when the dist directory exists. Unknown
  // non-API paths fall back to index.html; unknown /api paths get a JSON 404.
  const webDist = process.env.WEB_DIST_PATH ?? resolve(process.cwd(), '../web/dist');
  if (existsSync(webDist)) {
    await app.register(fastifyStatic, {
      root: webDist,
      prefix: '/',
      wildcard: false,
    });
    app.setNotFoundHandler((req, reply) => {
      if (req.url.startsWith('/api')) {
        reply.code(404).send({ error: 'not found' });
        return;
      }
      reply.sendFile('index.html');
    });
    app.log.info(`serving static frontend from ${webDist}`);
  }

  // v1.13.3: periodic in-process sweeper for streaming rows orphaned by a
  // mid-session crash. The boot sweep (above) only fires once at startup;
  // this loop catches the in-flight case. 60s cadence + 5-min threshold
  // matches the boot sweep so behavior is consistent. Publishes
  // chat_status='idle' on the user channel so the UI dot drops without a
  // refresh — same pattern as handleAbortOrError.
  //
  // NOTE(review): the cutoff is based on created_at, so a row that is still
  // legitimately streaming after 5 minutes would also be marked 'failed'.
  // Confirm no live generation can run that long, or skip chats for which
  // inference.hasActive(chat_id) is true.
  const SWEEP_INTERVAL_MS = 60_000;
  const sweepStaleStreaming = async (): Promise<void> => {
    try {
      const rows = await sql<{ id: string; chat_id: string }[]>`
        UPDATE messages
          SET status = 'failed', finished_at = clock_timestamp()
          WHERE status = 'streaming'
            AND created_at < NOW() - INTERVAL '5 minutes'
          RETURNING id, chat_id
      `;
      if (rows.length === 0) return;
      app.log.warn(
        { swept: rows.length, ids: rows.map((r) => r.id) },
        'swept stale streaming rows',
      );
      // One 'idle' notification per chat, even if several of its rows were swept.
      const seenChats = new Set<string>();
      const now = new Date().toISOString();
      for (const row of rows) {
        if (seenChats.has(row.chat_id)) continue;
        seenChats.add(row.chat_id);
        broker.publishUser('default', {
          type: 'chat_status',
          chat_id: row.chat_id,
          status: 'idle',
          at: now,
        });
      }
    } catch (err) {
      // Best-effort: a failed sweep is logged and retried on the next tick.
      app.log.error({ err }, 'stuck-row sweeper failed');
    }
  };
  const sweepTimer = setInterval(() => { void sweepStaleStreaming(); }, SWEEP_INTERVAL_MS);
  app.addHook('onClose', async () => { clearInterval(sweepTimer); });

  // Graceful shutdown: close Fastify (which fires the onClose hook above),
  // then the DB handle, then exit. Any failure exits with code 1.
  const shutdown = async (signal: string) => {
    app.log.info(`received ${signal}, shutting down`);
    try {
      await app.close();
      await closeDb();
      process.exit(0);
    } catch (err) {
      app.log.error(err);
      process.exit(1);
    }
  };

  process.on('SIGINT', () => void shutdown('SIGINT'));
  process.on('SIGTERM', () => void shutdown('SIGTERM'));

  // Bound to 0.0.0.0 intentionally. Public access goes through Caddy → Authelia.
  // Direct Tailscale access (100.114.205.53:9500) is unauthenticated by design;
  // the threat model treats Tailnet membership as the trust boundary.
  await app.listen({ port: config.PORT, host: config.HOST });
  app.log.info(`boocode server listening on http://${config.HOST}:${config.PORT}`);
}
|
|
|
|
// Last-resort handler: if main() rejects, report the reason on stderr and
// exit with a non-zero code.
const abortOnStartupFailure = (reason: unknown): never => {
  console.error('Fatal startup error:', reason);
  return process.exit(1);
};

main().catch(abortOnStartupFailure);
|