Compare commits
1 Commits
v2.0.0-fin
...
v2.0.1
| Author | SHA1 | Date | |
|---|---|---|---|
| 3d6055518b |
@@ -23,7 +23,7 @@ RUN pnpm deploy --filter=@boocode/coder --prod --legacy /out/coder
|
||||
|
||||
|
||||
FROM node:20-bookworm-slim AS runtime
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends ripgrep git && rm -rf /var/lib/apt/lists/*
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends ripgrep git openssh-client && rm -rf /var/lib/apt/lists/*
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=builder /out/coder ./
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
"test": "vitest run"
|
||||
},
|
||||
"dependencies": {
|
||||
"@agentclientprotocol/sdk": "^0.22.1",
|
||||
"@boocode/server": "workspace:*",
|
||||
"@fastify/static": "^7.0.4",
|
||||
"@fastify/websocket": "^10.0.1",
|
||||
|
||||
@@ -23,6 +23,9 @@ const ConfigSchema = z.object({
|
||||
GITEA_TOKEN: z.string().optional(),
|
||||
GITEA_SSH_HOST: z.string().default('100.114.205.53:2222'),
|
||||
MCP_CONFIG_PATH: z.string().optional(),
|
||||
// SSH access to the host for external agent dispatch (Phase 5)
|
||||
BOOCODER_SSH_HOST: z.string().default('100.114.205.53'),
|
||||
BOOCODER_SSH_USER: z.string().default('samkintop'),
|
||||
});
|
||||
|
||||
export type Config = z.infer<typeof ConfigSchema>;
|
||||
|
||||
271
apps/coder/src/services/acp-dispatch.ts
Normal file
271
apps/coder/src/services/acp-dispatch.ts
Normal file
@@ -0,0 +1,271 @@
|
||||
/**
|
||||
* ACP dispatch — runs ACP-capable agents (opencode, goose) on the host via SSH.
|
||||
*
|
||||
* Uses the @agentclientprotocol/sdk to establish a structured JSON-RPC session
|
||||
* with the agent subprocess. The SSH tunnel provides stdio transport.
|
||||
*
|
||||
* Flow:
|
||||
* 1. SSH to host, start `opencode acp` (or `goose acp`) in the worktree
|
||||
* 2. Wrap SSH child's stdin/stdout into NDJSON streams
|
||||
* 3. Create a ClientSideConnection from the SDK
|
||||
* 4. Initialize → newSession → prompt(task)
|
||||
* 5. Collect session updates (tool calls, text output)
|
||||
* 6. On prompt completion → return collected output
|
||||
*/
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import { Readable, Writable } from 'node:stream';
|
||||
import {
|
||||
ClientSideConnection,
|
||||
ndJsonStream,
|
||||
type Client,
|
||||
type SessionNotification,
|
||||
type RequestPermissionRequest,
|
||||
type RequestPermissionResponse,
|
||||
type ReadTextFileRequest,
|
||||
type ReadTextFileResponse,
|
||||
type WriteTextFileRequest,
|
||||
type WriteTextFileResponse,
|
||||
type CreateTerminalRequest,
|
||||
type CreateTerminalResponse,
|
||||
} from '@agentclientprotocol/sdk';
|
||||
import { sshSpawn } from './ssh.js';
|
||||
|
||||
export interface AcpDispatchResult {
|
||||
exitCode: number;
|
||||
output: string;
|
||||
toolCalls: Array<{ title: string; input: unknown; output?: unknown }>;
|
||||
stopReason: string;
|
||||
}
|
||||
|
||||
export interface AcpDispatchOpts {
|
||||
agent: string;
|
||||
task: string;
|
||||
worktreePath: string;
|
||||
model?: string;
|
||||
signal?: AbortSignal;
|
||||
log: FastifyBaseLogger;
|
||||
}
|
||||
|
||||
/** Map agent name to the ACP command it exposes. */
|
||||
function acpCommand(agent: string): string | null {
|
||||
switch (agent) {
|
||||
case 'opencode':
|
||||
return 'opencode acp';
|
||||
case 'goose':
|
||||
return 'goose acp';
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a Node.js Readable stream to a web ReadableStream<Uint8Array>.
|
||||
*/
|
||||
function nodeReadableToWeb(nodeStream: NodeJS.ReadableStream): ReadableStream<Uint8Array> {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
nodeStream.on('data', (chunk: Buffer) => {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
});
|
||||
nodeStream.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
nodeStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
if ('destroy' in nodeStream && typeof (nodeStream as Readable).destroy === 'function') {
|
||||
(nodeStream as Readable).destroy();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a Node.js Writable stream to a web WritableStream<Uint8Array>.
|
||||
*/
|
||||
function nodeWritableToWeb(nodeStream: NodeJS.WritableStream): WritableStream<Uint8Array> {
|
||||
return new WritableStream<Uint8Array>({
|
||||
write(chunk) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const ok = (nodeStream as Writable).write(chunk, (err) => {
|
||||
if (err) reject(err);
|
||||
});
|
||||
if (ok) resolve();
|
||||
else (nodeStream as Writable).once('drain', resolve);
|
||||
});
|
||||
},
|
||||
close() {
|
||||
return new Promise<void>((resolve) => {
|
||||
(nodeStream as Writable).end(resolve);
|
||||
});
|
||||
},
|
||||
abort() {
|
||||
(nodeStream as Writable).destroy();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a task to an ACP-capable agent via SSH.
|
||||
*
|
||||
* Opens a structured ACP session, sends the task as a prompt, and collects
|
||||
* all session updates. Returns the collected output and tool calls.
|
||||
*/
|
||||
export async function dispatchViaAcp(opts: AcpDispatchOpts): Promise<AcpDispatchResult> {
|
||||
const { agent, task, worktreePath, signal, log } = opts;
|
||||
|
||||
const cmd = acpCommand(agent);
|
||||
if (!cmd) {
|
||||
return {
|
||||
exitCode: 1,
|
||||
output: `Agent '${agent}' does not support ACP.`,
|
||||
toolCalls: [],
|
||||
stopReason: 'error',
|
||||
};
|
||||
}
|
||||
|
||||
// Spawn SSH with the ACP command running in the worktree
|
||||
const escapedPath = worktreePath.replace(/'/g, "'\\''");
|
||||
const fullCommand = `cd '${escapedPath}' && ${cmd}`;
|
||||
|
||||
log.info({ agent, worktreePath }, 'acp-dispatch: spawning');
|
||||
const child = sshSpawn(fullCommand);
|
||||
|
||||
// Wire up abort
|
||||
let killed = false;
|
||||
const cleanup = () => {
|
||||
if (!killed) {
|
||||
killed = true;
|
||||
child.kill('SIGTERM');
|
||||
setTimeout(() => child.kill('SIGKILL'), 5_000);
|
||||
}
|
||||
};
|
||||
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
cleanup();
|
||||
return { exitCode: 130, output: 'Aborted before start', toolCalls: [], stopReason: 'cancelled' };
|
||||
}
|
||||
signal.addEventListener('abort', cleanup, { once: true });
|
||||
}
|
||||
|
||||
try {
|
||||
// Create web streams from the child process stdio
|
||||
const inputStream = nodeReadableToWeb(child.stdout!);
|
||||
const outputStream = nodeWritableToWeb(child.stdin!);
|
||||
|
||||
// Create the NDJSON ACP stream
|
||||
const stream = ndJsonStream(outputStream, inputStream);
|
||||
|
||||
// Collected session updates
|
||||
const textChunks: string[] = [];
|
||||
const toolCalls: Array<{ title: string; input: unknown; output?: unknown }> = [];
|
||||
|
||||
// Create client-side connection — we are the "client" (editor), the agent is remote
|
||||
const connection = new ClientSideConnection(
|
||||
(_agentInterface): Client => ({
|
||||
// Handle session updates from the agent
|
||||
async sessionUpdate(params: SessionNotification): Promise<void> {
|
||||
const update = params.update;
|
||||
if (update.sessionUpdate === 'agent_message_chunk') {
|
||||
// ContentChunk with content: ContentBlock
|
||||
const content = update.content;
|
||||
if (content.type === 'text' && 'text' in content) {
|
||||
textChunks.push((content as { text: string }).text);
|
||||
}
|
||||
} else if (update.sessionUpdate === 'tool_call') {
|
||||
toolCalls.push({
|
||||
title: update.title,
|
||||
input: update.rawInput,
|
||||
});
|
||||
} else if (update.sessionUpdate === 'tool_call_update') {
|
||||
const last = toolCalls[toolCalls.length - 1];
|
||||
if (last && update.rawOutput !== undefined) {
|
||||
last.output = update.rawOutput;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Permission requests — auto-approve by selecting the first option (worktree is isolated)
|
||||
async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
|
||||
// Select the first available option to auto-approve
|
||||
const firstOption = params.options[0];
|
||||
if (firstOption) {
|
||||
return {
|
||||
outcome: { outcome: 'selected', optionId: firstOption.optionId },
|
||||
};
|
||||
}
|
||||
// No options available — cancel
|
||||
return { outcome: { outcome: 'cancelled' } };
|
||||
},
|
||||
|
||||
// File system operations — let the agent handle them directly in the worktree
|
||||
async readTextFile(_params: ReadTextFileRequest): Promise<ReadTextFileResponse> {
|
||||
return { content: '' };
|
||||
},
|
||||
async writeTextFile(_params: WriteTextFileRequest): Promise<WriteTextFileResponse> {
|
||||
return {};
|
||||
},
|
||||
async createTerminal(_params: CreateTerminalRequest): Promise<CreateTerminalResponse> {
|
||||
return { terminalId: 'noop' };
|
||||
},
|
||||
}),
|
||||
stream,
|
||||
);
|
||||
|
||||
// Initialize the connection
|
||||
// ProtocolVersion is a number in this SDK version
|
||||
const initResult = await connection.initialize({
|
||||
protocolVersion: 1,
|
||||
clientInfo: { name: 'boocoder', version: '2.0.1' },
|
||||
clientCapabilities: {},
|
||||
});
|
||||
log.info({ agentInfo: initResult.agentInfo }, 'acp-dispatch: initialized');
|
||||
|
||||
// Create a new session
|
||||
const session = await connection.newSession({
|
||||
cwd: worktreePath,
|
||||
mcpServers: [],
|
||||
});
|
||||
log.info({ sessionId: session.sessionId }, 'acp-dispatch: session created');
|
||||
|
||||
// Send the prompt
|
||||
const promptResult = await connection.prompt({
|
||||
sessionId: session.sessionId,
|
||||
prompt: [{ type: 'text', text: task }],
|
||||
});
|
||||
|
||||
const stopReason = promptResult.stopReason ?? 'end_turn';
|
||||
log.info({ agent, stopReason, toolCallCount: toolCalls.length }, 'acp-dispatch: prompt completed');
|
||||
|
||||
// Clean shutdown
|
||||
await connection.closeSession({ sessionId: session.sessionId }).catch(() => {});
|
||||
|
||||
return {
|
||||
exitCode: 0,
|
||||
output: textChunks.join(''),
|
||||
toolCalls,
|
||||
stopReason,
|
||||
};
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log.error({ agent, err: message }, 'acp-dispatch: error');
|
||||
return {
|
||||
exitCode: 1,
|
||||
output: message,
|
||||
toolCalls: [],
|
||||
stopReason: 'error',
|
||||
};
|
||||
} finally {
|
||||
if (signal) signal.removeEventListener('abort', cleanup);
|
||||
cleanup();
|
||||
|
||||
// Wait for child to exit
|
||||
await new Promise<void>((resolve) => {
|
||||
child.on('close', resolve);
|
||||
setTimeout(resolve, 3_000);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,6 @@
|
||||
import { execFile } from 'node:child_process';
|
||||
import { promisify } from 'node:util';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
import { sshExec } from './ssh.js';
|
||||
|
||||
const KNOWN_AGENTS: Array<{ name: string; supportsAcp: boolean }> = [
|
||||
{ name: 'opencode', supportsAcp: true },
|
||||
@@ -12,38 +9,60 @@ const KNOWN_AGENTS: Array<{ name: string; supportsAcp: boolean }> = [
|
||||
{ name: 'pi', supportsAcp: false },
|
||||
];
|
||||
|
||||
/**
|
||||
* Probe for available agents on the HOST via SSH.
|
||||
*
|
||||
* The boocoder container can't run agents locally — they live on the host.
|
||||
* We SSH to the host (same mechanism BooTerm uses) and check which agent
|
||||
* binaries are on PATH.
|
||||
*/
|
||||
export async function probeAgents(sql: Sql, log: FastifyBaseLogger): Promise<void> {
|
||||
log.info('agent-probe: scanning PATH for known agents');
|
||||
log.info('agent-probe: scanning HOST for known agents via SSH');
|
||||
|
||||
for (const agent of KNOWN_AGENTS) {
|
||||
try {
|
||||
// Check if the agent binary is on PATH
|
||||
const { stdout: whichOut } = await execFileAsync('which', [agent.name], { timeout: 5_000 });
|
||||
const installPath = whichOut.trim();
|
||||
if (!installPath) continue;
|
||||
// Check if the agent binary is on the host's PATH
|
||||
const whichResult = await sshExec(`which ${agent.name}`, { timeoutMs: 10_000 });
|
||||
const installPath = whichResult.stdout.trim();
|
||||
if (whichResult.exitCode !== 0 || !installPath) continue;
|
||||
|
||||
// Get version
|
||||
let version: string | null = null;
|
||||
try {
|
||||
const { stdout: verOut } = await execFileAsync(agent.name, ['--version'], { timeout: 10_000 });
|
||||
version = verOut.trim().slice(0, 100);
|
||||
const verResult = await sshExec(`${agent.name} --version`, { timeoutMs: 15_000 });
|
||||
if (verResult.exitCode === 0) {
|
||||
version = verResult.stdout.trim().slice(0, 100);
|
||||
}
|
||||
} catch {
|
||||
// Some agents may not support --version — that's fine
|
||||
}
|
||||
|
||||
// For ACP-capable agents, verify ACP mode actually works
|
||||
let supportsAcp = agent.supportsAcp;
|
||||
if (supportsAcp) {
|
||||
try {
|
||||
const acpCheck = await sshExec(`${agent.name} acp --help`, { timeoutMs: 10_000 });
|
||||
supportsAcp = acpCheck.exitCode === 0;
|
||||
} catch {
|
||||
supportsAcp = false;
|
||||
}
|
||||
}
|
||||
|
||||
// UPSERT into available_agents
|
||||
await sql`
|
||||
INSERT INTO available_agents (name, install_path, version, supports_acp, last_probed_at)
|
||||
VALUES (${agent.name}, ${installPath}, ${version}, ${agent.supportsAcp}, clock_timestamp())
|
||||
VALUES (${agent.name}, ${installPath}, ${version}, ${supportsAcp}, clock_timestamp())
|
||||
ON CONFLICT (name) DO UPDATE SET
|
||||
install_path = EXCLUDED.install_path,
|
||||
version = EXCLUDED.version,
|
||||
supports_acp = EXCLUDED.supports_acp,
|
||||
last_probed_at = EXCLUDED.last_probed_at
|
||||
`;
|
||||
log.info({ agent: agent.name, version, installPath }, 'agent-probe: found');
|
||||
} catch {
|
||||
// Agent not found on PATH — skip silently
|
||||
log.info({ agent: agent.name, version, installPath, supportsAcp }, 'agent-probe: found on host');
|
||||
} catch (err) {
|
||||
// SSH failed or agent not found — skip silently
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
log.debug({ agent: agent.name, err: msg }, 'agent-probe: not found or SSH failed');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,9 @@ import type { Sql } from '../db.js';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Broker } from '@boocode/server/broker';
|
||||
import type { Config } from '../config.js';
|
||||
import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js';
|
||||
import { dispatchViaAcp } from './acp-dispatch.js';
|
||||
import { dispatchViaPty } from './pty-dispatch.js';
|
||||
|
||||
interface InferenceRunner {
|
||||
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
|
||||
@@ -50,7 +53,29 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
||||
|
||||
async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
|
||||
const taskId = task.id;
|
||||
log.info({ taskId }, 'dispatcher: starting task');
|
||||
|
||||
// Determine execution path: if agent is specified AND exists in available_agents → Path B
|
||||
if (task.agent) {
|
||||
const [agentRow] = await sql<{ name: string; supports_acp: boolean }[]>`
|
||||
SELECT name, supports_acp FROM available_agents WHERE name = ${task.agent}
|
||||
`;
|
||||
if (agentRow) {
|
||||
await runExternalAgent(task, agentRow.supports_acp);
|
||||
return;
|
||||
}
|
||||
// Agent specified but not available — fall through to Path A with a warning
|
||||
log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
|
||||
}
|
||||
|
||||
// Path A — native inference (existing behavior)
|
||||
await runNativeInference(task);
|
||||
}
|
||||
|
||||
// ─── Path A: Native Inference ───────────────────────────────────────────────
|
||||
|
||||
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
|
||||
const taskId = task.id;
|
||||
log.info({ taskId }, 'dispatcher: starting task (path A — native)');
|
||||
|
||||
try {
|
||||
// Mark running
|
||||
@@ -101,7 +126,6 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
||||
const finalStatus = await waitForCompletion(assistantId);
|
||||
|
||||
if (stopping) {
|
||||
// Graceful shutdown — mark cancelled
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'cancelled', ended_at = clock_timestamp()
|
||||
@@ -111,7 +135,6 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
||||
}
|
||||
|
||||
if (finalStatus === 'complete') {
|
||||
// Grab assistant content for output_summary
|
||||
const [msg] = await sql<{ content: string | null }[]>`
|
||||
SELECT content FROM messages WHERE id = ${assistantId}
|
||||
`;
|
||||
@@ -121,9 +144,8 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
||||
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
log.info({ taskId }, 'dispatcher: task completed');
|
||||
log.info({ taskId }, 'dispatcher: task completed (native)');
|
||||
} else {
|
||||
// failed or cancelled
|
||||
const [msg] = await sql<{ content: string | null }[]>`
|
||||
SELECT content FROM messages WHERE id = ${assistantId}
|
||||
`;
|
||||
@@ -133,21 +155,176 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
||||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
log.warn({ taskId, finalStatus }, 'dispatcher: task failed');
|
||||
log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
|
||||
}
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
log.error({ taskId, err: errMsg }, 'dispatcher: task error');
|
||||
log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||||
WHERE id = ${taskId}
|
||||
`.catch(() => {}); // best-effort
|
||||
`.catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Path B: External Agent Dispatch ──────<E29480><E29480><EFBFBD>─────────────────────────────────
|
||||
|
||||
async function runExternalAgent(
|
||||
task: { id: string; project_id: string; input: string; agent: string | null; model: string | null },
|
||||
supportsAcp: boolean,
|
||||
): Promise<void> {
|
||||
const taskId = task.id;
|
||||
const agent = task.agent!;
|
||||
const executionPath = supportsAcp ? 'acp' : 'pty';
|
||||
|
||||
log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');
|
||||
|
||||
// Resolve the project's root path
|
||||
const [project] = await sql<{ root_path: string | null }[]>`
|
||||
SELECT root_path FROM projects WHERE id = ${task.project_id}
|
||||
`;
|
||||
const projectPath = project?.root_path;
|
||||
if (!projectPath) {
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no root_path — cannot create worktree'
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
return;
|
||||
}
|
||||
|
||||
// Create an abort controller for this task
|
||||
const ac = new AbortController();
|
||||
|
||||
try {
|
||||
// Mark running
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
|
||||
// Create session + chat for this task (same as Path A — for output tracking)
|
||||
const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
|
||||
const [session] = await sql<{ id: string }[]>`
|
||||
INSERT INTO sessions (project_id, name, model, status)
|
||||
VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
|
||||
RETURNING id
|
||||
`;
|
||||
const sessionId = session!.id;
|
||||
|
||||
const [chat] = await sql<{ id: string }[]>`
|
||||
INSERT INTO chats (session_id, name, status)
|
||||
VALUES (${sessionId}, 'External agent execution', 'open')
|
||||
RETURNING id
|
||||
`;
|
||||
const chatId = chat!.id;
|
||||
|
||||
// Link task to session
|
||||
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
|
||||
|
||||
// Create user message for the task input
|
||||
await sql`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
|
||||
`;
|
||||
|
||||
// Step 1: Create worktree
|
||||
log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
|
||||
const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
|
||||
log.info({ taskId, worktreePath }, 'dispatcher: worktree created');
|
||||
|
||||
// Step 2: Dispatch to agent
|
||||
let outputSummary: string;
|
||||
|
||||
if (supportsAcp) {
|
||||
const result = await dispatchViaAcp({
|
||||
agent,
|
||||
task: task.input,
|
||||
worktreePath,
|
||||
model: task.model ?? undefined,
|
||||
signal: ac.signal,
|
||||
log,
|
||||
});
|
||||
outputSummary = result.output.slice(0, 500);
|
||||
|
||||
// Store agent output as an assistant message
|
||||
await sql`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'assistant', ${result.output.slice(0, 50_000)}, 'complete', clock_timestamp())
|
||||
`;
|
||||
} else {
|
||||
const result = await dispatchViaPty({
|
||||
agent,
|
||||
task: task.input,
|
||||
worktreePath,
|
||||
model: task.model ?? undefined,
|
||||
signal: ac.signal,
|
||||
log,
|
||||
});
|
||||
outputSummary = (result.stdout || result.stderr).slice(0, 500);
|
||||
|
||||
// Store agent output as an assistant message
|
||||
const content = result.stdout || result.stderr || '(no output)';
|
||||
await sql`
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'assistant', ${content.slice(0, 50_000)}, 'complete', clock_timestamp())
|
||||
`;
|
||||
}
|
||||
|
||||
if (stopping) {
|
||||
await sql`
|
||||
UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
|
||||
`;
|
||||
await cleanupWorktree(projectPath, taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3: Diff the worktree and queue pending changes
|
||||
log.info({ taskId }, 'dispatcher: diffing worktree');
|
||||
const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
|
||||
|
||||
if (diff) {
|
||||
// Queue a single pending_change entry with the full unified diff
|
||||
await sql`
|
||||
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff)
|
||||
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff})
|
||||
`;
|
||||
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
|
||||
} else {
|
||||
log.info({ taskId }, 'dispatcher: no changes detected in worktree');
|
||||
}
|
||||
|
||||
// Step 4: Cleanup worktree
|
||||
await cleanupWorktree(projectPath, taskId);
|
||||
|
||||
// Step 5: Mark task completed
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
log.info({ taskId, agent }, 'dispatcher: task completed (external)');
|
||||
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
|
||||
|
||||
await sql`
|
||||
UPDATE tasks
|
||||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||||
WHERE id = ${taskId}
|
||||
`.catch(() => {});
|
||||
|
||||
// Best-effort cleanup
|
||||
await cleanupWorktree(projectPath, taskId);
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
async function waitForCompletion(assistantId: string): Promise<string> {
|
||||
// Poll until the assistant message is no longer streaming
|
||||
for (;;) {
|
||||
if (stopping) return 'cancelled';
|
||||
|
||||
|
||||
139
apps/coder/src/services/pty-dispatch.ts
Normal file
139
apps/coder/src/services/pty-dispatch.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
/**
|
||||
* PTY dispatch — runs external agents on the host via SSH.
|
||||
*
|
||||
* For agents without ACP support (claude, pi), we pipe the task into their
|
||||
* non-interactive mode and capture stdout/stderr. The agent runs in a git
|
||||
* worktree so it can modify files freely.
|
||||
*
|
||||
* Supported agents:
|
||||
* - claude: `claude -p --model <model>` (print mode, reads task from stdin)
|
||||
* - opencode: `echo <task> | opencode` (stdin pipe — exact flags TBD)
|
||||
* - goose: stub (not yet supported)
|
||||
* - pi: stub (not yet supported)
|
||||
*/
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import { sshSpawnWithStdin } from './ssh.js';
|
||||
|
||||
export interface DispatchResult {
|
||||
exitCode: number;
|
||||
stdout: string;
|
||||
stderr: string;
|
||||
}
|
||||
|
||||
export interface PtyDispatchOpts {
|
||||
agent: string;
|
||||
task: string;
|
||||
worktreePath: string;
|
||||
model?: string;
|
||||
signal?: AbortSignal;
|
||||
log: FastifyBaseLogger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the shell command that runs the agent non-interactively.
|
||||
* The command will be executed inside `cd <worktreePath> && ...`.
|
||||
*/
|
||||
function buildAgentCommand(agent: string, task: string, model?: string): string | null {
|
||||
// Escape the task for embedding in a shell command
|
||||
const escapedTask = task.replace(/'/g, "'\\''");
|
||||
|
||||
switch (agent) {
|
||||
case 'claude':
|
||||
// Claude Code's print mode: reads prompt from stdin, runs autonomously, prints result
|
||||
return model
|
||||
? `echo '${escapedTask}' | claude -p --model '${model}'`
|
||||
: `echo '${escapedTask}' | claude -p`;
|
||||
|
||||
case 'opencode':
|
||||
// opencode non-interactive: pipe task via stdin
|
||||
// NOTE: exact flags may vary — opencode may need --non-interactive or --pipe
|
||||
return model
|
||||
? `echo '${escapedTask}' | opencode --model '${model}'`
|
||||
: `echo '${escapedTask}' | opencode`;
|
||||
|
||||
case 'goose':
|
||||
// Not yet verified for non-interactive use
|
||||
return null;
|
||||
|
||||
case 'pi':
|
||||
// Not yet verified for non-interactive use
|
||||
return null;
|
||||
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a task to an external agent via SSH.
|
||||
*
|
||||
* The agent runs in the worktree directory on the host. stdout/stderr are
|
||||
* captured in full and returned. The SSH process is killed on abort signal.
|
||||
*/
|
||||
export async function dispatchViaPty(opts: PtyDispatchOpts): Promise<DispatchResult> {
|
||||
const { agent, task, worktreePath, model, signal, log } = opts;
|
||||
|
||||
const agentCmd = buildAgentCommand(agent, task, model);
|
||||
if (!agentCmd) {
|
||||
return {
|
||||
exitCode: 1,
|
||||
stdout: '',
|
||||
stderr: `Agent '${agent}' is not yet supported for PTY dispatch.`,
|
||||
};
|
||||
}
|
||||
|
||||
// Wrap in cd to the worktree
|
||||
const fullCommand = `cd '${worktreePath.replace(/'/g, "'\\''")}' && ${agentCmd}`;
|
||||
|
||||
log.info({ agent, worktreePath }, 'pty-dispatch: starting');
|
||||
|
||||
return new Promise<DispatchResult>((resolve, reject) => {
|
||||
const child = sshSpawnWithStdin(fullCommand, '');
|
||||
// Note: sshSpawnWithStdin already closes stdin. For agents that read from
|
||||
// stdin via echo piping, the command itself handles the piping on the remote
|
||||
// side. We just need the SSH tunnel.
|
||||
|
||||
// Actually, re-think: sshSpawnWithStdin writes input and closes stdin on the
|
||||
// LOCAL ssh process. But the remote command is `echo '...' | agent`, which
|
||||
// provides its own stdin. So we should use sshSpawn (no local stdin needed)
|
||||
// or just let the empty stdin close — the remote shell handles piping internally.
|
||||
// This is fine as-is because the echo piping happens WITHIN the remote shell command.
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
let killed = false;
|
||||
|
||||
child.stdout!.on('data', (chunk: Buffer) => { stdout += chunk.toString(); });
|
||||
child.stderr!.on('data', (chunk: Buffer) => { stderr += chunk.toString(); });
|
||||
|
||||
const cleanup = () => {
|
||||
if (!killed) {
|
||||
killed = true;
|
||||
child.kill('SIGTERM');
|
||||
// Give it a moment then force-kill
|
||||
setTimeout(() => child.kill('SIGKILL'), 5_000);
|
||||
}
|
||||
};
|
||||
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
cleanup();
|
||||
resolve({ exitCode: 130, stdout: '', stderr: 'Aborted before start' });
|
||||
return;
|
||||
}
|
||||
signal.addEventListener('abort', cleanup, { once: true });
|
||||
}
|
||||
|
||||
child.on('close', (code) => {
|
||||
if (signal) signal.removeEventListener('abort', cleanup);
|
||||
log.info({ agent, exitCode: code }, 'pty-dispatch: completed');
|
||||
resolve({ exitCode: code ?? 1, stdout, stderr });
|
||||
});
|
||||
|
||||
child.on('error', (err) => {
|
||||
if (signal) signal.removeEventListener('abort', cleanup);
|
||||
log.error({ agent, err: err.message }, 'pty-dispatch: spawn error');
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
126
apps/coder/src/services/ssh.ts
Normal file
126
apps/coder/src/services/ssh.ts
Normal file
@@ -0,0 +1,126 @@
|
||||
/**
|
||||
* SSH helper — spawns commands on the host via SSH.
|
||||
*
|
||||
* BooCode's container cannot directly spawn host processes (opencode, goose, claude, pi).
|
||||
* They live on the HOST at /usr/local/bin/ or Sam's PATH. We SSH to the host over the
|
||||
* Tailscale IP (same mechanism BooTerm uses: samkintop@100.114.205.53).
|
||||
*/
|
||||
import { spawn, type ChildProcess } from 'node:child_process';
|
||||
|
||||
export const SSH_HOST = process.env.BOOCODER_SSH_HOST ?? '100.114.205.53';
|
||||
export const SSH_USER = process.env.BOOCODER_SSH_USER ?? 'samkintop';
|
||||
|
||||
/** Common SSH args — strict host checking disabled for container-to-host trust. */
|
||||
const SSH_BASE_ARGS = [
|
||||
'-o', 'StrictHostKeyChecking=no',
|
||||
'-o', 'UserKnownHostsFile=/dev/null',
|
||||
'-o', 'LogLevel=ERROR',
|
||||
'-o', 'BatchMode=yes',
|
||||
];
|
||||
|
||||
export interface SshExecResult {
|
||||
exitCode: number;
|
||||
stdout: string;
|
||||
stderr: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a command on the host via SSH, collecting all output.
|
||||
* Returns when the remote process exits.
|
||||
*/
|
||||
export async function sshExec(
|
||||
command: string,
|
||||
opts?: { signal?: AbortSignal; timeoutMs?: number },
|
||||
): Promise<SshExecResult> {
|
||||
return new Promise<SshExecResult>((resolve, reject) => {
|
||||
const child = spawn('ssh', [
|
||||
...SSH_BASE_ARGS,
|
||||
`${SSH_USER}@${SSH_HOST}`,
|
||||
command,
|
||||
], {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
let killed = false;
|
||||
|
||||
child.stdout!.on('data', (chunk: Buffer) => { stdout += chunk.toString(); });
|
||||
child.stderr!.on('data', (chunk: Buffer) => { stderr += chunk.toString(); });
|
||||
|
||||
const cleanup = () => {
|
||||
if (!killed) {
|
||||
killed = true;
|
||||
child.kill('SIGTERM');
|
||||
}
|
||||
};
|
||||
|
||||
// Abort signal
|
||||
if (opts?.signal) {
|
||||
if (opts.signal.aborted) {
|
||||
cleanup();
|
||||
reject(new Error('SSH exec aborted before start'));
|
||||
return;
|
||||
}
|
||||
opts.signal.addEventListener('abort', cleanup, { once: true });
|
||||
}
|
||||
|
||||
// Timeout
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
if (opts?.timeoutMs) {
|
||||
timer = setTimeout(() => {
|
||||
cleanup();
|
||||
reject(new Error(`SSH exec timed out after ${opts.timeoutMs}ms`));
|
||||
}, opts.timeoutMs);
|
||||
}
|
||||
|
||||
child.on('close', (code) => {
|
||||
if (timer) clearTimeout(timer);
|
||||
if (opts?.signal) opts.signal.removeEventListener('abort', cleanup);
|
||||
resolve({ exitCode: code ?? 1, stdout, stderr });
|
||||
});
|
||||
|
||||
child.on('error', (err) => {
|
||||
if (timer) clearTimeout(timer);
|
||||
if (opts?.signal) opts.signal.removeEventListener('abort', cleanup);
|
||||
reject(err);
|
||||
});
|
||||
|
||||
// Close stdin immediately — we're not sending input via sshExec
|
||||
child.stdin!.end();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn an SSH child process with a command on the host.
|
||||
* Returns the raw ChildProcess for callers that need streaming I/O (ACP, PTY).
|
||||
*/
|
||||
export function sshSpawn(command: string): ChildProcess {
|
||||
return spawn('ssh', [
|
||||
...SSH_BASE_ARGS,
|
||||
`${SSH_USER}@${SSH_HOST}`,
|
||||
command,
|
||||
], {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn an SSH child process that pipes stdin through.
|
||||
* Used for agents that read a task from stdin (e.g. `echo "task" | claude -p`).
|
||||
*/
|
||||
export function sshSpawnWithStdin(command: string, input: string): ChildProcess {
|
||||
const child = spawn('ssh', [
|
||||
...SSH_BASE_ARGS,
|
||||
`${SSH_USER}@${SSH_HOST}`,
|
||||
command,
|
||||
], {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
// Write the input and close stdin
|
||||
child.stdin!.write(input);
|
||||
child.stdin!.end();
|
||||
|
||||
return child;
|
||||
}
|
||||
118
apps/coder/src/services/worktrees.ts
Normal file
118
apps/coder/src/services/worktrees.ts
Normal file
@@ -0,0 +1,118 @@
|
||||
/**
|
||||
* Git worktree management for external agent dispatch.
|
||||
*
|
||||
* Each dispatched task gets its own git worktree so the external agent
|
||||
* can modify files freely without touching the main working tree.
|
||||
* After the agent completes, we diff the worktree against HEAD and
|
||||
* queue the diff into pending_changes.
|
||||
*/
|
||||
import { sshExec } from './ssh.js';
|
||||
|
||||
const WORKTREE_BASE = '/tmp/booworktrees';
|
||||
|
||||
/**
|
||||
* Create a git worktree for a task on the host.
|
||||
* Returns the absolute path to the worktree directory.
|
||||
*/
|
||||
export async function createWorktree(
|
||||
projectPath: string,
|
||||
taskId: string,
|
||||
opts?: { signal?: AbortSignal },
|
||||
): Promise<string> {
|
||||
const worktreePath = `${WORKTREE_BASE}/${taskId}`;
|
||||
const branchName = `task-${taskId}`;
|
||||
|
||||
// Ensure the base directory exists
|
||||
await sshExec(`mkdir -p ${WORKTREE_BASE}`, { signal: opts?.signal });
|
||||
|
||||
// Create the worktree with a new branch from HEAD
|
||||
const result = await sshExec(
|
||||
`git -C ${shellEscape(projectPath)} worktree add ${shellEscape(worktreePath)} -b ${shellEscape(branchName)} HEAD`,
|
||||
{ signal: opts?.signal, timeoutMs: 30_000 },
|
||||
);
|
||||
|
||||
if (result.exitCode !== 0) {
|
||||
throw new Error(`Failed to create worktree: ${result.stderr.trim() || result.stdout.trim()}`);
|
||||
}
|
||||
|
||||
return worktreePath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the unified diff of changes made in the worktree vs the parent branch (HEAD).
|
||||
* Returns an empty string if there are no changes.
|
||||
*/
|
||||
export async function diffWorktree(
|
||||
worktreePath: string,
|
||||
projectPath: string,
|
||||
opts?: { signal?: AbortSignal },
|
||||
): Promise<string> {
|
||||
// First, commit any uncommitted changes in the worktree so we can diff branches
|
||||
// Stage all changes
|
||||
const addResult = await sshExec(
|
||||
`cd ${shellEscape(worktreePath)} && git add -A`,
|
||||
{ signal: opts?.signal, timeoutMs: 30_000 },
|
||||
);
|
||||
if (addResult.exitCode !== 0) {
|
||||
throw new Error(`Failed to stage worktree changes: ${addResult.stderr.trim()}`);
|
||||
}
|
||||
|
||||
// Check if there are staged changes
|
||||
const statusResult = await sshExec(
|
||||
`cd ${shellEscape(worktreePath)} && git diff --cached --quiet`,
|
||||
{ signal: opts?.signal, timeoutMs: 10_000 },
|
||||
);
|
||||
|
||||
if (statusResult.exitCode === 0) {
|
||||
// No changes
|
||||
return '';
|
||||
}
|
||||
|
||||
// Commit staged changes (needed to produce a clean branch diff)
|
||||
await sshExec(
|
||||
`cd ${shellEscape(worktreePath)} && git -c user.email=boocoder@local -c user.name=BooCoder commit -m "task changes" --allow-empty`,
|
||||
{ signal: opts?.signal, timeoutMs: 15_000 },
|
||||
);
|
||||
|
||||
// Diff the worktree branch against the parent commit (HEAD of main tree)
|
||||
const diffResult = await sshExec(
|
||||
`git -C ${shellEscape(projectPath)} diff HEAD...$(git -C ${shellEscape(worktreePath)} rev-parse HEAD)`,
|
||||
{ signal: opts?.signal, timeoutMs: 60_000 },
|
||||
);
|
||||
|
||||
if (diffResult.exitCode !== 0) {
|
||||
throw new Error(`Failed to diff worktree: ${diffResult.stderr.trim()}`);
|
||||
}
|
||||
|
||||
return diffResult.stdout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a worktree and its associated branch.
|
||||
* Best-effort — does not throw on failure (task may have already been cleaned up).
|
||||
*/
|
||||
export async function cleanupWorktree(
|
||||
projectPath: string,
|
||||
taskId: string,
|
||||
): Promise<void> {
|
||||
const worktreePath = `${WORKTREE_BASE}/${taskId}`;
|
||||
const branchName = `task-${taskId}`;
|
||||
|
||||
// Remove the worktree (--force handles dirty state)
|
||||
await sshExec(
|
||||
`git -C ${shellEscape(projectPath)} worktree remove ${shellEscape(worktreePath)} --force`,
|
||||
{ timeoutMs: 15_000 },
|
||||
).catch(() => {});
|
||||
|
||||
// Delete the task branch
|
||||
await sshExec(
|
||||
`git -C ${shellEscape(projectPath)} branch -D ${shellEscape(branchName)}`,
|
||||
{ timeoutMs: 10_000 },
|
||||
).catch(() => {});
|
||||
}
|
||||
|
||||
/** Minimal shell escape for paths (single-quote wrapping). */
|
||||
function shellEscape(s: string): string {
|
||||
// Replace single quotes with escaped version, wrap in single quotes
|
||||
return "'" + s.replace(/'/g, "'\\''") + "'";
|
||||
}
|
||||
12
pnpm-lock.yaml
generated
12
pnpm-lock.yaml
generated
@@ -48,6 +48,9 @@ importers:
|
||||
|
||||
apps/coder:
|
||||
dependencies:
|
||||
'@agentclientprotocol/sdk':
|
||||
specifier: ^0.22.1
|
||||
version: 0.22.1(zod@3.25.76)
|
||||
'@boocode/server':
|
||||
specifier: workspace:*
|
||||
version: link:../server
|
||||
@@ -268,6 +271,11 @@ importers:
|
||||
|
||||
packages:
|
||||
|
||||
'@agentclientprotocol/sdk@0.22.1':
|
||||
resolution: {integrity: sha512-DfqXtl/8gO9NImq094MTaCXEU2vkhh6v7q/kT+9UjZxUqj8hYaya2OjLVIqn16MzNHcXEpShTR2RIauLSYeDQQ==}
|
||||
peerDependencies:
|
||||
zod: ^3.25.0 || ^4.0.0
|
||||
|
||||
'@ai-sdk/gateway@3.0.119':
|
||||
resolution: {integrity: sha512-VAhfRWC+JexZakkVfmjaJKaTj00x7/UHdE8kMWL3NhuQAlf8oXtg9r4dfvFZrByXxchGRBvYE3biEUyibkg0xg==}
|
||||
engines: {node: '>=18'}
|
||||
@@ -4097,6 +4105,10 @@ packages:
|
||||
|
||||
snapshots:
|
||||
|
||||
'@agentclientprotocol/sdk@0.22.1(zod@3.25.76)':
|
||||
dependencies:
|
||||
zod: 3.25.76
|
||||
|
||||
'@ai-sdk/gateway@3.0.119(zod@3.25.76)':
|
||||
dependencies:
|
||||
'@ai-sdk/provider': 3.0.10
|
||||
|
||||
Reference in New Issue
Block a user