chore: add ion package, codesight wiki, work plans, ascli config
New @boocode/ion package (v0.0.1) for inference optimization network. .codesight/ wiki artifacts for codebase documentation. .omo/ work plans for openspec cleanup and enhanced file panel.
This commit is contained in:
249
packages/ion/src/store/fs-store.ts
Normal file
249
packages/ion/src/store/fs-store.ts
Normal file
@@ -0,0 +1,249 @@
|
||||
/**
|
||||
* Filesystem-backed workflow store.
|
||||
*
|
||||
* Stores each run as `{basePath}/{runId}/run.json` and
|
||||
* `{basePath}/{runId}/events.jsonl`. Thread-safe writes use atomic
|
||||
* rename (write to temp file, then rename).
|
||||
*/
|
||||
|
||||
import { mkdir, writeFile, readFile, readdir, rename, unlink } from 'node:fs/promises';
|
||||
import { existsSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type {
|
||||
IWorkflowStore,
|
||||
WorkflowRun,
|
||||
WorkflowEvent,
|
||||
WorkflowRunStatus,
|
||||
CreateWorkflowRunData,
|
||||
} from '../engine/deps.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const ACTIVE_STATUSES: WorkflowRunStatus[] = ['pending', 'running'];
|
||||
|
||||
function parseRun(raw: string): WorkflowRun {
|
||||
const obj = JSON.parse(raw);
|
||||
return {
|
||||
...obj,
|
||||
createdAt: new Date(obj.createdAt),
|
||||
updatedAt: new Date(obj.updatedAt),
|
||||
};
|
||||
}
|
||||
|
||||
function serializeRun(run: WorkflowRun): string {
|
||||
return JSON.stringify(
|
||||
{
|
||||
...run,
|
||||
createdAt: run.createdAt.toISOString(),
|
||||
updatedAt: run.updatedAt.toISOString(),
|
||||
},
|
||||
null,
|
||||
2,
|
||||
);
|
||||
}
|
||||
|
||||
function parseEvent(line: string): WorkflowEvent {
|
||||
const obj = JSON.parse(line);
|
||||
return {
|
||||
...obj,
|
||||
createdAt: new Date(obj.createdAt),
|
||||
};
|
||||
}
|
||||
|
||||
function serializeEvent(event: WorkflowEvent): string {
|
||||
return JSON.stringify({
|
||||
...event,
|
||||
createdAt: event.createdAt.toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Atomic write helper
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function atomicWrite(filePath: string, data: string): Promise<void> {
|
||||
const tmp = `${filePath}.${nanoid(8)}.tmp`;
|
||||
await writeFile(tmp, data, 'utf-8');
|
||||
await rename(tmp, filePath);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Factory
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export function createFsStore(basePath: string): IWorkflowStore {
|
||||
// Ensure base directory exists on first write — no side effects at import.
|
||||
|
||||
const store: IWorkflowStore = {
|
||||
// -- Run lifecycle -------------------------------------------------------
|
||||
|
||||
async createWorkflowRun(data: CreateWorkflowRunData): Promise<WorkflowRun> {
|
||||
const id = nanoid();
|
||||
const now = new Date();
|
||||
const run: WorkflowRun = {
|
||||
id,
|
||||
workflowPath: data.workflowPath,
|
||||
workflowName: data.workflowName,
|
||||
status: 'pending',
|
||||
trigger: data.trigger,
|
||||
input: data.input,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
|
||||
const runDir = join(basePath, id);
|
||||
await mkdir(runDir, { recursive: true });
|
||||
await atomicWrite(join(runDir, 'run.json'), serializeRun(run));
|
||||
// Create empty events file
|
||||
await atomicWrite(join(runDir, 'events.jsonl'), '');
|
||||
|
||||
return run;
|
||||
},
|
||||
|
||||
async getWorkflowRun(id: string): Promise<WorkflowRun | null> {
|
||||
const filePath = join(basePath, id, 'run.json');
|
||||
if (!existsSync(filePath)) return null;
|
||||
const raw = await readFile(filePath, 'utf-8');
|
||||
return parseRun(raw);
|
||||
},
|
||||
|
||||
async updateWorkflowRun(
|
||||
id: string,
|
||||
data: Partial<WorkflowRun>,
|
||||
): Promise<WorkflowRun> {
|
||||
const existing = await store.getWorkflowRun(id);
|
||||
if (!existing) throw new Error(`WorkflowRun not found: ${id}`);
|
||||
|
||||
const updated: WorkflowRun = {
|
||||
...existing,
|
||||
...data,
|
||||
id: existing.id, // id is immutable
|
||||
updatedAt: new Date(),
|
||||
};
|
||||
|
||||
await atomicWrite(
|
||||
join(basePath, id, 'run.json'),
|
||||
serializeRun(updated),
|
||||
);
|
||||
return updated;
|
||||
},
|
||||
|
||||
async failWorkflowRun(id: string, error: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'failed',
|
||||
error,
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
|
||||
async getWorkflowRunStatus(
|
||||
id: string,
|
||||
): Promise<WorkflowRunStatus | null> {
|
||||
const run = await store.getWorkflowRun(id);
|
||||
return run?.status ?? null;
|
||||
},
|
||||
|
||||
// -- Events --------------------------------------------------------------
|
||||
|
||||
async createWorkflowEvent(
|
||||
event: Omit<WorkflowEvent, 'id' | 'createdAt'>,
|
||||
): Promise<WorkflowEvent> {
|
||||
const full: WorkflowEvent = {
|
||||
...event,
|
||||
id: nanoid(),
|
||||
createdAt: new Date(),
|
||||
};
|
||||
|
||||
const eventsPath = join(basePath, event.runId, 'events.jsonl');
|
||||
// Ensure directory exists
|
||||
if (!existsSync(join(basePath, event.runId))) {
|
||||
await mkdir(join(basePath, event.runId), { recursive: true });
|
||||
}
|
||||
|
||||
const line = serializeEvent(full) + '\n';
|
||||
// Append — not atomic for appends, but each line is self-contained
|
||||
const existing = existsSync(eventsPath)
|
||||
? await readFile(eventsPath, 'utf-8').catch(() => '')
|
||||
: '';
|
||||
await atomicWrite(eventsPath, existing + line);
|
||||
|
||||
return full;
|
||||
},
|
||||
|
||||
async getCompletedDagNodeOutputs(
|
||||
runId: string,
|
||||
): Promise<Record<string, Record<string, unknown>>> {
|
||||
const eventsPath = join(basePath, runId, 'events.jsonl');
|
||||
if (!existsSync(eventsPath)) return {};
|
||||
|
||||
const raw = await readFile(eventsPath, 'utf-8');
|
||||
const lines = raw.split('\n').filter(Boolean);
|
||||
|
||||
const outputs: Record<string, Record<string, unknown>> = {};
|
||||
for (const line of lines) {
|
||||
const event = parseEvent(line);
|
||||
if (event.type === 'node_complete' && event.nodeId && event.data?.output) {
|
||||
outputs[event.nodeId] = event.data.output as Record<string, unknown>;
|
||||
}
|
||||
}
|
||||
|
||||
return outputs;
|
||||
},
|
||||
|
||||
// -- Active runs ---------------------------------------------------------
|
||||
|
||||
async getActiveWorkflowRunByPath(
|
||||
path: string,
|
||||
opts?: { excludeId?: string },
|
||||
): Promise<WorkflowRun | null> {
|
||||
if (!existsSync(basePath)) return null;
|
||||
|
||||
const entries = await readdir(basePath, { withFileTypes: true });
|
||||
for (const entry of entries) {
|
||||
if (!entry.isDirectory()) continue;
|
||||
const runPath = join(basePath, entry.name, 'run.json');
|
||||
if (!existsSync(runPath)) continue;
|
||||
|
||||
const raw = await readFile(runPath, 'utf-8');
|
||||
const run = parseRun(raw);
|
||||
|
||||
if (run.workflowPath !== path) continue;
|
||||
if (!ACTIVE_STATUSES.includes(run.status)) continue;
|
||||
if (opts?.excludeId && run.id === opts.excludeId) continue;
|
||||
|
||||
return run;
|
||||
}
|
||||
|
||||
return null;
|
||||
},
|
||||
|
||||
// -- Codebase ------------------------------------------------------------
|
||||
|
||||
async getCodebase(
|
||||
_id: string,
|
||||
): Promise<Record<string, unknown> | null> {
|
||||
// Filesystem store does not persist codebase records
|
||||
return null;
|
||||
},
|
||||
|
||||
async getCodebaseEnvVars(
|
||||
_id: string,
|
||||
): Promise<Record<string, string>> {
|
||||
// Filesystem store does not persist codebase env vars
|
||||
return {};
|
||||
},
|
||||
|
||||
// -- Resumption ----------------------------------------------------------
|
||||
|
||||
async resumeWorkflowRun(id: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'running',
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
};
|
||||
|
||||
return store;
|
||||
}
|
||||
22
packages/ion/src/store/index.ts
Normal file
22
packages/ion/src/store/index.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Store module — persistence backends for the Ion workflow engine.
|
||||
*
|
||||
* Provides three store implementations:
|
||||
* - `createFsStore` — filesystem-backed (JSON files + JSONL events)
|
||||
* - `createSqliteStore` — SQLite-backed (better-sqlite3, optional dep)
|
||||
* - `createPostgresStore` — Postgres-backed (postgres.js, optional dep)
|
||||
*
|
||||
* All implement the `IWorkflowStore` interface from `../engine/deps.js`.
|
||||
*/
|
||||
|
||||
export { createFsStore } from './fs-store.js';
|
||||
export { createSqliteStore } from './sqlite-store.js';
|
||||
export { createPostgresStore } from './pg-store.js';
|
||||
|
||||
export type {
|
||||
IWorkflowStore,
|
||||
WorkflowRun,
|
||||
WorkflowEvent,
|
||||
WorkflowRunStatus,
|
||||
CreateWorkflowRunData,
|
||||
} from './types.js';
|
||||
46
packages/ion/src/store/optional-deps.d.ts
vendored
Normal file
46
packages/ion/src/store/optional-deps.d.ts
vendored
Normal file
@@ -0,0 +1,46 @@
|
||||
/**
|
||||
* Type declarations for optional store dependencies.
|
||||
*
|
||||
* These modules are optional — they may not be installed.
|
||||
* We declare just enough types here for the store implementations to compile
|
||||
* without requiring the actual packages.
|
||||
*/
|
||||
|
||||
declare module 'better-sqlite3' {
|
||||
interface RunResult {
|
||||
changes: number;
|
||||
lastInsertRowid: number | bigint;
|
||||
}
|
||||
|
||||
interface Statement {
|
||||
run(...params: unknown[]): RunResult;
|
||||
get(...params: unknown[]): unknown;
|
||||
all(...params: unknown[]): unknown[];
|
||||
}
|
||||
|
||||
class Database {
|
||||
constructor(filename: string, options?: unknown);
|
||||
prepare(sql: string): Statement;
|
||||
exec(sql: string): Database;
|
||||
pragma(pragma: string): unknown;
|
||||
close(): void;
|
||||
}
|
||||
|
||||
export default Database;
|
||||
}
|
||||
|
||||
declare module 'postgres' {
|
||||
interface Sql {
|
||||
<T = unknown>(
|
||||
strings: TemplateStringsArray,
|
||||
...values: unknown[]
|
||||
): Promise<T[]>;
|
||||
unsafe(sql: string, params?: unknown[]): Promise<unknown[]>;
|
||||
json(data: unknown): unknown;
|
||||
end(): Promise<void>;
|
||||
}
|
||||
|
||||
function postgres(connectionString: string): Sql;
|
||||
|
||||
export default postgres;
|
||||
}
|
||||
309
packages/ion/src/store/pg-store.ts
Normal file
309
packages/ion/src/store/pg-store.ts
Normal file
@@ -0,0 +1,309 @@
|
||||
/**
|
||||
* Postgres-backed workflow store.
|
||||
*
|
||||
* Uses postgres.js for async, connection-pooled persistence.
|
||||
* The dependency is optional — a helpful error is thrown if not installed.
|
||||
*/
|
||||
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type {
|
||||
IWorkflowStore,
|
||||
WorkflowRun,
|
||||
WorkflowEvent,
|
||||
WorkflowRunStatus,
|
||||
CreateWorkflowRunData,
|
||||
} from '../engine/deps.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Optional dependency loading
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function loadPostgres(): Promise<typeof import('postgres')> {
|
||||
try {
|
||||
return await import('postgres');
|
||||
} catch {
|
||||
throw new Error(
|
||||
'postgres is not installed. Install it with: npm install postgres',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Schema
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const SCHEMA_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS workflow_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
workflow_path TEXT NOT NULL,
|
||||
workflow_name TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
trigger TEXT NOT NULL,
|
||||
input JSONB NOT NULL DEFAULT '{}',
|
||||
output JSONB,
|
||||
error TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS workflow_events (
|
||||
id TEXT PRIMARY KEY,
|
||||
run_id TEXT NOT NULL REFERENCES workflow_runs(id),
|
||||
node_id TEXT,
|
||||
type TEXT NOT NULL,
|
||||
data JSONB NOT NULL DEFAULT '{}',
|
||||
created_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_runs_path_status
|
||||
ON workflow_runs(workflow_path, status);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_events_run_id
|
||||
ON workflow_events(run_id);
|
||||
`;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Row mappers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
interface RunRow {
|
||||
id: string;
|
||||
workflow_path: string;
|
||||
workflow_name: string;
|
||||
status: string;
|
||||
trigger: string;
|
||||
input: unknown;
|
||||
output: unknown;
|
||||
error: string | null;
|
||||
created_at: Date | string;
|
||||
updated_at: Date | string;
|
||||
}
|
||||
|
||||
interface EventRow {
|
||||
id: string;
|
||||
run_id: string;
|
||||
node_id: string | null;
|
||||
type: string;
|
||||
data: unknown;
|
||||
created_at: Date | string;
|
||||
}
|
||||
|
||||
function rowToRun(row: RunRow): WorkflowRun {
|
||||
return {
|
||||
id: row.id,
|
||||
workflowPath: row.workflow_path,
|
||||
workflowName: row.workflow_name,
|
||||
status: row.status as WorkflowRunStatus,
|
||||
trigger: row.trigger,
|
||||
input: (row.input ?? {}) as Record<string, unknown>,
|
||||
output: row.output
|
||||
? (row.output as Record<string, unknown>)
|
||||
: undefined,
|
||||
error: row.error ?? undefined,
|
||||
createdAt: new Date(row.created_at),
|
||||
updatedAt: new Date(row.updated_at),
|
||||
};
|
||||
}
|
||||
|
||||
function rowToEvent(row: EventRow): WorkflowEvent {
|
||||
return {
|
||||
id: row.id,
|
||||
runId: row.run_id,
|
||||
nodeId: row.node_id ?? undefined,
|
||||
type: row.type,
|
||||
data: (row.data ?? {}) as Record<string, unknown>,
|
||||
createdAt: new Date(row.created_at),
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Factory
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function createPostgresStore(
|
||||
connectionString: string,
|
||||
): Promise<IWorkflowStore> {
|
||||
const mod = await loadPostgres();
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const sql: any = mod.default
|
||||
? mod.default(connectionString)
|
||||
: (mod as any)(connectionString);
|
||||
|
||||
// Initialize schema
|
||||
await sql.unsafe(SCHEMA_SQL);
|
||||
|
||||
const ACTIVE_STATUSES: WorkflowRunStatus[] = ['pending', 'running'];
|
||||
|
||||
const store: IWorkflowStore = {
|
||||
// -- Run lifecycle -------------------------------------------------------
|
||||
|
||||
async createWorkflowRun(data: CreateWorkflowRunData): Promise<WorkflowRun> {
|
||||
const id = nanoid();
|
||||
const now = new Date();
|
||||
|
||||
const rows = await sql`
|
||||
INSERT INTO workflow_runs (id, workflow_path, workflow_name, status, trigger, input, created_at, updated_at)
|
||||
VALUES (${id}, ${data.workflowPath}, ${data.workflowName}, 'pending', ${data.trigger}, ${sql.json(data.input)}, ${now.toISOString()}, ${now.toISOString()})
|
||||
RETURNING *
|
||||
`;
|
||||
|
||||
return rowToRun(rows[0] as RunRow);
|
||||
},
|
||||
|
||||
async getWorkflowRun(id: string): Promise<WorkflowRun | null> {
|
||||
const rows = await sql`
|
||||
SELECT * FROM workflow_runs WHERE id = ${id}
|
||||
`;
|
||||
if (rows.length === 0) return null;
|
||||
return rowToRun(rows[0] as RunRow);
|
||||
},
|
||||
|
||||
async updateWorkflowRun(
|
||||
id: string,
|
||||
data: Partial<WorkflowRun>,
|
||||
): Promise<WorkflowRun> {
|
||||
const existing = await store.getWorkflowRun(id);
|
||||
if (!existing) throw new Error(`WorkflowRun not found: ${id}`);
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const sets: string[] = ['updated_at = $1'];
|
||||
const values: unknown[] = [now];
|
||||
let paramIdx = 2;
|
||||
|
||||
if (data.status !== undefined) {
|
||||
sets.push(`status = $${paramIdx++}`);
|
||||
values.push(data.status);
|
||||
}
|
||||
if (data.output !== undefined) {
|
||||
sets.push(`output = $${paramIdx++}`);
|
||||
values.push(JSON.stringify(data.output));
|
||||
}
|
||||
if (data.error !== undefined) {
|
||||
sets.push(`error = $${paramIdx++}`);
|
||||
values.push(data.error);
|
||||
}
|
||||
if (data.workflowPath !== undefined) {
|
||||
sets.push(`workflow_path = $${paramIdx++}`);
|
||||
values.push(data.workflowPath);
|
||||
}
|
||||
if (data.workflowName !== undefined) {
|
||||
sets.push(`workflow_name = $${paramIdx++}`);
|
||||
values.push(data.workflowName);
|
||||
}
|
||||
if (data.trigger !== undefined) {
|
||||
sets.push(`trigger = $${paramIdx++}`);
|
||||
values.push(data.trigger);
|
||||
}
|
||||
if (data.input !== undefined) {
|
||||
sets.push(`input = $${paramIdx++}`);
|
||||
values.push(JSON.stringify(data.input));
|
||||
}
|
||||
|
||||
values.push(id);
|
||||
const query = `UPDATE workflow_runs SET ${sets.join(', ')} WHERE id = $${paramIdx}`;
|
||||
|
||||
await sql.unsafe(query, values);
|
||||
|
||||
const updated = await store.getWorkflowRun(id);
|
||||
return updated!;
|
||||
},
|
||||
|
||||
async failWorkflowRun(id: string, error: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'failed',
|
||||
error,
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
|
||||
async getWorkflowRunStatus(
|
||||
id: string,
|
||||
): Promise<WorkflowRunStatus | null> {
|
||||
const rows = await sql`
|
||||
SELECT status FROM workflow_runs WHERE id = ${id}
|
||||
`;
|
||||
if (rows.length === 0) return null;
|
||||
return (rows[0] as { status: string }).status as WorkflowRunStatus;
|
||||
},
|
||||
|
||||
// -- Events --------------------------------------------------------------
|
||||
|
||||
async createWorkflowEvent(
|
||||
event: Omit<WorkflowEvent, 'id' | 'createdAt'>,
|
||||
): Promise<WorkflowEvent> {
|
||||
const id = nanoid();
|
||||
const now = new Date();
|
||||
|
||||
const rows = await sql`
|
||||
INSERT INTO workflow_events (id, run_id, node_id, type, data, created_at)
|
||||
VALUES (${id}, ${event.runId}, ${event.nodeId ?? null}, ${event.type}, ${sql.json(event.data)}, ${now.toISOString()})
|
||||
RETURNING *
|
||||
`;
|
||||
|
||||
return rowToEvent(rows[0] as EventRow);
|
||||
},
|
||||
|
||||
async getCompletedDagNodeOutputs(
|
||||
runId: string,
|
||||
): Promise<Record<string, Record<string, unknown>>> {
|
||||
const rows = await sql`
|
||||
SELECT node_id, data FROM workflow_events
|
||||
WHERE run_id = ${runId} AND type = 'node_complete' AND node_id IS NOT NULL
|
||||
`;
|
||||
|
||||
const outputs: Record<string, Record<string, unknown>> = {};
|
||||
for (const row of rows) {
|
||||
const r = row as { node_id: string; data: unknown };
|
||||
const parsed = (r.data ?? {}) as Record<string, unknown>;
|
||||
if (parsed.output) {
|
||||
outputs[r.node_id] = parsed.output as Record<string, unknown>;
|
||||
}
|
||||
}
|
||||
|
||||
return outputs;
|
||||
},
|
||||
|
||||
// -- Active runs ---------------------------------------------------------
|
||||
|
||||
async getActiveWorkflowRunByPath(
|
||||
path: string,
|
||||
opts?: { excludeId?: string },
|
||||
): Promise<WorkflowRun | null> {
|
||||
const excludeId = opts?.excludeId;
|
||||
const rows = await sql`
|
||||
SELECT * FROM workflow_runs
|
||||
WHERE workflow_path = ${path}
|
||||
AND status IN ${sql(ACTIVE_STATUSES)}
|
||||
${excludeId ? sql`AND id != ${excludeId}` : sql``}
|
||||
LIMIT 1
|
||||
`;
|
||||
|
||||
if (rows.length === 0) return null;
|
||||
return rowToRun(rows[0] as RunRow);
|
||||
},
|
||||
|
||||
// -- Codebase ------------------------------------------------------------
|
||||
|
||||
async getCodebase(
|
||||
_id: string,
|
||||
): Promise<Record<string, unknown> | null> {
|
||||
return null;
|
||||
},
|
||||
|
||||
async getCodebaseEnvVars(
|
||||
_id: string,
|
||||
): Promise<Record<string, string>> {
|
||||
return {};
|
||||
},
|
||||
|
||||
// -- Resumption ----------------------------------------------------------
|
||||
|
||||
async resumeWorkflowRun(id: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'running',
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
};
|
||||
|
||||
return store;
|
||||
}
|
||||
336
packages/ion/src/store/sqlite-store.ts
Normal file
336
packages/ion/src/store/sqlite-store.ts
Normal file
@@ -0,0 +1,336 @@
|
||||
/**
|
||||
* SQLite-backed workflow store.
|
||||
*
|
||||
* Uses better-sqlite3 for synchronous, file-based persistence.
|
||||
* The dependency is optional — a helpful error is thrown if not installed.
|
||||
*/
|
||||
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type {
|
||||
IWorkflowStore,
|
||||
WorkflowRun,
|
||||
WorkflowEvent,
|
||||
WorkflowRunStatus,
|
||||
CreateWorkflowRunData,
|
||||
} from '../engine/deps.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Optional dependency loading
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function loadBetterSqlite3(): Promise<typeof import('better-sqlite3')> {
|
||||
try {
|
||||
return await import('better-sqlite3');
|
||||
} catch {
|
||||
throw new Error(
|
||||
'better-sqlite3 is not installed. Install it with: npm install better-sqlite3',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Schema
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const SCHEMA_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS workflow_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
workflow_path TEXT NOT NULL,
|
||||
workflow_name TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
trigger TEXT NOT NULL,
|
||||
input TEXT NOT NULL DEFAULT '{}',
|
||||
output TEXT,
|
||||
error TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS workflow_events (
|
||||
id TEXT PRIMARY KEY,
|
||||
run_id TEXT NOT NULL,
|
||||
node_id TEXT,
|
||||
type TEXT NOT NULL,
|
||||
data TEXT NOT NULL DEFAULT '{}',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY (run_id) REFERENCES workflow_runs(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_runs_path_status
|
||||
ON workflow_runs(workflow_path, status);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_events_run_id
|
||||
ON workflow_events(run_id);
|
||||
`;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Row mappers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
interface RunRow {
|
||||
id: string;
|
||||
workflow_path: string;
|
||||
workflow_name: string;
|
||||
status: string;
|
||||
trigger: string;
|
||||
input: string;
|
||||
output: string | null;
|
||||
error: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
}
|
||||
|
||||
interface EventRow {
|
||||
id: string;
|
||||
run_id: string;
|
||||
node_id: string | null;
|
||||
type: string;
|
||||
data: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
function rowToRun(row: RunRow): WorkflowRun {
|
||||
return {
|
||||
id: row.id,
|
||||
workflowPath: row.workflow_path,
|
||||
workflowName: row.workflow_name,
|
||||
status: row.status as WorkflowRunStatus,
|
||||
trigger: row.trigger,
|
||||
input: JSON.parse(row.input),
|
||||
output: row.output ? JSON.parse(row.output) : undefined,
|
||||
error: row.error ?? undefined,
|
||||
createdAt: new Date(row.created_at),
|
||||
updatedAt: new Date(row.updated_at),
|
||||
};
|
||||
}
|
||||
|
||||
function rowToEvent(row: EventRow): WorkflowEvent {
|
||||
return {
|
||||
id: row.id,
|
||||
runId: row.run_id,
|
||||
nodeId: row.node_id ?? undefined,
|
||||
type: row.type,
|
||||
data: JSON.parse(row.data),
|
||||
createdAt: new Date(row.created_at),
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Factory
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function createSqliteStore(
|
||||
dbPath: string,
|
||||
): Promise<IWorkflowStore> {
|
||||
const mod = await loadBetterSqlite3();
|
||||
const DatabaseCtor = mod.default ?? mod;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const db: any = new DatabaseCtor(dbPath);
|
||||
|
||||
// Enable WAL mode for better concurrent read performance
|
||||
db.pragma('journal_mode = WAL');
|
||||
|
||||
// Initialize schema
|
||||
db.exec(SCHEMA_SQL);
|
||||
|
||||
const ACTIVE_STATUSES: WorkflowRunStatus[] = ['pending', 'running'];
|
||||
|
||||
const store: IWorkflowStore = {
|
||||
// -- Run lifecycle -------------------------------------------------------
|
||||
|
||||
createWorkflowRun(data: CreateWorkflowRunData): Promise<WorkflowRun> {
|
||||
const id = nanoid();
|
||||
const now = new Date().toISOString();
|
||||
const inputJson = JSON.stringify(data.input);
|
||||
|
||||
db.prepare(
|
||||
`INSERT INTO workflow_runs (id, workflow_path, workflow_name, status, trigger, input, created_at, updated_at)
|
||||
VALUES (?, ?, ?, 'pending', ?, ?, ?, ?)`,
|
||||
).run(
|
||||
id,
|
||||
data.workflowPath,
|
||||
data.workflowName,
|
||||
data.trigger,
|
||||
inputJson,
|
||||
now,
|
||||
now,
|
||||
);
|
||||
|
||||
return Promise.resolve({
|
||||
id,
|
||||
workflowPath: data.workflowPath,
|
||||
workflowName: data.workflowName,
|
||||
status: 'pending',
|
||||
trigger: data.trigger,
|
||||
input: data.input,
|
||||
createdAt: new Date(now),
|
||||
updatedAt: new Date(now),
|
||||
});
|
||||
},
|
||||
|
||||
getWorkflowRun(id: string): Promise<WorkflowRun | null> {
|
||||
const row = db
|
||||
.prepare('SELECT * FROM workflow_runs WHERE id = ?')
|
||||
.get(id) as RunRow | undefined;
|
||||
if (!row) return Promise.resolve(null);
|
||||
return Promise.resolve(rowToRun(row));
|
||||
},
|
||||
|
||||
updateWorkflowRun(
|
||||
id: string,
|
||||
data: Partial<WorkflowRun>,
|
||||
): Promise<WorkflowRun> {
|
||||
const existing = db
|
||||
.prepare('SELECT * FROM workflow_runs WHERE id = ?')
|
||||
.get(id) as RunRow | undefined;
|
||||
if (!existing) throw new Error(`WorkflowRun not found: ${id}`);
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const sets: string[] = ['updated_at = ?'];
|
||||
const values: unknown[] = [now];
|
||||
|
||||
if (data.status !== undefined) {
|
||||
sets.push('status = ?');
|
||||
values.push(data.status);
|
||||
}
|
||||
if (data.output !== undefined) {
|
||||
sets.push('output = ?');
|
||||
values.push(JSON.stringify(data.output));
|
||||
}
|
||||
if (data.error !== undefined) {
|
||||
sets.push('error = ?');
|
||||
values.push(data.error);
|
||||
}
|
||||
if (data.workflowPath !== undefined) {
|
||||
sets.push('workflow_path = ?');
|
||||
values.push(data.workflowPath);
|
||||
}
|
||||
if (data.workflowName !== undefined) {
|
||||
sets.push('workflow_name = ?');
|
||||
values.push(data.workflowName);
|
||||
}
|
||||
if (data.trigger !== undefined) {
|
||||
sets.push('trigger = ?');
|
||||
values.push(data.trigger);
|
||||
}
|
||||
if (data.input !== undefined) {
|
||||
sets.push('input = ?');
|
||||
values.push(JSON.stringify(data.input));
|
||||
}
|
||||
|
||||
values.push(id);
|
||||
db.prepare(
|
||||
`UPDATE workflow_runs SET ${sets.join(', ')} WHERE id = ?`,
|
||||
).run(...values);
|
||||
|
||||
const updated = db
|
||||
.prepare('SELECT * FROM workflow_runs WHERE id = ?')
|
||||
.get(id) as RunRow;
|
||||
return Promise.resolve(rowToRun(updated));
|
||||
},
|
||||
|
||||
failWorkflowRun(id: string, error: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'failed',
|
||||
error,
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
|
||||
getWorkflowRunStatus(id: string): Promise<WorkflowRunStatus | null> {
|
||||
const row = db
|
||||
.prepare('SELECT status FROM workflow_runs WHERE id = ?')
|
||||
.get(id) as { status: string } | undefined;
|
||||
if (!row) return Promise.resolve(null);
|
||||
return Promise.resolve(row.status as WorkflowRunStatus);
|
||||
},
|
||||
|
||||
// -- Events --------------------------------------------------------------
|
||||
|
||||
createWorkflowEvent(
|
||||
event: Omit<WorkflowEvent, 'id' | 'createdAt'>,
|
||||
): Promise<WorkflowEvent> {
|
||||
const id = nanoid();
|
||||
const now = new Date().toISOString();
|
||||
const dataJson = JSON.stringify(event.data);
|
||||
|
||||
db.prepare(
|
||||
`INSERT INTO workflow_events (id, run_id, node_id, type, data, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)`,
|
||||
).run(id, event.runId, event.nodeId ?? null, event.type, dataJson, now);
|
||||
|
||||
return Promise.resolve({
|
||||
id,
|
||||
runId: event.runId,
|
||||
nodeId: event.nodeId,
|
||||
type: event.type,
|
||||
data: event.data,
|
||||
createdAt: new Date(now),
|
||||
});
|
||||
},
|
||||
|
||||
getCompletedDagNodeOutputs(
|
||||
runId: string,
|
||||
): Promise<Record<string, Record<string, unknown>>> {
|
||||
const rows = db
|
||||
.prepare(
|
||||
`SELECT node_id, data FROM workflow_events
|
||||
WHERE run_id = ? AND type = 'node_complete' AND node_id IS NOT NULL`,
|
||||
)
|
||||
.all(runId) as { node_id: string; data: string }[];
|
||||
|
||||
const outputs: Record<string, Record<string, unknown>> = {};
|
||||
for (const row of rows) {
|
||||
const parsed = JSON.parse(row.data);
|
||||
if (parsed.output) {
|
||||
outputs[row.node_id] = parsed.output as Record<string, unknown>;
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve(outputs);
|
||||
},
|
||||
|
||||
// -- Active runs ---------------------------------------------------------
|
||||
|
||||
getActiveWorkflowRunByPath(
|
||||
path: string,
|
||||
opts?: { excludeId?: string },
|
||||
): Promise<WorkflowRun | null> {
|
||||
const statuses = ACTIVE_STATUSES;
|
||||
const placeholders = statuses.map(() => '?').join(', ');
|
||||
let query = `SELECT * FROM workflow_runs WHERE workflow_path = ? AND status IN (${placeholders})`;
|
||||
const params: unknown[] = [path, ...statuses];
|
||||
|
||||
if (opts?.excludeId) {
|
||||
query += ' AND id != ?';
|
||||
params.push(opts.excludeId);
|
||||
}
|
||||
|
||||
query += ' LIMIT 1';
|
||||
|
||||
const row = db.prepare(query).get(...params) as RunRow | undefined;
|
||||
if (!row) return Promise.resolve(null);
|
||||
return Promise.resolve(rowToRun(row));
|
||||
},
|
||||
|
||||
// -- Codebase ------------------------------------------------------------
|
||||
|
||||
getCodebase(_id: string): Promise<Record<string, unknown> | null> {
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
|
||||
getCodebaseEnvVars(_id: string): Promise<Record<string, string>> {
|
||||
return Promise.resolve({});
|
||||
},
|
||||
|
||||
// -- Resumption ----------------------------------------------------------
|
||||
|
||||
resumeWorkflowRun(id: string): Promise<WorkflowRun> {
|
||||
return store.updateWorkflowRun(id, {
|
||||
status: 'running',
|
||||
} as Partial<WorkflowRun>);
|
||||
},
|
||||
};
|
||||
|
||||
return store;
|
||||
}
|
||||
14
packages/ion/src/store/types.ts
Normal file
14
packages/ion/src/store/types.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
/**
|
||||
* Store type re-exports.
|
||||
*
|
||||
* Re-exports the workflow store types from the engine dependency interfaces
|
||||
* so consumers can import them from the store module directly.
|
||||
*/
|
||||
|
||||
export type {
|
||||
IWorkflowStore,
|
||||
WorkflowRun,
|
||||
WorkflowEvent,
|
||||
WorkflowRunStatus,
|
||||
CreateWorkflowRunData,
|
||||
} from '../engine/deps.js';
|
||||
Reference in New Issue
Block a user