/**
 * Pure scheduling decisions for the DB-driven flow-runner — NO database, no IO.
 *
 * This is the decision core of the orchestrator's wave scheduler, lifted out of
 * the IO-heavy `flow-runner.ts` so it can be unit-tested in isolation (the repo
 * pattern: `backends/turn-guard.ts`, `backends/lifecycle-decisions.ts`). It is a
 * faithful port of the Phase-1 in-memory scheduler (`conductor/src/flow.ts:27-41`):
 * a step is ready when every dependency is *settled* (under the default trigger
 * rule), and a ready step is skipped when its `when()` guard returns false
 * against the current context.
 *
 * Vocabulary (DB-adapted from flow.ts's `done`/`skipped` sets):
 *   - done     — a step that completed with output (its result is in ctx.results)
 *   - skipped  — a step whose when() guard returned false at ready-time
 *   - inFlight — an agent step whose task is dispatched but not yet terminal
 *   - excluded — a step pre-skipped at launch by its when() guard against the run
 *                input (band/repo gating). It never gets a flow_steps row, so it
 *                is tracked here rather than read back from the DB. A dependency
 *                on an excluded step counts as satisfied (the fan-in step still
 *                runs with whatever the live angles produced — flow.ts:39 / the
 *                fold's `if (out)` guard).
 *
 * "Settled" = done ∪ skipped ∪ excluded. Only settled deps unblock a step;
 * an inFlight dep does NOT (the runner waits for its terminal callback).
 */
import type { Flow, Step, StepContext, TriggerRule } from '../conductor/types.js';

/** Snapshot of where every step of a run currently stands, keyed by step id. */
export interface SchedulerState {
  /** step ids that completed successfully (results available) */
  readonly done: ReadonlySet<string>;
  /** step ids skipped at ready-time (when() === false with results in hand) */
  readonly skipped: ReadonlySet<string>;
  /** step ids whose agent task is dispatched but not yet terminal */
  readonly inFlight: ReadonlySet<string>;
  /** step ids pre-skipped at launch (band/when gating) — never given a row */
  readonly excluded: ReadonlySet<string>;
}

/** A step is settled once it is done, skipped, or excluded. */
function isSatisfied(state: SchedulerState, id: string): boolean {
  return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id);
}

/**
 * The steps that will appear in this run — every step NOT pre-skipped by its
 * `when()` guard evaluated against the launch context (input only; results are
 * empty at launch). The complement (steps filtered out here) is the `excluded`
 * set. Mirrors the Spine factory's band gating (`spine.ts:71`). Pure.
 *
 * @param flow - the flow definition whose steps are filtered
 * @param launchCtx - context passed to each step's `when()` guard
 * @returns the steps with no guard, or whose guard returned truthy
 */
export function manifestSteps(flow: Flow, launchCtx: StepContext): Step[] {
  return flow.steps.filter((s) => !s.when || s.when(launchCtx));
}

/**
 * Steps whose trigger rule is met by their dependencies and that are not
 * themselves settled or in flight — i.e. ready to evaluate/dispatch this tick.
 * Faithful to `conductor/flow.ts:27-36`. Pure.
 *
 * @param flow - the flow definition
 * @param state - current scheduler snapshot
 * @returns ready steps, in `flow.steps` order
 */
export function readySteps(flow: Flow, state: SchedulerState): Step[] {
  return flow.steps.filter(
    (s) =>
      !isSatisfied(state, s.id) &&
      !state.inFlight.has(s.id) &&
      // evaluateTriggerRule already returns true for an empty dep list.
      evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, state.excluded, s.trigger_rule),
  );
}

/**
 * Partition a ready set into the steps to skip (when() === false at ready-time)
 * vs the steps to run, evaluating each guard against the current context
 * (input + completed results). Faithful to `conductor/flow.ts:39`. Pure.
 *
 * @param ready - steps returned by `readySteps`
 * @param ctx - context passed to each step's `when()` guard
 * @returns `toRun` and `toSkip`, each preserving the order of `ready`
 */
export function partitionReady(
  ready: readonly Step[],
  ctx: StepContext,
): { toRun: Step[]; toSkip: Step[] } {
  const toRun: Step[] = [];
  const toSkip: Step[] = [];
  for (const s of ready) {
    if (s.when && !s.when(ctx)) toSkip.push(s);
    else toRun.push(s);
  }
  return { toRun, toSkip };
}

/** True when every step is settled (done ∪ skipped ∪ excluded). Pure. */
export function isRunComplete(flow: Flow, state: SchedulerState): boolean {
  return flow.steps.every((s) => isSatisfied(state, s.id));
}

/**
 * True when the run can make no further progress yet cannot complete: no ready
 * step, nothing in flight to eventually unblock one, and not complete. Signals a
 * dependency cycle or an unsatisfiable dep (flow.ts:33 throws on this). Pure.
 */
export function isStuck(flow: Flow, state: SchedulerState): boolean {
  return (
    !isRunComplete(flow, state) &&
    state.inFlight.size === 0 &&
    readySteps(flow, state).length === 0
  );
}

// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────

/**
 * Per-step action for `initResume`. Pure — no IO; callers supply DB rows.
 *
 * - 'keep':           step is already settled, or has a live pending task that the
 *                     dispatcher's startup poll will run automatically.
 * - 're-dispatch':    step is running but its task died (non-terminal non-pending
 *                     state, or absent) — re-insert a fresh task with the stored
 *                     prompt.
 * - 'mark-done':      task completed before the terminal callback ran; write output
 *                     and advance.
 * - 'mark-failed':    task failed; propagate so advance() fails the run.
 * - 'mark-cancelled': task was cancelled before the callback ran; propagate so
 *                     advance() cancels the run.
 */
export type ResumeAction =
  | 'keep'
  | 're-dispatch'
  | 'mark-done'
  | 'mark-failed'
  | 'mark-cancelled';

/**
 * Decide what to do with ONE flow step during startup resume (D-9). Pure.
 *
 * @param status - flow_steps.status
 * @param taskId - flow_steps.task_id (null for code steps or unstarted agent steps)
 * @param taskState - tasks.state for taskId, or null if the task row is absent
 * @returns the action to take for this step
 */
export function reconcileResumeStep(
  status: string,
  taskId: string | null,
  taskState: string | null,
): ResumeAction {
  // Anything that is not mid-flight needs no reconciliation.
  if (status !== 'running') return 'keep';

  // Running step: decide by its task's current state.
  if (!taskId || taskState === null) return 're-dispatch'; // task gone or never created
  switch (taskState) {
    case 'completed':
      return 'mark-done';
    case 'failed':
      return 'mark-failed';
    case 'cancelled':
      return 'mark-cancelled';
    case 'pending':
      return 'keep'; // dispatcher startup poll will run it normally
    default:
      return 're-dispatch'; // 'running' or 'blocked' — PTY is dead
  }
}

/** The resume action chosen for one step of a run. */
export interface StepResumeDecision {
  stepId: string;
  action: ResumeAction;
}

// ─── Dispatcher routing guard (H1 fix) ───────────────────────────────────────

/**
 * Returns true when a task whose named agent is unavailable must FAIL HARD
 * rather than fall through to native inference. Orchestrator steps (qwen+plan)
 * are the only case: the PTY `--approval-mode plan` path is the only route
 * that enforces the read-only invariant, so if qwen is missing the step must
 * fail instead of silently gaining write capability. Pure.
 */
export function shouldFailOnMissingAgent(agent: string, modeId: string | null): boolean {
  return agent === 'qwen' && modeId === 'plan';
}

/**
 * Evaluate a trigger rule against dependency results. Pure.
 *
 * - all_success (default): every dep must be settled — done, skipped, or
 *   excluded. Skipped/excluded deps count as satisfied so a fan-in step still
 *   runs with whatever its live deps produced.
 * - one_success: at least one dep must be done.
 * - all_done: every dep must be settled. With the three sets supplied here this
 *   is the same predicate as all_success.
 *
 * An empty dependency list always passes. An unrecognised rule never passes, so
 * the step stays blocked and `isStuck` can report it.
 *
 * @param deps - ids of the step's dependencies
 * @param done - ids of steps that completed successfully
 * @param skipped - ids of steps skipped at ready-time
 * @param excluded - ids of steps pre-skipped at launch
 * @param rule - trigger rule; defaults to 'all_success'
 * @returns whether the dependencies unblock the step
 */
export function evaluateTriggerRule(
  deps: readonly string[],
  done: ReadonlySet<string>,
  skipped: ReadonlySet<string>,
  excluded: ReadonlySet<string>,
  rule?: TriggerRule,
): boolean {
  if (deps.length === 0) return true;

  // Direct membership checks; avoids building a merged Set on every call.
  const settled = (d: string): boolean => done.has(d) || skipped.has(d) || excluded.has(d);

  switch (rule ?? 'all_success') {
    case 'all_success':
    case 'all_done':
      return deps.every(settled);
    case 'one_success':
      return deps.some((d) => done.has(d));
    default:
      // Unknown rule (e.g. an untyped caller): never ready, rather than undefined.
      return false;
  }
}

/**
 * Reconcile every step of an in-flight run for startup resume. Returns one
 * decision per step, in input order. Pure — no IO.
 *
 * @param steps - one entry per flow step (id, task id, status)
 * @param taskStates - task id → task state; a missing entry is treated as an
 *   absent task row
 * @returns one `StepResumeDecision` per input step
 */
export function reconcileRun(
  steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string }>,
  taskStates: ReadonlyMap<string, string>,
): StepResumeDecision[] {
  return steps.map((step) => ({
    stepId: step.stepId,
    action: reconcileResumeStep(
      step.status,
      step.taskId,
      step.taskId ? (taskStates.get(step.taskId) ?? null) : null,
    ),
  }));
}