- Add state-graph.ts: typed state machine for inference lifecycle
- Add supervisor.ts: agent supervisor pattern for multi-agent coordination
- Add export-formatter.ts: structured export formatting
- Add manage_memory.ts: memory CRUD tool for agent persistence
- Add get_wiki_article.ts: codecontext wiki article retrieval
- Extend memory/index.ts: 3-tier memory (context/daily/core)
- Extend MCP client: mcp-config.ts env-var substitution
- Update schema.sql: agent_sessions, tasks, pending_changes extensions
- Update API types: MessageMetadata, ErrorReason, AgentSessionConfig
- Update routes: chats, messages, sessions — column renames and agent_session_id
- Update inference: error handler, payload builder, stream phase, turn orchestrator
151 lines
5.5 KiB
TypeScript
151 lines
5.5 KiB
TypeScript
// P5 (SPLIT SKETCH): stream-phase.ts is now the BooCode I/O layer for the
|
|
// stream phase — `executeStreamPhase` owns the row UPDATE, message_started
|
|
// frame, debounced content flush, throttled usage publish, model-context
|
|
// lookup, and tool-whitelist filter. The generic AI-SDK adapter
|
|
// (streamCompletion / toModelMessages / buildAiTools / sampler helpers) moved
|
|
// to ./stream-phase-adapter.ts, which has no SQL/broker/publish deps and is
|
|
// unit-testable on its own. The adapter's public names are re-exported below so
|
|
// existing importers of './stream-phase.js' (sentinel-summaries, synthesis
|
|
// pipeline, the helper tests) keep working unchanged.
|
|
|
|
import type { Agent, Session } from '../../types/api.js';
|
|
import * as modelContext from '../model-context.js';
|
|
import { toolJsonSchemas, type ToolJsonSchema } from '../tools.js';
|
|
import { matchToolGlob } from '../agents.js';
|
|
import type { OpenAiMessage } from './payload.js';
|
|
import { createContentFlusher } from './content-flusher.js';
|
|
import type {
|
|
StreamPhaseState,
|
|
InferenceContext,
|
|
StreamResult,
|
|
TurnArgs,
|
|
} from './types.js';
|
|
import { streamCompletion, samplerOptsFromAgent } from './stream-phase-adapter.js';
|
|
|
|
// Backward-compatibility shim: the generic AI-SDK adapter now lives in
// ./stream-phase-adapter.ts. Re-export its public surface here so modules that
// still import these names from './stream-phase.js' keep compiling unchanged.
// Runtime values and type-only names are re-exported separately.
export { streamCompletion, samplerOptsFromAgent } from './stream-phase-adapter.js';
export type { StreamOptions, SamplerOpts, StreamAdapterContext } from './stream-phase-adapter.js';
|
|
|
|
/**
 * Run the streaming phase of one assistant turn.
 *
 * Stamps `messages.started_at` on the assistant row, publishes a
 * `message_started` frame, then streams the completion through the AI-SDK
 * adapter (`streamCompletion`). Every text delta is appended to
 * `state.accumulated`, published as a `delta` frame, and scheduled for a
 * debounced DB flush via the content flusher. Usage reports are published as
 * `usage` frames, throttled to at most one per `USAGE_THROTTLE_MS`. The most
 * recent pending report is always delivered before this function settles.
 *
 * @param ctx - Inference context; its `sql`, `publish` and `log` members are used here.
 * @param args - Turn arguments; `sessionId`, `chatId`, `assistantMessageId`,
 *   `signal` and the optional `compareGroupId` are read.
 * @param session - Session whose `model` is streamed against and whose
 *   context size is looked up for `ctx_max`.
 * @param messages - Prompt payload handed to the adapter unchanged.
 * @param state - Mutable per-turn state; `startedAt` and `accumulated` are written.
 * @param agent - When non-null, the global tool list is filtered through
 *   `matchToolGlob` against `agent.tools`; also passed to
 *   `samplerOptsFromAgent` and to the adapter. When null, all tools are offered.
 * @param webToolsEnabled - When false, `web_search` and `web_fetch` are removed
 *   from the tool list sent to the model.
 * @returns Whatever `streamCompletion` resolves to.
 */
export async function executeStreamPhase(
  ctx: InferenceContext,
  args: TurnArgs,
  session: Session,
  messages: OpenAiMessage[],
  state: StreamPhaseState,
  agent: Agent | null,
  // v1.11.8: when false, web_search and web_fetch are stripped from the
  // tool list sent to the LLM, so the model can't even attempt them.
  webToolsEnabled: boolean,
): Promise<StreamResult> {
  const { sessionId, chatId, assistantMessageId, signal } = args;

  // Stamp the row with the DB clock so started_at is consistent with other
  // server-side timestamps; null if the row was not found.
  const startedRow = await ctx.sql<{ started_at: string }[]>`
    UPDATE messages
    SET started_at = clock_timestamp()
    WHERE id = ${assistantMessageId}
    RETURNING started_at
  `;
  state.startedAt = startedRow[0]?.started_at ?? null;

  ctx.publish(sessionId, {
    type: 'message_started',
    message_id: assistantMessageId,
    chat_id: chatId,
    role: 'assistant',
    ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}),
  });

  // The flusher reads state.accumulated lazily, so it always persists the
  // latest text when it fires.
  const flusher = createContentFlusher(ctx.sql, assistantMessageId, () => state.accumulated);

  // Tool whitelist: if an agent is set, filter the global tool list to only the
  // tool names it allows. v1.15.0-mcp-multi: uses matchToolGlob for glob
  // pattern support (e.g. `context7_*`, `!web_*`). When no agent: send all tools.
  // v1.11.8: a second filter strips web_search + web_fetch unless the chat
  // has them explicitly enabled. Counts as an opt-in security boundary: the
  // model can't summon a tool that wasn't offered to it.
  const WEB_TOOL_NAMES: ReadonlySet<string> = new Set(['web_search', 'web_fetch']);
  const effectiveTools: ToolJsonSchema[] = (agent
    ? toolJsonSchemas().filter((t) => matchToolGlob(t.function.name, agent.tools))
    : toolJsonSchemas()
  ).filter((t) => webToolsEnabled || !WEB_TOOL_NAMES.has(t.function.name));

  // v1.12.2: ctx_max lookup is cached after the first hit per model, so this
  // is a Map probe in steady state. We capture nCtx once at the top of the
  // stream so the throttled usage publish doesn't refetch each tick.
  const mctxForStream = await modelContext.getModelContext(session.model);
  const nCtxForStream = mctxForStream?.n_ctx ?? null;

  // v1.12.2 → v1.13.1-A: live usage publishes were throttled to ~500ms when
  // the manual SSE parser saw `parsed.usage` per chunk. AI SDK v6 surfaces
  // usage only at stream end (result.usage promise), so the throttle is
  // effectively a single trailing publish. ChatThroughput will tick once at
  // stream completion rather than mid-stream — known regression vs v1.12.2,
  // recovered if a future dispatch interpolates from delta cadence.
  const USAGE_THROTTLE_MS = 500;
  let lastUsageAt = 0;
  // Latest usage report not yet published (trailing-edge throttle buffer).
  let pendingUsage: { p: number | null; c: number | null } | null = null;
  let usageTimer: NodeJS.Timeout | null = null;
  const flushUsage = () => {
    if (!pendingUsage) return;
    const { p, c } = pendingUsage;
    pendingUsage = null;
    lastUsageAt = Date.now();
    ctx.publish(sessionId, {
      type: 'usage',
      message_id: assistantMessageId,
      chat_id: chatId,
      completion_tokens: c,
      ctx_used: p,
      ctx_max: nCtxForStream,
    });
  };

  try {
    return await streamCompletion(
      ctx,
      session.model,
      messages,
      {
        tools: effectiveTools,
        ...samplerOptsFromAgent(agent),
      },
      (delta) => {
        state.accumulated += delta;
        ctx.publish(sessionId, {
          type: 'delta',
          message_id: assistantMessageId,
          chat_id: chatId,
          content: delta,
          ...(args.compareGroupId ? { compare_group_id: args.compareGroupId } : {}),
        });
        ctx.log.debug({ sessionId, delta }, 'inference delta');
        flusher.scheduleFlush();
      },
      (prompt, completion) => {
        // Always remember the newest numbers; publish now if the window has
        // elapsed, otherwise arm a single timer for the remainder.
        pendingUsage = { p: prompt, c: completion };
        const elapsed = Date.now() - lastUsageAt;
        if (elapsed >= USAGE_THROTTLE_MS) {
          flushUsage();
        } else if (!usageTimer) {
          usageTimer = setTimeout(() => {
            usageTimer = null;
            flushUsage();
          }, USAGE_THROTTLE_MS - elapsed);
        }
      },
      signal,
      agent,
    );
  } finally {
    // Cancel the throttle timer, but still deliver the most recent usage
    // report. Clearing the timer alone would silently drop a usage frame that
    // arrived inside the throttle window (e.g. end-of-stream totals).
    // flushUsage is a no-op when nothing is pending.
    if (usageTimer) {
      clearTimeout(usageTimer);
      usageTimer = null;
    }
    try {
      flushUsage();
    } finally {
      // Persist any buffered content even if the usage publish throws.
      await flusher.drain();
    }
  }
}
|