feat(coder): flow-runner decisions, conductor types, collision detection tests
- Add flow-runner-decisions.ts: decision-aware step execution - Extend flow-runner.ts: dynamic step decisions - Extend conductor types: additional flow state types - Add collision-detector.test.ts: edit collision unit tests - Add conflict-index.test.ts: conflict resolution index tests
This commit is contained in:
@@ -36,9 +36,20 @@ export interface StepContext {
|
|||||||
* Falls back to a default in render functions when absent.
|
* Falls back to a default in render functions when absent.
|
||||||
*/
|
*/
|
||||||
readonly model?: string;
|
readonly model?: string;
|
||||||
|
/**
|
||||||
|
* Inter-agent messaging within the same flow run.
|
||||||
|
* `publish` broadcasts on the user WS channel and delivers to in-process
|
||||||
|
* subscribers via the broker. `subscribe` registers a handler scoped to the
|
||||||
|
* run and channel; returns an unsubscribe function.
|
||||||
|
* Undefined in contexts without a run id (manifest-only contexts).
|
||||||
|
*/
|
||||||
|
readonly messaging?: {
|
||||||
|
publish(channel: string, message: unknown): void;
|
||||||
|
subscribe(channel: string, handler: (msg: unknown) => void): () => void;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export type StepKind = 'agent' | 'code' | 'approval' | 'switch';
|
export type StepKind = 'agent' | 'code' | 'approval' | 'switch' | 'do_while';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* One branch of a SWITCH step. The first case whose condition evaluates to true
|
* One branch of a SWITCH step. The first case whose condition evaluates to true
|
||||||
@@ -89,6 +100,12 @@ export interface Step {
|
|||||||
cases?: SwitchCase[];
|
cases?: SwitchCase[];
|
||||||
/** for kind:'switch' — fallback step ids when no case matches */
|
/** for kind:'switch' — fallback step ids when no case matches */
|
||||||
defaultBranch?: string[];
|
defaultBranch?: string[];
|
||||||
|
/** for kind:'do_while' — step IDs in the loop body (re-evaluated each iteration) */
|
||||||
|
loopBody?: string[];
|
||||||
|
/** for kind:'do_while' — guard evaluated each iteration; terminates when false */
|
||||||
|
loopCondition?: (ctx: StepContext) => boolean;
|
||||||
|
/** for kind:'do_while' — cap on total iterations (default 100) */
|
||||||
|
loopMaxIterations?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface Flow {
|
export interface Flow {
|
||||||
|
|||||||
90
apps/coder/src/services/__tests__/collision-detector.test.ts
Normal file
90
apps/coder/src/services/__tests__/collision-detector.test.ts
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { findConflicts } from '../collision-detector.js';
|
||||||
|
import type { ConflictEntry, ConflictIndexData } from '../collision-detector.js';
|
||||||
|
|
||||||
|
function entry(worktreeId: string, agent: string, start?: number, end?: number): ConflictEntry {
|
||||||
|
return {
|
||||||
|
worktreeId,
|
||||||
|
agent,
|
||||||
|
lineRange: start !== undefined && end !== undefined ? { start, end } : undefined,
|
||||||
|
status: 'pending' as const,
|
||||||
|
timestamp: 1000,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function index(entries: Array<[string, ConflictEntry[]]>): ConflictIndexData {
|
||||||
|
return new Map(entries.map(([path, es]) => [path, new Set(es)] as const));
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('findConflicts', () => {
|
||||||
|
it('returns empty when no files in index', () => {
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), new Map());
|
||||||
|
expect(result).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns empty when only own worktree has the file', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-1', 'agent-a', 1, 10)]]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects same_file conflict from another worktree', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 5, 15)]]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.filePath).toBe('src/a.ts');
|
||||||
|
expect(result[0]!.worktrees).toEqual(['wt-2']);
|
||||||
|
expect(result[0]!.agents).toEqual(['agent-b']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reports same_line severity when ranges overlap', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 10, 20)]]]);
|
||||||
|
const ranges = new Map([['src/a.ts', { start: 15, end: 25 }]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx);
|
||||||
|
expect(result[0]!.severity).toBe('same_line');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reports different_area severity when ranges are far apart', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 1, 10)]]]);
|
||||||
|
const ranges = new Map([['src/a.ts', { start: 100, end: 200 }]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx);
|
||||||
|
expect(result[0]!.severity).toBe('different_area');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reports adjacent_line severity when ranges are 3 lines apart', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 10, 15)]]]);
|
||||||
|
const ranges = new Map([['src/a.ts', { start: 19, end: 25 }]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx);
|
||||||
|
expect(result[0]!.severity).toBe('adjacent_line');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns entry for each conflicting file', () => {
|
||||||
|
const idx = index([
|
||||||
|
['src/a.ts', [entry('wt-2', 'agent-b', 1, 10)]],
|
||||||
|
['src/b.ts', [entry('wt-3', 'agent-c', 1, 10)]],
|
||||||
|
]);
|
||||||
|
const result = findConflicts(['src/a.ts', 'src/b.ts', 'src/c.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result).toHaveLength(2);
|
||||||
|
expect(result.map((v) => v.filePath).sort()).toEqual(['src/a.ts', 'src/b.ts']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('excludes entries from the same worktree', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-1', 'agent-a', 1, 10), entry('wt-2', 'agent-b', 5, 15)]]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.worktrees).toEqual(['wt-2']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('deduplicates worktree IDs in verdict', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 1, 5), entry('wt-2', 'agent-b', 10, 15)]]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result[0]!.worktrees).toEqual(['wt-2']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reports same_line when no lineRange on either side (create/delete conflates)', () => {
|
||||||
|
const idx = index([['src/a.ts', [entry('wt-2', 'agent-b')]]]);
|
||||||
|
const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.severity).toBe('different_area');
|
||||||
|
});
|
||||||
|
});
|
||||||
146
apps/coder/src/services/__tests__/conflict-index.test.ts
Normal file
146
apps/coder/src/services/__tests__/conflict-index.test.ts
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
import { describe, it, expect, beforeEach } from 'vitest';
|
||||||
|
import { ConflictIndex } from '../conflict-index.js';
|
||||||
|
|
||||||
|
describe('ConflictIndex', () => {
|
||||||
|
let idx: ConflictIndex;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
idx = new ConflictIndex();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('registerChange', () => {
|
||||||
|
it('adds an entry for a file path', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 });
|
||||||
|
const entries = idx.getEntriesFor('src/a.ts');
|
||||||
|
expect(entries.size).toBe(1);
|
||||||
|
const entry = [...entries][0]!;
|
||||||
|
expect(entry.worktreeId).toBe('wt-1');
|
||||||
|
expect(entry.agent).toBe('agent-a');
|
||||||
|
expect(entry.lineRange).toEqual({ start: 1, end: 10 });
|
||||||
|
expect(entry.status).toBe('pending');
|
||||||
|
expect(entry.timestamp).toBeGreaterThan(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('supports multiple entries for the same file path', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 });
|
||||||
|
idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 20, end: 30 });
|
||||||
|
expect(idx.getEntriesFor('src/a.ts').size).toBe(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('allows a worktree to have multiple entries (several edits to same file)', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 });
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 20, end: 30 });
|
||||||
|
// Duplicate entries with same fields — the Set dedupes by ref,
|
||||||
|
// so a second identical call is still a distinct object (allowed).
|
||||||
|
expect(idx.getEntriesFor('src/a.ts').size).toBe(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('separates files into distinct keys', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.registerChange('src/b.ts', 'wt-2', 'agent-b');
|
||||||
|
expect(idx.getEntriesFor('src/a.ts').size).toBe(1);
|
||||||
|
expect(idx.getEntriesFor('src/b.ts').size).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('removeWorktree', () => {
|
||||||
|
it('removes all entries for a given worktree', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.registerChange('src/a.ts', 'wt-2', 'agent-b');
|
||||||
|
idx.registerChange('src/b.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.removeWorktree('wt-1');
|
||||||
|
expect(idx.getEntriesFor('src/a.ts').size).toBe(1);
|
||||||
|
expect([...idx.getEntriesFor('src/a.ts')][0]!.worktreeId).toBe('wt-2');
|
||||||
|
expect(idx.getEntriesFor('src/b.ts').size).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('is a no-op when worktree has no entries', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.removeWorktree('wt-ghost');
|
||||||
|
expect(idx.getEntriesFor('src/a.ts').size).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('cleans up file key when last entry is removed', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.removeWorktree('wt-1');
|
||||||
|
// After removal the key should be gone
|
||||||
|
expect(idx.snapshot().has('src/a.ts')).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('sweepStale', () => {
|
||||||
|
it('removes entries older than maxAgeMs', async () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
idx.registerChange('src/b.ts', 'wt-2', 'agent-b');
|
||||||
|
// Wait a tick so timestamps diverge
|
||||||
|
await new Promise((r) => setTimeout(r, 10));
|
||||||
|
idx.registerChange('src/c.ts', 'wt-3', 'agent-c');
|
||||||
|
const removed = idx.sweepStale(5); // 5ms cutoff — entries from before the await are stale
|
||||||
|
expect(removed).toBeGreaterThanOrEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('removes file key when all entries swept', async () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
// Wait so timestamp is definitely older than cutoff
|
||||||
|
await new Promise((r) => setTimeout(r, 10));
|
||||||
|
const removed = idx.sweepStale(5);
|
||||||
|
expect(removed).toBe(1);
|
||||||
|
expect(idx.snapshot().has('src/a.ts')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns 0 when no entries are stale', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
const removed = idx.sweepStale(86_400_000); // 24h
|
||||||
|
expect(removed).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getConflictsFor', () => {
|
||||||
|
it('returns conflicts between worktrees', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 });
|
||||||
|
idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 5, end: 15 });
|
||||||
|
const conflicts = idx.getConflictsFor('src/a.ts');
|
||||||
|
expect(conflicts).toHaveLength(1);
|
||||||
|
expect(conflicts[0]!.filePath).toBe('src/a.ts');
|
||||||
|
// getConflictsFor doesn't know the caller's line range,
|
||||||
|
// so severity defaults to 'different_area'
|
||||||
|
expect(conflicts[0]!.severity).toBe('different_area');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns empty for files with only one worktree', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
expect(idx.getConflictsFor('src/a.ts')).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns empty for files not in index', () => {
|
||||||
|
expect(idx.getConflictsFor('src/never-touched.ts')).toEqual([]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('query', () => {
|
||||||
|
it('delegates to findConflicts with proper data', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 5, end: 15 });
|
||||||
|
const ranges = new Map([['src/a.ts', { start: 10, end: 20 }]]);
|
||||||
|
const result = idx.query(['src/a.ts'], 'wt-1', ranges);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.severity).toBe('same_line');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns empty when no conflicts', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 });
|
||||||
|
const result = idx.query(['src/a.ts'], 'wt-1', new Map());
|
||||||
|
expect(result).toEqual([]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('snapshot', () => {
|
||||||
|
it('returns a copy of the internal map', () => {
|
||||||
|
idx.registerChange('src/a.ts', 'wt-1', 'agent-a');
|
||||||
|
const snap = idx.snapshot();
|
||||||
|
expect(snap.has('src/a.ts')).toBe(true);
|
||||||
|
// Mutating the snapshot doesn't affect the original
|
||||||
|
idx.removeWorktree('wt-1');
|
||||||
|
expect(snap.has('src/a.ts')).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -47,12 +47,21 @@ export interface SchedulerState {
|
|||||||
* remainder of the run — they won't execute and won't block dependents.
|
* remainder of the run — they won't execute and won't block dependents.
|
||||||
*/
|
*/
|
||||||
readonly switchResults: ReadonlyMap<string, { chosenCase: string | null; excluded: ReadonlySet<string> }>;
|
readonly switchResults: ReadonlyMap<string, { chosenCase: string | null; excluded: ReadonlySet<string> }>;
|
||||||
|
/** Per-DO_WHILE iteration count; presence in the map indicates an active loop */
|
||||||
|
readonly loopIterations: ReadonlyMap<string, number>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A dependency is satisfied once it is done, skipped, excluded, or timed out. */
|
/** A dependency is satisfied once it is done, skipped, excluded, or timed out.
|
||||||
|
* Dependencies on a running DO_WHILE step are also satisfied so body steps
|
||||||
|
* execute during an active loop iteration. */
|
||||||
function isSatisfied(state: SchedulerState, id: string): boolean {
|
function isSatisfied(state: SchedulerState, id: string): boolean {
|
||||||
const effectiveExcluded = getEffectiveExcluded(state);
|
const effectiveExcluded = getEffectiveExcluded(state);
|
||||||
return state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id);
|
if (state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// A dependency on a running DO_WHILE step is satisfied (body runs during the loop).
|
||||||
|
if (state.loopIterations.has(id) && state.inFlight.has(id)) return true;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -375,3 +384,17 @@ export function reconcileRun(
|
|||||||
),
|
),
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True when a DO_WHILE loop should stop: the condition returned false or the
|
||||||
|
* iteration cap was reached. Pure — no IO.
|
||||||
|
*
|
||||||
|
* @param step - the DO_WHILE step definition
|
||||||
|
* @param ctx - current step context (input + accumulated results)
|
||||||
|
* @param iterations - number of completed iterations so far
|
||||||
|
*/
|
||||||
|
export function isLoopTerminated(step: Step, ctx: StepContext, iterations: number): boolean {
|
||||||
|
if (iterations >= (step.loopMaxIterations ?? 100)) return true;
|
||||||
|
if (step.loopCondition) return !step.loopCondition(ctx);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
* already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
|
* already emits. (Phase 8 wires the OrchestratorPane's subscription to both.)
|
||||||
*/
|
*/
|
||||||
import type { Sql } from '../db.js';
|
import type { Sql } from '../db.js';
|
||||||
import type { Broker } from '@boocode/server/broker';
|
import type { Broker, Frame, Listener } from '@boocode/server/broker';
|
||||||
import type { WsFrame } from '@boocode/contracts/ws-frames';
|
import type { WsFrame } from '@boocode/contracts/ws-frames';
|
||||||
import type { FastifyBaseLogger } from 'fastify';
|
import type { FastifyBaseLogger } from 'fastify';
|
||||||
import type { Config } from '../config.js';
|
import type { Config } from '../config.js';
|
||||||
@@ -42,6 +42,7 @@ import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../co
|
|||||||
import {
|
import {
|
||||||
buildBatchState,
|
buildBatchState,
|
||||||
getReadyInBatch,
|
getReadyInBatch,
|
||||||
|
isLoopTerminated,
|
||||||
isRunComplete,
|
isRunComplete,
|
||||||
manifestSteps,
|
manifestSteps,
|
||||||
partitionReady,
|
partitionReady,
|
||||||
@@ -98,7 +99,7 @@ interface Deps {
|
|||||||
|
|
||||||
interface FlowStepRow {
|
interface FlowStepRow {
|
||||||
step_id: string;
|
step_id: string;
|
||||||
kind: 'agent' | 'code' | 'switch';
|
kind: 'agent' | 'code' | 'switch' | 'do_while';
|
||||||
agent: string | null;
|
agent: string | null;
|
||||||
status: string;
|
status: string;
|
||||||
chat_id: string | null;
|
chat_id: string | null;
|
||||||
@@ -118,6 +119,10 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
// taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
|
// taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal
|
||||||
// resolves them here instead of advancing a run.
|
// resolves them here instead of advancing a run.
|
||||||
const subDispatchWaiters = new Map<string, (output: string) => void>();
|
const subDispatchWaiters = new Map<string, (output: string) => void>();
|
||||||
|
/** Per-DO_WHILE step iteration count; persists across advance() calls. */
|
||||||
|
const loopIterations = new Map<string, number>();
|
||||||
|
/** Per-run messaging subscriptions; cleaned up when the run terminates. */
|
||||||
|
const messagingCleanups = new Map<string, Set<() => void>>();
|
||||||
|
|
||||||
function publishUser(frame: Record<string, unknown>): void {
|
function publishUser(frame: Record<string, unknown>): void {
|
||||||
broker.publishUserFrame('default', frame as unknown as WsFrame);
|
broker.publishUserFrame('default', frame as unknown as WsFrame);
|
||||||
@@ -134,8 +139,42 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
results: Record<string, string>,
|
results: Record<string, string>,
|
||||||
model: string,
|
model: string,
|
||||||
dispatch?: DispatchFn,
|
dispatch?: DispatchFn,
|
||||||
|
runId?: string,
|
||||||
|
stepId?: string,
|
||||||
): StepContext {
|
): StepContext {
|
||||||
return { input, results, model, dispatch };
|
let messaging: StepContext['messaging'] = undefined;
|
||||||
|
if (runId) {
|
||||||
|
if (!messagingCleanups.has(runId)) {
|
||||||
|
messagingCleanups.set(runId, new Set());
|
||||||
|
}
|
||||||
|
const subs = messagingCleanups.get(runId)!;
|
||||||
|
messaging = {
|
||||||
|
publish(channel: string, message: unknown) {
|
||||||
|
const content = typeof message === 'string' ? message : JSON.stringify(message);
|
||||||
|
const topic = `run:${runId}:${channel}`;
|
||||||
|
const frame = {
|
||||||
|
type: 'agent_message' as const,
|
||||||
|
run_id: runId,
|
||||||
|
sender_step_id: stepId ?? '',
|
||||||
|
content,
|
||||||
|
...(channel ? { channel } : {}),
|
||||||
|
};
|
||||||
|
broker.publishUserFrame('default', frame as unknown as WsFrame);
|
||||||
|
broker.publish(topic, frame as unknown as Frame);
|
||||||
|
},
|
||||||
|
subscribe(channel: string, handler: (msg: unknown) => void) {
|
||||||
|
const topic = `run:${runId}:${channel}`;
|
||||||
|
const listener: Listener = (f) => { handler(f); };
|
||||||
|
const unsub = broker.subscribe(topic, listener);
|
||||||
|
subs.add(unsub);
|
||||||
|
return () => {
|
||||||
|
unsub();
|
||||||
|
subs.delete(unsub);
|
||||||
|
};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return { input, results, model, dispatch, messaging };
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Latest assistant message text for a chat — the FULL worker output (≤50k as
|
/** Latest assistant message text for a chat — the FULL worker output (≤50k as
|
||||||
@@ -378,7 +417,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
// Build per-batch state from the current inFlight set for batch parallelism gating.
|
// Build per-batch state from the current inFlight set for batch parallelism gating.
|
||||||
const batchState = buildBatchState(flow, inFlight);
|
const batchState = buildBatchState(flow, inFlight);
|
||||||
const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded };
|
const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded, loopIterations };
|
||||||
|
|
||||||
if (isRunComplete(flow, state)) {
|
if (isRunComplete(flow, state)) {
|
||||||
await finishRun(runId, flow, input, results, model, dispatch);
|
await finishRun(runId, flow, input, results, model, dispatch);
|
||||||
@@ -387,7 +426,46 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
|
|
||||||
const ready = getReadyInBatch(readySteps(flow, state), state, flow);
|
const ready = getReadyInBatch(readySteps(flow, state), state, flow);
|
||||||
if (ready.length === 0) {
|
if (ready.length === 0) {
|
||||||
if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
|
// Before declaring stuck, check for running DO_WHILE steps whose body
|
||||||
|
// is fully done — triggers the next loop iteration or terminates.
|
||||||
|
if (inFlight.size > 0) {
|
||||||
|
let doWhileReEval = false;
|
||||||
|
for (const s of flow.steps) {
|
||||||
|
if (s.kind !== 'do_while' || !s.loopBody || s.loopBody.length === 0) continue;
|
||||||
|
if (!inFlight.has(s.id)) continue;
|
||||||
|
if (!s.loopBody.every((bId) => done.has(bId))) continue;
|
||||||
|
doWhileReEval = true;
|
||||||
|
const iterations = loopIterations.get(s.id) ?? 0;
|
||||||
|
const dwCtx = buildCtx(input, results, model, dispatch);
|
||||||
|
if (isLoopTerminated(s, dwCtx, iterations)) {
|
||||||
|
await markStep(runId, s.id, 'completed');
|
||||||
|
done.add(s.id);
|
||||||
|
results[s.id] = '';
|
||||||
|
inFlight.delete(s.id);
|
||||||
|
publishStep(runId, s.id, 'completed');
|
||||||
|
} else {
|
||||||
|
await sql`
|
||||||
|
UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp()
|
||||||
|
WHERE run_id = ${runId} AND step_id = ${s.id}
|
||||||
|
`;
|
||||||
|
inFlight.add(s.id);
|
||||||
|
loopIterations.set(s.id, iterations + 1);
|
||||||
|
for (const bodyId of s.loopBody) {
|
||||||
|
done.delete(bodyId);
|
||||||
|
delete results[bodyId];
|
||||||
|
await sql`
|
||||||
|
UPDATE flow_steps
|
||||||
|
SET status = 'pending', output = NULL, updated_at = clock_timestamp()
|
||||||
|
WHERE run_id = ${runId} AND step_id = ${bodyId}
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
publishStep(runId, s.id, 'running');
|
||||||
|
}
|
||||||
|
break; // one DO_WHILE at a time
|
||||||
|
}
|
||||||
|
if (doWhileReEval) continue;
|
||||||
|
return; // genuine inFlight agents with no ready steps
|
||||||
|
}
|
||||||
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
|
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -429,6 +507,49 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
continue; // re-evaluate — excluded steps may unblock dependents
|
continue; // re-evaluate — excluded steps may unblock dependents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DO_WHILE steps: first-activation only (ready to run for the first time).
|
||||||
|
// Re-evaluation of running DO_WHILE steps whose body is complete is handled
|
||||||
|
// in the `ready.length === 0` block above (Path 1) — this avoids duplicate
|
||||||
|
// SQL updates and competing state mutations.
|
||||||
|
const doWhileReady = toRun.filter((s) => s.kind === 'do_while');
|
||||||
|
if (doWhileReady.length > 0) {
|
||||||
|
for (const s of doWhileReady) {
|
||||||
|
const iterations = loopIterations.get(s.id) ?? 0;
|
||||||
|
const dwCtx = buildCtx(input, results, model, dispatch);
|
||||||
|
if (isLoopTerminated(s, dwCtx, iterations)) {
|
||||||
|
// Loop done — mark DO_WHILE completed. Body steps stay in their
|
||||||
|
// current state (already done from the last iteration).
|
||||||
|
await markStep(runId, s.id, 'completed');
|
||||||
|
done.add(s.id);
|
||||||
|
results[s.id] = '';
|
||||||
|
inFlight.delete(s.id);
|
||||||
|
publishStep(runId, s.id, 'completed');
|
||||||
|
} else {
|
||||||
|
// Start or continue the loop.
|
||||||
|
await sql`
|
||||||
|
UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp()
|
||||||
|
WHERE run_id = ${runId} AND step_id = ${s.id}
|
||||||
|
`;
|
||||||
|
inFlight.add(s.id);
|
||||||
|
loopIterations.set(s.id, iterations + 1);
|
||||||
|
// On re-iteration, reset body steps from 'completed' back to 'pending'.
|
||||||
|
if (iterations > 0 && s.loopBody) {
|
||||||
|
for (const bodyId of s.loopBody) {
|
||||||
|
done.delete(bodyId);
|
||||||
|
delete results[bodyId];
|
||||||
|
await sql`
|
||||||
|
UPDATE flow_steps
|
||||||
|
SET status = 'pending', output = NULL, updated_at = clock_timestamp()
|
||||||
|
WHERE run_id = ${runId} AND step_id = ${bodyId}
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
publishStep(runId, s.id, 'running');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue; // re-evaluate — body steps may be newly pending
|
||||||
|
}
|
||||||
|
|
||||||
const codeReady = toRun.filter((s) => s.kind === 'code');
|
const codeReady = toRun.filter((s) => s.kind === 'code');
|
||||||
if (codeReady.length > 0) {
|
if (codeReady.length > 0) {
|
||||||
for (const s of codeReady) {
|
for (const s of codeReady) {
|
||||||
@@ -436,7 +557,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
try {
|
try {
|
||||||
// Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
|
// Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify).
|
||||||
// verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
|
// verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers).
|
||||||
out = await s.run(buildCtx(input, results, model, dispatch));
|
out = await s.run(buildCtx(input, results, model, dispatch, runId, s.id));
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
|
await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id);
|
||||||
return;
|
return;
|
||||||
@@ -559,6 +680,14 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
await appendStepEvent(sql, runId, stepId, status, output ? { outputLength: output.length } : undefined);
|
await appendStepEvent(sql, runId, stepId, status, output ? { outputLength: output.length } : undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function cleanupMessaging(runId: string): void {
|
||||||
|
const cleanups = messagingCleanups.get(runId);
|
||||||
|
if (cleanups) {
|
||||||
|
for (const fn of cleanups) fn();
|
||||||
|
messagingCleanups.delete(runId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─── run completion ─────────────────────────────────────────────────────────
|
// ─── run completion ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
async function finishRun(
|
async function finishRun(
|
||||||
@@ -580,12 +709,16 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
|
UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp()
|
||||||
WHERE id = ${runId} AND status = 'running'
|
WHERE id = ${runId} AND status = 'running'
|
||||||
`;
|
`;
|
||||||
if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish
|
if (updated.count === 0) {
|
||||||
|
cleanupMessaging(runId);
|
||||||
|
return; // already terminal (e.g. cancelled) — don't publish
|
||||||
|
}
|
||||||
deps.onRunTerminal?.(runId, 'completed');
|
deps.onRunTerminal?.(runId, 'completed');
|
||||||
publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
|
publishStep(runId, lastAgentStepId(flow, input, model), 'completed', {
|
||||||
run_status: 'completed',
|
run_status: 'completed',
|
||||||
report,
|
report,
|
||||||
});
|
});
|
||||||
|
cleanupMessaging(runId);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function failRun(
|
async function failRun(
|
||||||
@@ -606,6 +739,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
log.warn({ runId, error }, 'flow-runner: run failed');
|
log.warn({ runId, error }, 'flow-runner: run failed');
|
||||||
await appendStepEvent(sql, runId, stepId, 'failed', { error });
|
await appendStepEvent(sql, runId, stepId, 'failed', { error });
|
||||||
publishStep(runId, stepId, 'failed', { run_status: 'failed' });
|
publishStep(runId, stepId, 'failed', { run_status: 'failed' });
|
||||||
|
cleanupMessaging(runId);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function cancelRun(runId: string): Promise<void> {
|
async function cancelRun(runId: string): Promise<void> {
|
||||||
@@ -633,6 +767,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info({ runId }, 'flow-runner: run cancelled');
|
log.info({ runId }, 'flow-runner: run cancelled');
|
||||||
|
cleanupMessaging(runId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The terminal agent step in roster order — a valid roster step_id to carry the
|
/** The terminal agent step in roster order — a valid roster step_id to carry the
|
||||||
@@ -918,6 +1053,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
|||||||
.map((s) => s.task_id);
|
.map((s) => s.task_id);
|
||||||
|
|
||||||
log.info({ runId }, 'flow-runner: run cancelled by request');
|
log.info({ runId }, 'flow-runner: run cancelled by request');
|
||||||
|
cleanupMessaging(runId);
|
||||||
return { cancelled: true, taskIds };
|
return { cancelled: true, taskIds };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user