Ship Paseo-equivalent provider snapshot, AgentComposerBar, ACP dispatch rewrite with streaming/persist, permission prompts, and agent commands. Follow-up: pane-scoped chat resolution, CoderMessageList tool timeline, WS user-delta replace, and inference orphan tool_call stripping. Archive openspec v2-2; update CHANGELOG and CURRENT. Co-authored-by: Cursor <cursoragent@cursor.com>
488 lines
17 KiB
TypeScript
488 lines
17 KiB
TypeScript
import type { Sql } from '../db.js';
|
||
import type { FastifyBaseLogger } from 'fastify';
|
||
import type { Broker } from '@boocode/server/broker';
|
||
import type { WsFrame } from '@boocode/server/ws-frames';
|
||
import type { Config } from '../config.js';
|
||
import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js';
|
||
import { dispatchViaAcp } from './acp-dispatch.js';
|
||
import { dispatchViaPty } from './pty-dispatch.js';
|
||
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
|
||
import { getManifestCommands } from './provider-commands.js';
|
||
import { persistExternalAgentTurn } from './agent-turn-persist.js';
|
||
|
||
interface InferenceRunner {
|
||
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
|
||
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
|
||
hasActive: (chatId: string) => boolean;
|
||
}
|
||
|
||
interface Deps {
|
||
sql: Sql;
|
||
inference: InferenceRunner;
|
||
broker: Broker;
|
||
log: FastifyBaseLogger;
|
||
config: Config;
|
||
}
|
||
|
||
const POLL_INTERVAL_MS = 5_000;
|
||
const COMPLETION_POLL_MS = 2_000;
|
||
|
||
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
|
||
const { sql, inference, broker, log, config } = deps;
|
||
let timer: ReturnType<typeof setInterval> | null = null;
|
||
let running = false;
|
||
let stopping = false;
|
||
let inflightPromise: Promise<void> | null = null;
|
||
|
||
async function poll(): Promise<void> {
|
||
if (running || stopping) return;
|
||
|
||
// Grab one pending task
|
||
const rows = await sql<{
|
||
id: string;
|
||
project_id: string;
|
||
input: string;
|
||
agent: string | null;
|
||
model: string | null;
|
||
mode_id: string | null;
|
||
thinking_option_id: string | null;
|
||
session_id: string | null;
|
||
}[]>`
|
||
SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id
|
||
FROM tasks
|
||
WHERE state = 'pending'
|
||
ORDER BY created_at
|
||
LIMIT 1
|
||
`;
|
||
if (rows.length === 0) return;
|
||
|
||
const task = rows[0]!;
|
||
running = true;
|
||
inflightPromise = runTask(task).finally(() => {
|
||
running = false;
|
||
inflightPromise = null;
|
||
});
|
||
}
|
||
|
||
async function runTask(task: {
|
||
id: string;
|
||
project_id: string;
|
||
input: string;
|
||
agent: string | null;
|
||
model: string | null;
|
||
mode_id: string | null;
|
||
thinking_option_id: string | null;
|
||
session_id: string | null;
|
||
}): Promise<void> {
|
||
const taskId = task.id;
|
||
|
||
// Determine execution path: if agent is specified AND exists in available_agents → Path B
|
||
if (task.agent) {
|
||
const [agentRow] = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>`
|
||
SELECT name, supports_acp, install_path FROM available_agents WHERE name = ${task.agent}
|
||
`;
|
||
if (agentRow) {
|
||
await runExternalAgent(task, agentRow.supports_acp, agentRow.install_path);
|
||
return;
|
||
}
|
||
// Agent specified but not available — fall through to Path A with a warning
|
||
log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
|
||
}
|
||
|
||
// Path A — native inference (existing behavior)
|
||
await runNativeInference(task);
|
||
}
|
||
|
||
// ─── Path A: Native Inference ───────────────────────────────────────────────
|
||
|
||
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; session_id: string | null }): Promise<void> {
|
||
const taskId = task.id;
|
||
log.info({ taskId }, 'dispatcher: starting task (path A — native)');
|
||
|
||
try {
|
||
// Mark running
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
|
||
WHERE id = ${taskId}
|
||
`;
|
||
|
||
// Create session + chat for this task
|
||
const model = task.model ?? config.DEFAULT_MODEL;
|
||
const sessionName = 'Task: ' + task.input.slice(0, 40);
|
||
|
||
const [session] = await sql<{ id: string }[]>`
|
||
INSERT INTO sessions (project_id, name, model, status)
|
||
VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
|
||
RETURNING id
|
||
`;
|
||
const sessionId = session!.id;
|
||
|
||
const [chat] = await sql<{ id: string }[]>`
|
||
INSERT INTO chats (session_id, name, status)
|
||
VALUES (${sessionId}, 'Task execution', 'open')
|
||
RETURNING id
|
||
`;
|
||
const chatId = chat!.id;
|
||
|
||
// Link task to session
|
||
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
|
||
|
||
// Create user message + streaming assistant
|
||
await sql<{ id: string }[]>`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
|
||
RETURNING id
|
||
`;
|
||
const [assistantMsg] = await sql<{ id: string }[]>`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||
RETURNING id
|
||
`;
|
||
const assistantId = assistantMsg!.id;
|
||
|
||
// Enqueue inference
|
||
inference.enqueue(sessionId, chatId, assistantId, 'default');
|
||
|
||
// Wait for inference to complete (poll message status)
|
||
const finalStatus = await waitForCompletion(assistantId);
|
||
|
||
if (stopping) {
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'cancelled', ended_at = clock_timestamp()
|
||
WHERE id = ${taskId}
|
||
`;
|
||
return;
|
||
}
|
||
|
||
// Aggregate token cost for the task's session
|
||
const [costRow] = await sql<{ total: number | null }[]>`
|
||
SELECT SUM(tokens_used)::int AS total
|
||
FROM messages
|
||
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
|
||
`;
|
||
const costTokens = costRow?.total ?? null;
|
||
|
||
if (finalStatus === 'complete') {
|
||
const [msg] = await sql<{ content: string | null }[]>`
|
||
SELECT content FROM messages WHERE id = ${assistantId}
|
||
`;
|
||
const summary = (msg?.content ?? '').slice(0, 500);
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
|
||
} else {
|
||
const [msg] = await sql<{ content: string | null }[]>`
|
||
SELECT content FROM messages WHERE id = ${assistantId}
|
||
`;
|
||
const summary = (msg?.content ?? 'Inference failed').slice(0, 500);
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
|
||
}
|
||
} catch (err) {
|
||
const errMsg = err instanceof Error ? err.message : String(err);
|
||
log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||
WHERE id = ${taskId}
|
||
`.catch(() => {});
|
||
}
|
||
}
|
||
|
||
// ─── Path B: External Agent Dispatch ──────<E29480><E29480><EFBFBD>─────────────────────────────────
|
||
|
||
async function runExternalAgent(
|
||
task: {
|
||
id: string;
|
||
project_id: string;
|
||
input: string;
|
||
agent: string | null;
|
||
model: string | null;
|
||
mode_id: string | null;
|
||
thinking_option_id: string | null;
|
||
session_id: string | null;
|
||
},
|
||
supportsAcp: boolean,
|
||
installPath: string | null,
|
||
): Promise<void> {
|
||
const taskId = task.id;
|
||
const agent = task.agent!;
|
||
const executionPath = supportsAcp ? 'acp' : 'pty';
|
||
|
||
log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');
|
||
|
||
// Resolve the project's root path
|
||
const [project] = await sql<{ path: string | null }[]>`
|
||
SELECT path FROM projects WHERE id = ${task.project_id}
|
||
`;
|
||
const projectPath = project?.path;
|
||
if (!projectPath) {
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
|
||
WHERE id = ${taskId}
|
||
`;
|
||
return;
|
||
}
|
||
|
||
// Create an abort controller for this task
|
||
const ac = new AbortController();
|
||
|
||
try {
|
||
// Mark running
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
|
||
let sessionId: string;
|
||
let chatId: string;
|
||
|
||
if (task.session_id) {
|
||
sessionId = task.session_id;
|
||
const chats = await sql<{ id: string }[]>`
|
||
SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1
|
||
`;
|
||
if (chats.length === 0) {
|
||
const [chat] = await sql<{ id: string }[]>`
|
||
INSERT INTO chats (session_id, name, status)
|
||
VALUES (${sessionId}, 'External agent execution', 'open')
|
||
RETURNING id
|
||
`;
|
||
chatId = chat!.id;
|
||
} else {
|
||
chatId = chats[0]!.id;
|
||
}
|
||
} else {
|
||
const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
|
||
const [session] = await sql<{ id: string }[]>`
|
||
INSERT INTO sessions (project_id, name, model, status)
|
||
VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
|
||
RETURNING id
|
||
`;
|
||
sessionId = session!.id;
|
||
|
||
const [chat] = await sql<{ id: string }[]>`
|
||
INSERT INTO chats (session_id, name, status)
|
||
VALUES (${sessionId}, 'External agent execution', 'open')
|
||
RETURNING id
|
||
`;
|
||
chatId = chat!.id;
|
||
|
||
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
|
||
}
|
||
|
||
if (!task.session_id) {
|
||
await sql`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
|
||
`;
|
||
}
|
||
|
||
// Step 1: Create worktree
|
||
log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
|
||
const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
|
||
log.info({ taskId, worktreePath }, 'dispatcher: worktree created');
|
||
|
||
// Step 2: Dispatch to agent
|
||
let outputSummary: string;
|
||
let assistantContent = '';
|
||
let acpReasoning = '';
|
||
|
||
const [assistantMsg] = await sql<{ id: string }[]>`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||
RETURNING id
|
||
`;
|
||
const assistantId = assistantMsg!.id;
|
||
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_started',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
role: 'assistant',
|
||
} as WsFrame);
|
||
|
||
const manifestCommands = getManifestCommands(agent);
|
||
if (manifestCommands.length > 0) {
|
||
setTaskCommands(taskId, manifestCommands);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'agent_commands',
|
||
task_id: taskId,
|
||
session_id: sessionId,
|
||
commands: manifestCommands,
|
||
} as WsFrame);
|
||
}
|
||
|
||
if (supportsAcp) {
|
||
const result = await dispatchViaAcp({
|
||
agent,
|
||
task: task.input,
|
||
worktreePath,
|
||
installPath: installPath ?? undefined,
|
||
model: task.model ?? undefined,
|
||
modeId: task.mode_id ?? undefined,
|
||
thinkingOptionId: task.thinking_option_id ?? undefined,
|
||
taskId,
|
||
sessionId,
|
||
chatId,
|
||
messageId: assistantId,
|
||
broker,
|
||
signal: ac.signal,
|
||
log,
|
||
});
|
||
assistantContent = result.output.slice(0, 50_000);
|
||
acpReasoning = result.reasoningText.slice(0, 200_000);
|
||
outputSummary = result.output.slice(0, 500);
|
||
await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning);
|
||
} else {
|
||
const result = await dispatchViaPty({
|
||
agent,
|
||
task: task.input,
|
||
worktreePath,
|
||
installPath: installPath ?? undefined,
|
||
model: task.model ?? undefined,
|
||
modeId: task.mode_id ?? undefined,
|
||
thinkingOptionId: task.thinking_option_id ?? undefined,
|
||
signal: ac.signal,
|
||
log,
|
||
});
|
||
assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, 50_000);
|
||
outputSummary = (result.stdout || result.stderr).slice(0, 500);
|
||
|
||
if (assistantContent) {
|
||
broker.publishFrame(sessionId, {
|
||
type: 'delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: assistantContent,
|
||
} as WsFrame);
|
||
}
|
||
}
|
||
|
||
await sql`
|
||
UPDATE messages
|
||
SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
|
||
WHERE id = ${assistantId}
|
||
`;
|
||
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_complete',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
} as WsFrame);
|
||
|
||
if (stopping) {
|
||
await sql`
|
||
UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
|
||
`;
|
||
await cleanupWorktree(projectPath, taskId);
|
||
return;
|
||
}
|
||
|
||
// Step 3: Diff the worktree and queue pending changes
|
||
log.info({ taskId }, 'dispatcher: diffing worktree');
|
||
const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
|
||
|
||
if (diff) {
|
||
// Queue a single pending_change entry with the full unified diff
|
||
await sql`
|
||
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
|
||
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff})
|
||
`;
|
||
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
|
||
} else {
|
||
log.info({ taskId }, 'dispatcher: no changes detected in worktree');
|
||
}
|
||
|
||
// Step 4: Cleanup worktree
|
||
await cleanupWorktree(projectPath, taskId);
|
||
|
||
// Step 5: Aggregate token cost
|
||
const [extCostRow] = await sql<{ total: number | null }[]>`
|
||
SELECT SUM(tokens_used)::int AS total
|
||
FROM messages
|
||
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
|
||
`;
|
||
const extCostTokens = extCostRow?.total ?? null;
|
||
|
||
// Step 6: Mark task completed
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
|
||
clearTaskCommands(taskId);
|
||
|
||
} catch (err) {
|
||
const errMsg = err instanceof Error ? err.message : String(err);
|
||
log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
|
||
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||
WHERE id = ${taskId}
|
||
`.catch(() => {});
|
||
|
||
// Best-effort cleanup
|
||
await cleanupWorktree(projectPath, taskId);
|
||
clearTaskCommands(taskId);
|
||
}
|
||
}
|
||
|
||
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||
|
||
async function waitForCompletion(assistantId: string): Promise<string> {
|
||
for (;;) {
|
||
if (stopping) return 'cancelled';
|
||
|
||
const [row] = await sql<{ status: string }[]>`
|
||
SELECT status FROM messages WHERE id = ${assistantId}
|
||
`;
|
||
const status = row?.status ?? 'failed';
|
||
if (status !== 'streaming') return status;
|
||
|
||
await sleep(COMPLETION_POLL_MS);
|
||
}
|
||
}
|
||
|
||
function sleep(ms: number): Promise<void> {
|
||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||
}
|
||
|
||
return {
|
||
start() {
|
||
log.info('dispatcher: starting poll loop');
|
||
timer = setInterval(() => {
|
||
poll().catch((err) => {
|
||
log.error({ err }, 'dispatcher: poll error');
|
||
});
|
||
}, POLL_INTERVAL_MS);
|
||
},
|
||
|
||
async stop() {
|
||
stopping = true;
|
||
if (timer) {
|
||
clearInterval(timer);
|
||
timer = null;
|
||
}
|
||
if (inflightPromise) {
|
||
log.info('dispatcher: waiting for in-flight task');
|
||
await inflightPromise;
|
||
}
|
||
log.info('dispatcher: stopped');
|
||
},
|
||
};
|
||
}
|