Files
boocode/apps/coder/src/services/dispatcher.ts
indifferentketchup 3d6055518b v2.0.1: ACP dispatch + PTY fallback + worktree management
Phase 5 of v2.0. External agent dispatch via SSH to host.

ACP dispatch (acp-dispatch.ts): spawns agent via SSH with JSON-RPC
stdio pipe. Wraps opencode/goose in ACP mode. Captures structured
events (file operations, tool calls) mapped to parts taxonomy.
Falls back to PTY if ACP handshake fails.

PTY dispatch (pty-dispatch.ts): raw SSH spawn for agents without ACP
support (claude, pi). Captures stdout/stderr as plain text. Simpler
but less structured than ACP.

SSH helper (ssh.ts): shared spawn wrapper for SSH commands to
samkintop@100.114.205.53 (Tailscale IP, same as booterm). Uses
openssh-client installed in the runtime Dockerfile stage.

Worktree management (worktrees.ts): createWorktree (git worktree add
via SSH), diffWorktree (git diff HEAD...task-branch), cleanupWorktree
(git worktree remove --force). One worktree per task at
/tmp/booworktrees/<taskId>.

Dispatcher updated: checks available_agents.supports_acp to pick
transport. Path B flow: create worktree → dispatch agent → diff
worktree → queue diff into pending_changes → cleanup worktree →
mark task complete.

Agent probe updated: probes via SSH to find host-installed agents
(which opencode && opencode --version over SSH).

Dockerfile: openssh-client added to runtime stage.
Config: SSH_HOST env var (default 100.114.205.53).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 04:10:46 +00:00

369 lines
13 KiB
TypeScript
Raw Blame History

import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { dispatchViaPty } from './pty-dispatch.js';
/**
 * The subset of the inference runner that the dispatcher depends on.
 */
interface InferenceRunner {
// Starts generation for an assistant message. The native path calls this with
// (sessionId, chatId, assistantId, 'default').
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
// Not called in the visible part of this file.
// NOTE(review): presumably aborts the active run for the chat and reports whether one was cancelled — confirm.
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
// Not called in the visible part of this file.
// NOTE(review): presumably reports whether a run is in progress for the chat — confirm.
hasActive: (chatId: string) => boolean;
}
/**
 * Collaborators injected into createDispatcher().
 */
interface Deps {
// Tagged-template SQL client used for every query the dispatcher issues.
sql: Sql;
// Runner that performs native (path A) inference.
inference: InferenceRunner;
// Accepted as a dependency but not read by createDispatcher() in the visible code.
broker: Broker;
// Structured logger for dispatcher lifecycle and error events.
log: FastifyBaseLogger;
// Application config; the dispatcher reads DEFAULT_MODEL when a task has no model.
config: Config;
}
// Period, in milliseconds, of the interval that looks for pending tasks.
const POLL_INTERVAL_MS = 5_000;
// Delay, in milliseconds, between status checks while waiting for a native
// inference run to leave the 'streaming' state.
const COMPLETION_POLL_MS = 2_000;
/**
 * Builds the background task dispatcher.
 *
 * The dispatcher periodically looks for the oldest task in state 'pending'
 * and runs it, one task at a time. A task that names an agent present in
 * `available_agents` is sent to that external agent (path B); every other
 * task is run through native inference (path A).
 *
 * @param deps - Injected collaborators (database, inference runner, logger, config).
 * @returns An object with `start()` to begin polling and `stop()` to halt
 *   polling and wait for the in-flight task, if any.
 */
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
const { sql, inference, log, config } = deps;
// Handle of the polling interval; null while no interval is scheduled.
let timer: ReturnType<typeof setInterval> | null = null;
// True while a task is executing; poll() returns immediately while it is set.
let running = false;
// Set by stop(); checked by poll() and by the task runners to wind down.
let stopping = false;
// Promise for the task currently executing; stop() awaits it.
let inflightPromise: Promise<void> | null = null;
/**
 * One tick of the poll loop: picks the oldest pending task, if any, and
 * starts executing it in the background.
 *
 * The returned promise settles once the task has been *started*, not when
 * it has finished; the running task is tracked through `inflightPromise`.
 * Query errors propagate to the caller, which logs them.
 */
async function poll(): Promise<void> {
  if (running || stopping) return;
  // Claim the single execution slot BEFORE the first await. Otherwise a tick
  // that fires while the SELECT is still in flight would pass the guard above
  // and both ticks could pick up the same pending task.
  running = true;
  let started = false;
  try {
    // Grab one pending task
    const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null }[]>`
      SELECT id, project_id, input, agent, model
      FROM tasks
      WHERE state = 'pending'
      ORDER BY created_at
      LIMIT 1
    `;
    const task = rows[0];
    // stop() may have been called while the query was running. It would have
    // seen no in-flight promise and returned, so no task may start now.
    if (!task || stopping) return;
    started = true;
    inflightPromise = runTask(task)
      .catch((err: unknown) => {
        // Nothing awaits this chain except stop(), so a rejection here would
        // otherwise surface as an unhandled promise rejection.
        log.error({ taskId: task.id, err }, 'dispatcher: unhandled task error');
      })
      .finally(() => {
        running = false;
        inflightPromise = null;
      });
  } finally {
    // Release the slot when no task was launched (no rows, stopping, or a
    // query error); a launched task releases it from its own finally().
    if (!started) running = false;
  }
}
/**
 * Routes a task to its execution path.
 *
 * A task whose `agent` is set and listed in `available_agents` goes to the
 * external-agent path (path B). A task without an agent, or naming an agent
 * that is not available, runs through native inference (path A).
 */
async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
  const { id: taskId, agent: requestedAgent } = task;

  // No agent requested: straight to path A.
  if (!requestedAgent) {
    await runNativeInference(task);
    return;
  }

  const matches = await sql<{ name: string; supports_acp: boolean }[]>`
    SELECT name, supports_acp FROM available_agents WHERE name = ${requestedAgent}
  `;
  const agentRow = matches[0];

  // Known agent: path B, with the transport chosen by its ACP support.
  if (agentRow) {
    await runExternalAgent(task, agentRow.supports_acp);
    return;
  }

  // Requested agent is not installed: warn and degrade to path A.
  log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
  await runNativeInference(task);
}
// ─── Path A: Native Inference ───────────────────────────────────────────────
/**
 * Path A: runs a task through the in-process inference runner.
 *
 * Marks the task running, creates a session/chat with the task input as the
 * user message plus an empty streaming assistant message, enqueues inference,
 * then waits for the assistant message to leave the 'streaming' state. The
 * task ends as 'completed', 'failed', or (when the dispatcher is stopping)
 * 'cancelled'. The first 500 characters of the assistant content are stored
 * as the task's output summary.
 *
 * Errors are caught and recorded on the task; this function does not reject.
 */
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
  const taskId = task.id;
  log.info({ taskId }, 'dispatcher: starting task (path A — native)');
  try {
    // Mark running
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
      WHERE id = ${taskId}
    `;

    // Create session + chat for this task
    const model = task.model ?? config.DEFAULT_MODEL;
    const sessionName = 'Task: ' + task.input.slice(0, 40);
    const [session] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
      RETURNING id
    `;
    if (!session) throw new Error('session insert returned no row');
    const sessionId = session.id;

    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'Task execution', 'open')
      RETURNING id
    `;
    if (!chat) throw new Error('chat insert returned no row');
    const chatId = chat.id;

    // Link task to session
    await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;

    // Create user message + streaming assistant
    await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
      RETURNING id
    `;
    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
      RETURNING id
    `;
    if (!assistantMsg) throw new Error('assistant message insert returned no row');
    const assistantId = assistantMsg.id;

    // Enqueue inference
    inference.enqueue(sessionId, chatId, assistantId, 'default');

    // Wait for inference to complete (poll message status)
    const finalStatus = await waitForCompletion(assistantId);

    if (stopping) {
      await sql`
        UPDATE tasks
        SET state = 'cancelled', ended_at = clock_timestamp()
        WHERE id = ${taskId}
      `;
      return;
    }

    // Both outcomes summarize the assistant content, so read it once.
    const [msg] = await sql<{ content: string | null }[]>`
      SELECT content FROM messages WHERE id = ${assistantId}
    `;
    const content = msg?.content ?? '';

    if (finalStatus === 'complete') {
      const summary = content.slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}
        WHERE id = ${taskId}
      `;
      log.info({ taskId }, 'dispatcher: task completed (native)');
    } else {
      // The assistant row starts out as '' (not NULL), so the fallback text
      // must apply to empty content too; `??` alone would never trigger it.
      const summary = (content || 'Inference failed').slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
        WHERE id = ${taskId}
      `;
      log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
    // Best effort: the database may be the thing that failed. Log rather
    // than hide a task that could not be marked as failed.
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
      WHERE id = ${taskId}
    `.catch((updateErr: unknown) => {
      log.error({ taskId, err: updateErr }, 'dispatcher: could not record task failure (native)');
    });
  }
}
// ─── Path B: External Agent Dispatch ────────────────────────────────────────
/**
 * Path B: runs a task by dispatching it to an external agent.
 *
 * Flow: resolve the project's root path → mark the task running → create a
 * session/chat holding the task input → create a worktree for the task →
 * dispatch the agent (ACP or PTY transport) and store its output as an
 * assistant message → diff the worktree and queue the diff as a single
 * pending change → remove the worktree → mark the task completed.
 *
 * While the dispatcher is stopping, the abort signal handed to the worktree
 * and dispatch helpers is triggered, and the task ends as 'cancelled'.
 * Worktree removal is best effort and never changes the task outcome.
 * Errors are caught and recorded on the task; this function does not reject.
 *
 * @param task - The task row being executed; `task.agent` must be set.
 * @param supportsAcp - Whether the agent is dispatched via ACP (true) or PTY (false).
 */
async function runExternalAgent(
  task: { id: string; project_id: string; input: string; agent: string | null; model: string | null },
  supportsAcp: boolean,
): Promise<void> {
  const taskId = task.id;
  // runTask() only routes here after matching a non-empty task.agent.
  const agent = task.agent!;
  const executionPath = supportsAcp ? 'acp' : 'pty';
  log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');

  // Abort controller for this task. Nothing else holds a reference to it, so
  // watch the shared `stopping` flag here and abort once shutdown begins;
  // without this the signal passed to the helpers below could never fire.
  const ac = new AbortController();
  const STOP_CHECK_MS = 500;
  const stopWatcher = setInterval(() => {
    if (stopping) ac.abort();
  }, STOP_CHECK_MS);

  let projectPath: string | null = null;
  // Set just before createWorktree() so that a partially created worktree is
  // still cleaned up, while failures before that point skip cleanup entirely.
  let worktreeAttempted = false;

  // Removes the task's worktree, logging instead of throwing on failure.
  const cleanupQuietly = async (root: string): Promise<void> => {
    try {
      await cleanupWorktree(root, taskId);
    } catch (cleanupErr) {
      log.warn({ taskId, err: cleanupErr }, 'dispatcher: worktree cleanup failed');
    }
  };

  try {
    // Resolve the project's root path
    const [project] = await sql<{ root_path: string | null }[]>`
      SELECT root_path FROM projects WHERE id = ${task.project_id}
    `;
    projectPath = project?.root_path ?? null;
    if (!projectPath) {
      log.warn({ taskId, projectId: task.project_id }, 'dispatcher: project has no root_path, failing task');
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no root_path — cannot create worktree'
        WHERE id = ${taskId}
      `;
      return;
    }
    const root = projectPath;

    // Mark running
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
      WHERE id = ${taskId}
    `;

    // Create session + chat for this task (same as Path A — for output tracking)
    const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
    const [session] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
      RETURNING id
    `;
    if (!session) throw new Error('session insert returned no row');
    const sessionId = session.id;

    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'External agent execution', 'open')
      RETURNING id
    `;
    if (!chat) throw new Error('chat insert returned no row');
    const chatId = chat.id;

    // Link task to session
    await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;

    // Create user message for the task input
    await sql`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
    `;

    // Step 1: Create worktree
    log.info({ taskId, projectPath: root }, 'dispatcher: creating worktree');
    worktreeAttempted = true;
    const worktreePath = await createWorktree(root, taskId, { signal: ac.signal });
    log.info({ taskId, worktreePath }, 'dispatcher: worktree created');

    // Step 2: Dispatch to agent
    const request = {
      agent,
      task: task.input,
      worktreePath,
      model: task.model ?? undefined,
      signal: ac.signal,
      log,
    };
    let outputSummary: string;
    let transcript: string;
    if (supportsAcp) {
      const result = await dispatchViaAcp(request);
      outputSummary = result.output.slice(0, 500);
      transcript = result.output;
    } else {
      const result = await dispatchViaPty(request);
      const raw = result.stdout || result.stderr;
      outputSummary = raw.slice(0, 500);
      transcript = raw || '(no output)';
    }

    // Store agent output as an assistant message
    await sql`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', ${transcript.slice(0, 50_000)}, 'complete', clock_timestamp())
    `;

    if (stopping) {
      await sql`
        UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
      `;
      await cleanupQuietly(root);
      return;
    }

    // Step 3: Diff the worktree and queue pending changes
    log.info({ taskId }, 'dispatcher: diffing worktree');
    const diff = await diffWorktree(worktreePath, root, { signal: ac.signal });
    if (diff) {
      // Queue a single pending_change entry with the full unified diff
      await sql`
        INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
        VALUES (${sessionId}, ${taskId}, ${root}, 'edit', ${diff})
      `;
      log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
    } else {
      log.info({ taskId }, 'dispatcher: no changes detected in worktree');
    }

    // Step 4: Cleanup worktree. The diff is already queued, so a cleanup
    // failure must not turn a successful task into a failed one.
    await cleanupQuietly(root);

    // Step 5: Mark task completed
    await sql`
      UPDATE tasks
      SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}
      WHERE id = ${taskId}
    `;
    log.info({ taskId, agent }, 'dispatcher: task completed (external)');
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    const summary = errMsg.slice(0, 500);
    const onUpdateError = (updateErr: unknown): void => {
      log.error({ taskId, err: updateErr }, 'dispatcher: could not record task outcome (external)');
    };

    if (stopping && ac.signal.aborted) {
      // The failure was caused by our own abort during shutdown.
      log.info({ taskId, agent, err: errMsg }, 'dispatcher: external agent cancelled');
      await sql`
        UPDATE tasks
        SET state = 'cancelled', ended_at = clock_timestamp(), output_summary = ${summary}
        WHERE id = ${taskId}
      `.catch(onUpdateError);
    } else {
      log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
        WHERE id = ${taskId}
      `.catch(onUpdateError);
    }

    // Best-effort cleanup, only if a worktree may actually exist.
    if (worktreeAttempted && projectPath) {
      await cleanupQuietly(projectPath);
    }
  } finally {
    clearInterval(stopWatcher);
  }
}
// ─── Helpers ────────────────────────────────────────────────────────────────
/**
 * Polls an assistant message until it leaves the 'streaming' state.
 *
 * @param assistantId - Id of the assistant message being generated.
 * @returns The message's final status; 'failed' when the row cannot be
 *   found; 'cancelled' when the dispatcher is stopping.
 */
async function waitForCompletion(assistantId: string): Promise<string> {
  while (!stopping) {
    const rows = await sql<{ status: string }[]>`
      SELECT status FROM messages WHERE id = ${assistantId}
    `;
    // A missing row is reported as a failure.
    const current = rows[0]?.status ?? 'failed';
    if (current !== 'streaming') {
      return current;
    }
    await sleep(COMPLETION_POLL_MS);
  }
  return 'cancelled';
}
/**
 * Resolves after the given delay.
 *
 * @param ms - Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
return {
  /**
   * Starts the poll loop. The first poll happens one interval after the call.
   * Calling start() while the loop is already scheduled is a no-op, so the
   * existing interval is never overwritten and leaked.
   */
  start() {
    if (timer) {
      log.warn('dispatcher: start() called while already running; ignoring');
      return;
    }
    log.info('dispatcher: starting poll loop');
    timer = setInterval(() => {
      poll().catch((err) => {
        log.error({ err }, 'dispatcher: poll error');
      });
    }, POLL_INTERVAL_MS);
  },
  /**
   * Stops polling and waits for the in-flight task, if any, to settle.
   * Always resolves: a failure of the in-flight task is logged, not rethrown,
   * so shutdown can proceed.
   */
  async stop() {
    stopping = true;
    if (timer) {
      clearInterval(timer);
      timer = null;
    }
    // Capture the promise first; the task's own completion handler clears
    // the shared variable.
    const pending = inflightPromise;
    if (pending) {
      log.info('dispatcher: waiting for in-flight task');
      try {
        await pending;
      } catch (err) {
        log.error({ err }, 'dispatcher: in-flight task failed during stop');
      }
    }
    log.info('dispatcher: stopped');
  },
};
}