Files
boocode/apps/coder/src/services/flow-runner.ts
indifferentketchup 1937af8df9 feat: in-app Orchestrator (Phase 2) — multi-agent conductor
Brings the deterministic Han-flow conductor into BooCode: launch any read-only
flow from BooChat or BooCoder, watch each agent stream live in a Paseo-style
run pane, get an evidence-disciplined report — on local Qwen, persisted and
resumable. Read-only enforced hard via qwen --approval-mode plan (orchestrator
tasks fail closed if qwen is unavailable; never fall to write-capable native).

Backend (apps/coder): re-homed conductor defs, flow_runs/flow_steps schema,
flow-runner + dispatcher onTaskTerminal hook, restart-resume, runs routes
(launch/list/get/cancel), user-channel WS. Contracts: two flow_run_* frames.
Web: orchestrator pane kind + OrchestratorPane, Workflow button + slash flows
(BooChat/BooCoder parity), FlowLauncherDialog, "New Orchestrator" in the + and
split menus, runs history + export. Plan: openspec/changes/orchestrator.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 15:22:48 +00:00

766 lines
30 KiB
TypeScript

/**
* Flow-runner — the orchestrator's DB-backed execution engine (D-2/D-3/D-4).
*
* Replaces the Phase-1 in-memory wave scheduler (`conductor/src/flow.ts`) with a
* scheduler that persists every step to `flow_runs`/`flow_steps` and executes
* agent steps by INSERTing a normal `tasks` row that the EXISTING dispatcher
* picks up (LISTEN 'tasks_new'). It owns no poll loop — it reacts to the
* dispatcher's `onTaskTerminal` hook (`handleTaskTerminal`). The pure scheduling
* decisions live in `flow-runner-decisions.ts`; this file is the IO shell.
*
* Two load-bearing properties:
* - READ-ONLY (D-4): every dispatched step task is hardcoded `agent='qwen'`,
* `mode_id='plan'`, NEVER user-overridable. The dispatcher's qwen+plan routing
* rule forces the PTY path so qwen runs with `--approval-mode plan` — a hard
* tool-level gate (reads allowed, writes blocked) inside the agent binary.
* - PROMPTS BUILT IN-PROCESS (D-1): each worker prompt is assembled here by
* calling the flow's `step.run(ctx)` and prepending the agent persona + the
* evidence/YAGNI contracts (the contracts are baked into `step.run`'s output
* by the flow definition). The prompt is the task's `input`; no closure is
* ever serialized to the DB.
*
* STREAM ATTRIBUTION. Each agent step gets its own synthetic session + chat. The
* step task carries that `session_id`; the one-shot external path reuses the
* session's single open chat, so its delta/tool_call/message_complete frames are
* keyed by the step's `chat_id` (D-6). A per-step session (not one per run) is
* required because `runExternalAgent` picks the session's most-recent open chat —
* sharing one session across a parallel wave would misattribute the streams.
*
* FRAME CHANNEL. Run-level frames (`flow_run_started`, `flow_run_step_updated`)
* are published on the user channel (run/sidebar-level, like `session_updated`).
* The per-agent token stream rides the existing per-session frames the dispatcher
* already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
*/
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { FastifyBaseLogger } from 'fastify';
import type { Config } from '../config.js';
import { getFlow } from '../conductor/flows/index.js';
import { loadPersona } from '../conductor/persona-loader.js';
import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../conductor/types.js';
import {
isRunComplete,
manifestSteps,
partitionReady,
readySteps,
reconcileRun,
type SchedulerState,
type StepResumeDecision,
} from './flow-runner-decisions.js';
export interface LaunchOpts {
projectId: string;
flowName: string;
band: Band;
/** the flow input; MUST carry a non-empty `question` (CHECK input ? 'question') */
input: FlowInput;
/** override the run model; defaults to config.DEFAULT_MODEL (qwen 35B) */
model?: string;
}
export interface FlowRunner {
/** Start a run: persist it, build the roster, dispatch the first ready wave. */
launch(opts: LaunchOpts): Promise<{ runId: string }>;
/**
* Wire this to `createDispatcher({ onTaskTerminal })`. Fires when ANY task
* settles; the runner ignores tasks it doesn't own. Never throws.
*/
handleTaskTerminal(taskId: string, state: string): void;
/**
* Re-advance every flow_run still marked 'running' on coder startup (D-9).
* Idempotent: completed steps are kept, stale-running steps are re-dispatched
* (a re-dispatched step already has a 'pending' task that won't be touched
* again), completed tasks are surfaced as done. Never throws.
*/
initResume(): Promise<void>;
/**
* Cancel a running flow run (Phase 6 stop route). Marks the run and all
* non-terminal steps 'cancelled', publishes cancel frames, and returns the
* task_ids of any in-flight step tasks so the route can abort them via the
* dispatcher's cancelExternalTask. Returns cancelled:false when the run is
* not found or already terminal.
*/
cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }>;
}
interface Deps {
sql: Sql;
broker: Broker;
log: FastifyBaseLogger;
config: Config;
}
interface FlowStepRow {
step_id: string;
kind: 'agent' | 'code';
agent: string | null;
status: string;
chat_id: string | null;
output: string | null;
}
export function createFlowRunner(deps: Deps): FlowRunner {
const { sql, broker, log, config } = deps;
// Serialize advance() per run so two near-simultaneous terminal callbacks for a
// parallel wave can't double-dispatch the next wave or race the DB reads.
const advanceChain = new Map<string, Promise<void>>();
// ctx.dispatch sub-tasks (code-review's per-dimension adversarial verify): a
// taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
// resolves them here instead of advancing a run.
const subDispatchWaiters = new Map<string, (output: string) => void>();
function publishUser(frame: Record<string, unknown>): void {
broker.publishUserFrame('default', frame as unknown as WsFrame);
}
function humanize(stepId: string): string {
return stepId
.replace(/[-_]+/g, ' ')
.replace(/\b\w/g, (c) => c.toUpperCase());
}
function buildCtx(
input: FlowInput,
results: Record<string, string>,
model: string,
dispatch?: DispatchFn,
): StepContext {
return { input, results, model, dispatch };
}
/** Latest assistant message text for a chat — the FULL worker output (≤50k as
* the dispatcher persists it), NOT the ≤500-char tasks.output_summary (C3). */
async function readChatOutput(chatId: string): Promise<string> {
const [m] = await sql<{ content: string | null }[]>`
SELECT content FROM messages
WHERE chat_id = ${chatId} AND role = 'assistant'
ORDER BY created_at DESC LIMIT 1
`;
return m?.content ?? '';
}
async function readTaskOutput(taskId: string): Promise<string> {
const [t] = await sql<{ chat_id: string | null }[]>`SELECT chat_id FROM tasks WHERE id = ${taskId}`;
return t?.chat_id ? readChatOutput(t.chat_id) : '';
}
/** A synthetic session + chat for one agent step's stream (D-6). One session
* per step so the one-shot path's single-open-chat pick lands on this chat. */
async function createStepChat(
projectId: string,
model: string,
flowName: string,
label: string,
): Promise<string> {
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${projectId}, ${`Flow ${flowName} · ${label}`}, ${model}, 'open')
RETURNING id
`;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${session!.id}, ${label}, 'open')
RETURNING id
`;
return chat!.id;
}
// ─── launch ──────────────────────────────────────────────────────────────────
async function launch(opts: LaunchOpts): Promise<{ runId: string }> {
const flow = getFlow(opts.flowName);
if (!flow) throw new Error(`unknown flow: ${opts.flowName}`);
const model = (opts.model && opts.model.trim()) || config.DEFAULT_MODEL;
const input: FlowInput = { ...opts.input, band: opts.band };
if (typeof input.question !== 'string' || !input.question.trim()) {
throw new Error('flow input requires a non-empty "question"');
}
const [run] = await sql<{ id: string }[]>`
INSERT INTO flow_runs (project_id, flow_name, band, model, status, input)
VALUES (${opts.projectId}, ${opts.flowName}, ${opts.band}, ${model}, 'running', ${sql.json(input as never)})
RETURNING id
`;
const runId = run!.id;
// Manifest = every step not pre-skipped by its when() guard at launch (band /
// repo gating). Agent steps get a synthetic chat now so the roster carries
// their chat_id; code steps get a row but no chat (not in the roster frame).
const launchCtx = buildCtx(input, {}, model);
const manifest = manifestSteps(flow, launchCtx);
const frameSteps: Array<Record<string, unknown>> = [];
for (const step of manifest) {
let chatId: string | null = null;
if (step.kind === 'agent') {
chatId = await createStepChat(opts.projectId, model, opts.flowName, humanize(step.id));
}
await sql`
INSERT INTO flow_steps (run_id, step_id, kind, agent, status, chat_id)
VALUES (${runId}, ${step.id}, ${step.kind}, ${step.agent ?? null}, 'pending', ${chatId})
`;
if (step.kind === 'agent' && chatId) {
frameSteps.push({
step_id: step.id,
agent: step.agent,
kind: 'agent',
chat_id: chatId,
label: humanize(step.id),
});
}
}
publishUser({
type: 'flow_run_started',
run_id: runId,
flow_name: opts.flowName,
band: opts.band,
steps: frameSteps,
});
void advance(runId);
return { runId };
}
// ─── advance (serialized per run) ─────────────────────────────────────────────
function advance(runId: string): Promise<void> {
const prev = advanceChain.get(runId) ?? Promise.resolve();
const next = prev
.catch(() => {})
.then(() => advanceInner(runId).catch((err) => {
log.error({ err: errMsg(err), runId }, 'flow-runner: advance failed');
}));
advanceChain.set(runId, next);
void next.finally(() => {
if (advanceChain.get(runId) === next) advanceChain.delete(runId);
});
return next;
}
async function advanceInner(runId: string): Promise<void> {
const [run] = await sql<{ project_id: string; flow_name: string; model: string; input: FlowInput; status: string }[]>`
SELECT project_id, flow_name, model, input, status FROM flow_runs WHERE id = ${runId}
`;
if (!run) return;
// Terminal (incl. 'cancelled' from a stop) → stop advancing. This makes the
// runner idempotent against duplicate terminal callbacks and respects a stop.
if (run.status !== 'running') return;
const flow = getFlow(run.flow_name);
if (!flow) {
await failRun(runId, flow, run.input, run.model, `unknown flow: ${run.flow_name}`);
return;
}
const input = run.input;
const model = run.model;
const dispatch: DispatchFn = (agent, task) => dispatchSubAgent(run.project_id, model, agent, task);
const rows = await sql<FlowStepRow[]>`
SELECT step_id, kind, agent, status, chat_id, output FROM flow_steps WHERE run_id = ${runId}
`;
// Re-derive the excluded set (band/when pre-skips) from the flow def + input —
// excluded steps never got a row, so a dependent treats them as satisfied.
const launchCtx = buildCtx(input, {}, model, dispatch);
const manifestIds = new Set(manifestSteps(flow, launchCtx).map((s) => s.id));
const excluded = new Set(flow.steps.filter((s) => !manifestIds.has(s.id)).map((s) => s.id));
const done = new Set<string>();
const skipped = new Set<string>();
const inFlight = new Set<string>();
const results: Record<string, string> = {};
for (const r of rows) {
switch (r.status) {
case 'completed':
done.add(r.step_id);
if (r.output != null) results[r.step_id] = r.output;
break;
case 'skipped':
skipped.add(r.step_id);
break;
case 'running':
inFlight.add(r.step_id);
break;
case 'failed':
// A failed worker makes the deterministic report untrustworthy — fail the
// whole run (matches the Phase-1 CLI, which throws on a dispatch failure).
await failRun(runId, flow, input, model, `step '${r.step_id}' failed`, r.step_id);
return;
case 'cancelled':
await cancelRun(runId);
return;
// 'pending' → a candidate for the ready computation below
}
}
// Drain ready skips + code steps (synchronous), re-evaluating after each batch,
// then dispatch the full ready agent wave and wait for their terminal callbacks.
for (;;) {
const state: SchedulerState = { done, skipped, inFlight, excluded };
if (isRunComplete(flow, state)) {
await finishRun(runId, flow, input, results, model, dispatch);
return;
}
const ready = readySteps(flow, state);
if (ready.length === 0) {
if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
return;
}
const ctx = buildCtx(input, results, model, dispatch);
const { toRun, toSkip } = partitionReady(ready, ctx);
if (toSkip.length > 0) {
for (const s of toSkip) {
await markStep(runId, s.id, 'skipped');
skipped.add(s.id);
if (s.kind === 'agent') publishStep(runId, s.id, 'skipped');
}
continue; // re-evaluate — a skip can settle a fan-in step's deps
}
const codeReady = toRun.filter((s) => s.kind === 'code');
if (codeReady.length > 0) {
for (const s of codeReady) {
let out: string;
try {
// Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
// verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
out = await s.run(buildCtx(input, results, model, dispatch));
} catch (err) {
await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
return;
}
await markStep(runId, s.id, 'completed', out);
results[s.id] = out;
done.add(s.id);
}
continue; // re-evaluate — code output can unblock the next wave
}
// Only agent steps remain ready → dispatch the whole parallel wave, then wait.
for (const s of toRun) {
await dispatchAgentStep(runId, run.project_id, model, s, ctx);
inFlight.add(s.id);
publishStep(runId, s.id, 'running');
}
return;
}
}
// ─── step execution ───────────────────────────────────────────────────────────
async function dispatchAgentStep(
runId: string,
projectId: string,
model: string,
step: Step,
ctx: StepContext,
): Promise<void> {
const [row] = await sql<{ chat_id: string | null }[]>`
SELECT chat_id FROM flow_steps WHERE run_id = ${runId} AND step_id = ${step.id}
`;
const chatId = row?.chat_id ?? null;
const [chat] = chatId
? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
: [];
const sessionId = chat?.session_id ?? null;
// Build the worker prompt IN-PROCESS (D-1): persona + (task + contracts). The
// flow's step.run already bakes in the evidence/YAGNI contracts.
const persona = step.agent ? await loadPersona(step.agent) : '';
const taskPrompt = await step.run(ctx);
const fullPrompt = persona ? `${persona}\n\n---\n\n${taskPrompt}` : taskPrompt;
// READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never
// user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate.
const [task] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${fullPrompt}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
RETURNING id
`;
await sql`
UPDATE flow_steps
SET task_id = ${task!.id}, status = 'running', input = ${fullPrompt}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.id}
`;
}
/**
* ctx.dispatch for code steps: dispatch a read-only qwen worker under a Han
* persona and resolve with its full output. The sub-task has no flow_steps row;
* handleTaskTerminal resolves the waiter rather than advancing a run.
*/
function dispatchSubAgent(
projectId: string,
model: string,
persona: string,
task: string,
): Promise<string> {
return (async () => {
const personaText = await loadPersona(persona).catch(() => '');
const prompt = personaText ? `${personaText}\n\n---\n\n${task}` : task;
const chatId = await createStepChat(projectId, model, 'sub', humanize(persona));
const [chat] = await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`;
const [t] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${prompt}, 'qwen', ${model}, 'plan', ${chat?.session_id ?? null}, ${chatId})
RETURNING id
`;
return await new Promise<string>((resolve) => {
subDispatchWaiters.set(t!.id, resolve);
});
})();
}
async function markStep(
runId: string,
stepId: string,
status: 'completed' | 'skipped',
output?: string,
): Promise<void> {
if (output !== undefined) {
await sql`
UPDATE flow_steps SET status = ${status}, output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${stepId}
`;
} else {
await sql`
UPDATE flow_steps SET status = ${status}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${stepId}
`;
}
}
// ─── run completion ─────────────────────────────────────────────────────────
async function finishRun(
runId: string,
flow: Flow,
input: FlowInput,
results: Record<string, string>,
model: string,
dispatch: DispatchFn,
): Promise<void> {
let report: string;
try {
report = flow.render(buildCtx(input, results, model, dispatch));
} catch (err) {
await failRun(runId, flow, input, model, `report render threw: ${errMsg(err)}`);
return;
}
const updated = await sql`
UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish
publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
run_status: 'completed',
report,
});
}
async function failRun(
runId: string,
flow: Flow | undefined,
input: FlowInput,
model: string,
error: string,
failedStepId?: string,
): Promise<void> {
const updated = await sql`
UPDATE flow_runs SET status = 'failed', error = ${error}, updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return;
const stepId = failedStepId ?? (flow ? lastAgentStepId(flow, input, model) : 'run');
log.warn({ runId, error }, 'flow-runner: run failed');
publishStep(runId, stepId, 'failed', { run_status: 'failed' });
}
async function cancelRun(runId: string): Promise<void> {
// A step was cancelled (e.g. PTY killed) while the run was still 'running' —
// mark the run cancelled and publish frames for any remaining pending steps.
const updated = await sql`
UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return; // idempotent — already terminal
// Any remaining pending steps are unreachable; mark + publish them so the
// pane can show them as cancelled rather than stuck in pending.
const pending = await sql<{ step_id: string; kind: string }[]>`
SELECT step_id, kind FROM flow_steps
WHERE run_id = ${runId} AND status IN ('pending', 'running')
`;
if (pending.length > 0) {
await sql`
UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND status IN ('pending', 'running')
`;
for (const s of pending) {
if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
}
}
log.info({ runId }, 'flow-runner: run cancelled');
}
/** The terminal agent step in roster order — a valid roster step_id to carry the
* run-level completion/failure status (the pane reads run_status/report off it). */
function lastAgentStepId(flow: Flow, input: FlowInput, model: string): string {
const agents = manifestSteps(flow, buildCtx(input, {}, model)).filter((s) => s.kind === 'agent');
return agents.length ? agents[agents.length - 1]!.id : (flow.steps[0]?.id ?? 'run');
}
function publishStep(
runId: string,
stepId: string,
status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled',
extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string },
): void {
publishUser({
type: 'flow_run_step_updated',
run_id: runId,
step_id: stepId,
status,
...(extra ?? {}),
});
}
// ─── terminal callback (wired to createDispatcher.onTaskTerminal) ─────────────
function handleTaskTerminal(taskId: string, state: string): void {
void (async () => {
// 1. A ctx.dispatch sub-task → resolve its waiter with the full output.
const waiter = subDispatchWaiters.get(taskId);
if (waiter) {
subDispatchWaiters.delete(taskId);
const out = await readTaskOutput(taskId).catch(() => '');
waiter(out);
return;
}
// 2. A flow step task → mark the step, then advance its run.
const [step] = await sql<{ run_id: string; step_id: string; chat_id: string | null; status: string }[]>`
SELECT run_id, step_id, chat_id, status FROM flow_steps WHERE task_id = ${taskId}
`;
if (!step) return; // not a flow task
if (step.status !== 'running') return; // idempotent against duplicate callbacks
if (state === 'completed') {
const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
await sql`
UPDATE flow_steps SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
`;
publishStep(step.run_id, step.step_id, 'completed');
} else {
const stepStatus = state === 'cancelled' ? 'cancelled' : 'failed';
await sql`
UPDATE flow_steps SET status = ${stepStatus}, updated_at = clock_timestamp()
WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
`;
if (stepStatus === 'failed') {
publishStep(step.run_id, step.step_id, 'failed');
} else {
// A cancelled step always triggers run cancellation (advanceInner path).
// Include run_status: 'cancelled' here so single-step flows get a
// complete run-level cancel frame before cancelRun runs.
publishStep(step.run_id, step.step_id, 'cancelled', { run_status: 'cancelled' });
}
}
await advance(step.run_id);
})().catch((err) => {
log.error({ err: errMsg(err), taskId }, 'flow-runner: handleTaskTerminal failed');
});
}
// ─── startup resume (D-9) ─────────────────────────────────────────────────────
/**
* Apply one step's resume decision to the DB, then return (the caller drives the
* loop). Re-dispatch reuses the prompt already stored in flow_steps.input (built
* in-process at launch per D-1) so the flow def doesn't need to be reloaded and
* ctx.dispatch closures don't re-execute.
*/
async function applyResumeDecision(
runId: string,
projectId: string,
model: string,
step: { step_id: string; task_id: string | null; chat_id: string | null; input: string | null },
decision: StepResumeDecision,
): Promise<void> {
switch (decision.action) {
case 'keep':
break;
case 'mark-done': {
const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
await sql`
UPDATE flow_steps
SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
}
case 'mark-failed':
await sql`
UPDATE flow_steps
SET status = 'failed', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
case 'mark-cancelled':
await sql`
UPDATE flow_steps
SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
case 're-dispatch': {
// No stored prompt means we can't rebuild the task input (the step never
// reached the dispatch stage). Fail it so advance() fails the run cleanly.
if (!step.input) {
await sql`
UPDATE flow_steps
SET status = 'failed', error = 're-dispatch: no stored prompt',
updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
}
// Reuse stored chat / session so the stream still rides the pre-created
// synthetic chat (D-6). READ-ONLY (D-4): agent='qwen', mode_id='plan'.
const chatId = step.chat_id;
const [chat] = chatId
? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
: [];
const sessionId = chat?.session_id ?? null;
const [task] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${step.input}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
RETURNING id
`;
await sql`
UPDATE flow_steps
SET task_id = ${task!.id}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
log.info({ runId, stepId: step.step_id, taskId: task!.id }, 'flow-runner: step re-dispatched on resume');
break;
}
}
}
async function resumeRun(run: {
id: string;
project_id: string;
model: string;
}): Promise<void> {
const rows = await sql<{
step_id: string;
task_id: string | null;
status: string;
chat_id: string | null;
input: string | null;
}[]>`SELECT step_id, task_id, status, chat_id, input FROM flow_steps WHERE run_id = ${run.id}`;
// Load task states for all referenced tasks in one query.
const taskIds = rows.map((r) => r.task_id).filter((id): id is string => id !== null);
const taskStates = new Map<string, string>();
if (taskIds.length > 0) {
const tasks = await sql<{ id: string; state: string }[]>`
SELECT id, state FROM tasks WHERE id = ANY(${taskIds})
`;
for (const t of tasks) taskStates.set(t.id, t.state);
}
const decisions = reconcileRun(
rows.map((r) => ({ stepId: r.step_id, taskId: r.task_id, status: r.status })),
taskStates,
);
for (const decision of decisions) {
if (decision.action === 'keep') continue;
const row = rows.find((r) => r.step_id === decision.stepId)!;
await applyResumeDecision(run.id, run.project_id, run.model, row, decision);
}
// advance() re-derives the ready wave and dispatches it; new re-dispatched tasks
// are already 'pending' and will be picked up by the dispatcher's startup poll.
await advance(run.id);
log.info({ runId: run.id }, 'flow-runner: run resumed');
}
async function initResume(): Promise<void> {
const runs = await sql<{ id: string; project_id: string; model: string }[]>`
SELECT id, project_id, model FROM flow_runs WHERE status = 'running'
`;
if (runs.length === 0) return;
log.info({ count: runs.length }, 'flow-runner: resuming in-flight runs on startup');
for (const run of runs) {
await resumeRun(run).catch((err) => {
log.error({ err: errMsg(err), runId: run.id }, 'flow-runner: initResume failed for run');
});
}
}
// ─── cancel (Phase 6 stop route) ─────────────────────────────────────────────
async function cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }> {
const updated = await sql`
UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return { cancelled: false, taskIds: [] };
// Mark all non-terminal steps cancelled and collect in-flight task_ids.
const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>`
SELECT step_id, task_id, kind FROM flow_steps
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
`;
if (steps.length > 0) {
await sql`
UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
`;
for (const s of steps) {
if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
}
} else {
// Run was 'running' but no non-terminal steps — edge case (advance in progress).
// Re-publish the last agent step with run_status: 'cancelled' so the pane updates.
const [last] = await sql<{ step_id: string }[]>`
SELECT step_id FROM flow_steps WHERE run_id = ${runId} AND kind = 'agent'
ORDER BY created_at DESC LIMIT 1
`;
if (last) publishStep(runId, last.step_id, 'cancelled', { run_status: 'cancelled' });
}
const taskIds = steps
.filter((s): s is typeof s & { task_id: string } => s.task_id !== null)
.map((s) => s.task_id);
log.info({ runId }, 'flow-runner: run cancelled by request');
return { cancelled: true, taskIds };
}
return { launch, handleTaskTerminal, initResume, cancel };
}
function errMsg(e: unknown): string {
return e instanceof Error ? e.message : String(e);
}