feat(coder): flow-runner decisions, conductor types, collision detection tests

- Add flow-runner-decisions.ts: decision-aware step execution
- Extend flow-runner.ts: dynamic step decisions
- Extend conductor types: additional flow state types
- Add collision-detector.test.ts: edit collision unit tests
- Add conflict-index.test.ts: conflict resolution index tests
This commit is contained in:
2026-06-08 03:48:58 +00:00
parent d360051329
commit 25590071ef
5 changed files with 422 additions and 10 deletions

View File

@@ -32,7 +32,7 @@
* already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
*/
import type { Sql } from '../db.js';
import type { Broker } from '@boocode/server/broker';
import type { Broker, Frame, Listener } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { FastifyBaseLogger } from 'fastify';
import type { Config } from '../config.js';
@@ -42,6 +42,7 @@ import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../co
import {
buildBatchState,
getReadyInBatch,
isLoopTerminated,
isRunComplete,
manifestSteps,
partitionReady,
@@ -98,7 +99,7 @@ interface Deps {
interface FlowStepRow {
step_id: string;
kind: 'agent' | 'code' | 'switch';
kind: 'agent' | 'code' | 'switch' | 'do_while';
agent: string | null;
status: string;
chat_id: string | null;
@@ -118,6 +119,10 @@ export function createFlowRunner(deps: Deps): FlowRunner {
// taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
// resolves them here instead of advancing a run.
const subDispatchWaiters = new Map<string, (output: string) => void>();
/** Per-DO_WHILE step iteration count; persists across advance() calls. */
const loopIterations = new Map<string, number>();
/** Per-run messaging subscriptions; cleaned up when the run terminates. */
const messagingCleanups = new Map<string, Set<() => void>>();
function publishUser(frame: Record<string, unknown>): void {
broker.publishUserFrame('default', frame as unknown as WsFrame);
@@ -134,8 +139,42 @@ export function createFlowRunner(deps: Deps): FlowRunner {
results: Record<string, string>,
model: string,
dispatch?: DispatchFn,
runId?: string,
stepId?: string,
): StepContext {
return { input, results, model, dispatch };
let messaging: StepContext['messaging'] = undefined;
if (runId) {
if (!messagingCleanups.has(runId)) {
messagingCleanups.set(runId, new Set());
}
const subs = messagingCleanups.get(runId)!;
messaging = {
publish(channel: string, message: unknown) {
const content = typeof message === 'string' ? message : JSON.stringify(message);
const topic = `run:${runId}:${channel}`;
const frame = {
type: 'agent_message' as const,
run_id: runId,
sender_step_id: stepId ?? '',
content,
...(channel ? { channel } : {}),
};
broker.publishUserFrame('default', frame as unknown as WsFrame);
broker.publish(topic, frame as unknown as Frame);
},
subscribe(channel: string, handler: (msg: unknown) => void) {
const topic = `run:${runId}:${channel}`;
const listener: Listener = (f) => { handler(f); };
const unsub = broker.subscribe(topic, listener);
subs.add(unsub);
return () => {
unsub();
subs.delete(unsub);
};
},
};
}
return { input, results, model, dispatch, messaging };
}
/** Latest assistant message text for a chat — the FULL worker output (≤50k as
@@ -378,7 +417,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
for (;;) {
// Build per-batch state from the current inFlight set for batch parallelism gating.
const batchState = buildBatchState(flow, inFlight);
const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded };
const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded, loopIterations };
if (isRunComplete(flow, state)) {
await finishRun(runId, flow, input, results, model, dispatch);
@@ -387,7 +426,46 @@ export function createFlowRunner(deps: Deps): FlowRunner {
const ready = getReadyInBatch(readySteps(flow, state), state, flow);
if (ready.length === 0) {
if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
// Before declaring stuck, check for running DO_WHILE steps whose body
// is fully done — triggers the next loop iteration or terminates.
if (inFlight.size > 0) {
let doWhileReEval = false;
for (const s of flow.steps) {
if (s.kind !== 'do_while' || !s.loopBody || s.loopBody.length === 0) continue;
if (!inFlight.has(s.id)) continue;
if (!s.loopBody.every((bId) => done.has(bId))) continue;
doWhileReEval = true;
const iterations = loopIterations.get(s.id) ?? 0;
const dwCtx = buildCtx(input, results, model, dispatch);
if (isLoopTerminated(s, dwCtx, iterations)) {
await markStep(runId, s.id, 'completed');
done.add(s.id);
results[s.id] = '';
inFlight.delete(s.id);
publishStep(runId, s.id, 'completed');
} else {
await sql`
UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${s.id}
`;
inFlight.add(s.id);
loopIterations.set(s.id, iterations + 1);
for (const bodyId of s.loopBody) {
done.delete(bodyId);
delete results[bodyId];
await sql`
UPDATE flow_steps
SET status = 'pending', output = NULL, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${bodyId}
`;
}
publishStep(runId, s.id, 'running');
}
break; // one DO_WHILE at a time
}
if (doWhileReEval) continue;
return; // genuine inFlight agents with no ready steps
}
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
return;
}
@@ -429,6 +507,49 @@ export function createFlowRunner(deps: Deps): FlowRunner {
continue; // re-evaluate — excluded steps may unblock dependents
}
// DO_WHILE steps: first-activation only (ready to run for the first time).
// Re-evaluation of running DO_WHILE steps whose body is complete is handled
// in the `ready.length === 0` block above (Path 1) — this avoids duplicate
// SQL updates and competing state mutations.
const doWhileReady = toRun.filter((s) => s.kind === 'do_while');
if (doWhileReady.length > 0) {
for (const s of doWhileReady) {
const iterations = loopIterations.get(s.id) ?? 0;
const dwCtx = buildCtx(input, results, model, dispatch);
if (isLoopTerminated(s, dwCtx, iterations)) {
// Loop done — mark DO_WHILE completed. Body steps stay in their
// current state (already done from the last iteration).
await markStep(runId, s.id, 'completed');
done.add(s.id);
results[s.id] = '';
inFlight.delete(s.id);
publishStep(runId, s.id, 'completed');
} else {
// Start or continue the loop.
await sql`
UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${s.id}
`;
inFlight.add(s.id);
loopIterations.set(s.id, iterations + 1);
// On re-iteration, reset body steps from 'completed' back to 'pending'.
if (iterations > 0 && s.loopBody) {
for (const bodyId of s.loopBody) {
done.delete(bodyId);
delete results[bodyId];
await sql`
UPDATE flow_steps
SET status = 'pending', output = NULL, updated_at = clock_timestamp()
WHERE run_id = ${runId} AND step_id = ${bodyId}
`;
}
}
publishStep(runId, s.id, 'running');
}
}
continue; // re-evaluate — body steps may be newly pending
}
const codeReady = toRun.filter((s) => s.kind === 'code');
if (codeReady.length > 0) {
for (const s of codeReady) {
@@ -436,7 +557,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
try {
// Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
// verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
out = await s.run(buildCtx(input, results, model, dispatch));
out = await s.run(buildCtx(input, results, model, dispatch, runId, s.id));
} catch (err) {
await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
return;
@@ -559,6 +680,14 @@ export function createFlowRunner(deps: Deps): FlowRunner {
await appendStepEvent(sql, runId, stepId, status, output ? { outputLength: output.length } : undefined);
}
function cleanupMessaging(runId: string): void {
const cleanups = messagingCleanups.get(runId);
if (cleanups) {
for (const fn of cleanups) fn();
messagingCleanups.delete(runId);
}
}
// ─── run completion ─────────────────────────────────────────────────────────
async function finishRun(
@@ -580,12 +709,16 @@ export function createFlowRunner(deps: Deps): FlowRunner {
UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
WHERE id = ${runId} AND status = 'running'
`;
if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish
if (updated.count === 0) {
cleanupMessaging(runId);
return; // already terminal (e.g. cancelled) — don't publish
}
deps.onRunTerminal?.(runId, 'completed');
publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
run_status: 'completed',
report,
});
cleanupMessaging(runId);
}
async function failRun(
@@ -606,6 +739,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
log.warn({ runId, error }, 'flow-runner: run failed');
await appendStepEvent(sql, runId, stepId, 'failed', { error });
publishStep(runId, stepId, 'failed', { run_status: 'failed' });
cleanupMessaging(runId);
}
async function cancelRun(runId: string): Promise<void> {
@@ -633,6 +767,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
}
}
log.info({ runId }, 'flow-runner: run cancelled');
cleanupMessaging(runId);
}
/** The terminal agent step in roster order — a valid roster step_id to carry the
@@ -918,6 +1053,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
.map((s) => s.task_id);
log.info({ runId }, 'flow-runner: run cancelled by request');
cleanupMessaging(runId);
return { cancelled: true, taskIds };
}