import type { Sql } from '../db.js'; import type { FastifyBaseLogger } from 'fastify'; import type { Broker } from '@boocode/server/broker'; import type { Config } from '../config.js'; interface InferenceRunner { enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void; cancel: (sessionId: string, chatId: string) => Promise; hasActive: (chatId: string) => boolean; } interface Deps { sql: Sql; inference: InferenceRunner; broker: Broker; log: FastifyBaseLogger; config: Config; } const POLL_INTERVAL_MS = 5_000; const COMPLETION_POLL_MS = 2_000; export function createDispatcher(deps: Deps): { start(): void; stop(): Promise } { const { sql, inference, log, config } = deps; let timer: ReturnType | null = null; let running = false; let stopping = false; let inflightPromise: Promise | null = null; async function poll(): Promise { if (running || stopping) return; // Grab one pending task const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null }[]>` SELECT id, project_id, input, agent, model FROM tasks WHERE state = 'pending' ORDER BY created_at LIMIT 1 `; if (rows.length === 0) return; const task = rows[0]!; running = true; inflightPromise = runTask(task).finally(() => { running = false; inflightPromise = null; }); } async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise { const taskId = task.id; log.info({ taskId }, 'dispatcher: starting task'); try { // Mark running await sql` UPDATE tasks SET state = 'running', started_at = clock_timestamp(), execution_path = 'native' WHERE id = ${taskId} `; // Create session + chat for this task const model = task.model ?? config.DEFAULT_MODEL; const sessionName = 'Task: ' + task.input.slice(0, 40); const [session] = await sql<{ id: string }[]>` INSERT INTO sessions (project_id, name, model, status) VALUES (${task.project_id}, ${sessionName}, ${model}, 'open') RETURNING id `; const sessionId = session!.id; const [chat] = await sql<{ id: string }[]>` INSERT INTO chats (session_id, name, status) VALUES (${sessionId}, 'Task execution', 'open') RETURNING id `; const chatId = chat!.id; // Link task to session await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; // Create user message + streaming assistant await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) RETURNING id `; const [assistantMsg] = await sql<{ id: string }[]>` INSERT INTO messages (session_id, chat_id, role, content, status, created_at) VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) RETURNING id `; const assistantId = assistantMsg!.id; // Enqueue inference inference.enqueue(sessionId, chatId, assistantId, 'default'); // Wait for inference to complete (poll message status) const finalStatus = await waitForCompletion(assistantId); if (stopping) { // Graceful shutdown — mark cancelled await sql` UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId} `; return; } if (finalStatus === 'complete') { // Grab assistant content for output_summary const [msg] = await sql<{ content: string | null }[]>` SELECT content FROM messages WHERE id = ${assistantId} `; const summary = (msg?.content ?? '').slice(0, 500); await sql` UPDATE tasks SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary} WHERE id = ${taskId} `; log.info({ taskId }, 'dispatcher: task completed'); } else { // failed or cancelled const [msg] = await sql<{ content: string | null }[]>` SELECT content FROM messages WHERE id = ${assistantId} `; const summary = (msg?.content ?? 'Inference failed').slice(0, 500); await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary} WHERE id = ${taskId} `; log.warn({ taskId, finalStatus }, 'dispatcher: task failed'); } } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); log.error({ taskId, err: errMsg }, 'dispatcher: task error'); await sql` UPDATE tasks SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} WHERE id = ${taskId} `.catch(() => {}); // best-effort } } async function waitForCompletion(assistantId: string): Promise { // Poll until the assistant message is no longer streaming for (;;) { if (stopping) return 'cancelled'; const [row] = await sql<{ status: string }[]>` SELECT status FROM messages WHERE id = ${assistantId} `; const status = row?.status ?? 'failed'; if (status !== 'streaming') return status; await sleep(COMPLETION_POLL_MS); } } function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } return { start() { log.info('dispatcher: starting poll loop'); timer = setInterval(() => { poll().catch((err) => { log.error({ err }, 'dispatcher: poll error'); }); }, POLL_INTERVAL_MS); }, async stop() { stopping = true; if (timer) { clearInterval(timer); timer = null; } if (inflightPromise) { log.info('dispatcher: waiting for in-flight task'); await inflightPromise; } log.info('dispatcher: stopped'); }, }; }