From 0d6e9a24130975ce87b252cc9dc97146d26bd716 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Sun, 7 Jun 2026 21:55:47 +0000 Subject: [PATCH] feat(coder): complete orchestrator advanced patterns - Approval gate steps pause and await human resolution - appendStepEvent wired into markStep, failRun, dispatchAgentStep - Trigger rule unit tests (6 variants) - New parallel-research flow with one_success trigger --- apps/coder/src/conductor/flows/index.ts | 3 +- .../src/conductor/flows/parallel-research.ts | 59 +++++++++++++++++++ .../services/__tests__/trigger-rules.test.ts | 31 ++++++++++ apps/coder/src/services/flow-runner.ts | 34 ++++++++++- 4 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 apps/coder/src/conductor/flows/parallel-research.ts create mode 100644 apps/coder/src/services/__tests__/trigger-rules.test.ts diff --git a/apps/coder/src/conductor/flows/index.ts b/apps/coder/src/conductor/flows/index.ts index 7f9dbe8..2f9545e 100644 --- a/apps/coder/src/conductor/flows/index.ts +++ b/apps/coder/src/conductor/flows/index.ts @@ -24,6 +24,7 @@ import { } from './planning.js'; import { adr, codingStandard, runbook, tdd, stakeholderSummary } from './authoring.js'; import { codeReview } from './code-review.js'; +import { parallelResearch } from './parallel-research.js'; const spines: Spine[] = [ // analysis / research @@ -53,7 +54,7 @@ const spines: Spine[] = [ stakeholderSummary, ]; -const bespoke: Flow[] = [codeReview]; +const bespoke: Flow[] = [codeReview, parallelResearch]; const ALL: Flow[] = [...spines.map(buildSpineFlow), ...bespoke]; diff --git a/apps/coder/src/conductor/flows/parallel-research.ts b/apps/coder/src/conductor/flows/parallel-research.ts new file mode 100644 index 0000000..a91fbf1 --- /dev/null +++ b/apps/coder/src/conductor/flows/parallel-research.ts @@ -0,0 +1,59 @@ +import type { Flow, Step, StepContext } from '../types.js'; + +const q = (ctx: StepContext) => String(ctx.input.question); + +/** + * Parallel research flow — dispatches 3 research agents simultaneously, + * then synthesizes the result on the first one to complete. + */ +export const parallelResearch: Flow = { + name: 'parallel-research', + description: 'Research from 3 angles in parallel, synthesize results on first completion', + steps: [ + { + id: 'angle-web', + kind: 'agent', + agent: 'research-analyst', + run: (ctx) => + `Research the following question from a web / prior-art perspective:\n\n${q(ctx)}`, + }, + { + id: 'angle-code', + kind: 'agent', + agent: 'codebase-explorer', + deps: [], + run: (ctx) => + `Research the following question from a codebase analysis perspective:\n\n${q(ctx)}`, + }, + { + id: 'angle-security', + kind: 'agent', + agent: 'adversarial-security-analyst', + deps: [], + run: (ctx) => + `Research the following question from a security perspective:\n\n${q(ctx)}`, + }, + { + id: 'synthesize', + kind: 'code', + deps: ['angle-web', 'angle-code', 'angle-security'], + trigger_rule: 'one_success', + run: (ctx) => { + const web = ctx.results['angle-web']; + const code = ctx.results['angle-code']; + const security = ctx.results['angle-security']; + const parts = [ + '# Parallel Research Synthesis', + '', + web ? `## Web Angle\n${web}` : '## Web Angle\n*(not yet completed)*', + code ? `## Code Angle\n${code}` : '## Code Angle\n*(not yet completed)*', + security ? `## Security Angle\n${security}` : '## Security Angle\n*(not yet completed)*', + ]; + return parts.join('\n\n'); + }, + }, + ], + render: (ctx) => { + return ctx.results['synthesize'] ?? 'No synthesis produced.'; + }, +}; diff --git a/apps/coder/src/services/__tests__/trigger-rules.test.ts b/apps/coder/src/services/__tests__/trigger-rules.test.ts new file mode 100644 index 0000000..7e97743 --- /dev/null +++ b/apps/coder/src/services/__tests__/trigger-rules.test.ts @@ -0,0 +1,31 @@ +import { describe, it, expect } from 'vitest'; +import { evaluateTriggerRule } from '../flow-runner-decisions.js'; + +describe('evaluateTriggerRule', () => { + it('all_success requires all deps done', () => { + expect(evaluateTriggerRule(['a', 'b'], new Set(['a', 'b']), new Set(), new Set())).toBe(true); + expect(evaluateTriggerRule(['a', 'b'], new Set(['a']), new Set(), new Set())).toBe(false); + }); + + it('one_success fires on first completion', () => { + expect(evaluateTriggerRule(['a', 'b'], new Set(['a']), new Set(), new Set(), 'one_success')).toBe(true); + expect(evaluateTriggerRule(['a', 'b'], new Set(), new Set(), new Set(), 'one_success')).toBe(false); + }); + + it('all_done includes skipped deps', () => { + expect(evaluateTriggerRule(['a', 'b'], new Set(['a']), new Set(['b']), new Set(), 'all_done')).toBe(true); + }); + + it('all_success treats excluded deps as satisfied', () => { + expect(evaluateTriggerRule(['a', 'b'], new Set(['a']), new Set(), new Set(['b']))).toBe(true); + }); + + it('defaults to all_success', () => { + expect(evaluateTriggerRule(['a'], new Set(['a']), new Set(), new Set())).toBe(true); + expect(evaluateTriggerRule(['a'], new Set(), new Set(), new Set())).toBe(false); + }); + + it('returns true for empty deps', () => { + expect(evaluateTriggerRule([], new Set(), new Set(), new Set())).toBe(true); + }); +}); diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index 1ae9218..57ef5d0 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -346,6 +346,20 @@ export function createFlowRunner(deps: Deps): FlowRunner { continue; // re-evaluate — code output can unblock the next wave } + // Approval gate steps: pause and wait for human decision. + const approvalReady = toRun.filter((s) => s.kind === 'approval'); + if (approvalReady.length > 0) { + for (const s of approvalReady) { + await sql` + UPDATE flow_steps SET status = 'blocked', updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${s.id} + `; + await appendStepEvent(sql, runId, s.id, 'paused', { reason: 'awaiting approval' }); + publishStep(runId, s.id, 'blocked'); + } + return; + } + // Only agent steps remain ready → dispatch the whole parallel wave, then wait. for (const s of toRun) { await dispatchAgentStep(runId, run.project_id, model, s, ctx); @@ -393,6 +407,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { SET task_id = ${task!.id}, status = 'running', input = ${fullPrompt}, updated_at = clock_timestamp() WHERE run_id = ${runId} AND step_id = ${step.id} `; + await appendStepEvent(sql, runId, step.id, 'started', { taskId: task!.id }); } /** @@ -439,6 +454,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { WHERE run_id = ${runId} AND step_id = ${stepId} `; } + await appendStepEvent(sql, runId, stepId, status, output ? { outputLength: output.length } : undefined); } // ─── run completion ───────────────────────────────────────────────────────── @@ -484,6 +500,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { if (updated.count === 0) return; const stepId = failedStepId ?? (flow ? lastAgentStepId(flow, input, model) : 'run'); log.warn({ runId, error }, 'flow-runner: run failed'); + await appendStepEvent(sql, runId, stepId, 'failed', { error }); publishStep(runId, stepId, 'failed', { run_status: 'failed' }); } @@ -523,7 +540,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { function publishStep( runId: string, stepId: string, - status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled', + status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked', extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string }, ): void { publishUser({ @@ -765,6 +782,21 @@ function errMsg(e: unknown): string { return e instanceof Error ? e.message : String(e); } +// ─── Event log ─────────────────────────────────────────────────────────────── + +async function appendStepEvent( + sql: Sql, + runId: string, + stepId: string, + event: string, + payload?: Record, +): Promise { + await sql` + INSERT INTO flow_step_events (run_id, step_id, event, payload) + VALUES (${runId}, ${stepId}, ${event}, ${payload ? sql.json(payload as never) : null}) + `; +} + // ─── Variable substitution ─────────────────────────────────────────────────── const VAR_PATTERN = /\$(\w+)\.output(?:\.(\w+(?:\.\w+)*))?/g;