diff --git a/apps/coder/src/index.ts b/apps/coder/src/index.ts index 4de35df..78a69de 100644 --- a/apps/coder/src/index.ts +++ b/apps/coder/src/index.ts @@ -23,7 +23,11 @@ import { setInferenceContext, clearInferenceContext } from './services/tools/inf // Routes import { registerMessageRoutes } from './routes/messages.js'; import { registerPendingRoutes } from './routes/pending.js'; +import { registerTaskRoutes } from './routes/tasks.js'; import { registerWebSocket } from './routes/ws.js'; +// Phase 4: dispatcher + agent probe +import { createDispatcher } from './services/dispatcher.js'; +import { probeAgents } from './services/agent-probe.js'; async function main() { const config = loadConfig(); @@ -113,9 +117,18 @@ async function main() { }); }); + // Phase 4: probe available agents on startup + await probeAgents(sql, app.log); + + // Phase 4: dispatcher — polls tasks table and runs inference + const dispatcher = createDispatcher({ sql, inference: inferenceApi, broker, log: app.log, config }); + dispatcher.start(); + app.addHook('onClose', () => dispatcher.stop()); + // Register routes registerMessageRoutes(app, sql, broker, inferenceApi); registerPendingRoutes(app, sql); + registerTaskRoutes(app, sql, inferenceApi); registerWebSocket(app, sql, broker); // Serve static frontend (built web app). In production, the dist/ is diff --git a/apps/coder/src/routes/tasks.ts b/apps/coder/src/routes/tasks.ts new file mode 100644 index 0000000..021d1e3 --- /dev/null +++ b/apps/coder/src/routes/tasks.ts @@ -0,0 +1,138 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import type { Sql } from '../db.js'; + +interface InferenceApi { + cancel: (sessionId: string, chatId: string) => Promise; +} + +const CreateBody = z.object({ + project_id: z.string().uuid(), + input: z.string().min(1).max(64_000), + agent: z.string().max(100).optional(), + model: z.string().max(200).optional(), +}); + +const ListQuery = z.object({ + state: z.enum(['pending', 'running', 'completed', 'failed', 'blocked', 'cancelled']).optional(), + project_id: z.string().uuid().optional(), +}); + +export function registerTaskRoutes(app: FastifyInstance, sql: Sql, inference: InferenceApi): void { + // POST /api/tasks — create a new task + app.post('/api/tasks', async (req, reply) => { + const parsed = CreateBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + + const { project_id, input, agent, model } = parsed.data; + + const [task] = await sql<{ id: string; state: string }[]>` + INSERT INTO tasks (project_id, input, agent, model) + VALUES (${project_id}, ${input}, ${agent ?? null}, ${model ?? null}) + RETURNING id, state + `; + + reply.code(201); + return { id: task!.id, state: task!.state }; + }); + + // GET /api/tasks — list tasks with optional filters + app.get('/api/tasks', async (req, _reply) => { + const parsed = ListQuery.safeParse(req.query); + if (!parsed.success) { + return { error: 'invalid query', details: parsed.error.flatten() }; + } + + const { state, project_id } = parsed.data; + + // Build query with optional filters + if (state && project_id) { + return sql` + SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at + FROM tasks + WHERE state = ${state} AND project_id = ${project_id} + ORDER BY created_at DESC + LIMIT 100 + `; + } else if (state) { + return sql` + SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at + FROM tasks + WHERE state = ${state} + ORDER BY created_at DESC + LIMIT 100 + `; + } else if (project_id) { + return sql` + SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at + FROM tasks + WHERE project_id = ${project_id} + ORDER BY created_at DESC + LIMIT 100 + `; + } else { + return sql` + SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at + FROM tasks + ORDER BY created_at DESC + LIMIT 100 + `; + } + }); + + // GET /api/tasks/:id — single task detail + app.get<{ Params: { id: string } }>('/api/tasks/:id', async (req, reply) => { + const rows = await sql` + SELECT id, project_id, parent_task_id, state, input, output_summary, agent, model, execution_path, worktree_path, session_id, cost_tokens, started_at, ended_at, created_at + FROM tasks + WHERE id = ${req.params.id} + `; + if (rows.length === 0) { + reply.code(404); + return { error: 'task not found' }; + } + return rows[0]; + }); + + // POST /api/tasks/:id/cancel — cancel a pending or running task + app.post<{ Params: { id: string } }>('/api/tasks/:id/cancel', async (req, reply) => { + const taskId = req.params.id; + + // Get current task state + session info + const rows = await sql<{ id: string; state: string; session_id: string | null }[]>` + SELECT id, state, session_id FROM tasks WHERE id = ${taskId} + `; + if (rows.length === 0) { + reply.code(404); + return { error: 'task not found' }; + } + + const task = rows[0]!; + if (task.state !== 'pending' && task.state !== 'running') { + reply.code(409); + return { error: `cannot cancel task in state '${task.state}'` }; + } + + // If running, try to cancel inference + if (task.state === 'running' && task.session_id) { + // Find active chat in the task's session + const chats = await sql<{ id: string }[]>` + SELECT id FROM chats WHERE session_id = ${task.session_id} AND status = 'open' + `; + for (const chat of chats) { + await inference.cancel(task.session_id, chat.id); + } + } + + await sql` + UPDATE tasks + SET state = 'cancelled', ended_at = clock_timestamp() + WHERE id = ${taskId} AND state IN ('pending', 'running') + `; + + return { cancelled: true }; + }); +} diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 76ac92f..63f8de1 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -43,6 +43,9 @@ CREATE TABLE IF NOT EXISTS available_agents ( last_probed_at TIMESTAMPTZ ); +-- v2.0.0 Phase 4: link tasks to their inference sessions. +ALTER TABLE tasks ADD COLUMN IF NOT EXISTS session_id UUID REFERENCES sessions(id); + -- Human inbox: tasks needing attention CREATE OR REPLACE VIEW human_inbox AS SELECT * FROM tasks WHERE state IN ('blocked', 'failed'); diff --git a/apps/coder/src/services/agent-probe.ts b/apps/coder/src/services/agent-probe.ts new file mode 100644 index 0000000..cbdb841 --- /dev/null +++ b/apps/coder/src/services/agent-probe.ts @@ -0,0 +1,51 @@ +import { execFile } from 'node:child_process'; +import { promisify } from 'node:util'; +import type { Sql } from '../db.js'; +import type { FastifyBaseLogger } from 'fastify'; + +const execFileAsync = promisify(execFile); + +const KNOWN_AGENTS: Array<{ name: string; supportsAcp: boolean }> = [ + { name: 'opencode', supportsAcp: true }, + { name: 'goose', supportsAcp: true }, + { name: 'claude', supportsAcp: false }, + { name: 'pi', supportsAcp: false }, +]; + +export async function probeAgents(sql: Sql, log: FastifyBaseLogger): Promise { + log.info('agent-probe: scanning PATH for known agents'); + + for (const agent of KNOWN_AGENTS) { + try { + // Check if the agent binary is on PATH + const { stdout: whichOut } = await execFileAsync('which', [agent.name], { timeout: 5_000 }); + const installPath = whichOut.trim(); + if (!installPath) continue; + + // Get version + let version: string | null = null; + try { + const { stdout: verOut } = await execFileAsync(agent.name, ['--version'], { timeout: 10_000 }); + version = verOut.trim().slice(0, 100); + } catch { + // Some agents may not support --version — that's fine + } + + // UPSERT into available_agents + await sql` + INSERT INTO available_agents (name, install_path, version, supports_acp, last_probed_at) + VALUES (${agent.name}, ${installPath}, ${version}, ${agent.supportsAcp}, clock_timestamp()) + ON CONFLICT (name) DO UPDATE SET + install_path = EXCLUDED.install_path, + version = EXCLUDED.version, + supports_acp = EXCLUDED.supports_acp, + last_probed_at = EXCLUDED.last_probed_at + `; + log.info({ agent: agent.name, version, installPath }, 'agent-probe: found'); + } catch { + // Agent not found on PATH — skip silently + } + } + + log.info('agent-probe: scan complete'); +} diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts new file mode 100644 index 0000000..9117f88 --- /dev/null +++ b/apps/coder/src/services/dispatcher.ts @@ -0,0 +1,191 @@ +import type { Sql } from '../db.js'; +import type { FastifyBaseLogger } from 'fastify'; +import type { Broker } from '@boocode/server/broker'; +import type { Config } from '../config.js'; + +interface InferenceRunner { + enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void; + cancel: (sessionId: string, chatId: string) => Promise; + hasActive: (chatId: string) => boolean; +} + +interface Deps { + sql: Sql; + inference: InferenceRunner; + broker: Broker; + log: FastifyBaseLogger; + config: Config; +} + +const POLL_INTERVAL_MS = 5_000; +const COMPLETION_POLL_MS = 2_000; + +export function createDispatcher(deps: Deps): { start(): void; stop(): Promise } { + const { sql, inference, log, config } = deps; + let timer: ReturnType | null = null; + let running = false; + let stopping = false; + let inflightPromise: Promise | null = null; + + async function poll(): Promise { + if (running || stopping) return; + + // Grab one pending task + const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null }[]>` + SELECT id, project_id, input, agent, model + FROM tasks + WHERE state = 'pending' + ORDER BY created_at + LIMIT 1 + `; + if (rows.length === 0) return; + + const task = rows[0]!; + running = true; + inflightPromise = runTask(task).finally(() => { + running = false; + inflightPromise = null; + }); + } + + async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise { + const taskId = task.id; + log.info({ taskId }, 'dispatcher: starting task'); + + try { + // Mark running + await sql` + UPDATE tasks + SET state = 'running', started_at = clock_timestamp(), execution_path = 'native' + WHERE id = ${taskId} + `; + + // Create session + chat for this task + const model = task.model ?? config.DEFAULT_MODEL; + const sessionName = 'Task: ' + task.input.slice(0, 40); + + const [session] = await sql<{ id: string }[]>` + INSERT INTO sessions (project_id, name, model, status) + VALUES (${task.project_id}, ${sessionName}, ${model}, 'open') + RETURNING id + `; + const sessionId = session!.id; + + const [chat] = await sql<{ id: string }[]>` + INSERT INTO chats (session_id, name, status) + VALUES (${sessionId}, 'Task execution', 'open') + RETURNING id + `; + const chatId = chat!.id; + + // Link task to session + await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; + + // Create user message + streaming assistant + await sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) + RETURNING id + `; + const [assistantMsg] = await sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + const assistantId = assistantMsg!.id; + + // Enqueue inference + inference.enqueue(sessionId, chatId, assistantId, 'default'); + + // Wait for inference to complete (poll message status) + const finalStatus = await waitForCompletion(assistantId); + + if (stopping) { + // Graceful shutdown — mark cancelled + await sql` + UPDATE tasks + SET state = 'cancelled', ended_at = clock_timestamp() + WHERE id = ${taskId} + `; + return; + } + + if (finalStatus === 'complete') { + // Grab assistant content for output_summary + const [msg] = await sql<{ content: string | null }[]>` + SELECT content FROM messages WHERE id = ${assistantId} + `; + const summary = (msg?.content ?? '').slice(0, 500); + await sql` + UPDATE tasks + SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary} + WHERE id = ${taskId} + `; + log.info({ taskId }, 'dispatcher: task completed'); + } else { + // failed or cancelled + const [msg] = await sql<{ content: string | null }[]>` + SELECT content FROM messages WHERE id = ${assistantId} + `; + const summary = (msg?.content ?? 'Inference failed').slice(0, 500); + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary} + WHERE id = ${taskId} + `; + log.warn({ taskId, finalStatus }, 'dispatcher: task failed'); + } + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.error({ taskId, err: errMsg }, 'dispatcher: task error'); + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} + WHERE id = ${taskId} + `.catch(() => {}); // best-effort + } + } + + async function waitForCompletion(assistantId: string): Promise { + // Poll until the assistant message is no longer streaming + for (;;) { + if (stopping) return 'cancelled'; + + const [row] = await sql<{ status: string }[]>` + SELECT status FROM messages WHERE id = ${assistantId} + `; + const status = row?.status ?? 'failed'; + if (status !== 'streaming') return status; + + await sleep(COMPLETION_POLL_MS); + } + } + + function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + return { + start() { + log.info('dispatcher: starting poll loop'); + timer = setInterval(() => { + poll().catch((err) => { + log.error({ err }, 'dispatcher: poll error'); + }); + }, POLL_INTERVAL_MS); + }, + + async stop() { + stopping = true; + if (timer) { + clearInterval(timer); + timer = null; + } + if (inflightPromise) { + log.info('dispatcher: waiting for in-flight task'); + await inflightPromise; + } + log.info('dispatcher: stopped'); + }, + }; +}