v2.0.1: ACP dispatch + PTY fallback + worktree management

Phase 5 of v2.0. External agent dispatch via SSH to host.

ACP dispatch (acp-dispatch.ts): spawns agent via SSH with JSON-RPC
stdio pipe. Wraps opencode/goose in ACP mode. Captures structured
events (file operations, tool calls) mapped to parts taxonomy.
Falls back to PTY if ACP handshake fails.

PTY dispatch (pty-dispatch.ts): raw SSH spawn for agents without ACP
support (claude, pi). Captures stdout/stderr as plain text. Simpler
but less structured than ACP.

SSH helper (ssh.ts): shared spawn wrapper for SSH commands to
samkintop@100.114.205.53 (Tailscale IP, same as booterm). Uses
openssh-client installed in the runtime Dockerfile stage.

Worktree management (worktrees.ts): createWorktree (git worktree add
via SSH), diffWorktree (git diff HEAD...task-branch), cleanupWorktree
(git worktree remove --force). One worktree per task at
/tmp/booworktrees/<taskId>.

Dispatcher updated: checks available_agents.supports_acp to pick
transport. Path B flow: create worktree → dispatch agent → diff
worktree → queue diff into pending_changes → cleanup worktree →
mark task complete.

Agent probe updated: probes via SSH to find host-installed agents
(which opencode && opencode --version over SSH).

Dockerfile: openssh-client added to runtime stage.
Config: SSH_HOST env var (default 100.114.205.53).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 04:10:46 +00:00
parent 752ea74f43
commit 3d6055518b
10 changed files with 891 additions and 25 deletions

View File

@@ -2,6 +2,9 @@ import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { dispatchViaPty } from './pty-dispatch.js';
interface InferenceRunner {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
@@ -50,7 +53,29 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
const taskId = task.id;
log.info({ taskId }, 'dispatcher: starting task');
// Determine execution path: if agent is specified AND exists in available_agents → Path B
if (task.agent) {
const [agentRow] = await sql<{ name: string; supports_acp: boolean }[]>`
SELECT name, supports_acp FROM available_agents WHERE name = ${task.agent}
`;
if (agentRow) {
await runExternalAgent(task, agentRow.supports_acp);
return;
}
// Agent specified but not available — fall through to Path A with a warning
log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
}
// Path A — native inference (existing behavior)
await runNativeInference(task);
}
// ─── Path A: Native Inference ───────────────────────────────────────────────
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
const taskId = task.id;
log.info({ taskId }, 'dispatcher: starting task (path A — native)');
try {
// Mark running
@@ -101,7 +126,6 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
const finalStatus = await waitForCompletion(assistantId);
if (stopping) {
// Graceful shutdown — mark cancelled
await sql`
UPDATE tasks
SET state = 'cancelled', ended_at = clock_timestamp()
@@ -111,7 +135,6 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
}
if (finalStatus === 'complete') {
// Grab assistant content for output_summary
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
@@ -121,9 +144,8 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.info({ taskId }, 'dispatcher: task completed');
log.info({ taskId }, 'dispatcher: task completed (native)');
} else {
// failed or cancelled
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
@@ -133,21 +155,176 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.warn({ taskId, finalStatus }, 'dispatcher: task failed');
log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, err: errMsg }, 'dispatcher: task error');
log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {}); // best-effort
`.catch(() => {});
}
}
// ─── Path B: External Agent Dispatch ──────<E29480><E29480><EFBFBD>─────────────────────────────────
async function runExternalAgent(
task: { id: string; project_id: string; input: string; agent: string | null; model: string | null },
supportsAcp: boolean,
): Promise<void> {
const taskId = task.id;
const agent = task.agent!;
const executionPath = supportsAcp ? 'acp' : 'pty';
log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');
// Resolve the project's root path
const [project] = await sql<{ root_path: string | null }[]>`
SELECT root_path FROM projects WHERE id = ${task.project_id}
`;
const projectPath = project?.root_path;
if (!projectPath) {
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no root_path — cannot create worktree'
WHERE id = ${taskId}
`;
return;
}
// Create an abort controller for this task
const ac = new AbortController();
try {
// Mark running
await sql`
UPDATE tasks
SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
WHERE id = ${taskId}
`;
// Create session + chat for this task (same as Path A — for output tracking)
const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
RETURNING id
`;
const sessionId = session!.id;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${sessionId}, 'External agent execution', 'open')
RETURNING id
`;
const chatId = chat!.id;
// Link task to session
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
// Create user message for the task input
await sql`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
`;
// Step 1: Create worktree
log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
log.info({ taskId, worktreePath }, 'dispatcher: worktree created');
// Step 2: Dispatch to agent
let outputSummary: string;
if (supportsAcp) {
const result = await dispatchViaAcp({
agent,
task: task.input,
worktreePath,
model: task.model ?? undefined,
signal: ac.signal,
log,
});
outputSummary = result.output.slice(0, 500);
// Store agent output as an assistant message
await sql`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', ${result.output.slice(0, 50_000)}, 'complete', clock_timestamp())
`;
} else {
const result = await dispatchViaPty({
agent,
task: task.input,
worktreePath,
model: task.model ?? undefined,
signal: ac.signal,
log,
});
outputSummary = (result.stdout || result.stderr).slice(0, 500);
// Store agent output as an assistant message
const content = result.stdout || result.stderr || '(no output)';
await sql`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', ${content.slice(0, 50_000)}, 'complete', clock_timestamp())
`;
}
if (stopping) {
await sql`
UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
`;
await cleanupWorktree(projectPath, taskId);
return;
}
// Step 3: Diff the worktree and queue pending changes
log.info({ taskId }, 'dispatcher: diffing worktree');
const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
if (diff) {
// Queue a single pending_change entry with the full unified diff
await sql`
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff})
`;
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
} else {
log.info({ taskId }, 'dispatcher: no changes detected in worktree');
}
// Step 4: Cleanup worktree
await cleanupWorktree(projectPath, taskId);
// Step 5: Mark task completed
await sql`
UPDATE tasks
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}
WHERE id = ${taskId}
`;
log.info({ taskId, agent }, 'dispatcher: task completed (external)');
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {});
// Best-effort cleanup
await cleanupWorktree(projectPath, taskId);
}
}
// ─── Helpers ────────────────────────────────────────────────────────────────
async function waitForCompletion(assistantId: string): Promise<string> {
// Poll until the assistant message is no longer streaming
for (;;) {
if (stopping) return 'cancelled';