Files
boocode/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts
indifferentketchup 74da084521 feat(conductor): Wave 2 — parallel batch execution + SWITCH branching step
- Parallel batch execution: batch field on Step, batchConfig on Flow,
  batch-aware readySteps with maxConcurrent gating, getReadyInBatch helper
- SWITCH branching step: new 'switch' StepKind with cases/programmed conditions,
  resolveSwitch() pure function, switch-excluded steps tracked in
  SchedulerState, non-selected branches excluded from execution
2026-06-08 03:00:06 +00:00

832 lines
32 KiB
TypeScript

import { describe, it, expect } from 'vitest';
import type { Flow, Step, StepContext } from '../../conductor/types.js';
import {
  buildBatchState,
  getReadyInBatch,
  manifestSteps,
  partitionReady,
  readySteps,
  isRunComplete,
  isStuck,
  reconcileResumeStep,
  reconcileRun,
  resolveSwitch,
  shouldFailOnMissingAgent,
  type SchedulerState,
} from '../flow-runner-decisions.js';
/**
* The DB-driven flow-runner replaces the Phase-1 in-memory wave scheduler
* (conductor/src/flow.ts). These pure helpers are that scheduler's decision
* core, lifted out so the DB-backed runner stays a thin IO shell over a
* testable kernel (repo pattern: turn-guard.ts / lifecycle-decisions.ts).
*
* The schedule must match conductor/flow.ts:27-41 exactly: a step is ready when
* every dependency is settled (completed OR skipped/excluded), and a ready step
* is skipped when its when() guard returns false against the current context.
*/
// Synthetic fixture flow exercising: two parallel angles (one band-gated), a
// code fold that fans them in, an agent synthesizer, and a terminal validator.
function makeFlow(): Flow {
  const angleA: Step = { id: 'a', kind: 'agent', agent: 'analyst-a', run: () => 'prompt a' };
  const angleB: Step = {
    id: 'b',
    kind: 'agent',
    agent: 'analyst-b',
    // Band gate: runs only at medium or large (mirrors Angle.minBand gating).
    when: ({ input }) => input.band === 'medium' || input.band === 'large',
    run: () => 'prompt b',
  };
  const fold: Step = {
    id: 'fold',
    kind: 'code',
    deps: ['a', 'b'],
    // Reports which result keys were visible when the fold ran.
    run: ({ results }) => `fold:${Object.keys(results).join(',')}`,
  };
  const synth: Step = { id: 'synth', kind: 'agent', agent: 'architect', deps: ['fold'], run: () => 'prompt synth' };
  const validator: Step = { id: 'val', kind: 'agent', agent: 'validator', deps: ['synth'], run: () => 'prompt val' };

  return {
    name: 'demo',
    description: 'demo flow',
    steps: [angleA, angleB, fold, synth, validator],
    render: () => 'report',
  };
}
/**
 * Builds a StepContext fixture with a fixed question and the given band.
 *
 * @param band - value placed at `input.band`
 * @param results - prior step results; defaults to an empty record
 */
function ctxOf(band: string, results: Record<string, string> = {}): StepContext {
  const context: StepContext = {
    input: { question: 'q', band },
    results,
  };
  return context;
}
// Factory for a blank SchedulerState fixture; any field present in `over`
// replaces the corresponding empty default.
const emptyState = (over: Partial<SchedulerState> = {}): SchedulerState => {
  const blank: SchedulerState = {
    done: new Set(),
    skipped: new Set(),
    inFlight: new Set(),
    excluded: new Set(),
    timedOut: new Set(),
    switchResults: new Map(),
  };
  return { ...blank, ...over };
};
describe('manifestSteps', () => {
  // Runs manifestSteps for the given band and reduces the result to step ids.
  const manifestIds = (flow: Flow, band: string) =>
    manifestSteps(flow, ctxOf(band)).map((step) => step.id);

  it('drops a band-gated step at small band, keeps it at medium', () => {
    const flow = makeFlow();
    // Step b carries a when() guard that is false at small band.
    expect(manifestIds(flow, 'small')).toEqual(['a', 'fold', 'synth', 'val']);
    expect(manifestIds(flow, 'medium')).toEqual(['a', 'b', 'fold', 'synth', 'val']);
  });

  it('includes every step when no when() guards are defined', () => {
    const first: Step = { id: 'x', kind: 'agent', agent: 'a', run: () => 'p' };
    const second: Step = { id: 'y', kind: 'code', deps: ['x'], run: () => 'r' };
    const guardless: Flow = {
      name: 'guardless',
      description: 'no guards',
      steps: [first, second],
      render: () => '',
    };
    expect(manifestIds(guardless, 'small')).toEqual(['x', 'y']);
  });

  it('returns empty when every when() guard evaluates false', () => {
    const alwaysFalse = () => false;
    const allGated: Flow = {
      name: 'all-gated',
      description: 'all filtered',
      steps: [
        { id: 'p', kind: 'agent', agent: 'a', when: alwaysFalse, run: () => 'p' },
        { id: 'q', kind: 'agent', agent: 'b', when: alwaysFalse, run: () => 'q' },
      ],
      render: () => '',
    };
    const manifest = manifestSteps(allGated, ctxOf('small'));
    expect(manifest).toEqual([]);
  });
});
describe('readySteps', () => {
  // Asks readySteps which steps of the demo flow are offered for a given
  // scheduler state, reduced to ids for compact assertions.
  const readyIds = (state: SchedulerState) =>
    readySteps(makeFlow(), state).map((step) => step.id);

  it('returns only dep-free, unsettled steps first', () => {
    // small band → b is excluded; a is the only dep-free live step.
    const state = emptyState({ excluded: new Set(['b']) });
    expect(readyIds(state)).toEqual(['a']);
  });

  it('treats an excluded dependency as satisfied (fold unblocks once a is done)', () => {
    // a completed, b excluded (small band) → fold's deps [a, b] are both settled.
    const state = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
    expect(readyIds(state)).toEqual(['fold']);
  });

  it('does not re-offer a step that is in flight', () => {
    const state = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
    expect(readyIds(state)).toEqual([]);
  });

  it('waits on an unfinished dependency', () => {
    // a and fold are done, b is excluded → synth is the one ready step.
    // val depends on synth, which has not settled yet, so val must keep waiting.
    const state = emptyState({ done: new Set(['a', 'fold']), excluded: new Set(['b']) });
    const ready = readyIds(state);
    expect(ready).toEqual(['synth']);
    expect(ready).not.toContain('val');
  });

  it('returns both parallel dep-free steps when neither is excluded or settled', () => {
    // This suite expects readySteps NOT to evaluate when() guards; the
    // partitionReady tests cover guard handling. Both a and b have no deps
    // and are unsettled → both offered.
    expect(readyIds(emptyState())).toEqual(['a', 'b']);
  });

  it('does not unblock a step whose dep is in flight (dep not yet satisfied)', () => {
    // a is in flight, so it does not count as a settled dependency → fold
    // (deps: ['a', 'b']) stays blocked even though b is excluded.
    // Nothing else has all deps settled → empty wave.
    const state = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
    const ready = readyIds(state);
    expect(ready).not.toContain('fold');
    expect(ready).toEqual([]);
  });

  it('advances through the full wave chain: a+b+fold+synth done → val is ready', () => {
    const state = emptyState({ done: new Set(['a', 'b', 'fold', 'synth']) });
    expect(readyIds(state)).toEqual(['val']);
  });
});
describe('partitionReady', () => {
  // Reduces a list of steps to their ids.
  const idsOf = (steps: Step[]) => steps.map((step) => step.id);

  it('routes a when()-false step to toSkip, the rest to toRun', () => {
    // Offer a and b at small band: b's guard fails, so it lands in toSkip.
    const offered = makeFlow().steps.slice(0, 2); // a, b
    const split = partitionReady(offered, ctxOf('small'));
    expect(idsOf(split.toRun)).toEqual(['a']);
    expect(idsOf(split.toSkip)).toEqual(['b']);
  });

  it('keeps every guardless step in toRun', () => {
    const offered = makeFlow().steps.slice(2, 4); // fold, synth (no when)
    const split = partitionReady(offered, ctxOf('large'));
    expect(idsOf(split.toRun)).toEqual(['fold', 'synth']);
    expect(split.toSkip).toEqual([]);
  });

  it('handles an empty ready list gracefully', () => {
    const split = partitionReady([], ctxOf('small'));
    expect(split.toRun).toEqual([]);
    expect(split.toSkip).toEqual([]);
  });
});
describe('isRunComplete / isStuck', () => {
  it('is complete when every step is settled or excluded', () => {
    const demo = makeFlow();
    const settled = emptyState({
      done: new Set(['a', 'fold', 'synth', 'val']),
      excluded: new Set(['b']),
    });
    expect(isRunComplete(demo, settled)).toBe(true);
    expect(isStuck(demo, settled)).toBe(false);
  });

  it('is not complete while a step is pending', () => {
    const partial = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
    expect(isRunComplete(makeFlow(), partial)).toBe(false);
  });

  it('is not stuck while a ready step still exists', () => {
    // With a done and b excluded, fold is ready — the run can still progress.
    const partial = emptyState({ done: new Set(['a']), excluded: new Set(['b']) });
    expect(isStuck(makeFlow(), partial)).toBe(false);
  });

  it('is stuck on an unsatisfiable dependency with nothing in flight', () => {
    // 'orphan' depends on 'ghost', which is not a step and is never settled;
    // with nothing in flight there is no way for it to settle: a dead end.
    const dangling: Flow = {
      name: 'stuck',
      description: 'unsatisfiable',
      steps: [{ id: 'orphan', kind: 'agent', agent: 'x', deps: ['ghost'], run: () => 'p' }],
      render: () => 'r',
    };
    const fresh = emptyState();
    expect(readySteps(dangling, fresh)).toEqual([]);
    expect(isRunComplete(dangling, fresh)).toBe(false);
    expect(isStuck(dangling, fresh)).toBe(true);
  });

  it('is complete when every step is skipped (no done, no excluded)', () => {
    const demo = makeFlow();
    const allSkipped = emptyState({ skipped: new Set(['a', 'b', 'fold', 'synth', 'val']) });
    expect(isRunComplete(demo, allSkipped)).toBe(true);
    expect(isStuck(demo, allSkipped)).toBe(false);
  });

  it('is complete when steps are spread across done + skipped + excluded', () => {
    const demo = makeFlow();
    // Every step is settled one way or another:
    // a/fold/val done, synth skipped, b excluded.
    const mixed = emptyState({
      done: new Set(['a', 'fold', 'val']),
      skipped: new Set(['synth']),
      excluded: new Set(['b']),
    });
    expect(isRunComplete(demo, mixed)).toBe(true);
    expect(isStuck(demo, mixed)).toBe(false);
  });

  it('is NOT stuck when in-flight tasks exist even if no step is currently ready', () => {
    const demo = makeFlow();
    // a is in flight, so fold's dep is unsettled and nothing is ready — but a
    // is still running, so the run CAN make progress and is not stuck.
    const waiting = emptyState({ inFlight: new Set(['a']), excluded: new Set(['b']) });
    expect(isStuck(demo, waiting)).toBe(false);
    expect(isRunComplete(demo, waiting)).toBe(false);
    expect(readySteps(demo, waiting)).toEqual([]); // confirms nothing ready
  });
});
// ─── SWITCH branching (v2.9) ─────────────────────────────────────────────────
describe('resolveSwitch', () => {
  // Shared context: small band, no prior results.
  const baseCtx: StepContext = ctxOf('small');
  // Constant predicates used as case conditions.
  const matches = () => true;
  const misses = () => false;

  it('selects the first matching case and excludes other branches', () => {
    const router: Step = {
      id: 'router',
      kind: 'switch',
      run: () => '',
      cases: [
        { label: 'a', condition: misses, stepIds: ['a1', 'a2'] },
        { label: 'b', condition: matches, stepIds: ['b1', 'b2'] },
        { label: 'c', condition: matches, stepIds: ['c1', 'c2'] },
      ],
    };
    const { chosenCase, excluded } = resolveSwitch(router, baseCtx);
    // 'c' also matches, but 'b' comes first.
    expect(chosenCase).toBe('b');
    expect(excluded).toEqual(['a1', 'a2', 'c1', 'c2']);
  });

  it('falls back to defaultBranch when no case matches', () => {
    const router: Step = {
      id: 'router',
      kind: 'switch',
      run: () => '',
      cases: [
        { label: 'x', condition: misses, stepIds: ['x1'] },
        { label: 'y', condition: misses, stepIds: ['y1'] },
      ],
      defaultBranch: ['z1', 'z2'],
    };
    const { chosenCase, excluded } = resolveSwitch(router, baseCtx);
    expect(chosenCase).toBeNull();
    // The default steps stay live; only the case branches are excluded.
    expect(excluded).toEqual(['x1', 'y1']);
  });

  it('excludes all branch steps when no case matches and no default', () => {
    const router: Step = {
      id: 'router',
      kind: 'switch',
      run: () => '',
      cases: [
        { label: 'p', condition: misses, stepIds: ['p1'] },
        { label: 'q', condition: misses, stepIds: ['q1', 'q2'] },
      ],
    };
    const { chosenCase, excluded } = resolveSwitch(router, baseCtx);
    expect(chosenCase).toBeNull();
    expect(excluded).toEqual(['p1', 'q1', 'q2']);
  });

  it('excludes defaultBranch when a case matched', () => {
    const router: Step = {
      id: 'router',
      kind: 'switch',
      run: () => '',
      cases: [
        { label: 'hit', condition: matches, stepIds: ['h1'] },
        { label: 'miss', condition: misses, stepIds: ['m1'] },
      ],
      defaultBranch: ['d1'],
    };
    const { chosenCase, excluded } = resolveSwitch(router, baseCtx);
    expect(chosenCase).toBe('hit');
    expect(excluded).toEqual(['m1', 'd1']);
  });

  it('returns empty excluded for a degenerate switch with no cases and no default', () => {
    const bare: Step = { id: 'noop', kind: 'switch', run: () => '' };
    const { chosenCase, excluded } = resolveSwitch(bare, baseCtx);
    expect(chosenCase).toBeNull();
    expect(excluded).toEqual([]);
  });

  it('uses ctx.results in condition evaluation', () => {
    const router: Step = {
      id: 'router',
      kind: 'switch',
      run: () => '',
      cases: [
        { label: 'has', condition: ({ results }) => results['prev'] === 'yes', stepIds: ['yes-branch'] },
        { label: 'no', condition: matches, stepIds: ['no-branch'] },
      ],
    };
    // Same input as baseCtx, plus a prior result the first case looks at.
    const ctxWithResult: StepContext = ctxOf('small', { prev: 'yes' });
    const { chosenCase, excluded } = resolveSwitch(router, ctxWithResult);
    expect(chosenCase).toBe('has');
    expect(excluded).toEqual(['no-branch']);
  });
});
describe('readySteps with switch-excluded steps', () => {
  // Flow: switch router → branch-a/branch-b → fold
  function switchFlow(): Flow {
    const steps: Step[] = [
      {
        id: 'switch',
        kind: 'switch',
        run: () => '',
        cases: [
          { label: 'a', condition: () => true, stepIds: ['branch-a'] },
          { label: 'b', condition: () => false, stepIds: ['branch-b'] },
        ],
      },
      { id: 'branch-a', kind: 'agent', agent: 'x', deps: ['switch'], run: () => 'p' },
      { id: 'branch-b', kind: 'agent', agent: 'y', deps: ['switch'], run: () => 'q' },
      { id: 'fold', kind: 'code', deps: ['branch-a', 'branch-b'], run: () => 'r' },
    ];
    return { name: 'switch-demo', description: '', steps, render: () => '' };
  }

  // Scheduler state after the router picked case 'a': branch-b is recorded as
  // excluded in switchResults. All other fields come from the shared
  // emptyState factory unless `over` supplies them.
  const afterSwitch = (over: Partial<SchedulerState> = {}): SchedulerState =>
    emptyState({
      switchResults: new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
        ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
      ]),
      ...over,
    });

  // Ready step ids of the switch flow for the given state.
  const readyIds = (state: SchedulerState) =>
    readySteps(switchFlow(), state).map((step) => step.id);

  it('excludes non-selected branch steps and treats them as satisfied deps', () => {
    // switch completed, branch-b excluded by switch (branch-a selected)
    const ready = readyIds(afterSwitch({ done: new Set(['switch']) }));
    // branch-a is ready (dep switch is done), branch-b is excluded
    expect(ready).toContain('branch-a');
    expect(ready).not.toContain('branch-b');
  });

  it('fold unblocks once selected branch completes (excluded branch satisfied)', () => {
    const ready = readyIds(afterSwitch({ done: new Set(['switch', 'branch-a']) }));
    // fold's deps: branch-a done, branch-b excluded (via switch) → satisfied
    expect(ready).toContain('fold');
  });

  it('fold stays blocked until selected branch completes, even with excluded dep', () => {
    const ready = readyIds(
      afterSwitch({ done: new Set(['switch']), inFlight: new Set(['branch-a']) }),
    );
    // branch-a is still in flight, so fold's dep on it is not settled yet;
    // branch-b being excluded is not enough on its own to release fold.
    expect(ready).not.toContain('fold');
  });

  it('isRunComplete returns true when switch-excluded steps are the only unsettled', () => {
    const flow = switchFlow();
    // All non-excluded steps done; branch-b is excluded via switch
    const state = afterSwitch({ done: new Set(['switch', 'branch-a', 'fold']) });
    expect(isRunComplete(flow, state)).toBe(true);
    expect(isStuck(flow, state)).toBe(false);
  });

  it('combines static excluded with switch-excluded', () => {
    // band gating excludes branch-b at launch, AND switch also excludes it
    const state = afterSwitch({
      done: new Set(['switch', 'branch-a']),
      excluded: new Set(['branch-b']),
    });
    // branch-b excluded both ways; fold sees branch-a done, branch-b excluded
    expect(readyIds(state)).toContain('fold');
  });
});
// ─── Batch parallelism (v2.8.22) ─────────────────────────────────────────────
describe('buildBatchState', () => {
  it('returns empty map when flow has no batchConfig', () => {
    const unbatched: Flow = {
      name: 'no-batch',
      description: '',
      steps: [
        { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
        { id: 'b', kind: 'code', deps: ['a'], run: () => 'r' },
      ],
      render: () => '',
    };
    const nothingInFlight = new Set<string>();
    expect(buildBatchState(unbatched, nothingInFlight).size).toBe(0);
  });

  it('maps each batch group to its running set and config', () => {
    const batched: Flow = {
      name: 'batched',
      description: '',
      steps: [
        { id: 'a1', kind: 'agent', agent: 'x', batch: 'review', run: () => 'p' },
        { id: 'a2', kind: 'agent', agent: 'y', batch: 'review', run: () => 'q' },
        { id: 'b1', kind: 'agent', agent: 'z', batch: 'check', run: () => 'r' },
        { id: 'fold', kind: 'code', deps: ['a1', 'a2', 'b1'], run: () => 's' },
      ],
      render: () => '',
      batchConfig: { maxConcurrent: 2 },
    };
    // Only a1 is in flight: the review group has one running step, check has none.
    const groups = buildBatchState(batched, new Set(['a1']));
    expect(groups.size).toBe(2);

    const reviewGroup = groups.get('review');
    expect(reviewGroup).toBeDefined();
    expect(Array.from(reviewGroup!.running)).toEqual(['a1']);
    expect(reviewGroup!.maxConcurrent).toBe(2);
    // No joinRule was configured; the expected fallback is 'all_success'.
    expect(reviewGroup!.joinRule).toBe('all_success');

    const checkGroup = groups.get('check');
    expect(checkGroup).toBeDefined();
    expect(checkGroup!.running.size).toBe(0);
    expect(checkGroup!.maxConcurrent).toBe(2);
  });

  it('uses joinRule from batchConfig when provided', () => {
    const single: Flow = {
      name: 'join',
      description: '',
      steps: [{ id: 'x', kind: 'agent', agent: 'a', batch: 'g1', run: () => 'p' }],
      render: () => '',
      batchConfig: { maxConcurrent: 1, joinRule: 'one_success' },
    };
    const groups = buildBatchState(single, new Set());
    expect(groups.get('g1')!.joinRule).toBe('one_success');
  });

  it('ignores steps without a batch field', () => {
    const mixed: Flow = {
      name: 'mixed',
      description: '',
      steps: [
        { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
        { id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
      ],
      render: () => '',
      batchConfig: { maxConcurrent: 3 },
    };
    const groups = buildBatchState(mixed, new Set(['a', 'b']));
    // a is in flight but belongs to no batch, so it must not create an entry.
    expect(groups.size).toBe(1);
    expect(groups.has('g1')).toBe(true);
    expect(groups.get('g1')!.running.has('b')).toBe(true);
    // ...and a must not show up inside any group's running set either.
    [...groups.values()].forEach((group) => {
      expect(group.running.has('a')).toBe(false);
    });
  });
});
describe('getReadyInBatch', () => {
  // Derived from SchedulerState so these fixtures follow the real field type
  // and this file does not need a separate TriggerRule import.
  type BatchState = NonNullable<SchedulerState['batchState']>;

  // Returns the given batch map, or a fresh empty one when none is supplied.
  function makeBatchState(overrides?: BatchState): BatchState {
    return overrides ?? new Map();
  }

  // Every case hands getReadyInBatch an empty stand-in for its flow argument.
  const stubFlow = {} as Flow;

  // Ids admitted by getReadyInBatch for the offered steps under `state`.
  const admittedIds = (steps: Step[], state: SchedulerState) =>
    getReadyInBatch(steps, state, stubFlow).map((step) => step.id);

  it('passes all steps through when batchState is empty', () => {
    const steps: Step[] = [
      { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
      { id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
    ];
    const state = emptyState({ batchState: makeBatchState() });
    expect(admittedIds(steps, state)).toEqual(['a', 'b']);
  });

  it('passes non-batched steps through regardless of batch capacity', () => {
    const batchState: BatchState = new Map();
    batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' });
    const steps: Step[] = [
      { id: 'nobatch', kind: 'agent', agent: 'z', run: () => 'r' },
      { id: 'batched', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
    ];
    const state = emptyState({ inFlight: new Set(['a']), batchState });
    // nobatch passes; batched is blocked because g1 (maxConcurrent=1) already has a running.
    expect(admittedIds(steps, state)).toEqual(['nobatch']);
  });

  it('allows batch steps up to maxConcurrent', () => {
    const batchState: BatchState = new Map();
    batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' });
    const steps: Step[] = [
      { id: 's1', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
      { id: 's2', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
      { id: 's3', kind: 'agent', agent: 'z', batch: 'g1', run: () => 'r' },
    ];
    const state = emptyState({ batchState });
    // Nothing is running in g1 on this tick, so all three are admitted even
    // though maxConcurrent is 2. Per the suite author's note, the flow-runner
    // dispatches one-by-one and calls getReadyInBatch each tick: its dispatch
    // loop puts two in flight, and the next tick then blocks the third.
    expect(admittedIds(steps, state)).toEqual(['s1', 's2', 's3']);
  });

  it('blocks batch steps when at capacity', () => {
    const batchState: BatchState = new Map();
    batchState.set('g1', { running: new Set(['a', 'b']), maxConcurrent: 2, joinRule: 'all_success' });
    const steps: Step[] = [
      { id: 'c', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
      { id: 'd', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
    ];
    const state = emptyState({ inFlight: new Set(['a', 'b']), batchState });
    // g1 is at capacity (2 of 2 running) and both offered steps belong to it
    // → everything is filtered out.
    expect(getReadyInBatch(steps, state, stubFlow)).toEqual([]);
  });

  it('handles multiple independent batch groups', () => {
    const batchState: BatchState = new Map();
    batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' });
    batchState.set('g2', { running: new Set(), maxConcurrent: 5, joinRule: 'all_success' });
    const steps: Step[] = [
      { id: 'b', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, // g1 at capacity → blocked
      { id: 'c', kind: 'agent', agent: 'y', batch: 'g2', run: () => 'q' }, // g2 has room → passes
      { id: 'd', kind: 'agent', agent: 'z', batch: 'g2', run: () => 'r' }, // g2 has room → passes
    ];
    const state = emptyState({ inFlight: new Set(['a']), batchState });
    expect(admittedIds(steps, state)).toEqual(['c', 'd']);
  });

  it('lets a step pass when its batch group is known but has no running steps yet', () => {
    const batchState: BatchState = new Map();
    batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' });
    const steps: Step[] = [
      { id: 'first', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
    ];
    const state = emptyState({ batchState });
    expect(admittedIds(steps, state)).toEqual(['first']);
  });

  it('handles empty step list gracefully', () => {
    const state = emptyState({ batchState: makeBatchState() });
    expect(getReadyInBatch([], state, stubFlow)).toEqual([]);
  });
});
// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────
describe('reconcileResumeStep', () => {
  // Step status shared by every case that exercises the task-state branches.
  const RUNNING = 'running';

  it('keeps non-running steps regardless of task state', () => {
    const nonRunning = ['completed', 'skipped', 'failed', 'cancelled', 'pending'];
    nonRunning.forEach((status) => {
      expect(reconcileResumeStep(status, 'tid', 'running')).toBe('keep');
    });
  });

  it('re-dispatches a running step with no task_id', () => {
    const action = reconcileResumeStep(RUNNING, null, null);
    expect(action).toBe('re-dispatch');
  });

  it('re-dispatches a running step whose task is absent from the DB', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', null);
    expect(action).toBe('re-dispatch');
  });

  it('marks done when the task completed before the callback ran', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'completed');
    expect(action).toBe('mark-done');
  });

  it('marks failed when the task failed', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'failed');
    expect(action).toBe('mark-failed');
  });

  it('marks cancelled when the task was cancelled', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'cancelled');
    expect(action).toBe('mark-cancelled');
  });

  it('keeps a running step whose task is pending (dispatcher startup poll handles it)', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'pending');
    expect(action).toBe('keep');
  });

  it('re-dispatches when the task is running (PTY dead on restart)', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'running');
    expect(action).toBe('re-dispatch');
  });

  it('re-dispatches when the task is blocked (permission dialog gone on restart)', () => {
    const action = reconcileResumeStep(RUNNING, 'tid', 'blocked');
    expect(action).toBe('re-dispatch');
  });
});
// ─── Dispatcher routing guard (H1) ───────────────────────────────────────────
describe('shouldFailOnMissingAgent', () => {
  it('returns true for qwen+plan (orchestrator read-only gate)', () => {
    const verdict = shouldFailOnMissingAgent('qwen', 'plan');
    expect(verdict).toBe(true);
  });

  it('returns false for qwen without plan mode', () => {
    const withoutMode = shouldFailOnMissingAgent('qwen', null);
    expect(withoutMode).toBe(false);
    const autoMode = shouldFailOnMissingAgent('qwen', 'auto');
    expect(autoMode).toBe(false);
  });

  it('returns false for non-qwen agents even with plan mode', () => {
    // Checked one by one, in this order: goose, opencode, claude.
    expect(shouldFailOnMissingAgent('goose', 'plan')).toBe(false);
    expect(shouldFailOnMissingAgent('opencode', 'plan')).toBe(false);
    expect(shouldFailOnMissingAgent('claude', 'plan')).toBe(false);
  });

  it('returns false for qwen with any mode other than plan', () => {
    const otherModes = ['bypassPermissions', 'acceptEdits', 'dontAsk', 'default', ''];
    otherModes.forEach((mode) => {
      expect(shouldFailOnMissingAgent('qwen', mode)).toBe(false);
    });
  });

  it('returns false for empty or unknown agent name even with plan mode', () => {
    // Checked one by one, in this order: empty name, native, boocode.
    expect(shouldFailOnMissingAgent('', 'plan')).toBe(false);
    expect(shouldFailOnMissingAgent('native', 'plan')).toBe(false);
    expect(shouldFailOnMissingAgent('boocode', 'plan')).toBe(false);
  });
});
describe('reconcileRun', () => {
  // Looks up the action decided for one step id (undefined if the id is absent).
  const actionFor = (decisions: ReturnType<typeof reconcileRun>, stepId: string) =>
    decisions.find((decision) => decision.stepId === stepId)?.action;

  it('returns one decision per step', () => {
    const steps = [
      { stepId: 'a', taskId: null, status: 'completed' },
      { stepId: 'b', taskId: 't1', status: 'running' },
      { stepId: 'c', taskId: 't2', status: 'running' },
    ];
    const taskStates = new Map([['t1', 'completed'], ['t2', 'running']]);
    const [first, second, third, ...rest] = reconcileRun(steps, taskStates);
    // Exactly three decisions, in step order.
    expect(3 + rest.length).toBe(3);
    expect(first).toEqual({ stepId: 'a', action: 'keep' });
    expect(second).toEqual({ stepId: 'b', action: 'mark-done' });
    expect(third).toEqual({ stepId: 'c', action: 're-dispatch' });
  });

  it('handles a mixed run: completed steps kept, live-pending kept, stale re-dispatched', () => {
    const steps = [
      { stepId: 'finder-1', taskId: 't1', status: 'completed' },
      { stepId: 'finder-2', taskId: 't2', status: 'running' }, // PTY dead
      { stepId: 'finder-3', taskId: 't3', status: 'running' }, // pending in dispatcher
      { stepId: 'synth', taskId: null, status: 'pending' }, // not yet started
    ];
    const taskStates = new Map([
      ['t1', 'completed'],
      ['t2', 'running'], // stuck — PTY dead
      ['t3', 'pending'], // dispatcher will handle
    ]);
    const decisions = reconcileRun(steps, taskStates);
    expect(actionFor(decisions, 'finder-1')).toBe('keep');
    expect(actionFor(decisions, 'finder-2')).toBe('re-dispatch');
    expect(actionFor(decisions, 'finder-3')).toBe('keep');
    expect(actionFor(decisions, 'synth')).toBe('keep');
  });

  it('produces mark-failed for a failed task and mark-done for a completed task', () => {
    const steps = [
      { stepId: 'a', taskId: 't1', status: 'running' },
      { stepId: 'b', taskId: 't2', status: 'running' },
    ];
    const taskStates = new Map([['t1', 'failed'], ['t2', 'completed']]);
    const decisions = reconcileRun(steps, taskStates);
    expect(actionFor(decisions, 'a')).toBe('mark-failed');
    expect(actionFor(decisions, 'b')).toBe('mark-done');
  });

  it('is idempotent: a re-dispatched step (task now pending) is kept on second call', () => {
    // After a re-dispatch the step row still says 'running', but its task id
    // now points at a new task that is pending.
    const steps = [{ stepId: 'x', taskId: 'new-task', status: 'running' }];
    const taskStates = new Map([['new-task', 'pending']]);
    const [only] = reconcileRun(steps, taskStates);
    expect(only?.action).toBe('keep');
  });

  it('returns an empty array when there are no steps', () => {
    const decisions = reconcileRun([], new Map());
    expect(decisions).toEqual([]);
  });

  it('re-dispatches a running step whose taskId is absent from the taskStates map', () => {
    // The step has a task id, but the map holds no state for it (the task row
    // is gone), so there is no task state to reconcile against.
    const steps = [{ stepId: 'x', taskId: 'orphan-task', status: 'running' }];
    const [only] = reconcileRun(steps, new Map());
    expect(only?.action).toBe('re-dispatch');
  });

  it('re-dispatches a running step with null taskId', () => {
    const steps = [{ stepId: 'y', taskId: null, status: 'running' }];
    const [only] = reconcileRun(steps, new Map());
    expect(only?.action).toBe('re-dispatch');
  });

  it('propagates mark-cancelled when the associated task was cancelled before the callback ran', () => {
    const steps = [{ stepId: 'z', taskId: 'tid', status: 'running' }];
    const taskStates = new Map([['tid', 'cancelled']]);
    const [only] = reconcileRun(steps, taskStates);
    expect(only?.action).toBe('mark-cancelled');
  });
});