/** * The conductor itself: a deterministic wave scheduler over a flow's steps. * * Each tick, every step whose dependencies are all satisfied runs concurrently * (the fan-out). The scheduler blocks on each wave before the next (the * fan-in / barrier on deps). `agent` steps dispatch a Han worker; `code` steps * run pure TS. Sequencing, parallelism, and the fold all live HERE, in code — * never in a model's context. */ import type { Flow, FlowInput, RunResult, StepContext } from './types.js'; import { dispatchAgent } from './dispatch.js'; export interface RunOptions { onLog?: (msg: string) => void; } export async function runFlow(flow: Flow, input: FlowInput, opts: RunOptions = {}): Promise { const log = opts.onLog ?? ((m: string) => console.error(m)); const results: Record = {}; const ctx = (): StepContext => ({ input, results }); const done = new Set(); const skipped = new Set(); const total = flow.steps.length; while (done.size + skipped.size < total) { const ready = flow.steps.filter( (s) => !done.has(s.id) && !skipped.has(s.id) && (s.deps ?? []).every((d) => done.has(d) || skipped.has(d)), ); if (ready.length === 0) { throw new Error('conductor: dependency cycle or unsatisfiable deps among remaining steps'); } await Promise.all( ready.map(async (s) => { if (s.when && !s.when(ctx())) { skipped.add(s.id); log(`↷ skip ${s.id}`); return; } const started = Date.now(); log(`▶ ${s.id}${s.agent ? ` → ${s.agent}` : ' (code)'}`); const produced = await s.run(ctx()); // agent steps: run() built the prompt → dispatch a worker. // code steps: run() already produced the result. const result = s.kind === 'agent' ? await dispatchAgent(s.agent!, produced) : produced; results[s.id] = result; done.add(s.id); log(`✓ ${s.id} (${secs(started)}s, ${result.length} chars)`); }), ); } return { results, artifact: flow.render(ctx()), outputPath: flow.output?.(ctx()), }; } function secs(since: number): number { return Math.round((Date.now() - since) / 1000); }