/** * v1.15.0-mcp-multi: multi-server MCP client registry. * * Connects to multiple MCP servers (Streamable HTTP or stdio transport), * discovers tools from each, wraps them as BooCode ToolDefs with a * `_` name prefix, and routes callTool by prefix. * * Graceful degradation: one failing server doesn't block others. * Read-only invariant: tools with readOnlyHint === false are rejected. */ import { Client } from '@modelcontextprotocol/sdk/client'; import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; import { z } from 'zod'; import type { FastifyBaseLogger } from 'fastify'; import type { McpServerEntry, McpServerConfig } from './mcp-config.js'; import type { ToolDef } from './tools.js'; // ---- Types ---- interface McpToolAnnotations { readOnlyHint?: boolean; destructiveHint?: boolean; [key: string]: unknown; } interface McpToolDef { name: string; description?: string; inputSchema: Record; annotations?: McpToolAnnotations; } interface ServerState { client: Client; transport: StreamableHTTPClientTransport | StdioClientTransport; tools: ToolDef>[]; type: 'streamableHttp' | 'stdio'; } // ---- Module-level state ---- const servers = new Map(); // Reverse map: prefixed tool name → server name (built during discovery) const toolToServer = new Map(); let log: FastifyBaseLogger | null = null; const MAX_RESULT_BYTES = 5 * 1024 * 1024; // ---- Public API ---- /** * Connect to all configured MCP servers, discover tools, and wrap them. * Per-server graceful degradation: a failing server is logged and skipped. */ export async function initialize( entries: McpServerEntry[], logger: FastifyBaseLogger, ): Promise { log = logger; // Connect servers in parallel — each wrapped in try/catch for isolation await Promise.all( entries.map(async (entry) => { try { await connectServer(entry); } catch (err) { log!.warn( { err, server: entry.name }, `mcp: failed to initialize server "${entry.name}" — its tools will be unavailable`, ); } }), ); if (servers.size > 0) { const totalTools = Array.from(servers.values()).reduce((n, s) => n + s.tools.length, 0); log.info( { servers: servers.size, tools: totalTools }, 'mcp: multi-server initialization complete', ); } } /** * Call an MCP tool by its prefixed name. Routes to the correct server * using the toolToServer reverse map. */ export async function callTool( prefixedName: string, args: Record, ): Promise { const serverName = toolToServer.get(prefixedName); if (!serverName) { return { error: true, output: `MCP tool "${prefixedName}" not found in any server` }; } const state = servers.get(serverName); if (!state) { return { error: true, output: `MCP server "${serverName}" not available` }; } // Strip the "_" prefix to get the original tool name const originalName = prefixedName.slice(serverName.length + 1); try { const result = await state.client.callTool({ name: originalName, arguments: args }); const content = result.content as Array<{ type: string; text?: string; [key: string]: unknown }>; if (!content || content.length === 0) { return '(no output)'; } if (result.isError) { const joined = content .map((block) => (block.type === 'text' ? block.text ?? '' : JSON.stringify(block))) .join('\n'); return { error: true, output: joined || '(MCP error with no details)' }; } const parts = content.map((block) => { if (block.type === 'text') return block.text ?? ''; return JSON.stringify(block); }); const joined = parts.join('\n'); if (joined.length > MAX_RESULT_BYTES) { log?.warn({ tool: originalName, server: serverName, bytes: joined.length, cap: MAX_RESULT_BYTES }, 'mcp: result truncated'); return joined.slice(0, MAX_RESULT_BYTES) + '\n\n[truncated — MCP result exceeded size limit]'; } return joined; } catch (err) { log?.warn({ err, tool: originalName, server: serverName }, 'mcp: callTool failed'); return { error: true, output: err instanceof Error ? err.message : 'MCP server unreachable', }; } } /** Return all wrapped ToolDefs from all connected servers, flattened. */ export function getTools(): ToolDef>[] { const all: ToolDef>[] = []; for (const state of servers.values()) { all.push(...state.tools); } return all; } /** Return status of each server (for debug/status endpoints). */ export function getMcpServers(): Array<{ name: string; type: 'streamableHttp' | 'stdio'; toolCount: number; connected: boolean; }> { return Array.from(servers.entries()).map(([name, state]) => ({ name, type: state.type, toolCount: state.tools.length, connected: true, })); } /** * Graceful shutdown. For stdio servers, the SDK's transport.close() handles * SIGTERM + timeout. For HTTP servers, close the transport. */ export async function shutdown(): Promise { const closePromises: Promise[] = []; for (const [name, state] of servers) { closePromises.push( (async () => { try { await state.transport.close(); log?.info({ server: name }, 'mcp: server transport closed'); } catch (err) { log?.warn({ err, server: name }, 'mcp: error closing server transport'); } })(), ); } await Promise.all(closePromises); servers.clear(); toolToServer.clear(); } // ---- Internal helpers ---- async function connectServer(entry: McpServerEntry): Promise { const { name, config } = entry; const client = new Client({ name: 'boocode', version: '1.15.0' }); let transport: StreamableHTTPClientTransport | StdioClientTransport; if (config.type === 'streamableHttp') { transport = createHttpTransport(config); } else { transport = createStdioTransport(config); } await client.connect(transport); const result = await client.listTools(); const mcpTools = (result.tools ?? []) as McpToolDef[]; const tools: ToolDef>[] = []; for (const t of mcpTools) { if (t.annotations?.readOnlyHint === false) { log!.info({ tool: t.name, server: name }, 'mcp: skipping non-read-only tool'); continue; } const wrapped = wrapMcpTool(name, t); tools.push(wrapped); toolToServer.set(wrapped.name, name); } servers.set(name, { client, transport, tools, type: config.type }); log!.info( { server: name, type: config.type, count: tools.length, names: tools.map((t) => t.name) }, 'mcp: server initialized', ); } function createHttpTransport(config: Extract): StreamableHTTPClientTransport { const requestInit: RequestInit = {}; if (config.headers && Object.keys(config.headers).length > 0) { requestInit.headers = config.headers; } return new StreamableHTTPClientTransport(new URL(config.url), { requestInit }); } function createStdioTransport(config: Extract): StdioClientTransport { return new StdioClientTransport({ command: config.command, args: config.args, env: config.env, stderr: 'pipe', }); } /** Wrap an MCP tool as a BooCode ToolDef with a server-name prefix. */ export function wrapMcpTool( serverName: string, mcpTool: McpToolDef, ): ToolDef> { const prefixedName = `${serverName}_${mcpTool.name}`; return { name: prefixedName, description: mcpTool.description ?? '', inputSchema: z.record(z.unknown()), jsonSchema: { type: 'function' as const, function: { name: prefixedName, description: mcpTool.description ?? '', parameters: mcpTool.inputSchema ?? { type: 'object', properties: {} }, }, }, execute: async (input) => { return callTool(prefixedName, input); }, }; } /** Exposed for unit tests — extract content from an MCP result. */ export function extractContent( content: Array<{ type: string; text?: string; [key: string]: unknown }> | undefined, isError?: boolean, ): unknown { if (!content || content.length === 0) return '(no output)'; const parts = content.map((block) => { if (block.type === 'text') return block.text ?? ''; return JSON.stringify(block); }); const joined = parts.join('\n'); if (isError) { return { error: true, output: joined || '(MCP error with no details)' }; } return joined; } /** Exposed for unit tests — the read-only guard predicate. */ export function isToolReadOnly(annotations?: McpToolAnnotations): boolean { return annotations?.readOnlyHint !== false; }