/**
 * PTY dispatch — runs external agents directly on the host.
 *
 * claude + qwen run with `--output-format stream-json` and emit Claude-Code's
 * stream-json NDJSON on stdout. When an `onEvent` callback is supplied we
 * line-buffer that stdout (split on `\n`, hold the partial tail) and feed complete
 * lines to `makeStreamJsonParser` so deltas surface live as AgentEvents. The raw
 * stdout is still accumulated + returned for back-compat (and the dispatcher's
 * fallback when nothing parsed). See `stream-json-parser.ts`.
 */

import type { FastifyBaseLogger } from 'fastify';
import { spawn } from 'node:child_process';
import type { AgentEvent } from './agent-backend.js';
import { makeStreamJsonParser, type StreamJsonUsage } from './stream-json-parser.js';

/** Exit code reported when the dispatch is aborted before the agent is started. */
const ABORTED_EXIT_CODE = 130;

/** Delay between the SIGTERM sent on abort and the follow-up SIGKILL. */
const SIGKILL_GRACE_MS = 5_000;

/** Outcome of one agent run. */
export interface DispatchResult {
  /** Exit code of the child process; 1 when the process reported no code. */
  exitCode: number;
  /** Everything the child wrote to stdout, unparsed. */
  stdout: string;
  /** Everything the child wrote to stderr. */
  stderr: string;
  /** True iff at least one NDJSON AgentEvent was parsed from stdout (v#7). When
   * false the dispatcher falls back to slicing stdout as the assistant content. */
  streamed: boolean;
  /** Final usage parsed from the stream-json `result` / `message_delta`, if any. */
  usage?: StreamJsonUsage;
  /** Provider session id from the stream-json `system` init line, if any. */
  agentSessionId?: string | null;
}

/** Options accepted by {@link dispatchViaPty}. */
export interface PtyDispatchOpts {
  agent: string;
  task: string;
  worktreePath: string;
  model?: string;
  modeId?: string;
  thinkingOptionId?: string;
  installPath?: string;
  signal?: AbortSignal;
  log: FastifyBaseLogger;
  /** Optional live event sink. When set, stdout is line-buffered + NDJSON-parsed
   * and each AgentEvent is forwarded here as it arrives. Absent → opaque (old)
   * behavior: stdout is accumulated and returned, no parsing. */
  onEvent?: (e: AgentEvent) => void;
}

/** Command line (and optional stdin payload) used to start one agent. */
interface PtySpawnSpec {
  binary: string;
  args: string[];
  stdin?: string;
}

/**
 * Maps an agent name to the command used to run it.
 *
 * claude and opencode receive the task on stdin; qwen and goose receive it as
 * a command-line argument.
 *
 * @returns The spawn spec, or `null` when `agent` is not one of the supported
 *   names (`claude`, `qwen`, `opencode`, `goose`).
 */
function buildPtySpawnSpec(
  agent: string,
  task: string,
  model?: string,
  modeId?: string,
  thinkingOptionId?: string,
  installPath?: string,
): PtySpawnSpec | null {
  // Without an explicit install path the agent name itself is used as the binary.
  const binary = installPath ?? agent;

  switch (agent) {
    case 'claude': {
      // stream-json on -p requires --verbose (Claude Code rejects stream-json
      // print mode without it). qwen needs no such flag.
      const args = ['-p', '--output-format', 'stream-json', '--verbose'];
      if (model) args.push('--model', model);
      if (modeId) args.push('--permission-mode', modeId);
      if (thinkingOptionId) args.push('--effort', thinkingOptionId);
      return { binary, args, stdin: task };
    }
    case 'qwen': {
      const args = ['-p', task, '--output-format', 'stream-json'];
      if (model) args.push('--model', model);
      if (modeId) args.push('--approval-mode', modeId);
      return { binary, args };
    }
    case 'opencode':
      return {
        binary,
        args: model ? ['--model', model] : [],
        stdin: task,
      };
    case 'goose':
      return {
        binary,
        args: model ? ['run', '--text', task, '--model', model] : ['run', '--text', task],
      };
    default:
      return null;
  }
}

/**
 * Runs an external agent as a child process in `worktreePath` and collects its
 * output.
 *
 * When `onEvent` is supplied, stdout is additionally split into lines and fed
 * to the stream-json parser, and every parsed event is forwarded to `onEvent`
 * as it arrives.
 *
 * @param opts - Agent selection, task text, working directory, optional abort
 *   signal, logger and optional live event sink.
 * @returns Resolves once the child has closed with its exit code and captured
 *   output. An unsupported agent resolves with exit code 1; a signal that is
 *   already aborted resolves with exit code 130 without starting the agent.
 * @throws Rejects with the underlying error when the child process emits
 *   `error`.
 */
export async function dispatchViaPty(opts: PtyDispatchOpts): Promise<DispatchResult> {
  const {
    agent,
    task,
    worktreePath,
    model,
    modeId,
    thinkingOptionId,
    installPath,
    signal,
    log,
    onEvent,
  } = opts;

  const cmd = buildPtySpawnSpec(agent, task, model, modeId, thinkingOptionId, installPath);
  if (!cmd) {
    return {
      exitCode: 1,
      stdout: '',
      stderr: `Agent '${agent}' is not yet supported for PTY dispatch.`,
      streamed: false,
    };
  }

  // Check for cancellation BEFORE spawning, so an already-aborted request never
  // starts the agent or hands it the task.
  if (signal?.aborted) {
    log.info({ agent, worktreePath }, 'pty-dispatch: aborted before start');
    return {
      exitCode: ABORTED_EXIT_CODE,
      stdout: '',
      stderr: 'Aborted before start',
      streamed: false,
    };
  }

  log.info({ agent, binary: cmd.binary, worktreePath, modeId }, 'pty-dispatch: starting');

  return new Promise<DispatchResult>((resolve, reject) => {
    const child = spawn(cmd.binary, cmd.args, {
      cwd: worktreePath,
      stdio: ['pipe', 'pipe', 'pipe'],
      env: { ...process.env },
    });

    // A child that exits (or fails to start) before reading its stdin makes the
    // write fail with EPIPE; without a listener that error would be thrown as an
    // uncaught exception. The outcome is still reported through 'close'/'error'.
    child.stdin!.on('error', (err: Error) => {
      log.warn({ agent, err: err.message }, 'pty-dispatch: stdin write failed');
    });
    if (cmd.stdin) {
      child.stdin!.write(cmd.stdin);
    }
    child.stdin!.end();

    // Let the streams decode UTF-8 themselves so a multi-byte character split
    // across two chunks is not turned into replacement characters.
    child.stdout!.setEncoding('utf8');
    child.stderr!.setEncoding('utf8');

    let stdout = '';
    let stderr = '';
    let killed = false;
    let killTimer: ReturnType<typeof setTimeout> | undefined;

    // Live NDJSON parsing (only when a sink is supplied). Line-buffer: split on
    // '\n', dispatch complete lines, hold the partial tail until the next chunk.
    const parser = onEvent ? makeStreamJsonParser() : null;
    let lineBuf = '';
    let streamed = false;

    const feedLine = (line: string): void => {
      if (!parser || !onEvent) return;
      for (const e of parser.push(line)) {
        streamed = true;
        onEvent(e);
      }
    };

    child.stdout!.on('data', (text: string) => {
      stdout += text;
      if (!parser) return;
      const segments = (lineBuf + text).split('\n');
      // The last segment is the (possibly empty) unterminated tail.
      lineBuf = segments.pop() ?? '';
      for (const line of segments) feedLine(line);
    });

    child.stderr!.on('data', (text: string) => {
      stderr += text;
    });

    // Abort handler: ask the child to stop, then force-kill it if it has not
    // closed within the grace period.
    const terminate = (): void => {
      if (killed) return;
      killed = true;
      child.kill('SIGTERM');
      killTimer = setTimeout(() => child.kill('SIGKILL'), SIGKILL_GRACE_MS);
    };

    // Drops the abort listener and the pending SIGKILL timer once the child is
    // finished, so neither outlives the dispatch.
    const release = (): void => {
      signal?.removeEventListener('abort', terminate);
      if (killTimer !== undefined) {
        clearTimeout(killTimer);
        killTimer = undefined;
      }
    };

    signal?.addEventListener('abort', terminate, { once: true });

    child.on('close', (code) => {
      release();
      // Flush any final line with no trailing newline.
      if (lineBuf.trim()) feedLine(lineBuf);
      lineBuf = '';
      log.info({ agent, exitCode: code, streamed }, 'pty-dispatch: completed');
      resolve({
        exitCode: code ?? 1,
        stdout,
        stderr,
        streamed,
        usage: parser?.usage(),
        agentSessionId: parser?.sessionId() ?? null,
      });
    });

    child.on('error', (err) => {
      release();
      log.error({ agent, err: err.message }, 'pty-dispatch: spawn error');
      reject(err);
    });
  });
}