/** * Top-level workflow executor for the Ion engine. * * Orchestrates the full lifecycle of a workflow run: * - Load configuration and resolve provider/model * - Create or resume a WorkflowRun * - Path-lock guard (prevent concurrent runs on the same working path) * - Pre-create artifacts directory * - Delegate to executeDagWorkflow for DAG traversal * - Update run status on completion/failure * - Emit events and notify the user */ import { mkdir } from 'node:fs/promises'; import { join, resolve } from 'node:path'; import type { WorkflowDefinition } from '../schema/index.js'; import type { IWorkflowPlatform, IWorkflowStore, WorkflowDeps, WorkflowConfig, WorkflowRun, CreateWorkflowRunData, } from './deps.js'; import { executeDagWorkflow, type DagWorkflowResult } from './dag-executor.js'; import { safeSendMessage } from './utils.js'; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- /** Options for workflow execution. */ export interface WorkflowExecutionOptions { /** Whether to resume a previously failed/paused run. */ resume?: boolean; /** Additional input variables for the workflow. */ input?: Record; /** Override the provider id. */ provider?: string; /** Override the model. */ model?: string; /** Codebase id for scoped paths. */ codebaseId?: string; } /** Result of a workflow execution. */ export interface WorkflowExecutionResult { /** The workflow run record. */ run: WorkflowRun; /** The DAG execution result. */ dagResult?: DagWorkflowResult; /** Whether the execution was successful. */ success: boolean; /** Error message if execution failed. */ error?: string; } /** Hydrated resumable run data. */ export interface HydratedResumableRun { /** The pre-created or resumed workflow run. */ preCreatedRun: WorkflowRun; /** Prior completed node outputs from the previous run. */ priorCompletedNodes: Record>; } /** Resolved project paths for a workflow run. */ export interface ProjectPaths { /** Directory for workflow artifacts. */ artifactsDir: string; /** Directory for workflow logs. */ logDir: string; } // --------------------------------------------------------------------------- // Main executor // --------------------------------------------------------------------------- /** * Execute a workflow from start to finish. * * This is the primary entry point for running a workflow. It handles: * 1. Loading configuration * 2. Resolving provider and model * 3. Creating or resuming a WorkflowRun * 4. Path-lock guard * 5. Pre-creating the artifacts directory * 6. Delegating to executeDagWorkflow * 7. Updating run status on completion/failure * * @param deps - Dependency injection container. * @param platform - Platform interface for messaging. * @param conversationId - Conversation channel id. * @param cwd - Working directory for the workflow. * @param workflow - The workflow definition. * @param userMessage - The triggering user message (stored as input). * @param opts - Execution options. * @returns WorkflowExecutionResult with run, dag result, and success status. */ export async function executeWorkflow( deps: WorkflowDeps, platform: IWorkflowPlatform, conversationId: string, cwd: string, workflow: WorkflowDefinition, userMessage: string, opts: WorkflowExecutionOptions = {}, ): Promise { // 1. Load configuration let config: WorkflowConfig; try { config = await deps.loadConfig(cwd); } catch (err) { return { run: createFailedRun(workflow, err), success: false, error: `Failed to load configuration: ${err instanceof Error ? err.message : String(err)}`, }; } // 2. Resolve provider and model const providerId = opts.provider ?? workflow.provider ?? config.assistant; const model = opts.model ?? workflow.model ?? config.assistants[providerId]?.model; // 3. Create or resume a workflow run let workflowRun: WorkflowRun; let priorCompletedNodes: Record> | undefined; if (opts.resume) { // Try to find an existing run to resume const existingRun = await deps.store.getActiveWorkflowRunByPath(workflow.name); if (existingRun) { const hydrated = await hydrateResumableRun(deps, existingRun); workflowRun = hydrated.preCreatedRun; priorCompletedNodes = hydrated.priorCompletedNodes; } else { // No existing run — create a new one workflowRun = await createNewRun(deps, workflow, userMessage, opts); } } else { // 4. Path-lock guard: check no other run is active const activeRun = await deps.store.getActiveWorkflowRunByPath(workflow.name); if (activeRun) { const errorMsg = `Workflow "${workflow.name}" already has an active run (${activeRun.id}). Wait for it to complete or cancel it first.`; await safeSendMessage(platform, conversationId, `❌ ${errorMsg}`); return { run: createFailedRun(workflow, new Error(errorMsg)), success: false, error: errorMsg, }; } workflowRun = await createNewRun(deps, workflow, userMessage, opts); } // 5. Pre-create artifacts directory const paths = resolveProjectPaths(deps, cwd, workflowRun.id, opts.codebaseId); try { await mkdir(paths.artifactsDir, { recursive: true }); await mkdir(paths.logDir, { recursive: true }); } catch (err) { // Artifacts dir creation is best-effort } // 6. Set status to running try { workflowRun = await deps.store.updateWorkflowRun(workflowRun.id, { status: 'running', }); } catch (err) { return { run: workflowRun, success: false, error: `Failed to set workflow run status to running: ${err instanceof Error ? err.message : String(err)}`, }; } // 7. Notify user await safeSendMessage( platform, conversationId, `🚀 Starting workflow "${workflow.name}" (run ${workflowRun.id})`, ); // 8. Execute the DAG let dagResult: DagWorkflowResult | undefined; try { dagResult = await executeDagWorkflow( deps, platform, conversationId, cwd, workflow, workflowRun, priorCompletedNodes, ); // 9. Update run status on completion if (dagResult.success) { workflowRun = await deps.store.updateWorkflowRun(workflowRun.id, { status: 'completed', output: Object.fromEntries(dagResult.nodeOutputs), }); await safeSendMessage( platform, conversationId, `✅ Workflow "${workflow.name}" completed successfully\n${dagResult.summary}`, ); } else { workflowRun = await deps.store.failWorkflowRun( workflowRun.id, dagResult.error ?? 'Workflow failed', ); await safeSendMessage( platform, conversationId, `❌ Workflow "${workflow.name}" failed: ${dagResult.error}`, ); } return { run: workflowRun, dagResult, success: dagResult.success, error: dagResult.error, }; } catch (err) { // Unhandled error — update DB and notify const errorMsg = err instanceof Error ? err.message : String(err); try { workflowRun = await deps.store.failWorkflowRun(workflowRun.id, errorMsg); } catch { // Best-effort DB update } await safeSendMessage( platform, conversationId, `❌ Workflow "${workflow.name}" failed with error: ${errorMsg}`, ); // Emit error event try { await deps.store.createWorkflowEvent({ runId: workflowRun.id, type: 'workflow_failed', data: { error: errorMsg }, }); } catch { // Best-effort event emission } return { run: workflowRun, success: false, error: errorMsg, }; } } // --------------------------------------------------------------------------- // Resume support // --------------------------------------------------------------------------- /** * Hydrate a resumable workflow run. * * Loads completed node outputs from the previous run and sets the * run status back to 'running' so execution can continue. * * @param deps - Dependency injection container. * @param candidate - The existing workflow run to resume. * @returns Hydrated run with prior completed nodes. */ export async function hydrateResumableRun( deps: WorkflowDeps, candidate: WorkflowRun, ): Promise { // Load completed node outputs from the previous run const priorCompletedNodes = await deps.store.getCompletedDagNodeOutputs(candidate.id); // Resume the workflow run (set status back to 'running') const preCreatedRun = await deps.store.resumeWorkflowRun(candidate.id); return { preCreatedRun, priorCompletedNodes, }; } // --------------------------------------------------------------------------- // Project paths // --------------------------------------------------------------------------- /** * Resolve project paths for a workflow run. * * Uses codebase-scoped paths if a codebaseId is provided, * otherwise falls back to cwd-based paths. * * @param deps - Dependency injection container. * @param cwd - Working directory. * @param workflowRunId - The workflow run id. * @param codebaseId - Optional codebase id for scoped paths. * @returns Resolved artifacts and log directories. */ export function resolveProjectPaths( _deps: WorkflowDeps, cwd: string, workflowRunId: string, codebaseId?: string, ): ProjectPaths { if (codebaseId) { // Codebase-scoped paths return { artifactsDir: resolve(cwd, '.ion', 'codebases', codebaseId, 'artifacts', workflowRunId), logDir: resolve(cwd, '.ion', 'codebases', codebaseId, 'logs', workflowRunId), }; } // Cwd-based paths (default) return { artifactsDir: resolve(cwd, '.ion', 'artifacts', workflowRunId), logDir: resolve(cwd, '.ion', 'logs', workflowRunId), }; } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- /** * Create a new workflow run in the store. */ async function createNewRun( deps: WorkflowDeps, workflow: WorkflowDefinition, userMessage: string, opts: WorkflowExecutionOptions, ): Promise { const data: CreateWorkflowRunData = { workflowPath: workflow.name, workflowName: workflow.name, trigger: 'manual', input: { message: userMessage, ...(opts.input ?? {}), }, }; return deps.store.createWorkflowRun(data); } /** * Create a minimal failed run object for error cases where * the store is not available. */ function createFailedRun(workflow: WorkflowDefinition, error: unknown): WorkflowRun { return { id: 'error', workflowPath: workflow.name, workflowName: workflow.name, status: 'failed', trigger: 'manual', input: {}, error: error instanceof Error ? error.message : String(error), createdAt: new Date(), updatedAt: new Date(), }; }