- FileBrowserPane.tsx: deleted (unreferenced post-v1.4 PaneTab.tsx removal; the legacy file_browser pane kind isn't part of the active WorkspacePane taxonomy). - Workspace.tsx (524 -> 172 lines): extracted useWorkspacePanes(sessionId) and useSessionChats(sessionId) hooks. Workspace is layout-only composition now. localStorage key + WS frame handling + drag semantics unchanged. - inference.ts runAssistantTurn (~265 -> 48 lines): bundled args into TurnArgs interface, extracted executeStreamPhase / executeToolPhase / finalizeCompletion / handleAbortOrError. All WS publish ordering preserved byte-for-byte (mentally traced for tool / non-tool / abort / error / depth-exceeded paths). flushPromise chain + setImmediate + signal propagation unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
836 lines
25 KiB
TypeScript
836 lines
25 KiB
TypeScript
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { Sql } from '../db.js';
|
|
import type { Config } from '../config.js';
|
|
import type { Message, Project, Session, ToolCall, UserStreamFrame } from '../types/api.js';
|
|
import { ALL_TOOLS, TOOLS_BY_NAME, toolJsonSchemas } from './tools.js';
|
|
import { PathScopeError, resolveProjectRoot } from './path_guard.js';
|
|
import { maybeAutoNameChat } from './auto_name.js';
|
|
|
|
const BASE_SYSTEM_PROMPT = (projectPath: string) =>
|
|
`You are BooCode Chat, a code investigation assistant. The user is working on a project located at ${projectPath}. Use the file-read tools (view_file, list_dir, grep, find_files) to investigate code when needed. Be concise. Cite file paths and line numbers when discussing code. Do not hallucinate file contents — read the file first. Tool results may be truncated; if so, narrow your query rather than guessing.`;
|
|
|
|
const DB_FLUSH_INTERVAL_MS = 500;
|
|
const MAX_TOOL_LOOP_DEPTH = 5;
|
|
|
|
export interface InferenceFrame {
|
|
type:
|
|
| 'message_started'
|
|
| 'delta'
|
|
| 'tool_call'
|
|
| 'tool_result'
|
|
| 'message_complete'
|
|
| 'messages_deleted'
|
|
| 'session_renamed'
|
|
| 'chat_renamed'
|
|
| 'error';
|
|
message_id?: string;
|
|
message_ids?: string[];
|
|
chat_id?: string;
|
|
tool_message_id?: string;
|
|
tool_call_id?: string;
|
|
role?: 'assistant' | 'tool' | 'user';
|
|
content?: string;
|
|
tool_call?: ToolCall;
|
|
output?: unknown;
|
|
truncated?: boolean;
|
|
error?: string;
|
|
tokens_used?: number | null;
|
|
ctx_used?: number | null;
|
|
ctx_max?: number | null;
|
|
started_at?: string | null;
|
|
finished_at?: string | null;
|
|
model?: string;
|
|
session_id?: string;
|
|
name?: string;
|
|
}
|
|
|
|
export type FramePublisher = (sessionId: string, frame: InferenceFrame) => void;
|
|
|
|
interface OpenAiMessage {
|
|
role: 'system' | 'user' | 'assistant' | 'tool';
|
|
content: string | null;
|
|
tool_calls?: Array<{
|
|
id: string;
|
|
type: 'function';
|
|
function: { name: string; arguments: string };
|
|
}>;
|
|
tool_call_id?: string;
|
|
}
|
|
|
|
interface ChatCompletionDelta {
|
|
role?: string;
|
|
content?: string | null;
|
|
tool_calls?: Array<{
|
|
index: number;
|
|
id?: string;
|
|
type?: 'function';
|
|
function?: { name?: string; arguments?: string };
|
|
}>;
|
|
}
|
|
|
|
interface ChatCompletionChunk {
|
|
choices?: Array<{
|
|
delta: ChatCompletionDelta;
|
|
finish_reason: string | null;
|
|
}>;
|
|
usage?: {
|
|
prompt_tokens?: number;
|
|
completion_tokens?: number;
|
|
total_tokens?: number;
|
|
};
|
|
timings?: {
|
|
n_ctx?: number;
|
|
};
|
|
}
|
|
|
|
export interface InferenceContext {
|
|
sql: Sql;
|
|
config: Config;
|
|
log: FastifyBaseLogger;
|
|
publish: FramePublisher;
|
|
publishUser: (frame: UserStreamFrame) => void;
|
|
}
|
|
|
|
export function buildMessagesPayload(
|
|
session: Session,
|
|
project: Project,
|
|
history: Message[]
|
|
): OpenAiMessage[] {
|
|
const out: OpenAiMessage[] = [];
|
|
let systemPrompt = BASE_SYSTEM_PROMPT(project.path);
|
|
if (session.system_prompt && session.system_prompt.trim().length > 0) {
|
|
systemPrompt += '\n\n' + session.system_prompt.trim();
|
|
}
|
|
out.push({ role: 'system', content: systemPrompt });
|
|
|
|
// Find the latest compact marker — only send messages from that point onwards
|
|
let startIdx = 0;
|
|
for (let i = history.length - 1; i >= 0; i--) {
|
|
if (history[i]!.kind === 'compact') {
|
|
startIdx = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
for (let i = startIdx; i < history.length; i++) {
|
|
const m = history[i]!;
|
|
if (m.kind === 'compact') {
|
|
out.push({ role: 'system', content: m.content });
|
|
continue;
|
|
}
|
|
if (m.role === 'assistant' && m.status === 'streaming') continue;
|
|
if (m.role === 'assistant' && m.status === 'cancelled') continue;
|
|
if (m.role === 'tool') {
|
|
const tr = m.tool_results;
|
|
if (!tr) continue;
|
|
const outputText = tr.error
|
|
? `error: ${tr.error}`
|
|
: typeof tr.output === 'string'
|
|
? tr.output
|
|
: JSON.stringify(tr.output);
|
|
out.push({
|
|
role: 'tool',
|
|
content: outputText,
|
|
tool_call_id: tr.tool_call_id,
|
|
});
|
|
continue;
|
|
}
|
|
if (m.role === 'assistant') {
|
|
const msg: OpenAiMessage = {
|
|
role: 'assistant',
|
|
content: m.content && m.content.length > 0 ? m.content : null,
|
|
};
|
|
if (m.tool_calls && m.tool_calls.length > 0) {
|
|
msg.tool_calls = m.tool_calls.map((tc) => ({
|
|
id: tc.id,
|
|
type: 'function' as const,
|
|
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
|
}));
|
|
}
|
|
out.push(msg);
|
|
continue;
|
|
}
|
|
out.push({ role: 'user', content: m.content });
|
|
}
|
|
return out;
|
|
}
|
|
|
|
async function loadContext(
|
|
sql: Sql,
|
|
sessionId: string,
|
|
chatId: string
|
|
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
|
|
const sessionRows = await sql<Session[]>`
|
|
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at
|
|
FROM sessions WHERE id = ${sessionId}
|
|
`;
|
|
if (sessionRows.length === 0) return null;
|
|
const session = sessionRows[0]!;
|
|
|
|
const projectRows = await sql<Project[]>`
|
|
SELECT id, name, path, added_at, last_session_id
|
|
FROM projects WHERE id = ${session.project_id}
|
|
`;
|
|
if (projectRows.length === 0) return null;
|
|
const project = projectRows[0]!;
|
|
|
|
const history = await sql<Message[]>`
|
|
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
|
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at
|
|
FROM messages
|
|
WHERE chat_id = ${chatId}
|
|
ORDER BY created_at ASC, id ASC
|
|
`;
|
|
|
|
return { session, project, history };
|
|
}
|
|
|
|
async function* sseLines(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
|
|
const reader = stream.getReader();
|
|
const decoder = new TextDecoder('utf-8');
|
|
let buffer = '';
|
|
try {
|
|
while (true) {
|
|
const { value, done } = await reader.read();
|
|
if (done) break;
|
|
buffer += decoder.decode(value, { stream: true });
|
|
let idx;
|
|
while ((idx = buffer.indexOf('\n')) >= 0) {
|
|
const line = buffer.slice(0, idx).replace(/\r$/, '');
|
|
buffer = buffer.slice(idx + 1);
|
|
if (line.length === 0) continue;
|
|
yield line;
|
|
}
|
|
}
|
|
if (buffer.length > 0) yield buffer;
|
|
} finally {
|
|
reader.releaseLock();
|
|
}
|
|
}
|
|
|
|
interface StreamResult {
|
|
finishReason: string | null;
|
|
content: string;
|
|
toolCalls: ToolCall[];
|
|
promptTokens: number | null;
|
|
completionTokens: number | null;
|
|
nCtx: number | null;
|
|
}
|
|
|
|
async function streamCompletion(
|
|
ctx: InferenceContext,
|
|
model: string,
|
|
messages: OpenAiMessage[],
|
|
includeTools: boolean,
|
|
onDelta: (content: string) => void,
|
|
signal?: AbortSignal
|
|
): Promise<StreamResult> {
|
|
const body: Record<string, unknown> = {
|
|
model,
|
|
messages,
|
|
stream: true,
|
|
stream_options: { include_usage: true },
|
|
};
|
|
if (includeTools) {
|
|
body['tools'] = toolJsonSchemas();
|
|
body['tool_choice'] = 'auto';
|
|
}
|
|
|
|
const res = await fetch(`${ctx.config.LLAMA_SWAP_URL}/v1/chat/completions`, {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify(body),
|
|
signal,
|
|
});
|
|
if (!res.ok || !res.body) {
|
|
const text = await res.text().catch(() => '');
|
|
throw new Error(`llama-swap returned ${res.status}: ${text.slice(0, 200)}`);
|
|
}
|
|
|
|
let content = '';
|
|
let finishReason: string | null = null;
|
|
let promptTokens: number | null = null;
|
|
let completionTokens: number | null = null;
|
|
let nCtx: number | null = null;
|
|
const toolCallsBuffer = new Map<number, { id: string; name: string; argsText: string }>();
|
|
|
|
for await (const line of sseLines(res.body)) {
|
|
if (!line.startsWith('data:')) continue;
|
|
const payload = line.slice(5).trim();
|
|
if (payload === '[DONE]') break;
|
|
let parsed: ChatCompletionChunk;
|
|
try {
|
|
parsed = JSON.parse(payload);
|
|
} catch {
|
|
continue;
|
|
}
|
|
|
|
if (parsed.usage) {
|
|
if (typeof parsed.usage.prompt_tokens === 'number') {
|
|
promptTokens = parsed.usage.prompt_tokens;
|
|
}
|
|
if (typeof parsed.usage.completion_tokens === 'number') {
|
|
completionTokens = parsed.usage.completion_tokens;
|
|
}
|
|
}
|
|
if (parsed.timings && typeof parsed.timings.n_ctx === 'number') {
|
|
nCtx = parsed.timings.n_ctx;
|
|
}
|
|
|
|
const choice = parsed.choices?.[0];
|
|
if (!choice) continue;
|
|
const delta = choice.delta ?? {};
|
|
if (typeof delta.content === 'string' && delta.content.length > 0) {
|
|
content += delta.content;
|
|
onDelta(delta.content);
|
|
}
|
|
if (Array.isArray(delta.tool_calls)) {
|
|
for (const tc of delta.tool_calls) {
|
|
const idx = tc.index;
|
|
const existing = toolCallsBuffer.get(idx) ?? { id: '', name: '', argsText: '' };
|
|
if (tc.id) existing.id = tc.id;
|
|
if (tc.function?.name) existing.name = tc.function.name;
|
|
if (typeof tc.function?.arguments === 'string') existing.argsText += tc.function.arguments;
|
|
toolCallsBuffer.set(idx, existing);
|
|
}
|
|
}
|
|
if (choice.finish_reason) finishReason = choice.finish_reason;
|
|
}
|
|
|
|
const toolCalls: ToolCall[] = [];
|
|
for (const [, t] of [...toolCallsBuffer.entries()].sort(([a], [b]) => a - b)) {
|
|
let args: Record<string, unknown> = {};
|
|
if (t.argsText.length > 0) {
|
|
try {
|
|
args = JSON.parse(t.argsText);
|
|
} catch {
|
|
args = { _raw: t.argsText };
|
|
}
|
|
}
|
|
toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args });
|
|
}
|
|
|
|
return { finishReason, content, toolCalls, promptTokens, completionTokens, nCtx };
|
|
}
|
|
|
|
async function executeToolCall(
|
|
projectRoot: string,
|
|
toolCall: ToolCall
|
|
): Promise<{ output: unknown; truncated: boolean; error?: string }> {
|
|
const tool = TOOLS_BY_NAME[toolCall.name];
|
|
if (!tool) {
|
|
return { output: null, truncated: false, error: `unknown tool: ${toolCall.name}` };
|
|
}
|
|
const parsed = tool.inputSchema.safeParse(toolCall.args);
|
|
if (!parsed.success) {
|
|
return {
|
|
output: null,
|
|
truncated: false,
|
|
error: `invalid input: ${JSON.stringify(parsed.error.flatten())}`,
|
|
};
|
|
}
|
|
try {
|
|
const output = await tool.execute(parsed.data, projectRoot);
|
|
const truncated =
|
|
typeof output === 'object' && output !== null && 'truncated' in output
|
|
? Boolean((output as { truncated: unknown }).truncated)
|
|
: false;
|
|
return { output, truncated };
|
|
} catch (err) {
|
|
if (err instanceof PathScopeError) {
|
|
return { output: null, truncated: false, error: err.message };
|
|
}
|
|
return {
|
|
output: null,
|
|
truncated: false,
|
|
error: err instanceof Error ? err.message : String(err),
|
|
};
|
|
}
|
|
}
|
|
|
|
interface TurnArgs {
|
|
sessionId: string;
|
|
chatId: string;
|
|
assistantMessageId: string;
|
|
depth: number;
|
|
signal: AbortSignal | undefined;
|
|
}
|
|
|
|
interface StreamPhaseState {
|
|
accumulated: string;
|
|
startedAt: string | null;
|
|
}
|
|
|
|
async function executeStreamPhase(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
session: Session,
|
|
messages: OpenAiMessage[],
|
|
state: StreamPhaseState
|
|
): Promise<StreamResult> {
|
|
const { sessionId, chatId, assistantMessageId, signal } = args;
|
|
|
|
const startedRow = await ctx.sql<{ started_at: string }[]>`
|
|
UPDATE messages
|
|
SET started_at = clock_timestamp()
|
|
WHERE id = ${assistantMessageId}
|
|
RETURNING started_at
|
|
`;
|
|
state.startedAt = startedRow[0]?.started_at ?? null;
|
|
|
|
ctx.publish(sessionId, {
|
|
type: 'message_started',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
role: 'assistant',
|
|
});
|
|
|
|
let pendingFlushTimer: NodeJS.Timeout | null = null;
|
|
let flushPromise: Promise<unknown> = Promise.resolve();
|
|
|
|
const flushNow = () => {
|
|
if (pendingFlushTimer) {
|
|
clearTimeout(pendingFlushTimer);
|
|
pendingFlushTimer = null;
|
|
}
|
|
const snapshot = state.accumulated;
|
|
flushPromise = flushPromise.then(() =>
|
|
ctx.sql`UPDATE messages SET content = ${snapshot} WHERE id = ${assistantMessageId}`
|
|
);
|
|
};
|
|
|
|
const scheduleFlush = () => {
|
|
if (pendingFlushTimer) return;
|
|
pendingFlushTimer = setTimeout(() => {
|
|
pendingFlushTimer = null;
|
|
flushNow();
|
|
}, DB_FLUSH_INTERVAL_MS);
|
|
};
|
|
|
|
try {
|
|
return await streamCompletion(
|
|
ctx,
|
|
session.model,
|
|
messages,
|
|
true,
|
|
(delta) => {
|
|
state.accumulated += delta;
|
|
ctx.publish(sessionId, {
|
|
type: 'delta',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
content: delta,
|
|
});
|
|
ctx.log.debug({ sessionId, delta }, 'inference delta');
|
|
scheduleFlush();
|
|
},
|
|
signal
|
|
);
|
|
} finally {
|
|
if (pendingFlushTimer) {
|
|
clearTimeout(pendingFlushTimer);
|
|
pendingFlushTimer = null;
|
|
}
|
|
await flushPromise;
|
|
}
|
|
}
|
|
|
|
async function handleAbortOrError(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
accumulated: string,
|
|
err: unknown
|
|
): Promise<void> {
|
|
const { sessionId, chatId, assistantMessageId } = args;
|
|
const isAbort = err instanceof Error && err.name === 'AbortError';
|
|
const finalStatus = isAbort ? 'cancelled' : 'failed';
|
|
await ctx.sql`
|
|
UPDATE messages
|
|
SET status = ${finalStatus},
|
|
content = ${accumulated},
|
|
finished_at = clock_timestamp()
|
|
WHERE id = ${assistantMessageId}
|
|
`;
|
|
const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
UPDATE sessions SET updated_at = clock_timestamp()
|
|
WHERE id = ${sessionId}
|
|
RETURNING project_id, name, updated_at
|
|
`;
|
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
|
|
if (isAbort) {
|
|
ctx.publish(sessionId, {
|
|
type: 'message_complete',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
});
|
|
ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
|
|
} else {
|
|
const errMsg = err instanceof Error ? err.message : String(err);
|
|
ctx.publish(sessionId, {
|
|
type: 'error',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
error: errMsg,
|
|
});
|
|
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
|
}
|
|
}
|
|
|
|
async function executeToolPhase(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
result: StreamResult,
|
|
startedAt: string | null,
|
|
session: Session,
|
|
projectRoot: string
|
|
): Promise<void> {
|
|
const { sessionId, chatId, assistantMessageId, depth, signal } = args;
|
|
const { content, toolCalls, promptTokens, completionTokens, nCtx } = result;
|
|
|
|
const [updated] = await ctx.sql<
|
|
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
|
>`
|
|
UPDATE messages
|
|
SET content = ${content},
|
|
status = 'complete',
|
|
tool_calls = ${ctx.sql.json(toolCalls as never)},
|
|
tokens_used = ${completionTokens},
|
|
ctx_used = ${promptTokens},
|
|
ctx_max = ${nCtx},
|
|
finished_at = clock_timestamp()
|
|
WHERE id = ${assistantMessageId}
|
|
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
|
`;
|
|
const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
UPDATE sessions SET updated_at = clock_timestamp()
|
|
WHERE id = ${sessionId}
|
|
RETURNING project_id, name, updated_at
|
|
`;
|
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at });
|
|
for (const tc of toolCalls) {
|
|
ctx.publish(sessionId, {
|
|
type: 'tool_call',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
tool_call: tc,
|
|
});
|
|
}
|
|
ctx.publish(sessionId, {
|
|
type: 'message_complete',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
tokens_used: updated?.tokens_used ?? null,
|
|
ctx_used: updated?.ctx_used ?? null,
|
|
ctx_max: updated?.ctx_max ?? null,
|
|
started_at: startedAt,
|
|
finished_at: updated?.finished_at ?? null,
|
|
model: session.model,
|
|
});
|
|
|
|
await Promise.all(
|
|
toolCalls.map(async (tc) => {
|
|
const [toolRow] = await ctx.sql<{ id: string }[]>`
|
|
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
|
VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
|
|
RETURNING id
|
|
`;
|
|
const toolMessageId = toolRow!.id;
|
|
const tres = await executeToolCall(projectRoot, tc);
|
|
const stored = {
|
|
tool_call_id: tc.id,
|
|
output: tres.output,
|
|
truncated: tres.truncated,
|
|
...(tres.error ? { error: tres.error } : {}),
|
|
};
|
|
await ctx.sql`
|
|
UPDATE messages
|
|
SET tool_results = ${ctx.sql.json(stored as never)}
|
|
WHERE id = ${toolMessageId}
|
|
`;
|
|
ctx.publish(sessionId, {
|
|
type: 'tool_result',
|
|
tool_message_id: toolMessageId,
|
|
chat_id: chatId,
|
|
tool_call_id: tc.id,
|
|
output: tres.output,
|
|
truncated: tres.truncated,
|
|
...(tres.error ? { error: tres.error } : {}),
|
|
});
|
|
})
|
|
);
|
|
|
|
const [nextAssistant] = await ctx.sql<{ id: string }[]>`
|
|
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
|
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
|
RETURNING id
|
|
`;
|
|
await runAssistantTurn(ctx, {
|
|
sessionId,
|
|
chatId,
|
|
assistantMessageId: nextAssistant!.id,
|
|
depth: depth + 1,
|
|
signal,
|
|
});
|
|
}
|
|
|
|
async function finalizeCompletion(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
result: StreamResult,
|
|
startedAt: string | null,
|
|
session: Session
|
|
): Promise<void> {
|
|
const { sessionId, chatId, assistantMessageId } = args;
|
|
const { content, finishReason, promptTokens, completionTokens, nCtx } = result;
|
|
|
|
const [updated] = await ctx.sql<
|
|
{ tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[]
|
|
>`
|
|
UPDATE messages
|
|
SET content = ${content},
|
|
status = 'complete',
|
|
tokens_used = ${completionTokens},
|
|
ctx_used = ${promptTokens},
|
|
ctx_max = ${nCtx},
|
|
finished_at = clock_timestamp()
|
|
WHERE id = ${assistantMessageId}
|
|
RETURNING tokens_used, ctx_used, ctx_max, finished_at
|
|
`;
|
|
const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
|
|
UPDATE sessions SET updated_at = clock_timestamp()
|
|
WHERE id = ${sessionId}
|
|
RETURNING project_id, name, updated_at
|
|
`;
|
|
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at });
|
|
ctx.publish(sessionId, {
|
|
type: 'message_complete',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
tokens_used: updated?.tokens_used ?? null,
|
|
ctx_used: updated?.ctx_used ?? null,
|
|
ctx_max: updated?.ctx_max ?? null,
|
|
started_at: startedAt,
|
|
finished_at: updated?.finished_at ?? null,
|
|
model: session.model,
|
|
});
|
|
ctx.log.info(
|
|
{
|
|
sessionId,
|
|
chatId,
|
|
assistantMessageId,
|
|
finishReason,
|
|
chars: content.length,
|
|
tokens_used: updated?.tokens_used,
|
|
ctx_used: updated?.ctx_used,
|
|
},
|
|
'inference complete'
|
|
);
|
|
}
|
|
|
|
async function runAssistantTurn(
|
|
ctx: InferenceContext,
|
|
args: TurnArgs,
|
|
): Promise<void> {
|
|
const { sessionId, chatId, assistantMessageId, depth } = args;
|
|
|
|
if (depth > MAX_TOOL_LOOP_DEPTH) {
|
|
await ctx.sql`
|
|
UPDATE messages
|
|
SET status = 'failed',
|
|
content = ${'tool loop depth exceeded'},
|
|
finished_at = clock_timestamp()
|
|
WHERE id = ${assistantMessageId}
|
|
`;
|
|
ctx.publish(sessionId, {
|
|
type: 'error',
|
|
message_id: assistantMessageId,
|
|
chat_id: chatId,
|
|
error: 'tool loop depth exceeded',
|
|
});
|
|
return;
|
|
}
|
|
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (!loaded) {
|
|
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
|
return;
|
|
}
|
|
const { session, project, history } = loaded;
|
|
const projectRoot = await resolveProjectRoot(project.path);
|
|
const messages = buildMessagesPayload(session, project, history);
|
|
|
|
const state: StreamPhaseState = { accumulated: '', startedAt: null };
|
|
let result: StreamResult;
|
|
try {
|
|
result = await executeStreamPhase(ctx, args, session, messages, state);
|
|
} catch (err) {
|
|
await handleAbortOrError(ctx, args, state.accumulated, err);
|
|
return;
|
|
}
|
|
|
|
if (result.toolCalls.length > 0) {
|
|
await executeToolPhase(ctx, args, result, state.startedAt, session, projectRoot);
|
|
return;
|
|
}
|
|
|
|
await finalizeCompletion(ctx, args, result, state.startedAt, session);
|
|
}
|
|
|
|
export async function runInference(
|
|
ctx: InferenceContext,
|
|
sessionId: string,
|
|
chatId: string,
|
|
assistantMessageId: string,
|
|
signal?: AbortSignal
|
|
): Promise<void> {
|
|
return runAssistantTurn(ctx, { sessionId, chatId, assistantMessageId, depth: 0, signal });
|
|
}
|
|
|
|
const COMPACT_SYSTEM_PROMPT =
|
|
'Summarize the preceding conversation into a dense but complete context paragraph. Preserve all key facts, decisions, file paths, code patterns, and action items. Do not add any new information. Output only the summary paragraph.';
|
|
|
|
async function runCompact(
|
|
ctx: InferenceContext,
|
|
sessionId: string,
|
|
chatId: string,
|
|
compactMessageId: string
|
|
): Promise<void> {
|
|
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
|
if (!loaded) return;
|
|
const { session, project, history } = loaded;
|
|
|
|
const messagesForSummary = buildMessagesPayload(session, project,
|
|
history.filter((m) => m.id !== compactMessageId)
|
|
);
|
|
messagesForSummary.push({
|
|
role: 'system',
|
|
content: COMPACT_SYSTEM_PROMPT,
|
|
});
|
|
|
|
ctx.publish(sessionId, {
|
|
type: 'message_started',
|
|
message_id: compactMessageId,
|
|
chat_id: chatId,
|
|
role: 'assistant',
|
|
});
|
|
|
|
let content = '';
|
|
try {
|
|
const result = await streamCompletion(
|
|
ctx,
|
|
session.model,
|
|
messagesForSummary,
|
|
false,
|
|
(delta) => {
|
|
content += delta;
|
|
ctx.publish(sessionId, {
|
|
type: 'delta',
|
|
message_id: compactMessageId,
|
|
chat_id: chatId,
|
|
content: delta,
|
|
});
|
|
}
|
|
);
|
|
content = result.content;
|
|
} catch (err) {
|
|
const errMsg = err instanceof Error ? err.message : String(err);
|
|
await ctx.sql`
|
|
UPDATE messages SET status = 'failed', content = ${content}, finished_at = clock_timestamp()
|
|
WHERE id = ${compactMessageId}
|
|
`;
|
|
ctx.publish(sessionId, {
|
|
type: 'error',
|
|
message_id: compactMessageId,
|
|
chat_id: chatId,
|
|
error: errMsg,
|
|
});
|
|
return;
|
|
}
|
|
|
|
const preCompactCount = history.filter((m) => m.id !== compactMessageId && m.kind !== 'compact').length;
|
|
const summary = `[Context compacted — ${preCompactCount} messages summarized]\n\n${content}`;
|
|
|
|
await ctx.sql`
|
|
UPDATE messages SET content = ${summary}, status = 'complete', finished_at = clock_timestamp()
|
|
WHERE id = ${compactMessageId}
|
|
`;
|
|
ctx.publish(sessionId, {
|
|
type: 'message_complete',
|
|
message_id: compactMessageId,
|
|
chat_id: chatId,
|
|
});
|
|
}
|
|
|
|
interface InferenceRegistration {
|
|
controller: AbortController;
|
|
completed: Promise<void>;
|
|
}
|
|
|
|
export function createInferenceRunner(
|
|
ctx: Omit<InferenceContext, 'publishUser'>,
|
|
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
|
) {
|
|
const registry = new Map<string, InferenceRegistration>();
|
|
|
|
return {
|
|
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
|
const callCtx: InferenceContext = {
|
|
...ctx,
|
|
publishUser: (frame) => publishUserFn(user, frame),
|
|
};
|
|
const controller = new AbortController();
|
|
let resolveCompleted!: () => void;
|
|
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
|
const registration: InferenceRegistration = { controller, completed };
|
|
registry.set(chatId, registration);
|
|
void (async () => {
|
|
try {
|
|
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
|
setImmediate(() => {
|
|
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
|
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
|
});
|
|
});
|
|
} catch (err) {
|
|
callCtx.log.error({ err }, 'unhandled inference error');
|
|
} finally {
|
|
resolveCompleted();
|
|
// Only clear our own registration; a force-send may have replaced it.
|
|
if (registry.get(chatId) === registration) {
|
|
registry.delete(chatId);
|
|
}
|
|
}
|
|
})();
|
|
},
|
|
|
|
enqueueCompact(sessionId: string, chatId: string, compactMessageId: string, user: string) {
|
|
const callCtx: InferenceContext = {
|
|
...ctx,
|
|
publishUser: (frame) => publishUserFn(user, frame),
|
|
};
|
|
void (async () => {
|
|
try {
|
|
await runCompact(callCtx, sessionId, chatId, compactMessageId);
|
|
} catch (err) {
|
|
callCtx.log.error({ err }, 'unhandled compact error');
|
|
}
|
|
})();
|
|
},
|
|
|
|
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
|
|
const reg = registry.get(chatId);
|
|
if (!reg) return false;
|
|
reg.controller.abort();
|
|
// Swallow — we just need to wait for the catch/finally to persist state.
|
|
await reg.completed.catch(() => {});
|
|
return true;
|
|
},
|
|
|
|
hasActive(chatId: string): boolean {
|
|
return registry.has(chatId);
|
|
},
|
|
};
|
|
}
|
|
|
|
export const _toolNames = ALL_TOOLS.map((t) => t.name);
|