diff --git a/apps/coder/src/conductor/types.ts b/apps/coder/src/conductor/types.ts index 4231dab..3592707 100644 --- a/apps/coder/src/conductor/types.ts +++ b/apps/coder/src/conductor/types.ts @@ -38,7 +38,20 @@ export interface StepContext { readonly model?: string; } -export type StepKind = 'agent' | 'code' | 'approval'; +export type StepKind = 'agent' | 'code' | 'approval' | 'switch'; + +/** + * One branch of a SWITCH step. The first case whose condition evaluates to true + * is selected; all other branches' stepIds are excluded from execution. + */ +export interface SwitchCase { + /** Human-readable label for this branch (reported in switch output). */ + label: string; + /** Pure guard — called with the current step context to decide this branch. */ + condition: (ctx: StepContext) => boolean; + /** stepIds belonging to this branch. */ + stepIds: string[]; +} export type TriggerRule = 'all_success' | 'one_success' | 'all_done'; @@ -63,12 +76,19 @@ export interface Step { /** * For kind:'agent', returns the worker PROMPT (task + any prior outputs). * For kind:'code', returns the step RESULT directly (the fold/transform). + * For kind:'switch', unused (the runner evaluates cases internally). */ run: (ctx: StepContext) => string | Promise; /** optional guard — when it returns false the step is skipped (e.g. no repo) */ when?: (ctx: StepContext) => boolean; /** max retries on timeout (0 or unset = no retry) */ maxRetries?: number; + /** batch group id; steps sharing the same batch are gated by batchConfig.maxConcurrent */ + batch?: string; + /** for kind:'switch' — ordered list of branches evaluated in declaration order */ + cases?: SwitchCase[]; + /** for kind:'switch' — fallback step ids when no case matches */ + defaultBranch?: string[]; } export interface Flow { @@ -79,6 +99,8 @@ export interface Flow { render: (ctx: StepContext) => string; /** optional output filename for the artifact, derived from input */ output?: (ctx: StepContext) => string; + /** batch parallelism control — gates concurrent dispatch of steps sharing the same batch id */ + batchConfig?: { maxConcurrent: number; timeoutMs?: number; joinRule?: TriggerRule }; } export interface RunResult { diff --git a/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts b/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts index 1b0a23d..19ecf52 100644 --- a/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts +++ b/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts @@ -1,16 +1,20 @@ import { describe, it, expect } from 'vitest'; import type { Flow, Step, StepContext } from '../../conductor/types.js'; import { + buildBatchState, + getReadyInBatch, manifestSteps, - readySteps, partitionReady, + readySteps, isRunComplete, isStuck, reconcileResumeStep, reconcileRun, + resolveSwitch, shouldFailOnMissingAgent, type SchedulerState, } from '../flow-runner-decisions.js'; +import type { StepContext } from '../../conductor/types.js'; /** * The DB-driven flow-runner replaces the Phase-1 in-memory wave scheduler @@ -53,6 +57,7 @@ const emptyState = (over: Partial = {}): SchedulerState => ({ inFlight: new Set(), excluded: new Set(), timedOut: new Set(), + switchResults: new Map(), ...over, }); @@ -238,6 +243,442 @@ describe('isRunComplete / isStuck', () => { }); }); +// ─── SWITCH branching (v2.9) ───────────────────────────────────────────────── + +describe('resolveSwitch', () => { + const baseCtx: StepContext = { input: { question: 'q', band: 'small' }, results: {} }; + + it('selects the first matching case and excludes other branches', () => { + const step: Step = { + id: 'router', + kind: 'switch', + run: () => '', + cases: [ + { label: 'a', condition: () => false, stepIds: ['a1', 'a2'] }, + { label: 'b', condition: () => true, stepIds: ['b1', 'b2'] }, + { label: 'c', condition: () => true, stepIds: ['c1', 'c2'] }, + ], + }; + const result = resolveSwitch(step, baseCtx); + expect(result.chosenCase).toBe('b'); + expect(result.excluded).toEqual(['a1', 'a2', 'c1', 'c2']); + }); + + it('falls back to defaultBranch when no case matches', () => { + const step: Step = { + id: 'router', + kind: 'switch', + run: () => '', + cases: [ + { label: 'x', condition: () => false, stepIds: ['x1'] }, + { label: 'y', condition: () => false, stepIds: ['y1'] }, + ], + defaultBranch: ['z1', 'z2'], + }; + const result = resolveSwitch(step, baseCtx); + expect(result.chosenCase).toBeNull(); + // Only case branch steps are excluded; default steps are not. + expect(result.excluded).toEqual(['x1', 'y1']); + }); + + it('excludes all branch steps when no case matches and no default', () => { + const step: Step = { + id: 'router', + kind: 'switch', + run: () => '', + cases: [ + { label: 'p', condition: () => false, stepIds: ['p1'] }, + { label: 'q', condition: () => false, stepIds: ['q1', 'q2'] }, + ], + }; + const result = resolveSwitch(step, baseCtx); + expect(result.chosenCase).toBeNull(); + expect(result.excluded).toEqual(['p1', 'q1', 'q2']); + }); + + it('excludes defaultBranch when a case matched', () => { + const step: Step = { + id: 'router', + kind: 'switch', + run: () => '', + cases: [ + { label: 'hit', condition: () => true, stepIds: ['h1'] }, + { label: 'miss', condition: () => false, stepIds: ['m1'] }, + ], + defaultBranch: ['d1'], + }; + const result = resolveSwitch(step, baseCtx); + expect(result.chosenCase).toBe('hit'); + expect(result.excluded).toEqual(['m1', 'd1']); + }); + + it('returns empty excluded for a degenerate switch with no cases and no default', () => { + const step: Step = { + id: 'noop', + kind: 'switch', + run: () => '', + }; + const result = resolveSwitch(step, baseCtx); + expect(result.chosenCase).toBeNull(); + expect(result.excluded).toEqual([]); + }); + + it('uses ctx.results in condition evaluation', () => { + const step: Step = { + id: 'router', + kind: 'switch', + run: () => '', + cases: [ + { label: 'has', condition: (ctx) => ctx.results['prev'] === 'yes', stepIds: ['yes-branch'] }, + { label: 'no', condition: () => true, stepIds: ['no-branch'] }, + ], + }; + const ctxWithResult: StepContext = { input: { question: 'q', band: 'small' }, results: { prev: 'yes' } }; + const result = resolveSwitch(step, ctxWithResult); + expect(result.chosenCase).toBe('has'); + expect(result.excluded).toEqual(['no-branch']); + }); +}); + +describe('readySteps with switch-excluded steps', () => { + // Flow: switch router → branch-a/branch-b → fold + function switchFlow(): Flow { + const steps: Step[] = [ + { + id: 'switch', kind: 'switch', run: () => '', + cases: [ + { label: 'a', condition: () => true, stepIds: ['branch-a'] }, + { label: 'b', condition: () => false, stepIds: ['branch-b'] }, + ], + }, + { id: 'branch-a', kind: 'agent', agent: 'x', deps: ['switch'], run: () => 'p' }, + { id: 'branch-b', kind: 'agent', agent: 'y', deps: ['switch'], run: () => 'q' }, + { id: 'fold', kind: 'code', deps: ['branch-a', 'branch-b'], run: () => 'r' }, + ]; + return { name: 'switch-demo', description: '', steps, render: () => '' }; + } + + it('excludes non-selected branch steps and treats them as satisfied deps', () => { + const flow = switchFlow(); + // switch completed, branch-b excluded by switch (branch-a selected) + const switchResult = new Map }>([ + ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }], + ]); + const state: SchedulerState = { + done: new Set(['switch']), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: switchResult, + }; + const ready = readySteps(flow, state).map((s) => s.id); + // branch-a is ready (dep switch is done), branch-b is excluded + expect(ready).toContain('branch-a'); + expect(ready).not.toContain('branch-b'); + }); + + it('fold unblocks once selected branch completes (excluded branch satisfied)', () => { + const flow = switchFlow(); + const switchResult = new Map }>([ + ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }], + ]); + const state: SchedulerState = { + done: new Set(['switch', 'branch-a']), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: switchResult, + }; + const ready = readySteps(flow, state).map((s) => s.id); + // fold's deps: branch-a done, branch-b excluded (via switch) → satisfied + expect(ready).toContain('fold'); + }); + + it('fold stays blocked until selected branch completes, even with excluded dep', () => { + const flow = switchFlow(); + const switchResult = new Map }>([ + ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }], + ]); + const state: SchedulerState = { + done: new Set(['switch']), + skipped: new Set(), + inFlight: new Set(['branch-a']), + excluded: new Set(), + timedOut: new Set(), + switchResults: switchResult, + }; + const ready = readySteps(flow, state).map((s) => s.id); + // branch-a in flight, branch-b excluded — only branch-a offered + expect(ready).not.toContain('fold'); + }); + + it('isRunComplete returns true when switch-excluded steps are the only unsettled', () => { + const flow = switchFlow(); + // All non-excluded steps done; branch-b is excluded via switch + const switchResult = new Map }>([ + ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }], + ]); + const state: SchedulerState = { + done: new Set(['switch', 'branch-a', 'fold']), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: switchResult, + }; + expect(isRunComplete(flow, state)).toBe(true); + expect(isStuck(flow, state)).toBe(false); + }); + + it('combines static excluded with switch-excluded', () => { + const flow = switchFlow(); + // band gating excludes branch-b at launch, AND switch also excludes it + const switchResult = new Map }>([ + ['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }], + ]); + const state: SchedulerState = { + done: new Set(['switch', 'branch-a']), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(['branch-b']), + timedOut: new Set(), + switchResults: switchResult, + }; + // branch-b excluded both ways; fold sees branch-a done, branch-b excluded + const ready = readySteps(flow, state).map((s) => s.id); + expect(ready).toContain('fold'); + }); +}); + +// ─── Batch parallelism (v2.8.22) ───────────────────────────────────────────── + +describe('buildBatchState', () => { + it('returns empty map when flow has no batchConfig', () => { + const flow: Flow = { + name: 'no-batch', + description: '', + steps: [ + { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' }, + { id: 'b', kind: 'code', deps: ['a'], run: () => 'r' }, + ], + render: () => '', + }; + const bs = buildBatchState(flow, new Set()); + expect(bs.size).toBe(0); + }); + + it('maps each batch group to its running set and config', () => { + const flow: Flow = { + name: 'batched', + description: '', + steps: [ + { id: 'a1', kind: 'agent', agent: 'x', batch: 'review', run: () => 'p' }, + { id: 'a2', kind: 'agent', agent: 'y', batch: 'review', run: () => 'q' }, + { id: 'b1', kind: 'agent', agent: 'z', batch: 'check', run: () => 'r' }, + { id: 'fold', kind: 'code', deps: ['a1', 'a2', 'b1'], run: () => 's' }, + ], + render: () => '', + batchConfig: { maxConcurrent: 2 }, + }; + // a1 is in flight → review batch has 1 running, check has 0. + const bs = buildBatchState(flow, new Set(['a1'])); + expect(bs.size).toBe(2); + + const review = bs.get('review'); + expect(review).toBeDefined(); + expect([...review!.running]).toEqual(['a1']); + expect(review!.maxConcurrent).toBe(2); + expect(review!.joinRule).toBe('all_success'); + + const check = bs.get('check'); + expect(check).toBeDefined(); + expect(check!.running.size).toBe(0); + expect(check!.maxConcurrent).toBe(2); + }); + + it('uses joinRule from batchConfig when provided', () => { + const flow: Flow = { + name: 'join', + description: '', + steps: [ + { id: 'x', kind: 'agent', agent: 'a', batch: 'g1', run: () => 'p' }, + ], + render: () => '', + batchConfig: { maxConcurrent: 1, joinRule: 'one_success' }, + }; + const bs = buildBatchState(flow, new Set()); + expect(bs.get('g1')!.joinRule).toBe('one_success'); + }); + + it('ignores steps without a batch field', () => { + const flow: Flow = { + name: 'mixed', + description: '', + steps: [ + { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' }, + { id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' }, + ], + render: () => '', + batchConfig: { maxConcurrent: 3 }, + }; + const bs = buildBatchState(flow, new Set(['a', 'b'])); + // a is inFlight but has no batch — it does not create an entry + expect(bs.size).toBe(1); + expect(bs.has('g1')).toBe(true); + expect(bs.get('g1')!.running.has('b')).toBe(true); + // a is not in any batch entry + for (const entry of bs.values()) { + expect(entry.running.has('a')).toBe(false); + } + }); +}); + +describe('getReadyInBatch', () => { + function makeBatchState( + overrides?: Map; maxConcurrent: number; joinRule: TriggerRule }>, + ): Map; maxConcurrent: number; joinRule: TriggerRule }> { + return overrides ?? new Map(); + } + + it('passes all steps through when batchState is empty', () => { + const steps: Step[] = [ + { id: 'a', kind: 'agent', agent: 'x', run: () => 'p' }, + { id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' }, + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState: makeBatchState(), + }; + const result = getReadyInBatch(steps, state, {} as Flow); + expect(result.map((s) => s.id)).toEqual(['a', 'b']); + }); + + it('passes non-batched steps through regardless of batch capacity', () => { + const batchState = new Map(); + batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' }); + const steps: Step[] = [ + { id: 'nobatch', kind: 'agent', agent: 'z', run: () => 'r' }, + { id: 'batched', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(['a']), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState, + }; + const result = getReadyInBatch(steps, state, {} as Flow); + // nobatch passes, batched is at maxConcurrent=1 with a already running → blocked + expect(result.map((s) => s.id)).toEqual(['nobatch']); + }); + + it('allows batch steps up to maxConcurrent', () => { + const batchState = new Map(); + batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' }); + const steps: Step[] = [ + { id: 's1', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, + { id: 's2', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' }, + { id: 's3', kind: 'agent', agent: 'z', batch: 'g1', run: () => 'r' }, + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState, + }; + // All 0 running, maxConcurrent=2 → all 3 pass through (readySteps would return them, + // but the flow-runner dispatches them one-by-one in the agent dispatch loop; getReadyInBatch + // is called each tick to allow up to maxConcurrent. Since batch is empty on this tick, + // all are allowed — the runner's dispatch loop will put 2 in flight, then next tick blocks.) + const result = getReadyInBatch(steps, state, {} as Flow); + expect(result.map((s) => s.id)).toEqual(['s1', 's2', 's3']); + }); + + it('blocks batch steps when at capacity', () => { + const batchState = new Map(); + batchState.set('g1', { running: new Set(['a', 'b']), maxConcurrent: 2, joinRule: 'all_success' }); + const steps: Step[] = [ + { id: 'c', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, + { id: 'd', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' }, + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(['a', 'b']), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState, + }; + // Both batches at capacity → everything filtered out + expect(getReadyInBatch(steps, state, {} as Flow)).toEqual([]); + }); + + it('handles multiple independent batch groups', () => { + const batchState = new Map(); + batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' }); + batchState.set('g2', { running: new Set(), maxConcurrent: 5, joinRule: 'all_success' }); + const steps: Step[] = [ + { id: 'b', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, // g1 at capacity → blocked + { id: 'c', kind: 'agent', agent: 'y', batch: 'g2', run: () => 'q' }, // g2 has room → passes + { id: 'd', kind: 'agent', agent: 'z', batch: 'g2', run: () => 'r' }, // g2 has room → passes + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(['a']), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState, + }; + expect(getReadyInBatch(steps, state, {} as Flow).map((s) => s.id)).toEqual(['c', 'd']); + }); + + it('lets a step pass when its batch group is known but has no running steps yet', () => { + const batchState = new Map(); + batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' }); + const steps: Step[] = [ + { id: 'first', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, + ]; + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState, + }; + expect(getReadyInBatch(steps, state, {} as Flow).map((s) => s.id)).toEqual(['first']); + }); + + it('handles empty step list gracefully', () => { + const state: SchedulerState = { + done: new Set(), + skipped: new Set(), + inFlight: new Set(), + excluded: new Set(), + timedOut: new Set(), + switchResults: new Map(), + batchState: makeBatchState(), + }; + expect(getReadyInBatch([], state, {} as Flow)).toEqual([]); + }); +}); + // ─── Resume reconciliation (D-9) ───────────────────────────────────────────── describe('reconcileResumeStep', () => { diff --git a/apps/coder/src/services/flow-runner-decisions.ts b/apps/coder/src/services/flow-runner-decisions.ts index 3b9f811..c1f7e53 100644 --- a/apps/coder/src/services/flow-runner-decisions.ts +++ b/apps/coder/src/services/flow-runner-decisions.ts @@ -35,11 +35,41 @@ export interface SchedulerState { readonly excluded: ReadonlySet; /** step ids that timed out (terminal — no retries remaining or not retriable) */ readonly timedOut: ReadonlySet; + /** + * Per-batch running sets, populated by buildBatchState from the flow definition + * and the current inFlight set. Only read by getReadyInBatch; never mutated by + * decision functions (the caller maintains it across ticks). + */ + readonly batchState?: Map; maxConcurrent: number; joinRule: TriggerRule }>; + /** + * Per-switch-step routing results. Populated when a SWITCH step completes. + * Step ids in any result's `excluded` set are treated as excluded for the + * remainder of the run — they won't execute and won't block dependents. + */ + readonly switchResults: ReadonlyMap }>; } /** A dependency is satisfied once it is done, skipped, excluded, or timed out. */ function isSatisfied(state: SchedulerState, id: string): boolean { - return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id) || state.timedOut.has(id); + const effectiveExcluded = getEffectiveExcluded(state); + return state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id); +} + +/** + * The union of the static `excluded` set and every switch result's excluded + * step ids. Steps excluded by a SWITCH evaluation act exactly like launch-time + * excluded steps: they never run and they don't block dependents. + */ +function getEffectiveExcluded(state: SchedulerState): ReadonlySet { + // Fast path: no switch results → static excluded only. + if (state.switchResults.size === 0) return state.excluded; + const combined = new Set(state.excluded); + for (const result of state.switchResults.values()) { + for (const id of result.excluded) { + combined.add(id); + } + } + return combined; } /** @@ -58,13 +88,14 @@ export function manifestSteps(flow: Flow, launchCtx: StepContext): Step[] { * Faithful to `conductor/flow.ts:27-36`. Pure. */ export function readySteps(flow: Flow, state: SchedulerState): Step[] { + const effectiveExcluded = getEffectiveExcluded(state); return flow.steps.filter( (s) => !state.done.has(s.id) && !state.skipped.has(s.id) && !state.inFlight.has(s.id) && - !state.excluded.has(s.id) && - ((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, state.excluded, s.trigger_rule)), + !effectiveExcluded.has(s.id) && + ((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, effectiveExcluded, s.trigger_rule)), ); } @@ -104,6 +135,57 @@ export function isStuck(flow: Flow, state: SchedulerState): boolean { ); } +// ─── Batch parallelism (v2.8.22) ───────────────────────────────────────────── + +/** + * Build the batchState Map from the flow definition and the current inFlight set. + * Only steps with a `batch` field are tracked. Empty map when `flow.batchConfig` + * is absent or no steps belong to a batch. Pure — no IO. + */ +export function buildBatchState( + flow: Flow, + inFlight: ReadonlySet, +): Map; maxConcurrent: number; joinRule: TriggerRule }> { + const result = new Map; maxConcurrent: number; joinRule: TriggerRule }>(); + if (!flow.batchConfig) return result; + + // Collect every unique batch group referenced by the flow's steps. + const groups = new Set(); + for (const s of flow.steps) { + if (s.batch) groups.add(s.batch); + } + + const { maxConcurrent, joinRule } = flow.batchConfig; + for (const batch of groups) { + const running = new Set( + flow.steps.filter((s) => s.batch === batch && inFlight.has(s.id)).map((s) => s.id), + ); + result.set(batch, { running, maxConcurrent, joinRule: joinRule ?? 'all_success' }); + } + return result; +} + +/** + * Gate a ready step list by batch parallelism limits. Steps without a `batch` + * field always pass through. Steps belonging to a batch are only included if + * that batch's currently-running count is below its `maxConcurrent` cap. + * + * This is ADDITIVE to the existing wave scheduler: pure dep-based readiness + * is computed first (readySteps), then this function applies the batch ceiling. + * Steps excluded here remain pending and will be picked up on the next tick + * when a running batch step completes. + */ +export function getReadyInBatch(ready: readonly Step[], state: SchedulerState, _flow: Flow): Step[] { + const batchState = state.batchState; + if (!batchState || batchState.size === 0) return [...ready]; + return ready.filter((s) => { + if (!s.batch) return true; + const bs = batchState.get(s.batch); + if (!bs) return true; + return bs.running.size < bs.maxConcurrent; + }); +} + // ─── Resume reconciliation (D-9) ───────────────────────────────────────────── /** @@ -194,6 +276,60 @@ export function shouldFailOnMissingAgent(agent: string, modeId: string | null): return agent === 'qwen' && modeId === 'plan'; } +/** + * Evaluate a SWITCH step: iterate cases in declaration order and return the + * label of the first matching case plus every step id that belongs to a + * non-selected branch. When no case matches, the defaultBranch (if present) + * is the effective choice. If there is no default, all branch steps are + * excluded and the switch returns `chosenCase: null`. + * + * Pure — no IO. The caller adds the returned `excluded` ids to the scheduler + * state's switchResults so downstream decision functions see them as excluded. + */ +export function resolveSwitch( + step: Step, + ctx: StepContext, +): { chosenCase: string | null; excluded: string[] } { + const cases = step.cases; + if (!cases || cases.length === 0) { + // Degenerate switch — nothing to evaluate. + return { chosenCase: null, excluded: [] }; + } + + // Evaluate conditions in order. + for (const c of cases) { + if (c.condition(ctx)) { + // This case matches — exclude all OTHER branches. + const excluded: string[] = []; + for (const other of cases) { + if (other.label !== c.label) { + excluded.push(...other.stepIds); + } + } + // The default branch is also excluded when a case matched. + if (step.defaultBranch) excluded.push(...step.defaultBranch); + return { chosenCase: c.label, excluded }; + } + } + + // No case matched — use default branch if present. + if (step.defaultBranch) { + // Default is the chosen branch: exclude all explicit case branches. + const excluded: string[] = []; + for (const c of cases) { + excluded.push(...c.stepIds); + } + return { chosenCase: null, excluded }; + } + + // No case matched and no default — exclude everything. + const excluded: string[] = []; + for (const c of cases) { + excluded.push(...c.stepIds); + } + return { chosenCase: null, excluded }; +} + /** * Evaluate a trigger rule against dependency results. * - all_success: every dep must be done (not skipped/failed) diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index a4bda15..ab0aee1 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -40,11 +40,14 @@ import { getFlow } from '../conductor/flows/index.js'; import { loadPersona } from '../conductor/persona-loader.js'; import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../conductor/types.js'; import { + buildBatchState, + getReadyInBatch, isRunComplete, manifestSteps, partitionReady, readySteps, reconcileRun, + resolveSwitch, type SchedulerState, type StepResumeDecision, } from './flow-runner-decisions.js'; @@ -95,7 +98,7 @@ interface Deps { interface FlowStepRow { step_id: string; - kind: 'agent' | 'code'; + kind: 'agent' | 'code' | 'switch'; agent: string | null; status: string; chat_id: string | null; @@ -280,6 +283,8 @@ export function createFlowRunner(deps: Deps): FlowRunner { const skipped = new Set(); const inFlight = new Set(); const timedOut = new Set(); + /** Per-switch routing results — maps switch step id → resolved branch details */ + const switchExcluded = new Map }>(); const results: Record = {}; for (const r of rows) { switch (r.status) { @@ -311,6 +316,8 @@ export function createFlowRunner(deps: Deps): FlowRunner { // ─── Timeout detection ─────────────────────────────────────────────────────── // Check running steps. If a step has been 'running' longer than // FLOW_STEP_TIMEOUT_MS, mark it timed_out or re-dispatch if retriable. + // Build a context here so the timeout retry path can re-dispatch the step. + const timeoutCtx = buildCtx(input, results, model, dispatch); const timeoutMs = config.FLOW_STEP_TIMEOUT_MS; const nowDate = new Date(); let detectedTimedOut = false; @@ -341,7 +348,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { SET retry_count = ${retryCount + 1}, updated_at = clock_timestamp() WHERE run_id = ${runId} AND step_id = ${r.step_id} AND status = 'running' `; - await dispatchAgentStep(runId, run.project_id, model, step, ctx); + await dispatchAgentStep(runId, run.project_id, model, step, timeoutCtx); inFlight.add(r.step_id); log.warn({ runId, stepId: r.step_id, retry: retryCount + 1, maxRetries }, 'flow-runner: step timed out, retrying'); @@ -369,14 +376,16 @@ export function createFlowRunner(deps: Deps): FlowRunner { // Drain ready skips + code steps (synchronous), re-evaluating after each batch, // then dispatch the full ready agent wave and wait for their terminal callbacks. for (;;) { - const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut }; + // Build per-batch state from the current inFlight set for batch parallelism gating. + const batchState = buildBatchState(flow, inFlight); + const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded }; if (isRunComplete(flow, state)) { await finishRun(runId, flow, input, results, model, dispatch); return; } - const ready = readySteps(flow, state); + const ready = getReadyInBatch(readySteps(flow, state), state, flow); if (ready.length === 0) { if (inFlight.size > 0) return; // agents in flight will re-enter via the hook await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle'); @@ -395,6 +404,31 @@ export function createFlowRunner(deps: Deps): FlowRunner { continue; // re-evaluate — a skip can settle a fan-in step's deps } + // SWITCH steps run synchronously — evaluate conditions, update the excluded + // set in SchedulerState, and mark themselves complete. Non-selected branch + // step ids are excluded from ever running. + const switchReady = toRun.filter((s) => s.kind === 'switch'); + if (switchReady.length > 0) { + for (const s of switchReady) { + let result: { chosenCase: string | null; excluded: string[] }; + try { + result = resolveSwitch(s, buildCtx(input, results, model, dispatch)); + } catch (err) { + await failRun(runId, flow, input, model, `switch step '${s.id}' threw: ${errMsg(err)}`, s.id); + return; + } + switchExcluded.set(s.id, { + chosenCase: result.chosenCase, + excluded: new Set(result.excluded), + }); + const outputText = result.chosenCase ? `branch:${result.chosenCase}` : ''; + await markStep(runId, s.id, 'completed', outputText); + results[s.id] = outputText; + done.add(s.id); + } + continue; // re-evaluate — excluded steps may unblock dependents + } + const codeReady = toRun.filter((s) => s.kind === 'code'); if (codeReady.length > 0) { for (const s of codeReady) {