/**
 * Phase 6 — Orchestrator run routes.
 *
 *   POST /api/runs              — launch a flow run (validated, calls flow-runner)
 *   GET  /api/runs?project_id=  — runs history for the NewPaneMenu surface
 *   GET  /api/runs/:id          — run + steps + report (reopen a pane)
 *   POST /api/runs/:id/cancel   — mark run + steps cancelled, abort in-flight tasks
 */
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
import type { FlowRunner } from '../services/flow-runner.js';
import type { ExternalCancelFn } from './tasks.js';
import { FLOW_NAMES } from '../conductor/flows/index.js';

/** UUID-shaped string; shared by every identifier this module validates. */
const UuidString = z.string().uuid();

/** Body accepted by `POST /api/runs`. Extra keys under `input` are passed through untouched. */
const CreateRunBody = z.object({
  project_id: UuidString,
  flow_name: z.string().min(1).max(100),
  band: z.enum(['small', 'medium', 'large']),
  input: z
    .object({
      question: z.string().min(1).max(64_000),
    })
    .passthrough(),
  model: z.string().max(200).optional(),
});

/** Query string accepted by `GET /api/runs`. */
const ListRunsQuery = z.object({
  project_id: UuidString,
});

/** The `:id` path parameter of the single-run routes. */
const RunIdParam = UuidString;

/** Columns selected from `flow_runs` when a single run is reopened. */
type FlowRunRow = {
  id: string;
  project_id: string;
  flow_name: string;
  band: string;
  model: string;
  status: string;
  input: unknown;
  report: string | null;
  error: string | null;
  created_at: unknown;
  updated_at: unknown;
};

/**
 * Sets the HTTP status code on `reply` and hands `payload` back, so a handler
 * can pick the status and return the response body in a single expression.
 */
function respond<T>(
  reply: { code(statusCode: number): unknown },
  statusCode: number,
  payload: T,
): T {
  reply.code(statusCode);
  return payload;
}

/**
 * Registers the flow-run HTTP routes on the given Fastify instance.
 *
 * @param app            Fastify instance the four routes are attached to.
 * @param sql            Tagged-template SQL client used for the `flow_runs`,
 *                       `flow_steps` and `chats` reads.
 * @param flowRunner     Service used to launch and cancel runs.
 * @param cancelExternal Called once per task id returned by a successful cancel.
 */
export function registerRunsRoutes(
  app: FastifyInstance,
  sql: Sql,
  flowRunner: FlowRunner,
  cancelExternal: ExternalCancelFn,
): void {
  // POST /api/runs — launch a flow run
  app.post('/api/runs', async (req, reply) => {
    const body = CreateRunBody.safeParse(req.body);
    if (!body.success) {
      return respond(reply, 400, { error: 'invalid body', details: body.error.flatten() });
    }

    const { project_id: projectId, flow_name: flowName, band, input, model } = body.data;

    // Well-formed body, but the flow name is not one of the registered flows.
    if (!FLOW_NAMES.includes(flowName)) {
      return respond(reply, 422, {
        error: 'unknown_flow',
        message: `unknown flow: ${flowName}`,
        known_flows: FLOW_NAMES,
      });
    }

    const launched = await flowRunner.launch({ projectId, flowName, band, input, model });
    return respond(reply, 201, { run_id: launched.runId });
  });

  // GET /api/runs?project_id= — runs history, most-recent-first
  app.get('/api/runs', async (req, reply) => {
    const query = ListRunsQuery.safeParse(req.query);
    if (!query.success) {
      return respond(reply, 400, { error: 'invalid query', details: query.error.flatten() });
    }

    const runs = await sql`
      SELECT id, project_id, flow_name, band, model, status, input, report, error,
             created_at, updated_at
      FROM flow_runs
      WHERE project_id = ${query.data.project_id}
      ORDER BY created_at DESC
      LIMIT 100
    `;
    return { runs };
  });

  // GET /api/runs/:id — single run + its steps + report (reopen)
  app.get<{ Params: { id: string } }>('/api/runs/:id', async (req, reply) => {
    const idCheck = RunIdParam.safeParse(req.params.id);
    if (!idCheck.success) {
      return respond(reply, 400, { error: 'invalid id' });
    }
    const runId = idCheck.data;

    const [run] = await sql<FlowRunRow[]>`
      SELECT id, project_id, flow_name, band, model, status, input, report, error,
             created_at, updated_at
      FROM flow_runs
      WHERE id = ${runId}
    `;
    if (!run) {
      return respond(reply, 404, { error: 'run not found' });
    }

    // Each step row also carries the session_id of its chat (NULL when no chat matches).
    const steps = await sql`
      SELECT fs.id, fs.run_id, fs.step_id, fs.kind, fs.agent, fs.status,
             fs.task_id, fs.chat_id, fs.input, fs.output, fs.error,
             fs.created_at, fs.updated_at,
             c.session_id
      FROM flow_steps fs
      LEFT JOIN chats c ON c.id = fs.chat_id
      WHERE fs.run_id = ${runId}
      ORDER BY fs.created_at
    `;
    return { run, steps };
  });

  // POST /api/runs/:id/cancel — cancel a running flow run
  app.post<{ Params: { id: string } }>('/api/runs/:id/cancel', async (req, reply) => {
    const idCheck = RunIdParam.safeParse(req.params.id);
    if (!idCheck.success) {
      return respond(reply, 400, { error: 'invalid id' });
    }
    const runId = idCheck.data;

    // The run must exist and still be running before a cancel is attempted.
    const [current] = await sql<{ id: string; status: string }[]>`
      SELECT id, status FROM flow_runs WHERE id = ${runId}
    `;
    if (!current) {
      return respond(reply, 404, { error: 'run not found' });
    }
    if (current.status !== 'running') {
      return respond(reply, 409, { error: `cannot cancel run in status '${current.status}'` });
    }

    // Cancel via flow-runner: marks run + steps cancelled, publishes frames,
    // returns task_ids of any in-flight step tasks.
    const outcome = await flowRunner.cancel(runId);
    if (!outcome.cancelled) {
      // Race: another path (e.g. natural completion) settled the run between
      // the status check above and the cancel call.
      return respond(reply, 409, { error: 'run is no longer running' });
    }

    // Abort any in-flight dispatcher tasks so qwen exits promptly.
    for (const taskId of outcome.taskIds) {
      cancelExternal(taskId);
    }
    return { cancelled: true };
  });
}