diff --git a/apps/coder/package.json b/apps/coder/package.json index 3729109..e1c0a5c 100644 --- a/apps/coder/package.json +++ b/apps/coder/package.json @@ -16,6 +16,7 @@ "@agentclientprotocol/sdk": "^0.22.1", "@boocode/server": "workspace:*", "@fastify/static": "^7.0.4", + "@opencode-ai/sdk": "~1.15.0", "@fastify/websocket": "^10.0.1", "@modelcontextprotocol/sdk": "^1.29.0", "fastify": "^4.28.1", diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 9e22c64..9052d7c 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -99,6 +99,9 @@ CREATE TABLE IF NOT EXISTS agent_sessions ( CONSTRAINT agent_sessions_status_chk CHECK (status IN ('idle', 'active', 'crashed', 'closed')) ); +-- v2.6: config fingerprint for stale-session detection (auto-recover on model change). +ALTER TABLE agent_sessions ADD COLUMN IF NOT EXISTS config_hash TEXT; + -- v2.6: attribution for DiffPanel badges (Phase 1 UX reads this). ALTER TABLE pending_changes ADD COLUMN IF NOT EXISTS agent TEXT; diff --git a/apps/coder/src/services/agent-probe.ts b/apps/coder/src/services/agent-probe.ts index f0a77cb..de35b6b 100644 --- a/apps/coder/src/services/agent-probe.ts +++ b/apps/coder/src/services/agent-probe.ts @@ -4,7 +4,7 @@ import { exec as execCb, execFile as execFileCb } from 'node:child_process'; import { promisify } from 'node:util'; import { PROVIDERS_BY_NAME } from './provider-registry.js'; import { resolveAcpProbeBinaries } from './acp-spawn.js'; -import { clearProviderSnapshotCache } from './provider-snapshot.js'; +import { clearProviderSnapshotCache, fetchLlamaSwapModels, prefixLlamaSwapModels } from './provider-snapshot.js'; import { readQwenSettingsModels } from './qwen-settings.js'; import { loadConfig } from '../config.js'; import { loadProviderConfig } from './provider-config-registry.js'; @@ -117,6 +117,15 @@ export async function probeAgents(sql: Sql, log: FastifyBaseLogger): Promise }; the + * real event is chunk.payload (discriminate on chunk.payload.type). + * - promptAsync is fire-and-forget (204); the turn completes via a + * 'session.idle' event for that opencode session id. + */ +import { spawn, type ChildProcess } from 'node:child_process'; +import { createHash } from 'node:crypto'; +import { createServer } from 'node:net'; +import type { FastifyBaseLogger } from 'fastify'; +import { + createOpencodeClient, + type OpencodeClient, + type Event, + type Part, + type ToolPart, + type ToolState, + type AssistantMessage, +} from '@opencode-ai/sdk/v2/client'; +import type { ToolCallStatus } from '@agentclientprotocol/sdk'; +import type { Sql } from '../../db.js'; +import type { AcpToolSnapshot } from '../acp-tool-snapshot.js'; +import type { + AgentBackend, + AgentEvent, + AgentSessionHandle, + EnsureSessionOpts, + PromptCtx, + TurnResult, +} from '../agent-backend.js'; + +const READY_TIMEOUT_MS = 30_000; +const SSE_RECONNECT_DELAY_MS = 1_000; +/** + * No-activity backstop for an in-flight turn. opencode streams reasoning/text/tool + * deltas continuously while working, so "zero events for this long" means the turn + * is wedged or its terminal event (session.idle) was lost (see the reconnect race + * below). Generous so a legitimately slow turn never trips it. + */ +const TURN_INACTIVITY_MS = 180_000; + +/** One in-flight turn's emitter + completion settler. */ +interface TurnState { + onEvent: (e: AgentEvent) => void; + settle: (r: TurnResult) => void; +} + +/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */ +interface SessionState { + boocodeSessionId: string; + agentSessionId: string; + /** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */ + worktreePath: string; + /** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */ + streamedPartKeys: Set; + /** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */ + partTypeById: Map; + activeTurn: TurnState | null; + /** Inactivity backstop timer for the active turn; null when no turn in flight. */ + watchdog: ReturnType | null; +} + +export interface OpenCodeServerBackendDeps { + sql: Sql; + log: FastifyBaseLogger; + /** Absolute path to the opencode binary (resolved from available_agents at wiring time, Phase 1.7). */ + opencodeBinary: string; +} + +export class OpenCodeServerBackend implements AgentBackend { + readonly backend = 'opencode_server' as const; + + private readonly sql: Sql; + private readonly log: FastifyBaseLogger; + private readonly opencodeBinary: string; + + private child: ChildProcess | null = null; + private client: OpencodeClient | null = null; + private port: number | null = null; + private up = false; + private serverStarting: Promise | null = null; + private sseRunning = false; + + /** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */ + private readonly byOpencodeId = new Map(); + + constructor(deps: OpenCodeServerBackendDeps) { + this.sql = deps.sql; + this.log = deps.log; + this.opencodeBinary = deps.opencodeBinary; + } + + /** §2: liveness for the health endpoint + dispatcher fallback decision. */ + health(): 'up' | 'down' { + return this.up ? 'up' : 'down'; + } + + // ─── Server lifecycle (1.2: spawn once + client + ready) ───────────────────── + + /** Lazy: start the single server on first use. Idempotent — one server per backend. */ + private ensureServer(): Promise { + if (!this.serverStarting) this.serverStarting = this.startServer(); + return this.serverStarting; + } + + private async startServer(): Promise { + const port = await freePort(); + + // Phase 1: run unsecured on loopback (opencode's documented default — serve.ts + // only WARNS when OPENCODE_SERVER_PASSWORD is unset). The real boundary is the + // 127.0.0.1 bind. Defense-in-depth basic-auth is deferred: the hey-api client's + // auth wiring + opencode's exact scheme must be confirmed against a live server + // first, else every request 401s. Recon explicitly said "do NOT block on it". + const child = spawn(this.opencodeBinary, ['serve', '--hostname', '127.0.0.1', '--port', String(port)], { + stdio: ['ignore', 'pipe', 'pipe'], + env: { ...process.env }, + }); + this.child = child; + this.port = port; + + // Child lifetime is the backend's (the pool's), NOT a request's. We never tie + // it to a per-turn abort signal. On unexpected exit we mark down + log; crash + // recovery is Phase 3. + child.on('exit', (code, signal) => { + this.up = false; + this.log.warn({ code, signal, port }, 'opencode-server: child exited (recovery is Phase 3)'); + }); + + await waitForReady(child, READY_TIMEOUT_MS); + + this.client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` }); + this.up = true; + this.log.info({ port }, 'opencode-server: ready'); + } + + // ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ─────────────────── + + /** Per-directory SSE subscription. opencode scopes events by directory (defaults + * to process.cwd if omitted) — so we must subscribe with the same directory used + * to create the session. Called from ensureSession; reconnects while up. */ + private startEventLoop(directory: string): void { + if (this.sseRunning) return; + this.sseRunning = true; + this.sseDirectory = directory; + void this.runEventLoop(directory); + } + + private sseDirectory: string | null = null; + + private async runEventLoop(directory: string): Promise { + while (this.up && this.client) { + try { + const sub = await this.client.event.subscribe({ directory }); + for await (const ev of sub.stream) { + this.dispatchEvent(ev); + } + if (this.up) { + await this.reconcileInFlight(); + await sleep(SSE_RECONNECT_DELAY_MS); + } + } catch (err) { + if (!this.up) break; + this.log.warn({ err: errMsg(err) }, 'opencode-server: event loop error; reconnecting'); + await this.reconcileInFlight(); + await sleep(SSE_RECONNECT_DELAY_MS); + } + } + this.sseRunning = false; + } + + /** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */ + private dispatchEvent(ev: Event): void { + switch (ev.type) { + // ─── session.next.* — live streaming events (the primary path) ───────── + case 'session.next.text.delta': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + st.activeTurn.onEvent({ type: 'text', text: p.delta }); + return; + } + case 'session.next.reasoning.delta': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + st.activeTurn.onEvent({ type: 'reasoning', text: p.delta }); + return; + } + case 'session.next.tool.called': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + const snap: AcpToolSnapshot = { + toolCallId: p.callID, + title: p.tool, + kind: null, + status: 'in_progress', + rawInput: p.input, + rawOutput: undefined, + }; + st.activeTurn.onEvent({ type: 'tool_call', toolCall: snap }); + return; + } + case 'session.next.tool.success': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + const output = p.content?.map((c) => ('text' in c ? (c as { text: string }).text : '')).join('') ?? ''; + const snap: AcpToolSnapshot = { + toolCallId: p.callID, + title: p.callID, + kind: null, + status: 'completed', + rawInput: undefined, + rawOutput: output, + }; + st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap }); + return; + } + case 'session.next.tool.failed': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + const snap: AcpToolSnapshot = { + toolCallId: p.callID, + title: p.callID, + kind: null, + status: 'failed', + rawInput: undefined, + rawOutput: errToString(p.error), + }; + st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap }); + return; + } + // ─── message.part.* — terminal/post-hoc events (dedup gate) ──────────── + case 'message.part.delta': { + const p = ev.properties; + const st = this.byOpencodeId.get(p.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + const isReasoning = p.field === 'reasoning' || st.partTypeById.get(p.partID) === 'reasoning'; + if (isReasoning) { + st.streamedPartKeys.add(`reasoning:${p.partID}`); + st.activeTurn.onEvent({ type: 'reasoning', text: p.delta }); + } else if (p.field === 'text') { + st.streamedPartKeys.add(`text:${p.partID}`); + st.activeTurn.onEvent({ type: 'text', text: p.delta }); + } + return; + } + case 'message.part.updated': { + const part = ev.properties.part; + const st = this.byOpencodeId.get(part.sessionID); + if (!st?.activeTurn) return; + this.bumpActivity(st); + this.handleUpdatedPart(part, st); + return; + } + // ─── lifecycle ───────────────────────────────────────────────────────── + case 'session.idle': { + this.byOpencodeId.get(ev.properties.sessionID)?.activeTurn?.settle({ ok: true }); + return; + } + case 'session.error': { + const sid = ev.properties.sessionID; + if (!sid) return; + this.byOpencodeId.get(sid)?.activeTurn?.settle({ ok: false, error: errToString(ev.properties.error) }); + return; + } + default: + return; + } + } + + /** Terminal part: dedup gate for text/reasoning; tool parts → tool_call/tool_update. */ + private handleUpdatedPart(part: Part, st: SessionState): void { + const turn = st.activeTurn; + if (!turn) return; + + if (part.type === 'text' || part.type === 'reasoning') { + st.partTypeById.set(part.id, part.type); + const key = resolvePartDedupeKey(part, part.type); + if (key && st.streamedPartKeys.delete(key)) return; // already streamed via delta + const text = part.text ?? ''; + if (text && part.time?.end != null) { + turn.onEvent({ type: part.type, text }); + } + return; + } + + if (part.type === 'tool') { + const snap = toolPartToSnapshot(part); + const status = part.state?.status; + // tool_call on start (pending/running), tool_update on terminal (completed/error). + // The current ACP path merges both into one frame; the contract keeps them + // distinct because opencode's SSE distinguishes start from result. + const event: AgentEvent = + status === 'completed' || status === 'error' + ? { type: 'tool_update', toolCall: snap } + : { type: 'tool_call', toolCall: snap }; + turn.onEvent(event); + return; + } + // NOTE: opencode's SSE payload union carries no available-commands event, so the + // AgentEvent 'commands' arm is intentionally never emitted here (1.3). + } + + // ─── turn-completion resilience (watchdog + reconnect reconcile) ───────────── + + /** Reset the inactivity backstop on any event routed to a session's active turn. */ + private bumpActivity(st: SessionState): void { + if (!st.activeTurn) return; + if (st.watchdog) clearTimeout(st.watchdog); + st.watchdog = setTimeout(() => { + void this.onTurnStall(st); + }, TURN_INACTIVITY_MS); + st.watchdog.unref?.(); + } + + /** Watchdog fired: reconcile once; if the server says still-running we can't tell, so fail closed. + * Also mark the agent_sessions row crashed so a stale session isn't resumed next turn. */ + private async onTurnStall(st: SessionState): Promise { + const settled = await this.reconcile(st); + if (!settled) { + this.log.warn({ agentSessionId: st.agentSessionId }, 'opencode-server: turn stalled (no activity), failing + marking crashed'); + await this.sql` + UPDATE agent_sessions SET status = 'crashed' + WHERE agent_session_id = ${st.agentSessionId} + `.catch(() => {}); + st.activeTurn?.settle({ ok: false, error: 'turn timed out (no activity)' }); + } + } + + /** Reconcile every in-flight turn against the server (called after an SSE drop). */ + private async reconcileInFlight(): Promise { + const states = [...this.byOpencodeId.values()].filter((s) => s.activeTurn); + if (states.length === 0) return; + await Promise.allSettled(states.map((s) => this.reconcile(s))); + } + + /** + * Ask the server whether this session's turn already finished — recovers a + * session.idle/error lost during an SSE gap. Returns true if it settled the turn. + * Inconclusive (still running / call failed) → false; the watchdog covers that. + */ + private async reconcile(st: SessionState): Promise { + const turn = st.activeTurn; + if (!turn || !this.client) return false; + try { + const res = await this.client.session.messages({ + sessionID: st.agentSessionId, + directory: st.worktreePath, + }); + if (res.error || !res.data) return false; + let lastAssistant: AssistantMessage | undefined; + for (let i = res.data.length - 1; i >= 0; i--) { + const info = res.data[i]!.info; + if (info.role === 'assistant') { + lastAssistant = info; + break; + } + } + if (!lastAssistant) return false; + if (lastAssistant.error != null) { + turn.settle({ ok: false, error: errToString(lastAssistant.error) }); + return true; + } + if (lastAssistant.time.completed != null) { + turn.settle({ ok: true }); + return true; + } + return false; // still running — the live stream will deliver session.idle + } catch { + return false; // inconclusive — watchdog backstop covers it + } + } + + // ─── ensureSession: create-or-resume against agent_sessions (1.5) ──────────── + + async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise { + await this.ensureServer(); + if (!this.client) throw new Error('opencode-server: client not ready after ensureServer'); + + const configHash = sessionConfigHash(opts.model); + const [row] = await this.sql<{ agent_session_id: string | null; status: string; config_hash: string | null }[]>` + SELECT agent_session_id, status, config_hash FROM agent_sessions + WHERE session_id = ${sessionId} AND agent = ${opts.agent} + `; + let agentSessionId = row?.agent_session_id ?? null; + + // Don't resume crashed sessions or sessions whose config drifted (model change). + const shouldResume = agentSessionId + && row!.status !== 'crashed' + && (row!.config_hash == null || row!.config_hash === configHash); + + if (!shouldResume) { + if (agentSessionId) { + this.log.info({ sessionId, oldStatus: row!.status, hashMatch: row!.config_hash === configHash }, + 'opencode-server: not resuming stale session, creating fresh'); + this.byOpencodeId.delete(agentSessionId); + } + const created = await this.client.session.create({ directory: opts.worktreePath }); + if (created.error || !created.data) { + throw new Error(`opencode-server: session.create failed: ${errToString(created.error)}`); + } + agentSessionId = created.data.id; + await this.sql` + INSERT INTO agent_sessions + (session_id, agent, backend, agent_session_id, server_port, status, last_active_at, config_hash) + VALUES + (${sessionId}, ${opts.agent}, 'opencode_server', ${agentSessionId}, ${this.port}, 'active', clock_timestamp(), ${configHash}) + ON CONFLICT (session_id, agent) DO UPDATE SET + backend = 'opencode_server', + agent_session_id = EXCLUDED.agent_session_id, + server_port = EXCLUDED.server_port, + status = 'active', + last_active_at = clock_timestamp(), + config_hash = EXCLUDED.config_hash + `; + } else { + await this.sql` + UPDATE agent_sessions + SET status = 'active', last_active_at = clock_timestamp(), server_port = ${this.port}, config_hash = ${configHash} + WHERE session_id = ${sessionId} AND agent = ${opts.agent} + `; + } + + // Both branches above guarantee agentSessionId is non-null. + const ocSessionId = agentSessionId!; + + // Start (or re-start) the SSE event loop scoped to this session's directory. + // opencode scopes events by the `directory` query param; without it events + // default to the server's CWD which doesn't match our worktree paths. + // + // KNOWN Phase 1 LIMITATION: one SSE stream at a time, scoped to a single + // directory. Under 1.9 concurrency, if two opencode sessions use different + // worktree directories simultaneously, re-subscribing for the second drops + // the first session's events (the watchdog backstop prevents a full hang, + // but streamed content is lost). Phase 2 should move to per-session SSE + // subscriptions or a directory-agnostic event path. + if (!this.sseRunning || this.sseDirectory !== opts.worktreePath) { + if (this.sseRunning && this.sseDirectory && this.sseDirectory !== opts.worktreePath) { + this.log.warn( + { prev: this.sseDirectory, next: opts.worktreePath }, + 'opencode-server: SSE directory changed — concurrent sessions will lose events from the previous directory', + ); + } + this.sseRunning = false; + this.startEventLoop(opts.worktreePath); + } + + // Register / refresh the demux entry the SSE loop keys on. Preserve an existing + // entry (and any in-flight turn) — just refresh the routing fields. + const existing = this.byOpencodeId.get(ocSessionId); + if (existing) { + existing.boocodeSessionId = sessionId; + existing.worktreePath = opts.worktreePath; + } else { + this.byOpencodeId.set(ocSessionId, { + boocodeSessionId: sessionId, + agentSessionId: ocSessionId, + worktreePath: opts.worktreePath, + streamedPartKeys: new Set(), + partTypeById: new Map(), + activeTurn: null, + watchdog: null, + }); + } + + return { + sessionId, + agent: opts.agent, + backend: 'opencode_server', + agentSessionId: ocSessionId, + serverPort: this.port, + }; + } + + // ─── prompt: send one turn (1.6) ───────────────────────────────────────────── + + async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise { + if (!this.client) throw new Error('opencode-server: client not ready'); + const oc = handle.agentSessionId; + if (!oc) throw new Error('opencode-server: handle has no agentSessionId'); + + let state = this.byOpencodeId.get(oc); + if (!state) { + state = { + boocodeSessionId: handle.sessionId, + agentSessionId: oc, + worktreePath: ctx.worktreePath, + streamedPartKeys: new Set(), + partTypeById: new Map(), + activeTurn: null, + watchdog: null, + }; + this.byOpencodeId.set(oc, state); + } + const session = state; + // Authoritative per-turn directory for SDK routing + reconcile. + session.worktreePath = ctx.worktreePath; + const client = this.client; + + return await new Promise((resolve) => { + let settled = false; + const cleanup = () => { + session.activeTurn = null; + if (session.watchdog) { + clearTimeout(session.watchdog); + session.watchdog = null; + } + session.streamedPartKeys.clear(); + session.partTypeById.clear(); + ctx.signal.removeEventListener('abort', onAbort); + }; + const settle = (r: TurnResult) => { + if (settled) return; + settled = true; + cleanup(); + resolve(r); + }; + const onAbort = () => { + // Abort the turn only — never the server. + client.session.abort({ sessionID: oc, directory: ctx.worktreePath }).catch(() => {}); + settle({ ok: false, error: 'aborted' }); + }; + + session.activeTurn = { onEvent: ctx.onEvent, settle }; + this.bumpActivity(session); // arm the inactivity backstop + + if (ctx.signal.aborted) { + onAbort(); + return; + } + ctx.signal.addEventListener('abort', onAbort, { once: true }); + + const model = parseModel(ctx.model); + client.session + .promptAsync({ + sessionID: oc, + directory: ctx.worktreePath, + parts: [{ type: 'text', text: input }], + ...(model ? { model } : {}), + }) + .then((res) => { + // promptAsync is fire-and-forget (204); the turn completes via session.idle. + // Only a submission error settles here. + if (res.error) settle({ ok: false, error: errToString(res.error) }); + }) + .catch((err) => settle({ ok: false, error: errMsg(err) })); + }); + } + + // ─── teardown ──────────────────────────────────────────────────────────────── + + async closeSession(handle: AgentSessionHandle): Promise { + if (handle.agentSessionId) this.byOpencodeId.delete(handle.agentSessionId); + await this.sql` + UPDATE agent_sessions SET status = 'closed' + WHERE session_id = ${handle.sessionId} AND agent = ${handle.agent} + `.catch(() => {}); + } + + async dispose(): Promise { + this.up = false; + const child = this.child; + this.child = null; + this.client = null; + this.byOpencodeId.clear(); + if (child && !child.killed) { + child.kill('SIGTERM'); + const t = setTimeout(() => { + if (!child.killed) child.kill('SIGKILL'); + }, 5_000); + t.unref(); + } + } +} + +// ─── helpers ────────────────────────────────────────────────────────────────── + +/** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */ +function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined { + if (!model || !model.trim()) return undefined; + const trimmed = model.trim(); + const idx = trimmed.indexOf('/'); + if (idx > 0 && idx < trimmed.length - 1) { + return { providerID: trimmed.slice(0, idx), modelID: trimmed.slice(idx + 1) }; + } + // No slash but non-empty → infer llama-swap (the only configured provider). + // Guard against bare '/' or trailing/leading slash. + if (idx < 0 && trimmed.length > 0) { + return { providerID: 'llama-swap', modelID: trimmed }; + } + return undefined; +} + +/** Ported verbatim from Paseo opencode-agent.ts: id → message-id fallback → null. */ +function resolvePartDedupeKey(part: { id: string; messageID: string }, type: string): string | null { + if (part.id.trim().length > 0) return `${type}:${part.id}`; + if (part.messageID.trim().length > 0) return `${type}:message:${part.messageID}`; + return null; +} + +/** opencode ToolPart → ACP-shaped snapshot (reuses the existing persist/render path). */ +function toolPartToSnapshot(part: ToolPart): AcpToolSnapshot { + const state = part.state; + let rawInput: unknown; + let rawOutput: unknown; + let title: string | undefined; + if (state) { + if ('input' in state) rawInput = (state as { input?: unknown }).input; + if ('output' in state) rawOutput = (state as { output?: unknown }).output; + else if ('error' in state) rawOutput = (state as { error?: unknown }).error; + if ('title' in state) title = (state as { title?: string }).title; + } + return { + toolCallId: part.callID, + title: title ?? part.tool, + kind: null, + status: mapToolStatus(state?.status), + rawInput, + rawOutput, + }; +} + +function mapToolStatus(s: ToolState['status'] | undefined): ToolCallStatus | null { + switch (s) { + case 'pending': + return 'pending'; + case 'running': + return 'in_progress'; + case 'completed': + return 'completed'; + case 'error': + return 'failed'; + default: + return null; + } +} + +/** Bind-probe an ephemeral port on loopback. */ +function freePort(): Promise { + return new Promise((resolve, reject) => { + const srv = createServer(); + srv.unref(); + srv.on('error', reject); + srv.listen(0, '127.0.0.1', () => { + const addr = srv.address(); + if (addr && typeof addr === 'object') { + const { port } = addr; + srv.close(() => resolve(port)); + } else { + srv.close(() => reject(new Error('opencode-server: could not determine a free port'))); + } + }); + }); +} + +/** Resolve when the child prints the ready line; reject on timeout or early exit. */ +function waitForReady(child: ChildProcess, timeoutMs: number): Promise { + return new Promise((resolve, reject) => { + let done = false; + let stderrBuf = ''; + + const finish = (err?: Error) => { + if (done) return; + done = true; + clearTimeout(timer); + child.stdout?.off('data', onOut); + child.stderr?.off('data', onErr); + child.off('exit', onExit); + if (err) reject(err); + else resolve(); + }; + + const onOut = (buf: Buffer) => { + if (buf.toString().includes('opencode server listening on')) finish(); + }; + const onErr = (buf: Buffer) => { + stderrBuf += buf.toString(); + }; + const onExit = (code: number | null) => + finish(new Error(`opencode serve exited before ready (code ${code}); stderr: ${stderrBuf.slice(-2000)}`)); + const timer = setTimeout( + () => finish(new Error(`opencode serve not ready in ${timeoutMs}ms; stderr: ${stderrBuf.slice(-2000)}`)), + timeoutMs, + ); + + child.stdout?.on('data', onOut); + child.stderr?.on('data', onErr); + child.on('exit', onExit); + }); +} + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} + +function errMsg(e: unknown): string { + return e instanceof Error ? e.message : String(e); +} + +function errToString(e: unknown): string { + if (e == null) return 'unknown error'; + if (typeof e === 'string') return e; + if (e instanceof Error) return e.message; + try { + return JSON.stringify(e); + } catch { + return String(e); + } +} + +/** Hash of stable config — detects model changes across sessions without + * invalidating on ephemeral state like the random server port (which changes + * every BooCoder restart). */ +function sessionConfigHash(model: string): string { + return createHash('sha256').update(`opencode_server|${model}`).digest('hex').slice(0, 16); +} diff --git a/apps/coder/src/services/dispatcher.ts b/apps/coder/src/services/dispatcher.ts index 4c6deb9..fe14c4d 100644 --- a/apps/coder/src/services/dispatcher.ts +++ b/apps/coder/src/services/dispatcher.ts @@ -3,13 +3,17 @@ import type { FastifyBaseLogger } from 'fastify'; import type { Broker } from '@boocode/server/broker'; import type { WsFrame } from '@boocode/server/ws-frames'; import type { Config } from '../config.js'; -import { createWorktree, diffWorktree, cleanupWorktree } from './worktrees.js'; +import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js'; import { dispatchViaAcp } from './acp-dispatch.js'; import { getResolvedRegistry } from './provider-config-registry.js'; import { dispatchViaPty } from './pty-dispatch.js'; import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js'; import { getManifestCommands } from './provider-commands.js'; import { persistExternalAgentTurn } from './agent-turn-persist.js'; +import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js'; +import { agentPool } from './agent-pool.js'; +import { OpenCodeServerBackend } from './backends/opencode-server.js'; +import type { AgentBackend, AgentEvent } from './agent-backend.js'; interface InferenceRunner { enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void; @@ -35,47 +39,65 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise | null = null; let listener: { unlisten: () => Promise } | null = null; - let running = false; + let polling = false; let stopping = false; - let inflightPromise: Promise | null = null; + // v2.6 (1.9): per-session in-flight registry replaces the global `running` + // boolean. Key = session_id (or `task:` for sessionless tasks). Sessions + // without an in-flight turn run concurrently; within a session, strictly one + // turn at a time. + const inflight = new Map>(); // Shared entry point for both the poll timer and the NOTIFY listener. poll()'s - // `running`/`stopping` guard makes this safe to call concurrently — a notify - // arriving mid-task returns immediately and never double-dispatches. + // `polling`/`stopping` guard makes this safe to call concurrently — a notify + // arriving mid-poll returns immediately and never double-dispatches. function triggerPoll(reason: string): void { poll().catch((err) => { log.error({ err, reason }, 'dispatcher: poll error'); }); } + function concurrencyKey(task: { id: string; session_id: string | null }): string { + return task.session_id ?? `task:${task.id}`; + } + async function poll(): Promise { - if (running || stopping) return; - - // Grab one pending task - const rows = await sql<{ - id: string; - project_id: string; - input: string; - agent: string | null; - model: string | null; - mode_id: string | null; - thinking_option_id: string | null; - session_id: string | null; - }[]>` - SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id - FROM tasks - WHERE state = 'pending' - ORDER BY created_at - LIMIT 1 - `; - if (rows.length === 0) return; - - const task = rows[0]!; - running = true; - inflightPromise = runTask(task).finally(() => { - running = false; - inflightPromise = null; - }); + // `polling` serializes poll() execution itself (timer + NOTIFY can fire + // concurrently) so we never double-select a task. It does NOT serialize task + // execution — that's what `inflight` (keyed per session) governs. + if (polling || stopping) return; + polling = true; + try { + // Oldest-first; start every pending task whose session isn't already busy. + const rows = await sql<{ + id: string; + project_id: string; + input: string; + agent: string | null; + model: string | null; + mode_id: string | null; + thinking_option_id: string | null; + session_id: string | null; + }[]>` + SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id + FROM tasks + WHERE state = 'pending' + ORDER BY created_at + LIMIT 50 + `; + for (const task of rows) { + if (stopping) break; + const key = concurrencyKey(task); + if (inflight.has(key)) continue; // this session already has an in-flight turn + // Register synchronously (before any await) so a later row in this pass + // with the same key is skipped and a concurrent poll can't re-pick it. + const p = runTask(task).finally(() => { + inflight.delete(key); + }); + inflight.set(key, p); + } + } finally { + polling = false; + } } async function runTask(task: { @@ -96,7 +118,13 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise { + const taskId = task.id; + const agent = 'opencode'; + log.info({ taskId, agent }, 'dispatcher: starting task (path B — opencode server)'); + + const [project] = await sql<{ path: string | null }[]>` + SELECT path FROM projects WHERE id = ${task.project_id} + `; + const projectPath = project?.path; + if (!projectPath) { + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree' + WHERE id = ${taskId} + `; + return; + } + + const ac = new AbortController(); + + try { + // execution_path = 'acp' — the schema CHECK has no 'opencode_server' value + // (schema is frozen at Phase 0); the warm-vs-one-shot distinction lives in + // agent_sessions.backend. Reuse the closest existing value. + await sql` + UPDATE tasks + SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp' + WHERE id = ${taskId} + `; + + // Resolve session + chat (mirrors runExternalAgent). + let sessionId: string; + let chatId: string; + if (task.session_id) { + sessionId = task.session_id; + const chats = await sql<{ id: string }[]>` + SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1 + `; + if (chats.length === 0) { + const [chat] = await sql<{ id: string }[]>` + INSERT INTO chats (session_id, name, status) + VALUES (${sessionId}, 'External agent execution', 'open') + RETURNING id + `; + chatId = chat!.id; + } else { + chatId = chats[0]!.id; + } + } else { + const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`; + const [session] = await sql<{ id: string }[]>` + INSERT INTO sessions (project_id, name, model, status) + VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open') + RETURNING id + `; + sessionId = session!.id; + const [chat] = await sql<{ id: string }[]>` + INSERT INTO chats (session_id, name, status) + VALUES (${sessionId}, 'External agent execution', 'open') + RETURNING id + `; + chatId = chat!.id; + await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`; + } + + if (!task.session_id) { + await sql` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp()) + `; + } + + // Persistent, session-keyed worktree (shared across turns; NOT torn down + // per turn — Phase 3 reaps it). Captures base_commit for a stable diff. + const { worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, { + signal: ac.signal, + }); + log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready'); + + const [assistantMsg] = await sql<{ id: string }[]>` + INSERT INTO messages (session_id, chat_id, role, content, status, created_at) + VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp()) + RETURNING id + `; + const assistantId = assistantMsg!.id; + + broker.publishFrame(sessionId, { + type: 'message_started', + message_id: assistantId, + chat_id: chatId, + role: 'assistant', + } as WsFrame); + + const manifestCommands = getManifestCommands(agent); + if (manifestCommands.length > 0) { + setTaskCommands(taskId, manifestCommands); + broker.publishFrame(sessionId, { + type: 'agent_commands', + task_id: taskId, + session_id: sessionId, + commands: manifestCommands, + } as WsFrame); + } + + // Accumulate the turn's stream for persistence + the final message content. + const textChunks: string[] = []; + const reasoningChunks: string[] = []; + const toolSnaps = new Map(); + + // Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits. + // This boundary is where message_id/chat_id get attached (the backend never + // owns them). + const onEvent = (e: AgentEvent): void => { + switch (e.type) { + case 'text': + textChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'reasoning': + reasoningChunks.push(e.text); + broker.publishFrame(sessionId, { + type: 'reasoning_delta', + message_id: assistantId, + chat_id: chatId, + content: e.text, + } as WsFrame); + break; + case 'tool_call': + case 'tool_update': + toolSnaps.set(e.toolCall.toolCallId, e.toolCall); + broker.publishFrame(sessionId, { + type: 'tool_call', + message_id: assistantId, + chat_id: chatId, + tool_call: snapshotToWireToolCall(e.toolCall), + } as WsFrame); + break; + case 'commands': + // opencode-server doesn't emit these today; ignore if it ever does. + break; + } + }; + + // opencode expects provider-prefixed model ids (e.g. 'llama-swap/qwen3.6-35b…'). + // DEFAULT_MODEL is bare (no prefix) because native inference uses it directly + // against llama-swap. Coalesce empty string (frontend sends '' when no models + // listed) and prefix bare ids so parseModel always succeeds. + const rawModel = (task.model && task.model.trim()) || config.DEFAULT_MODEL; + const model = rawModel.includes('/') ? rawModel : `llama-swap/${rawModel}`; + const backend = getOpenCodeBackend(installPath); + const handle = await backend.ensureSession(sessionId, { + agent, + model, + worktreePath, + projectId: task.project_id, + }); + const result = await backend.prompt(handle, task.input, { + worktreePath, + model, + signal: ac.signal, + onEvent, + }); + + const assistantContent = textChunks.join('').slice(0, 50_000); + const reasoningText = reasoningChunks.join('').slice(0, 200_000); + const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'opencode turn failed').slice(0, 500); + + await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText); + + await sql` + UPDATE messages + SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp() + WHERE id = ${assistantId} + `; + broker.publishFrame(sessionId, { + type: 'message_complete', + message_id: assistantId, + chat_id: chatId, + } as WsFrame); + + if (stopping) { + await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`; + return; // worktree persists (no cleanup); backend stays warm + } + + // 1.10: diff the persistent worktree against its captured baseline and + // SUPERSEDE the session's prior pending row (latest-wins, one accumulating + // diff) instead of stacking. Stamp agent for DiffPanel attribution. + const diff = await diffWorktree(worktreePath, projectPath, { + signal: ac.signal, + baseRef: baseCommit ?? 'HEAD', + }); + if (diff) { + await sql` + DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending' + `; + await sql` + INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent) + VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent}) + `; + log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change'); + } else { + log.info({ taskId }, 'dispatcher: no changes detected in session worktree'); + } + + // NO worktree cleanup — it's persistent (Phase 3 reaps it). Backend stays warm. + + const [extCostRow] = await sql<{ total: number | null }[]>` + SELECT SUM(tokens_used)::int AS total + FROM messages + WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL + `; + const extCostTokens = extCostRow?.total ?? null; + + const finalState = result.ok ? 'completed' : 'failed'; + await sql` + UPDATE tasks + SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens} + WHERE id = ${taskId} + `; + log.info({ taskId, agent, finalState, costTokens: extCostTokens }, 'dispatcher: task finished (opencode server)'); + clearTaskCommands(taskId); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.error({ taskId, agent, err: errMsg }, 'dispatcher: opencode server error'); + await sql` + UPDATE tasks + SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)} + WHERE id = ${taskId} + `.catch(() => {}); + clearTaskCommands(taskId); + // No worktree cleanup (persistent); backend stays warm for the next turn. + } + } + // ─── Helpers ──────────────────────────────────────────────────────────────── async function waitForCompletion(assistantId: string): Promise { @@ -514,9 +810,9 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise 0) { + log.info({ count: inflight.size }, 'dispatcher: waiting for in-flight tasks'); + await Promise.allSettled([...inflight.values()]); } log.info('dispatcher: stopped'); }, diff --git a/apps/coder/src/services/provider-snapshot.ts b/apps/coder/src/services/provider-snapshot.ts index b0e4472..e9e6138 100644 --- a/apps/coder/src/services/provider-snapshot.ts +++ b/apps/coder/src/services/provider-snapshot.ts @@ -29,7 +29,7 @@ interface AgentRow { last_probed_at: string | Date | null; } -async function fetchLlamaSwapModels(config: Config): Promise { +export async function fetchLlamaSwapModels(config: Config): Promise { try { const res = await fetch(`${config.LLAMA_SWAP_URL}/v1/models`); if (!res.ok) return []; diff --git a/apps/coder/src/services/worktrees.ts b/apps/coder/src/services/worktrees.ts index f62ea5f..f7635a2 100644 --- a/apps/coder/src/services/worktrees.ts +++ b/apps/coder/src/services/worktrees.ts @@ -6,6 +6,7 @@ * After the agent completes, we diff the worktree against HEAD and * queue the diff into pending_changes. */ +import type { Sql } from '../db.js'; import { hostExec } from './host-exec.js'; const WORKTREE_BASE = '/tmp/booworktrees'; @@ -45,7 +46,7 @@ export async function createWorktree( export async function diffWorktree( worktreePath: string, projectPath: string, - opts?: { signal?: AbortSignal }, + opts?: { signal?: AbortSignal; baseRef?: string }, ): Promise { // First, commit any uncommitted changes in the worktree so we can diff branches // Stage all changes @@ -74,9 +75,13 @@ export async function diffWorktree( { signal: opts?.signal, timeoutMs: 15_000 }, ); - // Diff the worktree branch against the parent commit (HEAD of main tree) + // Diff the worktree branch against the baseline. Per-task callers default to the + // main tree's current HEAD; the session-worktree (opencode) path passes the + // captured base_commit so the accumulated diff is stable across turns even if + // project HEAD advances. + const baseRef = opts?.baseRef ?? 'HEAD'; const diffResult = await hostExec( - `git -C ${shellEscape(projectPath)} diff HEAD...$(git -C ${shellEscape(worktreePath)} rev-parse HEAD)`, + `git -C ${shellEscape(projectPath)} diff ${shellEscape(baseRef)}...$(git -C ${shellEscape(worktreePath)} rev-parse HEAD)`, { signal: opts?.signal, timeoutMs: 60_000 }, ); @@ -111,6 +116,72 @@ export async function cleanupWorktree( ).catch(() => {}); } +// ─── v2.6: session-keyed persistent worktree ──────────────────────────────── + +export interface SessionWorktree { + worktreePath: string; + baseCommit: string | null; +} + +/** + * v2.6: create-or-reuse ONE worktree per BooCode session (shared across all + * agents/turns in the session), recorded in `session_worktrees`. Unlike the + * per-task `createWorktree`, this persists — it is NOT torn down per turn + * (cleanup is Phase 3). Captures the project's current HEAD as `base_commit` + * so the accumulating diff has a stable baseline across turns. + * + * Distinct path namespace (`session-` branch, `/sess-` dir) so it never + * collides with the per-task worktrees that arena/new_task/MCP still use. + */ +export async function ensureSessionWorktree( + sql: Sql, + projectPath: string, + sessionId: string, + opts?: { signal?: AbortSignal }, +): Promise { + const [existing] = await sql<{ worktree_path: string; base_commit: string | null }[]>` + SELECT worktree_path, base_commit FROM session_worktrees WHERE session_id = ${sessionId} + `; + if (existing) { + return { worktreePath: existing.worktree_path, baseCommit: existing.base_commit }; + } + + const worktreePath = `${WORKTREE_BASE}/sess-${sessionId}`; + const branchName = `session-${sessionId}`; + + await hostExec(`mkdir -p ${WORKTREE_BASE}`, { signal: opts?.signal }); + + // Capture the baseline commit BEFORE branching, so the diff is stable even if + // project HEAD later advances. + const headResult = await hostExec( + `git -C ${shellEscape(projectPath)} rev-parse HEAD`, + { signal: opts?.signal, timeoutMs: 10_000 }, + ); + const baseCommit = headResult.exitCode === 0 ? headResult.stdout.trim() || null : null; + + const result = await hostExec( + `git -C ${shellEscape(projectPath)} worktree add ${shellEscape(worktreePath)} -b ${shellEscape(branchName)} HEAD`, + { signal: opts?.signal, timeoutMs: 30_000 }, + ); + if (result.exitCode !== 0) { + throw new Error(`Failed to create session worktree: ${result.stderr.trim() || result.stdout.trim()}`); + } + + // Persist. ON CONFLICT keeps the first writer's row if two turns race the create. + await sql` + INSERT INTO session_worktrees (session_id, worktree_path, base_commit) + VALUES (${sessionId}, ${worktreePath}, ${baseCommit}) + ON CONFLICT (session_id) DO NOTHING + `; + const [row] = await sql<{ worktree_path: string; base_commit: string | null }[]>` + SELECT worktree_path, base_commit FROM session_worktrees WHERE session_id = ${sessionId} + `; + return { + worktreePath: row?.worktree_path ?? worktreePath, + baseCommit: row?.base_commit ?? baseCommit, + }; +} + /** Minimal shell escape for paths (single-quote wrapping). */ function shellEscape(s: string): string { // Replace single quotes with escaped version, wrap in single quotes diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 967e61e..e9ceb84 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -63,6 +63,9 @@ importers: '@modelcontextprotocol/sdk': specifier: ^1.29.0 version: 1.29.0(zod@3.25.76) + '@opencode-ai/sdk': + specifier: ~1.15.0 + version: 1.15.12 fastify: specifier: ^4.28.1 version: 4.29.1 @@ -920,6 +923,9 @@ packages: '@open-draft/until@2.1.0': resolution: {integrity: sha512-U69T3ItWHvLwGg5eJ0n3I62nWuE6ilHlmz7zM0npLBRvPRd7e6NYmg54vvRtP5mZG7kZqZCFVdsTWo7BPtBujg==} + '@opencode-ai/sdk@1.15.12': + resolution: {integrity: sha512-lOaBNX93dkakZe6C42ttX1bkSx3K2c6+Yv+w8Qv02v5rPlu1vCXbmdfYDh9/bw+oq+NKPSaBm9d6kPA19hA5Lg==} + '@opentelemetry/api@1.9.1': resolution: {integrity: sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==} engines: {node: '>=8.0.0'} @@ -4702,6 +4708,10 @@ snapshots: '@open-draft/until@2.1.0': {} + '@opencode-ai/sdk@1.15.12': + dependencies: + cross-spawn: 7.0.6 + '@opentelemetry/api@1.9.1': {} '@pinojs/redact@0.4.0': {}