From e2d6a6b6cddad562c1ce0ac044e0b9fde5af6d02 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 8 Jun 2026 03:48:58 +0000 Subject: [PATCH] feat(coder): flow-runner decisions, conductor types, collision detection tests - Add flow-runner-decisions.ts: decision-aware step execution - Extend flow-runner.ts: dynamic step decisions - Extend conductor types: additional flow state types - Add collision-detector.test.ts: edit collision unit tests - Add conflict-index.test.ts: conflict resolution index tests --- apps/coder/src/conductor/types.ts | 19 ++- .../__tests__/collision-detector.test.ts | 90 +++++++++++ .../services/__tests__/conflict-index.test.ts | 146 +++++++++++++++++ .../src/services/flow-runner-decisions.ts | 27 +++- apps/coder/src/services/flow-runner.ts | 150 +++++++++++++++++- 5 files changed, 422 insertions(+), 10 deletions(-) create mode 100644 apps/coder/src/services/__tests__/collision-detector.test.ts create mode 100644 apps/coder/src/services/__tests__/conflict-index.test.ts diff --git a/apps/coder/src/conductor/types.ts b/apps/coder/src/conductor/types.ts index 3592707..fd07be7 100644 --- a/apps/coder/src/conductor/types.ts +++ b/apps/coder/src/conductor/types.ts @@ -36,9 +36,20 @@ export interface StepContext { * Falls back to a default in render functions when absent. */ readonly model?: string; + /** + * Inter-agent messaging within the same flow run. + * `publish` broadcasts on the user WS channel and delivers to in-process + * subscribers via the broker. `subscribe` registers a handler scoped to the + * run and channel; returns an unsubscribe function. + * Undefined in contexts without a run id (manifest-only contexts). + */ + readonly messaging?: { + publish(channel: string, message: unknown): void; + subscribe(channel: string, handler: (msg: unknown) => void): () => void; + }; } -export type StepKind = 'agent' | 'code' | 'approval' | 'switch'; +export type StepKind = 'agent' | 'code' | 'approval' | 'switch' | 'do_while'; /** * One branch of a SWITCH step. The first case whose condition evaluates to true @@ -89,6 +100,12 @@ export interface Step { cases?: SwitchCase[]; /** for kind:'switch' — fallback step ids when no case matches */ defaultBranch?: string[]; + /** for kind:'do_while' — step IDs in the loop body (re-evaluated each iteration) */ + loopBody?: string[]; + /** for kind:'do_while' — guard evaluated each iteration; terminates when false */ + loopCondition?: (ctx: StepContext) => boolean; + /** for kind:'do_while' — cap on total iterations (default 100) */ + loopMaxIterations?: number; } export interface Flow { diff --git a/apps/coder/src/services/__tests__/collision-detector.test.ts b/apps/coder/src/services/__tests__/collision-detector.test.ts new file mode 100644 index 0000000..5a0d93b --- /dev/null +++ b/apps/coder/src/services/__tests__/collision-detector.test.ts @@ -0,0 +1,90 @@ +import { describe, it, expect } from 'vitest'; +import { findConflicts } from '../collision-detector.js'; +import type { ConflictEntry, ConflictIndexData } from '../collision-detector.js'; + +function entry(worktreeId: string, agent: string, start?: number, end?: number): ConflictEntry { + return { + worktreeId, + agent, + lineRange: start !== undefined && end !== undefined ? { start, end } : undefined, + status: 'pending' as const, + timestamp: 1000, + }; +} + +function index(entries: Array<[string, ConflictEntry[]]>): ConflictIndexData { + return new Map(entries.map(([path, es]) => [path, new Set(es)] as const)); +} + +describe('findConflicts', () => { + it('returns empty when no files in index', () => { + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), new Map()); + expect(result).toEqual([]); + }); + + it('returns empty when only own worktree has the file', () => { + const idx = index([['src/a.ts', [entry('wt-1', 'agent-a', 1, 10)]]]); + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx); + expect(result).toEqual([]); + }); + + it('detects same_file conflict from another worktree', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 5, 15)]]]); + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx); + expect(result).toHaveLength(1); + expect(result[0]!.filePath).toBe('src/a.ts'); + expect(result[0]!.worktrees).toEqual(['wt-2']); + expect(result[0]!.agents).toEqual(['agent-b']); + }); + + it('reports same_line severity when ranges overlap', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 10, 20)]]]); + const ranges = new Map([['src/a.ts', { start: 15, end: 25 }]]); + const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx); + expect(result[0]!.severity).toBe('same_line'); + }); + + it('reports different_area severity when ranges are far apart', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 1, 10)]]]); + const ranges = new Map([['src/a.ts', { start: 100, end: 200 }]]); + const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx); + expect(result[0]!.severity).toBe('different_area'); + }); + + it('reports adjacent_line severity when ranges are 3 lines apart', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 10, 15)]]]); + const ranges = new Map([['src/a.ts', { start: 19, end: 25 }]]); + const result = findConflicts(['src/a.ts'], 'wt-1', ranges, idx); + expect(result[0]!.severity).toBe('adjacent_line'); + }); + + it('returns entry for each conflicting file', () => { + const idx = index([ + ['src/a.ts', [entry('wt-2', 'agent-b', 1, 10)]], + ['src/b.ts', [entry('wt-3', 'agent-c', 1, 10)]], + ]); + const result = findConflicts(['src/a.ts', 'src/b.ts', 'src/c.ts'], 'wt-1', new Map(), idx); + expect(result).toHaveLength(2); + expect(result.map((v) => v.filePath).sort()).toEqual(['src/a.ts', 'src/b.ts']); + }); + + it('excludes entries from the same worktree', () => { + const idx = index([['src/a.ts', [entry('wt-1', 'agent-a', 1, 10), entry('wt-2', 'agent-b', 5, 15)]]]); + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx); + expect(result).toHaveLength(1); + expect(result[0]!.worktrees).toEqual(['wt-2']); + }); + + it('deduplicates worktree IDs in verdict', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b', 1, 5), entry('wt-2', 'agent-b', 10, 15)]]]); + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx); + expect(result[0]!.worktrees).toEqual(['wt-2']); + }); + + it('reports same_line when no lineRange on either side (create/delete conflates)', () => { + const idx = index([['src/a.ts', [entry('wt-2', 'agent-b')]]]); + const result = findConflicts(['src/a.ts'], 'wt-1', new Map(), idx); + expect(result).toHaveLength(1); + expect(result[0]!.severity).toBe('different_area'); + }); +}); diff --git a/apps/coder/src/services/__tests__/conflict-index.test.ts b/apps/coder/src/services/__tests__/conflict-index.test.ts new file mode 100644 index 0000000..ca1c1e6 --- /dev/null +++ b/apps/coder/src/services/__tests__/conflict-index.test.ts @@ -0,0 +1,146 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { ConflictIndex } from '../conflict-index.js'; + +describe('ConflictIndex', () => { + let idx: ConflictIndex; + + beforeEach(() => { + idx = new ConflictIndex(); + }); + + describe('registerChange', () => { + it('adds an entry for a file path', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 }); + const entries = idx.getEntriesFor('src/a.ts'); + expect(entries.size).toBe(1); + const entry = [...entries][0]!; + expect(entry.worktreeId).toBe('wt-1'); + expect(entry.agent).toBe('agent-a'); + expect(entry.lineRange).toEqual({ start: 1, end: 10 }); + expect(entry.status).toBe('pending'); + expect(entry.timestamp).toBeGreaterThan(0); + }); + + it('supports multiple entries for the same file path', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 }); + idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 20, end: 30 }); + expect(idx.getEntriesFor('src/a.ts').size).toBe(2); + }); + + it('allows a worktree to have multiple entries (several edits to same file)', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 }); + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 20, end: 30 }); + // Duplicate entries with same fields — the Set dedupes by ref, + // so a second identical call is still a distinct object (allowed). + expect(idx.getEntriesFor('src/a.ts').size).toBe(2); + }); + + it('separates files into distinct keys', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + idx.registerChange('src/b.ts', 'wt-2', 'agent-b'); + expect(idx.getEntriesFor('src/a.ts').size).toBe(1); + expect(idx.getEntriesFor('src/b.ts').size).toBe(1); + }); + }); + + describe('removeWorktree', () => { + it('removes all entries for a given worktree', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + idx.registerChange('src/a.ts', 'wt-2', 'agent-b'); + idx.registerChange('src/b.ts', 'wt-1', 'agent-a'); + idx.removeWorktree('wt-1'); + expect(idx.getEntriesFor('src/a.ts').size).toBe(1); + expect([...idx.getEntriesFor('src/a.ts')][0]!.worktreeId).toBe('wt-2'); + expect(idx.getEntriesFor('src/b.ts').size).toBe(0); + }); + + it('is a no-op when worktree has no entries', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + idx.removeWorktree('wt-ghost'); + expect(idx.getEntriesFor('src/a.ts').size).toBe(1); + }); + + it('cleans up file key when last entry is removed', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + idx.removeWorktree('wt-1'); + // After removal the key should be gone + expect(idx.snapshot().has('src/a.ts')).toBe(false); + }); + }); + + describe('sweepStale', () => { + it('removes entries older than maxAgeMs', async () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + idx.registerChange('src/b.ts', 'wt-2', 'agent-b'); + // Wait a tick so timestamps diverge + await new Promise((r) => setTimeout(r, 10)); + idx.registerChange('src/c.ts', 'wt-3', 'agent-c'); + const removed = idx.sweepStale(5); // 5ms cutoff — entries from before the await are stale + expect(removed).toBeGreaterThanOrEqual(1); + }); + + it('removes file key when all entries swept', async () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + // Wait so timestamp is definitely older than cutoff + await new Promise((r) => setTimeout(r, 10)); + const removed = idx.sweepStale(5); + expect(removed).toBe(1); + expect(idx.snapshot().has('src/a.ts')).toBe(false); + }); + + it('returns 0 when no entries are stale', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + const removed = idx.sweepStale(86_400_000); // 24h + expect(removed).toBe(0); + }); + }); + + describe('getConflictsFor', () => { + it('returns conflicts between worktrees', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 }); + idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 5, end: 15 }); + const conflicts = idx.getConflictsFor('src/a.ts'); + expect(conflicts).toHaveLength(1); + expect(conflicts[0]!.filePath).toBe('src/a.ts'); + // getConflictsFor doesn't know the caller's line range, + // so severity defaults to 'different_area' + expect(conflicts[0]!.severity).toBe('different_area'); + }); + + it('returns empty for files with only one worktree', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + expect(idx.getConflictsFor('src/a.ts')).toEqual([]); + }); + + it('returns empty for files not in index', () => { + expect(idx.getConflictsFor('src/never-touched.ts')).toEqual([]); + }); + }); + + describe('query', () => { + it('delegates to findConflicts with proper data', () => { + idx.registerChange('src/a.ts', 'wt-2', 'agent-b', { start: 5, end: 15 }); + const ranges = new Map([['src/a.ts', { start: 10, end: 20 }]]); + const result = idx.query(['src/a.ts'], 'wt-1', ranges); + expect(result).toHaveLength(1); + expect(result[0]!.severity).toBe('same_line'); + }); + + it('returns empty when no conflicts', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a', { start: 1, end: 10 }); + const result = idx.query(['src/a.ts'], 'wt-1', new Map()); + expect(result).toEqual([]); + }); + }); + + describe('snapshot', () => { + it('returns a copy of the internal map', () => { + idx.registerChange('src/a.ts', 'wt-1', 'agent-a'); + const snap = idx.snapshot(); + expect(snap.has('src/a.ts')).toBe(true); + // Mutating the snapshot doesn't affect the original + idx.removeWorktree('wt-1'); + expect(snap.has('src/a.ts')).toBe(true); + }); + }); +}); diff --git a/apps/coder/src/services/flow-runner-decisions.ts b/apps/coder/src/services/flow-runner-decisions.ts index c1f7e53..62ca59d 100644 --- a/apps/coder/src/services/flow-runner-decisions.ts +++ b/apps/coder/src/services/flow-runner-decisions.ts @@ -47,12 +47,21 @@ export interface SchedulerState { * remainder of the run — they won't execute and won't block dependents. */ readonly switchResults: ReadonlyMap }>; + /** Per-DO_WHILE iteration count; presence in the map indicates an active loop */ + readonly loopIterations: ReadonlyMap; } -/** A dependency is satisfied once it is done, skipped, excluded, or timed out. */ +/** A dependency is satisfied once it is done, skipped, excluded, or timed out. + * Dependencies on a running DO_WHILE step are also satisfied so body steps + * execute during an active loop iteration. */ function isSatisfied(state: SchedulerState, id: string): boolean { const effectiveExcluded = getEffectiveExcluded(state); - return state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id); + if (state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id)) { + return true; + } + // A dependency on a running DO_WHILE step is satisfied (body runs during the loop). + if (state.loopIterations.has(id) && state.inFlight.has(id)) return true; + return false; } /** @@ -375,3 +384,17 @@ export function reconcileRun( ), })); } + +/** + * True when a DO_WHILE loop should stop: the condition returned false or the + * iteration cap was reached. Pure — no IO. + * + * @param step - the DO_WHILE step definition + * @param ctx - current step context (input + accumulated results) + * @param iterations - number of completed iterations so far + */ +export function isLoopTerminated(step: Step, ctx: StepContext, iterations: number): boolean { + if (iterations >= (step.loopMaxIterations ?? 100)) return true; + if (step.loopCondition) return !step.loopCondition(ctx); + return false; +} diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index ab0aee1..161d504 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -32,7 +32,7 @@ * already emits. (Phase 8 wires the OrchestratorPane's subscription to both.) */ import type { Sql } from '../db.js'; -import type { Broker } from '@boocode/server/broker'; +import type { Broker, Frame, Listener } from '@boocode/server/broker'; import type { WsFrame } from '@boocode/contracts/ws-frames'; import type { FastifyBaseLogger } from 'fastify'; import type { Config } from '../config.js'; @@ -42,6 +42,7 @@ import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../co import { buildBatchState, getReadyInBatch, + isLoopTerminated, isRunComplete, manifestSteps, partitionReady, @@ -98,7 +99,7 @@ interface Deps { interface FlowStepRow { step_id: string; - kind: 'agent' | 'code' | 'switch'; + kind: 'agent' | 'code' | 'switch' | 'do_while'; agent: string | null; status: string; chat_id: string | null; @@ -118,6 +119,10 @@ export function createFlowRunner(deps: Deps): FlowRunner { // taskId → resolver map. These tasks have NO flow_steps row; handleTaskTerminal // resolves them here instead of advancing a run. const subDispatchWaiters = new Map void>(); + /** Per-DO_WHILE step iteration count; persists across advance() calls. */ + const loopIterations = new Map(); + /** Per-run messaging subscriptions; cleaned up when the run terminates. */ + const messagingCleanups = new Map void>>(); function publishUser(frame: Record): void { broker.publishUserFrame('default', frame as unknown as WsFrame); @@ -134,8 +139,42 @@ export function createFlowRunner(deps: Deps): FlowRunner { results: Record, model: string, dispatch?: DispatchFn, + runId?: string, + stepId?: string, ): StepContext { - return { input, results, model, dispatch }; + let messaging: StepContext['messaging'] = undefined; + if (runId) { + if (!messagingCleanups.has(runId)) { + messagingCleanups.set(runId, new Set()); + } + const subs = messagingCleanups.get(runId)!; + messaging = { + publish(channel: string, message: unknown) { + const content = typeof message === 'string' ? message : JSON.stringify(message); + const topic = `run:${runId}:${channel}`; + const frame = { + type: 'agent_message' as const, + run_id: runId, + sender_step_id: stepId ?? '', + content, + ...(channel ? { channel } : {}), + }; + broker.publishUserFrame('default', frame as unknown as WsFrame); + broker.publish(topic, frame as unknown as Frame); + }, + subscribe(channel: string, handler: (msg: unknown) => void) { + const topic = `run:${runId}:${channel}`; + const listener: Listener = (f) => { handler(f); }; + const unsub = broker.subscribe(topic, listener); + subs.add(unsub); + return () => { + unsub(); + subs.delete(unsub); + }; + }, + }; + } + return { input, results, model, dispatch, messaging }; } /** Latest assistant message text for a chat — the FULL worker output (≤50k as @@ -378,7 +417,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { for (;;) { // Build per-batch state from the current inFlight set for batch parallelism gating. const batchState = buildBatchState(flow, inFlight); - const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded }; + const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded, loopIterations }; if (isRunComplete(flow, state)) { await finishRun(runId, flow, input, results, model, dispatch); @@ -387,7 +426,46 @@ export function createFlowRunner(deps: Deps): FlowRunner { const ready = getReadyInBatch(readySteps(flow, state), state, flow); if (ready.length === 0) { - if (inFlight.size > 0) return; // agents in flight will re-enter via the hook + // Before declaring stuck, check for running DO_WHILE steps whose body + // is fully done — triggers the next loop iteration or terminates. + if (inFlight.size > 0) { + let doWhileReEval = false; + for (const s of flow.steps) { + if (s.kind !== 'do_while' || !s.loopBody || s.loopBody.length === 0) continue; + if (!inFlight.has(s.id)) continue; + if (!s.loopBody.every((bId) => done.has(bId))) continue; + doWhileReEval = true; + const iterations = loopIterations.get(s.id) ?? 0; + const dwCtx = buildCtx(input, results, model, dispatch); + if (isLoopTerminated(s, dwCtx, iterations)) { + await markStep(runId, s.id, 'completed'); + done.add(s.id); + results[s.id] = ''; + inFlight.delete(s.id); + publishStep(runId, s.id, 'completed'); + } else { + await sql` + UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${s.id} + `; + inFlight.add(s.id); + loopIterations.set(s.id, iterations + 1); + for (const bodyId of s.loopBody) { + done.delete(bodyId); + delete results[bodyId]; + await sql` + UPDATE flow_steps + SET status = 'pending', output = NULL, updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${bodyId} + `; + } + publishStep(runId, s.id, 'running'); + } + break; // one DO_WHILE at a time + } + if (doWhileReEval) continue; + return; // genuine inFlight agents with no ready steps + } await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle'); return; } @@ -429,6 +507,49 @@ export function createFlowRunner(deps: Deps): FlowRunner { continue; // re-evaluate — excluded steps may unblock dependents } + // DO_WHILE steps: first-activation only (ready to run for the first time). + // Re-evaluation of running DO_WHILE steps whose body is complete is handled + // in the `ready.length === 0` block above (Path 1) — this avoids duplicate + // SQL updates and competing state mutations. + const doWhileReady = toRun.filter((s) => s.kind === 'do_while'); + if (doWhileReady.length > 0) { + for (const s of doWhileReady) { + const iterations = loopIterations.get(s.id) ?? 0; + const dwCtx = buildCtx(input, results, model, dispatch); + if (isLoopTerminated(s, dwCtx, iterations)) { + // Loop done — mark DO_WHILE completed. Body steps stay in their + // current state (already done from the last iteration). + await markStep(runId, s.id, 'completed'); + done.add(s.id); + results[s.id] = ''; + inFlight.delete(s.id); + publishStep(runId, s.id, 'completed'); + } else { + // Start or continue the loop. + await sql` + UPDATE flow_steps SET status = 'running', updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${s.id} + `; + inFlight.add(s.id); + loopIterations.set(s.id, iterations + 1); + // On re-iteration, reset body steps from 'completed' back to 'pending'. + if (iterations > 0 && s.loopBody) { + for (const bodyId of s.loopBody) { + done.delete(bodyId); + delete results[bodyId]; + await sql` + UPDATE flow_steps + SET status = 'pending', output = NULL, updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${bodyId} + `; + } + } + publishStep(runId, s.id, 'running'); + } + } + continue; // re-evaluate — body steps may be newly pending + } + const codeReady = toRun.filter((s) => s.kind === 'code'); if (codeReady.length > 0) { for (const s of codeReady) { @@ -436,7 +557,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { try { // Code steps run IN-PROCESS (fold / synthesis-fold / code-review verify). // verify uses ctx.dispatch → dispatchSubAgent (read-only qwen workers). - out = await s.run(buildCtx(input, results, model, dispatch)); + out = await s.run(buildCtx(input, results, model, dispatch, runId, s.id)); } catch (err) { await failRun(runId, flow, input, model, `code step '${s.id}' threw: ${errMsg(err)}`, s.id); return; @@ -559,6 +680,14 @@ export function createFlowRunner(deps: Deps): FlowRunner { await appendStepEvent(sql, runId, stepId, status, output ? { outputLength: output.length } : undefined); } + function cleanupMessaging(runId: string): void { + const cleanups = messagingCleanups.get(runId); + if (cleanups) { + for (const fn of cleanups) fn(); + messagingCleanups.delete(runId); + } + } + // ─── run completion ───────────────────────────────────────────────────────── async function finishRun( @@ -580,12 +709,16 @@ export function createFlowRunner(deps: Deps): FlowRunner { UPDATE flow_runs SET status = 'completed', report = ${report}, updated_at = clock_timestamp() WHERE id = ${runId} AND status = 'running' `; - if (updated.count === 0) return; // already terminal (e.g. cancelled) — don't publish + if (updated.count === 0) { + cleanupMessaging(runId); + return; // already terminal (e.g. cancelled) — don't publish + } deps.onRunTerminal?.(runId, 'completed'); publishStep(runId, lastAgentStepId(flow, input, model), 'completed', { run_status: 'completed', report, }); + cleanupMessaging(runId); } async function failRun( @@ -606,6 +739,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { log.warn({ runId, error }, 'flow-runner: run failed'); await appendStepEvent(sql, runId, stepId, 'failed', { error }); publishStep(runId, stepId, 'failed', { run_status: 'failed' }); + cleanupMessaging(runId); } async function cancelRun(runId: string): Promise { @@ -633,6 +767,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { } } log.info({ runId }, 'flow-runner: run cancelled'); + cleanupMessaging(runId); } /** The terminal agent step in roster order — a valid roster step_id to carry the @@ -918,6 +1053,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { .map((s) => s.task_id); log.info({ runId }, 'flow-runner: run cancelled by request'); + cleanupMessaging(runId); return { cancelled: true, taskIds }; }