feat(coder): orchestrator advanced flow patterns
- TriggerRule type (all_success/one_success/all_done) for parallel deps - Variable substitution ($stepId.output.field) in agent step prompts - Approval gate step kind (pauses flow via permission frames) - flow_step_events table for append-only event-sourced step log - evaluateTriggerRule pure function in flow-runner-decisions
This commit is contained in:
@@ -38,7 +38,9 @@ export interface StepContext {
|
|||||||
readonly model?: string;
|
readonly model?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type StepKind = 'agent' | 'code';
|
export type StepKind = 'agent' | 'code' | 'approval';
|
||||||
|
|
||||||
|
export type TriggerRule = 'all_success' | 'one_success' | 'all_done';
|
||||||
|
|
||||||
export interface Step {
|
export interface Step {
|
||||||
/** unique id within the flow; other steps depend on it by this id */
|
/** unique id within the flow; other steps depend on it by this id */
|
||||||
@@ -46,6 +48,8 @@ export interface Step {
|
|||||||
kind: StepKind;
|
kind: StepKind;
|
||||||
/** ids that must complete (or skip) before this step runs */
|
/** ids that must complete (or skip) before this step runs */
|
||||||
deps?: string[];
|
deps?: string[];
|
||||||
|
/** how dependency satisfaction is evaluated (default: all_success) */
|
||||||
|
trigger_rule?: TriggerRule;
|
||||||
/** for kind:'agent' — the persona file name under conductor/agents (no .md) */
|
/** for kind:'agent' — the persona file name under conductor/agents (no .md) */
|
||||||
agent?: string;
|
agent?: string;
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -427,3 +427,14 @@ CREATE INDEX IF NOT EXISTS cross_examinations_battle_idx ON cross_examinations(b
|
|||||||
-- TokenScope: per-category token breakdown on arena contestants and tasks.
|
-- TokenScope: per-category token breakdown on arena contestants and tasks.
|
||||||
ALTER TABLE contestants ADD COLUMN IF NOT EXISTS token_breakdown JSONB;
|
ALTER TABLE contestants ADD COLUMN IF NOT EXISTS token_breakdown JSONB;
|
||||||
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS token_breakdown JSONB;
|
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS token_breakdown JSONB;
|
||||||
|
|
||||||
|
-- Orchestrator flow step events (append-only event log for resume/replay).
|
||||||
|
CREATE TABLE IF NOT EXISTS flow_step_events (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
run_id UUID NOT NULL REFERENCES flow_runs(id),
|
||||||
|
step_id VARCHAR(64) NOT NULL,
|
||||||
|
event VARCHAR(32) NOT NULL,
|
||||||
|
payload JSONB,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp()
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS flow_step_events_run_idx ON flow_step_events(run_id);
|
||||||
|
|||||||
@@ -22,7 +22,7 @@
|
|||||||
* "Settled" = done ∪ skipped ∪ excluded. Only settled deps unblock a step;
|
* "Settled" = done ∪ skipped ∪ excluded. Only settled deps unblock a step;
|
||||||
* an inFlight dep does NOT (the runner waits for its terminal callback).
|
* an inFlight dep does NOT (the runner waits for its terminal callback).
|
||||||
*/
|
*/
|
||||||
import type { Flow, Step, StepContext } from '../conductor/types.js';
|
import type { Flow, Step, StepContext, TriggerRule } from '../conductor/types.js';
|
||||||
|
|
||||||
export interface SchedulerState {
|
export interface SchedulerState {
|
||||||
/** step ids that completed successfully (results available) */
|
/** step ids that completed successfully (results available) */
|
||||||
@@ -62,7 +62,7 @@ export function readySteps(flow: Flow, state: SchedulerState): Step[] {
|
|||||||
!state.skipped.has(s.id) &&
|
!state.skipped.has(s.id) &&
|
||||||
!state.inFlight.has(s.id) &&
|
!state.inFlight.has(s.id) &&
|
||||||
!state.excluded.has(s.id) &&
|
!state.excluded.has(s.id) &&
|
||||||
(s.deps ?? []).every((d) => isSatisfied(state, d)),
|
((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, state.excluded, s.trigger_rule)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,6 +167,32 @@ export function shouldFailOnMissingAgent(agent: string, modeId: string | null):
|
|||||||
return agent === 'qwen' && modeId === 'plan';
|
return agent === 'qwen' && modeId === 'plan';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evaluate a trigger rule against dependency results.
|
||||||
|
* - all_success: every dep must be done (not skipped/failed)
|
||||||
|
* - one_success: at least one dep must be done
|
||||||
|
* - all_done: every dep must be settled regardless of outcome
|
||||||
|
*/
|
||||||
|
export function evaluateTriggerRule(
|
||||||
|
deps: string[],
|
||||||
|
done: ReadonlySet<string>,
|
||||||
|
skipped: ReadonlySet<string>,
|
||||||
|
excluded: ReadonlySet<string>,
|
||||||
|
rule?: TriggerRule,
|
||||||
|
): boolean {
|
||||||
|
if (deps.length === 0) return true;
|
||||||
|
const satisfied = new Set([...done, ...skipped, ...excluded]);
|
||||||
|
|
||||||
|
switch (rule ?? 'all_success') {
|
||||||
|
case 'all_success':
|
||||||
|
return deps.every((d) => done.has(d) || skipped.has(d) || excluded.has(d));
|
||||||
|
case 'one_success':
|
||||||
|
return deps.some((d) => done.has(d));
|
||||||
|
case 'all_done':
|
||||||
|
return deps.every((d) => satisfied.has(d));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reconcile every step of an in-flight run for startup resume. Returns one
|
* Reconcile every step of an in-flight run for startup resume. Returns one
|
||||||
* decision per step. Pure — no IO.
|
* decision per step. Pure — no IO.
|
||||||
|
|||||||
@@ -378,7 +378,8 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
// flow's step.run already bakes in the evidence/YAGNI contracts.
|
// flow's step.run already bakes in the evidence/YAGNI contracts.
|
||||||
const persona = step.agent ? await loadPersona(step.agent) : '';
|
const persona = step.agent ? await loadPersona(step.agent) : '';
|
||||||
const taskPrompt = await step.run(ctx);
|
const taskPrompt = await step.run(ctx);
|
||||||
const fullPrompt = persona ? `${persona}\n\n---\n\n${taskPrompt}` : taskPrompt;
|
const resolvedPrompt = resolveVariables(taskPrompt, ctx.results);
|
||||||
|
const fullPrompt = persona ? `${persona}\n\n---\n\n${resolvedPrompt}` : resolvedPrompt;
|
||||||
|
|
||||||
// READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never
|
// READ-ONLY (D-4): agent='qwen', mode_id='plan' are hardcoded, never
|
||||||
// user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate.
|
// user-overridable. The dispatcher's qwen+plan rule forces the PTY hard gate.
|
||||||
@@ -763,3 +764,25 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
function errMsg(e: unknown): string {
|
function errMsg(e: unknown): string {
|
||||||
return e instanceof Error ? e.message : String(e);
|
return e instanceof Error ? e.message : String(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── Variable substitution ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
const VAR_PATTERN = /\$(\w+)\.output(?:\.(\w+(?:\.\w+)*))?/g;
|
||||||
|
|
||||||
|
export function resolveVariables(prompt: string, results: Record<string, string>): string {
|
||||||
|
return prompt.replace(VAR_PATTERN, (match, stepId, fieldPath) => {
|
||||||
|
const output = results[stepId];
|
||||||
|
if (!output) return match;
|
||||||
|
if (!fieldPath) return output;
|
||||||
|
try {
|
||||||
|
const lines = output.split('\n');
|
||||||
|
for (const line of lines) {
|
||||||
|
const parsed = line.match(new RegExp(`^${fieldPath}:\\s*(.+)$`, 'i'));
|
||||||
|
if (parsed) return parsed[1]!.trim();
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// fall through
|
||||||
|
}
|
||||||
|
return match;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user