Pass 1 — ask_user_input correlation port (messages.ts:478, :549):
- The two correlation queries that backed the elicitation flow used to scan
messages.tool_calls and messages.tool_results JSON columns directly. They
now JOIN message_parts on payload->>'id' (for the caller assistant) and
payload->>'tool_call_id' (for the pending tool row). Semantics preserved:
ORDER BY m.created_at DESC LIMIT 1 still picks the latest issuance, the
already-answered 409 guard now reads payload.output, and the UPDATE +
parts replace inside sql.begin is unchanged from v1.13.0.
- Pre-v1.13.0 history has no parts rows and is unreachable via this lookup
path (404). Acceptable per dispatch decision — no pending elicitation
from before v1.13.0 will still be open. JSON-column fallback can land as
a hotfix if it ever surfaces.
Pass 2 — reasoning_parts wired end-to-end:
- types.ts/StreamResult gains `reasoning: string`. stream-phase.ts accumulates
reasoning-delta text per stream (replacing the v1.13.1-A counter-only
diagnostic) and returns it on the result.
- parts.ts/partsFromAssistantMessage gains an optional `reasoning` param.
When present it emits a kind='reasoning' part at sequence 0, ahead of
the text and tool_call parts.
- error-handler.ts/finalizeCompletion and tool-phase.ts/executeToolPhase
both thread result.reasoning into the dual-write call so reasoning-channel
models (qwen3.6) get persistent reasoning rows.
- payload.ts: loadContext SELECT pulls reasoning_parts from the v1.13.1-B
view; OpenAiMessage gains an optional `reasoning` field; buildMessagesPayload
collapses reasoning_parts into a single string per assistant message.
- stream-phase.ts/toModelMessages converts assistant messages with reasoning
into an AI SDK ModelMessage content array starting with a ReasoningPart,
matching the @ai-sdk/provider-utils AssistantContent union. Reasoning
models can now replay prior reasoning context across tool-call boundaries.
- types/api.ts and apps/web/src/api/types.ts Message interface gain
reasoning_parts (optional, nullable). Frontend doesn't render this yet —
field reserved for a v1.14 UI surface.
Tests: 2 new in parts.test.ts cover reasoning-at-sequence-0 with and
without text content. 172 tests pass (170 prior + 2 new).
Smoke verified against the live container:
- A reasoning-prompt ("walk through 17 × 23 step by step") produced one
message with kind='reasoning' (361 chars) at sequence 0 and kind='text'
(429 chars) at sequence 1. Adapter log confirmed reasoning capture.
- The new correlation SQL was validated against existing tool_call /
tool_result parts: returns the expected message_id + payload shape with
pending state correctly identified via payload.output IS NULL.
- ask_user_input end-to-end through the UI is left to Sam's smoke test — the
Prompt Builder agent does not always trigger ask_user_input for these prompts,
so synthetic verification via SQL substituted for traffic-driven coverage.
Annotation: the v1.13.1-A abort-throw site in stream-phase.ts got a
one-liner comment ("AI SDK v6 fullStream returns normally on abort; check
signal explicitly.") to prevent a future refactor removing it.
v1.13.2 drops the dual-write + the JSON columns + collapses the view.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
257 lines
10 KiB
TypeScript
257 lines
10 KiB
TypeScript
import type { Session, ToolCall } from '../../types/api.js';
|
|
import * as modelContext from '../model-context.js';
|
|
import { PathScopeError } from '../path_guard.js';
|
|
import { TOOLS_BY_NAME } from '../tools.js';
|
|
import { maybeFlagForCompaction } from './payload.js';
|
|
import { insertParts, partsFromAssistantMessage, partsFromToolMessage } from './parts.js';
|
|
import type {
|
|
InferenceContext,
|
|
StreamResult,
|
|
TurnArgs,
|
|
} from './turn.js';
|
|
// v1.12.4: ESM value-import cycle. executeToolPhase recurses into
|
|
// runAssistantTurn, imported from ./turn.js below. The cycle is safe because
|
|
// the reference is read at call time (inside an async function body), not
|
|
// at module top-level. Node + tsc resolve this cleanly.
|
|
import { runAssistantTurn } from './turn.js';
|
|
|
|
/**
 * Validate and run a single model-issued tool call.
 *
 * Unknown tool names, schema rejections and exceptions thrown by the tool's
 * `execute` are reported through the `error` field of the result rather than
 * thrown, so the caller can persist them and feed them back to the model.
 *
 * @param projectRoot - forwarded to the tool's `execute` as its second argument.
 * @param toolCall - the call to run; `name` selects the tool and `args` is
 *   checked against the tool's `inputSchema` before execution.
 * @returns `output` from the tool (`null` on any failure), `truncated`
 *   mirrored from a `truncated` property on an object output (otherwise
 *   `false`), and `error` when the call did not complete.
 */
async function executeToolCall(
  projectRoot: string,
  toolCall: ToolCall
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
  // The tool name originates from model output, so only accept the registry's
  // OWN properties. A bare index lookup can resolve inherited keys such as
  // "constructor" or "toString" to truthy non-tool values, which would then
  // crash on `inputSchema.safeParse` below instead of reporting an unknown tool.
  const tool = Object.prototype.hasOwnProperty.call(TOOLS_BY_NAME, toolCall.name)
    ? TOOLS_BY_NAME[toolCall.name]
    : undefined;
  if (!tool) {
    return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
  }

  const parsed = tool.inputSchema.safeParse(toolCall.args);
  if (!parsed.success) {
    // v1.12 Track B.2: enrich the zod-reject path so the model sees a
    // one-line, tool-named hint ("tool 'search_symbols' rejected — query:
    // Required") instead of a JSON blob of flatten output. Higher recovery
    // rate on the next turn; doom-loop guard still bounds infinite retries.
    // The cast is because tool.inputSchema is ZodType<unknown>, so zod can't
    // statically narrow flatten()'s fieldErrors key set — but the runtime
    // shape is the standard { formErrors: string[]; fieldErrors: Record<...> }.
    const flatten = parsed.error.flatten() as {
      formErrors: string[];
      fieldErrors: Record<string, string[] | undefined>;
    };
    // Only the first message per field is surfaced to keep the hint short.
    const fieldErrors = Object.entries(flatten.fieldErrors)
      .map(([field, errs]) => `${field}: ${errs?.[0] ?? 'invalid'}`)
      .join('; ');
    const formError = flatten.formErrors[0];
    // `||` is deliberate: an empty joined string must fall through to the
    // form-level error and then to the generic message.
    const hint = fieldErrors || formError || 'unknown validation error';
    return {
      output: null,
      truncated: false,
      error: `tool '${toolCall.name}' rejected — ${hint}`,
    };
  }

  try {
    const output = await tool.execute(parsed.data, projectRoot);
    // Tools signal truncation by returning an object carrying a `truncated`
    // property; anything else counts as not truncated.
    const truncated =
      typeof output === 'object' && output !== null && 'truncated' in output
        ? Boolean((output as { truncated: unknown }).truncated)
        : false;
    return { output, truncated };
  } catch (err) {
    if (err instanceof PathScopeError) {
      return { output: null, truncated: false, error: err.message };
    }
    return {
      output: null,
      truncated: false,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}
|
|
|
|
/** Shape written to `messages.tool_results` and mirrored into the tool_result part. */
type StoredToolResult = {
  tool_call_id: ToolCall['id'];
  output: unknown;
  truncated: boolean;
  error?: string;
};

/**
 * Unwrap the first row of a `RETURNING` query, failing with a descriptive
 * error (instead of a bare property-access TypeError) when no row came back.
 */
function requireRow<T>(row: T | undefined, what: string): T {
  if (row === undefined) {
    throw new Error(`executeToolPhase: ${what} returned no row`);
  }
  return row;
}

/**
 * Insert the tool message row for one tool call, run the tool (unless it is
 * `ask_user_input`), persist the result to both the JSON column and
 * message_parts, and publish the `tool_result` frame.
 *
 * @returns `true` when the call is `ask_user_input` and was left pending
 *   (no tool executed, no `tool_result` frame published), `false` otherwise.
 */
async function settleToolCall(
  ctx: InferenceContext,
  ids: Pick<TurnArgs, 'sessionId' | 'chatId'>,
  projectRoot: string,
  tc: ToolCall
): Promise<boolean> {
  const { sessionId, chatId } = ids;

  // The row is created before the tool runs; its result is filled in below.
  const [inserted] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
    VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
    RETURNING id
  `;
  const toolMessageId = requireRow(inserted, 'tool message INSERT').id;

  // Batch 9.7: ask_user_input is not executed. Its row is pre-stamped with
  // output=null as a "pending" sentinel so the answer endpoint has a target
  // row to UPDATE.
  const awaitingUser = tc.name === 'ask_user_input';
  const outcome: { output: unknown; truncated: boolean; error?: string } = awaitingUser
    ? { output: null, truncated: false }
    : await executeToolCall(projectRoot, tc);

  const stored: StoredToolResult = {
    tool_call_id: tc.id,
    output: outcome.output,
    truncated: outcome.truncated,
    ...(outcome.error ? { error: outcome.error } : {}),
  };

  await ctx.sql`
    UPDATE messages
    SET tool_results = ${ctx.sql.json(stored as never)}
    WHERE id = ${toolMessageId}
  `;
  // v1.13.0: dual-write the tool_result part. For the pending sentinel, the
  // answer-endpoint UPDATE later (messages.ts:576) will delete and re-insert
  // this part when the user submits their answer.
  // TODO(v1.13.1): wrap the INSERT + UPDATE + insertParts triple in a
  // per-iteration sql.begin before flipping read authority.
  await insertParts(
    ctx.sql,
    partsFromToolMessage({ tool_results: stored }).map((p) => ({
      ...p,
      message_id: toolMessageId,
    })),
  );

  if (awaitingUser) {
    // No tool_result frame goes out — the card renders from the tool_call
    // frame alone.
    return true;
  }

  ctx.publish(sessionId, {
    type: 'tool_result',
    tool_message_id: toolMessageId,
    chat_id: chatId,
    tool_call_id: tc.id,
    output: stored.output,
    truncated: stored.truncated,
    ...(stored.error ? { error: stored.error } : {}),
  });
  return false;
}

/**
 * Finish an assistant message that ended in tool calls, run those tools, and
 * either pause for user input or recurse into the next assistant turn.
 *
 * Steps, in order:
 *  1. Mark the assistant message complete (content, tool_calls, token/context
 *     counters) and dual-write its parts, including any captured reasoning.
 *  2. Flag the chat for compaction if needed and bump the session timestamp.
 *  3. Publish one `tool_call` frame per call, then `message_complete`.
 *  4. Run all tool calls concurrently, persisting and publishing each result.
 *  5. If any call was `ask_user_input`, publish `waiting_for_input` and stop;
 *     otherwise insert a new streaming assistant row and call
 *     `runAssistantTurn` with the updated tool budget and call history.
 *
 * @param ctx - inference context supplying `sql`, `publish`, `publishUser` and `log`.
 * @param args - identifiers, tool budget, recent tool calls and abort signal for this turn.
 * @param result - the finished stream: text content, tool calls, token counts and reasoning.
 * @param startedAt - forwarded verbatim as `started_at` on the `message_complete` frame.
 * @param session - the owning session; `session.model` drives the context lookup.
 * @param projectRoot - forwarded to each executed tool.
 * @throws Error when a `RETURNING` query yields no row, plus anything thrown
 *   by the database calls or the recursive turn.
 */
export async function executeToolPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  result: StreamResult,
  startedAt: string | null,
  session: Session,
  projectRoot: string
): Promise<void> {
  const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args;
  const { content, toolCalls, promptTokens, completionTokens } = result;

  // v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the
  // streaming completion (which doesn't emit n_ctx). getModelContext caches
  // the positive lookup for the process lifetime, so this is a single Map
  // hit after the first invocation per model.
  const mctx = await modelContext.getModelContext(session.model);
  const nCtx = mctx?.n_ctx ?? null;

  const [updated] = await ctx.sql<
    { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
  >`
    UPDATE messages
    SET content = ${content},
        status = 'complete',
        tool_calls = ${ctx.sql.json(toolCalls as never)},
        tokens_used = ${completionTokens},
        ctx_used = ${promptTokens},
        ctx_max = ${nCtx},
        finished_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING tokens_used, ctx_used, ctx_max, finished_at
  `;
  // v1.13.0: dual-write to message_parts. v1.13.1-B made parts authoritative
  // for reads via the messages_with_parts view; the JSON column write above
  // remains for v1.13.1 fallback compatibility (dropped in v1.13.2).
  // v1.13.1-C: include result.reasoning so models with separate reasoning
  // channels (qwen3.6) get a kind='reasoning' part at sequence 0.
  // TODO(v1.13.1): wrap the UPDATE above and this insertParts in a single
  // sql.begin before flipping read authority to message_parts. Without the
  // transaction, a crash between the two leaves an orphan message that
  // becomes invisible in the parts-authoritative read path.
  await insertParts(
    ctx.sql,
    partsFromAssistantMessage({
      content,
      tool_calls: toolCalls,
      reasoning: result.reasoning,
    }).map((p) => ({
      ...p,
      message_id: assistantMessageId,
    })),
  );

  // v1.11: flag for compaction if this turn pushed us over the usable budget.
  // We never compact mid-loop (the recursive runAssistantTurn keeps tools
  // flowing); the flag fires on the NEXT turn's pre-fetch hook.
  await maybeFlagForCompaction(ctx, chatId, updated);

  const [touchedSession] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
    UPDATE sessions SET updated_at = clock_timestamp()
    WHERE id = ${sessionId}
    RETURNING project_id, name, updated_at
  `;
  const sessionRow = requireRow(touchedSession, 'session UPDATE');
  ctx.publishUser({
    type: 'session_updated',
    session_id: sessionId,
    project_id: sessionRow.project_id,
    name: sessionRow.name,
    updated_at: sessionRow.updated_at,
  });

  for (const tc of toolCalls) {
    ctx.publish(sessionId, {
      type: 'tool_call',
      message_id: assistantMessageId,
      chat_id: chatId,
      tool_call: tc,
    });
  }
  ctx.publish(sessionId, {
    type: 'message_complete',
    message_id: assistantMessageId,
    chat_id: chatId,
    tokens_used: updated?.tokens_used ?? null,
    ctx_used: updated?.ctx_used ?? null,
    ctx_max: updated?.ctx_max ?? null,
    started_at: startedAt,
    finished_at: updated?.finished_at ?? null,
    model: session.model,
  });

  // Batch 9.7: ask_user_input pauses the loop. Mixed batches still execute
  // the other tools normally, so every call is settled before deciding
  // whether to pause.
  ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'tool_running', at: new Date().toISOString() });
  const pendingFlags = await Promise.all(
    toolCalls.map((tc) => settleToolCall(ctx, { sessionId, chatId }, projectRoot, tc)),
  );
  const pausingForUserInput = pendingFlags.some(Boolean);

  if (pausingForUserInput) {
    ctx.publishUser({
      type: 'chat_status',
      chat_id: chatId,
      status: 'waiting_for_input',
      at: new Date().toISOString(),
    });
    ctx.log.info(
      { sessionId, chatId, assistantMessageId },
      'inference paused awaiting user input',
    );
    return;
  }

  const [insertedAssistant] = await ctx.sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
    VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
    RETURNING id
  `;
  const nextAssistantId = requireRow(insertedAssistant, 'assistant message INSERT').id;

  await runAssistantTurn(ctx, {
    sessionId,
    chatId,
    assistantMessageId: nextAssistantId,
    // v1.8.2: charge this turn's actual tool invocations against the budget.
    // One assistant message can emit multiple tool_calls, so we add the run
    // count, not 1. The next turn's budget check sees the cumulative total.
    toolsUsed: toolsUsed + result.toolCalls.length,
    // v1.11.6: append the just-executed tool calls to the per-turn history
    // so the next runAssistantTurn's doom-loop check can see them. We don't
    // cap the array length here — per-turn budgets keep it bounded
    // (typically <30 entries), and slicing happens inside detectDoomLoop.
    recentToolCalls: [...args.recentToolCalls, ...result.toolCalls],
    signal,
  });
}
|