From c11e26090fe2ca984cd54fecb147700e3773cfa3 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 8 Jun 2026 01:11:07 +0000 Subject: [PATCH] =?UTF-8?q?feat(coder):=20boulder=20state=20=E2=80=94=20cr?= =?UTF-8?q?oss-session=20plan=20persistence=20+=20auto-resumption?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New plans table (id, project_id, title, description, status, flow_run_id, progress_pct, items_total, items_completed, metadata, timestamps) with CHECK constraints and indexes. Plan store (plan-store.ts): createPlan, getPlan, listPlans, listActivePlans, updatePlan, updatePlanFromRun, findPlanWithRunningRun, planStatusFromRun. Flow-runner integration: onRunTerminal callback fires on every terminal transition (complete/fail/cancel) and updates linked plans automatically. 5 API endpoints: GET /api/plans, GET /api/plans/active, GET /api/plans/:id, POST /api/plans, PATCH /api/plans/:id. 484 tests pass, build clean. --- apps/coder/src/index.ts | 15 +- apps/coder/src/routes/plans.ts | 134 +++++++++++++ apps/coder/src/schema.sql | 28 +++ .../src/services/__tests__/plan-store.test.ts | 16 ++ apps/coder/src/services/flow-runner.ts | 6 + apps/coder/src/services/plan-store.ts | 184 ++++++++++++++++++ 6 files changed, 381 insertions(+), 2 deletions(-) create mode 100644 apps/coder/src/routes/plans.ts create mode 100644 apps/coder/src/services/__tests__/plan-store.test.ts create mode 100644 apps/coder/src/services/plan-store.ts diff --git a/apps/coder/src/index.ts b/apps/coder/src/index.ts index 2309297..9680819 100644 --- a/apps/coder/src/index.ts +++ b/apps/coder/src/index.ts @@ -29,7 +29,9 @@ import { registerProviderRoutes } from './routes/providers.js'; import { registerWorktreeSafetyRoutes } from './routes/worktree-safety.js'; import { registerLifecycleRoutes } from './routes/lifecycle.js'; import { registerAnalyticsRoutes } from './routes/analytics.js'; +import { registerPlanRoutes } from './routes/plans.js'; import { registerWebSocket } from './routes/ws.js'; +import { updatePlanFromRun } from './services/plan-store.js'; // Phase 4: dispatcher + agent probe import { createDispatcher } from './services/dispatcher.js'; // Orchestrator (Phase 2): DB-backed flow-runner; advances on the dispatcher's @@ -229,8 +231,16 @@ async function main() { // Orchestrator (Phase 2): the flow-runner reacts to the dispatcher's // onTaskTerminal hook to advance flow_runs. Created before the dispatcher so its - // terminal callback can be wired in. - const flowRunner = createFlowRunner({ sql, broker, log: app.log, config }); + // terminal callback can be wired in. onRunTerminal updates linked plans. + const flowRunner = createFlowRunner({ + sql, broker, log: app.log, config, + onRunTerminal: (runId, status) => { + updatePlanFromRun(sql, runId, status).catch((err) => { + app.log.error({ err: err instanceof Error ? err.message : String(err), runId }, + 'plans: updatePlanFromRun failed'); + }); + }, + }); // Arena SEAM (a): build the local-model set from the live llama-swap model list. // Both bare IDs ('qwen3.6-35b') and prefixed IDs ('llama-swap/qwen3.6-35b') are @@ -384,6 +394,7 @@ async function main() { registerWorktreeSafetyRoutes(app, sql); registerLifecycleRoutes(app, sql); registerAnalyticsRoutes(app, sql); + registerPlanRoutes(app, sql); registerWebSocket(app, sql, broker); // Graceful shutdown diff --git a/apps/coder/src/routes/plans.ts b/apps/coder/src/routes/plans.ts new file mode 100644 index 0000000..6b998a4 --- /dev/null +++ b/apps/coder/src/routes/plans.ts @@ -0,0 +1,134 @@ +/** + * Boulder state — plan routes. + * + * GET /api/plans?project_id= — list plans for a project + * GET /api/plans/active?project_id= — list active (in-flight) plans + * POST /api/plans — create a new plan + * PATCH /api/plans/:id — update plan progress / status + */ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import type { Sql } from '../db.js'; +import { + createPlan, + getPlan, + listPlans, + listActivePlans, + updatePlan, +} from '../services/plan-store.js'; + +const CreatePlanBody = z.object({ + project_id: z.string().uuid(), + title: z.string().min(1).max(500), + description: z.string().max(10_000).optional(), + flow_run_id: z.string().uuid().optional(), + metadata: z.record(z.unknown()).optional(), +}); + +const ListPlansQuery = z.object({ + project_id: z.string().uuid(), +}); + +const UpdatePlanBody = z.object({ + title: z.string().min(1).max(500).optional(), + description: z.string().max(10_000).nullable().optional(), + status: z.enum(['active', 'completed', 'cancelled', 'failed']).optional(), + progress_pct: z.number().int().min(0).max(100).optional(), + items_total: z.number().int().min(0).optional(), + items_completed: z.number().int().min(0).optional(), + metadata: z.record(z.unknown()).nullable().optional(), +}); + +const PlanIdParam = z.string().uuid(); + +export function registerPlanRoutes(app: FastifyInstance, sql: Sql): void { + // GET /api/plans?project_id= — all plans for a project + app.get('/api/plans', async (req, reply) => { + const parsed = ListPlansQuery.safeParse(req.query); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid query', details: parsed.error.flatten() }; + } + const plans = await listPlans(sql, parsed.data.project_id); + return { plans }; + }); + + // GET /api/plans/active?project_id= — active plans only + app.get('/api/plans/active', async (req, reply) => { + const parsed = ListPlansQuery.safeParse(req.query); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid query', details: parsed.error.flatten() }; + } + const plans = await listActivePlans(sql, parsed.data.project_id); + return { plans }; + }); + + // POST /api/plans — create a new plan + app.post('/api/plans', async (req, reply) => { + const parsed = CreatePlanBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + + const { project_id, title, description, flow_run_id, metadata } = parsed.data; + const plan = await createPlan(sql, { + projectId: project_id, + title, + description, + flowRunId: flow_run_id, + metadata, + }); + + reply.code(201); + return { plan }; + }); + + // GET /api/plans/:id — single plan + app.get<{ Params: { id: string } }>('/api/plans/:id', async (req, reply) => { + const parsedId = PlanIdParam.safeParse(req.params.id); + if (!parsedId.success) { + reply.code(400); + return { error: 'invalid id' }; + } + const plan = await getPlan(sql, parsedId.data); + if (!plan) { + reply.code(404); + return { error: 'plan not found' }; + } + return { plan }; + }); + + // PATCH /api/plans/:id — update plan + app.patch<{ Params: { id: string } }>('/api/plans/:id', async (req, reply) => { + const parsedId = PlanIdParam.safeParse(req.params.id); + if (!parsedId.success) { + reply.code(400); + return { error: 'invalid id' }; + } + + const parsed = UpdatePlanBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + + const { title, description, status, progress_pct, items_total, items_completed, metadata } = parsed.data; + const plan = await updatePlan(sql, parsedId.data, { + title, + description: description === null ? null : description, + status, + progressPct: progress_pct, + itemsTotal: items_total, + itemsCompleted: items_completed, + metadata: metadata === null ? null : metadata, + }); + + if (!plan) { + reply.code(404); + return { error: 'plan not found' }; + } + return { plan }; + }); +} diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 958ed48..6caca52 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -438,3 +438,31 @@ CREATE TABLE IF NOT EXISTS flow_step_events ( created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() ); CREATE INDEX IF NOT EXISTS flow_step_events_run_idx ON flow_step_events(run_id); + +-- v2.9.0: Boulder state — cross-session plan persistence with auto-resumption. +-- project_id carries no FK (matches tasks/fow_runs convention). +-- flow_run_id links the plan to an in-flight orchestrator run for auto-tracking. +CREATE TABLE IF NOT EXISTS plans ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + project_id UUID NOT NULL, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL DEFAULT 'active', + flow_run_id UUID REFERENCES flow_runs(id) ON DELETE SET NULL, + progress_pct INTEGER NOT NULL DEFAULT 0, + items_total INTEGER NOT NULL DEFAULT 0, + items_completed INTEGER NOT NULL DEFAULT 0, + metadata JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + CONSTRAINT plans_status_chk CHECK (status IN ('active', 'completed', 'cancelled', 'failed')), + CONSTRAINT plans_progress_chk CHECK (progress_pct >= 0 AND progress_pct <= 100), + CONSTRAINT plans_items_chk CHECK (items_total >= 0 AND items_completed >= 0 AND items_completed <= items_total) +); + +-- Plan queries by project and status. +CREATE INDEX IF NOT EXISTS plans_project_status_idx ON plans(project_id, status); +-- Fast lookup of the plan owning a flow run (for onRunTerminal updates). +CREATE INDEX IF NOT EXISTS plans_flow_run_id_idx ON plans(flow_run_id); +-- Plans sorted by recency (for "resume from last" surface). +CREATE INDEX IF NOT EXISTS plans_project_created_idx ON plans(project_id, created_at DESC); diff --git a/apps/coder/src/services/__tests__/plan-store.test.ts b/apps/coder/src/services/__tests__/plan-store.test.ts new file mode 100644 index 0000000..b7cc51d --- /dev/null +++ b/apps/coder/src/services/__tests__/plan-store.test.ts @@ -0,0 +1,16 @@ +import { describe, it, expect } from 'vitest'; +import { planStatusFromRun } from '../plan-store.js'; + +describe('planStatusFromRun', () => { + it('maps completed to completed', () => { + expect(planStatusFromRun('completed')).toBe('completed'); + }); + + it('maps failed to failed', () => { + expect(planStatusFromRun('failed')).toBe('failed'); + }); + + it('maps cancelled to cancelled', () => { + expect(planStatusFromRun('cancelled')).toBe('cancelled'); + }); +}); diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index 57ef5d0..1347d80 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -89,6 +89,8 @@ interface Deps { broker: Broker; log: FastifyBaseLogger; config: Config; + /** Fired when a flow run reaches a terminal state (for plan-store integration). */ + onRunTerminal?: (runId: string, status: 'completed' | 'failed' | 'cancelled') => void; } interface FlowStepRow { @@ -479,6 +481,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { WHERE id = ${runId} AND status = 'running' `; if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish + deps.onRunTerminal?.(runId, 'completed'); publishStep(runId, lastAgentStepId(flow, input, model), 'completed', { run_status: 'completed', report, @@ -498,6 +501,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { WHERE id = ${runId} AND status = 'running' `; if (updated.count === 0) return; + deps.onRunTerminal?.(runId, 'failed'); const stepId = failedStepId ?? (flow ? lastAgentStepId(flow, input, model) : 'run'); log.warn({ runId, error }, 'flow-runner: run failed'); await appendStepEvent(sql, runId, stepId, 'failed', { error }); @@ -512,6 +516,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { WHERE id = ${runId} AND status = 'running' `; if (updated.count === 0) return; // idempotent — already terminal + deps.onRunTerminal?.(runId, 'cancelled'); // Any remaining pending steps are unreachable; mark + publish them so the // pane can show them as cancelled rather than stuck in pending. const pending = await sql<{ step_id: string; kind: string }[]>` @@ -742,6 +747,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { WHERE id = ${runId} AND status = 'running' `; if (updated.count === 0) return { cancelled: false, taskIds: [] }; + deps.onRunTerminal?.(runId, 'cancelled'); // Mark all non-terminal steps cancelled and collect in-flight task_ids. const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>` diff --git a/apps/coder/src/services/plan-store.ts b/apps/coder/src/services/plan-store.ts new file mode 100644 index 0000000..791edd1 --- /dev/null +++ b/apps/coder/src/services/plan-store.ts @@ -0,0 +1,184 @@ +/** + * Boulder state — cross-session plan persistence for BooCode. + * + * Plans live above flow_runs: a plan tracks a user's work goal and can link to + * a flow run for automatic progress tracking. When the linked flow run reaches + * a terminal state (completed/failed/cancelled), the plan is auto-updated. + * + * Auto-resumption: on startup, plans with a linked in-flight flow_run are + * surfaced via the GET endpoint so the UI can show a resume prompt. The + * flow-runner's initResume() re-advances the actual run; this store surfaces + * the plan-level view. + */ +import type { Sql } from '../db.js'; + +export interface Plan { + id: string; + project_id: string; + title: string; + description: string | null; + status: string; + flow_run_id: string | null; + progress_pct: number; + items_total: number; + items_completed: number; + metadata: Record | null; + created_at: Date; + updated_at: Date; +} + +export interface CreatePlanOpts { + projectId: string; + title: string; + description?: string; + flowRunId?: string; + metadata?: Record; +} + +export interface UpdatePlanOpts { + title?: string; + description?: string | null; + status?: 'active' | 'completed' | 'cancelled' | 'failed'; + progressPct?: number; + itemsTotal?: number; + itemsCompleted?: number; + metadata?: Record | null; +} + +export function createPlan(sql: Sql, opts: CreatePlanOpts): Promise { + return sql` + INSERT INTO plans (project_id, title, description, flow_run_id, metadata) + VALUES ( + ${opts.projectId}, + ${opts.title}, + ${opts.description ?? null}, + ${opts.flowRunId ?? null}, + ${opts.metadata ? sql.json(opts.metadata as never) : null} + ) + RETURNING * + `.then((rows) => rows[0] as unknown as Plan); +} + +export function getPlan(sql: Sql, planId: string): Promise { + return sql` + SELECT * FROM plans WHERE id = ${planId} + `.then((rows) => (rows[0] as unknown as Plan) ?? null); +} + +export function listPlans(sql: Sql, projectId: string): Promise { + return sql` + SELECT * FROM plans + WHERE project_id = ${projectId} + ORDER BY created_at DESC + LIMIT 100 + ` as Promise; +} + +export function listActivePlans(sql: Sql, projectId: string): Promise { + return sql` + SELECT * FROM plans + WHERE project_id = ${projectId} AND status = 'active' + ORDER BY created_at DESC + ` as Promise; +} + +export async function updatePlan( + sql: Sql, + planId: string, + opts: UpdatePlanOpts, +): Promise { + const sets: string[] = []; + const values: unknown[] = []; + + if (opts.title !== undefined) { + sets.push(`title = $${values.length + 1}`); + values.push(opts.title); + } + if (opts.description !== undefined) { + sets.push(`description = $${values.length + 1}`); + values.push(opts.description); + } + if (opts.status !== undefined) { + sets.push(`status = $${values.length + 1}`); + values.push(opts.status); + } + if (opts.progressPct !== undefined) { + sets.push(`progress_pct = $${values.length + 1}`); + values.push(opts.progressPct); + } + if (opts.itemsTotal !== undefined) { + sets.push(`items_total = $${values.length + 1}`); + values.push(opts.itemsTotal); + } + if (opts.itemsCompleted !== undefined) { + sets.push(`items_completed = $${values.length + 1}`); + values.push(opts.itemsCompleted); + } + if (opts.metadata !== undefined) { + sets.push(`metadata = $${values.length + 1}::jsonb`); + values.push(opts.metadata !== null ? JSON.stringify(opts.metadata) : null); + } + + if (sets.length === 0) return getPlan(sql, planId); + + sets.push(`updated_at = clock_timestamp()`); + + const query = ` + UPDATE plans SET ${sets.join(', ')} + WHERE id = $${values.length + 1} + RETURNING * + `; + values.push(planId); + + const result = await sql.unsafe(query, values as never[]); + return (result[0] as unknown as Plan) ?? null; +} + +/** + * Called when a flow run reaches a terminal state. Updates the linked plan's + * status based on the run outcome: + * - completed → plan completed + * - failed → plan failed + * - cancelled → plan cancelled + * Returns true when a plan was updated, false when no plan is linked to the run. + */ +export async function updatePlanFromRun( + sql: Sql, + runId: string, + runStatus: 'completed' | 'failed' | 'cancelled', +): Promise { + const planStatus = planStatusFromRun(runStatus); + const updated = await sql` + UPDATE plans + SET status = ${planStatus}, progress_pct = 100, + items_completed = items_total, updated_at = clock_timestamp() + WHERE flow_run_id = ${runId} AND status = 'active' + `; + return updated.count > 0; +} + +/** Map a flow-run terminal status to its corresponding plan status. Pure. */ +export function planStatusFromRun(runStatus: 'completed' | 'failed' | 'cancelled'): string { + return runStatus === 'completed' ? 'completed' : runStatus; +} + +/** + * Find any active plan linked to a running flow run — used by the startup + * resume path to surface plans that have in-flight orchestrator runs. + */ +export async function findPlanWithRunningRun( + sql: Sql, + projectId: string, +): Promise<(Plan & { run_status: string }) | null> { + const [row] = await sql` + SELECT p.*, fr.status AS run_status + FROM plans p + JOIN flow_runs fr ON fr.id = p.flow_run_id + WHERE p.project_id = ${projectId} + AND p.status = 'active' + AND fr.status = 'running' + ORDER BY p.created_at DESC + LIMIT 1 + `; + return (row as unknown as Plan & { run_status: string }) ?? null; +}