/** * DAG Executor — the core execution engine for Ion workflows. * * Traverses a DAG of nodes in topological layers, executing nodes within * each layer concurrently via Promise.allSettled. Supports prompt nodes, * command/bash/script nodes, approval gates, loop nodes, and cancel nodes. * * Architecture mirrors Archon's dag-executor: Kahn's algorithm for layer * building, trigger rules for dependency evaluation, and Promise.allSettled * for concurrent execution within each layer. */ import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; import { readFile } from 'node:fs/promises'; import { join } from 'node:path'; import type { DagNode, PromptNode, BashNode, ScriptNode, ApprovalNode, LoopNode, CancelNode, NodeOutput, NodeExecutionResult, TriggerRule, WorkflowDefinition, } from '../schema/index.js'; import { DEFAULT_TRIGGER_RULE } from '../schema/index.js'; import { isBashNode, isScriptNode, isLoopNode, isApprovalNode, isCancelNode, isPromptNode, isCommandNode, } from '../schema/index.js'; import type { IWorkflowPlatform, WorkflowDeps, WorkflowConfig } from './deps.js'; import { evaluateCondition, substituteWorkflowVariables, substituteNodeOutputRefs, buildPromptWithContext, classifyError, safeSendMessage, formatSubprocessFailure, sleep, retryWithBackoff, OutputRefError, DagCycleError, NodeTimeoutError, ApprovalRejectedError, LoopMaxIterationsError, } from './utils.js'; const execFileAsync = promisify(execFile); // --------------------------------------------------------------------------- // Topological layer building (Kahn's algorithm) // --------------------------------------------------------------------------- /** * Build topological layers from a flat list of DAG nodes using Kahn's algorithm. * * Each layer contains nodes that can execute concurrently. Nodes in layer N+1 * depend only on nodes in layers 0..N. * * @param nodes - Flat list of DAG nodes with `depends_on` references. 
/**
 * Group DAG nodes into topological layers using Kahn's algorithm.
 *
 * The first layer holds every node with no dependencies. Each later layer
 * holds the nodes whose last outstanding dependency was placed in the layer
 * before it, so the nodes inside one layer can execute concurrently.
 *
 * A dependency id that appears more than once in a single `depends_on` list
 * is counted once; otherwise the node could never reach in-degree zero and
 * would be misreported as part of a cycle.
 *
 * @param nodes - Flat list of DAG nodes with `depends_on` references.
 * @returns Array of layers, each layer being an array of nodes.
 * @throws DagCycleError if fewer nodes could be scheduled than were supplied.
 *   That is the case for a dependency cycle, and also for a `depends_on`
 *   entry that matches no node id (its in-degree is never reduced).
 */
export function buildTopologicalLayers(nodes: DagNode[]): DagNode[][] {
  const nodeById = new Map<string, DagNode>();
  const remainingDeps = new Map<string, number>();
  const dependentsOf = new Map<string, Set<string>>(); // dep → nodes that depend on it

  for (const node of nodes) {
    // De-duplicate so the in-degree matches the number of edges stored below.
    const uniqueDeps = new Set(node.depends_on);
    nodeById.set(node.id, node);
    remainingDeps.set(node.id, uniqueDeps.size);

    for (const dep of uniqueDeps) {
      let dependents = dependentsOf.get(dep);
      if (dependents === undefined) {
        dependents = new Set<string>();
        dependentsOf.set(dep, dependents);
      }
      dependents.add(node.id);
    }
  }

  // Seed with the nodes that have nothing to wait for.
  let frontier = [...remainingDeps]
    .filter(([, outstanding]) => outstanding === 0)
    .map(([id]) => id);

  const layers: DagNode[][] = [];
  let scheduled = 0;

  while (frontier.length > 0) {
    const layer = frontier
      .map((id) => nodeById.get(id))
      .filter((n): n is DagNode => n !== undefined);
    layers.push(layer);
    scheduled += layer.length;

    // Release every node whose final outstanding dependency sat in this layer.
    const next: string[] = [];
    for (const id of frontier) {
      for (const dependentId of dependentsOf.get(id) ?? []) {
        const outstanding = (remainingDeps.get(dependentId) ?? 0) - 1;
        remainingDeps.set(dependentId, outstanding);
        if (outstanding === 0) next.push(dependentId);
      }
    }
    frontier = next;
  }

  // Anything left unscheduled never reached in-degree zero.
  if (scheduled < nodes.length) {
    throw new DagCycleError(nodes.length, scheduled);
  }

  return layers;
}

// ---------------------------------------------------------------------------
// Trigger rule evaluation
// ---------------------------------------------------------------------------
/**
 * Decide whether a node should run or be skipped, based on its trigger rule
 * and the recorded outputs of its dependencies.
 *
 * The rule is `node.trigger_rule`, falling back to `DEFAULT_TRIGGER_RULE`.
 * A node without dependencies always runs. A dependency with no entry in
 * `nodeOutputs` counts as neither completed nor failed.
 *
 * - `all_success`: every dependency has state `'completed'`.
 * - `one_success`: at least one dependency has state `'completed'`.
 * - `all_done`: every dependency has an entry in `nodeOutputs`, whatever its state.
 * - `none_failed_min_one_success`: no dependency has state `'failed'` and at
 *   least one has state `'completed'`.
 * - any other value: the node runs.
 *
 * @param node - The DAG node to evaluate.
 * @param nodeOutputs - Map of node id → output for the nodes that have finished.
 * @returns `'run'` if the node should execute, `'skip'` if it should be skipped.
 */
export function checkTriggerRule(
  node: DagNode,
  nodeOutputs: Map<string, NodeOutput>,
): 'run' | 'skip' {
  // Root nodes have nothing to wait for.
  if (node.depends_on.length === 0) return 'run';

  const rule: TriggerRule = node.trigger_rule ?? DEFAULT_TRIGGER_RULE;
  const depOutputs = node.depends_on.map((depId) => nodeOutputs.get(depId));
  const verdict = (shouldRun: boolean): 'run' | 'skip' => (shouldRun ? 'run' : 'skip');

  switch (rule) {
    case 'all_success':
      return verdict(depOutputs.every((o) => o?.state === 'completed'));

    case 'one_success':
      return verdict(depOutputs.some((o) => o?.state === 'completed'));

    case 'all_done':
      return verdict(depOutputs.every((o) => o !== undefined));

    case 'none_failed_min_one_success': {
      // Braces keep these declarations scoped to this case clause.
      const anyFailed = depOutputs.some((o) => o?.state === 'failed');
      const anySucceeded = depOutputs.some((o) => o?.state === 'completed');
      return verdict(!anyFailed && anySucceeded);
    }

    default:
      return 'run';
  }
}

// ---------------------------------------------------------------------------
// Node output reference substitution
// ---------------------------------------------------------------------------

/**
 * Substitute node output references in a prompt string.
 *
 * Resolves `$nodeId.output` → full text, `$nodeId.output.field` → structured field.
 * Re-exported here from `./utils.js`.
 *
 * @param prompt - Template string with `$nodeId.output` references.
 * @param nodeOutputs - Map of node id → NodeOutput.
 * @param escapedForBash - If true, escape special bash characters in output values.
 */
export { substituteNodeOutputRefs } from './utils.js';

// ---------------------------------------------------------------------------
// Prompt / command node execution
// ---------------------------------------------------------------------------
/**
 * Execute a single PromptNode or CommandNode by sending a prompt to an AI provider.
 *
 * Handles:
 * - Provider selection: `node.provider`, falling back to `config.assistant`
 * - Loading the prompt from `node.command_file` (joined onto `cwd`) or the
 *   inline `node.prompt`
 * - Variable substitution through `buildPromptWithContext`
 * - Retry through `retryWithBackoff`; unless `retry.on_error` is `'all'`, a
 *   predicate limits retries to errors classified as `transient` or
 *   `rate_limit` (with `'all'` no predicate is passed)
 * - Timeout: each provider call is aborted after `idle_timeout_ms`
 *   (default 300 000 ms)
 * - Structured output: when `output_format` is set, the response is parsed
 *   and validated. A response that cannot be parsed, or that fails
 *   validation, is asked again — at most 3 provider rounds in total. After
 *   the last round the raw text is returned best-effort, without `fields`.
 *
 * Expected failures (unreadable command file, missing prompt, unresolved
 * output reference, provider error, timeout) are returned as
 * `{ state: 'failed', error }`. Any other error thrown while building the
 * prompt is rethrown.
 *
 * NOTE(review): the generic type arguments in this signature were missing
 * from the reviewed text and are inferred from usage — confirm them against
 * the schema and deps modules.
 * NOTE(review): `node.env` is not applied anywhere in this function; the
 * previous code merged it into a local that was never read. Confirm whether
 * it should feed substitution or be handed to the provider.
 *
 * @param node - The prompt/command node to execute.
 * @param deps - Supplies `getAgentProvider`.
 * @param platform - Receives the completion message.
 * @param conversationId - Conversation the completion message is sent to.
 * @param cwd - Directory that `command_file` is resolved against.
 * @param config - Supplies the default provider id (`assistant`).
 * @param nodeOutputs - Outputs of nodes that already finished.
 * @param workflowVariables - Workflow-level variables for substitution.
 * @returns The node result: `state: 'completed'` with `output` (and `fields`
 *   when structured output validated), or `state: 'failed'` with `error`.
 */
export async function executeNodeInternal(
  node: PromptNode,
  deps: WorkflowDeps,
  platform: IWorkflowPlatform,
  conversationId: string,
  cwd: string,
  config: WorkflowConfig,
  nodeOutputs: Map<string, NodeOutput>,
  workflowVariables: Record<string, string>,
): Promise<NodeExecutionResult> {
  const provider = deps.getAgentProvider(node.provider ?? config.assistant);

  // Resolve prompt text: a command file wins over an inline prompt.
  let promptText: string;
  if (node.command_file) {
    try {
      promptText = await readFile(join(cwd, node.command_file), 'utf-8');
    } catch (err) {
      return {
        state: 'failed',
        error: `Failed to read command file "${node.command_file}": ${err instanceof Error ? err.message : String(err)}`,
      };
    }
  } else if (node.prompt) {
    promptText = node.prompt;
  } else {
    return { state: 'failed', error: `Prompt node "${node.id}" has neither prompt nor command_file` };
  }

  // Apply variable substitution; a bad output reference fails the node.
  try {
    promptText = buildPromptWithContext(promptText, workflowVariables, nodeOutputs);
  } catch (err) {
    if (err instanceof OutputRefError) {
      return { state: 'failed', error: err.message };
    }
    throw err;
  }

  // Retry configuration
  const maxAttempts = node.retry?.max_attempts ?? 1;
  const onError = node.retry?.on_error ?? 'transient';
  const delayMs = node.retry?.delay_ms ?? 1000;
  const retryPredicate =
    onError === 'all'
      ? undefined
      : (err: unknown): boolean => {
          const category = classifyError(err);
          return category === 'transient' || category === 'rate_limit';
        };

  const idleTimeoutMs = node.idle_timeout_ms ?? 300_000; // 5 minutes default

  // Without an output format there is nothing to validate, so one round suffices.
  const maxReaskAttempts = node.output_format ? 3 : 1;
  let lastOutput = '';
  let lastFields: Record<string, unknown> | undefined;

  for (let reaskAttempt = 0; reaskAttempt < maxReaskAttempts; reaskAttempt++) {
    const currentPrompt =
      reaskAttempt === 0
        ? promptText
        : `${promptText}\n\nPrevious response did not match the expected format. Please try again, ensuring your response matches: ${JSON.stringify(node.output_format)}`;

    try {
      const responseText = await retryWithBackoff(
        () => executeWithIdleTimeout(provider, currentPrompt, idleTimeoutMs, node.id),
        maxAttempts,
        delayMs,
        retryPredicate,
      );
      lastOutput = responseText ?? '';
    } catch (err) {
      if (classifyError(err) === 'timeout') {
        return {
          state: 'failed',
          error: `Node "${node.id}" timed out after ${idleTimeoutMs}ms of inactivity`,
        };
      }
      return {
        state: 'failed',
        error: err instanceof Error ? err.message : String(err),
      };
    }

    // Nothing to validate: no schema requested, or the response is empty.
    if (!node.output_format || !lastOutput) break;

    let validFields: Record<string, unknown> | undefined;
    try {
      const parsed = tryParseStructuredOutput(lastOutput);
      if (parsed !== undefined && validateStructuredOutput(parsed, node.output_format).valid) {
        validFields = parsed;
      }
    } catch {
      // A throwing validator counts as a mismatch, like an unparseable response.
    }

    if (validFields !== undefined) {
      lastFields = validFields;
      break;
    }
    // Mismatch: the loop asks again, or ends here on the final round and the
    // raw text is returned best-effort.
  }

  // Notify platform
  await safeSendMessage(
    platform,
    conversationId,
    `✅ Node "${node.name ?? node.id}" completed`,
  );

  return {
    state: 'completed',
    output: lastOutput,
    fields: lastFields,
    // NOTE(review): cost is never populated in this function.
    costUsd: undefined,
  };
}

/**
 * Send one prompt to the provider and abort the call if it has not settled
 * within `idleTimeoutMs`.
 *
 * The timer is armed once before the call and is never re-armed, so it bounds
 * the whole call. The abort is delivered through the `signal` option passed
 * to `sendPrompt`.
 *
 * @param provider - Object exposing `sendPrompt`.
 * @param prompt - Prompt text to send.
 * @param idleTimeoutMs - Milliseconds to wait before aborting.
 * @param nodeId - Node id reported in the timeout error.
 * @returns The provider's response text.
 * @throws NodeTimeoutError if the call rejected after the abort signal fired;
 *   otherwise the provider's own error is rethrown.
 */
async function executeWithIdleTimeout(
  provider: { sendPrompt: (prompt: string, options?: Record<string, unknown>) => Promise<string> },
  prompt: string,
  idleTimeoutMs: number,
  nodeId: string,
): Promise<string> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), idleTimeoutMs);

  try {
    return await provider.sendPrompt(prompt, { signal: controller.signal });
  } catch (err) {
    if (controller.signal.aborted) {
      throw new NodeTimeoutError(nodeId, idleTimeoutMs);
    }
    throw err;
  } finally {
    // Runs on success and failure alike, so the timer never outlives the call.
    clearTimeout(timer);
  }
}

/**
 * Attempt to parse structured output from a model response.
 *
 * Tries the whole text as JSON first, then the contents of the first fenced
 * code block (with or without a `json` tag). Only a plain JSON object is
 * accepted; arrays, primitives and `null` are rejected.
 *
 * @param text - Raw model response.
 * @returns The parsed object, or `undefined` if neither candidate yields one.
 */
function tryParseStructuredOutput(text: string): Record<string, unknown> | undefined {
  const candidates = [text];

  const fenced = /```(?:json)?\s*\n?([\s\S]*?)\n?```/.exec(text);
  if (fenced?.[1] !== undefined) candidates.push(fenced[1]);

  for (const candidate of candidates) {
    try {
      const parsed: unknown = JSON.parse(candidate);
      if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
        return parsed as Record<string, unknown>;
      }
    } catch {
      // Not valid JSON; fall through to the next candidate.
    }
  }

  return undefined;
}
/**
 * Validate parsed output against a schema definition.
 *
 * Best-effort only: the sole thing checked is that every key listed in the
 * schema's `required` array is an own property of `parsed`. Inherited
 * properties such as `toString` do not satisfy a requirement. A schema whose
 * `required` entry is absent or not an array imposes no requirements.
 *
 * @param parsed - Object parsed from the model response.
 * @param schema - Schema object; only its `required` entry is read.
 * @returns `{ valid: true }`, or `{ valid: false, missingKeys }` listing the
 *   required keys that `parsed` lacks.
 */
function validateStructuredOutput(
  parsed: Record<string, unknown>,
  schema: Record<string, unknown>,
): { valid: boolean; missingKeys?: string[] } {
  const required = schema['required'];
  if (!Array.isArray(required)) {
    return { valid: true };
  }

  // Own-property check: `in` would also accept keys found on the prototype chain.
  const missingKeys = required
    .map((key) => String(key))
    .filter((key) => !Object.prototype.hasOwnProperty.call(parsed, key));

  return missingKeys.length > 0 ? { valid: false, missingKeys } : { valid: true };
}

// ---------------------------------------------------------------------------
// Script / Bash node execution
// ---------------------------------------------------------------------------

// Execute a BashNode or ScriptNode.
//
// For bash nodes: runs `bash -c