/** * SQLite-backed workflow store. * * Uses better-sqlite3 for synchronous, file-based persistence. * The dependency is optional — a helpful error is thrown if not installed. */ import { nanoid } from 'nanoid'; import type { IWorkflowStore, WorkflowRun, WorkflowEvent, WorkflowRunStatus, CreateWorkflowRunData, } from '../engine/deps.js'; // --------------------------------------------------------------------------- // Optional dependency loading // --------------------------------------------------------------------------- async function loadBetterSqlite3(): Promise { try { return await import('better-sqlite3'); } catch { throw new Error( 'better-sqlite3 is not installed. Install it with: npm install better-sqlite3', ); } } // --------------------------------------------------------------------------- // Schema // --------------------------------------------------------------------------- const SCHEMA_SQL = ` CREATE TABLE IF NOT EXISTS workflow_runs ( id TEXT PRIMARY KEY, workflow_path TEXT NOT NULL, workflow_name TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', trigger TEXT NOT NULL, input TEXT NOT NULL DEFAULT '{}', output TEXT, error TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS workflow_events ( id TEXT PRIMARY KEY, run_id TEXT NOT NULL, node_id TEXT, type TEXT NOT NULL, data TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL, FOREIGN KEY (run_id) REFERENCES workflow_runs(id) ); CREATE INDEX IF NOT EXISTS idx_workflow_runs_path_status ON workflow_runs(workflow_path, status); CREATE INDEX IF NOT EXISTS idx_workflow_events_run_id ON workflow_events(run_id); `; // --------------------------------------------------------------------------- // Row mappers // --------------------------------------------------------------------------- interface RunRow { id: string; workflow_path: string; workflow_name: string; status: string; trigger: string; input: string; output: string | null; error: string | null; created_at: string; updated_at: string; } interface EventRow { id: string; run_id: string; node_id: string | null; type: string; data: string; created_at: string; } function rowToRun(row: RunRow): WorkflowRun { return { id: row.id, workflowPath: row.workflow_path, workflowName: row.workflow_name, status: row.status as WorkflowRunStatus, trigger: row.trigger, input: JSON.parse(row.input), output: row.output ? JSON.parse(row.output) : undefined, error: row.error ?? undefined, createdAt: new Date(row.created_at), updatedAt: new Date(row.updated_at), }; } function rowToEvent(row: EventRow): WorkflowEvent { return { id: row.id, runId: row.run_id, nodeId: row.node_id ?? undefined, type: row.type, data: JSON.parse(row.data), createdAt: new Date(row.created_at), }; } // --------------------------------------------------------------------------- // Factory // --------------------------------------------------------------------------- export async function createSqliteStore( dbPath: string, ): Promise { const mod = await loadBetterSqlite3(); const DatabaseCtor = mod.default ?? mod; // eslint-disable-next-line @typescript-eslint/no-explicit-any const db: any = new DatabaseCtor(dbPath); // Enable WAL mode for better concurrent read performance db.pragma('journal_mode = WAL'); // Initialize schema db.exec(SCHEMA_SQL); const ACTIVE_STATUSES: WorkflowRunStatus[] = ['pending', 'running']; const store: IWorkflowStore = { // -- Run lifecycle ------------------------------------------------------- createWorkflowRun(data: CreateWorkflowRunData): Promise { const id = nanoid(); const now = new Date().toISOString(); const inputJson = JSON.stringify(data.input); db.prepare( `INSERT INTO workflow_runs (id, workflow_path, workflow_name, status, trigger, input, created_at, updated_at) VALUES (?, ?, ?, 'pending', ?, ?, ?, ?)`, ).run( id, data.workflowPath, data.workflowName, data.trigger, inputJson, now, now, ); return Promise.resolve({ id, workflowPath: data.workflowPath, workflowName: data.workflowName, status: 'pending', trigger: data.trigger, input: data.input, createdAt: new Date(now), updatedAt: new Date(now), }); }, getWorkflowRun(id: string): Promise { const row = db .prepare('SELECT * FROM workflow_runs WHERE id = ?') .get(id) as RunRow | undefined; if (!row) return Promise.resolve(null); return Promise.resolve(rowToRun(row)); }, updateWorkflowRun( id: string, data: Partial, ): Promise { const existing = db .prepare('SELECT * FROM workflow_runs WHERE id = ?') .get(id) as RunRow | undefined; if (!existing) throw new Error(`WorkflowRun not found: ${id}`); const now = new Date().toISOString(); const sets: string[] = ['updated_at = ?']; const values: unknown[] = [now]; if (data.status !== undefined) { sets.push('status = ?'); values.push(data.status); } if (data.output !== undefined) { sets.push('output = ?'); values.push(JSON.stringify(data.output)); } if (data.error !== undefined) { sets.push('error = ?'); values.push(data.error); } if (data.workflowPath !== undefined) { sets.push('workflow_path = ?'); values.push(data.workflowPath); } if (data.workflowName !== undefined) { sets.push('workflow_name = ?'); values.push(data.workflowName); } if (data.trigger !== undefined) { sets.push('trigger = ?'); values.push(data.trigger); } if (data.input !== undefined) { sets.push('input = ?'); values.push(JSON.stringify(data.input)); } values.push(id); db.prepare( `UPDATE workflow_runs SET ${sets.join(', ')} WHERE id = ?`, ).run(...values); const updated = db .prepare('SELECT * FROM workflow_runs WHERE id = ?') .get(id) as RunRow; return Promise.resolve(rowToRun(updated)); }, failWorkflowRun(id: string, error: string): Promise { return store.updateWorkflowRun(id, { status: 'failed', error, } as Partial); }, getWorkflowRunStatus(id: string): Promise { const row = db .prepare('SELECT status FROM workflow_runs WHERE id = ?') .get(id) as { status: string } | undefined; if (!row) return Promise.resolve(null); return Promise.resolve(row.status as WorkflowRunStatus); }, // -- Events -------------------------------------------------------------- createWorkflowEvent( event: Omit, ): Promise { const id = nanoid(); const now = new Date().toISOString(); const dataJson = JSON.stringify(event.data); db.prepare( `INSERT INTO workflow_events (id, run_id, node_id, type, data, created_at) VALUES (?, ?, ?, ?, ?, ?)`, ).run(id, event.runId, event.nodeId ?? null, event.type, dataJson, now); return Promise.resolve({ id, runId: event.runId, nodeId: event.nodeId, type: event.type, data: event.data, createdAt: new Date(now), }); }, getCompletedDagNodeOutputs( runId: string, ): Promise>> { const rows = db .prepare( `SELECT node_id, data FROM workflow_events WHERE run_id = ? AND type = 'node_complete' AND node_id IS NOT NULL`, ) .all(runId) as { node_id: string; data: string }[]; const outputs: Record> = {}; for (const row of rows) { const parsed = JSON.parse(row.data); if (parsed.output) { outputs[row.node_id] = parsed.output as Record; } } return Promise.resolve(outputs); }, // -- Active runs --------------------------------------------------------- getActiveWorkflowRunByPath( path: string, opts?: { excludeId?: string }, ): Promise { const statuses = ACTIVE_STATUSES; const placeholders = statuses.map(() => '?').join(', '); let query = `SELECT * FROM workflow_runs WHERE workflow_path = ? AND status IN (${placeholders})`; const params: unknown[] = [path, ...statuses]; if (opts?.excludeId) { query += ' AND id != ?'; params.push(opts.excludeId); } query += ' LIMIT 1'; const row = db.prepare(query).get(...params) as RunRow | undefined; if (!row) return Promise.resolve(null); return Promise.resolve(rowToRun(row)); }, // -- Codebase ------------------------------------------------------------ getCodebase(_id: string): Promise | null> { return Promise.resolve(null); }, getCodebaseEnvVars(_id: string): Promise> { return Promise.resolve({}); }, // -- Resumption ---------------------------------------------------------- resumeWorkflowRun(id: string): Promise { return store.updateWorkflowRun(id, { status: 'running', } as Partial); }, }; return store; }