/** * Filesystem-backed workflow store. * * Stores each run as `{basePath}/{runId}/run.json` and * `{basePath}/{runId}/events.jsonl`. Thread-safe writes use atomic * rename (write to temp file, then rename). */ import { mkdir, writeFile, readFile, readdir, rename, unlink } from 'node:fs/promises'; import { existsSync } from 'node:fs'; import { join } from 'node:path'; import { nanoid } from 'nanoid'; import type { IWorkflowStore, WorkflowRun, WorkflowEvent, WorkflowRunStatus, CreateWorkflowRunData, } from '../engine/deps.js'; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- const ACTIVE_STATUSES: WorkflowRunStatus[] = ['pending', 'running']; function parseRun(raw: string): WorkflowRun { const obj = JSON.parse(raw); return { ...obj, createdAt: new Date(obj.createdAt), updatedAt: new Date(obj.updatedAt), }; } function serializeRun(run: WorkflowRun): string { return JSON.stringify( { ...run, createdAt: run.createdAt.toISOString(), updatedAt: run.updatedAt.toISOString(), }, null, 2, ); } function parseEvent(line: string): WorkflowEvent { const obj = JSON.parse(line); return { ...obj, createdAt: new Date(obj.createdAt), }; } function serializeEvent(event: WorkflowEvent): string { return JSON.stringify({ ...event, createdAt: event.createdAt.toISOString(), }); } // --------------------------------------------------------------------------- // Atomic write helper // --------------------------------------------------------------------------- async function atomicWrite(filePath: string, data: string): Promise { const tmp = `${filePath}.${nanoid(8)}.tmp`; await writeFile(tmp, data, 'utf-8'); await rename(tmp, filePath); } // --------------------------------------------------------------------------- // Factory // --------------------------------------------------------------------------- export function createFsStore(basePath: string): IWorkflowStore { // Ensure base directory exists on first write — no side effects at import. const store: IWorkflowStore = { // -- Run lifecycle ------------------------------------------------------- async createWorkflowRun(data: CreateWorkflowRunData): Promise { const id = nanoid(); const now = new Date(); const run: WorkflowRun = { id, workflowPath: data.workflowPath, workflowName: data.workflowName, status: 'pending', trigger: data.trigger, input: data.input, createdAt: now, updatedAt: now, }; const runDir = join(basePath, id); await mkdir(runDir, { recursive: true }); await atomicWrite(join(runDir, 'run.json'), serializeRun(run)); // Create empty events file await atomicWrite(join(runDir, 'events.jsonl'), ''); return run; }, async getWorkflowRun(id: string): Promise { const filePath = join(basePath, id, 'run.json'); if (!existsSync(filePath)) return null; const raw = await readFile(filePath, 'utf-8'); return parseRun(raw); }, async updateWorkflowRun( id: string, data: Partial, ): Promise { const existing = await store.getWorkflowRun(id); if (!existing) throw new Error(`WorkflowRun not found: ${id}`); const updated: WorkflowRun = { ...existing, ...data, id: existing.id, // id is immutable updatedAt: new Date(), }; await atomicWrite( join(basePath, id, 'run.json'), serializeRun(updated), ); return updated; }, async failWorkflowRun(id: string, error: string): Promise { return store.updateWorkflowRun(id, { status: 'failed', error, } as Partial); }, async getWorkflowRunStatus( id: string, ): Promise { const run = await store.getWorkflowRun(id); return run?.status ?? null; }, // -- Events -------------------------------------------------------------- async createWorkflowEvent( event: Omit, ): Promise { const full: WorkflowEvent = { ...event, id: nanoid(), createdAt: new Date(), }; const eventsPath = join(basePath, event.runId, 'events.jsonl'); // Ensure directory exists if (!existsSync(join(basePath, event.runId))) { await mkdir(join(basePath, event.runId), { recursive: true }); } const line = serializeEvent(full) + '\n'; // Append — not atomic for appends, but each line is self-contained const existing = existsSync(eventsPath) ? await readFile(eventsPath, 'utf-8').catch(() => '') : ''; await atomicWrite(eventsPath, existing + line); return full; }, async getCompletedDagNodeOutputs( runId: string, ): Promise>> { const eventsPath = join(basePath, runId, 'events.jsonl'); if (!existsSync(eventsPath)) return {}; const raw = await readFile(eventsPath, 'utf-8'); const lines = raw.split('\n').filter(Boolean); const outputs: Record> = {}; for (const line of lines) { const event = parseEvent(line); if (event.type === 'node_complete' && event.nodeId && event.data?.output) { outputs[event.nodeId] = event.data.output as Record; } } return outputs; }, // -- Active runs --------------------------------------------------------- async getActiveWorkflowRunByPath( path: string, opts?: { excludeId?: string }, ): Promise { if (!existsSync(basePath)) return null; const entries = await readdir(basePath, { withFileTypes: true }); for (const entry of entries) { if (!entry.isDirectory()) continue; const runPath = join(basePath, entry.name, 'run.json'); if (!existsSync(runPath)) continue; const raw = await readFile(runPath, 'utf-8'); const run = parseRun(raw); if (run.workflowPath !== path) continue; if (!ACTIVE_STATUSES.includes(run.status)) continue; if (opts?.excludeId && run.id === opts.excludeId) continue; return run; } return null; }, // -- Codebase ------------------------------------------------------------ async getCodebase( _id: string, ): Promise | null> { // Filesystem store does not persist codebase records return null; }, async getCodebaseEnvVars( _id: string, ): Promise> { // Filesystem store does not persist codebase env vars return {}; }, // -- Resumption ---------------------------------------------------------- async resumeWorkflowRun(id: string): Promise { return store.updateWorkflowRun(id, { status: 'running', } as Partial); }, }; return store; }