feat: in-app Orchestrator (Phase 2) — multi-agent conductor
Brings the deterministic Han-flow conductor into BooCode: launch any read-only flow from BooChat or BooCoder, watch each agent stream live in a Paseo-style run pane, get an evidence-disciplined report — on local Qwen, persisted and resumable. Read-only enforced hard via qwen --approval-mode plan (orchestrator tasks fail closed if qwen is unavailable; never fall to write-capable native). Backend (apps/coder): re-homed conductor defs, flow_runs/flow_steps schema, flow-runner + dispatcher onTaskTerminal hook, restart-resume, runs routes (launch/list/get/cancel), user-channel WS. Contracts: two flow_run_* frames. Web: orchestrator pane kind + OrchestratorPane, Workflow button + slash flows (BooChat/BooCoder parity), FlowLauncherDialog, "New Orchestrator" in the + and split menus, runs history + export. Plan: openspec/changes/orchestrator.
This commit is contained in:
66
conductor/src/flow.ts
Normal file
66
conductor/src/flow.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
/**
|
||||
* The conductor itself: a deterministic wave scheduler over a flow's steps.
|
||||
*
|
||||
* Each tick, every step whose dependencies are all satisfied runs concurrently
|
||||
* (the fan-out). The scheduler blocks on each wave before the next (the
|
||||
* fan-in / barrier on deps). `agent` steps dispatch a Han worker; `code` steps
|
||||
* run pure TS. Sequencing, parallelism, and the fold all live HERE, in code —
|
||||
* never in a model's context.
|
||||
*/
|
||||
import type { Flow, FlowInput, RunResult, StepContext } from './types.js';
|
||||
import { dispatchAgent } from './dispatch.js';
|
||||
|
||||
export interface RunOptions {
|
||||
onLog?: (msg: string) => void;
|
||||
}
|
||||
|
||||
export async function runFlow(flow: Flow, input: FlowInput, opts: RunOptions = {}): Promise<RunResult> {
|
||||
const log = opts.onLog ?? ((m: string) => console.error(m));
|
||||
const results: Record<string, string> = {};
|
||||
const ctx = (): StepContext => ({ input, results });
|
||||
|
||||
const done = new Set<string>();
|
||||
const skipped = new Set<string>();
|
||||
const total = flow.steps.length;
|
||||
|
||||
while (done.size + skipped.size < total) {
|
||||
const ready = flow.steps.filter(
|
||||
(s) =>
|
||||
!done.has(s.id) &&
|
||||
!skipped.has(s.id) &&
|
||||
(s.deps ?? []).every((d) => done.has(d) || skipped.has(d)),
|
||||
);
|
||||
if (ready.length === 0) {
|
||||
throw new Error('conductor: dependency cycle or unsatisfiable deps among remaining steps');
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
ready.map(async (s) => {
|
||||
if (s.when && !s.when(ctx())) {
|
||||
skipped.add(s.id);
|
||||
log(`↷ skip ${s.id}`);
|
||||
return;
|
||||
}
|
||||
const started = Date.now();
|
||||
log(`▶ ${s.id}${s.agent ? ` → ${s.agent}` : ' (code)'}`);
|
||||
const produced = await s.run(ctx());
|
||||
// agent steps: run() built the prompt → dispatch a worker.
|
||||
// code steps: run() already produced the result.
|
||||
const result = s.kind === 'agent' ? await dispatchAgent(s.agent!, produced) : produced;
|
||||
results[s.id] = result;
|
||||
done.add(s.id);
|
||||
log(`✓ ${s.id} (${secs(started)}s, ${result.length} chars)`);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
results,
|
||||
artifact: flow.render(ctx()),
|
||||
outputPath: flow.output?.(ctx()),
|
||||
};
|
||||
}
|
||||
|
||||
function secs(since: number): number {
|
||||
return Math.round((Date.now() - since) / 1000);
|
||||
}
|
||||
Reference in New Issue
Block a user