From 381b97f78aec0702ea5c9190b5fb6c3af85b7d40 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 8 Jun 2026 03:48:47 +0000 Subject: [PATCH] feat(server): inference state-graph + supervisor, memory tools, MCP client, schema, routes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add state-graph.ts: typed state machine for inference lifecycle - Add supervisor.ts: agent supervisor pattern for multi-agent coordination - Add export-formatter.ts: structured export formatting - Add manage_memory.ts: memory CRUD tool for agent persistence - Add get_wiki_article.ts: codecontext wiki article retrieval - Extend memory/index.ts: 3-tier memory (context/daily/core) - Extend MCP client: mcp-config.ts env-var substitution - Update schema.sql: agent_sessions, tasks, pending_changes extensions - Update API types: MessageMetadata, ErrorReason, AgentSessionConfig - Update routes: chats, messages, sessions — column renames and agent_session_id - Update inference: error handler, payload builder, stream phase, turn orchestrator --- apps/server/CLAUDE.md | 7 +- apps/server/src/routes/chats.ts | 159 +++++- apps/server/src/routes/messages.ts | 116 +++- apps/server/src/routes/sessions.ts | 8 +- apps/server/src/schema.sql | 12 + apps/server/src/services/export-formatter.ts | 93 +++ .../src/services/inference/error-handler.ts | 6 + apps/server/src/services/inference/index.ts | 2 + apps/server/src/services/inference/payload.ts | 10 +- .../src/services/inference/state-graph.ts | 531 ++++++++++++++++++ .../src/services/inference/stream-phase.ts | 2 + .../src/services/inference/supervisor.ts | 75 +++ apps/server/src/services/inference/turn.ts | 498 +++++++++------- apps/server/src/services/inference/types.ts | 14 +- apps/server/src/services/mcp-client.ts | 13 + apps/server/src/services/memory/index.ts | 5 - .../tools/codecontext/get_wiki_article.ts | 61 ++ .../src/services/tools/manage_memory.ts | 160 ++++++ apps/server/src/types/api.ts | 4 + 
19 files changed, 1546 insertions(+), 230 deletions(-) create mode 100644 apps/server/src/services/export-formatter.ts create mode 100644 apps/server/src/services/inference/state-graph.ts create mode 100644 apps/server/src/services/inference/supervisor.ts create mode 100644 apps/server/src/services/tools/codecontext/get_wiki_article.ts create mode 100644 apps/server/src/services/tools/manage_memory.ts diff --git a/apps/server/CLAUDE.md b/apps/server/CLAUDE.md index 9aa95fe..c0bbcd6 100644 --- a/apps/server/CLAUDE.md +++ b/apps/server/CLAUDE.md @@ -1,7 +1,12 @@ -# apps/server — BooChat backend (deep reference) +# apps/server — BooChat backend (deep reference) — v2.7.x (last meaningful update: 2026-06) > Per-app engineering notes for `apps/server/src/`. Cross-cutting commands, database, environment, workflow, and cross-app contracts (WS-frame / provider-type parity, sentinels) live in the **root `CLAUDE.md`**. This file auto-loads when you read/edit files under `apps/server/`. +## These gotchas are load-bearing — do not remove or refactor without understanding why +- Do NOT remove the abort-signal pinning comment in `stream-phase.ts` — `fullStream` exits cleanly on abort without throwing; the post-iteration `if (signal?.aborted)` check is the only thing that distinguishes cancelled from complete. +- Do NOT remove `includeUsage: true` from `provider.ts` — the adapter defaults it false; without it, token counts are always NULL. +- Do NOT add raw `broker.publish()`/`publishUser()` calls — always use `publishFrame`/`publishUserFrame` which Zod-validate against `WsFrameSchema`. + ## Stack - **Fastify** with `@fastify/websocket` and `@fastify/static` (serves the built frontend). 
diff --git a/apps/server/src/routes/chats.ts b/apps/server/src/routes/chats.ts index 2aa2cc1..be59c3c 100644 --- a/apps/server/src/routes/chats.ts +++ b/apps/server/src/routes/chats.ts @@ -1,18 +1,33 @@ import type { FastifyInstance } from 'fastify'; import { z } from 'zod'; +import crypto from 'node:crypto'; import type { Sql } from '../db.js'; +import type { Config } from '../config.js'; import type { Broker } from '../services/broker.js'; import type { Chat, Message } from '../types/api.js'; import { getModelContext } from '../services/model-context.js'; import { notifyCoderClose } from '../services/coder-notify.js'; import { MESSAGE_COLUMNS } from '../services/message-columns.js'; +import { formatJson, formatMarkdown } from '../services/export-formatter.js'; +export interface CompareHandlers { + enqueueCompare: ( + sessionId: string, + chatId: string, + assistantMessageId: string, + modelOverride: string, + compareGroupId: string, + ) => void; + cancelInference: (sessionId: string, chatId: string) => Promise; + hasActiveInference: (chatId: string) => boolean; +} const CreateBody = z.object({ name: z.string().min(1).max(200).optional(), }); const PatchBody = z.object({ - name: z.string().min(1).max(200), + name: z.string().min(1).max(200).optional(), + model: z.string().min(1).optional(), }); const ForkBody = z.object({ @@ -26,10 +41,17 @@ const DiscardStaleBody = z.object({ const STALE_MIN_AGE_SECONDS = 60; +const CompareBody = z.object({ + message: z.string().min(1).max(64_000), + models: z.array(z.string().min(1)).min(2).max(3), +}); + export function registerChatRoutes( app: FastifyInstance, sql: Sql, - broker: Broker + broker: Broker, + config?: Config, + compareHandlers?: CompareHandlers, ): void { app.get<{ Params: { id: string }; Querystring: { status?: string } }>( '/api/sessions/:id/chats', @@ -122,12 +144,15 @@ export function registerChatRoutes( reply.code(400); return { error: 'invalid body', details: parsed.error.flatten() }; } + const { name, 
model } = parsed.data; + const sets: Array> = [sql`updated_at = clock_timestamp()`]; + if (name !== undefined) sets.push(sql`name = ${name}`); + if (model !== undefined) sets.push(sql`model = ${model}`); const rows = await sql` UPDATE chats - SET name = ${parsed.data.name}, - updated_at = clock_timestamp() + SET ${(sql as any).join(sets, sql`, `)} WHERE id = ${req.params.id} - RETURNING id, session_id, name, status, created_at, updated_at + RETURNING id, session_id, name, model, status, created_at, updated_at `; if (rows.length === 0) { reply.code(404); @@ -448,4 +473,128 @@ export function registerChatRoutes( return rows; } ); + + app.get<{ Params: { id: string }; Querystring: { format?: string } }>( + '/api/chats/:id/export', + async (req, reply) => { + const format = req.query.format ?? 'json'; + if (format !== 'json' && format !== 'markdown') { + reply.code(400); + return { error: 'format must be json or markdown' }; + } + + const chat = await sql`SELECT * FROM chats WHERE id = ${req.params.id}`; + if (chat.length === 0) { + reply.code(404); + return { error: 'chat not found' }; + } + + const messages = await sql` + SELECT ${sql.unsafe(MESSAGE_COLUMNS)} + FROM messages_with_parts + WHERE chat_id = ${req.params.id} + ORDER BY created_at ASC, id ASC + `; + + if (format === 'markdown') { + reply.header('Content-Type', 'text/markdown'); + return formatMarkdown(chat[0]!, messages, chat[0]!.model); + } + + reply.header('Content-Type', 'application/json'); + return formatJson(chat[0]!, messages, chat[0]!.model); + } + ); + + // v2.8-compare: send the same message to N models and stream back parallel + // responses. Creates N assistant messages (one per model) and launches N + // parallel inference runs with model overrides. Each publishes frames + // scoped to the shared compare_group_id so the frontend can group them. 
+ if (config && compareHandlers) { + app.post<{ Params: { id: string } }>( + '/api/chats/:id/compare', + async (req, reply) => { + const parsed = CompareBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + + const { message, models } = parsed.data; + + // Check for active inference first. + if (compareHandlers.hasActiveInference(req.params.id)) { + reply.code(409); + return { error: 'chat is currently streaming; stop it first' }; + } + + const chatRows = await sql` + SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open' + `; + if (chatRows.length === 0) { + reply.code(404); + return { error: 'chat not found' }; + } + const chat = chatRows[0]!; + const sessionId = chat.session_id; + const compareGroupId = crypto.randomUUID(); + + // Insert user message + N assistant messages in a single transaction. + const result = await sql.begin(async (tx) => { + const [userMsg] = await tx<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata) + VALUES (${sessionId}, ${chat.id}, 'user', ${message}, 'complete', clock_timestamp(), NULL) + RETURNING id + `; + + const responses: Array<{ model: string; assistant_message_id: string }> = []; + for (const model of models) { + const [asst] = await tx<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at, metadata) + VALUES ( + ${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp(), + ${tx.json({ compare_group_id: compareGroupId, model } as never)} + ) + RETURNING id + `; + responses.push({ model, assistant_message_id: asst!.id }); + } + + await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; + await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`; + + return { user_message_id: userMsg!.id, responses }; + }); + + // Publish user message frames. 
+ broker.publishFrame(sessionId, { + type: 'message_started', + message_id: result.user_message_id, + chat_id: chat.id, + role: 'user', + }); + broker.publishFrame(sessionId, { + type: 'delta', + message_id: result.user_message_id, + chat_id: chat.id, + content: message, + }); + broker.publishFrame(sessionId, { + type: 'message_complete', + message_id: result.user_message_id, + chat_id: chat.id, + }); + + // Enqueue N parallel inference runs with model overrides. + for (const resp of result.responses) { + compareHandlers.enqueueCompare( + sessionId, chat.id, resp.assistant_message_id, resp.model, compareGroupId, + ); + } + + reply.code(202); + return { compare_group_id: compareGroupId, ...result }; + }, + ); + } } diff --git a/apps/server/src/routes/messages.ts b/apps/server/src/routes/messages.ts index 7090c8d..d93c0e2 100644 --- a/apps/server/src/routes/messages.ts +++ b/apps/server/src/routes/messages.ts @@ -3,12 +3,13 @@ import { z } from 'zod'; import type { Sql } from '../db.js'; import type { Config } from '../config.js'; import type { Broker } from '../services/broker.js'; -import type { Chat, Message, Session, ToolCall } from '../types/api.js'; +import type { Chat, Message, MessageMetadata, Session, ToolCall } from '../types/api.js'; // v1.13.17-cross-repo-reads: grant_read_access resolves the grant root at // decision time (not at request time) so concurrent project changes don't // stale-bind the resolution. import { resolveGrantRoot } from '../services/grant_resolver.js'; import { MESSAGE_COLUMNS } from '../services/message-columns.js'; +import { setServerPermission, getServerName } from '../services/mcp-client.js'; // Shared lookup for the answer_user_input + grant_read_access pause-resume // endpoints. Finds the originating assistant tool_call by id in message_parts, @@ -846,4 +847,117 @@ export function registerMessageRoutes( }; }, ); + + // v1.15.0-mcp-permission: approve/deny MCP tool calls for 'ask' state servers. 
+ const McpApproveBody = z.object({ + tool_call_id: z.string().min(1), + permission: z.enum(['allow_once', 'allow_always', 'deny']), + }); + + app.post<{ Params: { id: string } }>( + '/api/chats/:id/mcp-approve', + async (req, reply) => { + const parsed = McpApproveBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + const { tool_call_id, permission } = parsed.data; + + const chatRows = await sql<{ id: string }[]>` + SELECT id FROM chats WHERE id = ${req.params.id} AND status = 'open' + `; + if (chatRows.length === 0) { + reply.code(404); + return { error: 'chat_not_found' }; + } + + // Look up the tool call to get the prefixed tool name + const callerRows = await sql<{ + payload: { name: string }; + }[]>` + SELECT p.payload + FROM message_parts p + JOIN messages m ON m.id = p.message_id + WHERE m.chat_id = ${req.params.id} + AND m.role = 'assistant' + AND p.kind = 'tool_call' + AND p.payload->>'id' = ${tool_call_id} + ORDER BY m.created_at DESC + LIMIT 1 + `; + const callerRow = callerRows[0]; + if (!callerRow) { + reply.code(404); + return { error: 'tool_call_not_found' }; + } + + const toolName = callerRow.payload.name; + const serverName = getServerName(toolName); + if (!serverName) { + reply.code(400); + return { error: 'not_an_mcp_tool', detail: `tool '${toolName}' is not from an MCP server` }; + } + + if (permission === 'allow_always' || permission === 'allow_once') { + setServerPermission(serverName, 'allow'); + } else if (permission === 'deny') { + setServerPermission(serverName, 'deny'); + } + + return { ok: true }; + }, + ); + + const FeedbackBody = z.object({ + value: z.enum(['up', 'down']), + }); + + app.post<{ Params: { id: string; message_id: string } }>( + '/api/chats/:id/messages/:message_id/feedback', + async (req, reply) => { + const parsed = FeedbackBody.safeParse(req.body); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', 
details: parsed.error.flatten() }; + } + const { id: chatId, message_id: messageId } = req.params; + const { value } = parsed.data; + + const msg = await sql<{ id: string; role: string; metadata: MessageMetadata | null }[]>` + SELECT id, role, metadata FROM messages WHERE id = ${messageId} AND chat_id = ${chatId} + `; + if (msg.length === 0) { + reply.code(404); + return { error: 'message not found' }; + } + + // Only allow feedback on assistant messages. + if (msg[0]!.role !== 'assistant') { + reply.code(400); + return { error: 'only assistant messages can receive feedback' }; + } + + // Check if feedback already exists + const existingMeta = msg[0]!.metadata; + if (existingMeta && existingMeta.kind === 'feedback') { + reply.code(409); + return { error: 'feedback already recorded' }; + } + + const feedbackMeta: MessageMetadata = { + kind: 'feedback', + value, + chat_id: chatId, + }; + + await sql` + UPDATE messages + SET metadata = ${sql.json(feedbackMeta as never)}, updated_at = clock_timestamp() + WHERE id = ${messageId} + `; + + return { ok: true }; + }, + ); } diff --git a/apps/server/src/routes/sessions.ts b/apps/server/src/routes/sessions.ts index 0d28839..39a5b3b 100644 --- a/apps/server/src/routes/sessions.ts +++ b/apps/server/src/routes/sessions.ts @@ -145,7 +145,7 @@ export function registerSessionRoutes( } const status = req.query.status === 'archived' ? 
'archived' : 'open'; const rows = await sql` - SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, agent_id, web_search_enabled, workspace_panes, allowed_read_paths + SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, agent_id, web_search_enabled, workspace_panes, allowed_read_paths, state_graph_enabled FROM sessions WHERE project_id = ${req.params.id} AND status = ${status} ORDER BY updated_at DESC @@ -213,7 +213,7 @@ export function registerSessionRoutes( app.get<{ Params: { id: string } }>('/api/sessions/:id', async (req, reply) => { const rows = await sql` - SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, agent_id, web_search_enabled, workspace_panes, allowed_read_paths + SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, agent_id, web_search_enabled, workspace_panes, allowed_read_paths, state_graph_enabled FROM sessions WHERE id = ${req.params.id} `; if (rows.length === 0) { @@ -349,10 +349,10 @@ export function registerSessionRoutes( const rows = await sql` UPDATE sessions SET workspace_panes = ${sql.json(envelope as never)}, - updated_at = clock_timestamp() + updated_at = clock_timestamp() WHERE id = ${req.params.id} RETURNING id, project_id, name, model, system_prompt, status, created_at, updated_at, - agent_id, web_search_enabled, workspace_panes, allowed_read_paths + agent_id, web_search_enabled, workspace_panes, allowed_read_paths, state_graph_enabled `; if (rows.length === 0) { reply.code(404); diff --git a/apps/server/src/schema.sql b/apps/server/src/schema.sql index b72f229..969dd63 100644 --- a/apps/server/src/schema.sql +++ b/apps/server/src/schema.sql @@ -234,6 +234,7 @@ ALTER TABLE sessions ADD COLUMN IF NOT EXISTS workspace_panes JSONB NOT NULL DEF ALTER TABLE sessions ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'open'; -- v1.2: chats table +-- per-chat-model-switching v2.x: ALTER below adds the model 
override column. CREATE TABLE IF NOT EXISTS chats ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, @@ -244,6 +245,9 @@ CREATE TABLE IF NOT EXISTS chats ( ); CREATE INDEX IF NOT EXISTS idx_chats_session_status ON chats (session_id, status, updated_at DESC); +-- v2.7.x: per-chat model override. NULL = inherit from session.model. +ALTER TABLE chats ADD COLUMN IF NOT EXISTS model TEXT; + -- v1.2: messages.chat_id + messages.kind ALTER TABLE messages ADD COLUMN IF NOT EXISTS chat_id UUID REFERENCES chats(id) ON DELETE CASCADE; ALTER TABLE messages ADD COLUMN IF NOT EXISTS kind TEXT NOT NULL DEFAULT 'message'; @@ -320,6 +324,9 @@ BEGIN END IF; END $$; +-- per-chat-model-switching: per-chat model override. NULL = inherit from session model. +ALTER TABLE chats ADD COLUMN IF NOT EXISTS model TEXT; + -- v1.x-batch9: per-session agent reference. Agent definitions are not stored in -- the DB; they live in builtins (services/agents.ts) and a per-project AGENTS.md. -- agent_id is the slugified agent name. NULL means "use BooCode defaults". @@ -355,6 +362,11 @@ INSERT INTO settings (key, value) VALUES ('theme_mode', '"dark"') ON CONFLICT (k ALTER TABLE projects ADD COLUMN IF NOT EXISTS default_system_prompt TEXT NOT NULL DEFAULT ''; ALTER TABLE projects ADD COLUMN IF NOT EXISTS default_web_search_enabled BOOLEAN NOT NULL DEFAULT false; ALTER TABLE sessions ADD COLUMN IF NOT EXISTS web_search_enabled BOOLEAN; + +-- v[state-graph]: optional declarative state-graph engine flag. Default OFF +-- (existing procedural while loop). When ON, runAssistantTurn routes +-- through runGraph in state-graph.ts for node-based execution. +ALTER TABLE sessions ADD COLUMN IF NOT EXISTS state_graph_enabled BOOLEAN NOT NULL DEFAULT FALSE; ALTER TABLE sessions DROP COLUMN IF EXISTS tags; -- v1.11: anchored rolling compaction. 
diff --git a/apps/server/src/services/export-formatter.ts b/apps/server/src/services/export-formatter.ts new file mode 100644 index 0000000..d25438c --- /dev/null +++ b/apps/server/src/services/export-formatter.ts @@ -0,0 +1,93 @@ +import type { Chat, Message } from '../types/api.js'; + +interface ExportMessage { + role: string; + content: string; + model: string | null; + created_at: string; + tokens_used: number | null; + status: string; + kind: string; + tool_calls: Record[] | null; +} + +interface ExportJson { + chat: { + id: string; + name: string | null; + model: string | null; + created_at: string; + }; + messages: ExportMessage[]; +} + +export function formatJson( + chat: Chat, + messages: Message[], + model: string | null, +): string { + const data: ExportJson = { + chat: { + id: chat.id, + name: chat.name, + model, + created_at: chat.created_at, + }, + messages: messages.map((m) => ({ + role: m.role, + content: m.content, + model: m.model ?? null, + created_at: m.created_at, + tokens_used: m.tokens_used, + status: m.status, + kind: m.kind, + tool_calls: m.tool_calls as Record[] | null, + })), + }; + return JSON.stringify(data, null, 2); +} + +export function formatMarkdown( + chat: Chat, + messages: Message[], + model: string | null, +): string { + const parts: string[] = []; + parts.push(`# ${chat.name ?? 'Untitled Chat'}`); + parts.push(`Model: ${model ?? 'unknown'}`); + parts.push(''); + parts.push('---'); + parts.push(''); + + for (const msg of messages) { + // Skip system/sentinel messages for a cleaner transcript + if (msg.role === 'system') continue; + + const label = + msg.role === 'user' + ? 'User' + : msg.role === 'assistant' + ? 
'Assistant' + : 'Tool'; + parts.push(`## ${label}`); + parts.push(''); + + if (msg.content) { + parts.push(msg.content); + parts.push(''); + } + + if (msg.tool_calls && msg.tool_calls.length > 0) { + for (const tc of msg.tool_calls) { + parts.push(`> \`${tc.name}\``); + parts.push(''); + parts.push('```json'); + parts.push(JSON.stringify(tc.args, null, 2)); + parts.push('```'); + parts.push(''); + } + } + } + + return parts.join('\n'); +} diff --git a/apps/server/src/services/inference/error-handler.ts b/apps/server/src/services/inference/error-handler.ts index b0aed4a..0c5fa8d 100644 --- a/apps/server/src/services/inference/error-handler.ts +++ b/apps/server/src/services/inference/error-handler.ts @@ -74,6 +74,7 @@ export async function handleAbortOrError( type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, + ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}), }); ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled'); } else { @@ -90,6 +91,7 @@ export async function handleAbortOrError( chat_id: chatId, error: errMsg, reason: 'llm_provider_error', + ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}), }); ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed'); } @@ -125,6 +127,7 @@ export async function finalizeStreamedRow( cacheTokens?: number | null; reasoningTokens?: number | null; beforeComplete?: () => Promise; + compareGroupId?: string; }, ): Promise { // v1.11.3: see executeToolPhase for the rationale. @@ -158,6 +161,7 @@ export async function finalizeStreamedRow( started_at: opts.startedAt, finished_at: updated?.finished_at ?? null, model: opts.model, + ...(opts.compareGroupId ? { compare_group_id: opts.compareGroupId } : {}), }); } @@ -182,6 +186,7 @@ export async function finalizeEmpty( type: 'message_complete', message_id: assistantMessageId, chat_id: chatId, + ...(args.compareGroupId ? 
{ compare_group_id: args.compareGroupId } : {}), }); } @@ -281,6 +286,7 @@ export async function finalizeCompletion( started_at: startedAt, finished_at: updated?.finished_at ?? null, model: session.model, + ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}), }); ctx.log.info( { diff --git a/apps/server/src/services/inference/index.ts b/apps/server/src/services/inference/index.ts index 219d024..79cc529 100644 --- a/apps/server/src/services/inference/index.ts +++ b/apps/server/src/services/inference/index.ts @@ -8,6 +8,7 @@ export { createInferenceRunner, MAX_STEPS, runInference, + runInferenceWithModel, } from './turn.js'; // P5: the shared pipeline types moved from turn.ts to types.ts (breaking the // hub-and-leaf near-cycle). Re-exported here so the public surface is unchanged. @@ -21,3 +22,4 @@ export type { export type { ToolPhaseResult } from './tool-phase.js'; export { detectDoomLoop, DOOM_LOOP_THRESHOLD } from './sentinels.js'; export { buildMessagesPayload } from './payload.js'; +export { runGraph, type GraphNodeType, type GraphState, type GraphResult } from './state-graph.js'; diff --git a/apps/server/src/services/inference/payload.ts b/apps/server/src/services/inference/payload.ts index d90e467..2c2f48e 100644 --- a/apps/server/src/services/inference/payload.ts +++ b/apps/server/src/services/inference/payload.ts @@ -194,6 +194,14 @@ export async function buildMessagesPayload( out.push(msg); continue; } + // TODO(vDeepSeek): when m has image attachments, use a content array + // with text + image parts (see multi-modal.ts:imageAttachmentsToParts). + // The AI SDK ModelMessage content shape supports: + // content: [ + // { type: 'text', text: '...' }, + // { type: 'image', image: 'data:image/png;base64,...' } + // ] + // The @ai-sdk/deepseek provider handles the image parts natively. 
out.push({ role: 'user', content: m.content }); } return out; @@ -206,7 +214,7 @@ export async function loadContext( ): Promise<{ session: Session; project: Project; history: Message[] } | null> { const sessionRows = await sql` SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at, - agent_id, web_search_enabled, allowed_read_paths + agent_id, web_search_enabled, allowed_read_paths, state_graph_enabled FROM sessions WHERE id = ${sessionId} `; if (sessionRows.length === 0) return null; diff --git a/apps/server/src/services/inference/state-graph.ts b/apps/server/src/services/inference/state-graph.ts new file mode 100644 index 0000000..6bd5f7d --- /dev/null +++ b/apps/server/src/services/inference/state-graph.ts @@ -0,0 +1,531 @@ +// P5: Optional declarative state graph engine for the inference turn loop. +// +// Replaces the procedural `while (stepNumber < effectiveCap)` in turn.ts +// with a node-based execution model. Default OFF via +// session.state_graph_enabled — zero behavior change when disabled. 
+// +// Nodes wrap EXISTING infrastructure (no new I/O patterns): +// PLAN → top-of-loop gate, compaction, loadContext, buildMessagesPayload, +// executeStreamPhase +// CALL_TOOL → executeToolPhase +// OBSERVE → process tool results, update loop locals +// REFLECT → decidePostToolAction, sentinel insertion, mistake tracker +// SYNTHESIZE → terminal (graph loop exits) + +import type { Agent, Project, Session, ToolCall } from '../../types/api.js'; +import { resolveProjectRoot } from '../path_guard.js'; +import { rewriteSearchQuery } from '../task-search-rewrite.js'; +import * as compaction from '../compaction.js'; +import { decideStep, decidePostToolAction } from './step-decision.js'; +import { + recordStep, + MISTAKE_RECOVERY_NOTE, + type MistakeState, +} from './mistake-tracker.js'; +import { + buildMessagesPayload, + loadContext, +} from './payload.js'; +import { toDcpMessages, transformMessages, fromDcpMessages } from './dcp/index.js'; +import { + finalizeCompletion, + finalizeEmpty, + handleAbortOrError, +} from './error-handler.js'; +import { + executeStreamPhase, +} from './stream-phase.js'; +import { executeToolPhase, type ToolPhaseResult } from './tool-phase.js'; +import type { + InferenceContext, + StreamPhaseState, + StreamResult, + TurnArgs, +} from './types.js'; +import { + runCapHitSummary, + runDoomLoopSummary, + insertMistakeRecoverySentinel, +} from './sentinel-summaries.js'; +import { execFile } from 'node:child_process'; +import { readFileSync, existsSync } from 'node:fs'; +import { join } from 'node:path'; + +const BUILD_TIMEOUT_MS = 60_000; +const BUILD_OUTPUT_CAP = 8_000; + +async function detectAndRunBuild( + ctx: InferenceContext, + projectRoot: string, + sessionId: string, + chatId: string, + model: string, + existingNote: string | undefined, +): Promise { + if (!model.startsWith('deepseek-')) return undefined; + const pkgPath = join(projectRoot, 'package.json'); + if (!existsSync(pkgPath)) return undefined; + let buildCmd: string | null = 
null; + try { + const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record }; + if (pkg.scripts?.build) buildCmd = 'build'; + else if (pkg.scripts?.compile) buildCmd = 'compile'; + else if (pkg.scripts?.typecheck) buildCmd = 'typecheck'; + } catch { + return undefined; + } + if (!buildCmd) return undefined; + const hasPnpm = existsSync(join(projectRoot, 'pnpm-lock.yaml')); + const hasYarn = existsSync(join(projectRoot, 'yarn.lock')); + const pm = hasPnpm ? 'pnpm' : hasYarn ? 'yarn' : 'npm'; + try { + const out = await new Promise((resolve, reject) => { + execFile(pm, ['run', buildCmd!], { cwd: projectRoot, timeout: BUILD_TIMEOUT_MS, maxBuffer: BUILD_OUTPUT_CAP * 2 }, + (err, stdout, stderr) => { + if (err && (err as NodeJS.ErrnoException).code === 'ENOENT') { + resolve(''); + return; + } + const merged = (stdout + '\n' + stderr).trim(); + resolve(merged.slice(0, BUILD_OUTPUT_CAP)); + }, + ); + }); + if (!out) return undefined; + ctx.log.info({ sessionId, chatId, buildCmd, outputLen: out.length }, 'auto-fix: build failed'); + const combined = existingNote + ? 
existingNote + '\n\n--- Build error ---\n' + out.slice(0, BUILD_OUTPUT_CAP - existingNote.length)
+      : '--- Build error ---\n' + out.slice(0, BUILD_OUTPUT_CAP);
+    return combined;
+  } catch {
+    return undefined;
+  }
+}
+
+// -- Types ----------------------------------------------------------------
+
+/** Node kinds of the inference state graph. `SYNTHESIZE` is the terminal node. */
+export type GraphNodeType = 'PLAN' | 'CALL_TOOL' | 'OBSERVE' | 'REFLECT' | 'SYNTHESIZE';
+
+/** Mutable state shared by every node for the duration of one `runGraph` call. */
+export interface GraphState {
+  /** Completed tool steps; incremented by OBSERVE. */
+  stepNumber: number;
+  /** Running tool-call count, fed to `decideStep` together with `budget`. */
+  toolsUsed: number;
+  recentToolCalls: ToolCall[];
+  /** Assistant row the current step writes to; advanced by REFLECT. */
+  assistantMessageId: string;
+  mistakeTracker: MistakeState;
+  /** Transient system note appended to the next PLAN payload, then cleared. */
+  pendingRecoveryNote?: string;
+  /** REFLECT loops back to PLAN only while `stepNumber < effectiveCap`. */
+  effectiveCap: number;
+  budget: number;
+  projectRoot: string;
+  /** Session/project as re-loaded by PLAN for the current iteration. */
+  iterSession?: Session;
+  iterProject?: Project;
+  /** Output of the last stream phase (set by PLAN, read by CALL_TOOL). */
+  streamResult?: StreamResult;
+  startedAt?: string | null;
+  /** Output of the last tool phase (set by CALL_TOOL, read by OBSERVE/REFLECT). */
+  toolPhaseResult?: ToolPhaseResult;
+  /** Set by any node to end the run immediately. */
+  shouldStop: boolean;
+}
+
+/** A graph node: its executor plus ordered outgoing edges (first matching edge wins). */
+interface GraphNode {
+  type: GraphNodeType;
+  edges: Array<{ to: GraphNodeType; condition: (state: GraphState) => boolean }>;
+  execute: (
+    ctx: InferenceContext,
+    args: TurnArgs,
+    state: GraphState,
+    agent: Agent | null,
+  ) => Promise<void>;
+}
+
+/** Loop locals handed back to the caller once the graph stops. */
+export interface GraphResult {
+  stepNumber: number;
+  assistantMessageId: string;
+  toolsUsed: number;
+  recentToolCalls: ToolCall[];
+  mistakeTracker: MistakeState;
+}
+
+// -- Default graph --------------------------------------------------------
+
+/**
+ * Builds the default lifecycle graph:
+ * PLAN → CALL_TOOL → OBSERVE → REFLECT → PLAN …, with every node falling
+ * through to SYNTHESIZE when its first edge condition does not hold.
+ */
+export function createDefaultGraph(): GraphNode[] {
+  return [
+    {
+      type: 'PLAN',
+      edges: [
+        { to: 'CALL_TOOL', condition: (s) => !!s.streamResult && s.streamResult.toolCalls.length > 0 },
+        { to: 'SYNTHESIZE', condition: () => true },
+      ],
+      execute: planNode,
+    },
+    {
+      type: 'CALL_TOOL',
+      edges: [
+        { to: 'OBSERVE', condition: () => true },
+      ],
+      execute: callToolNode,
+    },
+    {
+      type: 'OBSERVE',
+      edges: [
+        { to: 'REFLECT', condition: (s) => s.toolPhaseResult?.action === 'continue' },
+        { to: 'SYNTHESIZE', condition: () => true },
+      ],
+      execute: observeNode,
+    },
+    {
+      type: 'REFLECT',
+      edges: [
+        { to: 'PLAN', condition: (s) => s.stepNumber < s.effectiveCap },
+        { to: 'SYNTHESIZE', condition: () => true },
+      ],
+      execute: reflectNode,
+    },
+    {
+      type: 'SYNTHESIZE',
+      edges: [],
+      execute: async () => {},
+    },
+  ];
+}
+
+// -- Graph runner ---------------------------------------------------------
+
+/**
+ * Runs the default graph from PLAN until SYNTHESIZE is reached, a node sets
+ * `shouldStop`, or no outgoing edge matches.
+ *
+ * @param ctx Inference context (sql, log, hooks, …) passed to every node.
+ * @param args Turn arguments; they seed the initial graph state.
+ * @param extra Per-turn configuration: step cap, tool budget, resolved agent
+ *   and the initial project root.
+ * @returns The final loop locals. `mistakeTracker` is the same object that
+ *   was passed in via `args`.
+ */
+export async function runGraph(
+  ctx: InferenceContext,
+  args: TurnArgs,
+  extra: { effectiveCap: number; budget: number; agent: Agent | null; projectRoot: string },
+): Promise<GraphResult> {
+  const { effectiveCap, budget, agent } = extra;
+
+  const state: GraphState = {
+    stepNumber: 0,
+    toolsUsed: args.toolsUsed,
+    recentToolCalls: args.recentToolCalls,
+    assistantMessageId: args.assistantMessageId,
+    mistakeTracker: args.mistakeTracker,
+    pendingRecoveryNote: args.pendingRecoveryNote,
+    effectiveCap,
+    budget,
+    projectRoot: extra.projectRoot,
+    shouldStop: false,
+  };
+
+  // Index nodes by type once instead of scanning the array on every hop.
+  const nodes = new Map<GraphNodeType, GraphNode>();
+  for (const n of createDefaultGraph()) {
+    nodes.set(n.type, n);
+  }
+
+  // Mirror `while (stepNumber < effectiveCap)`: a zero cap runs no step at all.
+  let currentNode: GraphNodeType = state.stepNumber < effectiveCap ? 'PLAN' : 'SYNTHESIZE';
+
+  while (currentNode !== 'SYNTHESIZE' && !state.shouldStop) {
+    const node = nodes.get(currentNode);
+    if (!node) {
+      ctx.log.error({ sessionId: args.sessionId, node: currentNode }, 'state-graph: unknown node type');
+      break;
+    }
+    await node.execute(ctx, args, state, agent);
+    if (state.shouldStop) break;
+    const nextEdge = node.edges.find((e) => e.condition(state));
+    if (!nextEdge) break;
+    currentNode = nextEdge.to;
+  }
+
+  return {
+    stepNumber: state.stepNumber,
+    assistantMessageId: state.assistantMessageId,
+    toolsUsed: state.toolsUsed,
+    recentToolCalls: state.recentToolCalls,
+    mistakeTracker: state.mistakeTracker,
+  };
+}
+
+// -- PLAN node ------------------------------------------------------------
+// Top-of-loop gate → compaction → loadContext → DCP → buildPayload → stream
+
+/**
+ * PLAN: gates the step (doom-loop / budget), runs pending compaction, reloads
+ * context, prunes history, builds the payload and streams one model response.
+ *
+ * On a tool-less response the turn is finalized here; the PLAN → SYNTHESIZE
+ * edge then ends the graph. Sets `state.shouldStop` on a doom/budget stop,
+ * missing context, or a stream failure.
+ */
+async function planNode(
+  ctx: InferenceContext,
+  args: TurnArgs,
+  state: GraphState,
+  agent: Agent | null,
+): Promise<void> {
+  const { sessionId, chatId, signal } = args;
+
+  // Per-iteration args. None of these state fields is reassigned inside PLAN,
+  // so one snapshot serves the summaries, the stream phase and finalization.
+  const iterArgs: TurnArgs = {
+    sessionId,
+    chatId,
+    assistantMessageId: state.assistantMessageId,
+    toolsUsed: state.toolsUsed,
+    recentToolCalls: state.recentToolCalls,
+    mistakeTracker: state.mistakeTracker,
+    signal,
+    // executeStreamPhase tags its frames from args.compareGroupId; forward it.
+    ...(args.compareGroupId ? { compareGroupId: args.compareGroupId } : {}),
+  };
+  const withModelOverride = (s: Session): Session =>
+    args.modelOverride ? { ...s, model: args.modelOverride } : s;
+
+  // 1. Top-of-loop gate: doom-loop, then budget (pure decisions)
+  const decision = decideStep({
+    recentToolCalls: state.recentToolCalls,
+    toolsUsed: state.toolsUsed,
+    budget: state.budget,
+  });
+
+  if (decision.kind === 'doom' || decision.kind === 'budget') {
+    // Both stops need fresh history for their summary.
+    const fresh = await loadContext(ctx.sql, sessionId, chatId);
+    if (fresh) {
+      const summarySession = withModelOverride(fresh.session);
+      if (decision.kind === 'doom') {
+        await runDoomLoopSummary(ctx, iterArgs, summarySession, fresh.project, fresh.history, agent, decision.loop);
+      } else {
+        await runCapHitSummary(ctx, iterArgs, summarySession, fresh.project, fresh.history, agent, state.budget);
+      }
+    }
+    state.shouldStop = true;
+    return;
+  }
+
+  // decision.kind === 'stream' → proceed.
+
+  // 2. Compaction check
+  const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
+    SELECT needs_compaction FROM chats WHERE id = ${chatId}
+  `;
+  if (chatFlag[0]?.needs_compaction) {
+    try {
+      await compaction.process({
+        sql: ctx.sql, config: ctx.config, log: ctx.log,
+        broker: ctx.broker, chatId, hooks: ctx.hooks,
+      });
+    } catch (err) {
+      ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
+      await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
+    }
+  }
+
+  // 3. Load context (must re-load each iteration — new messages)
+  const loaded = await loadContext(ctx.sql, sessionId, chatId);
+  if (!loaded) {
+    ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop');
+    state.shouldStop = true;
+    return;
+  }
+  const iterSession = withModelOverride(loaded.session);
+  const iterProject = loaded.project;
+  let history = loaded.history;
+  state.iterSession = iterSession;
+  state.iterProject = iterProject;
+  state.projectRoot = await resolveProjectRoot(iterProject.path);
+
+  // 4. DCP transform
+  try {
+    const dcpMsgs = toDcpMessages(history);
+    const { messages: pruned, stats } = transformMessages(chatId, dcpMsgs);
+    if (stats.removedCount > 0) {
+      ctx.log.info({ chatId, ...stats }, 'dcp: transform removed messages');
+      history = fromDcpMessages(pruned) as typeof history;
+    }
+  } catch (err) {
+    ctx.log.warn({ err: err instanceof Error ? err.message : String(err), chatId }, 'dcp: transform skipped');
+  }
+
+  // 5. Log step boundary
+  ctx.log.info(
+    { sessionId, chatId, step: state.stepNumber, assistantMessageId: state.assistantMessageId },
+    'step_start',
+  );
+
+  // 6. Build messages + stream phase
+  const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log);
+  const webToolsEnabled =
+    iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false;
+
+  if (state.stepNumber === 0 && webToolsEnabled && messages.length >= 2) {
+    const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user');
+    if (lastUserMsg?.content) {
+      const hint = await rewriteSearchQuery(lastUserMsg.content);
+      if (hint && messages[0]?.role === 'system' && messages[0].content) {
+        messages[0].content += `\n\nThe user's search intent can be summarized as: "${hint}"`;
+      }
+    }
+  }
+
+  // Consume the transient recovery note exactly once.
+  if (state.pendingRecoveryNote) {
+    messages.push({ role: 'system', content: state.pendingRecoveryNote });
+    state.pendingRecoveryNote = undefined;
+  }
+
+  // 7. Stream phase
+  const streamState: StreamPhaseState = { accumulated: '', startedAt: null };
+  let result: StreamResult;
+  try {
+    result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, streamState, agent, webToolsEnabled);
+  } catch (err) {
+    await handleAbortOrError(ctx, iterArgs, streamState.accumulated, err);
+    state.shouldStop = true;
+    return;
+  }
+  state.streamResult = result;
+  state.startedAt = streamState.startedAt;
+
+  // Non-tool finish: Stop hook + finalize here (edge from PLAN → SYNTHESIZE
+  // will break the graph loop after this node returns). Kept outside the
+  // try above so a finalize failure is not reported as a stream error.
+  if (result.toolCalls.length === 0) {
+    if (ctx.hooks) {
+      ctx.hooks.run('Stop', {
+        event: 'Stop',
+        session_id: sessionId,
+        chat_id: chatId,
+        last_assistant_text: result.content.slice(0, 500),
+        turn: state.stepNumber,
+      }).catch(() => {});
+    }
+    await finalizeCompletion(ctx, iterArgs, result, streamState.startedAt, iterSession);
+  }
+}
+
+// -- CALL_TOOL node -------------------------------------------------------
+// Executes the tool phase and stores the result for OBSERVE.
+
+/**
+ * CALL_TOOL: runs the tool phase for the tool calls produced by the last PLAN
+ * stream and stores the outcome on `state.toolPhaseResult`.
+ *
+ * Sets `state.shouldStop` when the stream result or session is missing, or
+ * when the tool phase throws.
+ */
+async function callToolNode(
+  ctx: InferenceContext,
+  args: TurnArgs,
+  state: GraphState,
+  agent: Agent | null,
+): Promise<void> {
+  const { sessionId, chatId } = args;
+  const { streamResult, iterSession } = state;
+  if (!streamResult) {
+    ctx.log.warn({ sessionId }, 'state-graph: CALL_TOOL without stream result');
+    state.shouldStop = true;
+    return;
+  }
+  if (!iterSession) {
+    ctx.log.warn({ sessionId }, 'state-graph: CALL_TOOL without iterSession');
+    state.shouldStop = true;
+    return;
+  }
+
+  // REFLECT advances `state.assistantMessageId` and OBSERVE grows the tool
+  // counters after every step, so the caller's original `args` are stale from
+  // the second iteration on. Refresh the per-iteration fields before the call.
+  const iterArgs: TurnArgs = {
+    ...args,
+    assistantMessageId: state.assistantMessageId,
+    toolsUsed: state.toolsUsed,
+    recentToolCalls: state.recentToolCalls,
+    mistakeTracker: state.mistakeTracker,
+  };
+
+  try {
+    state.toolPhaseResult = await executeToolPhase(
+      ctx, iterArgs, streamResult, state.startedAt ?? null,
+      iterSession, state.projectRoot, agent, state.stepNumber,
+    );
+  } catch (err) {
+    ctx.log.error({ err, sessionId, chatId, step: state.stepNumber }, 'tool phase threw unexpectedly');
+    state.shouldStop = true;
+  }
+}
+
+// -- OBSERVE node ---------------------------------------------------------
+// Processes tool results: updates loop locals, mistake tracking, build errors.
+
+/**
+ * OBSERVE: folds the tool-phase result into the loop state (tool counters,
+ * step number, mistake tracker) and, after write tools, runs the build check
+ * so its error text can be injected into the next PLAN payload.
+ */
+async function observeNode(
+  ctx: InferenceContext,
+  args: TurnArgs,
+  state: GraphState,
+  _agent: Agent | null,
+): Promise<void> {
+  const { sessionId, chatId } = args;
+  const tpr = state.toolPhaseResult;
+  if (!tpr) {
+    state.shouldStop = true;
+    return;
+  }
+
+  // Update loop locals (mirrors the existing while-loop post-tool logic)
+  state.toolsUsed += tpr.toolCallCount;
+  state.recentToolCalls = [...state.recentToolCalls, ...tpr.toolCalls];
+  state.stepNumber++;
+
+  // Fold tool outcomes into the mistake tracker
+  for (const outcome of tpr.outcomes) {
+    recordStep(state.mistakeTracker, outcome);
+  }
+
+  // Auto-fix: after write tools, attempt build and inject errors.
+  const WRITE_TOOLS = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']);
+  const hasWriteTools = tpr.toolCalls.some((tc) => WRITE_TOOLS.has(tc.name));
+  if (hasWriteTools && state.iterSession) {
+    // Awaited on purpose: a fire-and-forget build races the next PLAN, which
+    // consumes `pendingRecoveryNote` while building its payload, so the error
+    // text could arrive too late or overwrite a note written afterwards.
+    try {
+      const buildError = await detectAndRunBuild(
+        ctx, state.projectRoot, sessionId, chatId,
+        state.iterSession.model, state.pendingRecoveryNote,
+      );
+      if (buildError) state.pendingRecoveryNote = buildError;
+    } catch (err) {
+      // Best-effort: a failing build check must never fail the turn.
+      ctx.log.warn({ err, sessionId, chatId, step: state.stepNumber }, 'auto-fix build check failed');
+    }
+  }
+}
+
+// -- REFLECT node ---------------------------------------------------------
+// Post-tool decision: decidePostToolAction, nudge/escalate/continue handling.
+
+/**
+ * REFLECT: applies the post-tool decision.
+ * - `stop`: end the run.
+ * - `nudge`: queue the recovery note, drop a UI sentinel, reset the failure
+ *   streak and continue.
+ * - `escalate`: finalize the dangling assistant row as empty, drop the
+ *   escalated sentinel, publish `idle` and end the run.
+ * - `continue`: just advance to the next assistant message.
+ */
+async function reflectNode(
+  ctx: InferenceContext,
+  args: TurnArgs,
+  state: GraphState,
+  _agent: Agent | null,
+): Promise<void> {
+  const { sessionId, chatId, signal } = args;
+  const tpr = state.toolPhaseResult;
+  if (!tpr) {
+    state.shouldStop = true;
+    return;
+  }
+
+  const post = decidePostToolAction(tpr.action, state.mistakeTracker);
+
+  if (post === 'stop') {
+    state.shouldStop = true;
+    return;
+  }
+
+  // Every remaining outcome moves on to the assistant row the tool phase
+  // opened for the next step (when it reported one).
+  if (tpr.nextAssistantId) {
+    state.assistantMessageId = tpr.nextAssistantId;
+  }
+
+  if (post === 'nudge') {
+    const failureKinds = [...state.mistakeTracker.run];
+    // Keep a build-error note queued by OBSERVE instead of overwriting it.
+    state.pendingRecoveryNote = state.pendingRecoveryNote
+      ? `${state.pendingRecoveryNote}\n\n${MISTAKE_RECOVERY_NOTE}`
+      : MISTAKE_RECOVERY_NOTE;
+    await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
+      failureKinds,
+      count: failureKinds.length,
+      escalated: false,
+      canContinue: true,
+    });
+    state.mistakeTracker.nudges += 1;
+    state.mistakeTracker.run = [];
+    ctx.log.info(
+      { sessionId, chatId, step: state.stepNumber, nudges: state.mistakeTracker.nudges, failureKinds },
+      'mistake_recovery nudge',
+    );
+    // Continue to next PLAN node — edges check step < cap.
+    return;
+  }
+
+  if (post === 'escalate') {
+    const failureKinds = [...state.mistakeTracker.run];
+    const escalateArgs: TurnArgs = {
+      sessionId, chatId, assistantMessageId: state.assistantMessageId,
+      toolsUsed: state.toolsUsed, recentToolCalls: state.recentToolCalls,
+      mistakeTracker: state.mistakeTracker, signal,
+    };
+    // The next assistant row is still open; close it as an empty row.
+    await finalizeEmpty(ctx, escalateArgs);
+    await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
+      failureKinds,
+      count: failureKinds.length,
+      escalated: true,
+      canContinue: true,
+    });
+    ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
+    ctx.log.info(
+      { sessionId, chatId, step: state.stepNumber, failureKinds },
+      'mistake_recovery escalate — stopping turn',
+    );
+    state.shouldStop = true;
+    return;
+  }
+
+  // 'continue' — the assistant message id was already advanced above.
+}
diff --git a/apps/server/src/services/inference/stream-phase.ts b/apps/server/src/services/inference/stream-phase.ts
index 741a75c..068a8ab 100644
--- a/apps/server/src/services/inference/stream-phase.ts
+++ b/apps/server/src/services/inference/stream-phase.ts
@@ -56,6 +56,7 @@ export async function executeStreamPhase(
     message_id: assistantMessageId,
     chat_id: chatId,
     role: 'assistant',
+    ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}),
   });
 
   const flusher = createContentFlusher(ctx.sql, assistantMessageId, () => state.accumulated);
@@ -119,6 +120,7 @@ export async function executeStreamPhase(
         message_id: assistantMessageId,
         chat_id: chatId,
         content: delta,
+        ...(args.compareGroupId ?
{ compare_group_id: args.compareGroupId } : {}),
         });
         ctx.log.debug({ sessionId, delta }, 'inference delta');
         flusher.scheduleFlush();
diff --git a/apps/server/src/services/inference/supervisor.ts b/apps/server/src/services/inference/supervisor.ts
new file mode 100644
index 0000000..69ca287
--- /dev/null
+++ b/apps/server/src/services/inference/supervisor.ts
@@ -0,0 +1,95 @@
+// Supervisor agent: routes user requests to the best agent via a cheap LLM
+// classification call. Activated when session.agent_id === 'supervisor'.
+
+import type { Agent } from '../../types/api.js';
+import { taskModelCompletion } from '../task-model.js';
+
+/** Routing decision returned by `resolveSupervisorTurn`. */
+export interface SupervisorRoute {
+  agent_id: string;
+  confidence: number;
+  reasoning: string;
+}
+
+const SUPERVISOR_SYSTEM_PROMPT = `You are a router. Given the user's request and the available agents, choose the best agent to handle the request.
+
+Rules:
+- Match the request to the agent whose description and toolset best fits the task.
+- For code review / bug finding requests → code-reviewer
+- For debugging / diagnosing failures → debugger
+- For refactoring / simplifying code → refactorer
+- For architecture / design / planning → architect or planner
+- For security audits → security-auditor
+- For building prompts for other agents → prompt-builder
+- For exploring / understanding unfamiliar code → recon
+- For implementing / writing code changes → builder
+- Respond with ONLY the agent id (e.g. "builder") or "none" if no agent fits.
+- Do not include any other text, punctuation, or explanation.`;
+
+/** Upper bound on completion tokens for the routing call; the reply is a bare agent id. */
+const MAX_ROUTING_TOKENS = 30;
+
+/** Id of the supervisor pseudo-agent; it is never a routing target. */
+const SUPERVISOR_AGENT_ID = 'supervisor';
+
+/**
+ * Given the user's latest message and available agents, classifies which agent
+ * should handle this turn. Returns null to fall through to default (no agent).
+ *
+ * The model reply is normalized before matching: only its first line is used,
+ * surrounding quotes/backticks and trailing punctuation are stripped, and ids
+ * are compared case-insensitively. The supervisor itself is never accepted as
+ * a target. Errors thrown by `taskModelCompletion` propagate to the caller.
+ *
+ * @param latestUserMessage Latest user message; only its first 2000 characters are sent.
+ * @param agents Agents available in the project (a supervisor entry is ignored).
+ * @param fallbackModel Forwarded unchanged to `taskModelCompletion`.
+ * @returns The selected route, or null when no routable agent matches.
+ */
+export async function resolveSupervisorTurn(
+  latestUserMessage: string,
+  agents: Agent[],
+  fallbackModel?: string,
+): Promise<SupervisorRoute | null> {
+  // Exclude the supervisor up front so it can be neither listed nor selected.
+  const candidates = agents.filter((a) => a.id !== SUPERVISOR_AGENT_ID);
+  if (candidates.length === 0) {
+    return null;
+  }
+
+  const agentList = candidates
+    .map((a) => `- ${a.id}: ${a.description} (${a.tools.length} tools)`)
+    .join('\n');
+  const userPrompt = `Available agents:\n${agentList}\n\nUser request: ${latestUserMessage.slice(0, 2000)}`;
+
+  const response = await taskModelCompletion({
+    system: SUPERVISOR_SYSTEM_PROMPT,
+    user: userPrompt,
+    maxTokens: MAX_ROUTING_TOKENS,
+    temperature: 0.1,
+    fallbackModel,
+  });
+
+  // The prompt's own example is quoted ("builder"), so replies often come back
+  // quoted or with a trailing period; strip that decoration before matching.
+  const firstLine = response.trim().split('\n', 1)[0] ?? '';
+  const agentId = firstLine
+    .trim()
+    .replace(/^["'`]+|["'`.,;:!]+$/g, '')
+    .toLowerCase();
+  if (!agentId || agentId === 'none') {
+    return null;
+  }
+
+  // Validate against the routable candidates only.
+  const matched = candidates.find((a) => a.id.toLowerCase() === agentId);
+  if (!matched) {
+    return null;
+  }
+
+  return {
+    agent_id: matched.id,
+    confidence: 1,
+    reasoning: `supervisor routed to "${matched.name}" based on request classification`,
+  };
+}
diff --git a/apps/server/src/services/inference/turn.ts b/apps/server/src/services/inference/turn.ts
index 00b0104..4542c1e 100644
--- a/apps/server/src/services/inference/turn.ts
+++ b/apps/server/src/services/inference/turn.ts
@@ -8,7 +8,7 @@ import type {
 import { resolveProjectRoot } from '../path_guard.js';
 import { maybeAutoNameChat } from '../auto_name.js';
 import { rewriteSearchQuery } from '../task-search-rewrite.js';
-import { getAgentById } from '../agents.js';
+import { getAgentById, getAgentsForProject } from '../agents.js';
 import * as compaction from '../compaction.js';
 import { resolveTurnConfig } from './turn-config.js';
 import { decideStep, decidePostToolAction } from './step-decision.js';
@@ -49,6 +49,8 @@ import {
   runStepCapSummary,
   insertMistakeRecoverySentinel,
 } from './sentinel-summaries.js';
+import { resolveSupervisorTurn } from './supervisor.js';
+import { runGraph } from './state-graph.js';
 
 // vWhale: auto-fix — detect build command from package.json, run it, return
 // error text for injection into next iteration. Best-effort, never throws.
@@ -149,10 +151,39 @@ export async function runAssistantTurn( ctx.log.warn({ sessionId }, 'inference: session or project missing'); return; } - const { session, project } = initialLoaded; - const agent = session.agent_id + let { session, project, history: initialHistory } = initialLoaded; + if (args.modelOverride) { + session = { ...session, model: args.modelOverride }; + } + let agent = session.agent_id ? await getAgentById(project.path, session.agent_id) : null; + + // vSupervisor: if the session is set to supervisor mode, resolve the real + // agent via a cheap classification call. Falls through to default (no agent) + // if routing returns null. + if (agent?.id === 'supervisor') { + const { agents: availableAgents } = await getAgentsForProject(project.path); + const latestUser = [...initialHistory].reverse().find((m) => m.role === 'user'); + const userMessage = latestUser?.content ?? ''; + if (userMessage) { + const route = await resolveSupervisorTurn(userMessage, availableAgents, session.model ?? undefined); + if (route) { + ctx.log.info( + { sessionId, chatId, resolvedAgent: route.agent_id, reasoning: route.reasoning }, + 'supervisor: routed turn', + ); + agent = await getAgentById(project.path, route.agent_id); + } else { + ctx.log.info({ sessionId, chatId }, 'supervisor: no agent matched, falling through to default'); + agent = null; + } + } else { + ctx.log.info({ sessionId, chatId }, 'supervisor: no user message found, falling through to default'); + agent = null; + } + } + // P5: pure per-turn config (budget + cap math + text-only flag). const { effectiveCap, budget, isTextOnly } = resolveTurnConfig(agent); @@ -162,7 +193,8 @@ export async function runAssistantTurn( if (isTextOnly) { const loaded = await loadContext(ctx.sql, sessionId, chatId); if (loaded) { - await runTextOnlyTurn(ctx, args, loaded.session, loaded.project, loaded.history, agent); + const txtSession = args.modelOverride ? 
{ ...loaded.session, model: args.modelOverride } : loaded.session; + await runTextOnlyTurn(ctx, args, txtSession, loaded.project, loaded.history, agent); } return; } @@ -178,228 +210,244 @@ export async function runAssistantTurn( const mistakeTracker = args.mistakeTracker; let pendingRecoveryNote: string | undefined = args.pendingRecoveryNote; - while (stepNumber < effectiveCap) { - // ---- top-of-loop gate: doom-loop, then budget (pure decision) ---- - const decision = decideStep({ recentToolCalls, toolsUsed, budget }); - if (decision.kind === 'doom') { - // Need fresh history for the summary. - const loaded = await loadContext(ctx.sql, sessionId, chatId); - if (loaded) { - const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; - await runDoomLoopSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, decision.loop); + if (session.state_graph_enabled) { + // ---- optional state graph path ---- + const gProjectRoot = await resolveProjectRoot(project.path); + const graphResult = await runGraph(ctx, args, { effectiveCap, budget, agent, projectRoot: gProjectRoot }); + stepNumber = graphResult.stepNumber; + toolsUsed = graphResult.toolsUsed; + recentToolCalls = graphResult.recentToolCalls; + assistantMessageId = graphResult.assistantMessageId; + // mistakeTracker is the same object reference (mutated in place by the graph). + } else { + while (stepNumber < effectiveCap) { + // ---- top-of-loop gate: doom-loop, then budget (pure decision) ---- + const decision = decideStep({ recentToolCalls, toolsUsed, budget }); + if (decision.kind === 'doom') { + // Need fresh history for the summary. + const loaded = await loadContext(ctx.sql, sessionId, chatId); + if (loaded) { + const dlSession = args.modelOverride ? 
{ ...loaded.session, model: args.modelOverride } : loaded.session; + const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; + await runDoomLoopSummary(ctx, iterArgs, dlSession, loaded.project, loaded.history, agent, decision.loop); + } + break; } - break; - } - if (decision.kind === 'budget') { - const loaded = await loadContext(ctx.sql, sessionId, chatId); - if (loaded) { - const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; - await runCapHitSummary(ctx, iterArgs, loaded.session, loaded.project, loaded.history, agent, budget); + if (decision.kind === 'budget') { + const loaded = await loadContext(ctx.sql, sessionId, chatId); + if (loaded) { + const bhSession = args.modelOverride ? { ...loaded.session, model: args.modelOverride } : loaded.session; + const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; + await runCapHitSummary(ctx, iterArgs, bhSession, loaded.project, loaded.history, agent, budget); + } + break; } - break; - } - // decision.kind === 'stream' → proceed with compaction + stream + tools. + // decision.kind === 'stream' → proceed with compaction + stream + tools. - // ---- compaction check ---- - // v1.11: if the prior turn flagged this chat for compaction, run it - // before loadContext so we read post-compaction history. Swallow - // failures and proceed with un-compacted history. 
- const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>` - SELECT needs_compaction FROM chats WHERE id = ${chatId} - `; - if (chatFlag[0]?.needs_compaction) { - try { - await compaction.process({ - sql: ctx.sql, - config: ctx.config, - log: ctx.log, - broker: ctx.broker, - chatId, - hooks: ctx.hooks, - }); - } catch (err) { - ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding'); - await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`; - } - } - - // ---- load context (must re-load each iteration — new messages since last step) ---- - const loaded = await loadContext(ctx.sql, sessionId, chatId); - if (!loaded) { - ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop'); - break; - } - let { session: iterSession, project: iterProject, history } = loaded; - const projectRoot = await resolveProjectRoot(iterProject.path); - - try { - const dcpMsgs = toDcpMessages(history); - const { messages: pruned, stats } = transformMessages(chatId, dcpMsgs); - if (stats.removedCount > 0) { - ctx.log.info({ chatId, ...stats }, 'dcp: transform removed messages'); - history = fromDcpMessages(pruned) as typeof history; - } - } catch (err) { - ctx.log.warn({ err: err instanceof Error ? err.message : String(err), chatId }, 'dcp: transform skipped'); - } - - // v1.14.0: log step boundary for instrumentation. step_start parts are in - // the schema CHECK but not emitted here — writing to the assistant message - // before the stream phase creates a sequence-0 collision with - // partsFromAssistantMessage. A WS frame or structured log is sufficient - // since the frontend doesn't render step boundaries in v1.14. - ctx.log.info({ sessionId, chatId, step: stepNumber, assistantMessageId }, 'step_start'); - - // ---- build messages + stream phase ---- - const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log); - const webToolsEnabled = - iterSession.web_search_enabled ?? 
iterProject.default_web_search_enabled ?? false; - - if (stepNumber === 0 && webToolsEnabled && messages.length >= 2) { - const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user'); - if (lastUserMsg?.content) { - const hint = await rewriteSearchQuery(lastUserMsg.content); - if (hint && messages[0]?.role === 'system' && messages[0].content) { - messages[0].content += `\n\nThe user's search intent can be summarized as: "${hint}"`; + // ---- compaction check ---- + // v1.11: if the prior turn flagged this chat for compaction, run it + // before loadContext so we read post-compaction history. Swallow + // failures and proceed with un-compacted history. + const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>` + SELECT needs_compaction FROM chats WHERE id = ${chatId} + `; + if (chatFlag[0]?.needs_compaction) { + try { + await compaction.process({ + sql: ctx.sql, + config: ctx.config, + log: ctx.log, + broker: ctx.broker, + chatId, + hooks: ctx.hooks, + }); + } catch (err) { + ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding'); + await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`; } } - } - // v#12 MistakeTracker: if the prior iteration's nudge fired, append the - // transient recovery note to THIS payload (consumed exactly once, then - // cleared). Never persisted — same lifecycle as the cap-hit/doom-loop - // summary notes, which live only inside the in-memory messages array. 
- if (pendingRecoveryNote) { - messages.push({ role: 'system', content: pendingRecoveryNote }); - pendingRecoveryNote = undefined; - } - - const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; - const state: StreamPhaseState = { accumulated: '', startedAt: null }; - let result: StreamResult; - try { - result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, state, agent, webToolsEnabled); - } catch (err) { - await handleAbortOrError(ctx, iterArgs, state.accumulated, err); - break; - } - - // ---- non-tool finish → finalize and exit ---- - if (result.toolCalls.length === 0) { - // vWhale: Stop hook (best-effort, non-blocking). - if (ctx.hooks) { - ctx.hooks.run('Stop', { - event: 'Stop', - session_id: sessionId, - chat_id: chatId, - last_assistant_text: result.content.slice(0, 500), - turn: stepNumber, - }).catch(() => {}); + // ---- load context (must re-load each iteration — new messages since last step) ---- + const loaded = await loadContext(ctx.sql, sessionId, chatId); + if (!loaded) { + ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop'); + break; } - await finalizeCompletion(ctx, iterArgs, result, state.startedAt, iterSession); - break; - } + let { session: iterSession, project: iterProject, history } = loaded; + if (args.modelOverride) { + iterSession = { ...iterSession, model: args.modelOverride }; + } + const projectRoot = await resolveProjectRoot(iterProject.path); - // ---- steps: 0 edge case ---- - // effectiveCap check above guarantees we're inside the loop, but this - // guard handles the theoretical case where the model emits tool calls - // on step 0 when effectiveCap would have been 0 (impossible since the - // while condition prevents entry, but kept for safety). If effectiveCap - // is 1 and we're on step 0, tool calls ARE executed — steps counts - // iterations, not post-first-stream. 
+ try { + const dcpMsgs = toDcpMessages(history); + const { messages: pruned, stats } = transformMessages(chatId, dcpMsgs); + if (stats.removedCount > 0) { + ctx.log.info({ chatId, ...stats }, 'dcp: transform removed messages'); + history = fromDcpMessages(pruned) as typeof history; + } + } catch (err) { + ctx.log.warn({ err: err instanceof Error ? err.message : String(err), chatId }, 'dcp: transform skipped'); + } - // ---- tool phase ---- - let toolPhaseResult: ToolPhaseResult; - try { - toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent, stepNumber); - } catch (err) { - // Tool phase errors are unexpected (individual tool failures are - // caught inside executeToolPhase). Log and break. - ctx.log.error({ err, sessionId, chatId, step: stepNumber }, 'tool phase threw unexpectedly'); - break; - } + // v1.14.0: log step boundary for instrumentation. step_start parts are in + // the schema CHECK but not emitted here — writing to the assistant message + // before the stream phase creates a sequence-0 collision with + // partsFromAssistantMessage. A WS frame or structured log is sufficient + // since the frontend doesn't render step boundaries in v1.14. + ctx.log.info({ sessionId, chatId, step: stepNumber, assistantMessageId }, 'step_start'); - // ---- update loop locals ---- - toolsUsed += toolPhaseResult.toolCallCount; - recentToolCalls = [...recentToolCalls, ...toolPhaseResult.toolCalls]; - stepNumber++; + // ---- build messages + stream phase ---- + const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log); + const webToolsEnabled = + iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false; - // v#12 MistakeTracker: fold this iteration's tool outcomes into the - // tracker, in order. recordStep mutates `mistakeTracker` in place (it is - // the same object referenced by args). A 'success' clears the streak. 
- for (const o of toolPhaseResult.outcomes) { - recordStep(mistakeTracker, o); - } + if (stepNumber === 0 && webToolsEnabled && messages.length >= 2) { + const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user'); + if (lastUserMsg?.content) { + const hint = await rewriteSearchQuery(lastUserMsg.content); + if (hint && messages[0]?.role === 'system' && messages[0].content) { + messages[0].content += `\n\nThe user's search intent can be summarized as: "${hint}"`; + } + } + } - // vWhale: auto-fix — after write tools, attempt build and inject errors. - const WRITE_TOOLS = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']); - const hasWriteTools = toolPhaseResult.toolCalls.some((tc) => WRITE_TOOLS.has(tc.name)); - if (hasWriteTools) { - detectAndRunBuild(ctx, projectRoot, sessionId, chatId, iterSession.model, pendingRecoveryNote) - .then((buildError) => { - if (buildError) pendingRecoveryNote = buildError; - }) - .catch(() => {}); - } + // v#12 MistakeTracker: if the prior iteration's nudge fired, append the + // transient recovery note to THIS payload (consumed exactly once, then + // cleared). Never persisted — same lifecycle as the cap-hit/doom-loop + // summary notes, which live only inside the in-memory messages array. + if (pendingRecoveryNote) { + messages.push({ role: 'system', content: pendingRecoveryNote }); + pendingRecoveryNote = undefined; + } - // v#12 MistakeTracker: post-tool decision (pure). 'stop' = the tool phase - // returned a non-'continue' action ('paused' for user input, or - // 'synthesis_done') — neither a nudge nor an escalate would change the - // control flow, so the mistake check is skipped. On 'continue' the - // heterogeneous-failure pattern gates nudge/escalate/continue. Complements - // the doom-loop gate above, which only catches *identical* repeats. 
- const post = decidePostToolAction(toolPhaseResult.action, mistakeTracker); - if (post === 'stop') { - break; - } - if (post === 'nudge') { - // Soft intervention: inject model-facing recovery guidance into the NEXT - // step's payload, drop a UI sentinel, bump nudges, reset the streak, and - // continue. The note is consumed (and cleared) at the top of the next - // iteration's payload build. - pendingRecoveryNote = MISTAKE_RECOVERY_NOTE; - const failureKinds = [...mistakeTracker.run]; - await insertMistakeRecoverySentinel(ctx, sessionId, chatId, { - failureKinds, - count: failureKinds.length, - escalated: false, - canContinue: true, - }); - mistakeTracker.nudges += 1; - mistakeTracker.run = []; - ctx.log.info( - { sessionId, chatId, step: stepNumber, nudges: mistakeTracker.nudges, failureKinds }, - 'mistake_recovery nudge', - ); + const iterArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; + const state: StreamPhaseState = { accumulated: '', startedAt: null }; + let result: StreamResult; + try { + result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, state, agent, webToolsEnabled); + } catch (err) { + await handleAbortOrError(ctx, iterArgs, state.accumulated, err); + break; + } + + // ---- non-tool finish → finalize and exit ---- + if (result.toolCalls.length === 0) { + // vWhale: Stop hook (best-effort, non-blocking). 
+ if (ctx.hooks) { + ctx.hooks.run('Stop', { + event: 'Stop', + session_id: sessionId, + chat_id: chatId, + last_assistant_text: result.content.slice(0, 500), + turn: stepNumber, + }).catch(() => {}); + } + await finalizeCompletion(ctx, iterArgs, result, state.startedAt, iterSession); + break; + } + + // ---- steps: 0 edge case ---- + // effectiveCap check above guarantees we're inside the loop, but this + // guard handles the theoretical case where the model emits tool calls + // on step 0 when effectiveCap would have been 0 (impossible since the + // while condition prevents entry, but kept for safety). If effectiveCap + // is 1 and we're on step 0, tool calls ARE executed — steps counts + // iterations, not post-first-stream. + + // ---- tool phase ---- + let toolPhaseResult: ToolPhaseResult; + try { + toolPhaseResult = await executeToolPhase(ctx, iterArgs, result, state.startedAt, iterSession, projectRoot, agent, stepNumber); + } catch (err) { + // Tool phase errors are unexpected (individual tool failures are + // caught inside executeToolPhase). Log and break. + ctx.log.error({ err, sessionId, chatId, step: stepNumber }, 'tool phase threw unexpectedly'); + break; + } + + // ---- update loop locals ---- + toolsUsed += toolPhaseResult.toolCallCount; + recentToolCalls = [...recentToolCalls, ...toolPhaseResult.toolCalls]; + stepNumber++; + + // v#12 MistakeTracker: fold this iteration's tool outcomes into the + // tracker, in order. recordStep mutates `mistakeTracker` in place (it is + // the same object referenced by args). A 'success' clears the streak. + for (const o of toolPhaseResult.outcomes) { + recordStep(mistakeTracker, o); + } + + // vWhale: auto-fix — after write tools, attempt build and inject errors. 
+ const WRITE_TOOLS = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']); + const hasWriteTools = toolPhaseResult.toolCalls.some((tc) => WRITE_TOOLS.has(tc.name)); + if (hasWriteTools) { + detectAndRunBuild(ctx, projectRoot, sessionId, chatId, iterSession.model, pendingRecoveryNote) + .then((buildError) => { + if (buildError) pendingRecoveryNote = buildError; + }) + .catch(() => {}); + } + + // v#12 MistakeTracker: post-tool decision (pure). 'stop' = the tool phase + // returned a non-'continue' action ('paused' for user input, or + // 'synthesis_done') — neither a nudge nor an escalate would change the + // control flow, so the mistake check is skipped. On 'continue' the + // heterogeneous-failure pattern gates nudge/escalate/continue. Complements + // the doom-loop gate above, which only catches *identical* repeats. + const post = decidePostToolAction(toolPhaseResult.action, mistakeTracker); + if (post === 'stop') { + break; + } + if (post === 'nudge') { + // Soft intervention: inject model-facing recovery guidance into the NEXT + // step's payload, drop a UI sentinel, bump nudges, reset the streak, and + // continue. The note is consumed (and cleared) at the top of the next + // iteration's payload build. + pendingRecoveryNote = MISTAKE_RECOVERY_NOTE; + const failureKinds = [...mistakeTracker.run]; + await insertMistakeRecoverySentinel(ctx, sessionId, chatId, { + failureKinds, + count: failureKinds.length, + escalated: false, + canContinue: true, + }); + mistakeTracker.nudges += 1; + mistakeTracker.run = []; + ctx.log.info( + { sessionId, chatId, step: stepNumber, nudges: mistakeTracker.nudges, failureKinds }, + 'mistake_recovery nudge', + ); + assistantMessageId = toolPhaseResult.nextAssistantId!; + continue; + } + if (post === 'escalate') { + // The nudge didn't break the failure run — stop the turn (cap-hit-style) + // to avoid burning the whole step budget on heterogeneous failures. 
The + // next assistant row is still 'streaming'; finalize it as an empty + // complete row so the slot doesn't dangle, then drop the escalate + // sentinel. + const failureKinds = [...mistakeTracker.run]; + assistantMessageId = toolPhaseResult.nextAssistantId!; + const escalateArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; + await finalizeEmpty(ctx, escalateArgs); + await insertMistakeRecoverySentinel(ctx, sessionId, chatId, { + failureKinds, + count: failureKinds.length, + escalated: true, + canContinue: true, + }); + ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); + ctx.log.info( + { sessionId, chatId, step: stepNumber, failureKinds }, + 'mistake_recovery escalate — stopping turn', + ); + break; + } + + // 'continue' — advance to next assistant message. assistantMessageId = toolPhaseResult.nextAssistantId!; - continue; } - if (post === 'escalate') { - // The nudge didn't break the failure run — stop the turn (cap-hit-style) - // to avoid burning the whole step budget on heterogeneous failures. The - // next assistant row is still 'streaming'; finalize it as an empty - // complete row so the slot doesn't dangle, then drop the escalate - // sentinel. 
- const failureKinds = [...mistakeTracker.run]; - assistantMessageId = toolPhaseResult.nextAssistantId!; - const escalateArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; - await finalizeEmpty(ctx, escalateArgs); - await insertMistakeRecoverySentinel(ctx, sessionId, chatId, { - failureKinds, - count: failureKinds.length, - escalated: true, - canContinue: true, - }); - ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() }); - ctx.log.info( - { sessionId, chatId, step: stepNumber, failureKinds }, - 'mistake_recovery escalate — stopping turn', - ); - break; - } - - // 'continue' — advance to next assistant message. - assistantMessageId = toolPhaseResult.nextAssistantId!; } // vWhale: Stop hook at post-loop exit (best-effort, non-blocking). @@ -438,8 +486,9 @@ export async function runAssistantTurn( if (stepNumber >= effectiveCap && effectiveCap < Infinity) { const loaded = await loadContext(ctx.sql, sessionId, chatId); if (loaded) { + const scSession = args.modelOverride ? { ...loaded.session, model: args.modelOverride } : loaded.session; const capArgs: TurnArgs = { sessionId, chatId, assistantMessageId, toolsUsed, recentToolCalls, mistakeTracker, signal }; - await runStepCapSummary(ctx, capArgs, loaded.session, loaded.project, loaded.history, agent, stepNumber, effectiveCap); + await runStepCapSummary(ctx, capArgs, scSession, loaded.project, loaded.history, agent, stepNumber, effectiveCap); } } } @@ -510,6 +559,31 @@ export async function runInference( }); } +// v2.8-compare: run inference with a model override and compare group id. +// Used by the compare endpoint to run the same message through N models in +// parallel. Each call publishes frames scoped to its compare_group_id. 
+export async function runInferenceWithModel( + ctx: InferenceContext, + sessionId: string, + chatId: string, + assistantMessageId: string, + modelOverride: string, + compareGroupId: string, + signal?: AbortSignal, +): Promise { + return runAssistantTurn(ctx, { + sessionId, + chatId, + assistantMessageId, + toolsUsed: 0, + recentToolCalls: [], + mistakeTracker: freshMistakeState(), + modelOverride, + compareGroupId, + signal, + }); +} + // v1.8.2: cap-hit summary flow. Called instead of erroring when the loop // hits its budget. Reuses the in-flight assistant message slot to stream a // short wrap-up reply with the synthetic note prepended and tools disabled, diff --git a/apps/server/src/services/inference/types.ts b/apps/server/src/services/inference/types.ts index 45567cf..23b63b6 100644 --- a/apps/server/src/services/inference/types.ts +++ b/apps/server/src/services/inference/types.ts @@ -52,7 +52,9 @@ export interface InferenceFrame { // arena frames | 'battle_started' | 'contestant_updated' - | 'battle_updated'; + | 'battle_updated' + // inter-agent message + | 'agent_message'; message_id?: string; message_ids?: string[]; chat_id?: string; @@ -103,6 +105,11 @@ export interface InferenceFrame { status?: string; run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string; + // v2.8-compare: groups messages belonging to the same compare operation. + compare_group_id?: string; + // inter-agent message + sender_step_id?: string; + channel?: string; // arena frames battle_id?: string; battle_type?: 'coding' | 'qa'; @@ -177,5 +184,10 @@ export interface TurnArgs { // Never persisted — mirrors how the cap-hit/doom-loop notes live only inside // the summary call's messages array. pendingRecoveryNote?: string; + // v2.8-compare: when set, overrides the session model for this single turn. + // Used by the compare endpoint to run the same message through N models. 
+ modelOverride?: string; + // v2.8-compare: opaque group id that rides on every published frame. + compareGroupId?: string; signal: AbortSignal | undefined; } diff --git a/apps/server/src/services/mcp-client.ts b/apps/server/src/services/mcp-client.ts index f588236..62c8c81 100644 --- a/apps/server/src/services/mcp-client.ts +++ b/apps/server/src/services/mcp-client.ts @@ -148,6 +148,19 @@ export function getServerPermission(prefixedToolName: string): McpPermission { return state?.permission ?? 'allow'; } +/** Override the permission for a server. Used by the approval flow. */ +export function setServerPermission(serverName: string, permission: McpPermission): void { + const state = servers.get(serverName); + if (state) { + state.permission = permission; + } +} + +/** Get the server name from a prefixed tool name. Returns null if not an MCP tool. */ +export function getServerName(prefixedToolName: string): string | null { + return toolToServer.get(prefixedToolName) ?? null; +} + /** Return all wrapped ToolDefs from all connected servers, flattened. 
*/ export function getTools(): ToolDef>[] { const all: ToolDef>[] = []; diff --git a/apps/server/src/services/memory/index.ts b/apps/server/src/services/memory/index.ts index b0c85cb..9b1fae1 100644 --- a/apps/server/src/services/memory/index.ts +++ b/apps/server/src/services/memory/index.ts @@ -3,9 +3,4 @@ export { formatMemoryBlock } from './prompt.js'; export { scanMemoryScopes } from './scan.js'; export { parseMemoryEntries } from './entries.js'; export { ensureMemoryScaffold, getMemoryRoot } from './paths.js'; -export { ContextTier } from './context-tier.js'; -export { DeepDream } from './deep-dream.js'; -export { CoreTier } from './core-tier.js'; export type { MemoryEntry } from './entries.js'; -export type { ContextTierConfig, ConversationTurn } from './context-tier.js'; -export type { CoreTierEntry, CoreTierSearchResult, CoreTierSearchOptions } from './core-tier.js'; diff --git a/apps/server/src/services/tools/codecontext/get_wiki_article.ts b/apps/server/src/services/tools/codecontext/get_wiki_article.ts new file mode 100644 index 0000000..b3d85ea --- /dev/null +++ b/apps/server/src/services/tools/codecontext/get_wiki_article.ts @@ -0,0 +1,61 @@ +import { z } from 'zod'; +import type { ToolDef } from '../types.js'; +import { callBoocontext } from '../../boocontext_client.js'; + +export const GetWikiArticleInput = z.object({ + article: z.string().min(1).describe('Article name (e.g. "auth", "database", "routes")'), + directory: z.string().optional().describe('Project directory'), +}); +export type GetWikiArticleInputT = z.infer; + +const DESCRIPTION = + 'Returns a persistent codebase wiki article by name (auth, database, routes, etc.). ' + + 'Generated on first request and cached to disk. Avoids running expensive full-scan tools for targeted documentation.'; + +/** + * Standalone execute function — calls the boocontext MCP server's + * codesight_get_wiki_article tool and returns the article text. 
+ * + * Structured for direct test access: accepts input + projectPath, + * no side effects beyond the MCP call. + */ +export async function executeGetWikiArticle( + input: GetWikiArticleInputT, + projectPath: string, +): Promise { + const args: Record = { article: input.article }; + if (input.directory) args['directory'] = input.directory!; + const resp = await callBoocontext({ toolName: 'codesight_get_wiki_article', args }); + return resp.result; +} + +export const getWikiArticle: ToolDef = { + name: 'get_wiki_article', + description: DESCRIPTION, + inputSchema: GetWikiArticleInput, + jsonSchema: { + type: 'function', + function: { + name: 'get_wiki_article', + description: DESCRIPTION, + parameters: { + type: 'object', + properties: { + article: { + type: 'string', + description: 'Article name (e.g. "auth", "database", "routes")', + }, + directory: { + type: 'string', + description: 'Project directory', + }, + }, + required: ['article'], + additionalProperties: false, + }, + }, + }, + async execute(input, projectRoot) { + return executeGetWikiArticle(input, projectRoot); + }, +}; diff --git a/apps/server/src/services/tools/manage_memory.ts b/apps/server/src/services/tools/manage_memory.ts new file mode 100644 index 0000000..bf08c01 --- /dev/null +++ b/apps/server/src/services/tools/manage_memory.ts @@ -0,0 +1,160 @@ +import { z } from 'zod'; +import { existsSync } from 'node:fs'; +import { writeFile, unlink } from 'node:fs/promises'; +import { join } from 'node:path'; +import type { ToolDef } from '../tools/types.js'; +import { ensureMemoryScaffold, getMemoryRoot } from '../memory/paths.js'; +import { writeEntry, readTopicFiles } from '../memory/store.js'; + +const ManageMemoryInput = z.object({ + topic: z.enum(['project', 'user', 'reference']).describe('Memory topic category'), + title: z.string().min(1).max(200).describe('Entry title (used as identifier for update/delete)'), + content: z.string().optional().describe('Memory content body (required for 
create/update)'), + tags: z.array(z.string()).optional().describe('Optional tags for search'), + action: z.enum(['create', 'update', 'delete']).describe('Action to perform'), +}); + +type InputT = z.infer; + +function titleToFilename(title: string): string { + return ( + title + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/(^-|-$)/g, '') + '.md' + ); +} + +/** + * Try to update the CoreTier SQLite database in addition to the file store. + * This is best-effort — CoreTier is optional (file store is primary). + */ +async function syncCoreTier( + _root: string, + _topic: string, + _title: string, + _content: string, + _tags: string[], +): Promise { + // CoreTier SQLite backend is not available in this build — file store only. +} + +export const manageMemoryTool: ToolDef = { + name: 'manage_memory', + description: + 'Create, update, or delete memory entries in .boocode/memory/ for cross-session recall. ' + + 'Use to persist project conventions, user preferences, and architectural decisions. 
' + + 'Actions: create (write new entry), update (modify existing entry), delete (remove entry).', + inputSchema: ManageMemoryInput, + jsonSchema: { + type: 'function', + function: { + name: 'manage_memory', + description: 'Manage memory entries — create, update, or delete', + parameters: { + type: 'object', + properties: { + topic: { + type: 'string', + enum: ['project', 'user', 'reference'], + description: 'Memory topic category', + }, + title: { type: 'string', description: 'Entry title (identifier for update/delete)' }, + content: { + type: 'string', + description: 'Memory content body (required for create/update)', + }, + tags: { + type: 'array', + items: { type: 'string' }, + description: 'Optional tags for search', + }, + action: { + type: 'string', + enum: ['create', 'update', 'delete'], + description: 'Action to perform', + }, + }, + required: ['topic', 'title', 'action'], + }, + }, + }, + async execute(input: InputT, projectRoot: string): Promise { + const root = getMemoryRoot(projectRoot); + await ensureMemoryScaffold(root); + const filename = titleToFilename(input.title); + + if (input.action === 'create') { + if (!input.content) { + return { error: 'Content is required for create action.' }; + } + await writeEntry(root, input.topic, input.title, input.content, input.tags ?? []); + await syncCoreTier(root, input.topic, input.title, input.content, input.tags ?? []); + return { + result: `Memory entry "${input.title}" created in .boocode/memory/${input.topic}/`, + }; + } + + if (input.action === 'update') { + if (!input.content) { + return { error: 'Content is required for update action.' 
}; + } + + // Resolve target file path — try computed filename first, then heading match + let targetPath = join(root, input.topic, filename); + if (!existsSync(targetPath)) { + const files = await readTopicFiles(root, input.topic); + const matched = [...files.keys()].find((name) => { + const content = files.get(name); + return content?.trimStart().split('\n', 1)[0]?.trimEnd() === `## ${input.topic}: ${input.title}`; // exact heading match: a prefix test would also hit longer titles sharing this prefix + }); + if (matched) { + targetPath = join(root, input.topic, matched); + } else { + return { + error: `Memory entry "${input.title}" not found in .boocode/memory/${input.topic}/`, + }; + } + } + + const tagLine = + (input.tags ?? []).length > 0 + ? `> tags: ${(input.tags ?? []).join(', ')}\n\n` + : '\n'; + const entry = `## ${input.topic}: ${input.title}\n${tagLine}${input.content}\n`; + await writeFile(targetPath, entry, 'utf8'); + + await syncCoreTier(root, input.topic, input.title, input.content, input.tags ?? []); + return { + result: `Memory entry "${input.title}" updated in .boocode/memory/${input.topic}/`, + }; + } + + if (input.action === 'delete') { + // Resolve target file path + let targetPath = join(root, input.topic, filename); + if (!existsSync(targetPath)) { + const files = await readTopicFiles(root, input.topic); + const matched = [...files.keys()].find((name) => { + const content = files.get(name); + return content?.trimStart().split('\n', 1)[0]?.trimEnd() === `## ${input.topic}: ${input.title}`; // exact heading match: never unlink a different entry whose title merely starts with this one + }); + if (matched) { + targetPath = join(root, input.topic, matched); + } else { + return { + error: `Memory entry "${input.title}" not found in .boocode/memory/${input.topic}/`, + }; + } + } + + await unlink(targetPath); + + return { + result: `Memory entry "${input.title}" deleted from .boocode/memory/${input.topic}/`, + }; + } + + return { error: `Unknown action: ${input.action}` }; + }, +}; diff --git a/apps/server/src/types/api.ts b/apps/server/src/types/api.ts index a1264f7..3a0cc80 100644 --- a/apps/server/src/types/api.ts +++ b/apps/server/src/types/api.ts @@ -52,6 +52,9 @@ 
export interface Session { // path_guard's extraRoots check consults this list before refusing reads // outside the primary project root. allowed_read_paths: string[]; + // v[state-graph]: optional declarative state-graph engine. Default false + // when the column is absent on existing DBs (ALTER TABLE .. DEFAULT FALSE). + state_graph_enabled: boolean; } // v1.14.x-html-artifact-panes: 'markdown_artifact' + 'html_artifact' added. @@ -153,6 +156,7 @@ export interface Chat { id: string; session_id: string; name: string | null; + model: string | null; status: ChatStatus; created_at: string; updated_at: string;