diff --git a/.omo/drafts/workflow-engine-design.md b/.omo/drafts/workflow-engine-design.md new file mode 100644 index 0000000..a829e06 --- /dev/null +++ b/.omo/drafts/workflow-engine-design.md @@ -0,0 +1,55 @@ +# Dynamic Workflow Engine — Design + +## Architecture + +``` +User writes workflow JS file: +.boocode/workflows/my-flow.js + +Workflow Runtime (apps/server) + ├── isolated-vm sandbox (or node:vm) + ├── API surface: agent(), parallel(), pipeline(), phase(), budget() + ├── Tool bridge → BooCode's existing tool set + ├── Workflow manager (concurrency, lifecycle) + ├── Resumability cache (SHA-256 of agent spec) + └── Catalog (built-in workflows: deep-research, review-code) + +Workflow execution: + 1. User triggers workflow (slash command or Orchestrator panel) + 2. File discovery finds .boocode/workflows/<name>.js + 3. Sandbox compiles and executes the script + 4. agent() calls go through tool bridge → existing inference pipeline + 5. parallel() spawns concurrent agent calls (max 3 default) + 6. Results stream via existing WS frames + 7. Completed agents cached by hash for resume + +API Surface (Claude Code compatible): + agent(prompt, { label?, schema?, model?, capabilities?, max_tool_calls? }) + parallel([() => agent(...), () => agent(...)]) + pipeline(items, ...stages) + phase(title) + log(message) + budget.total / budget.spent() / budget.remaining() + args + workflow(name, args?) 
— one level of nesting +``` + +## Implementation Plan + +### Phase 1: Core Runtime (this session) +- Sandbox using Node's `vm` module (no extra deps) +- `agent()` function that creates a task and waits for completion +- Workflow file discovery +- Basic workflow manager + +### Phase 2: Advanced Primitives +- `parallel()` with concurrency limits +- `pipeline()` streaming +- `budget()` token tracking +- Workflow resumability cache + +### Phase 3: UI + Polish +- Integration with Orchestrator panel +- Built-in workflow catalog +- Workflow editor +- Error recovery diff --git a/apps/coder/src/services/pending_changes.ts b/apps/coder/src/services/pending_changes.ts index cf1d914..9e6144d 100644 --- a/apps/coder/src/services/pending_changes.ts +++ b/apps/coder/src/services/pending_changes.ts @@ -4,6 +4,8 @@ import { randomBytes } from 'node:crypto'; import type { Sql } from '../db.js'; import { resolveWritePath } from './write_guard.js'; import { locateMatch } from './fuzzy-match.js'; +import { conflictIndex } from './conflict-index.js'; +import { findConflicts } from './collision-detector.js'; /** * Write a file atomically: stage to a sibling temp file, then rename over the @@ -170,6 +172,10 @@ export async function queueEdit( VALUES (${sessionId}, ${taskId}, ${resolved}, 'edit', ${diff}, ${agent}) RETURNING * `; + + // Register in the conflict index so concurrent worktrees see this edit. + conflictIndex.registerChange(resolved, sessionId, agent ?? 'unknown'); + return row!; } @@ -216,6 +222,9 @@ export async function queueCreate( VALUES (${sessionId}, ${taskId}, ${resolved}, 'create', ${content}, ${agent}) RETURNING * `; + + conflictIndex.registerChange(resolved, sessionId, agent ?? 'unknown'); + return row!; } @@ -238,6 +247,9 @@ export async function queueDelete( VALUES (${sessionId}, ${taskId}, ${resolved}, 'delete', '', ${agent}) RETURNING * `; + + conflictIndex.registerChange(resolved, sessionId, agent ?? 
'unknown'); + return row!; } @@ -260,6 +272,23 @@ export async function applyOne( // Re-validate path in case projectRoot has shifted resolveWritePath(projectRoot, change.file_path); + // Advisory collision check: log a warning if another worktree has pending + // edits to this file. Does NOT block the write — same non-blocking pattern + // as the edit guards (validateEditResult, checkDroppedImports). + { + const conflicts = conflictIndex.query( + [change.file_path], + change.session_id, // sessionId doubles as worktree identifier + new Map(), + ); + for (const v of conflicts) { + console.log( + `[collision] ${v.filePath} — conflict with worktrees [${v.worktrees.join(', ')}] ` + + `agents [${v.agents.join(', ')}] severity=${v.severity}`, + ); + } + } + switch (change.operation) { case 'create': { await mkdir(dirname(change.file_path), { recursive: true }); diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 86797e0..e614199 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -21,10 +21,11 @@ import { registerSkillsRoutes } from './routes/skills.js'; import { registerTraceRoutes } from './routes/traces.js'; import { registerToolsRoutes } from './routes/tools.js'; import { registerAnalyticsRoutes } from './routes/analytics.js'; -import { registerMemoryRoutes } from './routes/memory.js'; + import { registerInferenceSettingsRoutes } from './routes/inference-settings.js'; -import { createInferenceRunner } from './services/inference/index.js'; +import { createInferenceRunner, runInferenceWithModel } from './services/inference/index.js'; import { createBroker } from './services/broker.js'; +import { setBackgroundInferenceEnqueuer } from './services/background-task.js'; import { listSkills } from './services/skills.js'; import * as compaction from './services/compaction.js'; import { configureModelContext } from './services/model-context.js'; @@ -125,11 +126,37 @@ async function main() { registerModelRoutes(app, config); 
registerAgentRoutes(app, sql); registerSidebarRoutes(app, sql); - registerChatRoutes(app, sql, broker); + registerChatRoutes(app, sql, broker, config, { + enqueueCompare: (sessionId, chatId, assistantMessageId, modelOverride, compareGroupId) => { + // Reuse the inference runner's context pattern for compare mode. + // Each compare run gets its own AbortController; cancellation keyed by + // chatId (cancels ALL parallel runs in that compare group). + const compareCtx: import('./services/inference/types.js').InferenceContext = { + sql, + config, + log: app.log, + publish: (sid, frame) => { + broker.publishFrame(sid, frame as unknown as import('@boocode/contracts/ws-frames').WsFrame); + }, + publishUser: (frame) => { + broker.publishUserFrame('default', frame as unknown as import('@boocode/contracts/ws-frames').WsFrame); + }, + broker, + hooks: hasHooks ? hookRunner : undefined, + }; + compareCtx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'streaming', at: new Date().toISOString() }); + void runInferenceWithModel(compareCtx, sessionId, chatId, assistantMessageId, modelOverride, compareGroupId).catch( + (err: Error) => app.log.error({ err, chatId, modelOverride }, 'compare inference failed'), + ); + }, + cancelInference: async (_sessionId, chatId) => { + return inference.cancel(_sessionId, chatId); + }, + hasActiveInference: (chatId) => inference.hasActive(chatId), + }); registerTraceRoutes(app, sql); registerToolsRoutes(app, sql); registerAnalyticsRoutes(app, sql); - registerMemoryRoutes(app, sql); registerInferenceSettingsRoutes(app); // Batch 9.6: warm the skills cache at boot and surface the count. Empty or @@ -167,6 +194,13 @@ async function main() { broker.publishUserFrame(user, frame as unknown as import('@boocode/contracts/ws-frames').WsFrame); } ); + // v2.x: wire the background subagent task system to the inference runner. 
+ // Tools (spawn_subagent) dispatch fire-and-forget inference via this + // module-level reference — no import cycle through the tool registry. + setBackgroundInferenceEnqueuer((sessionId, chatId, assistantId, user) => { + inference.enqueue(sessionId, chatId, assistantId, user); + }); + registerMessageRoutes(app, sql, config, broker, { enqueueInference: (sessionId, chatId, assistantId, user) => { inference.enqueue(sessionId, chatId, assistantId, user); diff --git a/apps/server/src/services/background-task.ts b/apps/server/src/services/background-task.ts new file mode 100644 index 0000000..2732ab3 --- /dev/null +++ b/apps/server/src/services/background-task.ts @@ -0,0 +1,260 @@ +// v2.x: Background subagent task service. +// Creates and tracks background tasks that run as independent inference +// sessions. The spawner creates a session+chat, inserts messages, and +// dispatches inference asynchronously. Callers poll status and retrieve +// results via the companion tools (background-subagent-tools.ts). +// +// Module-level inference enqueuer: set at server startup so tools can +// dispatch background inference without importing the runner directly. + +import type { Sql } from '../db.js'; +import type { FastifyBaseLogger } from 'fastify'; + +export interface BackgroundTask { + id: string; + session_id: string; + chat_id: string; + agent: string | null; + model: string; + input: string; + status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'; + output_summary: string | null; + created_at: string; + finished_at: string | null; +} + +// Module-level reference to the inference enqueuer, set at server startup. 
+let _enqueueInference:
+  | ((sessionId: string, chatId: string, assistantMessageId: string, user: string) => void)
+  | null = null;
+
+/**
+ * Register the function that dispatches background inference.
+ *
+ * `spawnBackgroundTask` calls it after its transaction commits, so tools can
+ * start inference without importing the runner themselves.
+ *
+ * @param enqueue Receives (sessionId, chatId, assistantMessageId, user).
+ */
+export function setBackgroundInferenceEnqueuer(
+  enqueue: (
+    sessionId: string,
+    chatId: string,
+    assistantMessageId: string,
+    user: string,
+  ) => void,
+): void {
+  _enqueueInference = enqueue;
+}
+
+/**
+ * Translate a raw `tasks.state` value into the public task status.
+ * `blocked` and any unrecognised state are reported as `pending`.
+ */
+function mapTaskState(state: string): BackgroundTask['status'] {
+  switch (state) {
+    case 'pending':
+      return 'pending';
+    case 'running':
+      return 'running';
+    case 'completed':
+      return 'completed';
+    case 'failed':
+      return 'failed';
+    case 'blocked':
+      return 'pending'; // blocked is internal — surface as pending
+    case 'cancelled':
+      return 'cancelled';
+    default:
+      return 'pending';
+  }
+}
+
+/**
+ * Spawn a background subagent task.
+ *
+ * In one transaction this creates a session, a chat, the user message, an
+ * empty `streaming` assistant message and a `tasks` row in state `running`.
+ * After the commit it hands the ids to the enqueuer registered through
+ * `setBackgroundInferenceEnqueuer` and returns without waiting for the
+ * inference to finish.
+ *
+ * If no enqueuer is registered, or the enqueuer throws, nothing will ever
+ * run the task. The `tasks` row is then moved to `failed` and the returned
+ * metadata says so, instead of reporting a `running` task that a caller
+ * would poll forever.
+ *
+ * @param sql       Database handle; `begin` is used for the transaction.
+ * @param log       Logger for the spawn and dispatch-failure records.
+ * @param projectId Project that owns the new session and task.
+ * @param input     Task text; stored as the user message and on the task row.
+ * @param model     Model recorded on the session and on the task.
+ * @param agent     Optional agent name; stored as NULL when omitted.
+ * @param label     Optional label; used for the session and chat names.
+ * @returns Metadata for the new task: `running`, or `failed` when the
+ *          inference could not be dispatched.
+ */
+export async function spawnBackgroundTask(
+  sql: Sql,
+  log: FastifyBaseLogger,
+  projectId: string,
+  input: string,
+  model: string,
+  agent?: string,
+  label?: string,
+): Promise<BackgroundTask> {
+  const sessionName =
+    label != null && label.length > 0
+      ? `Subagent: ${label}`
+      : `Background: ${input.slice(0, 50)}${input.length > 50 ? '...' : ''}`;
+
+  const result = await sql.begin(async (tx) => {
+    // 1. Create session for the background task
+    const [sess] = await tx<{ id: string }[]>`
+      INSERT INTO sessions (project_id, name, model, system_prompt)
+      VALUES (${projectId}, ${sessionName}, ${model}, '')
+      RETURNING id
+    `;
+    const sessionId = sess!.id;
+
+    // 2. Create chat in that session
+    const [ch] = await tx<{ id: string }[]>`
+      INSERT INTO chats (session_id, name, status)
+      VALUES (${sessionId}, ${label ?? null}, 'open')
+      RETURNING id
+    `;
+    const chatId = ch!.id;
+
+    // 3. Insert user message with the task input
+    await tx`
+      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
+      VALUES (${sessionId}, ${chatId}, 'user', ${input}, 'complete', clock_timestamp())
+    `;
+
+    // 4. Insert streaming assistant message (inference fills it)
+    const [assistantRow] = await tx<{ id: string }[]>`
+      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
+      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
+      RETURNING id
+    `;
+    const assistantMessageId = assistantRow!.id;
+
+    // 5. Insert tasks row for tracking
+    const [task] = await tx<{ id: string; created_at: string }[]>`
+      INSERT INTO tasks (project_id, session_id, state, input, agent, model)
+      VALUES (${projectId}, ${sessionId}, 'running', ${input}, ${agent ?? null}, ${model})
+      RETURNING id, created_at
+    `;
+
+    return { sessionId, chatId, assistantMessageId, task: task! };
+  });
+
+  // Fields shared by the success and the failure return value.
+  const base = {
+    id: result.task.id,
+    session_id: result.sessionId,
+    chat_id: result.chatId,
+    agent: agent ?? null,
+    model,
+    input,
+    output_summary: null,
+    created_at: result.task.created_at,
+  };
+
+  // After the transaction commits, fire-and-forget inference dispatch.
+  let dispatchError: unknown = null;
+  if (_enqueueInference) {
+    try {
+      _enqueueInference(result.sessionId, result.chatId, result.assistantMessageId, 'default');
+    } catch (err) {
+      dispatchError = err;
+    }
+  } else {
+    dispatchError = new Error('background inference enqueuer is not configured');
+  }
+
+  if (dispatchError != null) {
+    log.warn(
+      { err: dispatchError, taskId: result.task.id },
+      'background inference enqueue failed',
+    );
+    // Nothing will ever complete this task, so close it out now. The state
+    // guard keeps a concurrent cancel from being overwritten.
+    // NOTE(review): the assistant message is left in 'streaming'; which
+    // message status represents a failure is not visible from this module.
+    const [failed] = await sql<{ ended_at: string }[]>`
+      UPDATE tasks
+      SET state = 'failed', ended_at = clock_timestamp()
+      WHERE id = ${result.task.id}
+        AND state = 'running'
+      RETURNING ended_at
+    `;
+    return { ...base, status: 'failed', finished_at: failed?.ended_at ?? null };
+  }
+
+  log.info(
+    {
+      taskId: result.task.id,
+      sessionId: result.sessionId,
+      chatId: result.chatId,
+      model,
+      agent,
+    },
+    'spawned background subagent task',
+  );
+
+  return { ...base, status: 'running', finished_at: null };
+}
+
+// Look up a background task by its tasks.id. Includes the status from the
+// tasks table and the chat_id from the linked chat.
+/**
+ * Load one background task and shape it as a `BackgroundTask`.
+ *
+ * @param sql    Database handle.
+ * @param taskId `tasks.id` of the task to look up.
+ * @returns The task, or null when no row has that id. `chat_id` is the
+ *          first chat found for the task's session, or '' when there is none.
+ */
+export async function getBackgroundTaskStatus(
+  sql: Sql,
+  taskId: string,
+): Promise<BackgroundTask | null> {
+  const rows = await sql<
+    {
+      id: string;
+      session_id: string;
+      state: string;
+      input: string;
+      agent: string | null;
+      model: string | null;
+      output_summary: string | null;
+      created_at: string;
+      ended_at: string | null;
+    }[]
+  >`
+    SELECT id, session_id, state, input, agent, model, output_summary, created_at, ended_at
+    FROM tasks
+    WHERE id = ${taskId}
+  `;
+  if (rows.length === 0) return null;
+  const r = rows[0]!;
+
+  // Find the chat_id from the session (background sessions have exactly one chat).
+  // Only the first returned row is used.
+  const chatRows = await sql<{ id: string }[]>`
+    SELECT id FROM chats WHERE session_id = ${r.session_id} LIMIT 2
+  `;
+
+  return {
+    id: r.id,
+    session_id: r.session_id,
+    chat_id: chatRows[0]?.id ?? '',
+    agent: r.agent,
+    model: r.model ?? '',
+    input: r.input,
+    status: mapTaskState(r.state),
+    output_summary: r.output_summary,
+    created_at: r.created_at,
+    finished_at: r.ended_at,
+  };
+}
+
+/**
+ * Retrieve the output and token usage of a background task.
+ *
+ * Reads the newest non-empty `complete` assistant message of `chatId`. The
+ * message must also belong to the task's session, so a chat id from some
+ * other session can never be read through an unrelated task id.
+ *
+ * @param sql    Database handle.
+ * @param taskId `tasks.id` of the task.
+ * @param chatId Chat expected to hold the task's messages.
+ * @returns null when the task does not exist or no matching message is
+ *          found. Otherwise the message content plus `token_usage`, which is
+ *          null when both counters are NULL; `prompt` is read from
+ *          `ctx_used` and `completion` from `tokens_used`.
+ */
+export async function getBackgroundTaskResult(
+  sql: Sql,
+  taskId: string,
+  chatId: string,
+): Promise<{
+  output: string;
+  token_usage: { prompt: number; completion: number } | null;
+} | null> {
+  // Verify the task exists; its session scopes the message lookup below.
+  const taskRows = await sql<{ session_id: string }[]>`
+    SELECT session_id FROM tasks WHERE id = ${taskId}
+  `;
+  if (taskRows.length === 0) return null;
+  const taskSessionId = taskRows[0]!.session_id;
+
+  // Read the last complete assistant message (the one with content),
+  // restricted to the task's own session so chatId is actually verified.
+  const msgRows = await sql<
+    {
+      content: string;
+      tokens_used: number | null;
+      ctx_used: number | null;
+    }[]
+  >`
+    SELECT content, tokens_used, ctx_used
+    FROM messages
+    WHERE chat_id = ${chatId}
+      AND session_id = ${taskSessionId}
+      AND role = 'assistant'
+      AND status = 'complete'
+      AND content <> ''
+    ORDER BY created_at DESC
+    LIMIT 1
+  `;
+  if (msgRows.length === 0) return null;
+
+  const m = msgRows[0]!;
+  return {
+    output: m.content,
+    token_usage:
+      m.tokens_used != null || m.ctx_used != null
+        ? { prompt: m.ctx_used ?? 0, completion: m.tokens_used ?? 0 }
+        : null,
+  };
+}
+
+/**
+ * Cancel a pending or running background task.
+ *
+ * This only updates the `tasks` row; it does not call into the inference
+ * runner.
+ *
+ * @param sql    Database handle.
+ * @param taskId `tasks.id` of the task to cancel.
+ * @returns true if a row was updated (the task existed and was in a
+ *          cancellable state), false otherwise.
+ */
+export async function cancelBackgroundTask(
+  sql: Sql,
+  taskId: string,
+): Promise<boolean> {
+  const rows = await sql<{ id: string }[]>`
+    UPDATE tasks
+    SET state = 'cancelled', ended_at = clock_timestamp()
+    WHERE id = ${taskId}
+      AND state IN ('pending', 'running')
+    RETURNING id
+  `;
+  return rows.length > 0;
+}
diff --git a/apps/server/src/services/inference/compute-diff.ts b/apps/server/src/services/inference/compute-diff.ts
new file mode 100644
index 0000000..b84b6f2
--- /dev/null
+++ b/apps/server/src/services/inference/compute-diff.ts
@@ -0,0 +1,132 @@
+/**
+ * Compact unified-diff generator for write-tool results.
+ *
+ * Produces a minimal unified diff string (---/+++ header + +/- lines) from
+ * old/new text pairs so the frontend can render an inline diff snippet
+ * without pulling in a full diff library.
+ */
+
+// Write-tool names that can produce file diffs.
+export const WRITE_TOOL_NAMES = new Set([
+  'edit_file',
+  'create_file',
+  'delete_file',
+  'apply_pending',
+]);
+
+/**
+ * Compute a compact unified diff from old → new text.
+ *
+ * Creates and deletes list every line with `+` / `-` and carry no hunk
+ * header. Edits produce a single hunk: the lines shared at the start and at
+ * the end of both texts are trimmed, the differing middle is emitted as
+ * removals followed by additions, and up to two context lines are kept on
+ * each side. Hunk line numbers are relative to the text passed in.
+ *
+ * @param oldStr The original text (empty for creates)
+ * @param newStr The replacement text (empty for deletes)
+ * @param filePath Display path for the file header
+ * @returns A unified-diff string, or empty string if old === new
+ */
+export function computeDiff(oldStr: string, newStr: string, filePath: string): string {
+  if (oldStr === newStr) return '';
+
+  const oldLines = oldStr.split('\n');
+  const newLines = newStr.split('\n');
+
+  // For empty old → new file (create), show all lines as additions
+  if (oldStr.length === 0) {
+    const header = `--- /dev/null\n+++ b/${filePath}\n`;
+    return header + newLines.map((line) => `+${line}`).join('\n');
+  }
+
+  // For old → empty (delete), show all lines as removals
+  if (newStr.length === 0) {
+    const header = `--- a/${filePath}\n+++ /dev/null\n`;
+    return header + oldLines.map((line) => `-${line}`).join('\n');
+  }
+
+  // Trim the common prefix and suffix. Comparing index-by-index instead
+  // would flag every line after an inserted or deleted line as changed.
+  const shorter = Math.min(oldLines.length, newLines.length);
+  let prefix = 0;
+  while (prefix < shorter && oldLines[prefix] === newLines[prefix]) {
+    prefix += 1;
+  }
+  let suffix = 0;
+  while (
+    suffix < shorter - prefix &&
+    oldLines[oldLines.length - 1 - suffix] === newLines[newLines.length - 1 - suffix]
+  ) {
+    suffix += 1;
+  }
+
+  const removed = oldLines.slice(prefix, oldLines.length - suffix);
+  const added = newLines.slice(prefix, newLines.length - suffix);
+
+  // Context lines around the changed block (up to 2 lines each side).
+  const contextSize = 2;
+  const before = oldLines.slice(Math.max(0, prefix - contextSize), prefix);
+  const afterStart = oldLines.length - suffix;
+  const after = oldLines.slice(afterStart, afterStart + contextSize);
+
+  // Each side of the hunk header reports its own line count; the two differ
+  // whenever lines were inserted or deleted.
+  const hunkStart = prefix - before.length + 1; // 1-indexed
+  const hunkOldLen = before.length + removed.length + after.length;
+  const hunkNewLen = before.length + added.length + after.length;
+
+  const hunkLines = [
+    ...before.map((line) => ` ${line}`),
+    ...removed.map((line) => `-${line}`),
+    ...added.map((line) => `+${line}`),
+    ...after.map((line) => ` ${line}`),
+  ];
+
+  const header = `--- a/${filePath}\n+++ b/${filePath}\n`;
+  const hunkHeader = `@@ -${hunkStart},${hunkOldLen} +${hunkStart},${hunkNewLen} @@\n`;
+  return header + hunkHeader + hunkLines.join('\n');
+}
+
+/**
+ * Check whether a tool name corresponds to a file-modifying write tool
+ * that should produce a diff in its tool result.
+ */
+export function isWriteTool(name: string): boolean {
+  return WRITE_TOOL_NAMES.has(name);
+}
+
+/**
+ * Extract a diff string from tool call args for write tools.
+ * Returns empty string if the tool doesn't produce diffs or args are missing.
+ *
+ * @param name     Tool name.
+ * @param args     Tool call arguments. `old_string` / `new_string` are read
+ *                 for edit_file and `content` for create_file.
+ * @param filePath Display path; falls back to `args.file_path`, then 'file'.
+ */
+// NOTE(review): the type arguments of `Record` were lost in this copy of the
+// patch; `<string, unknown>` is assumed — confirm against the repository.
+export function diffFromToolArgs(name: string, args: Record<string, unknown>, filePath?: string): string {
+  switch (name) {
+    case 'edit_file': {
+      const oldStr = String(args.old_string ?? '');
+      const newStr = String(args.new_string ?? '');
+      const path = filePath ?? String(args.file_path ?? 'file');
+      return computeDiff(oldStr, newStr, path);
+    }
+    case 'create_file': {
+      const content = String(args.content ?? '');
+      const path = filePath ?? String(args.file_path ?? 'file');
+      return computeDiff('', content, path);
+    }
+    case 'delete_file':
+      // No content available at queue time — actual content is read at apply time.
+      return '';
+    case 'apply_pending':
+      // Meta-tool — individual changes produce their own diffs.
+      return '';
+    default:
+      return '';
+  }
+}
diff --git a/apps/server/src/services/inference/multi-modal.ts b/apps/server/src/services/inference/multi-modal.ts
new file mode 100644
index 0000000..9b056ff
--- /dev/null
+++ b/apps/server/src/services/inference/multi-modal.ts
@@ -0,0 +1,56 @@
+// vDeepSeek (stub): multi-modal (image) attachment support.
+//
+// When a message carries images, DeepSeek V4 models can process them
+// natively via the @ai-sdk/deepseek provider. This module provides the
+// helper types and functions to detect and convert image attachments.
+//
+// FULL INTEGRATION requires:
+// 1. Storing image data alongside messages (message_parts with kind='image'
+//    or a dedicated attachments table with base64-encoded data).
+// 2. Extending OpenAiMessage.content from `string | null` to
+//    `string | null | Array<{ type: 'text'; text: string } | { type: 'image'; image: string }>`
+//    in apps/server/src/services/inference/payload.ts.
+// 3. Updating toModelMessages() in stream-phase-adapter.ts to emit AI SDK
+//    content arrays with image parts for multimodal user messages.
+//
+// None of the above is done yet — this file is a type scaffold.
+
+import type { Message } from '../../types/api.js';
+
+/** Shape of a decoded image attachment ready for the AI SDK. */
+export interface ImageAttachment {
+  /** Base64-encoded image payload only, without a `data:...;base64,` prefix. */
+  data: string;
+  /** MIME type (e.g. 'image/png', 'image/jpeg', 'image/webp'). */
+  mimeType: string;
+}
+
+/**
+ * Check if a user message has image content that can be forwarded to a
+ * multimodal model. Currently a stub — always returns false until the
+ * message-pipeline stores image attachments addressably.
+ */ +export function hasImageAttachments(_message: Message): boolean { + // TODO(vDeepSeek): scan message_parts for kind='image' or inspect + // message.content for inline data URIs (data:image/...). + return false; +} + +/** + * Convert internal image attachments to the format expected by the AI SDK + * ModelMessage content array. + * + * The @ai-sdk/deepseek provider accepts images as: + * { type: 'image'; image: 'data:image/png;base64,...' } + * + * @param attachments — List of decoded image attachments. + * @returns AI SDK inline file parts suitable for ModelMessage.content. + */ +export function imageAttachmentsToParts( + attachments: ImageAttachment[], +): Array<{ type: 'image'; image: string }> { + return attachments.map((a) => ({ + type: 'image' as const, + image: `data:${a.mimeType};base64,${a.data}`, + })); +} diff --git a/apps/server/src/services/inference/tool-phase.ts b/apps/server/src/services/inference/tool-phase.ts index e17f8ba..a2a6134 100644 --- a/apps/server/src/services/inference/tool-phase.ts +++ b/apps/server/src/services/inference/tool-phase.ts @@ -19,6 +19,7 @@ import { formatUnknownToolError } from './tool-suggestions.js'; import { resolveGrantRoot } from '../grant_resolver.js'; import { stripToolMarkup } from './tool-call-parser.js'; import { repairToolInput } from './tool-input-repair.js'; +import { diffFromToolArgs, isWriteTool } from './compute-diff.js'; import type { FailureKind } from './mistake-tracker.js'; import { insertToolTrace, updateToolTrace } from '../tool-traces.js'; import type { @@ -445,6 +446,16 @@ export async function executeToolPhase( if (SYNTHESIS_TOOLS.has(tc.name)) { synthEntries.push({ tc, output: tres.output, ...(tres.error ? { error: tres.error } : {}) }); } + // v2.8: compute a compact unified diff for successful write-tool results. 
+ // The diff is derived from tool call args (old_string/new_string for + // edit_file, content for create_file) and included in the WS frame so + // the frontend can render a DiffSnippet inline. Not persisted to message_parts + // (the args alone are enough to reproduce it on reload if needed). + const toolDiff = + !tres.error && tres.outcome === 'success' && isWriteTool(tc.name) + ? diffFromToolArgs(tc.name, tc.args as Record) + : undefined; + const stored = { tool_call_id: tc.id, output: tres.output, @@ -467,6 +478,7 @@ export async function executeToolPhase( output: tres.output, truncated: tres.truncated, ...(tres.error ? { error: tres.error } : {}), + ...(toolDiff ? { diff: toolDiff } : {}), }); }) ); diff --git a/apps/server/src/services/tools/background-subagent-tools.ts b/apps/server/src/services/tools/background-subagent-tools.ts new file mode 100644 index 0000000..0a2894c --- /dev/null +++ b/apps/server/src/services/tools/background-subagent-tools.ts @@ -0,0 +1,305 @@ +// v2.x: Background subagent tools. Three tools that let the model spawn +// non-blocking subagent tasks, poll their status, and retrieve results. +// +// spawn_subagent — Create a background session+chat, dispatch inference, +// return immediately with a task_id. +// subagent_status — Poll the status of a previously spawned task. +// subagent_result — Retrieve the full output of a completed task. +// +// These tools reuse the existing sessions/chats/messages/tables and the +// inference pipeline — no new tables or services needed. +// +// Registered in tools.ts ALL_TOOLS. Lives in its own file so tests can +// import executors without dragging in the full tool registry. +// +// Follows the read_tab_by_number.ts pattern: a pure executor function plus +// a ToolDef wrapper. Type-only import from tools.ts to dodge runtime cycles. 
+ +import { z } from 'zod'; +import type { Sql } from '../../db.js'; +import type { ToolDef, ToolExecCtx } from '../tools.js'; +import { + spawnBackgroundTask, + getBackgroundTaskStatus, + getBackgroundTaskResult, +} from '../background-task.js'; + +// --------------------------------------------------------------------------- +// spawn_subagent +// --------------------------------------------------------------------------- + +export const SpawnSubagentInput = z.object({ + input: z.string().min(1).describe('The task to execute in the background'), + model: z + .string() + .min(1) + .optional() + .describe('Model to use (defaults to session model)'), + agent: z + .string() + .min(1) + .optional() + .describe('Agent to use (defaults to boocode)'), + label: z + .string() + .max(100) + .optional() + .describe('Human-readable label for display'), +}); + +export type SpawnSubagentInputT = z.infer; + +export async function executeSpawnSubagent( + input: SpawnSubagentInputT, + sql: Sql, + sessionId: string, +): Promise> { + // Resolve project_id + model from the current session. + const sessRows = await sql< + { project_id: string; model: string }[] + >` + SELECT project_id, model FROM sessions WHERE id = ${sessionId} + `; + if (sessRows.length === 0) { + return { error: 'current session not found' }; + } + const projectId = sessRows[0]!.project_id; + const model = input.model ?? sessRows[0]!.model; + + const task = await spawnBackgroundTask( + sql, + // We pass a minimal logger shim — the real logger is wired by the + // inference pipeline. This keeps the tool's execute signature clean. + { info: () => {}, warn: () => {}, error: () => {} } as unknown as import('fastify').FastifyBaseLogger, + projectId, + input.input, + model, + input.agent, + input.label, + ); + + // Elapsed time since creation is negligible (task was just spawned). 
+ return { + task_id: task.id, + status: task.status, + session_id: task.session_id, + chat_id: task.chat_id, + created_at: task.created_at, + }; +} + +export const spawnSubagent: ToolDef = { + name: 'spawn_subagent', + description: + 'Spawn a background subagent task. Creates a new session and chat, dispatches inference asynchronously, and returns immediately with a task_id. Use subagent_status to poll for completion and subagent_result to retrieve the full output. Non-blocking — the model continues while the subagent works in the background.', + inputSchema: SpawnSubagentInput, + jsonSchema: { + type: 'function', + function: { + name: 'spawn_subagent', + description: + 'Spawn a background subagent task. Returns immediately with a task_id — poll with subagent_status.', + parameters: { + type: 'object', + properties: { + input: { + type: 'string', + description: 'The task to execute in the background', + }, + model: { + type: 'string', + description: 'Model to use (defaults to session model)', + }, + agent: { + type: 'string', + description: 'Agent to use (defaults to boocode)', + }, + label: { + type: 'string', + maxLength: 100, + description: 'Human-readable label for display', + }, + }, + required: ['input'], + additionalProperties: false, + }, + }, + }, + async execute(input, _projectRoot, _extraRoots, toolCtx?: ToolExecCtx) { + if (!toolCtx) { + return { error: 'spawn_subagent unavailable: no session context' }; + } + try { + return await executeSpawnSubagent(input, toolCtx.sql, toolCtx.sessionId); + } catch (err) { + return { + error: `spawn_subagent failed: ${err instanceof Error ? 
err.message : String(err)}`, + }; + } + }, +}; + +// --------------------------------------------------------------------------- +// subagent_status +// --------------------------------------------------------------------------- + +export const SubagentStatusInput = z.object({ + task_id: z.string().uuid().describe('Task ID from spawn_subagent'), +}); + +export type SubagentStatusInputT = z.infer; + +export async function executeSubagentStatus( + input: SubagentStatusInputT, + sql: Sql, +): Promise> { + const task = await getBackgroundTaskStatus(sql, input.task_id); + if (!task) { + return { error: 'task not found', task_id: input.task_id }; + } + + // Compute elapsed time from created_at (ISO string). + let elapsed_seconds: number | null = null; + try { + const created = new Date(task.created_at).getTime(); + const finished = task.finished_at + ? new Date(task.finished_at).getTime() + : Date.now(); + elapsed_seconds = Math.round((finished - created) / 1000); + } catch { + elapsed_seconds = null; + } + + return { + task_id: task.id, + status: task.status, + output_summary: task.output_summary, + finished_at: task.finished_at, + elapsed_seconds, + }; +} + +export const subagentStatus: ToolDef = { + name: 'subagent_status', + description: + 'Poll the status of a background subagent task by task_id. Returns the current status (running/completed/failed/cancelled), an output summary if completed, and elapsed time. Useful after spawn_subagent to check if work is done.', + inputSchema: SubagentStatusInput, + jsonSchema: { + type: 'function', + function: { + name: 'subagent_status', + description: + 'Poll the status of a background subagent task. 
Returns status, output summary, and elapsed time.', + parameters: { + type: 'object', + properties: { + task_id: { + type: 'string', + format: 'uuid', + description: 'Task ID from spawn_subagent', + }, + }, + required: ['task_id'], + additionalProperties: false, + }, + }, + }, + async execute(input, _projectRoot, _extraRoots, toolCtx?: ToolExecCtx) { + if (!toolCtx) { + return { error: 'subagent_status unavailable: no session context' }; + } + try { + return await executeSubagentStatus(input, toolCtx.sql); + } catch (err) { + return { + error: `subagent_status failed: ${err instanceof Error ? err.message : String(err)}`, + }; + } + }, +}; + +// --------------------------------------------------------------------------- +// subagent_result +// --------------------------------------------------------------------------- + +export const SubagentResultInput = z.object({ + task_id: z.string().uuid().describe('Task ID from spawn_subagent'), +}); + +export type SubagentResultInputT = z.infer; + +export async function executeSubagentResult( + input: SubagentResultInputT, + sql: Sql, +): Promise> { + const task = await getBackgroundTaskStatus(sql, input.task_id); + if (!task) { + return { error: 'task not found', task_id: input.task_id }; + } + + if (task.status !== 'completed') { + return { + task_id: task.id, + status: task.status, + error: `task is not yet completed (status: ${task.status})`, + }; + } + + if (!task.chat_id) { + return { error: 'task has no chat data', task_id: input.task_id }; + } + + const result = await getBackgroundTaskResult(sql, input.task_id, task.chat_id); + if (!result) { + return { + task_id: task.id, + status: task.status, + error: 'task completed but no output message found', + }; + } + + return { + task_id: task.id, + output: result.output, + token_usage: result.token_usage, + }; +} + +export const subagentResult: ToolDef = { + name: 'subagent_result', + description: + 'Retrieve the full output of a completed background subagent task by 
task_id. Returns the response text and token usage. The task must be in completed status — poll with subagent_status first.', + inputSchema: SubagentResultInput, + jsonSchema: { + type: 'function', + function: { + name: 'subagent_result', + description: + 'Retrieve the full output of a completed background subagent task. Returns output text and token usage.', + parameters: { + type: 'object', + properties: { + task_id: { + type: 'string', + format: 'uuid', + description: 'Task ID from spawn_subagent', + }, + }, + required: ['task_id'], + additionalProperties: false, + }, + }, + }, + async execute(input, _projectRoot, _extraRoots, toolCtx?: ToolExecCtx) { + if (!toolCtx) { + return { error: 'subagent_result unavailable: no session context' }; + } + try { + return await executeSubagentResult(input, toolCtx.sql); + } catch (err) { + return { + error: `subagent_result failed: ${err instanceof Error ? err.message : String(err)}`, + }; + } + }, +}; diff --git a/apps/server/src/services/tools/registry.ts b/apps/server/src/services/tools/registry.ts index 196b15d..28b1e6b 100644 --- a/apps/server/src/services/tools/registry.ts +++ b/apps/server/src/services/tools/registry.ts @@ -40,6 +40,13 @@ import { searchMemoryTool } from './search_memory.js'; // vWhale: command execution tool. Spawns processes in the project worktree // with timeout and output cap. No shell — args are passed as array. import { runCommand } from './execute-command.js'; +// v2.x: background subagent tools. Non-blocking subagent execution with +// spawn/poll/collect lifecycle. Reuses existing sessions/chats/messages/tasks. +import { + spawnSubagent, + subagentStatus, + subagentResult, +} from './background-subagent-tools.js'; // v1.13.3: alpha-sorted by tool.name at module load. 
llama.cpp's prompt // cache hits on byte-identical prefixes; the tool list lives near the top @@ -105,6 +112,10 @@ export let ALL_TOOLS: ToolDef[] = [ // Read-write; use with guard: restricted to project root via path_guard, // no shell injection (execFile, not exec). runCommand as ToolDef, + // v2.x: background subagent tools. Non-blocking spawn/poll/collect lifecycle. + spawnSubagent as ToolDef, + subagentStatus as ToolDef, + subagentResult as ToolDef, ].sort((a, b) => a.name.localeCompare(b.name)); export let TOOLS_BY_NAME: Record> = Object.fromEntries( diff --git a/apps/server/src/services/workflow/catalog.ts b/apps/server/src/services/workflow/catalog.ts new file mode 100644 index 0000000..d6ed015 --- /dev/null +++ b/apps/server/src/services/workflow/catalog.ts @@ -0,0 +1,376 @@ +// v2.8.0: Workflow catalog — built-in workflow definitions that ship with +// BooCode. Each workflow is a metadata object with name, description, and a +// factory function that returns the workflow script source code. +// +// Built-in workflows are merged into the discovery list alongside file-based +// workflows from .boocode/workflows/. They take precedence over user-defined +// workflows with the same name. + +import { createHash } from 'node:crypto'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** + * A built-in workflow definition shipped with BooCode. + */ +export interface BuiltinWorkflow { + /** Unique workflow name (used to invoke via `WorkflowManager`). */ + name: string; + /** Human-readable description of what this workflow does. */ + description: string; + /** Optional ordered phases for UI progress display. */ + phases?: Array<{ title: string; detail?: string }>; + /** + * Generate the workflow script source code for this workflow. 
+ * The returned string must be valid JS that exports `meta` and a `default` + * async function matching the `WorkflowScript` shape. + * + * @param args - Optional arguments provided when the workflow is started. + */ + generateScript: (args?: Record) => string; +} + +// --------------------------------------------------------------------------- +// Script templates (shared helpers) +// --------------------------------------------------------------------------- + +/** + * Stable JSON serialisation for generating deterministic cache keys from + * structured arguments. Keys are sorted so the same data always produces + * the same string regardless of property insertion order. + */ +function stableJson(value: unknown): string { + if (value === null) return 'null'; + if (typeof value !== 'object') return JSON.stringify(value); + if (Array.isArray(value)) { + return `[${value.map(stableJson).join(',')}]`; + } + const keys = Object.keys(value as Record).sort(); + const pairs = keys.map((k) => `${JSON.stringify(k)}:${stableJson((value as Record)[k])}`); + return `{${pairs.join(',')}}`; +} + +/** + * Compute a deterministic SHA-256 fingerprint for a combined spec + args + * payload. Used by the resumability cache to detect unchanged agent tasks. + * + * Exported for testing. 
+ */ +export function fingerprintAgentTask( + prompt: string, + spec: Record, + args: string, +): string { + return createHash('sha256') + .update(stableJson({ prompt, spec, args })) + .digest('hex'); +} + +// --------------------------------------------------------------------------- +// Built-in workflow definitions +// --------------------------------------------------------------------------- + +function generateDeepResearchScript(_args?: Record): string { + return ` +export const meta = { + name: 'deep-research', + description: 'Multi-phase deep research: scope, search, fetch, verify, synthesise.', + phases: [ + { title: 'Scope', detail: 'Define the research question and search criteria' }, + { title: 'Search', detail: 'Query web sources in parallel' }, + { title: 'Fetch', detail: 'Retrieve full content from top sources' }, + { title: 'Verify', detail: 'Cross-reference and validate findings' }, + { title: 'Synthesise', detail: 'Produce a final structured report' }, + ], +}; + +export default async function main(args) { + const query = args?.query ?? 'No query provided'; + log('deep-research: starting with query: ' + query); + + // Phase 1: Scope + phase('Scope'); + const scope = await agent( + 'Analyse this research query and produce a search plan with 3-5 key sub-questions: ' + query, + { label: 'scope-analysis', phase: 'scope' }, + ); + log('Scope completed'); + + // Phase 2: Search + phase('Search'); + const searchResults = await agent( + 'Based on the scope, search for authoritative sources. 
Return a list of 3-5 URLs with brief annotations.', + { label: 'web-search', phase: 'search' }, + ); + log('Search completed'); + + // Phase 3: Fetch + phase('Fetch'); + const fetchedContent = await agent( + 'Extract and summarise the key information from these sources: ' + JSON.stringify(searchResults), + { label: 'content-fetch', phase: 'fetch' }, + ); + log('Fetch completed'); + + // Phase 4: Verify + phase('Verify'); + const verified = await agent( + 'Cross-reference the fetched information. Note any contradictions, gaps, or weak sources: ' + JSON.stringify(fetchedContent), + { label: 'verification', phase: 'verify' }, + ); + log('Verify completed'); + + // Phase 5: Synthesise + phase('Synthesise'); + const report = await agent( + 'Synthesise the verified information into a structured report with findings, sources, and confidence levels: ' + JSON.stringify(verified), + { label: 'synthesis', phase: 'synthesise' }, + ); + log('deep-research: completed'); + + return { + ok: true, + output: report, + phases: { scope, searchResults, fetchedContent, verified, report }, + }; +} +`.trim(); +} + +function generateReviewCodeScript(_args?: Record): string { + return ` +export const meta = { + name: 'review-code', + description: 'Multi-perspective code review: correctness, security, performance, then synthesise.', + phases: [ + { title: 'Correctness', detail: 'Check logic, edge cases, and correctness' }, + { title: 'Security', detail: 'Analyse for vulnerabilities and unsafe patterns' }, + { title: 'Performance', detail: 'Identify performance bottlenecks and optimisation opportunities' }, + { title: 'Synthesise', detail: 'Merge perspectives into a unified review report' }, + ], +}; + +export default async function main(args) { + const target = args?.target ?? args?.path ?? 
''; + log('review-code: starting review of: ' + (target || '(no target specified)')); + + const context = await agent( + 'Read the code at ' + (target || 'the provided context') + ' and produce a summary of its structure and purpose.', + { label: 'read-context', phase: 'context' }, + ); + + // Phase 1: Correctness + phase('Correctness'); + const correctness = await agent( + 'Review this code for correctness. Check logical errors, edge cases, type safety, and concurrency issues:\\n' + JSON.stringify(context), + { label: 'correctness-review', phase: 'correctness' }, + ); + + // Phase 2: Security + phase('Security'); + const security = await agent( + 'Review this code for security vulnerabilities. Check for injection, auth bypasses, unsafe deserialisation, secret exposure:\\n' + JSON.stringify(context), + { label: 'security-review', phase: 'security' }, + ); + + // Phase 3: Performance + phase('Performance'); + const performance = await agent( + 'Review this code for performance issues. Check algorithmic complexity, unnecessary allocations, I/O patterns, caching opportunities:\\n' + JSON.stringify(context), + { label: 'performance-review', phase: 'performance' }, + ); + + // Phase 4: Synthesise + phase('Synthesise'); + const report = await agent( + 'Merge these three review perspectives into one structured report with severity-ranked findings:\\n' + + '--- Correctness ---\\n' + JSON.stringify(correctness) + '\\n' + + '--- Security ---\\n' + JSON.stringify(security) + '\\n' + + '--- Performance ---\\n' + JSON.stringify(performance), + { label: 'synthesis', phase: 'synthesise' }, + ); + log('review-code: completed'); + + return { + ok: true, + output: report, + reviews: { correctness, security, performance }, + }; +} +`.trim(); +} + +function generateFindIssuesScript(_args?: Record): string { + return ` +export const meta = { + name: 'find-issues', + description: 'Iterative issue discovery — keep surfacing issues until consecutive rounds find nothing new.', + phases: [ 
+ { title: 'Analyse', detail: 'Analyse the codebase for issues' }, + { title: 'Check dry', detail: 'Verify no new issues remain' }, + ], +}; + +export default async function main(args) { + const target = args?.target ?? args?.path ?? '.'; + const maxRounds = args?.maxRounds ?? 5; + log('find-issues: starting on ' + target + ' (max ' + maxRounds + ' rounds)'); + + const allIssues = []; + let dryRounds = 0; + let round = 0; + + while (dryRounds < 2 && round < maxRounds) { + round++; + phase('Analyse'); + + const context = allIssues.length > 0 + ? 'Previously found issues (exclude these):\\n' + JSON.stringify(allIssues) + : 'No issues found yet.'; + + const newIssues = await agent( + 'Analyse ' + target + ' for bugs, code smells, and anti-patterns.\\n' + context + '\\nReturn a JSON array of issues. If none found, return an empty array.', + { label: 'round-' + round + '-analysis', phase: 'analyse' }, + ); + + let parsed: unknown[] = []; + try { + if (typeof newIssues === 'string') { + parsed = JSON.parse(newIssues); + } else if (Array.isArray(newIssues)) { + parsed = newIssues; + } + } catch { + parsed = []; + } + + if (parsed.length === 0) { + dryRounds++; + phase('Check dry'); + log('Round ' + round + ': no new issues found (dry run ' + dryRounds + '/2)'); + } else { + dryRounds = 0; + for (const issue of parsed) { + allIssues.push(issue); + } + log('Round ' + round + ': found ' + parsed.length + ' new issue(s)'); + } + } + + log('find-issues: completed after ' + round + ' rounds, ' + allIssues.length + ' total issues'); + + return { + ok: true, + output: allIssues, + totalRounds: round, + totalIssues: allIssues.length, + }; +} +`.trim(); +} + +// --------------------------------------------------------------------------- +// Registry +// --------------------------------------------------------------------------- + +/** + * All built-in workflow definitions shipped with BooCode. 
+ */ +const BUILTIN_WORKFLOWS: BuiltinWorkflow[] = [ + { + name: 'deep-research', + description: + 'Performs multi-phase deep research: scope the question, search web sources in parallel, fetch full content, verify findings, and synthesise a structured report.', + phases: [ + { title: 'Scope', detail: 'Define the research question and search criteria' }, + { title: 'Search', detail: 'Query web sources in parallel' }, + { title: 'Fetch', detail: 'Retrieve full content from top sources' }, + { title: 'Verify', detail: 'Cross-reference and validate findings' }, + { title: 'Synthesise', detail: 'Produce a final structured report' }, + ], + generateScript: generateDeepResearchScript, + }, + { + name: 'review-code', + description: + 'Multi-perspective code review that analyses code for correctness, security vulnerabilities, and performance issues in parallel, then merges findings into a unified severity-ranked report.', + phases: [ + { title: 'Correctness', detail: 'Check logic, edge cases, and correctness' }, + { title: 'Security', detail: 'Analyse for vulnerabilities and unsafe patterns' }, + { title: 'Performance', detail: 'Identify performance bottlenecks' }, + { title: 'Synthesise', detail: 'Merge perspectives into a unified report' }, + ], + generateScript: generateReviewCodeScript, + }, + { + name: 'find-issues', + description: + 'Iterative issue discovery that runs analysis rounds until two consecutive passes find nothing new, ensuring comprehensive coverage without infinite loops.', + phases: [ + { title: 'Analyse', detail: 'Analyse the codebase for issues' }, + { title: 'Check dry', detail: 'Verify no new issues remain' }, + ], + generateScript: generateFindIssuesScript, + }, +]; + +/** + * Read-only map of built-in workflows keyed by name. + */ +const BUILTIN_WORKFLOW_MAP = new Map( + BUILTIN_WORKFLOWS.map((w) => [w.name, w]), +); + +/** + * Return all built-in workflow definitions. 
+ */ +export function getBuiltinWorkflows(): BuiltinWorkflow[] { + return BUILTIN_WORKFLOWS; +} + +/** + * Look up a built-in workflow by name. + * + * @param name - Workflow name (e.g. 'deep-research'). + * @returns The built-in workflow, or undefined if not found. + */ +export function getBuiltinWorkflow(name: string): BuiltinWorkflow | undefined { + return BUILTIN_WORKFLOW_MAP.get(name); +} + +/** + * Merge built-in workflow metadata into a list of file-discovered workflow + * entries. Built-in entries take precedence — if a user has a file-based + * workflow with the same name, the built-in version wins. + * + * @param fileWorkflows - Workflow metadata discovered from the filesystem. + * @returns Merged array with built-in workflows injected and duplicate names + * resolved (built-in wins). + */ +export function mergeBuiltinWorkflows( + fileWorkflows: Array<{ name: string; description: string; sourceFile?: string }>, +): Array<{ name: string; description: string; sourceFile?: string }> { + const seen = new Set(); + const result: Array<{ name: string; description: string; sourceFile?: string }> = []; + + // Built-in workflows first (they take precedence) + for (const builtin of BUILTIN_WORKFLOWS) { + seen.add(builtin.name); + result.push({ + name: builtin.name, + description: builtin.description, + // No sourceFile — built-in workflows are generated, not read from disk + }); + } + + // File-discovered workflows — skip any name already claimed by built-in + for (const fw of fileWorkflows) { + if (seen.has(fw.name)) continue; + seen.add(fw.name); + result.push(fw); + } + + return result; +} diff --git a/apps/server/src/services/workflow/discovery.ts b/apps/server/src/services/workflow/discovery.ts new file mode 100644 index 0000000..2c81cc4 --- /dev/null +++ b/apps/server/src/services/workflow/discovery.ts @@ -0,0 +1,134 @@ +// v2.8.0: Workflow file discovery — walks project-local and global workflow +// directories to find runnable scripts. 
Built-in workflows from the catalog +// are merged into the results (they take precedence over user-defined files). +// All functions exported for testing. + +import { readdirSync, existsSync } from 'node:fs'; +import { join, basename, extname } from 'node:path'; +import { homedir } from 'node:os'; +import { getBuiltinWorkflows, getBuiltinWorkflow } from './catalog.js'; + +/** + * Sentinel prefix used in `sourceFile` for built-in workflows from the + * catalog so callers (e.g. WorkflowManager) can detect and handle them + * by calling `generateScript()` instead of reading a file from disk. + */ +const BUILTIN_PREFIX = 'builtin:'; + +/** + * Metadata about a discovered workflow file (or built-in workflow). + */ +export interface WorkflowMeta { + /** Workflow name (file stem without .js extension). */ + name: string; + /** Description loaded from the workflow module's `meta.description`. + * Empty string until loadWorkflowMeta() resolves it. */ + description: string; + /** Absolute path to the .js file. + * For built-in workflows this is `'builtin:'` — the caller + * should use `getBuiltinWorkflow(name)` and `generateScript()` + * instead of reading this path from disk. */ + sourceFile: string; +} + +/** + * Test whether a `WorkflowMeta.sourceFile` points to a built-in workflow + * (rather than a file on disk). + * + * @param meta - The workflow metadata to check. + */ +export function isBuiltinWorkflow(meta: WorkflowMeta): boolean { + return meta.sourceFile.startsWith(BUILTIN_PREFIX); +} + +/** + * Find all workflow .js files in the standard search paths, merged with + * built-in workflows from the catalog. + * + * Priority order (first match wins for same-named workflows): + * 1. Built-in catalog (always takes precedence) + * 2. /.boocode/workflows/ (project-local) + * 3. ~/.boocode/workflows/ (global, per-user) + * + * @param projectRoot - Absolute path to the current project root. 
+ */ +export function discoverWorkflows(projectRoot: string): WorkflowMeta[] { + const seen = new Set(); + const results: WorkflowMeta[] = []; + + // 1. Built-in workflows (highest priority) + for (const builtin of getBuiltinWorkflows()) { + seen.add(builtin.name); + results.push({ + name: builtin.name, + description: builtin.description, + sourceFile: `${BUILTIN_PREFIX}${builtin.name}`, + }); + } + + // 2. Project-local + global file-based workflows + const dirs = [ + join(projectRoot, '.boocode', 'workflows'), + join(homedir(), '.boocode', 'workflows'), + ]; + + for (const dir of dirs) { + if (!existsSync(dir)) continue; + try { + const entries = readdirSync(dir); + for (const f of entries) { + if (!f.endsWith('.js')) continue; + const name = basename(f, '.js'); + if (seen.has(name)) continue; // built-in shadows project-local, + // project-local shadows global + seen.add(name); + results.push({ + name, + description: '', + sourceFile: join(dir, f), + }); + } + } catch { + // Permission error on directory — skip silently + continue; + } + } + + return results; +} + +/** + * Find a single workflow by name across built-in catalog and search paths. + * + * Priority: built-in > project-local > global. + * + * @param name - Workflow name (without .js extension). + * @param projectRoot - Absolute path to the current project root. + */ +export function findWorkflow( + name: string, + projectRoot: string, +): WorkflowMeta | undefined { + // Check built-in catalog first + const builtin = getBuiltinWorkflow(name); + if (builtin) { + return { + name: builtin.name, + description: builtin.description, + sourceFile: `${BUILTIN_PREFIX}${builtin.name}`, + }; + } + + // Fall back to file-based discovery + return discoverWorkflows(projectRoot).find((w) => w.name === name); +} + +/** + * Validate a candidate workflow file path. + * Checks that the file exists and has a .js extension. + * + * @param filePath - Absolute path to check. 
+ */ +export function isValidWorkflowPath(filePath: string): boolean { + return extname(filePath) === '.js' && existsSync(filePath); +} diff --git a/apps/server/src/services/workflow/index.ts b/apps/server/src/services/workflow/index.ts new file mode 100644 index 0000000..189c6fc --- /dev/null +++ b/apps/server/src/services/workflow/index.ts @@ -0,0 +1,54 @@ +// v2.8.0: Dynamic Workflow Engine — public surface. +// +// Re-exports all types and classes from the workflow sub-modules so consumers +// import from a single entry point: +// +// ```typescript +// import { WorkflowManager } from './services/workflow/index.js'; +// ``` + +export { WorkflowManager } from './manager.js'; +export type { WorkflowMetaInfo } from './manager.js'; +export type { WorkflowEventHandler } from './manager.js'; + +export { discoverWorkflows, findWorkflow, isValidWorkflowPath, isBuiltinWorkflow } from './discovery.js'; +export type { WorkflowMeta } from './discovery.js'; + +export { + loadWorkflowScript, + loadWorkflowScriptFromCode, + executeWorkflowScript, + executeWorkflowScriptFromCode, + buildSandbox, + transformEsmToCjs, + isEsmSyntax, +} from './sandbox.js'; + +export { + getBuiltinWorkflows, + getBuiltinWorkflow, + mergeBuiltinWorkflows, + fingerprintAgentTask, +} from './catalog.js'; +export type { BuiltinWorkflow } from './catalog.js'; + +export { + cacheKey, + getCachedResult, + setCachedResult, + invalidateRun, + clearCache, + cacheSize, +} from './resumability.js'; +export type { CachedResult } from './resumability.js'; + +export type { + WorkflowScript, + WorkflowScriptMeta, + WorkflowContext, + AgentTaskSpec, + AgentTaskResult, + WorkflowRun, + WorkflowRunStatus, + WorkflowEvent, +} from './types.js'; diff --git a/apps/server/src/services/workflow/manager.ts b/apps/server/src/services/workflow/manager.ts new file mode 100644 index 0000000..3e4164d --- /dev/null +++ b/apps/server/src/services/workflow/manager.ts @@ -0,0 +1,659 @@ +// v2.8.0: WorkflowManager — ties discovery, 
sandbox, and inference dispatch +// together into a single orchestrator for multi-agent workflow scripts. +// +// Creates isolated sessions+chats for each agent() call within a workflow, +// dispatches inference via the existing pipeline, polls for completion, and +// returns structured results. All failures are returned as errors rather than +// thrown exceptions (catch-safe API). + +import { randomUUID } from 'node:crypto'; +import type { Sql } from '../../db.js'; +import type { Config } from '../../config.js'; +import type { FastifyBaseLogger } from 'fastify'; +import type { Broker } from '../broker.js'; +import type { UserStreamFrame } from '../../types/api.js'; +import type { + WorkflowRun, + WorkflowRunStatus, + WorkflowContext, + WorkflowEvent, + AgentTaskSpec, + AgentTaskResult, + WorkflowScriptMeta, +} from './types.js'; +import { discoverWorkflows, findWorkflow, isBuiltinWorkflow } from './discovery.js'; +import { getBuiltinWorkflow } from './catalog.js'; +import { cacheKey, getCachedResult, setCachedResult } from './resumability.js'; +import { + executeWorkflowScript, + executeWorkflowScriptFromCode, + isEsmSyntax, + transformEsmToCjs, +} from './sandbox.js'; +import { runInference } from '../inference/index.js'; +import { readFileSync } from 'node:fs'; +import vm from 'node:vm'; + +/** + * Maximum time to wait for a single agent task to complete (5 minutes). + * Beyond this, the task is treated as failed/timed out. + */ +const AGENT_TASK_TIMEOUT_MS = 300_000; + +/** + * Polling interval when waiting for an agent task to finish. + */ +const POLL_INTERVAL_MS = 500; + +/** + * Maximum time for the entire workflow run (30 minutes). + */ +const WORKFLOW_TIMEOUT_MS = 1_800_000; + +/** + * Token budget tracker. Tracks total token spend across agent calls. 
+ */ +class BudgetTracker { + total: number | null; + #spent = 0; + + constructor(total: number | null) { + this.total = total; + } + + spend(amount: number): void { + this.#spent += amount; + } + + spent(): number { + return this.#spent; + } + + remaining(): number { + if (this.total === null) return Infinity; + return Math.max(0, this.total - this.#spent); + } +} + +/** + * Creates a no-op bounded publish function that avoids WS dependency + * for background workflow agent tasks. Messages are still persisted to DB. + */ +function noopPublish(): void { + /* intentional no-op */ +} + +function noopPublishUser(): void { + /* intentional no-op */ +} + +/** + * Callback type for workflow lifecycle events. + */ +export type WorkflowEventHandler = (event: WorkflowEvent) => void; + +/** + * WorkflowManager — the orchestrator for sandboxed multi-agent workflows. + */ +export class WorkflowManager { + /** Active workflow runs by run ID. */ + readonly #runs = new Map(); + /** Registered event listeners. */ + readonly #listeners = new Set(); + + constructor( + private sql: Sql, + private config: Config, + private log: FastifyBaseLogger, + private projectRoot: string, + private projectId: string, + private broker: Broker, + ) {} + + // ---- public API ---- + + /** + * Discover all available workflow scripts. + */ + listWorkflows(): WorkflowMetaInfo[] { + return discoverWorkflows(this.projectRoot).map((m) => ({ + name: m.name, + sourceFile: m.sourceFile, + })); + } + + /** + * Find a specific workflow by name. + */ + getWorkflow(name: string): WorkflowMetaInfo | undefined { + const found = findWorkflow(name, this.projectRoot); + if (!found) return undefined; + return { name: found.name, sourceFile: found.sourceFile }; + } + + /** + * Load the metadata (name, description, phases) from a workflow file + * without executing it. + * + * @param name - Workflow name. + * @returns The script's meta, or undefined if not found. 
+ */ + async loadWorkflowMeta(name: string): Promise { + const found = findWorkflow(name, this.projectRoot); + if (!found) return undefined; + + // Built-in workflows: return meta directly from the catalog + if (isBuiltinWorkflow(found)) { + const builtin = getBuiltinWorkflow(name); + if (!builtin) return { name, description: '' }; + return { + name: builtin.name, + description: builtin.description, + phases: builtin.phases, + }; + } + + try { + // Load meta by executing the script in a throwaway context + const context = this.#createMinimalContext('meta-loader'); + const code = readFileSync(found.sourceFile, 'utf8'); + const finalCode = isEsmSyntax(code) ? transformEsmToCjs(code) : code; + + const sandboxData: Record & { + module: { exports: Record }; + } = { + ...context, + console: { log: () => {} }, + module: { exports: {} }, + exports: {}, + }; + vm.createContext(sandboxData as unknown as vm.Context); + new vm.Script(finalCode).runInContext(sandboxData as unknown as vm.Context, { + timeout: 10_000, + filename: found.sourceFile, + }); + + const meta = sandboxData.module.exports.meta as WorkflowScriptMeta | undefined; + return meta ?? { name, description: '' }; + } catch { + return { name, description: '' }; + } + } + + /** + * Execute a workflow by name. + * + * @param name - The workflow name (without .js extension). + * @param args - Optional arguments to pass to the workflow function. + * @returns The run ID for tracking. + */ + async runWorkflow( + name: string, + args?: Record, + ): Promise<{ runId: string }> { + const found = findWorkflow(name, this.projectRoot); + if (!found) { + throw new Error(`Workflow not found: "${name}". 
` + + `Check .boocode/workflows/ or ~/.boocode/workflows/ for a ${name}.js file.`); + } + + const runId = randomUUID(); + const startedAt = new Date().toISOString(); + const state: WorkflowRunState = { + id: runId, + name, + status: 'running', + startedAt, + abortController: new AbortController(), + }; + this.#runs.set(runId, state); + this.#emit({ type: 'run_started', runId, name }); + + // Run asynchronously — caller receives the runId immediately. + void this.#executeRun(state, found.sourceFile, args ?? {}); + + return { runId }; + } + + /** + * Get the current status of a workflow run. + */ + getRunStatus(runId: string): WorkflowRun | undefined { + const state = this.#runs.get(runId); + if (!state) return undefined; + return { + id: state.id, + name: state.name, + status: state.status, + started_at: state.startedAt, + finished_at: state.finishedAt, + error: state.error, + }; + } + + /** + * Cancel a running workflow. Best-effort — agent tasks in-flight will be + * aborted via AbortSignal. + * + * @param runId - The workflow run ID. + * @returns true if the workflow was found and cancelled. + */ + cancelRun(runId: string): boolean { + const state = this.#runs.get(runId); + if (!state || state.status !== 'running') return false; + state.status = 'cancelled'; + state.finishedAt = new Date().toISOString(); + state.abortController.abort(); + this.#emit({ type: 'run_cancelled', runId, name: state.name }); + return true; + } + + /** + * Subscribe to workflow lifecycle events. + * Returns an unsubscribe function. + */ + onEvent(handler: WorkflowEventHandler): () => void { + this.#listeners.add(handler); + return () => { + this.#listeners.delete(handler); + }; + } + + // ---- internal execution ---- + + /** + * Execute the workflow script in the sandbox. 
+ */ + async #executeRun( + state: WorkflowRunState, + sourceFile: string, + args: Record, + ): Promise { + const BULTIN_MARKER = 'builtin:'; + const budgetTracker = new BudgetTracker(null); // no fixed total yet + const runId = state.id; + + try { + const context: WorkflowContext = { + agent: (prompt, opts) => + this.#handleAgentCall(runId, prompt, opts ?? { prompt }, state.abortController.signal), + parallel: (thunks) => + Promise.all(thunks.map((t) => t())), + pipeline: async (items, ...stages) => { + let result = [...items]; + for (const stage of stages) { + result = await Promise.all(result.map(stage)); + } + return result; + }, + phase: (title) => { + this.#emit({ type: 'phase', runId, title }); + }, + log: (message) => { + this.#emit({ type: 'log', runId, message }); + }, + budget: { + total: budgetTracker.total, + spent: () => budgetTracker.spent(), + remaining: () => budgetTracker.remaining(), + }, + args, + workflow: (nestedName, nestedArgs) => + this.#handleNestedWorkflow(runId, nestedName, nestedArgs ?? {}, state.abortController.signal), + }; + + let result: unknown; + if (sourceFile.startsWith(BULTIN_MARKER)) { + // Built-in workflow: generate script from catalog and execute + const workflowName = sourceFile.slice(BULTIN_MARKER.length); + const builtin = getBuiltinWorkflow(workflowName); + if (!builtin) { + throw new Error(`Built-in workflow "${workflowName}" not found in catalog`); + } + const scriptCode = builtin.generateScript(args); + result = await executeWorkflowScriptFromCode(scriptCode, context, args, sourceFile); + } else { + result = await executeWorkflowScript(sourceFile, context, args); + } + + // Only update to completed if we haven't been cancelled mid-flight. 
+ if (state.status !== 'cancelled') { + state.status = 'completed'; + state.finishedAt = new Date().toISOString(); + } + // Store result + state.result = result; + this.#emit({ type: 'run_completed', runId, name: state.name }); + } catch (err) { + if (state.status === 'cancelled') return; // already handled + const message = err instanceof Error ? err.message : String(err); + state.status = 'failed'; + state.finishedAt = new Date().toISOString(); + state.error = message; + this.#emit({ type: 'run_failed', runId, name: state.name, error: message }); + } + } + + /** + * Handle an `agent()` call from within a workflow. + * Creates a session + chat, dispatches inference, polls for completion. + */ + async #handleAgentCall( + runId: string, + prompt: string, + spec: AgentTaskSpec, + signal: AbortSignal, + ): Promise { + const label = spec.label ?? `agent-${prompt.slice(0, 40).replace(/\s+/g, '_')}`; + + this.#emit({ type: 'agent_task_started', runId, label }); + + try { + const result = await this.executeAgentTask(prompt, spec, signal); + this.#emit({ type: 'agent_task_completed', runId, label }); + return result; + } catch (err) { + this.#emit({ type: 'agent_task_completed', runId, label }); + const message = err instanceof Error ? err.message : String(err); + return { + ok: false, + output: null, + error: message, + } satisfies AgentTaskResult; + } + } + + /** + * Core agent task execution: create session/chat, dispatch inference, poll. + * + * Exported as a public method for testing. + */ + async executeAgentTask( + prompt: string, + spec: AgentTaskSpec, + signal?: AbortSignal, + ): Promise { + // ---- 0. Check resumability cache before creating a new task ---- + const cacheKeyStr = cacheKey(spec, ''); + const cached = getCachedResult(cacheKeyStr); + if (cached) { + return { ...cached, cached: true } satisfies AgentTaskResult; + } + + const model = spec.model ?? null; + + // ---- 1. 
Create a session for this agent task ---- + const sessionName = `workflow-agent-${spec.label ?? 'task'}`; + const sessionResult = await this.sql.begin(async (tx) => { + const [session] = await tx<{ id: string }[]>` + INSERT INTO sessions (project_id, name, model) + VALUES (${this.projectId}, ${sessionName}, ${model ?? 'qwen3.6-35b-a3b-mxfp4'}) + RETURNING id + `; + if (!session) throw new Error('Failed to create workflow agent session'); + return session; + }); + const sessionId = sessionResult.id; + + // ---- 2. Create a chat in this session ---- + const chatResult = await this.sql.begin(async (tx) => { + const [chat] = await tx<{ id: string }[]>` + INSERT INTO chats (session_id, name) + VALUES (${sessionId}, ${spec.label ?? null}) + RETURNING id + `; + if (!chat) throw new Error('Failed to create workflow agent chat'); + return chat; + }); + const chatId = chatResult.id; + + // ---- 3. Insert user message + streaming assistant message ---- + const { userMessageId, assistantMessageId } = await this.sql.begin(async (tx) => { + const [userMsg] = await tx<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'user', ${prompt}, 'complete', clock_timestamp()) + RETURNING id + `; + const [assistantMsg] = await tx<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + return { + userMessageId: userMsg!.id, + assistantMessageId: assistantMsg!.id, + }; + }); + + // ---- 4. 
Dispatch inference ---- + // Create a bounded InferenceContext that won't crash on missing WS + const ctx: import('../inference/types.js').InferenceContext = { + sql: this.sql, + config: this.config, + log: this.log, + publish: noopPublish as unknown as import('../inference/types.js').FramePublisher, + publishUser: noopPublishUser as unknown as (frame: UserStreamFrame) => void, + broker: this.broker, + }; + + // Create a merged signal (workflow cancellation + optional caller signal) + const mergedController = new AbortController(); + const onAbort = () => mergedController.abort(); + signal?.addEventListener('abort', onAbort, { once: true }); + + const inferencePromise = runInference( + ctx, + sessionId, + chatId, + assistantMessageId, + mergedController.signal, + ).finally(() => { + signal?.removeEventListener('abort', onAbort); + }); + + // ---- 5. Poll for completion ---- + try { + const result = await this.#pollForCompletion( + chatId, + assistantMessageId, + inferencePromise, + mergedController.signal, + ); + + // Cache successful results for resumability + if (typeof result === 'object' && result !== null && (result as Record).ok === true) { + setCachedResult(cacheKeyStr, { + ok: true, + output: (result as Record).output, + token_usage: (result as Record).token_usage as + | { prompt: number; completion: number } + | undefined, + }); + } + + return result; + } catch (err) { + if ((err as Error)?.message === 'cancelled') { + return { ok: false, output: null, error: 'Task was cancelled' } satisfies AgentTaskResult; + } + return { + ok: false, + output: null, + error: err instanceof Error ? err.message : String(err), + } satisfies AgentTaskResult; + } + } + + /** + * Poll the messages table until the assistant message status changes + * from 'streaming' to 'complete' / 'failed' / 'cancelled'. 
+ */ + async #pollForCompletion( + chatId: string, + assistantMessageId: string, + inferencePromise: Promise, + signal: AbortSignal, + ): Promise { + // Wait for either inference to finish or timeout + const timeout = new Promise((_, reject) => { + const timer = setTimeout(() => { + reject(new Error(`Agent task timed out after ${AGENT_TASK_TIMEOUT_MS}ms`)); + }, AGENT_TASK_TIMEOUT_MS); + signal.addEventListener('abort', () => { + clearTimeout(timer); + reject(new Error('cancelled')); + }, { once: true }); + }); + + // Poll loop — runs until inference completes, timeout, or cancellation + const pollLoop = (async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); + + const rows = await this.sql<{ + status: string; + content: string; + tool_calls: unknown; + tokens_used: number | null; + }[]>` + SELECT m.status, m.content, m.role, + (SELECT jsonb_agg(p.payload ORDER BY p.sequence) + FROM message_parts p + WHERE p.message_id = m.id AND p.kind = 'tool_call' AND p.hidden_at IS NULL) AS tool_calls, + m.tokens_used + FROM messages m + WHERE m.id = ${assistantMessageId} + `; + + const msg = rows[0]; + if (!msg) { + throw new Error(`Assistant message ${assistantMessageId} not found`); + } + + if (msg.status === 'complete') { + return { + ok: true, + output: msg.content, + token_usage: msg.tokens_used ? 
{ prompt: 0, completion: msg.tokens_used } : undefined, + }; + } + + if (msg.status === 'failed' || msg.status === 'cancelled') { + return { + ok: false, + output: msg.content || null, + error: `Assistant message ended with status: ${msg.status}`, + }; + } + + // Still streaming — continue polling + } + })(); + + // Race: polling vs timeout vs inference error vs cancellation + try { + return await Promise.race([pollLoop, timeout]); + } finally { + // Ensure inference is settled (but don't block on it) + inferencePromise.catch(() => {}); + } + } + + /** + * Handle a nested `workflow()` call from within a workflow. + * Runs the named workflow with the given args and returns its result. + */ + async #handleNestedWorkflow( + parentRunId: string, + name: string, + args: Record, + signal: AbortSignal, + ): Promise { + const found = findWorkflow(name, this.projectRoot); + if (!found) { + return { ok: false, output: null, error: `Nested workflow not found: "${name}"` }; + } + + const nestedRunId = randomUUID(); + const startedAt = new Date().toISOString(); + const nestedState: WorkflowRunState = { + id: nestedRunId, + name, + status: 'running', + startedAt, + abortController: new AbortController(), + }; + this.#runs.set(nestedRunId, nestedState); + this.#emit({ type: 'run_started', runId: nestedRunId, name }); + + // Link parent cancellation to nested + signal.addEventListener('abort', () => { + nestedState.abortController.abort(); + }, { once: true }); + + await this.#executeRun(nestedState, found.sourceFile, args); + + if (nestedState.status === 'cancelled') { + return { ok: false, output: null, error: 'Nested workflow cancelled' }; + } + if (nestedState.status === 'failed') { + return { ok: false, output: null, error: nestedState.error }; + } + return { ok: true, output: nestedState.result }; + } + + /** + * Create a minimal WorkflowContext for non-execution purposes + * (e.g. loading meta). 
+ */ + #createMinimalContext(runId: string): Record { + return { + agent: () => Promise.reject(new Error('Not available in this context')), + parallel: () => Promise.reject(new Error('Not available in this context')), + pipeline: () => Promise.reject(new Error('Not available in this context')), + phase: () => {}, + log: () => {}, + budget: { total: null, spent: () => 0, remaining: () => Infinity }, + args: {}, + workflow: () => Promise.reject(new Error('Not available in this context')), + }; + } + + /** + * Emit a workflow event to all registered listeners. + */ + #emit(event: WorkflowEvent): void { + for (const handler of this.#listeners) { + try { + handler(event); + } catch { + // Swallow listener errors — one bad listener shouldn't break others + } + } + } +} + +// ---- internal types ---- + +/** + * Metadata returned from listWorkflows / getWorkflow. + */ +export interface WorkflowMetaInfo { + name: string; + sourceFile: string; +} + +/** + * Internal mutable state for an active workflow run. + */ +interface WorkflowRunState { + id: string; + name: string; + status: WorkflowRunStatus; + startedAt: string; + finishedAt?: string; + error?: string; + result?: unknown; + abortController: AbortController; +} diff --git a/apps/server/src/services/workflow/resumability.ts b/apps/server/src/services/workflow/resumability.ts new file mode 100644 index 0000000..d8d2f47 --- /dev/null +++ b/apps/server/src/services/workflow/resumability.ts @@ -0,0 +1,195 @@ +// v2.8.0: Workflow resumability cache — SHA-256 hash-based in-memory cache +// for completed agent task results. When a workflow re-runs, completed agents +// with unchanged specs skip execution and return cached results. +// +// The cache is purely in-memory (Map). No DB persistence for v1. +// All functions are exported for testing. 
// apps/server/src/services/workflow/resumability.ts
//
// v2.8.0: Workflow resumability cache — SHA-256 hash-based in-memory cache
// for completed agent task results. When a workflow re-runs, completed agents
// with unchanged specs skip execution and return cached results.
//
// The cache is purely in-memory (Map). No DB persistence for v1.
// All functions are exported for testing.

import { createHash } from 'node:crypto';
import type { AgentTaskSpec } from './types.js';

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

/**
 * Shape of a cached agent task result. Mirrors the fields of
 * `AgentTaskResult` without the runtime-only `cached` flag.
 */
export interface CachedResult {
  ok: boolean;
  output: unknown;
  error?: string;
  token_usage?: { prompt: number; completion: number };
}

/**
 * Internal cache entry with insertion timestamp for TTL support.
 */
interface CacheEntry {
  result: CachedResult;
  insertedAt: number;
  /** Optional run token supplied to `setCachedResult`; see `invalidateRun`. */
  runKey?: string;
}

// ---------------------------------------------------------------------------
// Cache store
// ---------------------------------------------------------------------------

/**
 * Default TTL for cached entries (30 minutes).
 * After this period entries are considered stale and are evicted on access.
 */
const DEFAULT_TTL_MS = 1_800_000;

/**
 * Maximum number of entries before the cache starts evicting oldest entries.
 */
const MAX_ENTRIES = 500;

/**
 * In-memory cache store: SHA-256 hash → cached result.
 * Entries are kept in insertion order (see `setCachedResult`).
 */
const cache = new Map<string, CacheEntry>();

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Build a deterministic SHA-256 hash for an agent task specification.
 *
 * The hash is computed from a stable-ordered JSON serialisation of the spec
 * so that identical specs always produce the same key regardless of
 * JavaScript property insertion order. Properties whose value is `undefined`
 * do not affect the key.
 *
 * @param spec - The agent task specification (prompt, options, etc.).
 * @param args - Additional arguments string (e.g. workflow args fingerprint).
 * @returns A 64-character hex SHA-256 digest.
 */
export function cacheKey(spec: AgentTaskSpec, args: string): string {
  return createHash('sha256')
    .update(stableJson(spec))
    // NUL separator keeps the spec and the args fingerprint unambiguous.
    .update('\0')
    .update(args)
    .digest('hex');
}

/**
 * Look up a cached result by its cache key.
 *
 * Returns `null` when:
 *   - The key doesn't exist in the cache.
 *   - The cached entry has exceeded the TTL (evicted silently).
 *
 * @param key - The SHA-256 hex key returned by `cacheKey()`.
 * @returns The cached result, or `null` if not found or expired.
 */
export function getCachedResult(key: string): CachedResult | null {
  const entry = cache.get(key);
  if (!entry) return null;

  // TTL check — stale entries are evicted on access
  if (Date.now() - entry.insertedAt > DEFAULT_TTL_MS) {
    cache.delete(key);
    return null;
  }

  return entry.result;
}

/**
 * Store an agent task result in the cache.
 *
 * If the cache has reached `MAX_ENTRIES` and `key` is new, the oldest entry
 * (by insertion time) is evicted first. This is a simple FIFO eviction — not
 * a full LRU. Overwriting an existing key never evicts another entry.
 *
 * @param key    - The SHA-256 hex key returned by `cacheKey()`.
 * @param result - The result to cache.
 * @param runKey - Optional run token; entries stored with it can later be
 *   removed with `invalidateRun(runKey)`.
 */
export function setCachedResult(key: string, result: CachedResult, runKey?: string): void {
  // Removing first means the re-inserted key moves to the end of the Map, so
  // iteration order always equals insertion-time order.
  const replacing = cache.delete(key);

  if (!replacing && cache.size >= MAX_ENTRIES) {
    // The first key in iteration order is the oldest entry.
    const oldest = cache.keys().next();
    if (!oldest.done) cache.delete(oldest.value);
  }

  cache.set(key, { result, insertedAt: Date.now(), runKey });
}

/**
 * Invalidate all cached entries that belong to a specific workflow run.
 *
 * An entry matches when it was stored with the same `runKey`
 * (`setCachedResult(key, result, runKey)`). Cache keys are SHA-256 digests,
 * so the run token is not recoverable from the key itself; the legacy
 * "key starts with `runKey`" match is kept only for callers that use their
 * own, non-hashed keys.
 *
 * @param runKey - The run token to invalidate.
 */
export function invalidateRun(runKey: string): void {
  // Deleting the current entry while iterating a Map is well-defined.
  for (const [key, entry] of cache) {
    if (entry.runKey === runKey || key.startsWith(runKey)) {
      cache.delete(key);
    }
  }
}

/**
 * Clear the entire cache. Used for testing and manual reset.
 */
export function clearCache(): void {
  cache.clear();
}

/**
 * Return the current number of entries in the cache.
 * Useful for testing assertions.
 */
export function cacheSize(): number {
  return cache.size;
}

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

/** True for values `JSON.stringify` would keep as an object property. */
function isSerialisable(value: unknown): boolean {
  const kind = typeof value;
  return kind !== 'undefined' && kind !== 'function' && kind !== 'symbol';
}

/**
 * Stable JSON serialisation that produces the same output string for the same
 * data regardless of JavaScript object property insertion order.
 *
 * - Object keys are sorted lexicographically; properties holding `undefined`,
 *   a function or a symbol are omitted, as `JSON.stringify` does.
 * - Arrays preserve their element order; such values become `null` there.
 * - Primitives are serialised via `JSON.stringify`.
 */
function stableJson(value: unknown): string {
  if (value === null) return 'null';
  if (typeof value !== 'object') {
    // JSON.stringify yields undefined (not a string) for undefined, functions
    // and symbols; fall back to 'null' so the return value is always a string.
    return JSON.stringify(value) ?? 'null';
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableJson(item)).join(',')}]`;
  }

  const record = value as Record<string, unknown>;
  const pairs = Object.keys(record)
    .sort()
    .filter((k) => isSerialisable(record[k]))
    .map((k) => `${JSON.stringify(k)}:${stableJson(record[k])}`);
  return `{${pairs.join(',')}}`;
}
// apps/server/src/services/workflow/sandbox.ts
//
// v2.8.0: VM sandbox for executing workflow scripts in an isolated Node.js
// context with a restricted global scope. Uses Node's built-in `vm` module
// (zero additional dependencies).
//
// Workflow scripts can use either CommonJS (`module.exports`) or ESM syntax
// (`export const` / `export default`). ESM syntax is automatically transformed
// to CJS before execution via a lightweight regex transform.
//
// NOTE(review): the Node.js documentation states that `node:vm` is not a
// security mechanism. The sandbox below shares host objects with the script,
// so it restricts convenience globals only; do not treat it as a boundary for
// untrusted code.

import vm from 'node:vm';
import { readFileSync } from 'node:fs';
import type { WorkflowContext } from './types.js';

/**
 * Shared timeout for all sandboxed script execution.
 * Prevents runaway workflows from blocking the server indefinitely.
 */
const EXECUTION_TIMEOUT_MS = 30_000;

/**
 * Regex-based ESM-to-CJS transform for workflow scripts.
 *
 * Handles:
 *   - `export const|let|var <name> = <value>;`  →  `<name> = <value>;`
 *   - `export default <expr>;`                   →  `module.exports.default = <expr>;`
 *   - `export default [async] function[*] ...`   →  `module.exports.default = [async] function[*] ...`
 *   - `export default class <Name> ...`          →  `module.exports.default = class <Name> ...`
 *   - `export [async] function <name>` / `export class <Name>`  →  `export` removed
 *   - `export { a, b as c }`                     →  line removed
 *
 * The default export is assigned to `module.exports.default` because
 * `default` is a reserved word and cannot be used as an assignment target.
 *
 * This is a textual transform, so the same patterns inside string literals
 * or comments are rewritten too.
 *
 * @param code - Raw source code (ESM or CJS).
 * @returns Code transformed to CJS assignments suitable for vm.Script.
 */
export function transformEsmToCjs(code: string): string {
  // Order matters: handle `export default function` before bare `export default`.
  return (
    code
      // export default [async] function[*] [name](  (generator star is kept)
      .replace(
        /export\s+default\s+(async\s+)?function\s*(\*?)\s*(\w+)?\s*\(/g,
        (_match, asyncKw: string | undefined, star: string, name: string | undefined) =>
          `module.exports.default = ${asyncKw ?? ''}function${star} ${name ?? ''}(`,
      )
      // export default class Name {...}
      .replace(/export\s+default\s+(class\s+\w+)/g, 'module.exports.default = $1')
      // export default <expr>;
      .replace(/export\s+default\s+/g, 'module.exports.default = ')
      // export const|let|var name = value  →  name = value (sandbox global)
      .replace(
        /export\s+(const|let|var)\s+(\w+)\s*=/g,
        (_match, _decl: string, name: string) => `${name} =`,
      )
      // export [async] function[*] name(...) {...}  →  keep, remove export
      .replace(/^export\s+((?:async\s+)?function\s*\*?\s*\w+)/gm, '$1')
      // export class Name {...}  →  keep, remove export
      .replace(/^export\s+(class\s+\w+)/gm, '$1')
      // export { a, b as c }  →  remove line
      .replace(/^export\s+\{[^}]*\}\s*;?\s*$/gm, '')
  );
}

/**
 * Determine whether code uses ESM export syntax (export keyword at line start
 * or after optional whitespace).
 *
 * @param code - Source text to inspect.
 * @returns `true` if any line starts with a recognised `export` form.
 */
export function isEsmSyntax(code: string): boolean {
  const exportAtLineStart = /^\s*export\s+(const|let|var|function|class|default|\{)/m;
  return exportAtLineStart.test(code);
}

/**
 * Build a restricted sandbox object with the workflow runtime API.
 *
 * `exports` and `module.exports` refer to the same object, as in Node's
 * CommonJS wrapper, so `exports.x = …` is visible through `module.exports`.
 *
 * @param context - The WorkflowContext methods to expose to the script.
 * @returns A plain object suitable for vm.createContext().
 */
export function buildSandbox(context: WorkflowContext): Record<string, unknown> {
  const moduleExports: Record<string, unknown> = {};

  return {
    // --- Workflow API (from context) ---
    agent: context.agent,
    parallel: context.parallel,
    pipeline: context.pipeline,
    phase: context.phase,
    log: context.log,
    budget: context.budget,
    args: context.args,
    workflow: context.workflow,

    // --- Safe built-ins ---
    // All console levels are routed to the workflow log.
    console: {
      log: context.log,
      warn: context.log,
      error: context.log,
    },
    setTimeout,
    clearTimeout,
    setInterval: undefined, // intentionally disabled
    clearInterval: undefined, // intentionally disabled
    Promise,
    JSON,
    Math,
    Date,
    RegExp,
    Error,
    Array,
    Object,
    String,
    Number,
    Boolean,
    Map,
    Set,
    WeakMap,
    WeakSet,
    parseInt,
    parseFloat,
    isNaN,
    isFinite,
    Symbol,
    BigInt,
    undefined,
    null: null,
    true: true,
    false: false,

    // --- CommonJS interop ---
    module: { exports: moduleExports },
    exports: moduleExports,
    require: undefined, // intentionally disabled
    // Hides the `global` alias only; `globalThis` still names the context's
    // own global object inside the script.
    global: undefined,
  };
}
/**
 * Execute a workflow script in the sandbox and return its default export
 * (the main async function).
 *
 * The file is read synchronously, ESM export syntax is rewritten to CJS when
 * detected, and the result is evaluated in a fresh vm context built from
 * `context`. The entry point is taken from `module.exports` when that is a
 * function, otherwise from `module.exports.default`, otherwise from a
 * `default` global in the sandbox.
 *
 * @param sourceFile - Absolute path to the .js workflow file.
 * @param context - The WorkflowContext to expose to the script.
 * @returns The workflow's default export function.
 * @throws {Error} If the script doesn't export a default async function,
 *   or if execution fails.
 */
export function loadWorkflowScript(
  sourceFile: string,
  context: WorkflowContext,
): (...args: unknown[]) => Promise<unknown> {
  const code = readFileSync(sourceFile, 'utf8');
  const finalCode = isEsmSyntax(code) ? transformEsmToCjs(code) : code;

  const sandbox = buildSandbox(context) as Record<string, unknown> & {
    module: { exports: Record<string, unknown> };
  };

  vm.createContext(sandbox);

  try {
    // `filename` is an option of the vm.Script constructor; given here it
    // makes syntax errors and stack traces name the workflow file.
    const script = new vm.Script(finalCode, { filename: sourceFile });
    // The timeout bounds this synchronous top-level evaluation.
    script.runInContext(sandbox, { timeout: EXECUTION_TIMEOUT_MS });
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    throw new Error(`Workflow script execution failed: ${msg}`);
  }

  const moduleExports = sandbox.module.exports;

  // `module.exports = async function(...)` (direct assignment) wins; otherwise
  // check module.exports.default (CJS), then sandbox.default.
  const mainFn =
    typeof moduleExports === 'function'
      ? moduleExports
      : moduleExports.default ?? sandbox.default;

  if (typeof mainFn !== 'function') {
    const exportedKeys = Object.keys({
      ...moduleExports,
      ...(sandbox.default ? { default: true } : {}),
    });
    throw new Error(
      `Workflow script must export a default async function. ` +
        `Found exports: ${exportedKeys.join(', ') || '(none)'}. ` +
        `Make sure your script has "export default async function main(args) {...}".`,
    );
  }

  // eslint-disable-next-line @typescript-eslint/no-unsafe-return
  return mainFn as (...args: unknown[]) => Promise<unknown>;
}
/**
 * Load a workflow script from a source code string (rather than a file).
 * Useful for built-in workflows from the catalog that don't have a
 * corresponding .js file on disk.
 *
 * ESM export syntax is rewritten to CJS when detected, then the code is
 * evaluated in a fresh vm context built from `context`.
 *
 * @param code     - The JavaScript source code of the workflow.
 * @param context  - The WorkflowContext to expose.
 * @param filename - Virtual filename for stack traces (e.g. 'builtin://deep-research').
 * @returns The workflow's default export function.
 * @throws {Error} If evaluation fails or the script doesn't export a default
 *   async function.
 */
export function loadWorkflowScriptFromCode(
  code: string,
  context: WorkflowContext,
  filename?: string,
): (...args: unknown[]) => Promise<unknown> {
  const finalCode = isEsmSyntax(code) ? transformEsmToCjs(code) : code;

  const sandbox = buildSandbox(context) as Record<string, unknown> & {
    module: { exports: Record<string, unknown> };
  };

  vm.createContext(sandbox);

  try {
    // `filename` is an option of the vm.Script constructor; given here it
    // makes syntax errors and stack traces carry the virtual file name.
    const script = new vm.Script(finalCode, { filename: filename ?? 'workflow:' });
    // The timeout bounds this synchronous top-level evaluation.
    script.runInContext(sandbox, { timeout: EXECUTION_TIMEOUT_MS });
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    throw new Error(`Workflow script execution failed: ${msg}`);
  }

  const moduleExports = sandbox.module.exports;

  // A function assigned directly to module.exports wins; otherwise use
  // module.exports.default, then a `default` global in the sandbox.
  const mainFn =
    typeof moduleExports === 'function'
      ? moduleExports
      : moduleExports.default ?? sandbox.default;

  if (typeof mainFn !== 'function') {
    const exportedKeys = Object.keys({
      ...moduleExports,
      ...(sandbox.default ? { default: true } : {}),
    });
    throw new Error(
      `Workflow script must export a default async function. ` +
        `Found exports: ${exportedKeys.join(', ') || '(none)'}.`,
    );
  }

  // eslint-disable-next-line @typescript-eslint/no-unsafe-return
  return mainFn as (...args: unknown[]) => Promise<unknown>;
}
/**
 * High-level convenience: load and execute a workflow script in a single call.
 *
 * Load errors surface as a rejected promise because this function is async.
 *
 * @param sourceFile - Absolute path to the .js workflow file.
 * @param context    - The WorkflowContext to expose.
 * @param args       - Optional arguments passed to the workflow function.
 * @returns The workflow's return value.
 */
export async function executeWorkflowScript(
  sourceFile: string,
  context: WorkflowContext,
  args?: Record<string, unknown>,
): Promise<unknown> {
  const entryPoint = loadWorkflowScript(sourceFile, context);
  return entryPoint(args);
}

/**
 * Execute a workflow from source code (string) rather than a file.
 * Convenience wrapper around `loadWorkflowScriptFromCode`.
 *
 * @param code     - The JavaScript source code of the workflow.
 * @param context  - The WorkflowContext to expose.
 * @param args     - Optional arguments passed to the workflow function.
 * @param filename - Virtual filename for stack traces.
 * @returns The workflow's return value.
 */
export async function executeWorkflowScriptFromCode(
  code: string,
  context: WorkflowContext,
  args?: Record<string, unknown>,
  filename?: string,
): Promise<unknown> {
  const entryPoint = loadWorkflowScriptFromCode(code, context, filename);
  return entryPoint(args);
}
// apps/server/src/services/workflow/types.ts
//
// v2.8.0: Dynamic Workflow Engine — types for the sandboxed multi-agent
// orchestration runtime. All types are exported for testing.

/**
 * The expected shape of a workflow script module.
 * Workflow files are plain .js files that export `meta` and `default`:
 *
 * ```js
 * export const meta = {
 *   name: 'my-workflow',
 *   description: 'Does something useful in phases',
 *   phases: [
 *     { title: 'Research', detail: 'Gather context' },
 *     { title: 'Implement', detail: 'Make changes' },
 *   ],
 * };
 *
 * export default async function main(args) {
 *   const result = await agent('...');
 *   return result;
 * }
 * ```
 */
export interface WorkflowScriptMeta {
  name: string;
  description: string;
  phases?: Array<{ title: string; detail?: string }>;
}

export interface WorkflowScript {
  meta: WorkflowScriptMeta;
  default: (args?: Record<string, unknown>) => Promise<unknown>;
}

/**
 * Specification for dispatching a single agent task within a workflow.
 */
export interface AgentTaskSpec {
  /** The instruction prompt for the agent. */
  prompt: string;
  /** Optional human-readable label for this task (shown in UI). */
  label?: string;
  /** Phase identifier for grouping tasks. */
  phase?: string;
  /**
   * Model override. When omitted, the workflow manager's `executeAgentTask`
   * falls back to a hard-coded model id for the session it creates.
   */
  model?: string;
  /** Zod-style JSON schema for structured output validation. */
  schema?: Record<string, unknown>;
  /** Required capabilities the agent must have. */
  capabilities?: string[];
  /** Per-agent tool-call budget ceiling. */
  max_tool_calls?: number;
  /** Per-agent step cap for the inference loop. */
  max_tool_iters?: number;
}

/**
 * Result returned after an agent task completes.
 */
export interface AgentTaskResult {
  ok: boolean;
  output: unknown;
  error?: string;
  token_usage?: { prompt: number; completion: number };
  /** True when this result was served from the resumability cache
   *  rather than re-executing the agent task. */
  cached?: boolean;
}

/**
 * Runtime context passed into every workflow script's default function.
 * Mirrors the Claude Code-compatible API surface.
 */
export interface WorkflowContext {
  /**
   * Dispatch a single agent prompt. The workflow manager resolves this with
   * an `AgentTaskResult`-shaped object (`ok`, `output`, `error`, ...), not
   * with the bare reply text.
   */
  agent: (prompt: string, opts?: AgentTaskSpec) => Promise<unknown>;
  /** Run multiple independent tasks concurrently. Returns results in order. */
  parallel: (thunks: Array<() => Promise<unknown>>) => Promise<unknown[]>;
  /** Pass items through a sequence of transform stages. */
  pipeline: (
    items: unknown[],
    ...stages: Array<(item: unknown) => Promise<unknown>>
  ) => Promise<unknown[]>;
  /** Announce the current execution phase (for UI progress). */
  phase: (title: string) => void;
  /** Emit a log message for this workflow run. */
  log: (message: string) => void;
  /** Token budget tracker for the current run. */
  budget: {
    /** Fixed budget, or `null` when no total has been set. */
    total: number | null;
    spent: () => number;
    remaining: () => number;
  };
  /** The arguments passed when this workflow was started. */
  args: Record<string, unknown>;
  /** Call another workflow from within a workflow (nested). */
  workflow: (name: string, args?: Record<string, unknown>) => Promise<unknown>;
}

/**
 * Status of a workflow execution run.
 */
export type WorkflowRunStatus = 'running' | 'completed' | 'failed' | 'cancelled';

/**
 * Persistent record of a workflow run.
 */
export interface WorkflowRun {
  id: string;
  name: string;
  status: WorkflowRunStatus;
  started_at: string;
  finished_at?: string;
  error?: string;
}

/**
 * Event emitted by the workflow manager for subscribers.
+ */ +export type WorkflowEvent = + | { type: 'run_started'; runId: string; name: string } + | { type: 'run_completed'; runId: string; name: string } + | { type: 'run_failed'; runId: string; name: string; error: string } + | { type: 'run_cancelled'; runId: string; name: string } + | { type: 'phase'; runId: string; title: string } + | { type: 'log'; runId: string; message: string } + | { type: 'agent_task_started'; runId: string; label?: string } + | { type: 'agent_task_completed'; runId: string; label?: string }; diff --git a/apps/web/src/api/types.ts b/apps/web/src/api/types.ts index d87e9ae..5471936 100644 --- a/apps/web/src/api/types.ts +++ b/apps/web/src/api/types.ts @@ -101,6 +101,7 @@ export interface Chat { id: string; session_id: string; name: string | null; + model: string | null; status: ChatStatus; created_at: string; updated_at: string; @@ -131,6 +132,10 @@ export interface ToolResult { output: unknown; truncated: boolean; error?: string; + // v2.8: unified diff snippet for write-tool results. Present when the tool + // modified files (edit_file, create_file, etc.) and the backend computed a + // diff. Rendered inline by DiffSnippet. + diff?: string; } // v1.8.2 / v1.11.6: ErrorReason + MessageMetadata single-sourced in @@ -172,6 +177,10 @@ export interface Message { // (CoderPane/CoderMessageList) and streams it live via reasoning_delta // frames. MessageBubble reads whichever of the two is present. reasoning_text?: string | null; + // v2.8-compare: compare group id. Set when the message is part of a + // multi-model compare response. All assistant messages in the same compare + // group share this id, keyed to the user message that triggered the compare. + compare_group_id?: string; // v1.11: anchored rolling compaction fields. Optional on the wire so that // older API responses (or test fixtures) parse without explicit nulls. 
// summary — true on the assistant row that holds the active @@ -513,8 +522,8 @@ export interface WorkspaceState { export type WsFrame = | { type: 'snapshot'; messages: Message[] } - | { type: 'message_started'; message_id: string; chat_id?: string; role: MessageRole } - | { type: 'delta'; message_id: string; chat_id?: string; content: string } + | { type: 'message_started'; message_id: string; chat_id?: string; role: MessageRole; compare_group_id?: string } + | { type: 'delta'; message_id: string; chat_id?: string; content: string; compare_group_id?: string } | { type: 'tool_call'; message_id: string; chat_id?: string; tool_call: ToolCall } | { type: 'tool_result'; @@ -524,6 +533,7 @@ export type WsFrame = output: unknown; truncated: boolean; error?: string; + diff?: string; } | { type: 'message_complete'; @@ -547,6 +557,7 @@ export type WsFrame = // 'cancelled' on a user Stop / stall and 'failed' on a thrown error so the // reducer renders a muted "Stopped" / failed state — no new frame type. status?: 'complete' | 'cancelled' | 'failed'; + compare_group_id?: string; } // v1.12.2: live throughput frame, published mid-stream every ~500ms with // the latest token + ctx counts so ChatThroughput can render tok/s and @@ -576,7 +587,7 @@ export type WsFrame = | { type: 'compacted'; session_id: string; chat_id: string; summary_message_id: string } // v1.8.2: `reason` discriminates structured failures (the UI prefers it // over `error` text when present). - | { type: 'error'; message_id?: string; chat_id?: string; error: string; reason?: ErrorReason } + | { type: 'error'; message_id?: string; chat_id?: string; error: string; reason?: ErrorReason; compare_group_id?: string } // agent-status-normalize (#10): BooCoder publishes a normalized per-(chat,agent) // lifecycle status for external coding agents on the per-session channel. 
The // CoderPane tracks the latest status per (chat_id, agent) and resets on chat @@ -674,11 +685,13 @@ export type WsFrame = message_id?: string; chat_id?: string; content?: string; + compare_group_id?: string; tool_call?: ToolCall; tool_message_id?: string; tool_call_id?: string; output?: unknown; truncated?: boolean; + diff?: string; error?: string; reason?: string; status?: 'running' | 'complete' | 'cancelled' | 'failed'; diff --git a/apps/web/src/components/CacheShapeBadge.tsx b/apps/web/src/components/CacheShapeBadge.tsx new file mode 100644 index 0000000..1110170 --- /dev/null +++ b/apps/web/src/components/CacheShapeBadge.tsx @@ -0,0 +1,38 @@ +// vDeepSeek: cache shape telemetry badge. Displays cache token count with +// a colored hit-rate bar in the trace viewer. Color thresholds are relative +// to output tokens (tokens_used) since the trace doesn't carry prompt miss +// tokens separately: green > 50%, yellow > 10%, red ≤ 10%. + +export interface CacheShapeBadgeProps { + cacheTokens: number | null | undefined; + totalTokens: number | null | undefined; +} + +function hitRate(cache: number, total: number): number { + if (cache <= 0 || total <= 0) return 0; + return cache / (cache + total); +} + +function barColor(rate: number): string { + if (rate > 0.5) return 'bg-green-500'; + if (rate > 0.1) return 'bg-yellow-500'; + return 'bg-red-500'; +} + +export function CacheShapeBadge({ cacheTokens, totalTokens }: CacheShapeBadgeProps) { + if (cacheTokens == null || cacheTokens <= 0) return null; + + const rate = hitRate(cacheTokens, totalTokens ?? 
0); + const pct = Math.round(rate * 100); + const color = barColor(rate); + + return ( + + + {cacheTokens}c + {totalTokens != null && totalTokens > 0 && ( + {pct}% + )} + + ); +} diff --git a/apps/web/src/components/DiffSnippet.tsx b/apps/web/src/components/DiffSnippet.tsx new file mode 100644 index 0000000..a6e674d --- /dev/null +++ b/apps/web/src/components/DiffSnippet.tsx @@ -0,0 +1,88 @@ +import { useMemo, useState } from 'react'; +import { ChevronDown, ChevronRight, FileCode } from 'lucide-react'; + +interface Props { + diff: string; +} + +const INITIAL_LINES = 10; + +export function DiffSnippet({ diff }: Props) { + const [expanded, setExpanded] = useState(false); + + const lines = useMemo(() => diff.split('\n'), [diff]); + const totalLines = lines.length; + + // Index of the first line starting with '+', '-' or ' ' (the ---/+++ file headers match too) + const firstContentIdx = lines.findIndex( + (l) => l.startsWith('+') || l.startsWith('-') || l.startsWith(' '), + ); + + // Count content lines that are either +, -, or context lines + const contentLineCount = lines.filter( + (l) => l.startsWith('+') || l.startsWith('-') || l.startsWith(' '), + ).length; + + // Collapsed view: a diff with at most INITIAL_LINES content lines renders in full; a longer one stops INITIAL_LINES lines after the first content line + const displayLines = useMemo(() => { + const sliceEnd = (expanded || contentLineCount <= INITIAL_LINES) ? lines.length : Math.min(firstContentIdx + INITIAL_LINES, lines.length); + return lines.slice(0, sliceEnd); + }, [lines, expanded, firstContentIdx, contentLineCount]); + + const hasMore = totalLines > displayLines.length; + + if (diff === '') return null; // ''.split('\n') is [''], so totalLines is never 0 + + return ( +
+ +
+ {displayLines.map((line, i) => { + // Color by line prefix. File headers ("--- "/"+++ ") are tested first: they also start with '-'/'+' and would otherwise be painted as removals/additions. + let colorClass = 'text-muted-foreground/60'; + if (line.startsWith('--- ') || line.startsWith('+++ ')) colorClass = 'text-muted-foreground/50'; + else if (line.startsWith('@@')) colorClass = 'text-muted-foreground'; + else if (line.startsWith('+')) colorClass = 'text-emerald-600 dark:text-emerald-400'; + else if (line.startsWith('-')) colorClass = 'text-red-500 dark:text-red-400'; + + return ( +
+ {line} +
+ ); + })} + {hasMore && !expanded && ( + + )} +
+
+ ); +} diff --git a/apps/web/src/components/ToolCallLine.tsx b/apps/web/src/components/ToolCallLine.tsx index 2b31641..439f425 100644 --- a/apps/web/src/components/ToolCallLine.tsx +++ b/apps/web/src/components/ToolCallLine.tsx @@ -1,7 +1,9 @@ import { useState } from 'react'; -import { Check, ChevronRight, Loader2, X } from 'lucide-react'; +import { Check, ChevronRight, Loader2, ShieldAlert, X } from 'lucide-react'; import type { ToolCall, ToolResult } from '@/api/types'; import { linkifyPaths } from '@/lib/linkify-paths'; +import { DiffSnippet } from './DiffSnippet'; +import { McpPermissionDialog } from './McpPermissionDialog'; // v1.8.2: cap on the inline arg-summary length. Expanded view shows full // args + full result, so this is purely a single-line render budget. @@ -105,14 +107,18 @@ interface Props { // When rendered inside a ToolCallGroup the line is already nested under a // shared header, so the leading arrow is dropped to avoid double indent. insideGroup?: boolean; + chatId?: string; } -export function ToolCallLine({ run, insideGroup }: Props) { +export function ToolCallLine({ run, insideGroup, chatId }: Props) { const [open, setOpen] = useState(false); + const [approveOpen, setApproveOpen] = useState(false); const status = runStatus(run); const args = run.call.args ?? {}; const summary = formatToolArgs(run.call.name, args); + const needsApproval = run.result?.error?.startsWith('requires approval:') === true; + return (
+ + )} + + ) : ( + {run.result.error} + ) ) : ( linkifyPaths( typeof run.result.output === 'string' @@ -171,6 +197,17 @@ export function ToolCallLine({ run, insideGroup }: Props) { )} )} + {needsApproval && chatId && ( + setApproveOpen(false)} + /> + )} + {run.result?.diff && }
)} diff --git a/apps/web/src/components/TraceViewer.tsx b/apps/web/src/components/TraceViewer.tsx index 75c0c00..30ff65b 100644 --- a/apps/web/src/components/TraceViewer.tsx +++ b/apps/web/src/components/TraceViewer.tsx @@ -2,6 +2,7 @@ import { useCallback, useEffect, useMemo, useState } from 'react'; import { ChevronDown, ChevronRight, AlertCircle } from 'lucide-react'; import { api } from '@/api/client'; import type { ToolTrace } from '@/api/types'; +import { CacheShapeBadge } from '@/components/CacheShapeBadge'; interface Props { chatId: string; @@ -58,11 +59,7 @@ function TraceRow({ trace }: { trace: ToolTrace }) { {trace.tokens_used}t
)} - {trace.cache_tokens != null && trace.cache_tokens > 0 && ( - - c{trace.cache_tokens} - - )} + {trace.reasoning_tokens != null && trace.reasoning_tokens > 0 && ( r{trace.reasoning_tokens} diff --git a/apps/web/src/hooks/useSessionStream.ts b/apps/web/src/hooks/useSessionStream.ts index ebacca9..aea378c 100644 --- a/apps/web/src/hooks/useSessionStream.ts +++ b/apps/web/src/hooks/useSessionStream.ts @@ -79,6 +79,7 @@ function channelDeltaToLegacyFrame(delta: ChannelDeltaWsFrame): WsFrame | null { output: delta.output, truncated: delta.truncated!, ...(delta.error ? { error: delta.error } : {}), + ...(delta.diff ? { diff: delta.diff } : {}), }; case 'error': return { @@ -172,6 +173,7 @@ function applyFrame(state: State, frame: WsFrame): State { finished_at: null, created_at: new Date().toISOString(), metadata: null, + ...(frame.compare_group_id ? { compare_group_id: frame.compare_group_id } : {}), }; return { ...state, messages: [...state.messages, newMsg] }; } @@ -195,21 +197,18 @@ function applyFrame(state: State, frame: WsFrame): State { return { ...state, messages: next }; } case 'tool_result': { + const toolResultsBase = { + tool_call_id: frame.tool_call_id, + output: frame.output, + truncated: frame.truncated, + ...(frame.error ? { error: frame.error } : {}), + ...(frame.diff ? { diff: frame.diff } : {}), + }; const exists = state.messages.some((m) => m.id === frame.tool_message_id); if (exists) { const next = state.messages.map((m) => m.id === frame.tool_message_id - ? { - ...m, - role: 'tool' as const, - tool_results: { - tool_call_id: frame.tool_call_id, - output: frame.output, - truncated: frame.truncated, - ...(frame.error ? { error: frame.error } : {}), - }, - status: 'complete' as const, - } + ? 
{ ...m, role: 'tool' as const, tool_results: toolResultsBase, status: 'complete' as const } : m, ); return { ...state, messages: next }; @@ -222,12 +221,7 @@ function applyFrame(state: State, frame: WsFrame): State { content: '', kind: 'message', tool_calls: null, - tool_results: { - tool_call_id: frame.tool_call_id, - output: frame.output, - truncated: frame.truncated, - ...(frame.error ? { error: frame.error } : {}), - }, + tool_results: toolResultsBase, status: 'complete', last_seq: 0, tokens_used: null, @@ -258,6 +252,7 @@ function applyFrame(state: State, frame: WsFrame): State { ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}), ...(frame.model !== undefined ? { model: frame.model } : {}), ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}), + ...(frame.compare_group_id !== undefined ? { compare_group_id: frame.compare_group_id } : {}), } : m, ); @@ -301,6 +296,7 @@ function applyFrame(state: State, frame: WsFrame): State { ...m, status: 'failed' as const, ...(errorMeta ? { metadata: errorMeta } : {}), + ...(frame.compare_group_id !== undefined ? { compare_group_id: frame.compare_group_id } : {}), } : m, ) diff --git a/packages/contracts/src/ws-frames.ts b/packages/contracts/src/ws-frames.ts index a0f4e81..832cabc 100644 --- a/packages/contracts/src/ws-frames.ts +++ b/packages/contracts/src/ws-frames.ts @@ -76,6 +76,8 @@ export const MessageStartedFrame = z.object({ message_id: Uuid, chat_id: Uuid.optional(), role: MessageRoleValue, + // v2.8-compare: groups messages belonging to the same compare operation. 
+ compare_group_id: z.string().uuid().optional(), }); export const DeltaFrame = z.object({ @@ -83,6 +85,7 @@ export const DeltaFrame = z.object({ message_id: Uuid, chat_id: Uuid.optional(), content: z.string(), + compare_group_id: z.string().uuid().optional(), }); export const ReasoningDeltaFrame = z.object({ @@ -107,6 +110,10 @@ export const ToolResultFrame = z.object({ output: z.unknown(), truncated: z.boolean(), error: z.string().optional(), + // v2.8: unified diff for write tools (edit_file, create_file, etc.). + // Published alongside successful tool results so the frontend can render + // a compact diff snippet inline. Absent for read-only tools or failures. + diff: z.string().optional(), }); export const MessageCompleteFrame = z.object({ @@ -132,6 +139,7 @@ export const MessageCompleteFrame = z.object({ // web reducer can render a muted "Stopped" / failed state without a new frame // type. Optional → fail-closed publishFrame must keep, not strip, it. status: z.enum(['complete', 'cancelled', 'failed']).optional(), + compare_group_id: z.string().uuid().optional(), }); export const UsageFrame = z.object({ @@ -168,6 +176,7 @@ export const ErrorFrame = z.object({ chat_id: Uuid.optional(), error: z.string(), reason: ErrorReasonValue.optional(), + compare_group_id: z.string().uuid().optional(), }); // ---- per-user channel frames (sidebar refresh) ----------------------------- @@ -472,6 +481,7 @@ const TextChannelPayload = z.object({ message_id: Uuid, chat_id: Uuid.optional(), content: z.string(), + compare_group_id: z.string().uuid().optional(), }); const ToolCallChannelPayload = z.object({ @@ -487,6 +497,7 @@ const ToolResultChannelPayload = z.object({ output: z.unknown(), truncated: z.boolean(), error: z.string().optional(), + diff: z.string().optional(), }); const StatusChannelPayload = z.object({ @@ -534,6 +545,7 @@ export const ChannelDeltaFrame = z.object({ tool_call_id: ToolCallId.optional(), output: z.unknown().optional(), truncated: 
z.boolean().optional(), + diff: z.string().optional(), }); // ---- discriminated union ---------------------------------------------------