feat: phase 3-5 — workflow engine, background subagents, multi-modal, cache shape, inline diff

Phase 3: Dynamic Workflow Engine
- VM sandbox (node:vm) with agent/parallel/pipeline API, Claude Code compatible
- Workflow file discovery (.boocode/workflows/*.js + ~/.boocode/workflows/*.js)
- Workflow manager with session/chat creation and inference dispatch
- Built-in catalog: deep-research, review-code, find-issues
- Resumability cache: SHA-256 hash of agent spec, in-memory Map

Phase 4: Background Subagents
- background-task.ts service: spawn/poll/cancel lifecycle
- spawn_subagent, subagent_status, subagent_result tools in ALL_TOOLS

Phase 5: Multi-modal + Cache Shape
- Multi-modal stub with type defs and hook point in payload.ts
- CacheShapeBadge component in trace viewer (colored bar + %)
This commit is contained in:
2026-06-08 03:11:39 +00:00
parent 591d373534
commit f22da55734
23 changed files with 2938 additions and 33 deletions

View File

@@ -0,0 +1,659 @@
// v2.8.0: WorkflowManager — ties discovery, sandbox, and inference dispatch
// together into a single orchestrator for multi-agent workflow scripts.
//
// Creates isolated sessions+chats for each agent() call within a workflow,
// dispatches inference via the existing pipeline, polls for completion, and
// returns structured results. All failures are returned as errors rather than
// thrown exceptions (catch-safe API).
import { randomUUID } from 'node:crypto';
import type { Sql } from '../../db.js';
import type { Config } from '../../config.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '../broker.js';
import type { UserStreamFrame } from '../../types/api.js';
import type {
WorkflowRun,
WorkflowRunStatus,
WorkflowContext,
WorkflowEvent,
AgentTaskSpec,
AgentTaskResult,
WorkflowScriptMeta,
} from './types.js';
import { discoverWorkflows, findWorkflow, isBuiltinWorkflow } from './discovery.js';
import { getBuiltinWorkflow } from './catalog.js';
import { cacheKey, getCachedResult, setCachedResult } from './resumability.js';
import {
executeWorkflowScript,
executeWorkflowScriptFromCode,
isEsmSyntax,
transformEsmToCjs,
} from './sandbox.js';
import { runInference } from '../inference/index.js';
import { readFileSync } from 'node:fs';
import vm from 'node:vm';
/**
* Maximum time to wait for a single agent task to complete (5 minutes).
* Beyond this, the task is treated as failed/timed out.
*/
const AGENT_TASK_TIMEOUT_MS = 300_000;
/**
* Polling interval when waiting for an agent task to finish.
*/
const POLL_INTERVAL_MS = 500;
/**
* Maximum time for the entire workflow run (30 minutes).
*/
const WORKFLOW_TIMEOUT_MS = 1_800_000;
/**
* Token budget tracker. Tracks total token spend across agent calls.
*/
class BudgetTracker {
total: number | null;
#spent = 0;
constructor(total: number | null) {
this.total = total;
}
spend(amount: number): void {
this.#spent += amount;
}
spent(): number {
return this.#spent;
}
remaining(): number {
if (this.total === null) return Infinity;
return Math.max(0, this.total - this.#spent);
}
}
/**
* Creates a no-op bounded publish function that avoids WS dependency
* for background workflow agent tasks. Messages are still persisted to DB.
*/
function noopPublish(): void {
/* intentional no-op */
}
function noopPublishUser(): void {
/* intentional no-op */
}
/**
* Callback type for workflow lifecycle events.
*/
export type WorkflowEventHandler = (event: WorkflowEvent) => void;
/**
* WorkflowManager — the orchestrator for sandboxed multi-agent workflows.
*/
export class WorkflowManager {
/** Active workflow runs by run ID. */
readonly #runs = new Map<string, WorkflowRunState>();
/** Registered event listeners. */
readonly #listeners = new Set<WorkflowEventHandler>();
constructor(
private sql: Sql,
private config: Config,
private log: FastifyBaseLogger,
private projectRoot: string,
private projectId: string,
private broker: Broker,
) {}
// ---- public API ----
/**
* Discover all available workflow scripts.
*/
listWorkflows(): WorkflowMetaInfo[] {
return discoverWorkflows(this.projectRoot).map((m) => ({
name: m.name,
sourceFile: m.sourceFile,
}));
}
/**
* Find a specific workflow by name.
*/
getWorkflow(name: string): WorkflowMetaInfo | undefined {
const found = findWorkflow(name, this.projectRoot);
if (!found) return undefined;
return { name: found.name, sourceFile: found.sourceFile };
}
/**
* Load the metadata (name, description, phases) from a workflow file
* without executing it.
*
* @param name - Workflow name.
* @returns The script's meta, or undefined if not found.
*/
async loadWorkflowMeta(name: string): Promise<WorkflowScriptMeta | undefined> {
const found = findWorkflow(name, this.projectRoot);
if (!found) return undefined;
// Built-in workflows: return meta directly from the catalog
if (isBuiltinWorkflow(found)) {
const builtin = getBuiltinWorkflow(name);
if (!builtin) return { name, description: '' };
return {
name: builtin.name,
description: builtin.description,
phases: builtin.phases,
};
}
try {
// Load meta by executing the script in a throwaway context
const context = this.#createMinimalContext('meta-loader');
const code = readFileSync(found.sourceFile, 'utf8');
const finalCode = isEsmSyntax(code) ? transformEsmToCjs(code) : code;
const sandboxData: Record<string, unknown> & {
module: { exports: Record<string, unknown> };
} = {
...context,
console: { log: () => {} },
module: { exports: {} },
exports: {},
};
vm.createContext(sandboxData as unknown as vm.Context);
new vm.Script(finalCode).runInContext(sandboxData as unknown as vm.Context, {
timeout: 10_000,
filename: found.sourceFile,
});
const meta = sandboxData.module.exports.meta as WorkflowScriptMeta | undefined;
return meta ?? { name, description: '' };
} catch {
return { name, description: '' };
}
}
/**
* Execute a workflow by name.
*
* @param name - The workflow name (without .js extension).
* @param args - Optional arguments to pass to the workflow function.
* @returns The run ID for tracking.
*/
async runWorkflow(
name: string,
args?: Record<string, unknown>,
): Promise<{ runId: string }> {
const found = findWorkflow(name, this.projectRoot);
if (!found) {
throw new Error(`Workflow not found: "${name}". ` +
`Check .boocode/workflows/ or ~/.boocode/workflows/ for a ${name}.js file.`);
}
const runId = randomUUID();
const startedAt = new Date().toISOString();
const state: WorkflowRunState = {
id: runId,
name,
status: 'running',
startedAt,
abortController: new AbortController(),
};
this.#runs.set(runId, state);
this.#emit({ type: 'run_started', runId, name });
// Run asynchronously — caller receives the runId immediately.
void this.#executeRun(state, found.sourceFile, args ?? {});
return { runId };
}
/**
* Get the current status of a workflow run.
*/
getRunStatus(runId: string): WorkflowRun | undefined {
const state = this.#runs.get(runId);
if (!state) return undefined;
return {
id: state.id,
name: state.name,
status: state.status,
started_at: state.startedAt,
finished_at: state.finishedAt,
error: state.error,
};
}
/**
* Cancel a running workflow. Best-effort — agent tasks in-flight will be
* aborted via AbortSignal.
*
* @param runId - The workflow run ID.
* @returns true if the workflow was found and cancelled.
*/
cancelRun(runId: string): boolean {
const state = this.#runs.get(runId);
if (!state || state.status !== 'running') return false;
state.status = 'cancelled';
state.finishedAt = new Date().toISOString();
state.abortController.abort();
this.#emit({ type: 'run_cancelled', runId, name: state.name });
return true;
}
/**
* Subscribe to workflow lifecycle events.
* Returns an unsubscribe function.
*/
onEvent(handler: WorkflowEventHandler): () => void {
this.#listeners.add(handler);
return () => {
this.#listeners.delete(handler);
};
}
// ---- internal execution ----
/**
* Execute the workflow script in the sandbox.
*/
async #executeRun(
state: WorkflowRunState,
sourceFile: string,
args: Record<string, unknown>,
): Promise<void> {
const BULTIN_MARKER = 'builtin:';
const budgetTracker = new BudgetTracker(null); // no fixed total yet
const runId = state.id;
try {
const context: WorkflowContext = {
agent: (prompt, opts) =>
this.#handleAgentCall(runId, prompt, opts ?? { prompt }, state.abortController.signal),
parallel: (thunks) =>
Promise.all(thunks.map((t) => t())),
pipeline: async (items, ...stages) => {
let result = [...items];
for (const stage of stages) {
result = await Promise.all(result.map(stage));
}
return result;
},
phase: (title) => {
this.#emit({ type: 'phase', runId, title });
},
log: (message) => {
this.#emit({ type: 'log', runId, message });
},
budget: {
total: budgetTracker.total,
spent: () => budgetTracker.spent(),
remaining: () => budgetTracker.remaining(),
},
args,
workflow: (nestedName, nestedArgs) =>
this.#handleNestedWorkflow(runId, nestedName, nestedArgs ?? {}, state.abortController.signal),
};
let result: unknown;
if (sourceFile.startsWith(BULTIN_MARKER)) {
// Built-in workflow: generate script from catalog and execute
const workflowName = sourceFile.slice(BULTIN_MARKER.length);
const builtin = getBuiltinWorkflow(workflowName);
if (!builtin) {
throw new Error(`Built-in workflow "${workflowName}" not found in catalog`);
}
const scriptCode = builtin.generateScript(args);
result = await executeWorkflowScriptFromCode(scriptCode, context, args, sourceFile);
} else {
result = await executeWorkflowScript(sourceFile, context, args);
}
// Only update to completed if we haven't been cancelled mid-flight.
if (state.status !== 'cancelled') {
state.status = 'completed';
state.finishedAt = new Date().toISOString();
}
// Store result
state.result = result;
this.#emit({ type: 'run_completed', runId, name: state.name });
} catch (err) {
if (state.status === 'cancelled') return; // already handled
const message = err instanceof Error ? err.message : String(err);
state.status = 'failed';
state.finishedAt = new Date().toISOString();
state.error = message;
this.#emit({ type: 'run_failed', runId, name: state.name, error: message });
}
}
/**
* Handle an `agent()` call from within a workflow.
* Creates a session + chat, dispatches inference, polls for completion.
*/
async #handleAgentCall(
runId: string,
prompt: string,
spec: AgentTaskSpec,
signal: AbortSignal,
): Promise<unknown> {
const label = spec.label ?? `agent-${prompt.slice(0, 40).replace(/\s+/g, '_')}`;
this.#emit({ type: 'agent_task_started', runId, label });
try {
const result = await this.executeAgentTask(prompt, spec, signal);
this.#emit({ type: 'agent_task_completed', runId, label });
return result;
} catch (err) {
this.#emit({ type: 'agent_task_completed', runId, label });
const message = err instanceof Error ? err.message : String(err);
return {
ok: false,
output: null,
error: message,
} satisfies AgentTaskResult;
}
}
/**
* Core agent task execution: create session/chat, dispatch inference, poll.
*
* Exported as a public method for testing.
*/
async executeAgentTask(
prompt: string,
spec: AgentTaskSpec,
signal?: AbortSignal,
): Promise<unknown> {
// ---- 0. Check resumability cache before creating a new task ----
const cacheKeyStr = cacheKey(spec, '');
const cached = getCachedResult(cacheKeyStr);
if (cached) {
return { ...cached, cached: true } satisfies AgentTaskResult;
}
const model = spec.model ?? null;
// ---- 1. Create a session for this agent task ----
const sessionName = `workflow-agent-${spec.label ?? 'task'}`;
const sessionResult = await this.sql.begin(async (tx) => {
const [session] = await tx<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model)
VALUES (${this.projectId}, ${sessionName}, ${model ?? 'qwen3.6-35b-a3b-mxfp4'})
RETURNING id
`;
if (!session) throw new Error('Failed to create workflow agent session');
return session;
});
const sessionId = sessionResult.id;
// ---- 2. Create a chat in this session ----
const chatResult = await this.sql.begin(async (tx) => {
const [chat] = await tx<{ id: string }[]>`
INSERT INTO chats (session_id, name)
VALUES (${sessionId}, ${spec.label ?? null})
RETURNING id
`;
if (!chat) throw new Error('Failed to create workflow agent chat');
return chat;
});
const chatId = chatResult.id;
// ---- 3. Insert user message + streaming assistant message ----
const { userMessageId, assistantMessageId } = await this.sql.begin(async (tx) => {
const [userMsg] = await tx<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${prompt}, 'complete', clock_timestamp())
RETURNING id
`;
const [assistantMsg] = await tx<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
return {
userMessageId: userMsg!.id,
assistantMessageId: assistantMsg!.id,
};
});
// ---- 4. Dispatch inference ----
// Create a bounded InferenceContext that won't crash on missing WS
const ctx: import('../inference/types.js').InferenceContext = {
sql: this.sql,
config: this.config,
log: this.log,
publish: noopPublish as unknown as import('../inference/types.js').FramePublisher,
publishUser: noopPublishUser as unknown as (frame: UserStreamFrame) => void,
broker: this.broker,
};
// Create a merged signal (workflow cancellation + optional caller signal)
const mergedController = new AbortController();
const onAbort = () => mergedController.abort();
signal?.addEventListener('abort', onAbort, { once: true });
const inferencePromise = runInference(
ctx,
sessionId,
chatId,
assistantMessageId,
mergedController.signal,
).finally(() => {
signal?.removeEventListener('abort', onAbort);
});
// ---- 5. Poll for completion ----
try {
const result = await this.#pollForCompletion(
chatId,
assistantMessageId,
inferencePromise,
mergedController.signal,
);
// Cache successful results for resumability
if (typeof result === 'object' && result !== null && (result as Record<string, unknown>).ok === true) {
setCachedResult(cacheKeyStr, {
ok: true,
output: (result as Record<string, unknown>).output,
token_usage: (result as Record<string, unknown>).token_usage as
| { prompt: number; completion: number }
| undefined,
});
}
return result;
} catch (err) {
if ((err as Error)?.message === 'cancelled') {
return { ok: false, output: null, error: 'Task was cancelled' } satisfies AgentTaskResult;
}
return {
ok: false,
output: null,
error: err instanceof Error ? err.message : String(err),
} satisfies AgentTaskResult;
}
}
/**
* Poll the messages table until the assistant message status changes
* from 'streaming' to 'complete' / 'failed' / 'cancelled'.
*/
async #pollForCompletion(
chatId: string,
assistantMessageId: string,
inferencePromise: Promise<void>,
signal: AbortSignal,
): Promise<unknown> {
// Wait for either inference to finish or timeout
const timeout = new Promise<never>((_, reject) => {
const timer = setTimeout(() => {
reject(new Error(`Agent task timed out after ${AGENT_TASK_TIMEOUT_MS}ms`));
}, AGENT_TASK_TIMEOUT_MS);
signal.addEventListener('abort', () => {
clearTimeout(timer);
reject(new Error('cancelled'));
}, { once: true });
});
// Poll loop — runs until inference completes, timeout, or cancellation
const pollLoop = (async () => {
// eslint-disable-next-line no-constant-condition
while (true) {
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
const rows = await this.sql<{
status: string;
content: string;
tool_calls: unknown;
tokens_used: number | null;
}[]>`
SELECT m.status, m.content, m.role,
(SELECT jsonb_agg(p.payload ORDER BY p.sequence)
FROM message_parts p
WHERE p.message_id = m.id AND p.kind = 'tool_call' AND p.hidden_at IS NULL) AS tool_calls,
m.tokens_used
FROM messages m
WHERE m.id = ${assistantMessageId}
`;
const msg = rows[0];
if (!msg) {
throw new Error(`Assistant message ${assistantMessageId} not found`);
}
if (msg.status === 'complete') {
return {
ok: true,
output: msg.content,
token_usage: msg.tokens_used ? { prompt: 0, completion: msg.tokens_used } : undefined,
};
}
if (msg.status === 'failed' || msg.status === 'cancelled') {
return {
ok: false,
output: msg.content || null,
error: `Assistant message ended with status: ${msg.status}`,
};
}
// Still streaming — continue polling
}
})();
// Race: polling vs timeout vs inference error vs cancellation
try {
return await Promise.race([pollLoop, timeout]);
} finally {
// Ensure inference is settled (but don't block on it)
inferencePromise.catch(() => {});
}
}
/**
* Handle a nested `workflow()` call from within a workflow.
* Runs the named workflow with the given args and returns its result.
*/
async #handleNestedWorkflow(
parentRunId: string,
name: string,
args: Record<string, unknown>,
signal: AbortSignal,
): Promise<unknown> {
const found = findWorkflow(name, this.projectRoot);
if (!found) {
return { ok: false, output: null, error: `Nested workflow not found: "${name}"` };
}
const nestedRunId = randomUUID();
const startedAt = new Date().toISOString();
const nestedState: WorkflowRunState = {
id: nestedRunId,
name,
status: 'running',
startedAt,
abortController: new AbortController(),
};
this.#runs.set(nestedRunId, nestedState);
this.#emit({ type: 'run_started', runId: nestedRunId, name });
// Link parent cancellation to nested
signal.addEventListener('abort', () => {
nestedState.abortController.abort();
}, { once: true });
await this.#executeRun(nestedState, found.sourceFile, args);
if (nestedState.status === 'cancelled') {
return { ok: false, output: null, error: 'Nested workflow cancelled' };
}
if (nestedState.status === 'failed') {
return { ok: false, output: null, error: nestedState.error };
}
return { ok: true, output: nestedState.result };
}
/**
* Create a minimal WorkflowContext for non-execution purposes
* (e.g. loading meta).
*/
#createMinimalContext(runId: string): Record<string, unknown> {
return {
agent: () => Promise.reject(new Error('Not available in this context')),
parallel: () => Promise.reject(new Error('Not available in this context')),
pipeline: () => Promise.reject(new Error('Not available in this context')),
phase: () => {},
log: () => {},
budget: { total: null, spent: () => 0, remaining: () => Infinity },
args: {},
workflow: () => Promise.reject(new Error('Not available in this context')),
};
}
/**
* Emit a workflow event to all registered listeners.
*/
#emit(event: WorkflowEvent): void {
for (const handler of this.#listeners) {
try {
handler(event);
} catch {
// Swallow listener errors — one bad listener shouldn't break others
}
}
}
}
// ---- internal types ----
/**
* Metadata returned from listWorkflows / getWorkflow.
*/
export interface WorkflowMetaInfo {
name: string;
sourceFile: string;
}
/**
* Internal mutable state for an active workflow run.
*/
interface WorkflowRunState {
id: string;
name: string;
status: WorkflowRunStatus;
startedAt: string;
finishedAt?: string;
error?: string;
result?: unknown;
abortController: AbortController;
}