feat: in-app Orchestrator (Phase 2) — multi-agent conductor

Brings the deterministic Han-flow conductor into BooCode: launch any read-only
flow from BooChat or BooCoder, watch each agent stream live in a Paseo-style
run pane, get an evidence-disciplined report — on local Qwen, persisted and
resumable. Read-only enforced hard via qwen --approval-mode plan (orchestrator
tasks fail closed if qwen is unavailable; never fall to write-capable native).

Backend (apps/coder): re-homed conductor defs, flow_runs/flow_steps schema,
flow-runner + dispatcher onTaskTerminal hook, restart-resume, runs routes
(launch/list/get/cancel), user-channel WS. Contracts: two flow_run_* frames.
Web: orchestrator pane kind + OrchestratorPane, Workflow button + slash flows
(BooChat/BooCoder parity), FlowLauncherDialog, "New Orchestrator" in the + and
split menus, runs history + export. Plan: openspec/changes/orchestrator.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-03 14:59:07 +00:00
parent 519b1d2ca1
commit 1937af8df9
118 changed files with 15723 additions and 27 deletions

View File

@@ -0,0 +1,389 @@
import { describe, it, expect } from 'vitest';
import type { Flow, Step, StepContext } from '../../conductor/types.js';
import {
manifestSteps,
readySteps,
partitionReady,
isRunComplete,
isStuck,
reconcileResumeStep,
reconcileRun,
shouldFailOnMissingAgent,
type SchedulerState,
} from '../flow-runner-decisions.js';
/**
* The DB-driven flow-runner replaces the Phase-1 in-memory wave scheduler
* (conductor/src/flow.ts). These pure helpers are that scheduler's decision
* core, lifted out so the DB-backed runner stays a thin IO shell over a
* testable kernel (repo pattern: turn-guard.ts / lifecycle-decisions.ts).
*
* The schedule must match conductor/flow.ts:27-41 exactly: a step is ready when
* every dependency is settled (completed OR skipped/excluded), and a ready step
* is skipped when its when() guard returns false against the current context.
*/
// A small synthetic flow exercising: parallel angles, a band-gated angle, a code
// fold that fans them in, an agent synthesizer, and a terminal validator.
function makeFlow(): Flow {
const steps: Step[] = [
{ id: 'a', kind: 'agent', agent: 'analyst-a', run: () => 'prompt a' },
{
id: 'b',
kind: 'agent',
agent: 'analyst-b',
// band-gated: only runs at medium+ (mirrors Angle.minBand gating)
when: (ctx) => ctx.input.band === 'medium' || ctx.input.band === 'large',
run: () => 'prompt b',
},
{ id: 'fold', kind: 'code', deps: ['a', 'b'], run: (ctx) => `fold:${Object.keys(ctx.results).join(',')}` },
{ id: 'synth', kind: 'agent', agent: 'architect', deps: ['fold'], run: () => 'prompt synth' },
{ id: 'val', kind: 'agent', agent: 'validator', deps: ['synth'], run: () => 'prompt val' },
];
return { name: 'demo', description: 'demo flow', steps, render: () => 'report' };
}
function ctxOf(band: string, results: Record<string, string> = {}): StepContext {
return { input: { question: 'q', band }, results };
}
const emptyState = (over: Partial<SchedulerState> = {}): SchedulerState => ({
done: new Set(),
skipped: new Set(),
inFlight: new Set(),
excluded: new Set(),
...over,
});
describe('manifestSteps', () => {
it('drops a band-gated step at small band, keeps it at medium', () => {
const flow = makeFlow();
const small = manifestSteps(flow, ctxOf('small')).map((s) => s.id);
expect(small).toEqual(['a', 'fold', 'synth', 'val']); // b excluded by when()
const medium = manifestSteps(flow, ctxOf('medium')).map((s) => s.id);
expect(medium).toEqual(['a', 'b', 'fold', 'synth', 'val']);
});
it('includes every step when no when() guards are defined', () => {
const flow: Flow = {
name: 'guardless',
description: 'no guards',
steps: [
{ id: 'x', kind: 'agent', agent: 'a', run: () => 'p' },
{ id: 'y', kind: 'code', deps: ['x'], run: () => 'r' },
],
render: () => '',
};
expect(manifestSteps(flow, ctxOf('small')).map((s) => s.id)).toEqual(['x', 'y']);
});
it('returns empty when every when() guard evaluates false', () => {
const flow: Flow = {
name: 'all-gated',
description: 'all filtered',
steps: [
{ id: 'p', kind: 'agent', agent: 'a', when: () => false, run: () => 'p' },
{ id: 'q', kind: 'agent', agent: 'b', when: () => false, run: () => 'q' },
],
render: () => '',
};
expect(manifestSteps(flow, ctxOf('small'))).toEqual([]);
});
});
describe('readySteps', () => {
it('returns only dep-free, unsettled steps first', () => {
const flow = makeFlow();
// small band → b is excluded; a is the only dep-free live step.
const state = emptyState({ excluded: new Set(['b']) });
expect(readySteps(flow, state).map((s) => s.id)).toEqual(['a']);
});
it('treats an excluded dependency as satisfied (fold unblocks once a is done)', () => {
const flow = makeFlow();
// a completed, b excluded (small band) → fold's deps [a,b] are both settled.
const state = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
expect(readySteps(flow, state).map((s) => s.id)).toEqual(['fold']);
});
it('does not re-offer a step that is in flight', () => {
const flow = makeFlow();
const state = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
expect(readySteps(flow, state).map((s) => s.id)).toEqual([]);
});
it('waits on an unfinished dependency', () => {
const flow = makeFlow();
// fold done, synth done → val is ready; synth not done → val not ready.
const blocked = emptyState({ done: new Set(['a', 'fold']), excluded: new Set(['b']) });
expect(readySteps(flow, blocked).map((s) => s.id)).toEqual(['synth']);
});
it('returns both parallel dep-free steps when neither is excluded or settled', () => {
const flow = makeFlow();
// readySteps does NOT evaluate when() — that is partitionReady's job.
// Both a and b have no deps and are unsettled → both offered.
expect(readySteps(flow, emptyState()).map((s) => s.id)).toEqual(['a', 'b']);
});
it('does not unblock a step whose dep is in flight (dep not yet satisfied)', () => {
const flow = makeFlow();
// a is in flight → isSatisfied('a') === false → fold (deps:['a','b']) blocked.
// b excluded (satisfied). Nothing else has all deps satisfied → empty wave.
const state = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
const ready = readySteps(flow, state).map((s) => s.id);
expect(ready).not.toContain('fold');
expect(ready).toEqual([]);
});
it('advances through the full wave chain: a+b+fold+synth done → val is ready', () => {
const flow = makeFlow();
const state = emptyState({ done: new Set(['a', 'b', 'fold', 'synth']) });
expect(readySteps(flow, state).map((s) => s.id)).toEqual(['val']);
});
});
describe('partitionReady', () => {
it('routes a when()-false step to toSkip, the rest to toRun', () => {
const flow = makeFlow();
// At small band both a and b are "ready" if offered; b's guard fails → skip.
const ready = [flow.steps[0]!, flow.steps[1]!]; // a, b
const { toRun, toSkip } = partitionReady(ready, ctxOf('small'));
expect(toRun.map((s) => s.id)).toEqual(['a']);
expect(toSkip.map((s) => s.id)).toEqual(['b']);
});
it('keeps every guardless step in toRun', () => {
const flow = makeFlow();
const ready = [flow.steps[2]!, flow.steps[3]!]; // fold, synth (no when)
const { toRun, toSkip } = partitionReady(ready, ctxOf('large'));
expect(toRun.map((s) => s.id)).toEqual(['fold', 'synth']);
expect(toSkip).toEqual([]);
});
it('handles an empty ready list gracefully', () => {
const { toRun, toSkip } = partitionReady([], ctxOf('small'));
expect(toRun).toEqual([]);
expect(toSkip).toEqual([]);
});
});
describe('isRunComplete / isStuck', () => {
it('is complete when every step is settled or excluded', () => {
const flow = makeFlow();
const state = emptyState({
done: new Set(['a', 'fold', 'synth', 'val']),
excluded: new Set(['b']),
});
expect(isRunComplete(flow, state)).toBe(true);
expect(isStuck(flow, state)).toBe(false);
});
it('is not complete while a step is pending', () => {
const flow = makeFlow();
const state = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
expect(isRunComplete(flow, state)).toBe(false);
});
it('is not stuck while a ready step still exists', () => {
const flow = makeFlow();
// a done, b excluded → fold is ready, so the run can still progress.
const state = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
expect(isStuck(flow, state)).toBe(false);
});
it('is stuck on an unsatisfiable dependency with nothing in flight', () => {
// 'orphan' depends on 'ghost', which is never produced (not a step, never
// settled) — and nothing is in flight to ever settle it: a dead end.
const cyclic: Flow = {
name: 'stuck',
description: 'unsatisfiable',
steps: [{ id: 'orphan', kind: 'agent', agent: 'x', deps: ['ghost'], run: () => 'p' }],
render: () => 'r',
};
const state = emptyState();
expect(readySteps(cyclic, state)).toEqual([]);
expect(isRunComplete(cyclic, state)).toBe(false);
expect(isStuck(cyclic, state)).toBe(true);
});
it('is complete when every step is skipped (no done, no excluded)', () => {
const flow = makeFlow();
const state = emptyState({ skipped: new Set(['a', 'b', 'fold', 'synth', 'val']) });
expect(isRunComplete(flow, state)).toBe(true);
expect(isStuck(flow, state)).toBe(false);
});
it('is complete when steps are spread across done + skipped + excluded', () => {
const flow = makeFlow();
// a done, b excluded, fold done, synth skipped, val done — all settled.
const state = emptyState({
done: new Set(['a', 'fold', 'val']),
skipped: new Set(['synth']),
excluded: new Set(['b']),
});
expect(isRunComplete(flow, state)).toBe(true);
expect(isStuck(flow, state)).toBe(false);
});
it('is NOT stuck when in-flight tasks exist even if no step is currently ready', () => {
const flow = makeFlow();
// a is in flight → fold's dep unsatisfied → nothing ready.
// But a is still running so the run CAN make progress → not stuck.
const state = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
expect(isStuck(flow, state)).toBe(false);
expect(isRunComplete(flow, state)).toBe(false);
expect(readySteps(flow, state)).toEqual([]); // confirms nothing ready
});
});
// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────
describe('reconcileResumeStep', () => {
it('keeps non-running steps regardless of task state', () => {
for (const status of ['completed', 'skipped', 'failed', 'cancelled', 'pending']) {
expect(reconcileResumeStep(status, 'tid', 'running')).toBe('keep');
}
});
it('re-dispatches a running step with no task_id', () => {
expect(reconcileResumeStep('running', null, null)).toBe('re-dispatch');
});
it('re-dispatches a running step whose task is absent from the DB', () => {
expect(reconcileResumeStep('running', 'tid', null)).toBe('re-dispatch');
});
it('marks done when the task completed before the callback ran', () => {
expect(reconcileResumeStep('running', 'tid', 'completed')).toBe('mark-done');
});
it('marks failed when the task failed', () => {
expect(reconcileResumeStep('running', 'tid', 'failed')).toBe('mark-failed');
});
it('marks cancelled when the task was cancelled', () => {
expect(reconcileResumeStep('running', 'tid', 'cancelled')).toBe('mark-cancelled');
});
it('keeps a running step whose task is pending (dispatcher startup poll handles it)', () => {
expect(reconcileResumeStep('running', 'tid', 'pending')).toBe('keep');
});
it('re-dispatches when the task is running (PTY dead on restart)', () => {
expect(reconcileResumeStep('running', 'tid', 'running')).toBe('re-dispatch');
});
it('re-dispatches when the task is blocked (permission dialog gone on restart)', () => {
expect(reconcileResumeStep('running', 'tid', 'blocked')).toBe('re-dispatch');
});
});
// ─── Dispatcher routing guard (H1) ───────────────────────────────────────────
describe('shouldFailOnMissingAgent', () => {
it('returns true for qwen+plan (orchestrator read-only gate)', () => {
expect(shouldFailOnMissingAgent('qwen', 'plan')).toBe(true);
});
it('returns false for qwen without plan mode', () => {
expect(shouldFailOnMissingAgent('qwen', null)).toBe(false);
expect(shouldFailOnMissingAgent('qwen', 'auto')).toBe(false);
});
it('returns false for non-qwen agents even with plan mode', () => {
expect(shouldFailOnMissingAgent('goose', 'plan')).toBe(false);
expect(shouldFailOnMissingAgent('opencode', 'plan')).toBe(false);
expect(shouldFailOnMissingAgent('claude', 'plan')).toBe(false);
});
it('returns false for qwen with any mode other than plan', () => {
for (const mode of ['bypassPermissions', 'acceptEdits', 'dontAsk', 'default', '']) {
expect(shouldFailOnMissingAgent('qwen', mode)).toBe(false);
}
});
it('returns false for empty or unknown agent name even with plan mode', () => {
expect(shouldFailOnMissingAgent('', 'plan')).toBe(false);
expect(shouldFailOnMissingAgent('native', 'plan')).toBe(false);
expect(shouldFailOnMissingAgent('boocode', 'plan')).toBe(false);
});
});
describe('reconcileRun', () => {
it('returns one decision per step', () => {
const steps = [
{ stepId: 'a', taskId: null, status: 'completed' },
{ stepId: 'b', taskId: 't1', status: 'running' },
{ stepId: 'c', taskId: 't2', status: 'running' },
];
const taskStates = new Map([['t1', 'completed'], ['t2', 'running']]);
const decisions = reconcileRun(steps, taskStates);
expect(decisions).toHaveLength(3);
expect(decisions[0]).toEqual({ stepId: 'a', action: 'keep' });
expect(decisions[1]).toEqual({ stepId: 'b', action: 'mark-done' });
expect(decisions[2]).toEqual({ stepId: 'c', action: 're-dispatch' });
});
it('handles a mixed run: completed steps kept, live-pending kept, stale re-dispatched', () => {
const steps = [
{ stepId: 'finder-1', taskId: 't1', status: 'completed' },
{ stepId: 'finder-2', taskId: 't2', status: 'running' }, // PTY dead
{ stepId: 'finder-3', taskId: 't3', status: 'running' }, // pending in dispatcher
{ stepId: 'synth', taskId: null, status: 'pending' }, // not yet started
];
const taskStates = new Map([
['t1', 'completed'],
['t2', 'running'], // stuck — PTY dead
['t3', 'pending'], // dispatcher will handle
]);
const decisions = reconcileRun(steps, taskStates);
expect(decisions.find((d) => d.stepId === 'finder-1')?.action).toBe('keep');
expect(decisions.find((d) => d.stepId === 'finder-2')?.action).toBe('re-dispatch');
expect(decisions.find((d) => d.stepId === 'finder-3')?.action).toBe('keep');
expect(decisions.find((d) => d.stepId === 'synth')?.action).toBe('keep');
});
it('produces mark-failed for a failed task and mark-done for a completed task', () => {
const steps = [
{ stepId: 'a', taskId: 't1', status: 'running' },
{ stepId: 'b', taskId: 't2', status: 'running' },
];
const taskStates = new Map([['t1', 'failed'], ['t2', 'completed']]);
const decisions = reconcileRun(steps, taskStates);
expect(decisions.find((d) => d.stepId === 'a')?.action).toBe('mark-failed');
expect(decisions.find((d) => d.stepId === 'b')?.action).toBe('mark-done');
});
it('is idempotent: a re-dispatched step (task now pending) is kept on second call', () => {
// After re-dispatch, flow_steps.status stays 'running' but task_id → new pending task.
const steps = [{ stepId: 'x', taskId: 'new-task', status: 'running' }];
const taskStates = new Map([['new-task', 'pending']]);
const decisions = reconcileRun(steps, taskStates);
expect(decisions[0]?.action).toBe('keep');
});
it('returns an empty array when there are no steps', () => {
expect(reconcileRun([], new Map())).toEqual([]);
});
it('re-dispatches a running step whose taskId is absent from the taskStates map', () => {
// taskId is set but the task row no longer exists in the DB → taskState resolves to null.
const steps = [{ stepId: 'x', taskId: 'orphan-task', status: 'running' }];
const decisions = reconcileRun(steps, new Map());
expect(decisions[0]?.action).toBe('re-dispatch');
});
it('re-dispatches a running step with null taskId', () => {
const steps = [{ stepId: 'y', taskId: null, status: 'running' }];
const decisions = reconcileRun(steps, new Map());
expect(decisions[0]?.action).toBe('re-dispatch');
});
it('propagates mark-cancelled when the associated task was cancelled before the callback ran', () => {
const steps = [{ stepId: 'z', taskId: 'tid', status: 'running' }];
const taskStates = new Map([['tid', 'cancelled']]);
const decisions = reconcileRun(steps, taskStates);
expect(decisions[0]?.action).toBe('mark-cancelled');
});
});

View File

@@ -25,6 +25,15 @@ import { type AcpToolSnapshot, synthesizeCanceledSnapshots } from './acp-tool-sn
import { makeFrameEmitter, type FrameEmitter } from './frame-emitter.js';
import { buildAcpClient } from './acp-client.js';
/**
* Mode ids that enforce read-only at the agent's tool layer. When one of these is
* requested, applying it is safety-critical: a failure to set it must abort the
* turn (fail closed), never continue write-capable. `plan` is qwen's read-only
* approval mode (the orchestrator's gate, D-4); extend this set if another agent's
* read-only mode id is added to flows.
*/
const READ_ONLY_MODE_IDS = new Set(['plan']);
export interface AcpDispatchResult {
exitCode: number;
output: string;
@@ -65,6 +74,21 @@ async function applySessionOverrides(
try {
await connection.setSessionMode({ sessionId: acpSessionId, modeId });
} catch (err) {
// Defense-in-depth for the orchestrator read-only invariant (D-4): a
// read-only / plan mode that CANNOT be applied must FAIL CLOSED. If we only
// warned and continued (the default for non-read-only modes), the agent
// would run write-capable under a plan-mode request — exactly the silent
// gap this guards. Re-throw so dispatchViaAcp's catch marks the task failed
// rather than running an unguarded turn. (Orchestrator qwen+plan is routed
// to the PTY hard gate and never reaches here; this backstops any other
// route that lands a read-only mode on the ACP path.)
if (READ_ONLY_MODE_IDS.has(modeId)) {
log.error(
{ modeId, err: err instanceof Error ? err.message : String(err) },
'acp-dispatch: read-only setSessionMode failed — failing closed (aborting turn)',
);
throw err instanceof Error ? err : new Error(String(err));
}
log.warn({ modeId, err: err instanceof Error ? err.message : String(err) }, 'acp-dispatch: setSessionMode failed');
}
}

View File

@@ -28,6 +28,7 @@ import {
classifyTerminalStatus,
type TerminalMessageStatus,
} from './finalize-message.js';
import { shouldFailOnMissingAgent } from './flow-runner-decisions.js';
interface InferenceRunner {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
@@ -41,8 +42,23 @@ interface Deps {
broker: Broker;
log: FastifyBaseLogger;
config: Config;
/**
* Orchestrator hook (D-2). Fired once per task as it reaches a terminal state
* (completed | failed | cancelled), AFTER the run-function has written that
* state. Path-agnostic — it keys off the settled `tasks` row, not any single
* run-function's terminal branch, so it fires for native + every external
* path. The flow-runner wires this to advance its `flow_runs`; absent (default)
* the dispatcher behaves exactly as before. Best-effort: a throw is logged and
* swallowed so it can never wedge the poll loop.
*/
onTaskTerminal?: (taskId: string, state: string) => void;
}
// Terminal task states the orchestrator hook fires on. 'blocked' is excluded —
// a blocked task is awaiting a human decision, so its runTask promise has not
// settled yet (the hook only fires after runTask settles).
const TERMINAL_TASK_STATES = new Set(['completed', 'failed', 'cancelled']);
// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new
// tasks immediately. The poll is only a safety net for notifications missed
// during a listen-connection drop (porsager auto-reconnects), so it can stay slow.
@@ -54,7 +70,7 @@ export function createDispatcher(deps: Deps): {
start(): void;
stop(): Promise<void>;
} {
const { sql, inference, broker, log, config } = deps;
const { sql, inference, broker, log, config, onTaskTerminal } = deps;
let timer: ReturnType<typeof setInterval> | null = null;
let listener: { unlisten: () => Promise<void> } | null = null;
let polling = false;
@@ -134,6 +150,28 @@ export function createDispatcher(deps: Deps): {
return taskControllers.cancel(taskId);
}
// D-2: notify the orchestrator that a task settled. Re-reads the terminal state
// the run-function wrote (so this is path-agnostic — it works for native and
// every external path without instrumenting each terminal branch). Best-effort:
// a read failure or a callback throw is logged and swallowed.
function fireTaskTerminal(taskId: string): void {
if (!onTaskTerminal) return;
sql<{ state: string }[]>`SELECT state FROM tasks WHERE id = ${taskId}`
.then((rows) => {
const state = rows[0]?.state;
if (state && TERMINAL_TASK_STATES.has(state)) {
try {
onTaskTerminal(taskId, state);
} catch (err) {
log.error({ err, taskId }, 'dispatcher: onTaskTerminal callback threw');
}
}
})
.catch((err) => {
log.error({ err, taskId }, 'dispatcher: onTaskTerminal state read failed');
});
}
async function poll(): Promise<void> {
// `polling` serializes poll() execution itself (timer + NOTIFY can fire
// concurrently) so we never double-select a task. It does NOT serialize task
@@ -172,6 +210,15 @@ export function createDispatcher(deps: Deps): {
taskControllers.delete(task.id);
});
inflight.set(key, p);
// D-2: fire the orchestrator hook once the run settles (terminal state
// written), on both fulfilment and rejection. Detached from `p` so it
// never affects the inflight lifecycle or stop()'s drain.
if (onTaskTerminal) {
void p.then(
() => fireTaskTerminal(task.id),
() => fireTaskTerminal(task.id),
);
}
}
} finally {
polling = false;
@@ -197,6 +244,24 @@ export function createDispatcher(deps: Deps): {
SELECT name, supports_acp, install_path FROM available_agents WHERE name = ${task.agent}
`;
if (agentRow) {
// ORCHESTRATOR READ-ONLY INVARIANT (D-4). A qwen task dispatched in plan
// mode MUST bind the hard tool-level gate, and only the PTY path applies
// it (`qwen --approval-mode plan`, pty-dispatch.ts:75 — reads allowed,
// writes blocked inside the agent binary). The ACP paths set the mode via
// a session RPC (`setSessionMode`) that is fail-OPEN — a failed/ignored
// call leaves the agent write-capable — so they are never safe for the
// read-only guarantee. Force the one-shot PTY path for qwen+plan tasks
// regardless of available_agents.supports_acp (which probes true for qwen,
// since `qwen --help` lists `--acp`). This is correct on its own merits
// too: qwen's ACP bridge is an HTTP daemon, not the stdio ACP that
// dispatchViaAcp drives (root CLAUDE.md), so PTY is the working qwen path.
// Scoped to qwen (the orchestrator's only agent) to avoid changing the
// routing of any other agent; the ACP fail-closed guard (acp-dispatch.ts)
// backstops a plan-mode task that reaches ACP by any other route.
if (task.agent === 'qwen' && task.mode_id === 'plan') {
await runExternalAgent(task, /* supportsAcp */ false, agentRow.install_path);
return;
}
// v2.6 (1.7): opencode routes to its warm HTTP-server backend.
// v2.6 Phase 2 (2.4): goose/qwen route to the warm ACP backend WHEN the
// task came from a real chat tab (session_id + chat_id) — shouldUseWarmBackend.
@@ -217,6 +282,19 @@ export function createDispatcher(deps: Deps): {
}
return;
}
// Orchestrator (qwen+plan) tasks must NEVER fall through to write-capable
// native inference — the PTY plan-mode path is the only safe route. Fail
// hard so the flow-runner propagates a clear error to the run. (H1 fix)
if (shouldFailOnMissingAgent(task.agent, task.mode_id)) {
const errMsg = 'orchestrator task cannot run: qwen agent is not available (probe failed or binary missing)';
log.error({ taskId, agent: task.agent }, `dispatcher: ${errMsg}`);
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg}
WHERE id = ${taskId}
`.catch(() => {});
return;
}
// Agent specified but not available — fall through to Path A with a warning
log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
}

View File

@@ -0,0 +1,186 @@
/**
* Pure scheduling decisions for the DB-driven flow-runner — NO database, no IO.
*
* This is the decision core of the orchestrator's wave scheduler, lifted out of
* the IO-heavy `flow-runner.ts` so it can be unit-tested in isolation (the repo
* pattern: `backends/turn-guard.ts`, `backends/lifecycle-decisions.ts`). It is a
* faithful port of the Phase-1 in-memory scheduler (`conductor/src/flow.ts:27-41`):
* a step is ready when every dependency is *settled*, and a ready step is skipped
* when its `when()` guard returns false against the current context.
*
* Vocabulary (DB-adapted from flow.ts's `done`/`skipped` sets):
* - done — a step that completed with output (its result is in ctx.results)
* - skipped — a step whose when() guard returned false at ready-time
* - inFlight — an agent step whose task is dispatched but not yet terminal
* - excluded — a step pre-skipped at launch by its when() guard against the run
* input (band/repo gating). It never gets a flow_steps row, so it
* is tracked here rather than read back from the DB. A dependency
* on an excluded step counts as satisfied (the fan-in step still
* runs with whatever the live angles produced — flow.ts:39 / the
* fold's `if (out)` guard).
*
* "Settled" = done skipped excluded. Only settled deps unblock a step;
* an inFlight dep does NOT (the runner waits for its terminal callback).
*/
import type { Flow, Step, StepContext } from '../conductor/types.js';
export interface SchedulerState {
/** step ids that completed successfully (results available) */
readonly done: ReadonlySet<string>;
/** step ids skipped at ready-time (when() === false with results in hand) */
readonly skipped: ReadonlySet<string>;
/** step ids whose agent task is dispatched but not yet terminal */
readonly inFlight: ReadonlySet<string>;
/** step ids pre-skipped at launch (band/when gating) — never given a row */
readonly excluded: ReadonlySet<string>;
}
/** A dependency is satisfied once it is done, skipped, or excluded. */
function isSatisfied(state: SchedulerState, id: string): boolean {
return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id);
}
/**
* The steps that will appear in this run — every step NOT pre-skipped by its
* `when()` guard evaluated against the launch context (input only; results are
* empty at launch). The complement (steps filtered out here) is the `excluded`
* set. Mirrors the Spine factory's band gating (`spine.ts:71`). Pure.
*/
export function manifestSteps(flow: Flow, launchCtx: StepContext): Step[] {
return flow.steps.filter((s) => !s.when || s.when(launchCtx));
}
/**
* Steps whose dependencies are all satisfied and that are not themselves
* settled, excluded, or in flight — i.e. ready to evaluate/dispatch this tick.
* Faithful to `conductor/flow.ts:27-36`. Pure.
*/
export function readySteps(flow: Flow, state: SchedulerState): Step[] {
return flow.steps.filter(
(s) =>
!state.done.has(s.id) &&
!state.skipped.has(s.id) &&
!state.inFlight.has(s.id) &&
!state.excluded.has(s.id) &&
(s.deps ?? []).every((d) => isSatisfied(state, d)),
);
}
/**
* Partition a ready set into the steps to skip (when() === false at ready-time)
* vs the steps to run, evaluating each guard against the current context
* (input + completed results). Faithful to `conductor/flow.ts:39`. Pure.
*/
export function partitionReady(
ready: readonly Step[],
ctx: StepContext,
): { toRun: Step[]; toSkip: Step[] } {
const toRun: Step[] = [];
const toSkip: Step[] = [];
for (const s of ready) {
if (s.when && !s.when(ctx)) toSkip.push(s);
else toRun.push(s);
}
return { toRun, toSkip };
}
/** True when every step is settled (done skipped excluded). Pure. */
export function isRunComplete(flow: Flow, state: SchedulerState): boolean {
return flow.steps.every((s) => isSatisfied(state, s.id));
}
/**
* True when the run can make no further progress yet cannot complete: no ready
* step, nothing in flight to eventually unblock one, and not complete. Signals a
* dependency cycle or an unsatisfiable dep (flow.ts:33 throws on this). Pure.
*/
export function isStuck(flow: Flow, state: SchedulerState): boolean {
return (
!isRunComplete(flow, state) &&
state.inFlight.size === 0 &&
readySteps(flow, state).length === 0
);
}
// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────
/**
* Per-step action for `initResume`. Pure — no IO; callers supply DB rows.
*
* - 'keep': step is already settled, or has a live pending task that the
* dispatcher's startup poll will run automatically.
* - 're-dispatch': step is running but its task died (non-terminal non-pending
* state, or absent) — re-insert a fresh task with the stored
* prompt.
* - 'mark-done': task completed before the terminal callback ran; write output
* and advance.
* - 'mark-failed': task failed; propagate so advance() fails the run.
* - 'mark-cancelled': task was cancelled before the callback ran; propagate so
* advance() cancels the run.
*/
export type ResumeAction =
| 'keep'
| 're-dispatch'
| 'mark-done'
| 'mark-failed'
| 'mark-cancelled';
/**
* Decide what to do with ONE flow step during startup resume (D-9). Pure.
*
* @param status - flow_steps.status
* @param taskId - flow_steps.task_id (null for code steps or unstarted agent steps)
* @param taskState - tasks.state for taskId, or null if the task row is absent
*/
export function reconcileResumeStep(
status: string,
taskId: string | null,
taskState: string | null,
): ResumeAction {
if (status !== 'running') return 'keep';
// Running step: decide by its task's current state.
if (!taskId || taskState === null) return 're-dispatch'; // task gone or never created
switch (taskState) {
case 'completed': return 'mark-done';
case 'failed': return 'mark-failed';
case 'cancelled': return 'mark-cancelled';
case 'pending': return 'keep'; // dispatcher startup poll will run it normally
default: return 're-dispatch'; // 'running' or 'blocked' — PTY is dead
}
}
export interface StepResumeDecision {
stepId: string;
action: ResumeAction;
}
// ─── Dispatcher routing guard (H1 fix) ───────────────────────────────────────
/**
* Returns true when a task whose named agent is unavailable must FAIL HARD
* rather than fall through to native inference. Orchestrator steps (qwen+plan)
* are the only case: the PTY `--approval-mode plan` path is the only route
* that enforces the read-only invariant, so if qwen is missing the step must
* fail instead of silently gaining write capability. Pure.
*/
export function shouldFailOnMissingAgent(agent: string, modeId: string | null): boolean {
return agent === 'qwen' && modeId === 'plan';
}
/**
* Reconcile every step of an in-flight run for startup resume. Returns one
* decision per step. Pure — no IO.
*/
export function reconcileRun(
steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string }>,
taskStates: ReadonlyMap<string, string>,
): StepResumeDecision[] {
return steps.map((step) => ({
stepId: step.stepId,
action: reconcileResumeStep(
step.status,
step.taskId,
step.taskId ? (taskStates.get(step.taskId) ?? null) : null,
),
}));
}

View File

@@ -0,0 +1,765 @@
/**
* Flow-runner — the orchestrator's DB-backed execution engine (D-2/D-3/D-4).
*
* Replaces the Phase-1 in-memory wave scheduler (`conductor/src/flow.ts`) with a
* scheduler that persists every step to `flow_runs`/`flow_steps` and executes
* agent steps by INSERTing a normal `tasks` row that the EXISTING dispatcher
* picks up (LISTEN 'tasks_new'). It owns no poll loop — it reacts to the
* dispatcher's `onTaskTerminal` hook (`handleTaskTerminal`). The pure scheduling
* decisions live in `flow-runner-decisions.ts`; this file is the IO shell.
*
* Two load-bearing properties:
* - READ-ONLY (D-4): every dispatched step task is hardcoded `agent='qwen'`,
* `mode_id='plan'`, NEVER user-overridable. The dispatcher's qwen+plan routing
* rule forces the PTY path so qwen runs with `--approval-mode plan` — a hard
* tool-level gate (reads allowed, writes blocked) inside the agent binary.
* - PROMPTS BUILT IN-PROCESS (D-1): each worker prompt is assembled here by
* calling the flow's `step.run(ctx)` and prepending the agent persona + the
* evidence/YAGNI contracts (the contracts are baked into `step.run`'s output
* by the flow definition). The prompt is the task's `input`; no closure is
* ever serialized to the DB.
*
* STREAM ATTRIBUTION. Each agent step gets its own synthetic session + chat. The
* step task carries that `session_id`; the one-shot external path reuses the
* session's single open chat, so its delta/tool_call/message_complete frames are
* keyed by the step's `chat_id` (D-6). A per-step session (not one per run) is
* required because `runExternalAgent` picks the session's most-recent open chat —
* sharing one session across a parallel wave would misattribute the streams.
*
* FRAME CHANNEL. Run-level frames (`flow_run_started`, `flow_run_step_updated`)
* are published on the user channel (run/sidebar-level, like `session_updated`).
* The per-agent token stream rides the existing per-session frames the dispatcher
* already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
*/
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { FastifyBaseLogger } from 'fastify';
import type { Config } from '../config.js';
import { getFlow } from '../conductor/flows/index.js';
import { loadPersona } from '../conductor/persona-loader.js';
import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../conductor/types.js';
import {
isRunComplete,
manifestSteps,
partitionReady,
readySteps,
reconcileRun,
type SchedulerState,
type StepResumeDecision,
} from './flow-runner-decisions.js';
export interface LaunchOpts {
projectId: string;
flowName: string;
band: Band;
/** the flow input; MUST carry a non-empty `question` (CHECK input ? 'question') */
input: FlowInput;
/** override the run model; defaults to config.DEFAULT_MODEL (qwen 35B) */
model?: string;
}
export interface FlowRunner {
/** Start a run: persist it, build the roster, dispatch the first ready wave. */
launch(opts: LaunchOpts): Promise<{ runId: string }>;
/**
* Wire this to `createDispatcher({ onTaskTerminal })`. Fires when ANY task
* settles; the runner ignores tasks it doesn't own. Never throws.
*/
handleTaskTerminal(taskId: string, state: string): void;
/**
* Re-advance every flow_run still marked 'running' on coder startup (D-9).
* Idempotent: completed steps are kept, stale-running steps are re-dispatched
* (a re-dispatched step already has a 'pending' task that won't be touched
* again), completed tasks are surfaced as done. Never throws.
*/
initResume(): Promise<void>;
/**
* Cancel a running flow run (Phase 6 stop route). Marks the run and all
* non-terminal steps 'cancelled', publishes cancel frames, and returns the
* task_ids of any in-flight step tasks so the route can abort them via the
* dispatcher's cancelExternalTask. Returns cancelled:false when the run is
* not found or already terminal.
*/
cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }>;
}
interface Deps {
sql: Sql;
broker: Broker;
log: FastifyBaseLogger;
config: Config;
}
interface FlowStepRow {
step_id: string;
kind: 'agent' | 'code';
agent: string | null;
status: string;
chat_id: string | null;
output: string | null;
}
export function createFlowRunner(deps: Deps): FlowRunner {
const { sql, broker, log, config } = deps;
// Serialize advance() per run so two near-simultaneous terminal callbacks for a
// parallel wave can't double-dispatch the next wave or race the DB reads.
const advanceChain = new Map<string, Promise<void>>();
// ctx.dispatch sub-tasks (code-review's per-dimension adversarial verify): a
// taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
// resolves them here instead of advancing a run.
const subDispatchWaiters = new Map<string, (output: string) => void>();
function publishUser(frame: Record<string, unknown>): void {
broker.publishUserFrame('default', frame as unknown as WsFrame);
}
function humanize(stepId: string): string {
return stepId
.replace(/[-_]+/g, ' ')
.replace(/\b\w/g, (c) => c.toUpperCase());
}
function buildCtx(
input: FlowInput,
results: Record<string, string>,
model: string,
dispatch?: DispatchFn,
): StepContext {
return { input, results, model, dispatch };
}
/** Latest assistant message text for a chat — the FULL worker output (≤50k as
* the dispatcher persists it), NOT the ≤500-char tasks.output_summary (C3). */
async function readChatOutput(chatId: string): Promise<string> {
const [m] = await sql<{ content: string | null }[]>`
SELECT content FROM messages
WHERE chat_id = ${chatId} AND role = 'assistant'
ORDER BY created_at DESC LIMIT 1
`;
return m?.content ?? '';
}
async function readTaskOutput(taskId: string): Promise<string> {
const [t] = await sql<{ chat_id: string | null }[]>`SELECT chat_id FROM tasks WHERE id = ${taskId}`;
return t?.chat_id ? readChatOutput(t.chat_id) : '';
}
/** A synthetic session + chat for one agent step's stream (D-6). One session
* per step so the one-shot path's single-open-chat pick lands on this chat. */
async function createStepChat(
projectId: string,
model: string,
flowName: string,
label: string,
): Promise<string> {
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${projectId}, ${`Flow ${flowName} · ${label}`}, ${model}, 'open')
RETURNING id
`;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${session!.id}, ${label}, 'open')
RETURNING id
`;
return chat!.id;
}
// ─── launch ──────────────────────────────────────────────────────────────────
async function launch(opts: LaunchOpts): Promise<{ runId: string }> {
const flow = getFlow(opts.flowName);
if (!flow) throw new Error(`unknown flow: ${opts.flowName}`);
const model = (opts.model && opts.model.trim()) || config.DEFAULT_MODEL;
const input: FlowInput = { ...opts.input, band: opts.band };
if (typeof input.question !== 'string' || !input.question.trim()) {
throw new Error('flow input requires a non-empty "question"');
}
const [run] = await sql<{ id: string }[]>`
INSERT INTO flow_runs (project_id, flow_name, band, model, status, input)
VALUES (${opts.projectId}, ${opts.flowName}, ${opts.band}, ${model}, 'running', ${sql.json(input as never)})
RETURNING id
`;
const runId = run!.id;
// Manifest = every step not pre-skipped by its when() guard at launch (band /
// repo gating). Agent steps get a synthetic chat now so the roster carries
// their chat_id; code steps get a row but no chat (not in the roster frame).
const launchCtx = buildCtx(input, {}, model);
const manifest = manifestSteps(flow, launchCtx);
const frameSteps: Array<Record<string, unknown>> = [];
for (const step of manifest) {
let chatId: string | null = null;
if (step.kind === 'agent') {
chatId = await createStepChat(opts.projectId, model, opts.flowName, humanize(step.id));
}
await sql`
INSERT INTO flow_steps (run_id, step_id, kind, agent, status, chat_id)
VALUES (${runId}, ${step.id}, ${step.kind}, ${step.agent ?? null}, 'pending', ${chatId})
`;
if (step.kind === 'agent' && chatId) {
frameSteps.push({
step_id: step.id,
agent: step.agent,
kind: 'agent',
chat_id: chatId,
label: humanize(step.id),
});
}
}
publishUser({
type: 'flow_run_started',
run_id: runId,
flow_name: opts.flowName,
band: opts.band,
steps: frameSteps,
});
void advance(runId);
return { runId };
}
// ─── advance (serialized per run) ─────────────────────────────────────────────
function advance(runId: string): Promise<void> {
const prev = advanceChain.get(runId) ?? Promise.resolve();
const next = prev
.catch(() => {})
.then(() => advanceInner(runId).catch((err) => {
log.error({ err: errMsg(err), runId }, 'flow-runner: advance failed');
}));
advanceChain.set(runId, next);
void next.finally(() => {
if (advanceChain.get(runId) === next) advanceChain.delete(runId);
});
return next;
}
async function advanceInner(runId: string): Promise<void> {
const [run] = await sql<{ project_id: string; flow_name: string; model: string; input: FlowInput; status: string }[]>`
SELECT project_id, flow_name, model, input, status FROM flow_runs WHERE id = ${runId}
`;
if (!run) return;
// Terminal (incl. 'cancelled' from a stop) → stop advancing. This makes the
// runner idempotent against duplicate terminal callbacks and respects a stop.
if (run.status !== 'running') return;
const flow = getFlow(run.flow_name);
if (!flow) {
await failRun(runId, flow, run.input, run.model, `unknown flow: ${run.flow_name}`);
return;
}
const input = run.input;
const model = run.model;
const dispatch: DispatchFn = (agent, task) => dispatchSubAgent(run.project_id, model, agent, task);
const rows = await sql<FlowStepRow[]>`
SELECT step_id, kind, agent, status, chat_id, output FROM flow_steps WHERE run_id = ${runId}
`;
// Re-derive the excluded set (band/when pre-skips) from the flow def + input —
// excluded steps never got a row, so a dependent treats them as satisfied.
const launchCtx = buildCtx(input, {}, model, dispatch);
const manifestIds = new Set(manifestSteps(flow, launchCtx).map((s) => s.id));
const excluded = new Set(flow.steps.filter((s) => !manifestIds.has(s.id)).map((s) => s.id));
const done = new Set<string>();
const skipped = new Set<string>();
const inFlight = new Set<string>();
const results: Record<string, string> = {};
for (const r of rows) {
switch (r.status) {
case 'completed':
done.add(r.step_id);
if (r.output != null) results[r.step_id] = r.output;
break;
case 'skipped':
skipped.add(r.step_id);
break;
case 'running':
inFlight.add(r.step_id);
break;
case 'failed':
// A failed worker makes the deterministic report untrustworthy — fail the
// whole run (matches the Phase-1 CLI, which throws on a dispatch failure).
await failRun(runId, flow, input, model, `step '${r.step_id}' failed`, r.step_id);
return;
case 'cancelled':
await cancelRun(runId);
return;
// 'pending' → a candidate for the ready computation below
}
}
// Drain ready skips + code steps (synchronous), re-evaluating after each batch,
// then dispatch the full ready agent wave and wait for their terminal callbacks.
for (;;) {
const state: SchedulerState = { done, skipped, inFlight, excluded };
if (isRunComplete(flow, state)) {
await finishRun(runId, flow, input, results, model, dispatch);
return;
}
const ready = readySteps(flow, state);
if (ready.length === 0) {
if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
return;
}
const ctx = buildCtx(input, results, model, dispatch);
const { toRun, toSkip } = partitionReady(ready, ctx);
if (toSkip.length > 0) {
for (const s of toSkip) {
await markStep(runId, s.id, 'skipped');
skipped.add(s.id);
if (s.kind === 'agent') publishStep(runId, s.id, 'skipped');
}
continue; // re-evaluate — a skip can settle a fan-in step's deps
}
const codeReady = toRun.filter((s) => s.kind === 'code');
if (codeReady.length > 0) {
for (const s of codeReady) {
let out: string;
try {
// Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
// verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
out = await s.run(buildCtx(input, results, model, dispatch));
} catch (err) {
await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
return;
}
await markStep(runId, s.id, 'completed', out);
results[s.id] = out;
done.add(s.id);
}
continue; // re-evaluate — code output can unblock the next wave
}
// Only agent steps remain ready → dispatch the whole parallel wave, then wait.
for (const s of toRun) {
await dispatchAgentStep(runId, run.project_id, model, s, ctx);
inFlight.add(s.id);
publishStep(runId, s.id, 'running');
}
return;
}
}
// ─── step execution ───────────────────────────────────────────────────────────
async function dispatchAgentStep(
runId: string,
projectId: string,
model: string,
step: Step,
ctx: StepContext,
): Promise<void> {
const [row] = await sql<{ chat_id: string | null }[]>`
SELECT chat_id FROM flow_steps WHERE run_id = ${runId} AND step_id = ${step.id}
`;
const chatId = row?.chat_id ?? null;
const [chat] = chatId
? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
: [];
const sessionId = chat?.session_id ?? null;
// Build the worker prompt IN-PROCESS (D-1): persona + (task + contracts). The
// flow's step.run already bakes in the evidence/YAGNI contracts.
const persona = step.agent ? await loadPersona(step.agent) : '';
const taskPrompt = await step.run(ctx);
const fullPrompt = persona ? `${persona}\n\n---\n\n${taskPrompt}` : taskPrompt;
// READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never
// user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate.
const [task] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${fullPrompt}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
RETURNING id
`;
await sql`
UPDATE flow_steps
SET task_id = ${task!.id}, status = 'running', input = ${fullPrompt}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.id}
`;
}
/**
* ctx.dispatch for code steps: dispatch a read-only qwen worker under a Han
* persona and resolve with its full output. The sub-task has no flow_steps row;
* handleTaskTerminal resolves the waiter rather than advancing a run.
*/
function dispatchSubAgent(
projectId: string,
model: string,
persona: string,
task: string,
): Promise<string> {
return (async () => {
const personaText = await loadPersona(persona).catch(() => '');
const prompt = personaText ? `${personaText}\n\n---\n\n${task}` : task;
const chatId = await createStepChat(projectId, model, 'sub', humanize(persona));
const [chat] = await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`;
const [t] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${prompt}, 'qwen', ${model}, 'plan', ${chat?.session_id ?? null}, ${chatId})
RETURNING id
`;
return await new Promise<string>((resolve) => {
subDispatchWaiters.set(t!.id, resolve);
});
})();
}
async function markStep(
runId: string,
stepId: string,
status: 'completed' | 'skipped',
output?: string,
): Promise<void> {
if (output !== undefined) {
await sql`
UPDATE flow_steps SET status = ${status}, output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${stepId}
`;
} else {
await sql`
UPDATE flow_steps SET status = ${status}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${stepId}
`;
}
}
// ─── run completion ─────────────────────────────────────────────────────────
async function finishRun(
runId: string,
flow: Flow,
input: FlowInput,
results: Record<string, string>,
model: string,
dispatch: DispatchFn,
): Promise<void> {
let report: string;
try {
report = flow.render(buildCtx(input, results, model, dispatch));
} catch (err) {
await failRun(runId, flow, input, model, `report render threw: ${errMsg(err)}`);
return;
}
const updated = await sql`
UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish
publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
run_status: 'completed',
report,
});
}
async function failRun(
runId: string,
flow: Flow | undefined,
input: FlowInput,
model: string,
error: string,
failedStepId?: string,
): Promise<void> {
const updated = await sql`
UPDATE flow_runs SET status = 'failed', error = ${error}, updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return;
const stepId = failedStepId ?? (flow ? lastAgentStepId(flow, input, model) : 'run');
log.warn({ runId, error }, 'flow-runner: run failed');
publishStep(runId, stepId, 'failed', { run_status: 'failed' });
}
async function cancelRun(runId: string): Promise<void> {
// A step was cancelled (e.g. PTY killed) while the run was still 'running' —
// mark the run cancelled and publish frames for any remaining pending steps.
const updated = await sql`
UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return; // idempotent — already terminal
// Any remaining pending steps are unreachable; mark + publish them so the
// pane can show them as cancelled rather than stuck in pending.
const pending = await sql<{ step_id: string; kind: string }[]>`
SELECT step_id, kind FROM flow_steps
WHERE run_id = ${runId} AND status IN ('pending', 'running')
`;
if (pending.length > 0) {
await sql`
UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND status IN ('pending', 'running')
`;
for (const s of pending) {
if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
}
}
log.info({ runId }, 'flow-runner: run cancelled');
}
/** The terminal agent step in roster order — a valid roster step_id to carry the
* run-level completion/failure status (the pane reads run_status/report off it). */
function lastAgentStepId(flow: Flow, input: FlowInput, model: string): string {
const agents = manifestSteps(flow, buildCtx(input, {}, model)).filter((s) => s.kind === 'agent');
return agents.length ? agents[agents.length - 1]!.id : (flow.steps[0]?.id ?? 'run');
}
function publishStep(
runId: string,
stepId: string,
status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled',
extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string },
): void {
publishUser({
type: 'flow_run_step_updated',
run_id: runId,
step_id: stepId,
status,
...(extra ?? {}),
});
}
// ─── terminal callback (wired to createDispatcher.onTaskTerminal) ─────────────
function handleTaskTerminal(taskId: string, state: string): void {
void (async () => {
// 1. A ctx.dispatch sub-task → resolve its waiter with the full output.
const waiter = subDispatchWaiters.get(taskId);
if (waiter) {
subDispatchWaiters.delete(taskId);
const out = await readTaskOutput(taskId).catch(() => '');
waiter(out);
return;
}
// 2. A flow step task → mark the step, then advance its run.
const [step] = await sql<{ run_id: string; step_id: string; chat_id: string | null; status: string }[]>`
SELECT run_id, step_id, chat_id, status FROM flow_steps WHERE task_id = ${taskId}
`;
if (!step) return; // not a flow task
if (step.status !== 'running') return; // idempotent against duplicate callbacks
if (state === 'completed') {
const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
await sql`
UPDATE flow_steps SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
`;
publishStep(step.run_id, step.step_id, 'completed');
} else {
const stepStatus = state === 'cancelled' ? 'cancelled' : 'failed';
await sql`
UPDATE flow_steps SET status = ${stepStatus}, updated_at = clock_timestamp()
WHERE run_id = ${step.run_id} AND step_id = ${step.step_id} AND status = 'running'
`;
if (stepStatus === 'failed') {
publishStep(step.run_id, step.step_id, 'failed');
} else {
// A cancelled step always triggers run cancellation (advanceInner path).
// Include run_status: 'cancelled' here so single-step flows get a
// complete run-level cancel frame before cancelRun runs.
publishStep(step.run_id, step.step_id, 'cancelled', { run_status: 'cancelled' });
}
}
await advance(step.run_id);
})().catch((err) => {
log.error({ err: errMsg(err), taskId }, 'flow-runner: handleTaskTerminal failed');
});
}
// ─── startup resume (D-9) ─────────────────────────────────────────────────────
/**
* Apply one step's resume decision to the DB, then return (the caller drives the
* loop). Re-dispatch reuses the prompt already stored in flow_steps.input (built
* in-process at launch per D-1) so the flow def doesn't need to be reloaded and
* ctx.dispatch closures don't re-execute.
*/
async function applyResumeDecision(
runId: string,
projectId: string,
model: string,
step: { step_id: string; task_id: string | null; chat_id: string | null; input: string | null },
decision: StepResumeDecision,
): Promise<void> {
switch (decision.action) {
case 'keep':
break;
case 'mark-done': {
const output = step.chat_id ? await readChatOutput(step.chat_id).catch(() => '') : '';
await sql`
UPDATE flow_steps
SET status = 'completed', output = ${output}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
}
case 'mark-failed':
await sql`
UPDATE flow_steps
SET status = 'failed', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
case 'mark-cancelled':
await sql`
UPDATE flow_steps
SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
case 're-dispatch': {
// No stored prompt means we can't rebuild the task input (the step never
// reached the dispatch stage). Fail it so advance() fails the run cleanly.
if (!step.input) {
await sql`
UPDATE flow_steps
SET status = 'failed', error = 're-dispatch: no stored prompt',
updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
break;
}
// Reuse stored chat / session so the stream still rides the pre-created
// synthetic chat (D-6). READ-ONLY (D-4): agent='qwen', mode_id='plan'.
const chatId = step.chat_id;
const [chat] = chatId
? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatId}`
: [];
const sessionId = chat?.session_id ?? null;
const [task] = await sql<{ id: string }[]>`
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
VALUES (${projectId}, ${step.input}, 'qwen', ${model}, 'plan', ${sessionId}, ${chatId})
RETURNING id
`;
await sql`
UPDATE flow_steps
SET task_id = ${task!.id}, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${step.step_id}
`;
log.info({ runId, stepId: step.step_id, taskId: task!.id }, 'flow-runner: step re-dispatched on resume');
break;
}
}
}
async function resumeRun(run: {
id: string;
project_id: string;
model: string;
}): Promise<void> {
const rows = await sql<{
step_id: string;
task_id: string | null;
status: string;
chat_id: string | null;
input: string | null;
}[]>`SELECT step_id, task_id, status, chat_id, input FROM flow_steps WHERE run_id = ${run.id}`;
// Load task states for all referenced tasks in one query.
const taskIds = rows.map((r) => r.task_id).filter((id): id is string => id !== null);
const taskStates = new Map<string, string>();
if (taskIds.length > 0) {
const tasks = await sql<{ id: string; state: string }[]>`
SELECT id, state FROM tasks WHERE id = ANY(${taskIds})
`;
for (const t of tasks) taskStates.set(t.id, t.state);
}
const decisions = reconcileRun(
rows.map((r) => ({ stepId: r.step_id, taskId: r.task_id, status: r.status })),
taskStates,
);
for (const decision of decisions) {
if (decision.action === 'keep') continue;
const row = rows.find((r) => r.step_id === decision.stepId)!;
await applyResumeDecision(run.id, run.project_id, run.model, row, decision);
}
// advance() re-derives the ready wave and dispatches it; new re-dispatched tasks
// are already 'pending' and will be picked up by the dispatcher's startup poll.
await advance(run.id);
log.info({ runId: run.id }, 'flow-runner: run resumed');
}
async function initResume(): Promise<void> {
const runs = await sql<{ id: string; project_id: string; model: string }[]>`
SELECT id, project_id, model FROM flow_runs WHERE status = 'running'
`;
if (runs.length === 0) return;
log.info({ count: runs.length }, 'flow-runner: resuming in-flight runs on startup');
for (const run of runs) {
await resumeRun(run).catch((err) => {
log.error({ err: errMsg(err), runId: run.id }, 'flow-runner: initResume failed for run');
});
}
}
// ─── cancel (Phase 6 stop route) ─────────────────────────────────────────────
async function cancel(runId: string): Promise<{ cancelled: boolean; taskIds: string[] }> {
const updated = await sql`
UPDATE flow_runs SET status = 'cancelled', updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return { cancelled: false, taskIds: [] };
// Mark all non-terminal steps cancelled and collect in-flight task_ids.
const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>`
SELECT step_id, task_id, kind FROM flow_steps
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
`;
if (steps.length > 0) {
await sql`
UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
`;
for (const s of steps) {
if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
}
} else {
// Run was 'running' but no non-terminal steps — edge case (advance in progress).
// Re-publish the last agent step with run_status: 'cancelled' so the pane updates.
const [last] = await sql<{ step_id: string }[]>`
SELECT step_id FROM flow_steps WHERE run_id = ${runId} AND kind = 'agent'
ORDER BY created_at DESC LIMIT 1
`;
if (last) publishStep(runId, last.step_id, 'cancelled', { run_status: 'cancelled' });
}
const taskIds = steps
.filter((s): s is typeof s & { task_id: string } => s.task_id !== null)
.map((s) => s.task_id);
log.info({ runId }, 'flow-runner: run cancelled by request');
return { cancelled: true, taskIds };
}
return { launch, handleTaskTerminal, initResume, cancel };
}
function errMsg(e: unknown): string {
return e instanceof Error ? e.message : String(e);
}