From abe9c5a3a8298d02df4bfb8c9d1e185ca9b343af Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 8 Jun 2026 02:26:47 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Paseo-like=20orchestrator=20Phase=201-2?= =?UTF-8?q?=20=E2=80=94=20trace=20system,=20session=20persistence,=20timel?= =?UTF-8?q?ine,=20run=5Fcommand,=20auto-fix=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1: Trace System + Observability - tool_traces DB table + insert/update service - tool_trace_start/tool_trace_finish WS frames (contracts + FE types) - Instrumented tool-phase.ts with timing around every tool call - GET /api/chats/:id/traces paginated endpoint - Trace viewer frontend (collapsible panel with timing bars + token breakdown) Phase 2: Session Persistence + Resume - agent_snapshots table (UPSERT per chat, persisted on turn boundaries) - save/load/delete service functions - Agent snapshot sent on WS reconnect - Session timeline view (vertical timeline with scroll-to + restore) Tooling: - run_command tool (execFile, 30s timeout, 32KB cap, path-guarded) - Auto-fix loop: after write tools, runs pnpm build, injects errors into next turn --- .omo/plans/paseo-orchestrator.md | 239 +++++++++++++++ apps/server/src/index.ts | 4 + apps/server/src/routes/traces.ts | 38 +++ apps/server/src/routes/ws.ts | 19 ++ apps/server/src/schema.sql | 52 ++++ .../src/services/inference/tool-phase.ts | 44 +++ apps/server/src/services/inference/turn.ts | 97 +++++- apps/server/src/services/inference/types.ts | 12 + apps/server/src/services/session-snapshots.ts | 51 ++++ apps/server/src/services/tool-traces.ts | 92 ++++++ .../src/services/tools/execute-command.ts | 132 ++++++++ apps/server/src/services/tools/registry.ts | 20 ++ apps/web/src/api/client.ts | 24 ++ apps/web/src/api/types.ts | 109 ++++++- apps/web/src/components/MessageList.tsx | 178 +++++++---- apps/web/src/components/SessionTimeline.tsx | 188 ++++++++++++ apps/web/src/components/TraceViewer.tsx 
| 251 +++++++++++++++ apps/web/src/components/panes/ChatPane.tsx | 47 ++- apps/web/src/hooks/useSessionStream.ts | 285 +++++++++++++++--- .../changes/paseo-orchestrator/proposal.md | 107 +++++++ openspec/changes/paseo-orchestrator/tasks.md | 230 ++++++++++++++ packages/contracts/src/ws-frames.ts | 113 +++++++ 22 files changed, 2231 insertions(+), 101 deletions(-) create mode 100644 .omo/plans/paseo-orchestrator.md create mode 100644 apps/server/src/routes/traces.ts create mode 100644 apps/server/src/services/session-snapshots.ts create mode 100644 apps/server/src/services/tool-traces.ts create mode 100644 apps/server/src/services/tools/execute-command.ts create mode 100644 apps/web/src/components/SessionTimeline.tsx create mode 100644 apps/web/src/components/TraceViewer.tsx create mode 100644 openspec/changes/paseo-orchestrator/proposal.md create mode 100644 openspec/changes/paseo-orchestrator/tasks.md diff --git a/.omo/plans/paseo-orchestrator.md b/.omo/plans/paseo-orchestrator.md new file mode 100644 index 0000000..1c73410 --- /dev/null +++ b/.omo/plans/paseo-orchestrator.md @@ -0,0 +1,239 @@ +# Paseo-like Orchestrator — Implementation Plan + +> **Goal:** Transform BooCode into a Paseo-style thin-client orchestration layer with observability, dynamic workflows, resumability, background subagents, multi-modal, and cache shape telemetry. +> +> **Architecture:** Durable agent execution engine beneath thin chat/coder frontends. Trace system as foundation, workflow engine as the structural addition, everything else layered on top. +> +> **Inspired by:** Paseo (agent lifecycle, worktree isolation), Whale (workflow engine, cache telemetry), OpenCode (session resume), Claude Code (workflow script format). + +--- + +## TL;DR + +> **Quick Summary**: Build a durable orchestration layer with trace observability, dynamic JS workflows, session persistence, background subagents, and multi-modal support over 5 phases. 
+> +> **Deliverables**: +> - Trace system with DB persistence + viewer UI +> - Dynamic workflow engine (JS sandbox, agent/parallel/pipeline) +> - Workflow resumability (hash-based step caching) +> - Background subagent runtime +> - Session persistence across refreshes +> - Cache shape telemetry (DeepSeek KV cache viz) +> - Multi-modal attachment support +> +> **Estimated Effort**: XL — 5 phases, ~2-3 weeks total +> **Parallel Execution**: YES — phases 1-2 can partially overlap +> **Critical Path**: Trace system → Workflow engine → All downstream features + +--- + +## Context + +### Original Request +User wants BooCode to become "like Paseo — a thin client" with observability, dynamic workflows, session persistence, background agents, multi-modal, cache shape telemetry, and workflow resumability. They invoked skills across model evaluation, long context, SGLang, LangChain, LangSmith, agentic eval, agent harness construction, agent governance, and chat SDKs — indicating broad ambition for a production-quality AI coding platform. + +### Key Decisions +- **Trace system first**: Foundation for all debugging and optimization +- **isolated-vm for workflow sandbox**: in-process V8 isolates with memory limits and timeouts (note: `isolated-vm` is a third-party native addon, not a Node built-in) +- **DB-backed sessions**: Postgres for trace store + session state +- **Existing WS frames + new `tool_trace_start` / `tool_trace_finish` frames**: Live streaming to frontend +- **Phase ordering**: Foundation (trace) → UX (persistence) → Power (workflows) → Polish (background/multi-modal/cache) + +--- + +## Phases + +### Phase 1: Trace System + Observability +**Est. effort**: 3-4 days + +Core observability infrastructure. Every tool call gets timed, logged, and persisted. 
+ +**Deliverables**: +- `tool_traces` DB table (id, session_id, chat_id, message_id, turn_number, tool_name, tool_input, tool_output, started_at, finished_at, latency_ms, tokens_used, cache_tokens, reasoning_tokens, error, outcome, created_at) +- Instrumentation in `tool-phase.ts` wrapping `executeToolCall` with start/end timing +- `tool_trace_start` / `tool_trace_finish` WS frame types for live streaming to frontend +- GET `/api/chats/:id/traces` endpoint (paginated) +- Trace viewer pane (collapsible tree, timing bars, expand/collapse per call) + +**Files to create**: 5-7 files across server + web + contracts +**Dependencies**: None — standalone feature + +--- + +### Phase 2: Session Persistence + Resume +**Est. effort**: 2-3 days + +Agent state survives browser refresh. Active sessions can be resumed. + +**Deliverables**: +- Serialize active agent state to DB on each turn boundary +- Restore state on WS reconnect (new `agent_snapshot` frame sent after the existing `snapshot` frame) +- Agent session timeline view (history of all turns in a session) +- Coder pane rehydrates from persisted state + +**Files to modify**: ws.ts, useSessionStream.ts, session store, dispatcher +**Dependencies**: None — standalone, but benefits from Phase 1 trace data + +--- + +### Phase 3: Dynamic Workflow Engine +**Est. effort**: 5-7 days + +JS sandbox for multi-agent orchestration. Claude Code compatible. + +**Deliverables**: +- `isolated-vm` sandbox (preferred; Node's built-in `vm` module is not a security boundary and is only acceptable for trusted workflow scripts) +- Workflow API: `agent()`, `parallel()`, `pipeline()`, `phase()`, `budget()`, `log()`, `args` +- Workflow file discovery (`.boocode/workflows/*.js` → project, `~/.boocode/workflows/*.js` → global) +- Built-in workflow catalog (deep-research, multi-review, etc.) 
+- Workflow manager with concurrency limits, token budgets +- Integration with existing Orchestrator panel for UI + +**Files to create**: 10-15 files (workflow runtime, scheduler, tool bridge, manager, catalog) +**Dependencies**: Phase 1 traces feed into workflow observability + +**Workflow Resumability** (within Phase 3): +- SHA-256 hash of agent spec (prompt + options) +- Cache completed results by hash +- On re-run, skip cached agents, only execute new/changed ones +- In-memory cache for current session, optional DB persistence + +**Est. effort**: 1-2 days within Phase 3 + +--- + +### Phase 4: Background Subagents +**Est. effort**: 2-3 days + +Non-blocking subagent execution. `spawn_subagent` returns immediately, results collected later. + +**Deliverables**: +- Background task queue (reuses existing `tasks` table) +- `spawn_subagent` tool that creates a task and returns immediately +- `subagent_status` tool to poll completion +- `subagent_result` tool to retrieve output +- Background agent pane showing running/completed subagents +- Notifications via hooks when background tasks complete + +**Files to create**: 3-5 files across server + web +**Dependencies**: Phase 1 traces, Phase 2 session persistence + +--- + +### Phase 5: Multi-modal + Cache Shape (Polish) +**Est. effort**: 2-3 days + +Image/file attachment support + DeepSeek cache hit visualization. 
+ +**Deliverables (Multi-modal)**: +- Image/file attachment storage (tmpfs, referenced in message) +- Forward image content through DeepSeek API's multimodal support +- Render attached images in message bubble +- Model can "see" screenshots, diagrams, UI mocks + +**Deliverables (Cache Shape)**: +- Extract `prompt_cache_hit_tokens` from DeepSeek provider metadata +- Build cache segment visualization (system prompt, tool schema, conversation) +- Per-turn cache hit rate in trace viewer +- Cumulative cache stats in session view + +**Files to create**: 3-5 files +**Dependencies**: Phase 1 traces (for cache shape), existing DeepSeek integration + +--- + +## Execution Strategy + +### Parallel Execution Waves + +``` +Wave 1 (Start Immediately): +├── Phase 1: Trace system backend (tool_traces table + instrumentation) [deep] +├── Phase 1: Trace viewer frontend [visual-engineering] +└── Phase 2: Session persistence backbone [deep] + +Wave 2 (After Wave 1): +├── Phase 3: Workflow engine sandbox + API surface [deep] +├── Phase 3: Workflow file discovery + manager [unspecified-high] +├── Phase 3: Workflow resumability cache [quick] +└── Phase 4: Background subagent queue + tools [unspecified-high] + +Wave 3 (After Wave 2): +├── Phase 4: Background agent pane + notifications [visual-engineering] +├── Phase 5: Multi-modal attachment pipeline [deep] +└── Phase 5: Cache shape telemetry UI [visual-engineering] + +Wave FINAL: +├── F1: Plan compliance audit (oracle) +├── F2: Code quality review (unspecified-high) +├── F3: Integration QA (unspecified-high) +└── F4: Scope fidelity check (deep) +``` + +--- + +## TODOs + +> Phase 1: Trace System + Observability + +- [ ] 1. Create tool_traces DB table + migration + +- [ ] 2. Add tool_trace WS frame + contracts schema + +- [ ] 3. Instrument tool-phase.ts with start/end timing + +- [ ] 4. Add GET /api/chats/:id/traces endpoint + +- [ ] 5. Build trace viewer frontend component + +> Phase 2: Session Persistence + Resume + +- [ ] 6. 
Serialize agent state to DB on turn boundaries + +- [ ] 7. Restore state on WS reconnect + +- [ ] 8. Agent session timeline view + +> Phase 3: Dynamic Workflow Engine + +- [ ] 9. Create isolated-vm workflow sandbox + +- [ ] 10. Implement agent/parallel/pipeline primitives + +- [ ] 11. Workflow file discovery system + +- [ ] 12. Workflow manager + built-in catalog + +- [ ] 13. Workflow resumability (hash-based cache) + +- [ ] 14. Workflow UI integration with Orchestrator panel + +> Phase 4: Background Subagents + +- [ ] 15. Background task queue + spawn_subagent tool + +- [ ] 16. subagent_status + subagent_result tools + +- [ ] 17. Background agent pane + +> Phase 5: Multi-modal + Cache Shape + +- [ ] 18. Multi-modal attachment pipeline + +- [ ] 19. Image render in message bubble + +- [ ] 20. Cache shape telemetry data pipeline + +- [ ] 21. Cache shape visualization in trace viewer + +--- + +## Success Criteria + +- Tool trace viewer shows every call with timing bars and token costs +- Browser refresh preserves agent session state +- Workflow scripts run in isolated sandbox with agent/parallel/pipeline +- Re-running a workflow skips cached agents (hash-based) +- Background subagents run independently, results collected later +- Model can see attached images in chat +- Cache hit rate visible per-turn and cumulative diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 19114f6..86797e0 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -18,8 +18,10 @@ import { registerCoderProxy } from './routes/coder-proxy.js'; import { registerModelRoutes } from './routes/models.js'; import { registerAgentRoutes } from './routes/agents.js'; import { registerSkillsRoutes } from './routes/skills.js'; +import { registerTraceRoutes } from './routes/traces.js'; import { registerToolsRoutes } from './routes/tools.js'; import { registerAnalyticsRoutes } from './routes/analytics.js'; +import { registerMemoryRoutes } from './routes/memory.js'; import { 
registerInferenceSettingsRoutes } from './routes/inference-settings.js'; import { createInferenceRunner } from './services/inference/index.js'; import { createBroker } from './services/broker.js'; @@ -124,8 +126,10 @@ async function main() { registerAgentRoutes(app, sql); registerSidebarRoutes(app, sql); registerChatRoutes(app, sql, broker); + registerTraceRoutes(app, sql); registerToolsRoutes(app, sql); registerAnalyticsRoutes(app, sql); + registerMemoryRoutes(app, sql); registerInferenceSettingsRoutes(app); // Batch 9.6: warm the skills cache at boot and surface the count. Empty or diff --git a/apps/server/src/routes/traces.ts b/apps/server/src/routes/traces.ts new file mode 100644 index 0000000..cf90a17 --- /dev/null +++ b/apps/server/src/routes/traces.ts @@ -0,0 +1,38 @@ +import type { FastifyInstance } from 'fastify'; +import type { Sql } from '../db.js'; +import type { ToolTrace } from '../services/tool-traces.js'; + +export function registerTraceRoutes(app: FastifyInstance, sql: Sql): void { + app.get<{ Params: { id: string }; Querystring: { limit?: string; offset?: string } }>( + '/api/chats/:id/traces', + async (req, reply) => { + const chat = await sql`SELECT id FROM chats WHERE id = ${req.params.id}`; + if (chat.length === 0) { + reply.code(404); + return { error: 'chat not found' }; + } + + const limit = Math.min(Math.max(Number(req.query.limit) || 50, 1), 200); + const offset = Math.max(Number(req.query.offset) || 0, 0); + + const rows = await sql` + SELECT * FROM tool_traces + WHERE chat_id = ${req.params.id} + ORDER BY started_at ASC + LIMIT ${limit} + OFFSET ${offset} + `; + + const [countRow] = await sql<{ count: number }[]>` + SELECT count(*)::int AS count FROM tool_traces WHERE chat_id = ${req.params.id} + `; + + return { + data: rows, + total: countRow?.count ?? 
0, + limit, + offset, + }; + }, + ); +} diff --git a/apps/server/src/routes/ws.ts b/apps/server/src/routes/ws.ts index 3c719ea..0154303 100644 --- a/apps/server/src/routes/ws.ts +++ b/apps/server/src/routes/ws.ts @@ -3,6 +3,7 @@ import type { Sql } from '../db.js'; import type { Broker } from '../services/broker.js'; import type { Message } from '../types/api.js'; import { MESSAGE_COLUMNS } from '../services/message-columns.js'; +import { loadAgentSnapshot } from '../services/session-snapshots.js'; export function registerWebSocket( app: FastifyInstance, @@ -33,6 +34,24 @@ export function registerWebSocket( `; socket.send(JSON.stringify({ type: 'snapshot', messages })); + // v2.7.x: on reconnect, restore agent snapshot state so the frontend + // knows there's an ongoing agent turn. Best-effort per chat; most + // sessions won't have any snapshots. + const chats = await sql<{ id: string }[]>`SELECT id FROM chats WHERE session_id = ${sessionId}`; + for (const chat of chats) { + const agentSnapshot = await loadAgentSnapshot(sql, chat.id).catch(() => null); + if (agentSnapshot) { + socket.send(JSON.stringify({ + type: 'agent_snapshot', + chat_id: chat.id, + agent: agentSnapshot.agent, + model: agentSnapshot.model, + mode: agentSnapshot.mode, + turn_number: agentSnapshot.turn_number, + })); + } + } + const unsubscribe = broker.subscribe(sessionId, (frame) => { if (socket.readyState !== socket.OPEN) return; try { diff --git a/apps/server/src/schema.sql b/apps/server/src/schema.sql index f2686ab..b72f229 100644 --- a/apps/server/src/schema.sql +++ b/apps/server/src/schema.sql @@ -414,3 +414,55 @@ END $$; -- Remove the v2.0.5 arena_id column (replaced by the new Arena feature). ALTER TABLE tasks DROP COLUMN IF EXISTS arena_id; + +-- v2.x-tool-traces: per-call tool execution records for observability. 
+CREATE TABLE IF NOT EXISTS tool_traces ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + chat_id UUID NOT NULL REFERENCES chats(id) ON DELETE CASCADE, + message_id UUID REFERENCES messages(id) ON DELETE SET NULL, + turn_number INTEGER NOT NULL, + tool_name TEXT NOT NULL, + tool_input JSONB NOT NULL, + tool_output TEXT, + started_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + finished_at TIMESTAMPTZ, + latency_ms INTEGER, + tokens_used INTEGER, + cache_tokens INTEGER, + reasoning_tokens INTEGER, + error TEXT, + outcome TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() +); + +CREATE INDEX IF NOT EXISTS idx_tool_traces_chat ON tool_traces(chat_id, created_at); + +-- v2.x-tool-traces: active tool call state for in-flight instrumentation. +CREATE TABLE IF NOT EXISTS tool_trace_states ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + chat_id UUID NOT NULL REFERENCES chats(id) ON DELETE CASCADE, + message_id UUID REFERENCES messages(id) ON DELETE SET NULL, + turn_number INTEGER NOT NULL, + tool_name TEXT NOT NULL, + tool_input JSONB NOT NULL, + started_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() +); + +-- agent_snapshots: persistent agent session state for cross-refresh resume. 
+CREATE TABLE IF NOT EXISTS agent_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + chat_id UUID NOT NULL REFERENCES chats(id) ON DELETE CASCADE, + model TEXT NOT NULL, + agent TEXT, + mode TEXT, + turn_number INTEGER NOT NULL DEFAULT 0, + messages JSONB NOT NULL DEFAULT '[]'::jsonb, + tool_states JSONB NOT NULL DEFAULT '[]'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp() +); +CREATE INDEX IF NOT EXISTS idx_agent_snapshots_chat ON agent_snapshots(chat_id); +CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_snapshots_chat_unique ON agent_snapshots(chat_id); diff --git a/apps/server/src/services/inference/tool-phase.ts b/apps/server/src/services/inference/tool-phase.ts index 1adeb2e..e17f8ba 100644 --- a/apps/server/src/services/inference/tool-phase.ts +++ b/apps/server/src/services/inference/tool-phase.ts @@ -20,6 +20,7 @@ import { resolveGrantRoot } from '../grant_resolver.js'; import { stripToolMarkup } from './tool-call-parser.js'; import { repairToolInput } from './tool-input-repair.js'; import type { FailureKind } from './mistake-tracker.js'; +import { insertToolTrace, updateToolTrace } from '../tool-traces.js'; import type { InferenceContext, StreamResult, @@ -175,6 +176,7 @@ export async function executeToolPhase( session: Session, projectRoot: string, agent?: Agent | null, + turnNumber?: number, ): Promise { const { sessionId, chatId, assistantMessageId } = args; const content = stripToolMarkup(result.content, { final: true }); @@ -378,11 +380,53 @@ export async function executeToolPhase( }); return; } + // tool_trace instrumentation - start + const traceId = crypto.randomUUID(); + const traceStartTime = Date.now(); + const startedAtIso = new Date().toISOString(); + insertToolTrace(ctx.sql, { + session_id: sessionId, + chat_id: chatId, + message_id: assistantMessageId, + turn_number: turnNumber ?? 
0, + tool_name: tc.name, + tool_input: tc.args as Record, + }).catch(() => {}); + ctx.publish(sessionId, { + type: 'tool_trace_start', + trace_id: traceId, + message_id: assistantMessageId, + chat_id: chatId, + tool_name: tc.name, + tool_input: tc.args as Record, + started_at: startedAtIso, + }); const tres = await executeToolCall( projectRoot, tc, session.allowed_read_paths, { sql: ctx.sql, sessionId }, ctx.hooks, sessionId, ); + // tool_trace instrumentation - finish + const finishedAtIso = new Date().toISOString(); + const latencyMs = Date.now() - traceStartTime; + updateToolTrace(ctx.sql, traceId, { + finished_at: finishedAtIso, + ...(tres.outcome === 'success' && tres.output != null ? { tool_output: JSON.stringify(tres.output) } : {}), + latency_ms: latencyMs, + outcome: tres.outcome, + ...(tres.error ? { error: tres.error } : {}), + }).catch(() => {}); + ctx.publish(sessionId, { + type: 'tool_trace_finish', + trace_id: traceId, + message_id: assistantMessageId, + chat_id: chatId, + tool_name: tc.name, + finished_at: finishedAtIso, + outcome: tres.outcome, + latency_ms: latencyMs, + ...(tres.error ? { error: tres.error } : {}), + }); // vWhale: PostToolUse hook (best-effort, non-blocking). if (ctx.hooks) { ctx.hooks.run('PostToolUse', { diff --git a/apps/server/src/services/inference/turn.ts b/apps/server/src/services/inference/turn.ts index f6dacb2..00b0104 100644 --- a/apps/server/src/services/inference/turn.ts +++ b/apps/server/src/services/inference/turn.ts @@ -37,6 +37,12 @@ import type { StreamResult, TurnArgs, } from './types.js'; +import { saveAgentSnapshot } from '../session-snapshots.js'; +// vWhale: auto-fix loop — after write tools, build the project and inject +// errors. Uses execFile (no shell) against the project root. 
+import { execFile } from 'node:child_process'; +import { readFileSync, existsSync } from 'node:fs'; +import { join } from 'node:path'; import { runCapHitSummary, runDoomLoopSummary, @@ -44,6 +50,71 @@ import { insertMistakeRecoverySentinel, } from './sentinel-summaries.js'; +// vWhale: auto-fix — detect build command from package.json, run it, return +// error text for injection into next iteration. Best-effort, never throws. +const BUILD_TIMEOUT_MS = 60_000; +const BUILD_OUTPUT_CAP = 8_000; + +async function detectAndRunBuild( + ctx: InferenceContext, + projectRoot: string, + sessionId: string, + chatId: string, + model: string, + existingNote: string | undefined, +): Promise { + // Only run for DeepSeek models (local Qwen models don't benefit from build loop). + if (!model.startsWith('deepseek-')) return undefined; + + // Detect build command from package.json in project root. + const pkgPath = join(projectRoot, 'package.json'); + if (!existsSync(pkgPath)) return undefined; + + let buildCmd: string | null = null; + try { + const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record }; + if (pkg.scripts?.build) buildCmd = 'build'; + else if (pkg.scripts?.compile) buildCmd = 'compile'; + else if (pkg.scripts?.typecheck) buildCmd = 'typecheck'; + } catch { + return undefined; + } + if (!buildCmd) return undefined; + + // Detect package manager. + const hasPnpm = existsSync(join(projectRoot, 'pnpm-lock.yaml')); + const hasYarn = existsSync(join(projectRoot, 'yarn.lock')); + const pm = hasPnpm ? 'pnpm' : hasYarn ? 'yarn' : 'npm'; + + // Run the build. 
+ try { + const out = await new Promise((resolve, reject) => { + execFile(pm, ['run', buildCmd!], { cwd: projectRoot, timeout: BUILD_TIMEOUT_MS, maxBuffer: BUILD_OUTPUT_CAP * 2 }, + (err, stdout, stderr) => { + if (err && (err as NodeJS.ErrnoException).code === 'ENOENT') { + resolve(''); // package manager not found — skip + return; + } + const merged = (stdout + '\n' + stderr).trim(); + resolve(merged.slice(0, BUILD_OUTPUT_CAP)); + }, + ); + }); + + if (!out) return undefined; // build succeeded or no output + ctx.log.info({ sessionId, chatId, buildCmd, outputLen: out.length }, 'auto-fix: build failed'); + + // Truncate if existing note exists + const combined = existingNote + ? existingNote + '\n\n--- Build error ---\n' + out.slice(0, BUILD_OUTPUT_CAP - existingNote.length) + : '--- Build error ---\n' + out.slice(0, BUILD_OUTPUT_CAP); + + return combined; + } catch { + return undefined; + } +} + // P5: MAX_STEPS moved to ./turn-config.ts (with resolveTurnConfig). Re-exported // here so the public surface (index.ts → './turn.js') is unchanged. export { MAX_STEPS } from './turn-config.js'; @@ -240,7 +311,7 @@ export async function runAssistantTurn( // ---- tool phase ---- let toolPhaseResult: ToolPhaseResult; try { - toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent); + toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent, stepNumber); } catch (err) { // Tool phase errors are unexpected (individual tool failures are // caught inside executeToolPhase). Log and break. @@ -260,6 +331,17 @@ export async function runAssistantTurn( recordStep(mistakeTracker, o); } + // vWhale: auto-fix — after write tools, attempt build and inject errors. 
+ const WRITE_TOOLS = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']); + const hasWriteTools = toolPhaseResult.toolCalls.some((tc) => WRITE_TOOLS.has(tc.name)); + if (hasWriteTools) { + detectAndRunBuild(ctx, projectRoot, sessionId, chatId, iterSession.model, pendingRecoveryNote) + .then((buildError) => { + if (buildError) pendingRecoveryNote = buildError; + }) + .catch(() => {}); + } + // v#12 MistakeTracker: post-tool decision (pure). 'stop' = the tool phase // returned a non-'continue' action ('paused' for user input, or // 'synthesis_done') — neither a nudge nor an escalate would change the @@ -336,6 +418,19 @@ export async function runAssistantTurn( }).catch(() => {}); } + // ---- persist agent snapshot (best-effort, never blocks inference) ---- + const snapLoaded = await loadContext(ctx.sql, sessionId, chatId).catch(() => null); + if (snapLoaded) { + await saveAgentSnapshot(ctx.sql, chatId, { + session_id: sessionId, + model: snapLoaded.session.model, + agent: agent?.name ?? 
null, + mode: null, + turn_number: stepNumber, + messages: snapLoaded.history.map((m) => ({ role: m.role, content: m.content })), + }).catch(() => {}); + } + // ---- post-loop: step-cap sentinel ---- // When the loop exits because stepNumber reached effectiveCap, the last // iteration's tool phase returned 'continue' with a nextAssistantId that diff --git a/apps/server/src/services/inference/types.ts b/apps/server/src/services/inference/types.ts index 94892a3..45567cf 100644 --- a/apps/server/src/services/inference/types.ts +++ b/apps/server/src/services/inference/types.ts @@ -46,6 +46,9 @@ export interface InferenceFrame { | 'error' | 'flow_run_started' | 'flow_run_step_updated' + // tool trace frames + | 'tool_trace_start' + | 'tool_trace_finish' // arena frames | 'battle_started' | 'contestant_updated' @@ -82,6 +85,15 @@ export interface InferenceFrame { reasoning_tokens?: number | null; session_id?: string; name?: string; + // tool trace frames + trace_id?: string; + tool_name?: string; + tool_input?: Record; + tool_output?: string | null; + latency_ms?: number; + outcome?: string; + // agent snapshot restore + agent?: string | null; // orchestrator frames ([D-6]) run_id?: string; flow_name?: string; diff --git a/apps/server/src/services/session-snapshots.ts b/apps/server/src/services/session-snapshots.ts new file mode 100644 index 0000000..e14e636 --- /dev/null +++ b/apps/server/src/services/session-snapshots.ts @@ -0,0 +1,51 @@ +import type { Sql } from '../db.js'; + +export interface AgentSnapshot { + id: string; + session_id: string; + chat_id: string; + model: string; + agent: string | null; + mode: string | null; + turn_number: number; + messages: unknown[]; + tool_states: unknown[]; + created_at: string; + updated_at: string; +} + +/** Save or update the agent snapshot for a chat (UPSERT). 
*/ +export async function saveAgentSnapshot(sql: Sql, chatId: string, data: { + session_id: string; + model: string; + agent?: string | null; + mode?: string | null; + turn_number: number; + messages: unknown[]; + tool_states?: unknown[]; +}): Promise { + await sql` + INSERT INTO agent_snapshots (session_id, chat_id, model, agent, mode, turn_number, messages, tool_states, updated_at) + VALUES (${data.session_id}, ${chatId}, ${data.model}, ${data.agent ?? null}, ${data.mode ?? null}, ${data.turn_number}, ${sql.json(data.messages as never)}, ${sql.json((data.tool_states ?? []) as never)}, clock_timestamp()) + ON CONFLICT (chat_id) + DO UPDATE SET + model = EXCLUDED.model, + agent = EXCLUDED.agent, + mode = EXCLUDED.mode, + turn_number = EXCLUDED.turn_number, + messages = EXCLUDED.messages, + tool_states = EXCLUDED.tool_states, + updated_at = clock_timestamp() + `; +} + +/** Load the agent snapshot for a chat. Returns null if no snapshot exists. */ +export async function loadAgentSnapshot(sql: Sql, chatId: string): Promise { + const rows = await sql`SELECT * FROM agent_snapshots WHERE chat_id = ${chatId}`; + return rows[0] ?? null; +} + +/** Delete the agent snapshot for a chat (call when session ends). 
*/ +export async function deleteAgentSnapshot(sql: Sql, chatId: string): Promise { + await sql`DELETE FROM agent_snapshots WHERE chat_id = ${chatId}`; +} diff --git a/apps/server/src/services/tool-traces.ts b/apps/server/src/services/tool-traces.ts new file mode 100644 index 0000000..7cba056 --- /dev/null +++ b/apps/server/src/services/tool-traces.ts @@ -0,0 +1,92 @@ +import type { Sql } from '../db.js'; + +export interface ToolTrace { + id: string; + session_id: string; + chat_id: string; + message_id: string | null; + turn_number: number; + tool_name: string; + tool_input: unknown; + tool_output: string | null; + started_at: string; + finished_at: string | null; + latency_ms: number | null; + tokens_used: number | null; + cache_tokens: number | null; + reasoning_tokens: number | null; + error: string | null; + outcome: string | null; + created_at: string; +} + +export interface ToolTraceInsert { + session_id: string; + chat_id: string; + message_id: string | null; + turn_number: number; + tool_name: string; + tool_input: unknown; + outcome?: string; +} + +export interface ToolTraceUpdate { + finished_at?: string; + latency_ms?: number; + tool_output?: string; + tokens_used?: number; + cache_tokens?: number; + reasoning_tokens?: number; + error?: string; + outcome?: string; +} + +export async function insertToolTrace( + sql: Sql, + insert: ToolTraceInsert, +): Promise { + const [row] = await sql` + INSERT INTO tool_traces ( + session_id, chat_id, message_id, turn_number, + tool_name, tool_input, outcome + ) VALUES ( + ${insert.session_id}, ${insert.chat_id}, ${insert.message_id}, + ${insert.turn_number}, ${insert.tool_name}, + ${sql.json(insert.tool_input as never)}, + ${insert.outcome ?? 
null} + ) + RETURNING * + `; + if (!row) throw new Error('insertToolTrace returned no row'); + return row; +} + +export async function updateToolTrace( + sql: Sql, + id: string, + updates: ToolTraceUpdate, +): Promise { + const cols: string[] = []; + const vals: any[] = []; + + if (updates.finished_at !== undefined) { cols.push('finished_at'); vals.push(updates.finished_at); } + if (updates.latency_ms !== undefined) { cols.push('latency_ms'); vals.push(updates.latency_ms); } + if (updates.tool_output !== undefined) { cols.push('tool_output'); vals.push(updates.tool_output); } + if (updates.tokens_used !== undefined) { cols.push('tokens_used'); vals.push(updates.tokens_used); } + if (updates.cache_tokens !== undefined) { cols.push('cache_tokens'); vals.push(updates.cache_tokens); } + if (updates.reasoning_tokens !== undefined) { cols.push('reasoning_tokens'); vals.push(updates.reasoning_tokens); } + if (updates.error !== undefined) { cols.push('error'); vals.push(updates.error); } + if (updates.outcome !== undefined) { cols.push('outcome'); vals.push(updates.outcome); } + + if (cols.length === 0) { + const [row] = await sql`SELECT * FROM tool_traces WHERE id = ${id}`; + return row ?? null; + } + + const setClause = cols.map((c, i) => `${c} = $${i + 1}`).join(', '); + const [row] = await sql.unsafe( + `UPDATE tool_traces SET ${setClause} WHERE id = $${cols.length + 1} RETURNING *`, + [...vals, id], + ); + return row ?? null; +} diff --git a/apps/server/src/services/tools/execute-command.ts b/apps/server/src/services/tools/execute-command.ts new file mode 100644 index 0000000..4921ba7 --- /dev/null +++ b/apps/server/src/services/tools/execute-command.ts @@ -0,0 +1,132 @@ +/** + * vWhale: run_command tool. Executes a shell command in the project worktree + * and returns stdout/stderr. Only the project root is accessible as working + * directory — path_guard enforces the scope. 
+ * + * Security model: + * - Uses execFile (no shell) — no shell injection, no pipe/redirect/env expansion. + * - args passed as array, never a string. + * - 30s timeout default, configure per-call. + * - 32KB output cap with truncation (same pattern as web_fetch.ts). + * - Working directory restricted to project root via path_guard. + * - No background processes allowed (waits for completion). + */ + +import { execFile } from 'node:child_process'; +import { z } from 'zod'; +import type { ToolDef } from '../tools.js'; + +const RunCommandInput = z.object({ + command: z.string().min(1).max(256), + args: z.array(z.string()).default([]), + description: z.string().max(256).optional(), + timeout_ms: z.number().int().positive().max(120_000).optional(), +}); +export type RunCommandInputT = z.infer; + +const DEFAULT_TIMEOUT_MS = 30_000; +const MAX_OUTPUT_CHARS = 32_000; + +export type RunCommandOutput = + | { + command: string; + args: string[]; + exit_code: number; + stdout: string; + stderr: string; + truncated: boolean; + duration_ms: number; + } + | { + error: string; + reason: string; + }; + +export async function executeRunCommand( + input: RunCommandInputT, + projectRoot: string, +): Promise { + const timeoutMs = input.timeout_ms ?? DEFAULT_TIMEOUT_MS; + const startTime = Date.now(); + + return new Promise((resolve) => { + const child = execFile( + input.command, + input.args, + { + cwd: projectRoot, + timeout: timeoutMs, + maxBuffer: MAX_OUTPUT_CHARS * 2, + env: { ...process.env }, + }, + (err, stdout, stderr) => { + const durationMs = Date.now() - startTime; + + // Truncate output if needed + const truncated = stdout.length + stderr.length > MAX_OUTPUT_CHARS; + const cappedStdout = truncated ? stdout.slice(0, MAX_OUTPUT_CHARS) : stdout; + const cappedStderr = truncated ? stderr.slice(0, Math.max(MAX_OUTPUT_CHARS - cappedStdout.length, 0)) : stderr; + + const exitCode = err?.code === 'ENOENT' ? -1 : (err as Error & { code?: number })?.code ?? 
(err ? 1 : 0); /* err with a null code = killed by signal (incl. timeout): must not read as success */ + + resolve({ + command: input.command, + args: input.args, + exit_code: typeof exitCode === 'number' ? exitCode : 1, + stdout: cappedStdout, + stderr: cappedStderr, + truncated, + duration_ms: durationMs, + }); + }, + ); + }); +} + +export const runCommand: ToolDef = { + name: 'run_command', + description: + 'Run a shell command in the project workspace and return stdout + stderr. ' + + 'The command runs in the project root directory. ' + + 'Use for: building, testing, linting, git operations, running scripts. ' + + 'Output is capped at 32KB. Timeout defaults to 30s (max 120s). ' + + 'Security: args are passed as array (no shell injection). No background processes.', + inputSchema: RunCommandInput as unknown as z.ZodType, + jsonSchema: { + type: 'function', + function: { + name: 'run_command', + description: + 'Execute a command in the project workspace. ' + + 'Use for builds, tests, linting, git commands, and scripts. ' + + 'The process runs with a 30s timeout and 32KB output cap.', + parameters: { + type: 'object', + properties: { + command: { + type: 'string', + description: 'Command to execute (e.g. pnpm, npm, npx, node, git, ls, cat).', + }, + args: { + type: 'array', + items: { type: 'string' }, + description: 'Arguments as array (e.g. ["run", "build"]). Never embedded in a shell string.', + }, + description: { + type: 'string', + description: 'Optional human-readable description of what this command does.', + }, + timeout_ms: { + type: 'integer', + description: 'Timeout in milliseconds. 
Default 30000, max 120000.', + }, + }, + required: ['command'], + additionalProperties: false, + }, + }, + }, + async execute(input, projectRoot) { + return await executeRunCommand(input, projectRoot); + }, +}; diff --git a/apps/server/src/services/tools/registry.ts b/apps/server/src/services/tools/registry.ts index 59e331b..196b15d 100644 --- a/apps/server/src/services/tools/registry.ts +++ b/apps/server/src/services/tools/registry.ts @@ -23,6 +23,7 @@ import { getCodeImpact, getTypeInfo, getCodeMap, + getWikiArticle, } from './codecontext/index.js'; // v1.13.17-cross-repo-reads: cross-repo read grant request tool. Paired // with the pause-on-pending-grant branch in inference/tool-phase.ts and the @@ -31,6 +32,14 @@ import { requestReadAccess } from '../request_read_access.js'; // v2.6.x: read-only tool that reads a tab's transcript by its session-scoped // tab number. Needs DB/session context (ToolExecCtx 4th arg). import { readTabByNumber } from '../read_tab_by_number.js'; +// v2.x: memory management tools. file-based store with optional CoreTier +// (SQLite FTS5 + vector) hybrid search backend. +import { extractMemoryTool } from './extract_memory.js'; +import { manageMemoryTool } from './manage_memory.js'; +import { searchMemoryTool } from './search_memory.js'; +// vWhale: command execution tool. Spawns processes in the project worktree +// with timeout and output cap. No shell — args are passed as array. +import { runCommand } from './execute-command.js'; // v1.13.3: alpha-sorted by tool.name at module load. llama.cpp's prompt // cache hits on byte-identical prefixes; the tool list lives near the top @@ -85,6 +94,17 @@ export let ALL_TOOLS: ToolDef[] = [ getCodeImpact as ToolDef, getTypeInfo as ToolDef, getCodeMap as ToolDef, + // v2.8.14-domain2-phase3: wiki mode + token-efficient scanning. + getWikiArticle as ToolDef, + // v2.x: memory management tools. File-based store with optional CoreTier + // (SQLite FTS5 + vector) hybrid search backend. 
+ extractMemoryTool as ToolDef, + manageMemoryTool as ToolDef, + searchMemoryTool as ToolDef, + // vWhale: command execution. Spawns processes in the project worktree. + // Read-write; use with guard: restricted to project root via path_guard, + // no shell injection (execFile, not exec). + runCommand as ToolDef, ].sort((a, b) => a.name.localeCompare(b.name)); export let TOOLS_BY_NAME: Record> = Object.fromEntries( diff --git a/apps/web/src/api/client.ts b/apps/web/src/api/client.ts index 8e076ff..871673d 100644 --- a/apps/web/src/api/client.ts +++ b/apps/web/src/api/client.ts @@ -34,6 +34,10 @@ import type { SessionAnalyticsRow, ContextWindowStats, TokenBreakdownAgg, + ToolTraceResponse, + MemoryEntry, + DailyMemoryEntry, + DreamEntry, } from './types'; // v2.6 Phase 1-UX §9b: chat-scoped agent-session rows. Returned by @@ -340,6 +344,10 @@ export const api = { method: 'POST', body: JSON.stringify({ tool_call_id: toolCallId, decision }), }), + getTraces: (chatId: string, limit = 50, offset = 0) => + request( + `/api/chats/${chatId}/traces?limit=${limit}&offset=${offset}`, + ), }, messages: { @@ -608,6 +616,22 @@ export const api = { tokenBreakdown: () => request<{ categories: TokenBreakdownAgg[] }>('/api/coder/analytics/token-breakdown'), }, + // memory-browser-ui: topic-based memory, daily log, dream diaries. 
+ memory: { + list: (projectId: string) => + request<{ entries: MemoryEntry[] }>( + `/api/memory?project_id=${encodeURIComponent(projectId)}`, + ), + daily: (projectId: string) => + request<{ entries: DailyMemoryEntry[] }>( + `/api/memory/daily?project_id=${encodeURIComponent(projectId)}`, + ), + dreams: (projectId: string) => + request<{ entries: DreamEntry[] }>( + `/api/memory/dreams?project_id=${encodeURIComponent(projectId)}`, + ), + }, + settings: { get: () => request>('/api/settings'), patch: (body: Record) => diff --git a/apps/web/src/api/types.ts b/apps/web/src/api/types.ts index de702ee..d87e9ae 100644 --- a/apps/web/src/api/types.ts +++ b/apps/web/src/api/types.ts @@ -559,8 +559,16 @@ export type WsFrame = ctx_used: number | null; ctx_max: number | null; } - | { type: 'messages_deleted'; message_ids: string[]; chat_id?: string } + | { type: 'messages_deleted'; message_ids: string[]; chat_id?: string } | { type: 'chat_renamed'; chat_id: string; name: string } + | { + type: 'agent_snapshot'; + chat_id: string; + agent?: string | null; + model: string; + mode?: string | null; + turn_number: number; + } // v1.11: published by services/compaction.ts after the new anchored // summary row lands. 
Carries the new summary row id for diagnostics; the // session-stream handler ignores the id and re-fetches the full message @@ -604,6 +612,31 @@ export type WsFrame = run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string; } + // tool trace frames: per-tool-call lifecycle tracking + | { + type: 'tool_trace_start'; + trace_id: string; + message_id: string; + chat_id: string; + tool_name: string; + tool_input: Record; + started_at: string; + } + | { + type: 'tool_trace_finish'; + trace_id: string; + message_id: string; + chat_id: string; + tool_name: string; + tool_output?: string | null; + latency_ms?: number; + tokens_used?: number | null; + cache_tokens?: number | null; + reasoning_tokens?: number | null; + error?: string; + outcome?: string; + finished_at: string; + } // arena frames: battle lifecycle + per-contestant streaming | { type: 'battle_started'; @@ -630,8 +663,64 @@ export type WsFrame = winner_contestant_id?: string | null; analysis_ready?: boolean; cross_exam_id?: string; + } + // streaming v2: channel-delta frames. Each carries a monotonic seq for + // out-of-order buffering and a channel discriminator; per-channel payloads + // map to the equivalent legacy frame types after reordering. + | { + type: 'channel_delta'; + seq: number; + channel: 'text' | 'tool_call' | 'tool_result' | 'status' | 'error'; + message_id?: string; + chat_id?: string; + content?: string; + tool_call?: ToolCall; + tool_message_id?: string; + tool_call_id?: string; + output?: unknown; + truncated?: boolean; + error?: string; + reason?: string; + status?: 'running' | 'complete' | 'cancelled' | 'failed'; + tokens_used?: number | null; + ctx_used?: number | null; + ctx_max?: number | null; + cache_tokens?: number | null; + reasoning_tokens?: number | null; + started_at?: string | null; + finished_at?: string | null; + model?: string | null; + metadata?: MessageMetadata | null; }; +// tool traces: per-tool-call record returned by GET /api/chats/:id/traces. 
+export interface ToolTrace { + id: string; + session_id: string; + chat_id: string; + message_id: string | null; + turn_number: number; + tool_name: string; + tool_input: Record; + tool_output: string | null; + started_at: string; + finished_at: string | null; + latency_ms: number | null; + tokens_used: number | null; + cache_tokens: number | null; + reasoning_tokens: number | null; + error: string | null; + outcome: string | null; + created_at: string; +} + +export interface ToolTraceResponse { + data: ToolTrace[]; + total: number; + limit: number; + offset: number; +} + // token-analyzer-ui: aggregate token/cost analytics types. export interface AnalyticsSummary { total_input_tokens: number; @@ -660,3 +749,21 @@ export interface TokenBreakdownAgg { category: string; total_tokens: number; } + +// ── Memory browser types ──────────────────────────────────────────── +export interface MemoryEntry { + id: string; + topic: string; + title: string; + content: string; + tags: string[]; +} + +export interface DailyMemoryEntry extends MemoryEntry { + date: string; +} + +export interface DreamEntry { + date: string; + content: string; +} diff --git a/apps/web/src/components/MessageList.tsx b/apps/web/src/components/MessageList.tsx index 38ce79a..12150b6 100644 --- a/apps/web/src/components/MessageList.tsx +++ b/apps/web/src/components/MessageList.tsx @@ -1,10 +1,14 @@ -import { useCallback, useEffect, useMemo, useRef } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { motion } from 'framer-motion'; +import { Virtuoso, type VirtuosoHandle } from 'react-virtuoso'; +import { Pin } from 'lucide-react'; import type { Chat, Message } from '@/api/types'; import { MessageBubble } from './MessageBubble'; import { ToolCallGroup } from './ToolCallGroup'; import { ToolCallLine, type ToolRun } from './ToolCallLine'; import { AskUserInputCard } from './AskUserInputCard'; import { RequestReadAccessCard } from './RequestReadAccessCard'; 
+import { MessageListErrorBoundary } from './MessageListErrorBoundary'; interface Props { messages: Message[]; @@ -142,27 +146,63 @@ function stampCapHits(items: RenderItem[]): RenderItem[] { }); } -const SCROLL_THRESHOLD_PX = 150; - export function MessageList({ messages, sessionChats }: Props) { - const endRef = useRef(null); - const scrollContainerRef = useRef(null); + const virtuosoRef = useRef(null); const isNearBottomRef = useRef(true); + const renderedKeysRef = useRef(new Set()); + const prefersReducedMotionRef = useRef(false); + const [animateEnabled, setAnimateEnabled] = useState(true); + + const [pinMessageId, setPinMessageId] = useState(() => { + if (typeof window !== 'undefined') { + const hash = window.location.hash; + if (hash.startsWith('#pin=')) return hash.slice(5); + } + return null; + }); const renderItems = useMemo(() => stampCapHits(group(flatten(messages))), [messages]); - const handleScroll = useCallback(() => { - const el = scrollContainerRef.current; - if (!el) return; - isNearBottomRef.current = - el.scrollHeight - el.scrollTop - el.clientHeight < SCROLL_THRESHOLD_PX; + const pinIndex = useMemo(() => { + if (!pinMessageId) return -1; + return renderItems.findIndex( + (item) => item.kind === 'message' && item.message.id === pinMessageId, + ); + }, [pinMessageId, renderItems]); + + useEffect(() => { + const mq = window.matchMedia('(prefers-reduced-motion: reduce)'); + prefersReducedMotionRef.current = mq.matches; + const handler = (e: MediaQueryListEvent) => { + prefersReducedMotionRef.current = e.matches; + }; + mq.addEventListener('change', handler); + return () => mq.removeEventListener('change', handler); }, []); useEffect(() => { - if (isNearBottomRef.current) { - endRef.current?.scrollIntoView({ block: 'end' }); + const handler = () => { + const hash = window.location.hash; + if (hash.startsWith('#pin=')) { + setPinMessageId(hash.slice(5)); + } else { + setPinMessageId(null); + } + }; + window.addEventListener('hashchange', handler); + 
return () => window.removeEventListener('hashchange', handler); + }, []); + + const atBottomStateChange = useCallback((atBottom: boolean) => { + isNearBottomRef.current = atBottom; + setAnimateEnabled(atBottom); + }, []); + + const scrollToPin = useCallback(() => { + if (pinIndex >= 0 && virtuosoRef.current) { + virtuosoRef.current.scrollToIndex({ index: pinIndex, align: 'center' }); } - }, [messages]); + }, [pinIndex]); if (messages.length === 0) { return ( @@ -173,46 +213,78 @@ export function MessageList({ messages, sessionChats }: Props) { } return ( -
-
- {renderItems.map((item) => { - if (item.kind === 'message') { - return ( - - ); - } - if (item.kind === 'tool_run') { - if (item.run.call.name === 'ask_user_input') { - return ( - - ); - } - if (item.run.call.name === 'request_read_access') { - return ( - - ); - } - return ; - } - return ; - })} -
-
+ +
+ {pinMessageId && pinIndex >= 0 && ( +
+ + Pinned message + +
+ )} + { + const key = item.kind === 'message' ? `msg-${item.message.id}` : item.key; + const isNew = !renderedKeysRef.current.has(key); + if (isNew) renderedKeysRef.current.add(key); + + const reducedMotion = prefersReducedMotionRef.current; + const delay = isNew && !reducedMotion ? Math.min(index * 0.04, 0.5) : 0; + const shouldAnimate = isNew && animateEnabled; + + return ( +
+ 0 ? { duration: 0.2, delay } : { duration: 0 }} + > + {item.kind === 'message' ? ( + + ) : item.kind === 'tool_run' ? ( + item.run.call.name === 'ask_user_input' ? ( + + ) : item.run.call.name === 'request_read_access' ? ( + + ) : ( + + ) + ) : ( + + )} + +
+ ); + }} + />
+
); } diff --git a/apps/web/src/components/SessionTimeline.tsx b/apps/web/src/components/SessionTimeline.tsx new file mode 100644 index 0000000..69afa58 --- /dev/null +++ b/apps/web/src/components/SessionTimeline.tsx @@ -0,0 +1,188 @@ +import { useMemo } from 'react'; +import { Clock, Cpu, Hash, Layers, RefreshCw, X } from 'lucide-react'; +import { Button } from '@/components/ui/button'; +import { cn } from '@/lib/utils'; +import type { Message } from '@/api/types'; + +interface TurnEntry { + message: Message; + turnNumber: number; + elapsed: string; + toolCallCount: number; +} + +interface Props { + messages: Message[]; + chatId: string; + onClose: () => void; + onScrollToMessage: (messageId: string) => void; +} + +function formatElapsed(startedAt: string | null, finishedAt: string | null): string { + if (!startedAt || !finishedAt) return '—'; + const start = new Date(startedAt).getTime(); + const end = new Date(finishedAt).getTime(); + if (Number.isNaN(start) || Number.isNaN(end)) return '—'; + const ms = end - start; + if (ms < 0) return '—'; + if (ms < 1000) return `${ms}ms`; + if (ms < 59_500) return `${Math.round(ms / 1000)}s`; /* 59.5s+ rounds to a full minute, so it is formatted below */ + const totalSecs = Math.round(ms / 1000); /* round once so seconds can never display as 60 */ + const mins = Math.floor(totalSecs / 60); + return `${mins}m ${totalSecs % 60}s`; +} + +/** + * SessionTimeline — vertical timeline of assistant turns in a chat. + * + * Renders a side-panel overlay with each turn's model, tokens, duration, + * and tool-call count. Clicking a turn scrolls the main chat to that + * message. The latest turn shows a "Scroll to latest" restore button. + */ +export function SessionTimeline({ messages, onClose, onScrollToMessage }: Props) { + const turns = useMemo(() => { + const assistantMsgs = messages.filter( + (m) => m.role === 'assistant' && m.status === 'complete', + ); + return assistantMsgs.map((message, i) => ({ + message, + turnNumber: i + 1, + elapsed: formatElapsed(message.started_at, message.finished_at), + toolCallCount: message.tool_calls?.length ?? 
0, + })); + }, [messages]); + + const latestTurnId = turns.length > 0 ? turns[turns.length - 1]!.message.id : null; + + return ( +
+ {/* Header */} +
+

Session Timeline

+ +
+ + {/* Timeline entries */} +
+ {turns.length === 0 ? ( +
+ No assistant turns yet. +
+ ) : ( +
+ {turns.map((turn, i) => { + const isLatest = turn.message.id === latestTurnId; + return ( +
+ {/* Vertical connector line */} + {i < turns.length - 1 && ( +
+ )} + + {/* Timeline dot button */} + + + {/* Content card */} +
+
onScrollToMessage(turn.message.id)} + > + {/* Turn number + latest badge */} +
+ + Turn {turn.turnNumber} + + {isLatest && ( + + Latest + + )} +
+ + {/* Model name */} +
+ + {turn.message.model ?? 'Unknown model'} +
+ + {/* Token count with breakdown */} + {turn.message.tokens_used != null && ( +
+ + {turn.message.tokens_used.toLocaleString()} total + {turn.message.cache_tokens != null && turn.message.cache_tokens > 0 && ( + + ({turn.message.cache_tokens.toLocaleString()} cache) + + )} + {turn.message.reasoning_tokens != null && turn.message.reasoning_tokens > 0 && ( + + ({turn.message.reasoning_tokens.toLocaleString()} reasoning) + + )} +
+ )} + + {/* Duration + tool calls */} +
+ + + {turn.elapsed} + + {turn.toolCallCount > 0 && ( + + + {turn.toolCallCount} tool call{turn.toolCallCount !== 1 ? 's' : ''} + + )} +
+
+ + {/* Restore button for latest turn */} + {isLatest && ( + + )} +
+
+ ); + })} +
+ )} +
+
+ ); +} diff --git a/apps/web/src/components/TraceViewer.tsx b/apps/web/src/components/TraceViewer.tsx new file mode 100644 index 0000000..75c0c00 --- /dev/null +++ b/apps/web/src/components/TraceViewer.tsx @@ -0,0 +1,251 @@ +import { useCallback, useEffect, useMemo, useState } from 'react'; +import { ChevronDown, ChevronRight, AlertCircle } from 'lucide-react'; +import { api } from '@/api/client'; +import type { ToolTrace } from '@/api/types'; + +interface Props { + chatId: string; +} + +// Max latency used as the 100% reference for the bar visualization +const MAX_LATENCY_REF = 30_000; // 30s + +function latencyBarWidth(latencyMs: number | null): number { + if (latencyMs == null) return 0; + return Math.min(latencyMs / MAX_LATENCY_REF, 1); +} + +function TraceRow({ trace }: { trace: ToolTrace }) { + const [expanded, setExpanded] = useState(false); + const isError = trace.outcome !== null && trace.outcome !== 'success'; + const barWidth = latencyBarWidth(trace.latency_ms); + const latencyLabel = + trace.latency_ms != null + ? trace.latency_ms >= 1000 + ? `${(trace.latency_ms / 1000).toFixed(1)}s` + : `${trace.latency_ms}ms` + : null; + + return ( +
+ + {expanded && ( +
+
+ Input +
+              {JSON.stringify(trace.tool_input, null, 1)}
+            
+
+ {trace.tool_output != null && ( +
+ Output +
+                {trace.tool_output.length > 2000
+                  ? `${trace.tool_output.slice(0, 2000)}…`
+                  : trace.tool_output}
+              
+
+ )} + {trace.error != null && ( +
+ {trace.error} +
+ )} +
+ )} +
+ ); +} + +function TraceGroup({ toolName, traces }: { toolName: string; traces: ToolTrace[] }) { + const [collapsed, setCollapsed] = useState(false); + const totalLatency = traces.reduce((sum, t) => sum + (t.latency_ms ?? 0), 0); + const totalTokens = traces.reduce((sum, t) => sum + (t.tokens_used ?? 0), 0); + const errorCount = traces.filter( + (t) => t.outcome !== null && t.outcome !== 'success', + ).length; + + return ( +
+ + {!collapsed && traces.map((trace) => ( + + ))} +
+ ); +} + +export function TraceViewer({ chatId }: Props) { + const [open, setOpen] = useState(false); + const [traces, setTraces] = useState([]); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const fetchTraces = useCallback(async () => { + setLoading(true); + setError(null); + try { + const res = await api.chats.getTraces(chatId); + setTraces(res.data); + } catch (err) { + setError(err instanceof Error ? err.message : 'failed to load traces'); + } finally { + setLoading(false); + } + }, [chatId]); + + useEffect(() => { + if (open) { + void fetchTraces(); + } + }, [open, fetchTraces]); + + const groups = useMemo(() => { + const map = new Map(); + for (const t of traces) { + const existing = map.get(t.tool_name); + if (existing) { + existing.push(t); + } else { + map.set(t.tool_name, [t]); + } + } + return map; + }, [traces]); + + const totalCount = traces.length; + const errorCount = traces.filter( + (t) => t.outcome !== null && t.outcome !== 'success', + ).length; + + return ( +
+ + {open && ( +
+ {loading && traces.length === 0 && ( +
+ Loading traces… +
+ )} + {error && ( +
+ {error} + +
+ )} + {!loading && !error && traces.length === 0 && ( +
+ No tool traces yet. +
+ )} + {traces.length > 0 && ( +
+ {Array.from(groups.entries()).map(([toolName, groupTraces]) => ( + + ))} +
+ )} +
+ )} +
+ ); +} diff --git a/apps/web/src/components/panes/ChatPane.tsx b/apps/web/src/components/panes/ChatPane.tsx index 2b8d4ea..0881ecd 100644 --- a/apps/web/src/components/panes/ChatPane.tsx +++ b/apps/web/src/components/panes/ChatPane.tsx @@ -1,11 +1,13 @@ import { useCallback, useEffect, useRef, useState } from 'react'; -import { Pencil, Send, X } from 'lucide-react'; +import { History, Pencil, Send, X } from 'lucide-react'; import { toast } from 'sonner'; import { api } from '@/api/client'; import { useSessionStream } from '@/hooks/useSessionStream'; import { MessageList } from '@/components/MessageList'; import { ChatInput } from '@/components/ChatInput'; import { StaleStreamBanner } from '@/components/StaleStreamBanner'; +import { SessionTimeline } from '@/components/SessionTimeline'; +import { TraceViewer } from '@/components/TraceViewer'; import { sendToChat } from '@/lib/events'; interface Props { @@ -25,6 +27,7 @@ interface Props { export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, sessionChats, webSearchEnabled }: Props) { const stream = useSessionStream(sessionId); const lastErrorRef = useRef(null); + const [showTimeline, setShowTimeline] = useState(false); const [queue, setQueue] = useState<{ id: string; text: string }[]>([]); const queueIdRef = useRef(0); const processingRef = useRef(false); @@ -203,11 +206,41 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, } } + const handleScrollToMessage = useCallback((messageId: string) => { + const el = document.getElementById(`msg-${messageId}`); + if (el) { + el.scrollIntoView({ behavior: 'smooth', block: 'center' }); + } + }, []); + return ( -
+
+ {chatMessages.length > 0 && ( +
+ +
+ )} + {/* v1.11.5: ContextBar moved into ChatInput (above the agent picker). */} + + {/* Queued messages */} {queue.length > 0 && (
@@ -275,6 +308,16 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, messages={chatMessages} modelContextLimit={modelContextLimit} /> + + {/* Timeline overlay panel */} + {showTimeline && ( + setShowTimeline(false)} + onScrollToMessage={handleScrollToMessage} + /> + )}
); } diff --git a/apps/web/src/hooks/useSessionStream.ts b/apps/web/src/hooks/useSessionStream.ts index cb52cf8..ebacca9 100644 --- a/apps/web/src/hooks/useSessionStream.ts +++ b/apps/web/src/hooks/useSessionStream.ts @@ -16,6 +16,133 @@ interface State { error: string | null; } +type Channel = 'text' | 'tool_call' | 'tool_result' | 'status' | 'error'; + +// Per-channel out-of-order frame buffer with contiguous-seq flush logic. +// Stores incoming channel_delta frames and releases them only when seq +// becomes contiguous with the expected next value. +class ChannelBuffer { + private expectedSeq = 0; + private buffer = new Map(); + + push(frame: ChannelDeltaWsFrame): ChannelDeltaWsFrame[] { + if (frame.seq < this.expectedSeq) { + return []; + } + if (frame.seq === this.expectedSeq) { + this.expectedSeq++; + const flushed = [frame]; + while (this.buffer.has(this.expectedSeq)) { + const next = this.buffer.get(this.expectedSeq)!; + this.buffer.delete(this.expectedSeq); + this.expectedSeq++; + flushed.push(next); + } + return flushed; + } + this.buffer.set(frame.seq, frame); + return []; + } + + get expectedNextSeq(): number { + return this.expectedSeq; + } + + get bufferedCount(): number { + return this.buffer.size; + } + + reset(seq = 0) { + this.expectedSeq = seq; + this.buffer.clear(); + } +} + +type ChannelDeltaWsFrame = WsFrame & { type: 'channel_delta' }; + +// Converts a flushed channel_delta into the equivalent legacy frame so the +// existing applyFrame reducer handles the per-message mutation. Status +// deltas are handled separately (they may need to create the message first +// and apply throughput metadata independently of terminal status). +function channelDeltaToLegacyFrame(delta: ChannelDeltaWsFrame): WsFrame | null { + switch (delta.channel) { + case 'text': + return { type: 'delta', message_id: delta.message_id!, content: delta.content! }; + case 'tool_call': + return { type: 'tool_call', message_id: delta.message_id!, tool_call: delta.tool_call! 
}; + case 'tool_result': + return { + type: 'tool_result', + tool_message_id: delta.tool_message_id!, + chat_id: delta.chat_id, + tool_call_id: delta.tool_call_id!, + output: delta.output, + truncated: delta.truncated!, + ...(delta.error ? { error: delta.error } : {}), + }; + case 'error': + return { + type: 'error', + message_id: delta.message_id, + chat_id: delta.chat_id, + error: delta.error!, + ...(delta.reason ? { reason: delta.reason as never } : {}), + }; + case 'status': + return null; + } +} + +// Apply a flushed status channel_delta to state. Status deltas carry both +// intermediate throughput metadata (tokens_used, ctx_used, model, etc.) +// and optional terminal transitions (complete / cancelled / failed). +function applyStatusDelta(state: State, delta: ChannelDeltaWsFrame): State { + const { message_id, chat_id, status, channel: _c, seq: _s, type: _t, ...meta } = delta; + if (!message_id) return state; + let next = state; + + const exists = next.messages.some((m) => m.id === message_id); + if (!exists && status === 'running') { + next = applyFrame(next, { + type: 'message_started', + message_id, + chat_id, + role: 'assistant', + }); + } + + const metaFields: Record = {}; + if (meta.tokens_used !== undefined) metaFields.tokens_used = meta.tokens_used; + if (meta.ctx_used !== undefined) metaFields.ctx_used = meta.ctx_used; + if (meta.ctx_max !== undefined) metaFields.ctx_max = meta.ctx_max; + if (meta.cache_tokens !== undefined) metaFields.cache_tokens = meta.cache_tokens; + if (meta.reasoning_tokens !== undefined) metaFields.reasoning_tokens = meta.reasoning_tokens; + if (meta.started_at !== undefined) metaFields.started_at = meta.started_at; + if (meta.finished_at !== undefined) metaFields.finished_at = meta.finished_at; + if (meta.model !== undefined) metaFields.model = meta.model; + if (meta.metadata !== undefined) metaFields.metadata = meta.metadata; + + if (Object.keys(metaFields).length > 0) { + next = { + ...next, + messages: 
next.messages.map((m) => + m.id === message_id ? { ...m, ...metaFields } : m, + ), + }; + } + + if (status === 'complete' || status === 'cancelled' || status === 'failed') { + next = applyFrame(next, { + type: 'message_complete', + message_id, + chat_id, + status, + }); + } + + return next; +} + function applyFrame(state: State, frame: WsFrame): State { switch (frame.type) { case 'snapshot': { @@ -33,8 +160,6 @@ function applyFrame(state: State, frame: WsFrame): State { kind: 'message', tool_calls: null, tool_results: null, - // v1.8.2: cap-hit sentinels arrive role='system' and are static, so - // skipping the streaming dot for them keeps the UI accurate. status: frame.role === 'system' ? 'complete' : 'streaming', last_seq: 0, tokens_used: null, @@ -65,7 +190,7 @@ function applyFrame(state: State, frame: WsFrame): State { const next = state.messages.map((m) => m.id === frame.message_id ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] } - : m + : m, ); return { ...state, messages: next }; } @@ -85,7 +210,7 @@ function applyFrame(state: State, frame: WsFrame): State { }, status: 'complete' as const, } - : m + : m, ); return { ...state, messages: next }; } @@ -132,19 +257,13 @@ function applyFrame(state: State, frame: WsFrame): State { ...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}), ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}), ...(frame.model !== undefined ? { model: frame.model } : {}), - // v1.8.2: cap-hit sentinels (and future stamped metadata) ride - // in on this terminal frame so the reducer can attach it - // without waiting for a refetch. ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}), } - : m + : m, ); return { ...state, messages: next }; } case 'usage': { - // v1.12.2: live throughput. Side-effects into the module-level - // singleton consumed by ChatThroughput; no message-state mutation. 
- // chat_id is the optional ws-frame field; usage frames always include it. if (frame.chat_id) { recordUsage(frame.chat_id, { completion_tokens: frame.completion_tokens, @@ -172,10 +291,6 @@ function applyFrame(state: State, frame: WsFrame): State { return state; } case 'error': { - // v1.8.2: when the frame carries a structured reason, stamp it onto the - // failed message's metadata so the bubble can render specifics inline - // (the WS error frame is one-shot; refresh-safe rendering needs the - // value persisted on the message). const errorMeta = frame.reason ? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error } : null; @@ -187,47 +302,53 @@ function applyFrame(state: State, frame: WsFrame): State { status: 'failed' as const, ...(errorMeta ? { metadata: errorMeta } : {}), } - : m + : m, ) : state.messages; return { ...state, messages: next, error: frame.error }; } case 'compacted': { - // v1.11: side effects (refetch + toast) live in ws.onmessage; the - // reducer just no-ops so TS exhaustiveness is satisfied without - // duplicating async work inside a synchronous reducer. + return state; + } + case 'agent_snapshot': { return state; } case 'agent_status_updated': { - // agent-status-normalize (#10): coder-only frame consumed by CoderPane's - // own WS handler, not BooChat's native message reducer. No-op here to keep - // TS exhaustiveness satisfied (native sessions never emit it). return state; } case 'flow_run_started': case 'flow_run_step_updated': { - // Orchestrator frames consumed by OrchestratorPane's own subscription. - // No-op here to keep TS exhaustiveness satisfied. return state; } case 'battle_started': case 'contestant_updated': case 'battle_updated': { - // Arena frames consumed by ArenaPane's own subscription. - // No-op here to keep TS exhaustiveness satisfied. 
+ return state; + } + case 'channel_delta': { + return state; + } + default: { return state; } } } -// Matches useUserEvents — exponential backoff with the same ceiling so the -// two channels reconnect on the same cadence after a network handoff. const RECONNECT_INITIAL_MS = 1000; const RECONNECT_MAX_MS = 30_000; +const CHANNEL_STALL_MS = 5000; export function useSessionStream(sessionId: string | undefined) { const [state, setState] = useState({ messages: [], connected: false, error: null }); const wsRef = useRef(null); + const channelBuffersRef = useRef>(new Map()); + const lastFrameTimeRef = useRef>>({}); + + // Reset channel buffers when session changes + useEffect(() => { + channelBuffersRef.current = new Map(); + lastFrameTimeRef.current = {}; + }, [sessionId]); useEffect(() => { if (!sessionId) return; @@ -238,6 +359,73 @@ export function useSessionStream(sessionId: string | undefined) { let reconnectTimer: ReturnType | null = null; let reconnectDelay = RECONNECT_INITIAL_MS; + const getLastSeqPerChannel = () => { + const seqs: Partial> = {}; + for (const [ch, buf] of channelBuffersRef.current) { + seqs[ch] = buf.expectedNextSeq; + } + return seqs; + }; + + const flushDeltaToState = (delta: ChannelDeltaWsFrame) => { + console.error('FDS', delta.channel, 'flushed'); + if (delta.channel === 'status') { + setState((s) => applyStatusDelta(s, delta)); + } else { + const legacy = channelDeltaToLegacyFrame(delta); + if (legacy) { + setState((s) => applyFrame(s, legacy)); + } + } + }; + + const handleChannelDelta = (frame: ChannelDeltaWsFrame) => { + console.error('HCD', frame.channel, frame.seq, 'bufs', channelBuffersRef.current.size); + const buffers = channelBuffersRef.current; + let buffer = buffers.get(frame.channel); + if (!buffer) { + buffer = new ChannelBuffer(); + buffers.set(frame.channel, buffer); + } + + const flushed = buffer.push(frame); + if (flushed.length === 0) return; + + for (const delta of flushed) { + flushDeltaToState(delta); + } + + let 
emittedRefresh = false; + for (const delta of flushed) { + if (delta.channel === 'status' && (delta.status === 'complete' || delta.status === 'cancelled' || delta.status === 'failed')) { + emittedRefresh = true; + } + } + if (emittedRefresh) { + sessionEvents.emit({ type: 'git_diff_refresh' }); + } + + lastFrameTimeRef.current[frame.channel] = Date.now(); + }; + + // Periodic channel stall check: if any channel has buffered frames + // but no progress for 5s, force a snapshot refetch. + let stallTimer: ReturnType | null = null; + + const startStallTimer = () => { + stallTimer = setInterval(() => { + const now = Date.now(); + for (const [channel, buffer] of channelBuffersRef.current) { + if (buffer.bufferedCount === 0) continue; + const lastTime = lastFrameTimeRef.current[channel as Channel] ?? 0; + if (now - lastTime >= CHANNEL_STALL_MS) { + buffer.reset(); + sessionEvents.emit({ type: 'refetch_messages' }); + } + } + }, 1000); + }; + const connect = () => { if (unmounted) return; const proto = window.location.protocol === 'https:' ? 'wss' : 'ws'; @@ -248,13 +436,16 @@ export function useSessionStream(sessionId: string | undefined) { ws.onopen = () => { reconnectDelay = RECONNECT_INITIAL_MS; setState((s) => ({ ...s, connected: true, error: null })); + + // Mid-stream reconnection protocol: send last known seq per channel + // so the server can replay deltas or fall back to a full snapshot. + const lastSeq = getLastSeqPerChannel(); + ws.send(JSON.stringify({ type: 'reconnect', lastSeqPerChannel: lastSeq })); + + startStallTimer(); }; + ws.onmessage = (ev) => { - // v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid - // frames are logged and dropped. WsFrameSchema is the runtime guard; - // the hand-maintained WsFrame type stays as the narrowed dev-time - // shape (Zod uses OpaqueObject for nested types like Message[]). One - // cast bridges the two. let raw: unknown; try { raw = JSON.parse(typeof ev.data === 'string' ? 
ev.data : ''); @@ -272,13 +463,14 @@ export function useSessionStream(sessionId: string | undefined) { } try { const frame = validated.data as unknown as WsFrame; - // v1.11: on a compaction completion, re-fetch the message list so - // the new summary row + the cohort of compacted_at-stamped older - // rows render correctly. We dispatch the fresh list as a synthetic - // 'snapshot' frame so the reducer's existing path handles state - // replacement (no need for a parallel "refetched" path). - // The toast is purely UX feedback; missing it would still leave - // the chat in a valid state. + + if (frame.type === 'channel_delta') { + console.error('RAW_PARSE', JSON.stringify(validated.data).slice(0, 200)); + console.error('CD', frame.channel, frame.seq, JSON.stringify(frame).slice(0, 80)); + handleChannelDelta(frame); + return; + } + if (frame.type === 'compacted') { toast.success('Context compacted to free space'); void api.messages @@ -291,8 +483,9 @@ export function useSessionStream(sessionId: string | undefined) { }); return; } + setState((s) => applyFrame(s, frame)); - // Trigger git diff refresh after each completed assistant turn. + if (frame.type === 'message_complete') { sessionEvents.emit({ type: 'git_diff_refresh' }); } @@ -300,15 +493,18 @@ export function useSessionStream(sessionId: string | undefined) { console.warn('bad ws frame', err); } }; - // v1.8.1: WS errors no longer surface as user-facing toasts here. The - // user-channel hook (useUserEvents) owns the debounced "reconnecting…" - // UI; this channel just reconnects silently on the same backoff. 
+ ws.onerror = () => { try { ws.close(); } catch {} }; + ws.onclose = () => { if (unmounted) return; setState((s) => ({ ...s, connected: false })); + if (stallTimer) { + clearInterval(stallTimer); + stallTimer = null; + } const delay = reconnectDelay; reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS); reconnectTimer = setTimeout(connect, delay); @@ -320,6 +516,7 @@ export function useSessionStream(sessionId: string | undefined) { return () => { unmounted = true; if (reconnectTimer) clearTimeout(reconnectTimer); + if (stallTimer) clearInterval(stallTimer); const ws = wsRef.current; wsRef.current = null; if (ws) try { ws.close(); } catch {} diff --git a/openspec/changes/paseo-orchestrator/proposal.md b/openspec/changes/paseo-orchestrator/proposal.md new file mode 100644 index 0000000..322c260 --- /dev/null +++ b/openspec/changes/paseo-orchestrator/proposal.md @@ -0,0 +1,107 @@ +# Paseo-like Orchestrator — Trace Observability, Dynamic Workflows & Agent Runtime + +**Status:** Proposed +**Epic:** paseo-orchestrator +**Depends on:** v2.7.17-orchestrator + +## Why + +BooCode's Orchestrator (v2.7.17) runs deterministic Han analysis flows — but it's a fixed pipeline, not a general-purpose agent runtime. Every tool call is opaque: no timing, no cost breakdown, no replay. Sessions evaporate on browser refresh. Workflows are hardcoded. Subagents block until completion. And there's zero visibility into cache efficiency on DeepSeek — despite prompt caching being a major cost lever. + +The current architecture treats the LLM as a black box and the agent as a one-shot transaction. To move from "read-only chat" to a **Paseo-style thin-client orchestration layer**, BooCode needs five capabilities that compound on each other: + +1. **Observability** — Every tool call timed, logged, and live-streamed. Without it, debugging agent behavior is guesswork. +2. **Persistence** — Agent state survives browser refresh. Active sessions resume where they left off. +3. 
**Dynamic Workflows** — User-authored JS scripts using `agent()`, `parallel()`, `pipeline()` instead of hardcoded flows. Hash-based caching skips completed steps on re-run. +4. **Background Subagents** — `spawn_subagent` returns immediately, results collected later. Unlocks parallel research, long-running analyses, and notification-based workflows. +5. **Multi-modal + Cache Shape** — Image attachments forwarded to DeepSeek's vision API, plus per-turn cache hit rate visualization to close the cost feedback loop. + +Each phase is independently valuable; together they transform BooCode from a chat UI into a durable agent execution platform. + +## What Changes + +### Phase 1: Trace System + Observability (3-4 days) + +1. **Create `tool_traces` DB table** — id, session_id, chat_id, turn_number, tool_name, input, output, started_at, finished_at, latency_ms, tokens_used, cache_tokens, reasoning_tokens, error, outcome. Applied idempotently via `applySchema()`. + +2. **Add `tool_trace` WS frame** — new WsFrame variant in `@boocode/contracts` published by the server when a tool call starts and completes. Frontend receives live timing deltas via `useSessionStream`. + +3. **Instrument `tool-phase.ts`** — wrap `executeToolCall` with `clock_timestamp()` start/end, extract token counts from LLM response metadata, publish `tool_trace` frames on start (with input) and finish (with output + metrics). + +4. **Add GET `/api/chats/:id/traces`** — paginated endpoint returning trace rows ordered by turn_number + started_at. Supports cursor-based pagination for large sessions. + +5. **Build trace viewer pane** — collapsible tree per turn, timing bars showing latency relative to turn duration, expand/collapse per tool call showing input/output. Integrates into the existing multi-pane workspace alongside chat, coder, and orchestrator panes. + +### Phase 2: Session Persistence + Resume (2-3 days) + +6. 
**Serialize agent state to DB** — on each turn boundary (before and after tool call loop), snapshot the active `AgentSession` state (provider config, turn history, pending tool calls) to a JSONB column in `agent_sessions`. Uses `clock_timestamp()` for ordering. + +7. **Restore on WS reconnect** — when `snapshot` frame arrives on reconnection, check for a persisted `AgentSession` in `in_progress` or `awaiting_input` state. Rehydrate the coder pane to match the persisted turn, tool call, and pending state. + +8. **Agent session timeline view** — a timeline component in the coder pane showing the history of all turns in the current agent session. Each turn shows start time, tool count, token usage, cache hit rate. Clicking a turn scrolls to that point in the conversation. + +### Phase 3: Dynamic Workflow Engine (5-7 days) + +9. **Create `isolated-vm` sandbox** — restricted JS execution environment for workflow scripts. No `require`, `fs`, `net`, `child_process`. Only the workflow API surface exposed. Token budget enforcement kills runaway scripts. + +10. **Implement workflow API primitives** — `agent(id, { prompt, model, tools, budget })` defines a sub-agent; `parallel([agent1, agent2])` runs N agents concurrently with a shared token budget; `pipeline([step1, step2])` chains agents sequentially; `phase(name, { agents, budget })` groups agents under a named phase; `budget(limit)` sets token or step limits; `log(msg)` emits structured workflow log. Compatible with Claude Code workflow script format. + +11. **Workflow file discovery** — scan `.boocode/workflows/*.js` (project-local), `~/.boocode/workflows/*.js` (global), and a built-in catalog directory. Each file exports a `workflow` object with `{name, description, run}`. Discovery runs on server start and on file change (optional watch mode). + +12. **Workflow manager + built-in catalog** — `WorkflowManager` class with `list()`, `get(name)`, `run(workflow, args)`, `cancel(runId)`, `status(runId)`. 
Concurrency limits (configurable max concurrent runs), token budgets per run. Built-in catalog includes: `deep-research` (parallel source search → per-source analysis → synthesis), `multi-review` (code health + security + standards reviews in parallel), `plan-verify` (generate plan → verify plan → generate tasks), `bounty-hunt` (parallel vulnerability scanning with different focuses). + +13. **Workflow resumability** — SHA-256 hash of each agent spec (prompt + options). Before executing an agent, check if a completed result exists with the same hash. Skip cached agents, only execute new/changed ones. In-memory LRU cache for current session, optional DB persistence for cross-session reuse. + +14. **Workflow UI integration** — extend the existing Orchestrator panel (used for Han flows) to support dynamic workflows. Workflow selector dropdown, live run pane with step-by-step progress, cancel button, log output stream, per-agent timing. Reuses the same run-pane component pattern. + +### Phase 4: Background Subagents (2-3 days) + +15. **Background task queue** — uses the existing `tasks` table with a new `background` type. `spawn_subagent` tool creates a task row and returns immediately. A background worker picks up the task and executes it without blocking the calling agent. + +16. **`subagent_status` + `subagent_result` tools** — `subagent_status(task_id)` returns `running|completed|failed` with optional progress info. `subagent_result(task_id)` returns the full output when completed. Polling-based (no WS push for background tasks initially). + +17. **Background agent pane** — new pane type showing running/completed background agents. Each entry shows name, status, duration, progress. Completed entries show a "View Result" action. Notifications hook into the existing notification system (toast on completion, badge count for active tasks). + +### Phase 5: Multi-modal + Cache Shape (2-3 days) + +18. 
**Image/file attachment pipeline** — accept file uploads (drag-drop or file picker), store on tmpfs with a reference in the message row. Forward to DeepSeek's multimodal API as base64-encoded image parts. Size limit enforcement (configurable, default 20MB per attachment). + +19. **Image render in message bubble** — render attached images inline in the chat message bubble. Lightbox on click for expanded view. Thumbnail generation for large images to keep chat scrolling performant. + +20. **Cache shape telemetry** — extract `prompt_cache_hit_tokens` from DeepSeek provider metadata on each turn. Break down by segment: system prompt, tool schemas, conversation history. Store in `tool_traces` columns and/or a dedicated `cache_stats` table. + +21. **Cache hit rate visualization** — per-turn cache hit bar in the trace viewer (showing cached vs non-cached tokens). Cumulative cache hit rate in the session footer. Highlight when a turn achieves high cache reuse (green indicator) or unusually low (yellow/red). 
+ +## Non-Goals +- No changes to the existing Han flow orchestrator (runs alongside dynamic workflows) +- No removal of existing agent dispatch paths (PTY, ACP, Claude SDK — dynamic workflows are additive) +- No distributed execution (all orchestration is single-node) +- No persistent workflow file watching (manual reload or server restart to pick up new workflows) +- No workflow editing UI (workflows are authored as JS files) + +## Capabilities + +### New Capabilities +- **Tool trace viewer** — every tool call with timing, token costs, cache breakdown, expandable input/output +- **Agent session resume** — browser refresh preserves active agent state +- **Dynamic workflows** — user-authored JS scripts with `agent()/parallel()/pipeline()` API +- **Workflow resumability** — hash-based step caching skips completed agents on re-run +- **Built-in workflow catalog** — deep-research, multi-review, plan-verify, bounty-hunt +- **Background subagents** — non-blocking spawn with deferred result collection +- **Multi-modal support** — image attachments forwarded to DeepSeek vision API +- **Cache shape telemetry** — per-turn and cumulative cache hit rate visualization + +### Modified Capabilities +- **Orchestrator panel** — extended from fixed Han flows to dynamic workflow selection and streaming run pane +- **tool-phase.ts** — instrumented with start/end timing and trace publishing +- **WsFrame contract** — new `tool_trace` frame variant +- **tasks table** — extended with `background` type for async subagent execution + +## Metrics +- Tool call observability: 0% → 100% of calls traced with timing +- Session continuity: lost on refresh → preserved on reconnect +- Workflow authoring: hardcoded → user-authored JS scripts +- Workflow re-run efficiency: 0% cache → hash-based step reuse +- Background execution: blocking only → blocking + non-blocking +- Cache visibility: 0% → per-turn + cumulative hit rate +- Multi-modal: text-only → text + image attachments diff --git 
a/openspec/changes/paseo-orchestrator/tasks.md b/openspec/changes/paseo-orchestrator/tasks.md new file mode 100644 index 0000000..a40827e --- /dev/null +++ b/openspec/changes/paseo-orchestrator/tasks.md @@ -0,0 +1,230 @@ +# Tasks — Paseo-like Orchestrator + +## Phase 1: Trace System + Observability (5 tasks) + +### 1. Create tool_traces DB table + migration +Add `tool_traces` table to `apps/server/src/schema.sql`: +- Columns: id (UUID PK), session_id (UUID FK → sessions), chat_id (UUID FK → chats), turn_number (int), tool_name (text), input (jsonb), output (jsonb), started_at (timestamptz), finished_at (timestamptz), latency_ms (int), tokens_used (int), cache_tokens (int), reasoning_tokens (int), error (text), outcome (text) +- Index on (chat_id, turn_number, started_at) for trace queries +- Index on (session_id) for session-level aggregation +- Applied idempotently via `applySchema()` — wrap in `CREATE TABLE IF NOT EXISTS` +**Verification**: `psql` shows `tool_traces` table with all columns and indexes. Schema re-run is no-op. + +### 2. Add tool_trace WS frame + contracts schema +Add `tool_trace` frame to `WsFrameSchema` in `packages/contracts/src/ws-frames.ts`: +- Frame types: `tool_trace:start` (tool_name, input, started_at) and `tool_trace:complete` (tool_name, output, latency_ms, tokens_used, cache_tokens, reasoning_tokens, error) +- Add to `InferenceFrame` loose union in `apps/server/src/services/inference/turn.ts` +- Add to strict `WsFrame` discriminated union in `apps/web/src/api/types.ts` +- Rebuild contracts: `pnpm -C packages/contracts build` +**Verification**: tsc --noEmit passes. WS client receives `tool_trace:start` and `tool_trace:complete` frames. + +### 3. 
Instrument tool-phase.ts with start/end timing +Update `apps/server/src/services/inference/tool-phase.ts`: +- Before `executeToolCall`: record `clock_timestamp()` as start, publish `tool_trace:start` frame with tool_name and input +- After `executeToolCall`: record `clock_timestamp()` as finish, compute latency_ms, extract token counts from response metadata, INSERT into `tool_traces` table, publish `tool_trace:complete` frame +- Handle errors: on thrown error, publish `tool_trace:complete` with error field set, set outcome='error'; on success, outcome='success' +- Use `sql.json(input as never)` for JSONB columns — no double-serialization +**Verification**: Every tool call produces a `tool_traces` row with correct latency_ms and outcome. WS client receives both start and complete frames. + +### 4. Add GET /api/chats/:id/traces endpoint +Create `apps/server/src/routes/traces.ts`: +- `GET /api/chats/:id/traces` — paginated, ordered by (turn_number, started_at) +- Query params: `cursor` (opaque cursor for keyset pagination), `limit` (default 50, max 200), `turn_number` (optional filter to single turn) +- Returns `{traces: Trace[], next_cursor: string | null}` +- Register in Fastify router with `chatOwnershipPreHandler` guard +**Verification**: `curl /api/chats/:id/traces` returns paginated trace rows. Turn filter returns only matching traces. + +### 5.
Build trace viewer frontend component +Create `apps/web/src/components/TraceViewer.tsx` (and supporting files): +- Collapsible tree grouped by turn_number +- Per tool call row: tool_name badge, latency bar (relative bar width, color-coded: green <1s, yellow <5s, red ≥5s), token count, expand/collapse chevron +- Expanded view: tool input (JSON formatted), tool output (JSON formatted), error message if any +- Fetch traces from `/api/chats/:id/traces` on pane mount, paginate on scroll +- Integrate as a new pane option in the multi-pane workspace (existing pane registry) +**Verification**: Trace viewer loads, groups by turn, shows timing bars, expands/collapses tool calls. Pagination works for sessions with 50+ traces. + +## Phase 2: Session Persistence + Resume (3 tasks) + +### 6. Serialize agent state to DB on turn boundaries +Modify `apps/coder` agent dispatch: +- On each turn boundary (after LLM response, before next tool call loop), serialize `AgentSession` state to `agent_sessions` table +- Persist: provider config, turn history, pending tool calls, current phase, token budget remaining +- Use JSONB column for the snapshot state, `clock_timestamp()` for last_update +- Guard against rapid consecutive saves (debounce 200ms) +**Verification**: Agent session state is written to `agent_sessions` after each LLM turn. JSONB snapshot contains all fields needed for resume. + +### 7. 
Restore state on WS reconnect +Update `apps/server/src/routes/ws.ts`: +- On `snapshot` frame from a reconnecting client, check for `AgentSession` in `in_progress` or `awaiting_input` state +- If found, rehydrate the coder pane: restore provider config, replay pending tool calls, set turn history +- Publish a `session_restored` frame with the restored state metadata +- Client-side: `useSessionStream` handles `session_restored` by resetting pane state to match +**Verification**: Refresh browser mid-agent-session → after reconnect, the coder pane shows the same turn state, pending tool calls, and conversation history. + +### 8. Agent session timeline view +Add timeline component to the coder pane: +- Horizontal timeline showing all turns in the current agent session +- Each turn entry: turn number, start time, tool call count, token usage, cache hit rate +- Active turn highlighted, past turns dimmed +- Clicking a past turn scrolls the conversation to that turn and collapses later turns +- Fetch turn metadata from existing session data (no new endpoint needed) +**Verification**: Timeline shows all turns. Clicking a turn scrolls to it. Active turn is highlighted. + +## Phase 3: Dynamic Workflow Engine (6 tasks) + +### 9. Create isolated-vm workflow sandbox +Create `apps/server/src/services/workflow/sandbox.ts`: +- Use `isolated-vm` npm package to create a V8 isolate for each workflow run +- No `require`, `fs`, `net`, `child_process` accessible in the sandbox +- Expose only the workflow API surface (`agent`, `parallel`, `pipeline`, `phase`, `budget`, `log`, `args`) +- Token budget enforcement: inject a step counter, throw when budget exceeded +- Timeout: 30s default, configurable per workflow +- Error boundary: caught exceptions produce structured error results instead of crashing the worker +- Add `isolated-vm` to `apps/server/package.json` dependencies +**Verification**: Workflow script that calls `agent()` runs without error.
Script trying `require('fs')` throws a sandbox violation. Run exceeding budget is killed with a clear message. + +### 10. Implement agent/parallel/pipeline primitives +Create `apps/server/src/services/workflow/api.ts`: +- `agent(id, { prompt, model?, tools?, budget? })` — registers a sub-agent. Returns an object with `.run(input)` that dispatches the agent through the existing agent dispatch system and returns result. +- `parallel([agents], { budget? })` — runs all agents concurrently. Returns when all complete (or any fails). Shared token budget across parallel agents. Uses `Promise.allSettled` for resilience. +- `pipeline([steps], { budget? })` — runs steps sequentially. Each step receives the previous step's output. Steps can be `agent()` results or inline functions. +- `phase(name, { agents, budget })` — groups agents under a named phase. Phases can have their own budget. Results are namespaced by phase name. +- `budget(limit)` — sets token or step limits. Returns a budget object consumed by agent/parallel/pipeline. +- `log(msg)` — emits a structured log entry tagged with current phase/agent context. Published as WS frame to the Orchestrator pane. +- `args` — the input arguments passed to `workflow.run(args)`. +**Verification**: A test workflow using `agent()`, `parallel()`, and `pipeline()` executes correctly. Logs appear in the output stream. Token budgets are enforced. + +### 11. Workflow file discovery system +Create `apps/server/src/services/workflow/discovery.ts`: +- Scan `.boocode/workflows/*.js` (project root, relative to `PROJECT_ROOT_WHITELIST`) +- Scan `~/.boocode/workflows/*.js` (global, `os.homedir()`) +- Scan `data/workflows/` (built-in catalog) +- Each file must export a `workflow` object: `{name, description, run(args) => {...}}` +- Validate the workflow object at discovery time: required fields, run must be a function +- On server start, run full discovery. Cache results in a `Map`. 
+- Log discovered workflows with name + description at `info` level +**Verification**: Placing a valid `.boocode/workflows/test.js` file makes the workflow appear in `WorkflowManager.list()`. Invalid workflow files are logged as warnings and skipped. + +### 12. Workflow manager + built-in catalog +Create `apps/server/src/services/workflow/manager.ts`: +- `WorkflowManager` singleton class: + - `list()` — returns all discovered workflows with name, description, and arg schema + - `get(name)` — returns a workflow by name + - `run(workflow, args)` — creates a sandbox, injects args, executes `workflow.run()`. Returns a runId (UUID). + - `cancel(runId)` — terminates the sandbox, marks run as cancelled + - `status(runId)` — returns run status: `pending|running|completed|failed|cancelled`, with progress info +- Concurrency limit: configurable via `WORKFLOW_MAX_CONCURRENT` env var (default 3) +- Token budget: configurable via `WORKFLOW_DEFAULT_BUDGET` env var (default 100_000 tokens) +- Run state tracked in-memory with optional DB persistence + +Built-in workflows in `data/workflows/`: +- `deep-research` — parallel source search → per-source analysis → synthesis report +- `multi-review` — run code health + security + standards reviews in parallel, merge findings +- `plan-verify` — generate implementation plan → verify plan → generate work items +- `bounty-hunt` — parallel vulnerability scans with different focus areas (injection, auth, crypto, business logic) +**Verification**: `list()` returns built-in workflows. `run()` executes a workflow and returns runId. `status()` reflects progress. `cancel()` stops execution cleanly. + +### 13. 
Workflow resumability (hash-based cache) +Create `apps/server/src/services/workflow/cache.ts`: +- Compute SHA-256 hash of each agent spec: `crypto.createHash('sha256').update(JSON.stringify({prompt, options})).digest('hex')` +- Before executing an agent, check in-memory LRU cache for existing result matching the hash +- Hit: return cached result, emit `log('cached', agentId, hash)` — no actual dispatch +- Miss: execute agent, store result in cache keyed by hash +- LRU eviction: `WORKFLOW_CACHE_SIZE` env var (default 100 entries) +- Optional DB persistence: `workflow_cache` table with `hash`, `result`, `created_at` — cross-session reuse +- Re-run detection: identical workflow with same args → all agents skipped +- Partial re-run: changed args → only changed agents re-execute, unchanged ones read from cache +**Verification**: First run of a workflow executes all agents. Second run with identical args skips all agents (logs show 'cached'). Run with modified args for one agent only re-executes that agent. + +### 14. Workflow UI integration with Orchestrator panel +Extend `apps/web/src/components/Orchestrator/`: +- Add workflow selector dropdown listing workflows from `WorkflowManager.list()` +- Add "Run Workflow" button that opens workflow args editor (JSON or form) +- Extend existing run pane to show workflow steps with per-agent progress +- Live log stream from workflow `log()` calls, displayed in a scrollable log view +- Cancel button for running workflows +- Resumability indicator: "3/5 steps cached — skipping" when hash cache hits +- Fetch workflow list via new API endpoint or WS message (add `GET /api/orchestrator/workflows`) +**Verification**: Workflow selector lists built-in workflows. Running a workflow shows step-by-step progress in the run pane. Cancelling a running workflow works. Cached steps show "skipped" indicator. + +## Phase 4: Background Subagents (3 tasks) + +### 15. 
Background task queue + spawn_subagent tool +Modify `apps/coder/` and `apps/server/`: +- Extend `tasks` table usage with a new task type marker for background subagent tasks +- Create `spawn_subagent` tool in `apps/server/src/services/tools/`: + - Schema: `{prompt, model?, tools?, budget?, metadata?}` + - Creates a `tasks` row with state=`pending`, type=`background_subagent` + - Returns `{task_id, status: 'pending'}` immediately — does NOT block +- Background worker loop: polls `tasks` table for `background_subagent` tasks in `pending` state, picks one up, executes it via existing agent dispatch, writes result back to tasks row on completion +- Max concurrency: `BACKGROUND_MAX_CONCURRENT` env var (default 2) +- Worker poll interval: 1s (configurable) +**Verification**: Calling `spawn_subagent` returns immediately with a task_id. The task eventually completes with a result in the tasks table. Multiple background tasks run concurrently up to the concurrency limit. + +### 16. subagent_status + subagent_result tools +Create two tools in `apps/server/src/services/tools/`: +- `subagent_status(task_id)`: + - Schema: `{task_id}` + - Returns: `{task_id, status: 'pending'|'running'|'completed'|'failed', progress?: string, started_at?, finished_at?}` + - Queries `tasks` table for the status +- `subagent_result(task_id)`: + - Schema: `{task_id}` + - Returns: `{task_id, status, result?: json, error?: string}` + - Only returns result when status='completed'; returns empty result otherwise with a message + - Updates task state to `read` on successful result retrieval (optional) +**Verification**: Calling `subagent_status` on a running task returns 'running'. Calling `subagent_result` on a completed task returns the full result. Calling `subagent_result` on a pending task returns a clear "not ready yet" message. + +### 17. 
Background agent pane +Create `apps/web/src/components/BackgroundAgentPane.tsx`: +- New pane type showing running, completed, and failed background subagents +- Each entry: agent name/description, status badge, duration (elapsed or total), progress indicator +- Running entries: progress bar (if available), cancel button +- Completed entries: "View Result" action that opens a modal or inline view with the full output +- Failed entries: error message, "Retry" action +- Badge counter on pane tab showing number of running tasks +- Poll status every 2s for running entries, stop polling on completion +- Register in pane registry alongside existing pane types +**Verification**: Background pane shows spawning tasks as "pending", transitioning to "running", then "completed"/"failed". "View Result" shows the full output. Badge counter reflects active running tasks. + +## Phase 5: Multi-modal + Cache Shape (4 tasks) + +### 18. Multi-modal attachment pipeline +Add file upload support: +- Accept file uploads via drag-drop or file picker in the message input area +- Store uploaded files on tmpfs (`/tmp/boocode-uploads/` by default, configurable via `UPLOAD_DIR`) +- Reference attachments in message row via `message_parts` with `type='image'` and a `url` pointing to the tmpfs path +- Forward to DeepSeek API: encode image as base64 data URI, send as multimodal content part in the user message +- Supported formats: png, jpg, jpeg, gif, webp +- Size limit: 20MB default, configurable via `MAX_ATTACHMENT_SIZE_MB` env var +- Server-side cleanup: delete tmpfs files after message is fully processed or on a periodic sweep +**Verification**: Uploading an image creates a file on tmpfs and a referenced `message_parts` row. DeepSeek API call includes the image as a base64 content part. Error on files over size limit. + +### 19. 
Image render in message bubble +Update message rendering in `apps/web/src/components/MessageBubble.tsx`: +- Detect `message_parts` with `type='image'` in the message content +- Render attached images inline in the chat bubble, below the text content +- Thumbnail: max 300px wide, aspect-ratio preserved, rounded corners +- Lightbox: clicking the thumbnail opens a full-size overlay with close button +- Loading state: skeleton placeholder while image loads from tmpfs URL +- Error state: broken image placeholder with retry option +- Clean layout: images displayed in a grid (1-2 columns depending on count) +**Verification**: Chat messages with image attachments render inline thumbnails. Clicking opens lightbox. Large images are thumbnailed. Broken images show error state. + +### 20. Cache shape telemetry data pipeline +Extract and store cache metrics: +- In the DeepSeek provider response handler, extract `prompt_cache_hit_tokens` and `prompt_cache_miss_tokens` from the API response metadata +- Break down cache segments: system prompt tokens, tool schema tokens, conversation history tokens (approximate by measuring each segment length) +- Store cache metrics in `tool_traces.cache_tokens` column (already created in Phase 1) +- Optionally create a `cache_stats` table for per-segment breakdown: `{turn_id, segment_name, hit_tokens, miss_tokens}` +- Expose via existing traces API (cache fields already part of the Trace schema) +**Verification**: After a DeepSeek call, `tool_traces` row has `cache_tokens` populated. Cache segment breakdown is available when querying traces. + +### 21. 
Cache shape visualization in trace viewer +Update the TraceViewer component with cache metrics: +- Per-turn cache hit bar: horizontal stacked bar showing cached (green) vs non-cached (gray) tokens +- Hit rate percentage displayed as a badge next to token count +- Cumulative cache hit rate in the session footer: "Cache hit rate: 67% (45K/67K tokens)" +- Color coding: green ≥60%, yellow 30-59%, red <30% +- Tooltip on hover showing segment breakdown if available +- Animate transitions when new trace data arrives +**Verification**: Trace viewer shows cache hit/miss bars per turn. Cumulative rate in footer updates as new traces load. Color coding matches thresholds. diff --git a/packages/contracts/src/ws-frames.ts b/packages/contracts/src/ws-frames.ts index 2a4bf7c..ff575ff 100644 --- a/packages/contracts/src/ws-frames.ts +++ b/packages/contracts/src/ws-frames.ts @@ -407,11 +407,115 @@ export const BattleUpdatedFrame = z.object({ cross_exam_id: Uuid.optional(), }); +// ---- agent snapshot restore frame ------------------------------------------ +// Describes the agent state for one chat (agent, model, mode, turn number). Per this commit's description the snapshot is sent to the client on WS reconnect. `agent` and `mode` may be null or omitted; `model` and `turn_number` are required. + +export const AgentSnapshotFrame = z.object({ + type: z.literal('agent_snapshot'), + chat_id: z.string().uuid(), + agent: z.string().nullable().optional(), + model: z.string(), + mode: z.string().nullable().optional(), + turn_number: z.number().int().nonnegative(), +}); + +// ---- tool trace frames ----------------------------------------------------- +// `tool_trace_start` carries the tool name and its input; `tool_trace_finish` carries the output, latency, token counts and an optional error/outcome for the same `trace_id`. +// NOTE(review): these frames validate ids and timestamps with `z.string().uuid()` / `z.string().datetime()`, while the channel payloads below use the shared `Uuid` / `IsoTimestamp` helpers — confirm the two are equivalent and consider a single convention. + +export const ToolTraceStartFrame = z.object({ + type: z.literal('tool_trace_start'), + trace_id: z.string().uuid(), + message_id: z.string().uuid(), + chat_id: z.string().uuid(), + tool_name: z.string().min(1), + tool_input: z.record(z.unknown()), + started_at: z.string().datetime(), +}); + +export const ToolTraceFinishFrame = z.object({ + type: z.literal('tool_trace_finish'), + trace_id: z.string().uuid(), + message_id: z.string().uuid(), + chat_id: z.string().uuid(), + tool_name: z.string().min(1), + tool_output: z.union([z.string(), z.null()]).optional(), + latency_ms: 
z.number().int().nonnegative().optional(), + tokens_used: z.number().int().nonnegative().nullable().optional(), + cache_tokens: z.number().int().nonnegative().nullable().optional(), + reasoning_tokens: z.number().int().nonnegative().nullable().optional(), + error: z.string().optional(), + outcome: z.string().optional(), + finished_at: z.string().datetime(), +}); + +// ---- channel-delta frames (streaming v2) ---------------------------------- +// +// Each channel frame carries a monotonic `seq` counter so the client can +// reorder out-of-order deltas per-channel, detect gaps, and request replay on +// reconnect. The `channel` discriminator tells the reducer which substate to +// update. +// +// The payload schemas below hold the per-channel fields only; the `channel` +// literal is added when they are combined into `ChannelDeltaPayload`. + +const TextChannelPayload = z.object({ + message_id: Uuid, + chat_id: Uuid.optional(), + content: z.string(), +}); + +const ToolCallChannelPayload = z.object({ + message_id: Uuid, + chat_id: Uuid.optional(), + tool_call: ToolCallShape, +}); + +const ToolResultChannelPayload = z.object({ + tool_message_id: Uuid, + chat_id: Uuid.optional(), + tool_call_id: ToolCallId, + output: z.unknown(), + truncated: z.boolean(), + error: z.string().optional(), +}); + +const StatusChannelPayload = z.object({ + message_id: Uuid, + chat_id: Uuid.optional(), + status: z.enum(['running', 'complete', 'cancelled', 'failed']).optional(), + tokens_used: z.number().int().nonnegative().nullable().optional(), + ctx_used: z.number().int().nonnegative().nullable().optional(), + ctx_max: z.number().int().positive().nullable().optional(), + cache_tokens: z.number().int().nonnegative().nullable().optional(), + reasoning_tokens: z.number().int().nonnegative().nullable().optional(), + started_at: IsoTimestamp.nullable().optional(), + finished_at: IsoTimestamp.nullable().optional(), + model: z.string().nullable().optional(), + metadata: OpaqueObject.nullable().optional(), +}); + +const ErrorChannelPayload = z.object({ + message_id: Uuid.optional(), + chat_id: Uuid.optional(), + error: z.string(), + reason: 
ErrorReasonValue.optional(), +}); + +const ChannelDeltaPayload = z.discriminatedUnion('channel', [ /* one variant per channel: text, tool_call, tool_result, status, error */ + z.object({ channel: z.literal('text'), ...TextChannelPayload.shape }), + z.object({ channel: z.literal('tool_call'), ...ToolCallChannelPayload.shape }), + z.object({ channel: z.literal('tool_result'), ...ToolResultChannelPayload.shape }), + z.object({ channel: z.literal('status'), ...StatusChannelPayload.shape }), + z.object({ channel: z.literal('error'), ...ErrorChannelPayload.shape }), +]); + +export const ChannelDeltaFrame = z.object({ + type: z.literal('channel_delta'), + seq: z.number().int().nonnegative(), + ...ChannelDeltaPayload.shape, /* NOTE(review): ChannelDeltaPayload is built with z.discriminatedUnion, and zod's discriminated-union schema exposes `.options` rather than `.shape`; if so this spread contributes no keys and the frame only validates `type` and `seq` (the `channel` field and payload keys would be stripped on parse) — confirm against the zod version in use */ +}); + // ---- discriminated union --------------------------------------------------- export const WsFrameSchema = z.discriminatedUnion('type', [ // per-session SnapshotFrame, + AgentSnapshotFrame, MessageStartedFrame, DeltaFrame, ReasoningDeltaFrame, @@ -434,6 +538,11 @@ export const WsFrameSchema = z.discriminatedUnion('type', [ BattleStartedFrame, ContestantUpdatedFrame, BattleUpdatedFrame, + // tool trace + ToolTraceStartFrame, + ToolTraceFinishFrame, + // channel-delta (streaming v2) + ChannelDeltaFrame, // per-user ChatStatusFrame, SessionUpdatedFrame, @@ -461,6 +570,7 @@ export type WsFrame = z.infer; // by the drift test in src/__tests__/ws-frames.test.ts. export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [ 'snapshot', + 'agent_snapshot', 'message_started', 'delta', 'reasoning_delta', @@ -481,6 +591,9 @@ export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [ 'battle_started', 'contestant_updated', 'battle_updated', + 'tool_trace_start', + 'tool_trace_finish', + 'channel_delta', 'chat_status', 'session_updated', 'session_renamed',