diff --git a/apps/coder/package.json b/apps/coder/package.json index 1cceabe..a31e27b 100644 --- a/apps/coder/package.json +++ b/apps/coder/package.json @@ -8,6 +8,7 @@ "dev": "tsx watch src/index.ts", "build": "tsc && node -e \"import('node:fs').then(fs=>fs.copyFileSync('src/schema.sql','dist/schema.sql'))\"", "start": "node dist/index.js", + "cli": "tsx src/cli.ts", "typecheck": "tsc --noEmit", "test": "vitest run" }, @@ -19,10 +20,12 @@ "@modelcontextprotocol/sdk": "^1.29.0", "fastify": "^4.28.1", "postgres": "^3.4.4", + "ws": "^8.18.0", "zod": "^3.23.8" }, "devDependencies": { "@types/node": "^20.14.10", + "@types/ws": "^8.5.10", "tsx": "^4.16.2", "typescript": "^5.5.0", "vitest": "^3.0.0" diff --git a/apps/coder/src/cli.ts b/apps/coder/src/cli.ts new file mode 100644 index 0000000..68310a7 --- /dev/null +++ b/apps/coder/src/cli.ts @@ -0,0 +1,249 @@ +#!/usr/bin/env node +/** + * BooCoder CLI client. + * + * Usage: + * boocode run "task description" [--agent opencode] [--model claude-opus-4-7] [--project ] + * boocode ls [--state pending|running|completed|failed] + * boocode attach + * boocode send "message" + */ +import { WebSocket } from 'ws'; + +const BASE_URL = process.env.BOOCODER_URL ?? 'http://100.114.205.53:9502'; + +// ─── Arg parsing ───────────────────────────────────────────────────────────── + +function getFlag(args: string[], name: string): string | undefined { + const idx = args.indexOf(name); + if (idx === -1 || idx + 1 >= args.length) return undefined; + return args[idx + 1]; +} + +function hasFlag(args: string[], name: string): boolean { + return args.includes(name); +} + +// ─── HTTP helpers ──────────────────────────────────────────────────────────── + +async function api(method: string, path: string, body?: unknown): Promise { + const url = `${BASE_URL}${path}`; + const res = await fetch(url, { + method, + headers: body ? { 'Content-Type': 'application/json' } : undefined, + body: body ? JSON.stringify(body) : undefined, + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + throw new Error(`${method} ${path} → ${res.status}: ${text}`); + } + return res.json(); +} + +// ─── WS streaming ──────────────────────────────────────────────────────────── + +function streamSession(sessionId: string): void { + const wsUrl = BASE_URL.replace(/^http/, 'ws') + `/api/ws/sessions/${sessionId}`; + const ws = new WebSocket(wsUrl); + + ws.on('message', (data) => { + try { + const frame = JSON.parse(data.toString()) as { type: string; content?: string; name?: string; arguments?: string }; + if (frame.type === 'delta' && frame.content) { + process.stdout.write(frame.content); + } else if (frame.type === 'tool_call') { + process.stdout.write(`\n[tool: ${frame.name ?? '?'}(${(frame.arguments ?? '').slice(0, 80)})]\n`); + } else if (frame.type === 'tool_result') { + process.stdout.write(`[tool_result]\n`); + } else if (frame.type === 'status' || frame.type === 'chat_status') { + // Silent + } + } catch { + // Non-JSON frame, ignore + } + }); + + ws.on('error', (err) => { + process.stderr.write(`WS error: ${err.message}\n`); + }); + + ws.on('close', () => { + process.stdout.write('\n'); + process.exit(0); + }); + + process.on('SIGINT', () => { + ws.close(); + process.exit(0); + }); +} + +// ─── Commands ──────────────────────────────────────────────────────────────── + +async function cmdRun(args: string[]): Promise { + const input = args.find((a) => !a.startsWith('--')); + if (!input) { + process.stderr.write('Usage: boocode run "task description" [--agent X] [--model X] [--project X]\n'); + process.exit(1); + } + + const agent = getFlag(args, '--agent'); + const model = getFlag(args, '--model'); + const project_id = getFlag(args, '--project'); + + if (!project_id) { + process.stderr.write('Error: --project is required\n'); + process.exit(1); + } + + const result = (await api('POST', '/api/tasks', { + project_id, + input, + ...(agent && { agent }), + ...(model && { model }), + })) as { id: string; state: string }; + + process.stdout.write(`Task created: ${result.id} (state: ${result.state})\n`); + + // Poll until task has session_id, then stream; or poll until terminal state + const POLL_MS = 2000; + for (;;) { + await sleep(POLL_MS); + const task = (await api('GET', `/api/tasks/${result.id}`)) as { + id: string; state: string; session_id?: string; output_summary?: string; + }; + + if (task.session_id) { + process.stdout.write(`Streaming session ${task.session_id}...\n`); + streamSession(task.session_id); + return; // streamSession handles exit + } + + if (task.state === 'completed') { + process.stdout.write(`\nCompleted: ${task.output_summary ?? '(no summary)'}\n`); + return; + } + if (task.state === 'failed') { + process.stderr.write(`\nFailed: ${task.output_summary ?? '(no summary)'}\n`); + process.exit(1); + } + if (task.state === 'cancelled') { + process.stderr.write(`\nCancelled.\n`); + process.exit(1); + } + } +} + +async function cmdLs(args: string[]): Promise { + const state = getFlag(args, '--state'); + const query = state ? `?state=${state}` : ''; + const tasks = (await api('GET', `/api/tasks${query}`)) as Array<{ + id: string; state: string; agent: string | null; input: string; created_at: string; + }>; + + if (tasks.length === 0) { + process.stdout.write('No tasks.\n'); + return; + } + + // Table header + process.stdout.write( + pad('ID', 38) + pad('STATE', 12) + pad('AGENT', 14) + pad('INPUT', 52) + 'CREATED\n', + ); + process.stdout.write('-'.repeat(120) + '\n'); + + for (const t of tasks) { + process.stdout.write( + pad(t.id, 38) + + pad(t.state, 12) + + pad(t.agent ?? '-', 14) + + pad(t.input.slice(0, 50), 52) + + (t.created_at?.slice(0, 19) ?? '') + '\n', + ); + } +} + +async function cmdAttach(args: string[]): Promise { + const taskId = args[0]; + if (!taskId) { + process.stderr.write('Usage: boocode attach \n'); + process.exit(1); + } + + const task = (await api('GET', `/api/tasks/${taskId}`)) as { session_id?: string }; + if (!task.session_id) { + process.stderr.write('Task has no session yet (still pending?).\n'); + process.exit(1); + } + + streamSession(task.session_id); +} + +async function cmdSend(args: string[]): Promise { + const taskId = args[0]; + const message = args[1]; + if (!taskId || !message) { + process.stderr.write('Usage: boocode send "message"\n'); + process.exit(1); + } + + const task = (await api('GET', `/api/tasks/${taskId}`)) as { session_id?: string }; + if (!task.session_id) { + process.stderr.write('Task has no session yet.\n'); + process.exit(1); + } + + // Find active chat + const sessionId = task.session_id; + // POST message to the session's chat (the messages route expects session_id in path) + await api('POST', `/api/sessions/${sessionId}/messages`, { content: message }); + + // Then attach to stream the response + streamSession(sessionId); +} + +// ─── Utils ─────────────────────────────────────────────────────────────────── + +function pad(s: string, width: number): string { + return s.length >= width ? s.slice(0, width) : s + ' '.repeat(width - s.length); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// ─── Main ──────────────────────────────────────────────────────────────────── + +const [cmd, ...rest] = process.argv.slice(2); + +switch (cmd) { + case 'run': + cmdRun(rest).catch(fatal); + break; + case 'ls': + cmdLs(rest).catch(fatal); + break; + case 'attach': + cmdAttach(rest).catch(fatal); + break; + case 'send': + cmdSend(rest).catch(fatal); + break; + default: + process.stdout.write( + 'BooCoder CLI\n\n' + + 'Commands:\n' + + ' run "task" [--agent X] [--model X] [--project ] Create and stream a task\n' + + ' ls [--state pending|running|completed|failed] List tasks\n' + + ' attach Stream a running task\n' + + ' send "message" Send input to a task\n' + + '\n' + + `Base URL: ${BASE_URL} (set BOOCODER_URL to override)\n`, + ); + if (cmd && cmd !== '--help' && cmd !== '-h') process.exit(1); +} + +function fatal(err: unknown): void { + process.stderr.write(`Error: ${err instanceof Error ? err.message : String(err)}\n`); + process.exit(1); +} diff --git a/apps/coder/src/index.ts b/apps/coder/src/index.ts index aaee0c5..8632f55 100644 --- a/apps/coder/src/index.ts +++ b/apps/coder/src/index.ts @@ -25,6 +25,8 @@ import { setInferenceContext, clearInferenceContext } from './services/tools/inf import { registerMessageRoutes } from './routes/messages.js'; import { registerPendingRoutes } from './routes/pending.js'; import { registerTaskRoutes } from './routes/tasks.js'; +import { registerInboxRoutes } from './routes/inbox.js'; +import { registerStatsRoutes } from './routes/stats.js'; import { registerWebSocket } from './routes/ws.js'; // Phase 4: dispatcher + agent probe import { createDispatcher } from './services/dispatcher.js'; @@ -139,6 +141,8 @@ async function main() { registerMessageRoutes(app, sql, broker, inferenceApi); registerPendingRoutes(app, sql); registerTaskRoutes(app, sql, inferenceApi); + registerInboxRoutes(app, sql); + registerStatsRoutes(app, sql); registerWebSocket(app, sql, broker); // Serve static frontend (built web app). In production, the dist/ is diff --git a/apps/coder/src/routes/inbox.ts b/apps/coder/src/routes/inbox.ts new file mode 100644 index 0000000..e90120b --- /dev/null +++ b/apps/coder/src/routes/inbox.ts @@ -0,0 +1,33 @@ +import type { FastifyInstance } from 'fastify'; +import type { Sql } from '../db.js'; + +export function registerInboxRoutes(app: FastifyInstance, sql: Sql): void { + // GET /api/inbox — tasks needing human attention (blocked or failed) + app.get('/api/inbox', async () => { + return sql` + SELECT id, project_id, parent_task_id, state, input, output_summary, agent, model, session_id, started_at, ended_at, created_at + FROM human_inbox + ORDER BY created_at DESC + LIMIT 100 + `; + }); + + // POST /api/inbox/:id/retry — reset a blocked/failed task to pending for re-dispatch + app.post<{ Params: { id: string } }>('/api/inbox/:id/retry', async (req, reply) => { + const taskId = req.params.id; + + const result = await sql` + UPDATE tasks + SET state = 'pending', started_at = NULL, ended_at = NULL, output_summary = NULL + WHERE id = ${taskId} AND state IN ('blocked', 'failed') + RETURNING id, state + `; + + if (result.length === 0) { + reply.code(404); + return { error: 'task not found or not in retryable state' }; + } + + return { id: result[0]!.id, state: result[0]!.state }; + }); +} diff --git a/apps/coder/src/routes/stats.ts b/apps/coder/src/routes/stats.ts new file mode 100644 index 0000000..fd8266c --- /dev/null +++ b/apps/coder/src/routes/stats.ts @@ -0,0 +1,48 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import type { Sql } from '../db.js'; + +const CostQuery = z.object({ + group_by: z.enum(['project', 'agent', 'day']).default('project'), +}); + +export function registerStatsRoutes(app: FastifyInstance, sql: Sql): void { + // GET /api/stats/costs — aggregate cost_tokens by project, agent, or day + app.get('/api/stats/costs', async (req, reply) => { + const parsed = CostQuery.safeParse(req.query); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid query', details: parsed.error.flatten() }; + } + + const { group_by } = parsed.data; + + switch (group_by) { + case 'project': + return sql` + SELECT project_id, COUNT(*)::int AS task_count, COALESCE(SUM(cost_tokens), 0)::int AS total_tokens + FROM tasks + WHERE cost_tokens IS NOT NULL + GROUP BY project_id + ORDER BY total_tokens DESC + `; + case 'agent': + return sql` + SELECT COALESCE(agent, 'native') AS agent, COUNT(*)::int AS task_count, COALESCE(SUM(cost_tokens), 0)::int AS total_tokens + FROM tasks + WHERE cost_tokens IS NOT NULL + GROUP BY agent + ORDER BY total_tokens DESC + `; + case 'day': + return sql` + SELECT DATE(created_at) AS day, COUNT(*)::int AS task_count, COALESCE(SUM(cost_tokens), 0)::int AS total_tokens + FROM tasks + WHERE cost_tokens IS NOT NULL + GROUP BY DATE(created_at) + ORDER BY day DESC + LIMIT 90 + `; + } + }); +} diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts index 8f512a9..e3c6197 100644 --- a/apps/coder/src/services/dispatcher.ts +++ b/apps/coder/src/services/dispatcher.ts @@ -134,6 +134,14 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise` + SELECT SUM(tokens_used)::int AS total + FROM messages + WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL + `; + const costTokens = costRow?.total ?? null; + if (finalStatus === 'complete') { const [msg] = await sql<{ content: string | null }[]>` SELECT content FROM messages WHERE id = ${assistantId} @@ -141,10 +149,10 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise` SELECT content FROM messages WHERE id = ${assistantId} @@ -152,7 +160,7 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise` + SELECT SUM(tokens_used)::int AS total + FROM messages + WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL + `; + const extCostTokens = extCostRow?.total ?? null; + + // Step 6: Mark task completed await sql` UPDATE tasks - SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary} + SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} WHERE id = ${taskId} `; - log.info({ taskId, agent }, 'dispatcher: task completed (external)'); + log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)'); } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); diff --git a/apps/coder/src/services/tools/check_task_status.ts b/apps/coder/src/services/tools/check_task_status.ts new file mode 100644 index 0000000..630b646 --- /dev/null +++ b/apps/coder/src/services/tools/check_task_status.ts @@ -0,0 +1,50 @@ +import { z } from 'zod'; +import type { ToolDef, ToolContext } from './types.js'; + +const CheckTaskStatusInput = z.object({ + task_id: z.string().uuid().describe('ID of the task to check'), +}); + +type CheckTaskStatusInputT = z.infer; + +export const checkTaskStatusTool: ToolDef = { + name: 'check_task_status', + description: 'Check the status and output of a subtask by ID. Returns state, output_summary, and timing.', + inputSchema: CheckTaskStatusInput, + jsonSchema: { + type: 'function', + function: { + name: 'check_task_status', + description: 'Check the status and output of a subtask by ID.', + parameters: { + type: 'object', + properties: { + task_id: { type: 'string', description: 'ID of the task to check' }, + }, + required: ['task_id'], + }, + }, + }, + + async execute(input: CheckTaskStatusInputT, _projectRoot: string, context: ToolContext): Promise { + const { sql } = context; + + const [task] = await sql<{ id: string; state: string; output_summary: string | null; started_at: string | null; ended_at: string | null }[]>` + SELECT id, state, output_summary, started_at, ended_at + FROM tasks + WHERE id = ${input.task_id} + `; + + if (!task) { + return { error: `Task ${input.task_id} not found` }; + } + + return { + id: task.id, + state: task.state, + output_summary: task.output_summary, + started_at: task.started_at, + ended_at: task.ended_at, + }; + }, +}; diff --git a/apps/coder/src/services/tools/index.ts b/apps/coder/src/services/tools/index.ts index 639e166..963563d 100644 --- a/apps/coder/src/services/tools/index.ts +++ b/apps/coder/src/services/tools/index.ts @@ -4,6 +4,9 @@ import { createFileTool } from './create_file.js'; import { deleteFileTool } from './delete_file.js'; import { applyPendingTool } from './apply_pending.js'; import { rewindTool } from './rewind.js'; +import { newTaskTool } from './new_task.js'; +import { listTasksTool } from './list_tasks.js'; +import { checkTaskStatusTool } from './check_task_status.js'; export type { ToolDef, ToolContext, ToolJsonSchema } from './types.js'; @@ -16,6 +19,11 @@ export const WRITE_TOOLS: readonly ToolDef[] = [ deleteFileTool, editFileTool, rewindTool, + // Boomerang subtask tools — orchestrator agents call these to spawn/monitor child tasks. + // An "Orchestrator" agent profile would whitelist [new_task, list_tasks, check_task_status]. + newTaskTool, + listTasksTool, + checkTaskStatusTool, ]; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -23,4 +31,4 @@ export const WRITE_TOOLS_BY_NAME: ReadonlyMap> = new Map( WRITE_TOOLS.map((t) => [t.name, t]), ); -export { editFileTool, createFileTool, deleteFileTool, applyPendingTool, rewindTool }; +export { editFileTool, createFileTool, deleteFileTool, applyPendingTool, rewindTool, newTaskTool, listTasksTool, checkTaskStatusTool }; diff --git a/apps/coder/src/services/tools/list_tasks.ts b/apps/coder/src/services/tools/list_tasks.ts new file mode 100644 index 0000000..57b573b --- /dev/null +++ b/apps/coder/src/services/tools/list_tasks.ts @@ -0,0 +1,56 @@ +import { z } from 'zod'; +import type { ToolDef, ToolContext } from './types.js'; +import { getInferenceContext } from './inference_context.js'; + +const ListTasksInput = z.object({ + parent_task_id: z.string().uuid().optional().describe('Filter by parent task ID. Omit to list children of current task.'), +}); + +type ListTasksInputT = z.infer; + +export const listTasksTool: ToolDef = { + name: 'list_tasks', + description: 'List child tasks of the current task (or a specified parent). Returns id, state, input preview, and output_summary.', + inputSchema: ListTasksInput, + jsonSchema: { + type: 'function', + function: { + name: 'list_tasks', + description: 'List child tasks of the current task (or a specified parent).', + parameters: { + type: 'object', + properties: { + parent_task_id: { type: 'string', description: 'Filter by parent task ID. Omit to list children of current task.' }, + }, + required: [], + }, + }, + }, + + async execute(input: ListTasksInputT, _projectRoot: string, context: ToolContext): Promise { + const { sql } = context; + const ctx = getInferenceContext(); + const parentId = input.parent_task_id ?? ctx.taskId; + + if (!parentId) { + return { tasks: [], note: 'No parent task context — not running inside a task.' }; + } + + const rows = await sql<{ id: string; state: string; input: string; output_summary: string | null }[]>` + SELECT id, state, input, output_summary + FROM tasks + WHERE parent_task_id = ${parentId} + ORDER BY created_at DESC + LIMIT 50 + `; + + return { + tasks: rows.map((r) => ({ + id: r.id, + state: r.state, + input_preview: r.input.slice(0, 100), + output_summary: r.output_summary, + })), + }; + }, +}; diff --git a/apps/coder/src/services/tools/new_task.ts b/apps/coder/src/services/tools/new_task.ts new file mode 100644 index 0000000..026fcec --- /dev/null +++ b/apps/coder/src/services/tools/new_task.ts @@ -0,0 +1,65 @@ +import { z } from 'zod'; +import type { ToolDef, ToolContext } from './types.js'; +import { getInferenceContext } from './inference_context.js'; + +const NewTaskInput = z.object({ + input: z.string().min(1).describe('Task description for the child subtask'), + agent: z.string().optional().describe('Optional: dispatch to a specific agent'), + model: z.string().optional().describe('Optional: model override for the subtask'), +}); + +type NewTaskInputT = z.infer; + +export const newTaskTool: ToolDef = { + name: 'new_task', + description: + 'Spawn a subtask that runs in isolation. The subtask gets its own session and ' + + 'worktree. Use check_task_status to monitor progress. Only the output_summary is ' + + 'accessible to the parent — full isolation (Boomerang pattern).', + inputSchema: NewTaskInput, + jsonSchema: { + type: 'function', + function: { + name: 'new_task', + description: + 'Spawn a subtask that runs in isolation. The subtask gets its own session and ' + + 'worktree. Use check_task_status to monitor progress.', + parameters: { + type: 'object', + properties: { + input: { type: 'string', description: 'Task description for the child subtask' }, + agent: { type: 'string', description: 'Optional: dispatch to a specific agent' }, + model: { type: 'string', description: 'Optional: model override for the subtask' }, + }, + required: ['input'], + }, + }, + }, + + async execute(input: NewTaskInputT, _projectRoot: string, context: ToolContext): Promise { + const { sql } = context; + // Get the current task's project_id from the inference context + const ctx = getInferenceContext(); + const currentTaskId = ctx.taskId; + + // Look up the project_id from the current session + const [session] = await sql<{ project_id: string }[]>` + SELECT project_id FROM sessions WHERE id = ${ctx.sessionId} + `; + if (!session) { + return { error: 'Cannot determine project_id from current session' }; + } + + const [task] = await sql<{ id: string; state: string }[]>` + INSERT INTO tasks (project_id, parent_task_id, input, agent, model) + VALUES (${session.project_id}, ${currentTaskId}, ${input.input}, ${input.agent ?? null}, ${input.model ?? null}) + RETURNING id, state + `; + + return { + message: `Subtask created (id: ${task!.id}). It will run in isolation. Use check_task_status to monitor.`, + task_id: task!.id, + state: task!.state, + }; + }, +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 20eebe6..ad614f3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -69,6 +69,9 @@ importers: postgres: specifier: ^3.4.4 version: 3.4.9 + ws: + specifier: ^8.18.0 + version: 8.20.1 zod: specifier: ^3.23.8 version: 3.25.76 @@ -76,6 +79,9 @@ importers: '@types/node': specifier: ^20.14.10 version: 20.19.41 + '@types/ws': + specifier: ^8.5.10 + version: 8.18.1 tsx: specifier: ^4.16.2 version: 4.22.0