v2.0.0-final: dispatcher + task queue + agent probing

Phase 4 of v2.0. BooCoder can now queue tasks and dispatch them
through the inference loop autonomously.

Dispatcher (services/dispatcher.ts): in-process setInterval(5s) polls
tasks WHERE state='pending', picks one at a time, creates an isolated
session+chat, enqueues inference with the task's input as the user
message, polls for completion, marks state completed/failed with
output_summary. Single-task-at-a-time for v2.0.0; parallel dispatch
is a Phase 5+ concern. Respects onClose hook for graceful shutdown.

Task routes (routes/tasks.ts): POST /api/tasks (create), GET /api/tasks
(list with state/project filters), GET /api/tasks/:id (detail),
POST /api/tasks/:id/cancel (marks cancelled, aborts if running).

Agent probe (services/agent-probe.ts): on startup, probes PATH for
opencode/goose/claude/pi via which + --version. UPSERTs into
available_agents table. Finds nothing inside the container (expected —
Phase 5 addresses host-agent access via ACP/PTY).

Schema: ALTER TABLE tasks ADD COLUMN IF NOT EXISTS session_id (links
task to its auto-created inference session for isolation).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 03:55:18 +00:00
parent 73b53089b0
commit 752ea74f43
5 changed files with 396 additions and 0 deletions

View File

@@ -23,7 +23,11 @@ import { setInferenceContext, clearInferenceContext } from './services/tools/inf
// Routes
import { registerMessageRoutes } from './routes/messages.js';
import { registerPendingRoutes } from './routes/pending.js';
import { registerTaskRoutes } from './routes/tasks.js';
import { registerWebSocket } from './routes/ws.js';
// Phase 4: dispatcher + agent probe
import { createDispatcher } from './services/dispatcher.js';
import { probeAgents } from './services/agent-probe.js';
async function main() {
const config = loadConfig();
@@ -113,9 +117,18 @@ async function main() {
});
});
// Phase 4: probe available agents on startup
await probeAgents(sql, app.log);
// Phase 4: dispatcher — polls tasks table and runs inference
const dispatcher = createDispatcher({ sql, inference: inferenceApi, broker, log: app.log, config });
dispatcher.start();
app.addHook('onClose', () => dispatcher.stop());
// Register routes
registerMessageRoutes(app, sql, broker, inferenceApi);
registerPendingRoutes(app, sql);
registerTaskRoutes(app, sql, inferenceApi);
registerWebSocket(app, sql, broker);
// Serve static frontend (built web app). In production, the dist/ is

View File

@@ -0,0 +1,138 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
interface InferenceApi {
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
}
const CreateBody = z.object({
project_id: z.string().uuid(),
input: z.string().min(1).max(64_000),
agent: z.string().max(100).optional(),
model: z.string().max(200).optional(),
});
const ListQuery = z.object({
state: z.enum(['pending', 'running', 'completed', 'failed', 'blocked', 'cancelled']).optional(),
project_id: z.string().uuid().optional(),
});
export function registerTaskRoutes(app: FastifyInstance, sql: Sql, inference: InferenceApi): void {
// POST /api/tasks — create a new task
app.post('/api/tasks', async (req, reply) => {
const parsed = CreateBody.safeParse(req.body);
if (!parsed.success) {
reply.code(400);
return { error: 'invalid body', details: parsed.error.flatten() };
}
const { project_id, input, agent, model } = parsed.data;
const [task] = await sql<{ id: string; state: string }[]>`
INSERT INTO tasks (project_id, input, agent, model)
VALUES (${project_id}, ${input}, ${agent ?? null}, ${model ?? null})
RETURNING id, state
`;
reply.code(201);
return { id: task!.id, state: task!.state };
});
// GET /api/tasks — list tasks with optional filters
app.get('/api/tasks', async (req, _reply) => {
const parsed = ListQuery.safeParse(req.query);
if (!parsed.success) {
return { error: 'invalid query', details: parsed.error.flatten() };
}
const { state, project_id } = parsed.data;
// Build query with optional filters
if (state && project_id) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE state = ${state} AND project_id = ${project_id}
ORDER BY created_at DESC
LIMIT 100
`;
} else if (state) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE state = ${state}
ORDER BY created_at DESC
LIMIT 100
`;
} else if (project_id) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE project_id = ${project_id}
ORDER BY created_at DESC
LIMIT 100
`;
} else {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
ORDER BY created_at DESC
LIMIT 100
`;
}
});
// GET /api/tasks/:id — single task detail
app.get<{ Params: { id: string } }>('/api/tasks/:id', async (req, reply) => {
const rows = await sql`
SELECT id, project_id, parent_task_id, state, input, output_summary, agent, model, execution_path, worktree_path, session_id, cost_tokens, started_at, ended_at, created_at
FROM tasks
WHERE id = ${req.params.id}
`;
if (rows.length === 0) {
reply.code(404);
return { error: 'task not found' };
}
return rows[0];
});
// POST /api/tasks/:id/cancel — cancel a pending or running task
app.post<{ Params: { id: string } }>('/api/tasks/:id/cancel', async (req, reply) => {
const taskId = req.params.id;
// Get current task state + session info
const rows = await sql<{ id: string; state: string; session_id: string | null }[]>`
SELECT id, state, session_id FROM tasks WHERE id = ${taskId}
`;
if (rows.length === 0) {
reply.code(404);
return { error: 'task not found' };
}
const task = rows[0]!;
if (task.state !== 'pending' && task.state !== 'running') {
reply.code(409);
return { error: `cannot cancel task in state '${task.state}'` };
}
// If running, try to cancel inference
if (task.state === 'running' && task.session_id) {
// Find active chat in the task's session
const chats = await sql<{ id: string }[]>`
SELECT id FROM chats WHERE session_id = ${task.session_id} AND status = 'open'
`;
for (const chat of chats) {
await inference.cancel(task.session_id, chat.id);
}
}
await sql`
UPDATE tasks
SET state = 'cancelled', ended_at = clock_timestamp()
WHERE id = ${taskId} AND state IN ('pending', 'running')
`;
return { cancelled: true };
});
}

View File

@@ -43,6 +43,9 @@ CREATE TABLE IF NOT EXISTS available_agents (
last_probed_at TIMESTAMPTZ
);
-- v2.0.0 Phase 4: link tasks to their inference sessions.
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS session_id UUID REFERENCES sessions(id);
-- Human inbox: tasks needing attention
CREATE OR REPLACE VIEW human_inbox AS
SELECT * FROM tasks WHERE state IN ('blocked', 'failed');

View File

@@ -0,0 +1,51 @@
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
const execFileAsync = promisify(execFile);
const KNOWN_AGENTS: Array<{ name: string; supportsAcp: boolean }> = [
{ name: 'opencode', supportsAcp: true },
{ name: 'goose', supportsAcp: true },
{ name: 'claude', supportsAcp: false },
{ name: 'pi', supportsAcp: false },
];
export async function probeAgents(sql: Sql, log: FastifyBaseLogger): Promise<void> {
log.info('agent-probe: scanning PATH for known agents');
for (const agent of KNOWN_AGENTS) {
try {
// Check if the agent binary is on PATH
const { stdout: whichOut } = await execFileAsync('which', [agent.name], { timeout: 5_000 });
const installPath = whichOut.trim();
if (!installPath) continue;
// Get version
let version: string | null = null;
try {
const { stdout: verOut } = await execFileAsync(agent.name, ['--version'], { timeout: 10_000 });
version = verOut.trim().slice(0, 100);
} catch {
// Some agents may not support --version — that's fine
}
// UPSERT into available_agents
await sql`
INSERT INTO available_agents (name, install_path, version, supports_acp, last_probed_at)
VALUES (${agent.name}, ${installPath}, ${version}, ${agent.supportsAcp}, clock_timestamp())
ON CONFLICT (name) DO UPDATE SET
install_path = EXCLUDED.install_path,
version = EXCLUDED.version,
supports_acp = EXCLUDED.supports_acp,
last_probed_at = EXCLUDED.last_probed_at
`;
log.info({ agent: agent.name, version, installPath }, 'agent-probe: found');
} catch {
// Agent not found on PATH — skip silently
}
}
log.info('agent-probe: scan complete');
}

View File

@@ -0,0 +1,191 @@
import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { Config } from '../config.js';
interface InferenceRunner {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
hasActive: (chatId: string) => boolean;
}
interface Deps {
sql: Sql;
inference: InferenceRunner;
broker: Broker;
log: FastifyBaseLogger;
config: Config;
}
const POLL_INTERVAL_MS = 5_000;
const COMPLETION_POLL_MS = 2_000;
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
const { sql, inference, log, config } = deps;
let timer: ReturnType<typeof setInterval> | null = null;
let running = false;
let stopping = false;
let inflightPromise: Promise<void> | null = null;
async function poll(): Promise<void> {
if (running || stopping) return;
// Grab one pending task
const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null }[]>`
SELECT id, project_id, input, agent, model
FROM tasks
WHERE state = 'pending'
ORDER BY created_at
LIMIT 1
`;
if (rows.length === 0) return;
const task = rows[0]!;
running = true;
inflightPromise = runTask(task).finally(() => {
running = false;
inflightPromise = null;
});
}
async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
const taskId = task.id;
log.info({ taskId }, 'dispatcher: starting task');
try {
// Mark running
await sql`
UPDATE tasks
SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
WHERE id = ${taskId}
`;
// Create session + chat for this task
const model = task.model ?? config.DEFAULT_MODEL;
const sessionName = 'Task: ' + task.input.slice(0, 40);
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
RETURNING id
`;
const sessionId = session!.id;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${sessionId}, 'Task execution', 'open')
RETURNING id
`;
const chatId = chat!.id;
// Link task to session
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
// Create user message + streaming assistant
await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
RETURNING id
`;
const [assistantMsg] = await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
const assistantId = assistantMsg!.id;
// Enqueue inference
inference.enqueue(sessionId, chatId, assistantId, 'default');
// Wait for inference to complete (poll message status)
const finalStatus = await waitForCompletion(assistantId);
if (stopping) {
// Graceful shutdown — mark cancelled
await sql`
UPDATE tasks
SET state = 'cancelled', ended_at = clock_timestamp()
WHERE id = ${taskId}
`;
return;
}
if (finalStatus === 'complete') {
// Grab assistant content for output_summary
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
const summary = (msg?.content ?? '').slice(0, 500);
await sql`
UPDATE tasks
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.info({ taskId }, 'dispatcher: task completed');
} else {
// failed or cancelled
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
const summary = (msg?.content ?? 'Inference failed').slice(0, 500);
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.warn({ taskId, finalStatus }, 'dispatcher: task failed');
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, err: errMsg }, 'dispatcher: task error');
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {}); // best-effort
}
}
async function waitForCompletion(assistantId: string): Promise<string> {
// Poll until the assistant message is no longer streaming
for (;;) {
if (stopping) return 'cancelled';
const [row] = await sql<{ status: string }[]>`
SELECT status FROM messages WHERE id = ${assistantId}
`;
const status = row?.status ?? 'failed';
if (status !== 'streaming') return status;
await sleep(COMPLETION_POLL_MS);
}
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
return {
start() {
log.info('dispatcher: starting poll loop');
timer = setInterval(() => {
poll().catch((err) => {
log.error({ err }, 'dispatcher: poll error');
});
}, POLL_INTERVAL_MS);
},
async stop() {
stopping = true;
if (timer) {
clearInterval(timer);
timer = null;
}
if (inflightPromise) {
log.info('dispatcher: waiting for in-flight task');
await inflightPromise;
}
log.info('dispatcher: stopped');
},
};
}