/**
 * Flow-runner — the orchestrator's DB-backed execution engine (D-2/D-3/D-4).
 *
 * Replaces the Phase-1 in-memory wave scheduler (`conductor/src/flow.ts`) with a
 * scheduler that persists every step to `flow_runs`/`flow_steps` and executes
 * agent steps by INSERTing a normal `tasks` row that the EXISTING dispatcher
 * picks up (LISTEN 'tasks_new'). It owns no poll loop — it reacts to the
 * dispatcher's `onTaskTerminal` hook (`handleTaskTerminal`). The pure scheduling
 * decisions live in `flow-runner-decisions.ts`; this file is the IO shell.
 *
 * Two load-bearing properties:
 *   - READ-ONLY (D-4): every dispatched step task is hardcoded `agent='qwen'`,
 *     `mode_id='plan'`, NEVER user-overridable. The dispatcher's qwen+plan routing
 *     rule forces the PTY path so qwen runs with `--approval-mode plan` — a hard
 *     tool-level gate (reads allowed, writes blocked) inside the agent binary.
 *   - PROMPTS BUILT IN-PROCESS (D-1): each worker prompt is assembled here by
 *     calling the flow's `step.run(ctx)` and prepending the agent persona + the
 *     evidence/YAGNI contracts (the contracts are baked into `step.run`'s output
 *     by the flow definition). The prompt is the task's `input`; no closure is
 *     ever serialized to the DB.
 *
 * STREAM ATTRIBUTION. Each agent step gets its own synthetic session + chat. The
 * step task carries that `session_id`; the one-shot external path reuses the
 * session's single open chat, so its delta/tool_call/message_complete frames are
 * keyed by the step's `chat_id` (D-6). A per-step session (not one per run) is
 * required because `runExternalAgent` picks the session's most-recent open chat —
 * sharing one session across a parallel wave would misattribute the streams.
 *
 * FRAME CHANNEL. Run-level frames (`flow_run_started`, `flow_run_step_updated`)
 * are published on the user channel (run/sidebar-level, like `session_updated`).
 * The per-agent token stream rides the existing per-session frames the dispatcher
 * already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
 */
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { FastifyBaseLogger } from 'fastify';
import type { Config } from '../config.js';
import { getFlow } from '../conductor/flows/index.js';
import { loadPersona } from '../conductor/persona-loader.js';
import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../conductor/types.js';
import {
  isRunComplete,
  manifestSteps,
  partitionReady,
  readySteps,
  reconcileRun,
  type SchedulerState,
  type StepResumeDecision,
} from './flow-runner-decisions.js';

export interface LaunchOpts {
  projectId: string;
  flowName: string;
  band: Band;
  /** the flow input; MUST carry a non-empty `question` (CHECK input ? 'question') */
  input: FlowInput;
  /** override the run model; defaults to config.DEFAULT_MODEL (qwen 35B) */
  model?: string;
}

export interface FlowRunner {
  /** Start a run: persist it, build the roster, dispatch the first ready wave. */
  launch(opts: LaunchOpts): Promise<{ runId: string }>;
  /**
   * Wire this to `createDispatcher({ onTaskTerminal })`. Fires when ANY task
   * settles; the runner ignores tasks it doesn't own. Never throws.
   */
  handleTaskTerminal(taskId: string, state: string): void;
  /**
   * Re-advance every flow_run still marked 'running' on coder startup (D-9).
   * Idempotent: completed steps are kept, stale-running steps are re-dispatched
   * (a re-dispatched step already has a 'pending' task that won't be touched
   * again), completed tasks are surfaced as done. Never throws.
   */
  initResume(): Promise<void>;
  /**
   * Cancel a running flow run (Phase 6 stop route). Marks the run and all
   * non-terminal steps 'cancelled', publishes cancel frames, and returns the
   * task_ids of any in-flight step tasks so the route can abort them via the
   * dispatcher's cancelExternalTask. Returns cancelled:false when the run is
   * not found or already terminal.
   */
  cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }>;
}

interface Deps {
  sql: Sql;
  broker: Broker;
  log: FastifyBaseLogger;
  config: Config;
}

/** One `flow_steps` row as read by the scheduler in `advanceInner`. */
interface FlowStepRow {
  step_id: string;
  // launch() stores step.kind verbatim, and advanceInner handles 'approval'
  // steps, so the column can hold all three values.
  kind: 'agent' | 'code' | 'approval';
  agent: string | null;
  status: string;
  chat_id: string | null;
  output: string | null;
}

/**
 * Build the flow runner around its IO dependencies.
 *
 * @param deps - SQL client, frame broker, logger and config used by every operation.
 * @returns the public runner surface: `launch`, `handleTaskTerminal`, `initResume`, `cancel`.
 */
export function createFlowRunner(deps: Deps): FlowRunner {
  const { sql, broker, log, config } = deps;

  // Serialize advance() per run so two near-simultaneous terminal callbacks for a
  // parallel wave can't double-dispatch the next wave or race the DB reads.
  const advanceChain = new Map<string, Promise<void>>();

  // ctx.dispatch sub-tasks (code-review's per-dimension adversarial verify): a
  // taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
  // resolves them here instead of advancing a run.
  const subDispatchWaiters = new Map<string, (output: string) => void>();

  /** Publish a run-level frame on the 'default' user channel. */
  function publishUser(frame: Record<string, unknown>): void {
    broker.publishUserFrame('default', frame as unknown as WsFrame);
  }

  /** Turn a step id such as `deep-dive_2` into a display label (`Deep Dive 2`). */
  function humanize(stepId: string): string {
    return stepId
      .replace(/[-_]+/g, ' ')
      .replace(/\b\w/g, (c) => c.toUpperCase());
  }

  /** Assemble the StepContext handed to `when`/`run`/`render` callbacks. */
  function buildCtx(
    input: FlowInput,
    results: Record<string, string>,
    model: string,
    dispatch?: DispatchFn,
  ): StepContext {
    return { input, results, model, dispatch };
  }

  /** Latest assistant message text for a chat — the FULL worker output (≤50k as
   *  the dispatcher persists it), NOT the ≤500-char tasks.output_summary (C3). */
  async function readChatOutput(chatId: string): Promise<string> {
    const [m] = await sql<{ content: string | null }[]>`
      SELECT content FROM messages
      WHERE chat_id = ${chatId} AND role = 'assistant'
      ORDER BY created_at DESC
      LIMIT 1
    `;
    return m?.content ?? '';
  }

  /** Full output of a task, read through its chat; '' when the task has no chat. */
  async function readTaskOutput(taskId: string): Promise<string> {
    const [t] = await sql<{ chat_id: string | null }[]>`SELECT chat_id FROM tasks WHERE id = ${taskId}`;
    return t?.chat_id ? readChatOutput(t.chat_id) : '';
  }

  /** A synthetic session + chat for one agent step's stream (D-6). One session
   *  per step so the one-shot path's single-open-chat pick lands on this chat.
   *  Resolves with the new chat's id. */
  async function createStepChat(
    projectId: string,
    model: string,
    flowName: string,
    label: string,
  ): Promise<string> {
    const [session] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${projectId}, ${`Flow ${flowName} · ${label}`}, ${model}, 'open')
      RETURNING id
    `;
    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${session!.id}, ${label}, 'open')
      RETURNING id
    `;
    return chat!.id;
  }

  // ─── launch ──────────────────────────────────────────────────────────────────

  /**
   * Persist a new run and its step rows, publish the roster frame, and kick off
   * the first advance (not awaited).
   *
   * @throws Error when the flow name is unknown or `input.question` is empty.
   */
  async function launch(opts: LaunchOpts): Promise<{ runId: string }> {
    const flow = getFlow(opts.flowName);
    if (!flow) throw new Error(`unknown flow: ${opts.flowName}`);
    const model = (opts.model && opts.model.trim()) || config.DEFAULT_MODEL;
    const input: FlowInput = { ...opts.input, band: opts.band };
    if (typeof input.question !== 'string' || !input.question.trim()) {
      throw new Error('flow input requires a non-empty "question"');
    }

    const [run] = await sql<{ id: string }[]>`
      INSERT INTO flow_runs (project_id, flow_name, band, model, status, input)
      VALUES (${opts.projectId}, ${opts.flowName}, ${opts.band}, ${model}, 'running', ${sql.json(input as never)})
      RETURNING id
    `;
    const runId = run!.id;

    // Manifest = every step not pre-skipped by its when() guard at launch (band /
    // repo gating). Agent steps get a synthetic chat now so the roster carries
    // their chat_id; code steps get a row but no chat (not in the roster frame).
    const launchCtx = buildCtx(input, {}, model);
    const manifest = manifestSteps(flow, launchCtx);
    const frameSteps: Array<Record<string, unknown>> = [];
    for (const step of manifest) {
      let chatId: string | null = null;
      if (step.kind === 'agent') {
        chatId = await createStepChat(opts.projectId, model, opts.flowName, humanize(step.id));
      }
      await sql`
        INSERT INTO flow_steps (run_id, step_id, kind, agent, status, chat_id)
        VALUES (${runId}, ${step.id}, ${step.kind}, ${step.agent ?? null}, 'pending', ${chatId})
      `;
      if (step.kind === 'agent' && chatId) {
        frameSteps.push({
          step_id: step.id,
          agent: step.agent,
          kind: 'agent',
          chat_id: chatId,
          label: humanize(step.id),
        });
      }
    }

    publishUser({
      type: 'flow_run_started',
      run_id: runId,
      flow_name: opts.flowName,
      band: opts.band,
      steps: frameSteps,
    });

    void advance(runId);
    return { runId };
  }

  // ─── advance (serialized per run) ─────────────────────────────────────────────

  /**
   * Queue one `advanceInner` pass behind any pass already running for this run.
   * The returned promise never rejects: failures are logged here.
   */
  function advance(runId: string): Promise<void> {
    const prev = advanceChain.get(runId) ?? Promise.resolve();
    const next = prev
      .catch(() => {})
      .then(() =>
        advanceInner(runId).catch((err) => {
          log.error({ err: errMsg(err), runId }, 'flow-runner: advance failed');
        }),
      );
    advanceChain.set(runId, next);
    // Drop the chain entry once the tail settles, unless a later pass replaced it.
    void next.finally(() => {
      if (advanceChain.get(runId) === next) advanceChain.delete(runId);
    });
    return next;
  }

  /**
   * One scheduling pass: rebuild scheduler state from `flow_steps`, then drain
   * skips and code steps in-process and dispatch the ready agent wave.
   */
  async function advanceInner(runId: string): Promise<void> {
    const [run] = await sql<{ project_id: string; flow_name: string; model: string; input: FlowInput; status: string }[]>`
      SELECT project_id, flow_name, model, input, status FROM flow_runs WHERE id = ${runId}
    `;
    if (!run) return;
    // Terminal (incl. 'cancelled' from a stop) → stop advancing. This makes the
    // runner idempotent against duplicate terminal callbacks and respects a stop.
    if (run.status !== 'running') return;

    const flow = getFlow(run.flow_name);
    if (!flow) {
      await failRun(runId, flow, run.input, run.model, `unknown flow: ${run.flow_name}`);
      return;
    }
    const input = run.input;
    const model = run.model;
    const dispatch: DispatchFn = (agent, task) => dispatchSubAgent(run.project_id, model, agent, task);

    const rows = await sql<FlowStepRow[]>`
      SELECT step_id, kind, agent, status, chat_id, output FROM flow_steps WHERE run_id = ${runId}
    `;

    // Re-derive the excluded set (band/when pre-skips) from the flow def + input —
    // excluded steps never got a row, so a dependent treats them as satisfied.
    const launchCtx = buildCtx(input, {}, model, dispatch);
    const manifestIds = new Set(manifestSteps(flow, launchCtx).map((s) => s.id));
    const excluded = new Set(flow.steps.filter((s) => !manifestIds.has(s.id)).map((s) => s.id));

    const done = new Set<string>();
    const skipped = new Set<string>();
    const inFlight = new Set<string>();
    const results: Record<string, string> = {};
    for (const r of rows) {
      switch (r.status) {
        case 'completed':
          done.add(r.step_id);
          if (r.output != null) results[r.step_id] = r.output;
          break;
        case 'skipped':
          skipped.add(r.step_id);
          break;
        case 'running':
          inFlight.add(r.step_id);
          break;
        case 'failed':
          // A failed worker makes the deterministic report untrustworthy — fail the
          // whole run (matches the Phase-1 CLI, which throws on a dispatch failure).
          await failRun(runId, flow, input, model, `step '${r.step_id}' failed`, r.step_id);
          return;
        case 'cancelled':
          await cancelRun(runId);
          return;
        // 'pending' → a candidate for the ready computation below
        // NOTE(review): 'blocked' (approval) rows have no case here, so they are
        // treated like 'pending' on the next pass — confirm that is intended.
      }
    }

    // Drain ready skips + code steps (synchronous), re-evaluating after each batch,
    // then dispatch the full ready agent wave and wait for their terminal callbacks.
    for (;;) {
      const state: SchedulerState = { done, skipped, inFlight, excluded };
      if (isRunComplete(flow, state)) {
        await finishRun(runId, flow, input, results, model, dispatch);
        return;
      }
      const ready = readySteps(flow, state);
      if (ready.length === 0) {
        if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
        await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
        return;
      }

      const ctx = buildCtx(input, results, model, dispatch);
      const { toRun, toSkip } = partitionReady(ready, ctx);

      if (toSkip.length > 0) {
        for (const s of toSkip) {
          await markStep(runId, s.id, 'skipped');
          skipped.add(s.id);
          if (s.kind === 'agent') publishStep(runId, s.id, 'skipped');
        }
        continue; // re-evaluate — a skip can settle a fan-in step's deps
      }

      const codeReady = toRun.filter((s) => s.kind === 'code');
      if (codeReady.length > 0) {
        for (const s of codeReady) {
          let out: string;
          try {
            // Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
            // verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
            out = await s.run(buildCtx(input, results, model, dispatch));
          } catch (err) {
            await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
            return;
          }
          await markStep(runId, s.id, 'completed', out);
          results[s.id] = out;
          done.add(s.id);
        }
        continue; // re-evaluate — code output can unblock the next wave
      }

      // Approval gate steps: pause and wait for human decision.
      const approvalReady = toRun.filter((s) => s.kind === 'approval');
      if (approvalReady.length > 0) {
        for (const s of approvalReady) {
          await sql`
            UPDATE flow_steps SET status = 'blocked', updated_at = clock_timestamp()
            WHERE run_id = ${runId} AND step_id = ${s.id}
          `;
          await appendStepEvent(sql, runId, s.id, 'paused', { reason: 'awaiting approval' });
          publishStep(runId, s.id, 'blocked');
        }
        return;
      }

      // Only agent steps remain ready → dispatch the whole parallel wave, then wait.
      for (const s of toRun) {
        await dispatchAgentStep(runId, run.project_id, model, s, ctx);
        inFlight.add(s.id);
        publishStep(runId, s.id, 'running');
      }
      return;
    }
  }

  // ─── step execution ───────────────────────────────────────────────────────────

  /**
   * Build one agent step's prompt, INSERT its task row, and mark the step
   * 'running' with the task id and the prompt stored in `flow_steps.input`.
   */
  async function dispatchAgentStep(
    runId: string,
    projectId: string,
    model: string,
    step: Step,
    ctx: StepContext,
  ): Promise<void> {
    const [row] = await sql<{ chat_id: string | null }[]>`
      SELECT chat_id FROM flow_steps WHERE run_id = ${runId} AND step_id = ${step.id}
    `;
    const chatId = row?.chat_id ?? null;
    const [chat] = chatId
      ? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
      : [];
    const sessionId = chat?.session_id ?? null;

    // Build the worker prompt IN-PROCESS (D-1): persona + (task + contracts). The
    // flow's step.run already bakes in the evidence/YAGNI contracts.
    const persona = step.agent ? await loadPersona(step.agent) : '';
    const taskPrompt = await step.run(ctx);
    const resolvedPrompt = resolveVariables(taskPrompt, ctx.results);
    const fullPrompt = persona ? `${persona}\n\n---\n\n${resolvedPrompt}` : resolvedPrompt;

    // READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never
    // user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate.
    const [task] = await sql<{ id: string }[]>`
      INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
      VALUES (${projectId}, ${fullPrompt}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
      RETURNING id
    `;
    await sql`
      UPDATE flow_steps
      SET task_id = ${task!.id}, status = 'running', input = ${fullPrompt}, updated_at = clock_timestamp()
      WHERE run_id = ${runId} AND step_id = ${step.id}
    `;
    await appendStepEvent(sql, runId, step.id, 'started', { taskId: task!.id });
  }

  /**
   * ctx.dispatch for code steps: dispatch a read-only qwen worker under a Han
   * persona and resolve with its full output. The sub-task has no flow_steps row;
   * handleTaskTerminal resolves the waiter rather than advancing a run.
   */
  async function dispatchSubAgent(
    projectId: string,
    model: string,
    persona: string,
    task: string,
  ): Promise<string> {
    // Best-effort persona: a load failure falls back to the bare task prompt.
    const personaText = await loadPersona(persona).catch(() => '');
    const prompt = personaText ? `${personaText}\n\n---\n\n${task}` : task;
    const chatId = await createStepChat(projectId, model, 'sub', humanize(persona));
    const [chat] = await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`;
    const [t] = await sql<{ id: string }[]>`
      INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
      VALUES (${projectId}, ${prompt}, 'qwen', ${model}, 'plan', ${chat?.session_id ?? null}, ${chatId})
      RETURNING id
    `;
    // Settled only by handleTaskTerminal, which looks the resolver up by task id.
    return new Promise<string>((resolve) => {
      subDispatchWaiters.set(t!.id, resolve);
    });
  }

  /**
   * Persist a step's terminal status (and its output when given) and append the
   * matching event-log row.
   */
  async function markStep(
    runId: string,
    stepId: string,
    status: 'completed' | 'skipped',
    output?: string,
  ): Promise<void> {
    if (output !== undefined) {
      await sql`
        UPDATE flow_steps SET status = ${status}, output = ${output}, updated_at = clock_timestamp()
        WHERE run_id = ${runId} AND step_id = ${stepId}
      `;
    } else {
      await sql`
        UPDATE flow_steps SET status = ${status}, updated_at = clock_timestamp()
        WHERE run_id = ${runId} AND step_id = ${stepId}
      `;
    }
    // Same test as the UPDATE above, so an empty-string output still logs its length.
    await appendStepEvent(
      sql,
      runId,
      stepId,
      status,
      output !== undefined ? { outputLength: output.length } : undefined,
    );
  }

  // ─── run completion ─────────────────────────────────────────────────────────

  /**
   * Render the flow report and mark the run 'completed'. A render error fails
   * the run instead; nothing is published if the run is no longer 'running'.
   */
  async function finishRun(
    runId: string,
    flow: Flow,
    input: FlowInput,
    results: Record<string, string>,
    model: string,
    dispatch: DispatchFn,
  ): Promise<void> {
    let report: string;
    try {
      report = flow.render(buildCtx(input, results, model, dispatch));
    } catch (err) {
      await failRun(runId, flow, input, model, `report render threw: ${errMsg(err)}`);
      return;
    }
    const updated = await sql`
      UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
      WHERE id = ${runId} AND status = 'running'
    `;
    if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish
    publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
      run_status: 'completed',
      report,
    });
  }

  /**
   * Mark a still-running run 'failed', log it, and publish the failure on
   * `failedStepId` (or on the last roster agent step when none is given).
   */
  async function failRun(
    runId: string,
    flow: Flow | undefined,
    input: FlowInput,
    model: string,
    error: string,
    failedStepId?: string,
  ): Promise<void> {
    const updated = await sql`
      UPDATE flow_runs SET status = 'failed', error = ${error}, updated_at = clock_timestamp()
      WHERE id = ${runId} AND status = 'running'
    `;
    if (updated.count === 0) return;
    const stepId = failedStepId ?? (flow ? lastAgentStepId(flow, input, model) : 'run');
    log.warn({ runId, error }, 'flow-runner: run failed');
    await appendStepEvent(sql, runId, stepId, 'failed', { error });
    publishStep(runId, stepId, 'failed', { run_status: 'failed' });
  }

  /** Cancel a run because one of its steps was found 'cancelled' during advance. */
  async function cancelRun(runId: string): Promise<void> {
    // A step was cancelled (e.g. PTY killed) while the run was still 'running' —
    // mark the run cancelled and publish frames for any remaining pending steps.
    const updated = await sql`
      UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
      WHERE id = ${runId} AND status = 'running'
    `;
    if (updated.count === 0) return; // idempotent — already terminal

    // Any remaining pending steps are unreachable; mark + publish them so the
    // pane can show them as cancelled rather than stuck in pending.
    const pending = await sql<{ step_id: string; kind: string }[]>`
      SELECT step_id, kind FROM flow_steps
      WHERE run_id = ${runId} AND status IN ('pending', 'running')
    `;
    if (pending.length > 0) {
      await sql`
        UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
        WHERE run_id = ${runId} AND status IN ('pending', 'running')
      `;
      for (const s of pending) {
        if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
      }
    }
    log.info({ runId }, 'flow-runner: run cancelled');
  }

  /** The terminal agent step in roster order — a valid roster step_id to carry the
   *  run-level completion/failure status (the pane reads run_status/report off it). */
  function lastAgentStepId(flow: Flow, input: FlowInput, model: string): string {
    const agents = manifestSteps(flow, buildCtx(input, {}, model)).filter((s) => s.kind === 'agent');
    return agents.length ? agents[agents.length - 1]!.id : (flow.steps[0]?.id ?? 'run');
  }

  /** Publish a `flow_run_step_updated` frame, optionally carrying run-level status. */
  function publishStep(
    runId: string,
    stepId: string,
    status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked',
    extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string },
  ): void {
    publishUser({
      type: 'flow_run_step_updated',
      run_id: runId,
      step_id: stepId,
      status,
      ...(extra ?? {}),
    });
  }

  // ─── terminal callback (wired to createDispatcher.onTaskTerminal) ─────────────

  /**
   * React to a settled task: resolve a sub-dispatch waiter, or record the owning
   * step's outcome and advance its run. Errors are logged, never thrown.
   */
  function handleTaskTerminal(taskId: string, state: string): void {
    void (async () => {
      // 1. A ctx.dispatch sub-task → resolve its waiter with the full output.
      const waiter = subDispatchWaiters.get(taskId);
      if (waiter) {
        subDispatchWaiters.delete(taskId);
        const out = await readTaskOutput(taskId).catch(() => '');
        waiter(out);
        return;
      }

      // 2. A flow step task → mark the step, then advance its run.
      const [step] = await sql<{ run_id: string; step_id: string; chat_id: string | null; status: string }[]>`
        SELECT run_id, step_id, chat_id, status FROM flow_steps WHERE task_id = ${taskId}
      `;
      if (!step) return; // not a flow task
      if (step.status !== 'running') return; // idempotent against duplicate callbacks

      if (state === 'completed') {
        const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
        const updated = await sql`
          UPDATE flow_steps SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
          WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
        `;
        // Publish only if this callback actually moved the row; a concurrent
        // cancel may have changed it between the SELECT above and this UPDATE.
        if (updated.count > 0) publishStep(step.run_id, step.step_id, 'completed');
      } else {
        const stepStatus = state === 'cancelled' ? 'cancelled' : 'failed';
        const updated = await sql`
          UPDATE flow_steps SET status = ${stepStatus}, updated_at = clock_timestamp()
          WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
        `;
        if (updated.count > 0) {
          if (stepStatus === 'failed') {
            publishStep(step.run_id, step.step_id, 'failed');
          } else {
            // A cancelled step always triggers run cancellation (advanceInner path).
            // Include run_status: 'cancelled' here so single-step flows get a
            // complete run-level cancel frame before cancelRun runs.
            publishStep(step.run_id, step.step_id, 'cancelled', { run_status: 'cancelled' });
          }
        }
      }

      await advance(step.run_id);
    })().catch((err) => {
      log.error({ err: errMsg(err), taskId }, 'flow-runner: handleTaskTerminal failed');
    });
  }

  // ─── startup resume (D-9) ─────────────────────────────────────────────────────

  /**
   * Apply one step's resume decision to the DB, then return (the caller drives the
   * loop). Re-dispatch reuses the prompt already stored in flow_steps.input (built
   * in-process at launch per D-1) so the flow def doesn't need to be reloaded and
   * ctx.dispatch closures don't re-execute.
   */
  async function applyResumeDecision(
    runId: string,
    projectId: string,
    model: string,
    step: { step_id: string; task_id: string | null; chat_id: string | null; input: string | null },
    decision: StepResumeDecision,
  ): Promise<void> {
    switch (decision.action) {
      case 'keep':
        break;

      case 'mark-done': {
        const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
        await sql`
          UPDATE flow_steps SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
          WHERE run_id = ${runId} AND step_id = ${step.step_id}
        `;
        break;
      }

      case 'mark-failed':
        await sql`
          UPDATE flow_steps SET status = 'failed', updated_at = clock_timestamp()
          WHERE run_id = ${runId} AND step_id = ${step.step_id}
        `;
        break;

      case 'mark-cancelled':
        await sql`
          UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
          WHERE run_id = ${runId} AND step_id = ${step.step_id}
        `;
        break;

      case 're-dispatch': {
        // No stored prompt means we can't rebuild the task input (the step never
        // reached the dispatch stage). Fail it so advance() fails the run cleanly.
        if (!step.input) {
          await sql`
            UPDATE flow_steps
            SET status = 'failed', error = 're-dispatch: no stored prompt', updated_at = clock_timestamp()
            WHERE run_id = ${runId} AND step_id = ${step.step_id}
          `;
          break;
        }
        // Reuse stored chat / session so the stream still rides the pre-created
        // synthetic chat (D-6). READ-ONLY (D-4): agent='qwen', mode_id='plan'.
        const chatId = step.chat_id;
        const [chat] = chatId
          ? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
          : [];
        const sessionId = chat?.session_id ?? null;
        const [task] = await sql<{ id: string }[]>`
          INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
          VALUES (${projectId}, ${step.input}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
          RETURNING id
        `;
        await sql`
          UPDATE flow_steps SET task_id = ${task!.id}, updated_at = clock_timestamp()
          WHERE run_id = ${runId} AND step_id = ${step.step_id}
        `;
        log.info({ runId, stepId: step.step_id, taskId: task!.id }, 'flow-runner: step re-dispatched on resume');
        break;
      }
    }
  }

  /**
   * Reconcile one 'running' run against its tasks' current states, apply every
   * non-'keep' decision, then advance the run.
   */
  async function resumeRun(run: {
    id: string;
    project_id: string;
    model: string;
  }): Promise<void> {
    const rows = await sql<{
      step_id: string;
      task_id: string | null;
      status: string;
      chat_id: string | null;
      input: string | null;
    }[]>`SELECT step_id, task_id, status, chat_id, input FROM flow_steps WHERE run_id = ${run.id}`;

    // Load task states for all referenced tasks in one query.
    const taskIds = rows.map((r) => r.task_id).filter((id): id is string => id !== null);
    const taskStates = new Map<string, string>();
    if (taskIds.length > 0) {
      const tasks = await sql<{ id: string; state: string }[]>`
        SELECT id, state FROM tasks WHERE id = ANY(${taskIds})
      `;
      for (const t of tasks) taskStates.set(t.id, t.state);
    }

    const decisions = reconcileRun(
      rows.map((r) => ({ stepId: r.step_id, taskId: r.task_id, status: r.status })),
      taskStates,
    );

    const rowsByStepId = new Map(rows.map((r) => [r.step_id, r] as const));
    for (const decision of decisions) {
      if (decision.action === 'keep') continue;
      const row = rowsByStepId.get(decision.stepId);
      if (!row) continue; // decisions are derived from `rows`, so this is defensive only
      await applyResumeDecision(run.id, run.project_id, run.model, row, decision);
    }

    // advance() re-derives the ready wave and dispatches it; new re-dispatched tasks
    // are already 'pending' and will be picked up by the dispatcher's startup poll.
    await advance(run.id);
    log.info({ runId: run.id }, 'flow-runner: run resumed');
  }

  /** Resume every run still marked 'running', one at a time; per-run failures are logged. */
  async function initResume(): Promise<void> {
    const runs = await sql<{ id: string; project_id: string; model: string }[]>`
      SELECT id, project_id, model FROM flow_runs WHERE status = 'running'
    `;
    if (runs.length === 0) return;
    log.info({ count: runs.length }, 'flow-runner: resuming in-flight runs on startup');
    for (const run of runs) {
      await resumeRun(run).catch((err) => {
        log.error({ err: errMsg(err), runId: run.id }, 'flow-runner: initResume failed for run');
      });
    }
  }

  // ─── cancel (Phase 6 stop route) ─────────────────────────────────────────────

  /**
   * Stop a running run on request: mark the run and its non-terminal steps
   * 'cancelled', publish the frames, and hand back the in-flight task ids.
   */
  async function cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }> {
    const updated = await sql`
      UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
      WHERE id = ${runId} AND status = 'running'
    `;
    if (updated.count === 0) return { cancelled: false, taskIds: [] };

    // Mark all non-terminal steps cancelled and collect in-flight task_ids.
    const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>`
      SELECT step_id, task_id, kind FROM flow_steps
      WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
    `;
    if (steps.length > 0) {
      await sql`
        UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
        WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
      `;
      for (const s of steps) {
        if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
      }
    } else {
      // Run was 'running' but no non-terminal steps — edge case (advance in progress).
      // Re-publish the last agent step with run_status: 'cancelled' so the pane updates.
      const [last] = await sql<{ step_id: string }[]>`
        SELECT step_id FROM flow_steps
        WHERE run_id = ${runId} AND kind = 'agent'
        ORDER BY created_at DESC
        LIMIT 1
      `;
      if (last) publishStep(runId, last.step_id, 'cancelled', { run_status: 'cancelled' });
    }

    const taskIds = steps
      .filter((s): s is typeof s & { task_id: string } => s.task_id !== null)
      .map((s) => s.task_id);
    log.info({ runId }, 'flow-runner: run cancelled by request');
    return { cancelled: true, taskIds };
  }

  return { launch, handleTaskTerminal, initResume, cancel };
}

/** Message of a thrown value: `Error.message` for Errors, `String(e)` otherwise. */
function errMsg(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

// ─── Event log ───────────────────────────────────────────────────────────────

/** Append one row to `flow_step_events`; the payload is stored as JSON, or NULL when absent. */
async function appendStepEvent(
  sql: Sql,
  runId: string,
  stepId: string,
  event: string,
  payload?: Record<string, unknown>,
): Promise<void> {
  await sql`
    INSERT INTO flow_step_events (run_id, step_id, event, payload)
    VALUES (${runId}, ${stepId}, ${event}, ${payload ? sql.json(payload as never) : null})
  `;
}

// ─── Variable substitution ───────────────────────────────────────────────────

// `$<stepId>.output` with an optional dotted `.<field>` path.
const VAR_PATTERN = /\$(\w+)\.output(?:\.(\w+(?:\.\w+)*))?/g;

/**
 * Substitute `$<stepId>.output` and `$<stepId>.output.<field>` placeholders in a
 * prompt with earlier step results.
 *
 * - `$id.output` becomes the step's whole output.
 * - `$id.output.field` becomes the trimmed value of the first output line of the
 *   form `field: value` (field name matched case-insensitively).
 * - A placeholder is left untouched when the step has no non-empty result or
 *   the field line is not found.
 *
 * @param prompt - text that may contain placeholders.
 * @param results - step id → output text of completed steps.
 * @returns the prompt with every resolvable placeholder replaced.
 */
export function resolveVariables(prompt: string, results: Record<string, string>): string {
  return prompt.replace(VAR_PATTERN, (match: string, stepId: string, fieldPath?: string) => {
    // Own-property lookup only: a step id such as `constructor` or `toString`
    // must not resolve to a member inherited from Object.prototype.
    const output = Object.prototype.hasOwnProperty.call(results, stepId) ? results[stepId] : undefined;
    if (typeof output !== 'string' || output === '') return match;
    if (!fieldPath) return output;

    // fieldPath contains only word characters and dots; escape the dots so
    // `a.b` matches the literal label "a.b" rather than any character between.
    const fieldLine = new RegExp(`^${fieldPath.replace(/\./g, '\\.')}:\\s*(.+)$`, 'i');
    // Split on CRLF as well as LF: a trailing '\r' would otherwise stop `(.+)$` matching.
    for (const line of output.split(/\r?\n/)) {
      const parsed = fieldLine.exec(line);
      if (parsed) return parsed[1]!.trim();
    }
    return match;
  });
}