import type { Session, ToolCall } from '../../types/api.js';
import * as modelContext from '../model-context.js';
import { PathScopeError } from '../path_guard.js';
import { TOOLS_BY_NAME } from '../tools.js';
import { maybeFlagForCompaction } from './payload.js';
import { insertParts, partsFromAssistantMessage, partsFromToolMessage } from './parts.js';
import type {
  InferenceContext,
  StreamResult,
  TurnArgs,
} from './turn.js';
// v1.12.4: ESM value-import cycle. executeToolPhase recurses into
// runAssistantTurn, which is imported from ./turn.js. The cycle is safe
// because the reference is read at call time (inside an async function
// body), not at module top-level. Node + tsc resolve this cleanly.
import { runAssistantTurn } from './turn.js';

/**
 * Returns `row`, or throws a descriptive error when a statement that is
 * expected to yield a row yielded none.
 *
 * @param row - First row destructured from a query result.
 * @param message - Error message used when `row` is missing.
 * @throws Error when `row` is `undefined`.
 */
function requireRow<T>(row: T | undefined, message: string): T {
  if (row === undefined) {
    throw new Error(message);
  }
  return row;
}

/**
 * Validates and runs a single tool call.
 *
 * Unknown tools, argument-validation failures and errors thrown by the tool
 * are reported through the `error` field of the result instead of being
 * thrown.
 *
 * @param projectRoot - Passed through to the tool's `execute` as its second argument.
 * @param toolCall - The call to run; `name` selects the tool, `args` is validated
 *   against the tool's `inputSchema`.
 * @returns The tool output, whether the output reports itself as truncated, and
 *   an `error` message when the call did not succeed.
 */
async function executeToolCall(
  projectRoot: string,
  toolCall: ToolCall
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
  // The tool name is not trusted input. Only accept own keys of the lookup
  // table so names such as "constructor" or "toString" cannot resolve to an
  // inherited Object.prototype member and blow up on `.inputSchema` below.
  const tool = Object.prototype.hasOwnProperty.call(TOOLS_BY_NAME, toolCall.name)
    ? TOOLS_BY_NAME[toolCall.name]
    : undefined;
  if (!tool) {
    return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
  }

  const parsed = tool.inputSchema.safeParse(toolCall.args);
  if (!parsed.success) {
    // v1.12 Track B.2: enrich the zod-reject path so the model sees a
    // one-line, tool-named hint ("tool 'search_symbols' rejected — query:
    // Required") instead of a JSON blob of flatten output. Higher recovery
    // rate on the next turn; doom-loop guard still bounds infinite retries.
    // The cast is because tool.inputSchema is ZodType, so zod can't
    // statically narrow flatten()'s fieldErrors key set — but the runtime
    // shape is the standard { formErrors: string[]; fieldErrors: Record<...> }.
    const flatten = parsed.error.flatten() as {
      formErrors: string[];
      fieldErrors: Record<string, string[] | undefined>;
    };
    const fieldErrors = Object.entries(flatten.fieldErrors)
      .map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`)
      .join('; ');
    const formError = flatten.formErrors[0];
    // `||` (not `??`) on purpose: an empty joined string must fall through.
    const hint = fieldErrors || formError || 'unknown validation error';
    return {
      output: null,
      truncated: false,
      error: `tool '${toolCall.name}' rejected — ${hint}`,
    };
  }

  try {
    const output = await tool.execute(parsed.data, projectRoot);
    // A tool signals truncation by returning an object with a `truncated` key.
    const truncated =
      typeof output === 'object' && output !== null && 'truncated' in output
        ? Boolean((output as { truncated: unknown }).truncated)
        : false;
    return { output, truncated };
  } catch (err) {
    if (err instanceof PathScopeError) {
      return { output: null, truncated: false, error: err.message };
    }
    return {
      output: null,
      truncated: false,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}

/**
 * Finalizes an assistant message that requested tool calls, runs those tools,
 * and either pauses for user input or starts the next assistant turn.
 *
 * Steps, in order:
 * 1. Marks the assistant message complete (content, tool calls, token/context
 *    counters) and mirrors it into message parts.
 * 2. Flags the chat for compaction if needed and touches the session row.
 * 3. Publishes `session_updated`, one `tool_call` frame per call,
 *    `message_complete`, and a `tool_running` chat status.
 * 4. Inserts one `tool` message per call and runs the calls concurrently.
 *    `ask_user_input` calls are not executed; they get a pending sentinel.
 * 5. If any call was `ask_user_input`, publishes `waiting_for_input` and
 *    returns. Otherwise inserts the next streaming assistant message and
 *    calls `runAssistantTurn` for it.
 *
 * @param ctx - Provides `sql`, `publish`, `publishUser` and `log`.
 * @param args - Identifiers, tool budget state and abort signal for this turn.
 * @param result - Streamed completion: content, tool calls, token counts, reasoning.
 * @param startedAt - Start timestamp echoed in the `message_complete` frame.
 * @param session - Session whose `model` is used for the context lookup.
 * @param projectRoot - Forwarded to each executed tool.
 * @throws Error when the session update or a message insert returns no row.
 */
export async function executeToolPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  result: StreamResult,
  startedAt: string | null,
  session: Session,
  projectRoot: string
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
  const { content, toolCalls, promptTokens, completionTokens } = result;

  // v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the
  // streaming completion (which doesn't emit n_ctx). getModelContext caches
  // the positive lookup for the process lifetime, so this is a single Map
  // hit after the first invocation per model.
  const mctx = await modelContext.getModelContext(session.model);
  const nCtx = mctx?.n_ctx ?? null;

  const [updated] = await ctx.sql<
    { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
  >`
    UPDATE messages
    SET content = ${content},
        status = 'complete',
        tool_calls = ${ctx.sql.json(toolCalls as never)},
        tokens_used = ${completionTokens},
        ctx_used = ${promptTokens},
        ctx_max = ${nCtx},
        finished_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING tokens_used, ctx_used, ctx_max, finished_at
  `;

  // v1.13.0: dual-write to message_parts. v1.13.1-B made parts authoritative
  // for reads via the messages_with_parts view; the JSON column write above
  // remains for v1.13.1 fallback compatibility (dropped in v1.13.2).
  // v1.13.1-C: include result.reasoning so models with separate reasoning
  // channels (qwen3.6) get a kind='reasoning' part at sequence 0.
  // TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single
  // sql.begin before flipping read authority to message_parts. Without the
  // transaction, a crash between the two leaves an orphan message that
  // becomes invisible in the parts-authoritative read path.
  await insertParts(
    ctx.sql,
    partsFromAssistantMessage({
      content,
      tool_calls: toolCalls,
      reasoning: result.reasoning,
    }).map((p) => ({
      ...p,
      message_id: assistantMessageId,
    })),
  );

  // v1.11: flag for compaction if this turn pushed us over the usable budget.
  // We never compact mid-loop (the recursive runAssistantTurn keeps tools
  // flowing); the flag fires on the NEXT turn's pre-fetch hook.
  await maybeFlagForCompaction(ctx, chatId, updated);

  const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions
    SET updated_at = clock_timestamp()
    WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  // The UPDATE matches nothing if the session row no longer exists; fail with
  // a clear message instead of dereferencing undefined.
  const sessionRow = requireRow(
    toolSessRow,
    `session ${sessionId} not found while recording tool phase`,
  );
  ctx.publishUser({
    type: 'session_updated',
    session_id: sessionId,
    project_id: sessionRow.project_id,
    name: sessionRow.name,
    updated_at: sessionRow.updated_at,
  });

  for (const tc of toolCalls) {
    ctx.publish(sessionId, {
      type: 'tool_call',
      message_id: assistantMessageId,
      chat_id: chatId,
      tool_call: tc,
    });
  }

  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: assistantMessageId,
    chat_id: chatId,
    tokens_used: updated?.tokens_used ?? null,
    ctx_used: updated?.ctx_used ?? null,
    ctx_max: updated?.ctx_max ?? null,
    started_at: startedAt,
    finished_at: updated?.finished_at ?? null,
    model: session.model,
  });

  // Batch 9.7: ask_user_input pauses the loop. The tool row is still inserted
  // (the answer endpoint needs a target row to UPDATE), but tool_results is
  // pre-stamped with output=null as a "pending" sentinel and no tool_result
  // frame goes out — the card renders from the tool_call frame alone. Mixed
  // batches still execute the other tools normally.
  ctx.publishUser({
    type: 'chat_status',
    chat_id: chatId,
    status: 'tool_running',
    at: new Date().toISOString(),
  });

  // Derived up front rather than toggled from inside the concurrent callbacks
  // below, so the pause decision does not depend on shared mutable state.
  const pausingForUserInput = toolCalls.some((tc) => tc.name === 'ask_user_input');

  await Promise.all(
    toolCalls.map(async (tc) => {
      const [toolRow] = await ctx.sql<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
        RETURNING id
      `;
      const toolMessageId = requireRow(
        toolRow,
        `tool message insert returned no row for tool call ${tc.id}`,
      ).id;

      if (tc.name === 'ask_user_input') {
        const sentinel = { tool_call_id: tc.id, output: null, truncated: false };
        await ctx.sql`
          UPDATE messages
          SET tool_results = ${ctx.sql.json(sentinel as never)}
          WHERE id = ${toolMessageId}
        `;
        // v1.13.0: mirror the pending sentinel into message_parts. The
        // answer-endpoint UPDATE later (messages.ts:576) will delete and
        // re-insert this part when the user submits their answer.
        // TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in
        // a per-iteration sql.begin before flipping read authority.
        await insertParts(
          ctx.sql,
          partsFromToolMessage({ tool_results: sentinel }).map((p) => ({
            ...p,
            message_id: toolMessageId,
          })),
        );
        return;
      }

      const tres = await executeToolCall(projectRoot, tc);
      const stored = {
        tool_call_id: tc.id,
        output: tres.output,
        truncated: tres.truncated,
        ...(tres.error ? { error: tres.error } : {}),
      };
      await ctx.sql`
        UPDATE messages
        SET tool_results = ${ctx.sql.json(stored as never)}
        WHERE id = ${toolMessageId}
      `;
      // v1.13.0: dual-write the tool_result part.
      // TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in a
      // per-iteration sql.begin before flipping read authority.
      await insertParts(
        ctx.sql,
        partsFromToolMessage({ tool_results: stored }).map((p) => ({
          ...p,
          message_id: toolMessageId,
        })),
      );
      ctx.publish(sessionId, {
        type: 'tool_result',
        tool_message_id: toolMessageId,
        chat_id: chatId,
        tool_call_id: tc.id,
        output: tres.output,
        truncated: tres.truncated,
        ...(tres.error ? { error: tres.error } : {}),
      });
    })
  );

  if (pausingForUserInput) {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'waiting_for_input',
      at: new Date().toISOString(),
    });
    ctx.log.info(
      { sessionId, chatId, assistantMessageId },
      'inference paused awaiting user input',
    );
    return;
  }

  const [nextAssistant] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
    VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
    RETURNING id
  `;
  const nextAssistantId = requireRow(
    nextAssistant,
    'assistant message insert returned no row',
  ).id;

  await runAssistantTurn(ctx, {
    sessionId,
    chatId,
    assistantMessageId: nextAssistantId,
    // v1.8.2: charge this turn's actual tool invocations against the budget.
    // One assistant message can emit multiple tool_calls, so we add the run
    // count, not 1. The next turn's budget check sees the cumulative total.
    toolsUsed: toolsUsed + toolCalls.length,
    // v1.11.6: append the just-executed tool calls to the per-turn history
    // so the next runAssistantTurn's doom-loop check can see them. We don't
    // cap the array length here — per-turn budgets keep it bounded
    // (typically <30 entries), and slicing happens inside detectDoomLoop.
    recentToolCalls: [...args.recentToolCalls, ...toolCalls],
    signal,
  });
}