// v2.8.0: WorkflowManager — ties discovery, sandbox, and inference dispatch
// together into a single orchestrator for multi-agent workflow scripts.
//
// Creates isolated sessions+chats for each agent() call within a workflow,
// dispatches inference via the existing pipeline, polls for completion, and
// returns structured results. Agent-task failures are returned as
// `{ ok: false, ... }` results rather than thrown exceptions (catch-safe API);
// only `runWorkflow()` throws, and only when the named workflow does not exist.

import { randomUUID } from 'node:crypto';
import type { Sql } from '../../db.js';
import type { Config } from '../../config.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '../broker.js';
import type { UserStreamFrame } from '../../types/api.js';
import type {
  WorkflowRun,
  WorkflowRunStatus,
  WorkflowContext,
  WorkflowEvent,
  AgentTaskSpec,
  AgentTaskResult,
  WorkflowScriptMeta,
} from './types.js';
import { discoverWorkflows, findWorkflow, isBuiltinWorkflow } from './discovery.js';
import { getBuiltinWorkflow } from './catalog.js';
import { cacheKey, getCachedResult, setCachedResult } from './resumability.js';
import {
  executeWorkflowScript,
  executeWorkflowScriptFromCode,
  isEsmSyntax,
  transformEsmToCjs,
} from './sandbox.js';
import { runInference } from '../inference/index.js';
import { readFileSync } from 'node:fs';
import vm from 'node:vm';

/**
 * Maximum time to wait for a single agent task to complete (5 minutes).
 * Beyond this, the task is treated as failed/timed out.
 */
const AGENT_TASK_TIMEOUT_MS = 300_000;

/**
 * Polling interval when waiting for an agent task to finish.
 */
const POLL_INTERVAL_MS = 500;

/**
 * Maximum time for the entire workflow run (30 minutes).
 *
 * NOTE(review): nothing in this module enforces this limit yet — confirm
 * whether the sandbox applies it or whether enforcement is still pending.
 */
const WORKFLOW_TIMEOUT_MS = 1_800_000;

/**
 * Model written to the `sessions` row when an agent task does not name one.
 */
const DEFAULT_AGENT_MODEL = 'qwen3.6-35b-a3b-mxfp4';

/**
 * Token budget tracker. Tracks total token spend across agent calls.
 */
class BudgetTracker {
  /** Total budget in tokens, or `null` for "unlimited". */
  readonly total: number | null;
  #spent = 0;

  constructor(total: number | null) {
    this.total = total;
  }

  /** Add `amount` tokens to the running total. */
  spend(amount: number): void {
    this.#spent += amount;
  }

  /** Tokens spent so far. */
  spent(): number {
    return this.#spent;
  }

  /** Tokens left (never negative); `Infinity` when there is no total. */
  remaining(): number {
    if (this.total === null) return Infinity;
    return Math.max(0, this.total - this.#spent);
  }
}

/**
 * No-op publish function, so background workflow agent tasks do not need a
 * WebSocket subscriber. Messages are still persisted to the DB.
 */
function noopPublish(): void {
  /* intentional no-op */
}

/** No-op counterpart of {@link noopPublish} for per-user stream frames. */
function noopPublishUser(): void {
  /* intentional no-op */
}

/** Narrow an unknown value to a plain key/value object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/**
 * Number of tokens an agent result consumed (`token_usage.prompt +
 * token_usage.completion`). Results served from the resumability cache
 * (`cached: true`) and results without usage information count as 0.
 */
function tokensSpentBy(result: unknown): number {
  if (!isRecord(result) || result.cached === true) return 0;
  const usage = result.token_usage;
  if (!isRecord(usage)) return 0;
  const prompt = typeof usage.prompt === 'number' ? usage.prompt : 0;
  const completion = typeof usage.completion === 'number' ? usage.completion : 0;
  return prompt + completion;
}

/**
 * Callback type for workflow lifecycle events.
 */
export type WorkflowEventHandler = (event: WorkflowEvent) => void;

/**
 * WorkflowManager — the orchestrator for sandboxed multi-agent workflows.
 */
export class WorkflowManager {
  /**
   * Workflow runs by run ID.
   *
   * NOTE(review): entries are never removed, so finished runs accumulate for
   * the lifetime of the manager — confirm whether eviction is needed.
   */
  readonly #runs = new Map<string, WorkflowRunState>();

  /** Registered event listeners. */
  readonly #listeners = new Set<WorkflowEventHandler>();

  constructor(
    private sql: Sql,
    private config: Config,
    private log: FastifyBaseLogger,
    private projectRoot: string,
    private projectId: string,
    private broker: Broker,
  ) {}

  // ---- public API ----

  /**
   * Discover all available workflow scripts.
   */
  listWorkflows(): WorkflowMetaInfo[] {
    return discoverWorkflows(this.projectRoot).map((m) => ({
      name: m.name,
      sourceFile: m.sourceFile,
    }));
  }

  /**
   * Find a specific workflow by name.
   */
  getWorkflow(name: string): WorkflowMetaInfo | undefined {
    const found = findWorkflow(name, this.projectRoot);
    if (!found) return undefined;
    return { name: found.name, sourceFile: found.sourceFile };
  }

  /**
   * Load the metadata (name, description, phases) of a workflow.
   *
   * Built-in workflows are answered from the catalog. For file-based
   * workflows the script's top level is evaluated in a throwaway `vm` context
   * (10 s limit) whose workflow helpers are stubs, and `module.exports.meta`
   * is read back; the workflow function itself is not invoked.
   *
   * @param name - Workflow name.
   * @returns The script's meta, a `{ name, description: '' }` placeholder if
   *   the meta cannot be read, or undefined if the workflow is not found.
   */
  async loadWorkflowMeta(name: string): Promise<WorkflowScriptMeta | undefined> {
    const found = findWorkflow(name, this.projectRoot);
    if (!found) return undefined;

    // Built-in workflows: return meta directly from the catalog
    if (isBuiltinWorkflow(found)) {
      const builtin = getBuiltinWorkflow(name);
      if (!builtin) return { name, description: '' };
      return {
        name: builtin.name,
        description: builtin.description,
        phases: builtin.phases,
      };
    }

    try {
      // Load meta by executing the script in a throwaway context
      const context = this.#createMinimalContext('meta-loader');
      const code = readFileSync(found.sourceFile, 'utf8');
      const finalCode = isEsmSyntax(code) ? transformEsmToCjs(code) : code;
      const sandboxData: Record<string, unknown> & {
        module: { exports: Record<string, unknown> };
      } = {
        ...context,
        console: { log: () => {} },
        module: { exports: {} },
        exports: {},
      };
      vm.createContext(sandboxData as unknown as vm.Context);
      new vm.Script(finalCode).runInContext(sandboxData as unknown as vm.Context, {
        timeout: 10_000,
        filename: found.sourceFile,
      });
      const meta = sandboxData.module.exports.meta as WorkflowScriptMeta | undefined;
      return meta ?? { name, description: '' };
    } catch {
      // Deliberate best-effort: an unreadable or broken script still gets a
      // placeholder entry instead of failing the lookup.
      return { name, description: '' };
    }
  }

  /**
   * Execute a workflow by name.
   *
   * The run is started in the background; this method resolves as soon as the
   * run has been registered.
   *
   * @param name - The workflow name (without .js extension).
   * @param args - Optional arguments to pass to the workflow function.
   * @returns The run ID for tracking.
   * @throws Error if no workflow with that name can be found.
   */
  async runWorkflow(
    name: string,
    args?: Record<string, unknown>,
  ): Promise<{ runId: string }> {
    const found = findWorkflow(name, this.projectRoot);
    if (!found) {
      throw new Error(`Workflow not found: "${name}". ` +
        `Check .boocode/workflows/ or ~/.boocode/workflows/ for a ${name}.js file.`);
    }

    const runId = randomUUID();
    const startedAt = new Date().toISOString();

    const state: WorkflowRunState = {
      id: runId,
      name,
      status: 'running',
      startedAt,
      abortController: new AbortController(),
    };
    this.#runs.set(runId, state);

    this.#emit({ type: 'run_started', runId, name });

    // Run asynchronously — caller receives the runId immediately.
    // #executeRun catches its own errors, so the floating promise cannot reject.
    void this.#executeRun(state, found.sourceFile, args ?? {});

    return { runId };
  }

  /**
   * Get the current status of a workflow run.
   */
  getRunStatus(runId: string): WorkflowRun | undefined {
    const state = this.#runs.get(runId);
    if (!state) return undefined;
    return {
      id: state.id,
      name: state.name,
      status: state.status,
      started_at: state.startedAt,
      finished_at: state.finishedAt,
      error: state.error,
    };
  }

  /**
   * Cancel a running workflow. Best-effort — agent tasks in-flight will be
   * aborted via AbortSignal.
   *
   * @param runId - The workflow run ID.
   * @returns true if the workflow was found and cancelled.
   */
  cancelRun(runId: string): boolean {
    const state = this.#runs.get(runId);
    if (!state || state.status !== 'running') return false;

    state.status = 'cancelled';
    state.finishedAt = new Date().toISOString();
    state.abortController.abort();

    this.#emit({ type: 'run_cancelled', runId, name: state.name });
    return true;
  }

  /**
   * Subscribe to workflow lifecycle events.
   * Returns an unsubscribe function.
   */
  onEvent(handler: WorkflowEventHandler): () => void {
    this.#listeners.add(handler);
    return () => {
      this.#listeners.delete(handler);
    };
  }

  // ---- internal execution ----

  /**
   * Execute the workflow script in the sandbox and record the outcome on
   * `state`. Never rejects: failures are stored as status `'failed'`.
   *
   * @param state - Mutable run state; status/result/error are written here.
   * @param sourceFile - Script path, or `builtin:<name>` for catalog workflows.
   * @param args - Arguments handed to the workflow function.
   */
  async #executeRun(
    state: WorkflowRunState,
    sourceFile: string,
    args: Record<string, unknown>,
  ): Promise<void> {
    const BUILTIN_MARKER = 'builtin:';
    const budgetTracker = new BudgetTracker(null); // no fixed total yet
    const runId = state.id;
    const signal = state.abortController.signal;

    try {
      const context: WorkflowContext = {
        agent: async (prompt, opts) => {
          const result = await this.#handleAgentCall(runId, prompt, opts ?? { prompt }, signal);
          // Feed the tracker so budget.spent()/remaining() reflect real usage.
          budgetTracker.spend(tokensSpentBy(result));
          return result;
        },
        parallel: (thunks) => Promise.all(thunks.map((t) => t())),
        pipeline: async (items, ...stages) => {
          let result = [...items];
          for (const stage of stages) {
            result = await Promise.all(result.map(stage));
          }
          return result;
        },
        phase: (title) => {
          this.#emit({ type: 'phase', runId, title });
        },
        log: (message) => {
          this.#emit({ type: 'log', runId, message });
        },
        budget: {
          total: budgetTracker.total,
          spent: () => budgetTracker.spent(),
          remaining: () => budgetTracker.remaining(),
        },
        args,
        workflow: (nestedName, nestedArgs) =>
          this.#handleNestedWorkflow(runId, nestedName, nestedArgs ?? {}, signal),
      };

      let result: unknown;

      if (sourceFile.startsWith(BUILTIN_MARKER)) {
        // Built-in workflow: generate script from catalog and execute
        const workflowName = sourceFile.slice(BUILTIN_MARKER.length);
        const builtin = getBuiltinWorkflow(workflowName);
        if (!builtin) {
          throw new Error(`Built-in workflow "${workflowName}" not found in catalog`);
        }
        const scriptCode = builtin.generateScript(args);
        result = await executeWorkflowScriptFromCode(scriptCode, context, args, sourceFile);
      } else {
        result = await executeWorkflowScript(sourceFile, context, args);
      }

      // Cancelled mid-flight: cancelRun() already set the final status and
      // emitted run_cancelled, so do not report a completion on top of it.
      if (state.status === 'cancelled') return;

      state.status = 'completed';
      state.finishedAt = new Date().toISOString();
      state.result = result;

      this.#emit({ type: 'run_completed', runId, name: state.name });
    } catch (err) {
      if (state.status === 'cancelled') return; // already handled

      const message = err instanceof Error ? err.message : String(err);
      state.status = 'failed';
      state.finishedAt = new Date().toISOString();
      state.error = message;

      this.#emit({ type: 'run_failed', runId, name: state.name, error: message });
    }
  }

  /**
   * Handle an `agent()` call from within a workflow.
   *
   * Wraps {@link executeAgentTask} with start/completed events and converts
   * any thrown error into an `{ ok: false }` result, so workflow scripts never
   * see an exception from `agent()`.
   */
  async #handleAgentCall(
    runId: string,
    prompt: string,
    spec: AgentTaskSpec,
    signal: AbortSignal,
  ): Promise<unknown> {
    const label = spec.label ?? `agent-${prompt.slice(0, 40).replace(/\s+/g, '_')}`;

    this.#emit({ type: 'agent_task_started', runId, label });

    try {
      const result = await this.executeAgentTask(prompt, spec, signal);
      this.#emit({ type: 'agent_task_completed', runId, label });
      return result;
    } catch (err) {
      // The "completed" event marks the end of the task whether it succeeded
      // or not; the failure itself travels in the returned result.
      this.#emit({ type: 'agent_task_completed', runId, label });
      const message = err instanceof Error ? err.message : String(err);
      return {
        ok: false,
        output: null,
        error: message,
      } satisfies AgentTaskResult;
    }
  }

  /**
   * Core agent task execution: create session/chat, dispatch inference, poll.
   *
   * Exported as a public method for testing.
   *
   * @param prompt - User message content for the agent.
   * @param spec - Task options (label, model, ...).
   * @param signal - Optional cancellation signal; when it fires the inference
   *   is aborted and a "Task was cancelled" result is returned.
   * @returns An agent result object (`ok`, `output`, optional `error` /
   *   `token_usage` / `cached`). Poll/timeout/cancellation failures are
   *   returned as `ok: false`; errors from the DB inserts propagate.
   */
  async executeAgentTask(
    prompt: string,
    spec: AgentTaskSpec,
    signal?: AbortSignal,
  ): Promise<unknown> {
    // An already-fired signal never dispatches 'abort' to new listeners, so
    // check it explicitly instead of creating rows and starting inference
    // that nothing could cancel.
    if (signal?.aborted) {
      return { ok: false, output: null, error: 'Task was cancelled' } satisfies AgentTaskResult;
    }

    // ---- 0. Check resumability cache before creating a new task ----
    // NOTE(review): the second argument is always ''. If it is meant to carry
    // the prompt, calls that share a spec but differ in prompt may collide —
    // confirm against cacheKey()'s contract.
    const cacheKeyStr = cacheKey(spec, '');
    const cached = getCachedResult(cacheKeyStr);
    if (cached) {
      return { ...cached, cached: true } satisfies AgentTaskResult;
    }

    const model = spec.model ?? null;

    // ---- 1. Create a session for this agent task ----
    const sessionName = `workflow-agent-${spec.label ?? 'task'}`;
    const sessionResult = await this.sql.begin(async (tx) => {
      const [session] = await tx<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model)
        VALUES (${this.projectId}, ${sessionName}, ${model ?? DEFAULT_AGENT_MODEL})
        RETURNING id
      `;
      if (!session) throw new Error('Failed to create workflow agent session');
      return session;
    });
    const sessionId = sessionResult.id;

    // ---- 2. Create a chat in this session ----
    const chatResult = await this.sql.begin(async (tx) => {
      const [chat] = await tx<{ id: string }[]>`
        INSERT INTO chats (session_id, name)
        VALUES (${sessionId}, ${spec.label ?? null})
        RETURNING id
      `;
      if (!chat) throw new Error('Failed to create workflow agent chat');
      return chat;
    });
    const chatId = chatResult.id;

    // ---- 3. Insert user message + streaming assistant message ----
    const assistantMessageId = await this.sql.begin(async (tx) => {
      const [userMsg] = await tx<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${prompt}, 'complete', clock_timestamp())
        RETURNING id
      `;
      if (!userMsg) throw new Error('Failed to create workflow agent user message');

      const [assistantMsg] = await tx<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;
      if (!assistantMsg) throw new Error('Failed to create workflow agent assistant message');

      return assistantMsg.id;
    });

    // ---- 4. Dispatch inference ----
    // Create a bounded InferenceContext that won't crash on missing WS
    const ctx: import('../inference/types.js').InferenceContext = {
      sql: this.sql,
      config: this.config,
      log: this.log,
      publish: noopPublish as unknown as import('../inference/types.js').FramePublisher,
      publishUser: noopPublishUser as unknown as (frame: UserStreamFrame) => void,
      broker: this.broker,
    };

    // Task-local controller: aborted by the caller's signal, and by this
    // method itself when the task times out or polling fails.
    const mergedController = new AbortController();
    const onAbort = (): void => mergedController.abort();
    signal?.addEventListener('abort', onAbort, { once: true });
    // The signal may have fired while the rows above were being inserted.
    if (signal?.aborted) mergedController.abort();

    const inferencePromise = runInference(
      ctx,
      sessionId,
      chatId,
      assistantMessageId,
      mergedController.signal,
    ).finally(() => {
      signal?.removeEventListener('abort', onAbort);
    });

    // ---- 5. Poll for completion ----
    try {
      const result = await this.#pollForCompletion(
        chatId,
        assistantMessageId,
        inferencePromise,
        mergedController.signal,
      );

      // Cache successful results for resumability
      if (isRecord(result) && result.ok === true) {
        setCachedResult(cacheKeyStr, {
          ok: true,
          output: result.output,
          token_usage: result.token_usage as
            | { prompt: number; completion: number }
            | undefined,
        });
      }

      return result;
    } catch (err) {
      // Timed out, cancelled, or polling broke: stop the in-flight inference
      // rather than letting it keep running unobserved.
      mergedController.abort();

      if (err instanceof Error && err.message === 'cancelled') {
        return { ok: false, output: null, error: 'Task was cancelled' } satisfies AgentTaskResult;
      }
      return {
        ok: false,
        output: null,
        error: err instanceof Error ? err.message : String(err),
      } satisfies AgentTaskResult;
    }
  }

  /**
   * Poll the messages table until the assistant message status changes
   * from 'streaming' to 'complete' / 'failed' / 'cancelled'.
   *
   * Rejects with `Error('cancelled')` when `signal` fires, with a timeout
   * error after AGENT_TASK_TIMEOUT_MS, or with the inference error if
   * `inferencePromise` rejects first. The timer, the abort listener, and the
   * poll loop are all released once the race settles.
   */
  async #pollForCompletion(
    chatId: string,
    assistantMessageId: string,
    inferencePromise: Promise<unknown>,
    signal: AbortSignal,
  ): Promise<unknown> {
    // Surface an inference failure as a rejection of its own. Attaching the
    // handler right away also keeps an early rejection from being reported
    // as unhandled.
    const inferenceFailure = new Promise<never>((_, reject) => {
      inferencePromise.catch(reject);
    });
    inferenceFailure.catch(() => {}); // observed even if the race is never reached

    if (signal.aborted) throw new Error('cancelled');

    // Deadline promise: rejects on timeout or on cancellation.
    let rejectDeadline!: (reason: Error) => void;
    const deadline = new Promise<never>((_, reject) => {
      rejectDeadline = reject;
    });
    const timer = setTimeout(() => {
      rejectDeadline(new Error(`Agent task timed out after ${AGENT_TASK_TIMEOUT_MS}ms`));
    }, AGENT_TASK_TIMEOUT_MS);
    const onAbort = (): void => rejectDeadline(new Error('cancelled'));
    signal.addEventListener('abort', onAbort, { once: true });

    // Set once the race settles so the loop stops issuing DB queries.
    let stopped = false;

    // Poll loop — runs until the message leaves 'streaming' or it is stopped
    const pollLoop = (async (): Promise<unknown> => {
      while (!stopped) {
        await new Promise<void>((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
        if (stopped) break;

        const rows = await this.sql<{
          status: string;
          content: string;
          tool_calls: unknown;
          tokens_used: number | null;
        }[]>`
          SELECT m.status, m.content, m.role,
                 (SELECT jsonb_agg(p.payload ORDER BY p.sequence)
                    FROM message_parts p
                   WHERE p.message_id = m.id AND p.kind = 'tool_call' AND p.hidden_at IS NULL) AS tool_calls,
                 m.tokens_used
            FROM messages m
           WHERE m.id = ${assistantMessageId}
        `;

        const msg = rows[0];
        if (!msg) {
          throw new Error(`Assistant message ${assistantMessageId} not found`);
        }

        if (msg.status === 'complete') {
          return {
            ok: true,
            output: msg.content,
            token_usage: msg.tokens_used
              ? { prompt: 0, completion: msg.tokens_used }
              : undefined,
          };
        }

        if (msg.status === 'failed' || msg.status === 'cancelled') {
          return {
            ok: false,
            output: msg.content || null,
            error: `Assistant message ended with status: ${msg.status}`,
          };
        }

        // Still streaming — continue polling
      }
      // Only reached after the race has settled; the value is never observed.
      return undefined;
    })();

    // Race: polling vs timeout/cancellation vs inference error
    try {
      return await Promise.race([pollLoop, deadline, inferenceFailure]);
    } finally {
      stopped = true;
      clearTimeout(timer);
      signal.removeEventListener('abort', onAbort);
    }
  }

  /**
   * Handle a nested `workflow()` call from within a workflow.
   * Runs the named workflow with the given args and returns its result as
   * `{ ok, output, error? }`. Cancelling the parent cancels the nested run.
   */
  async #handleNestedWorkflow(
    parentRunId: string,
    name: string,
    args: Record<string, unknown>,
    signal: AbortSignal,
  ): Promise<unknown> {
    const found = findWorkflow(name, this.projectRoot);
    if (!found) {
      return { ok: false, output: null, error: `Nested workflow not found: "${name}"` };
    }

    // Parent already cancelled: an 'abort' listener would never fire, so do
    // not start the nested run at all.
    if (signal.aborted) {
      return { ok: false, output: null, error: 'Nested workflow cancelled' };
    }

    const nestedRunId = randomUUID();
    const startedAt = new Date().toISOString();

    const nestedState: WorkflowRunState = {
      id: nestedRunId,
      name,
      status: 'running',
      startedAt,
      abortController: new AbortController(),
    };
    this.#runs.set(nestedRunId, nestedState);

    this.#emit({ type: 'run_started', runId: nestedRunId, name });

    // Link parent cancellation to nested. cancelRun() marks the nested run as
    // cancelled, aborts its tasks, and emits run_cancelled for it.
    const onParentAbort = (): void => {
      this.cancelRun(nestedRunId);
    };
    signal.addEventListener('abort', onParentAbort, { once: true });

    try {
      await this.#executeRun(nestedState, found.sourceFile, args);
    } finally {
      signal.removeEventListener('abort', onParentAbort);
    }

    if (nestedState.status === 'cancelled') {
      return { ok: false, output: null, error: 'Nested workflow cancelled' };
    }
    if (nestedState.status === 'failed') {
      return { ok: false, output: null, error: nestedState.error };
    }
    return { ok: true, output: nestedState.result };
  }

  /**
   * Create a minimal WorkflowContext for non-execution purposes
   * (e.g. loading meta). Every helper that would do real work rejects.
   */
  #createMinimalContext(_runId: string): Record<string, unknown> {
    return {
      agent: () => Promise.reject(new Error('Not available in this context')),
      parallel: () => Promise.reject(new Error('Not available in this context')),
      pipeline: () => Promise.reject(new Error('Not available in this context')),
      phase: () => {},
      log: () => {},
      budget: { total: null, spent: () => 0, remaining: () => Infinity },
      args: {},
      workflow: () => Promise.reject(new Error('Not available in this context')),
    };
  }

  /**
   * Emit a workflow event to all registered listeners.
   */
  #emit(event: WorkflowEvent): void {
    for (const handler of this.#listeners) {
      try {
        handler(event);
      } catch {
        // Swallow listener errors — one bad listener shouldn't break others
      }
    }
  }
}

// ---- internal types ----

/**
 * Metadata returned from listWorkflows / getWorkflow.
 */
export interface WorkflowMetaInfo {
  name: string;
  sourceFile: string;
}

/**
 * Internal mutable state for an active workflow run.
 */
interface WorkflowRunState {
  id: string;
  name: string;
  status: WorkflowRunStatus;
  startedAt: string;
  finishedAt?: string;
  error?: string;
  result?: unknown;
  abortController: AbortController;
}