/**
 * v2.6 Phase 1 — OpenCodeServerBackend (slimmed, v2.7 audit reshape).
 *
 * Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP
 * server per BooCoder process; one opencode session per BooCode session (resumed
 * on switch-back); one SSE read loop PER session, each scoped to that session's
 * worktree directory so sessions in different directories stream concurrently.
 *
 * This file is now just the `AgentBackend` SURFACE — ensureSession / prompt /
 * accumulateUsage / closeSession + the per-session demux side effects (watchdog,
 * reconcile, usage). It composes three extracted collaborators:
 *   - `OpenCodeServerSupervisor` (opencode-server-process.ts) — child/client/port/
 *     health lifecycle, spawn/crash/restart/dispose.
 *   - the per-session SSE loop (opencode-sse.ts) — subscribe + reconnect/backoff.
 *   - the pure event map (opencode-event-map.ts) — Event → AgentEvent translation,
 *     dedup gate, dcp-strip, tool-snapshot.
 *
 * Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic
 * `AgentEvent`s; the dispatcher maps them to WS frames.
 *
 * Spec: openspec/changes/v2-6-persistent-agent-sessions/design.md §2 / §2a.
 */
import { createHash } from 'node:crypto';
import type { FastifyBaseLogger } from 'fastify';
import type { Event, AssistantMessage } from '@opencode-ai/sdk/v2/client';
import type { Sql } from '../../db.js';
import { armAbortGuard, noteTurnActivity, consumeTerminal } from './turn-guard.js';
import { stepEndedToUsage, type StepUsage } from './opencode-usage.js';
import { OpenCodeServerSupervisor, type ServerDownInfo } from './opencode-server-process.js';
import {
  startSessionEventLoop,
  type SessionState,
  type TurnState,
  type SseLoopDeps,
} from './opencode-sse.js';
import {
  classifyPartDelta,
  classifyUpdatedPart,
  toolCalledSnapshot,
  toolSuccessSnapshot,
  toolFailedSnapshot,
  stripDcpTags,
  errMsg,
  errToString,
} from './opencode-event-map.js';
import type {
  AgentBackend,
  AgentSessionHandle,
  EnsureSessionOpts,
  PromptCtx,
  TurnResult,
} from '../agent-backend.js';

/**
 * No-activity backstop for an in-flight turn. opencode streams reasoning/text/tool
 * deltas continuously while working, so "zero events for this long" means the turn
 * is wedged or its terminal event (session.idle) was lost. Generous so a
 * legitimately slow turn never trips it.
 */
const TURN_INACTIVITY_MS = 180_000;

/** Construction-time dependencies of {@link OpenCodeServerBackend}. */
export interface OpenCodeServerBackendDeps {
  sql: Sql;
  log: FastifyBaseLogger;
  /** Absolute path to the opencode binary (resolved from available_agents at wiring time, Phase 1.7). */
  opencodeBinary: string;
}

export class OpenCodeServerBackend implements AgentBackend {
  readonly backend = 'opencode_server' as const;

  private readonly sql: Sql;
  private readonly log: FastifyBaseLogger;
  private readonly supervisor: OpenCodeServerSupervisor;

  /** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */
  private readonly byOpencodeId = new Map<string, SessionState>();

  /** Coalesces concurrent ensureSession calls for the same (chat, agent) key. */
  private readonly ensuring = new Map<string, Promise<AgentSessionHandle>>();

  constructor(deps: OpenCodeServerBackendDeps) {
    this.sql = deps.sql;
    this.log = deps.log;
    this.supervisor = new OpenCodeServerSupervisor({
      opencodeBinary: deps.opencodeBinary,
      log: deps.log,
      hooks: {
        isBusy: () => this.isBusy(),
        onServerDown: (info) => this.onServerDown(info),
      },
    });
  }

  /** §2: liveness for the health endpoint + dispatcher fallback decision. */
  health(): 'up' | 'down' {
    return this.supervisor.health();
  }

  /** Phase 3: busy iff ANY pooled opencode session has an in-flight turn. */
  isBusy(): boolean {
    for (const st of this.byOpencodeId.values()) {
      if (st.activeTurn) return true;
    }
    return false;
  }

  /**
   * Phase 3 proactive health probe + busy-aware self-restart, run by the pool's
   * periodic sweep. Delegates to the supervisor.
   */
  async tickHealth(now: number = Date.now()): Promise<void> {
    await this.supervisor.tickHealth(now);
  }

  /**
   * Server down (crash-exit or forced restart): fail every in-flight turn so its
   * dispatcher unblocks, mark each session crashed so ensureSession won't resume a
   * now-dead native id, and tear down the SSE loops + demux state. Invoked by the
   * supervisor (it owns the process/port reset). Mirrors the original
   * handleServerCrash session-half byte-for-byte.
   */
  private onServerDown(info: ServerDownInfo): void {
    const states = [...this.byOpencodeId.values()];
    this.log.warn(
      { code: info.code, signal: info.signal, port: info.port, liveSessions: states.length },
      'opencode-server: child exited — recovering (fail in-flight, mark crashed, re-spawn next turn)',
    );

    const crashedIds: string[] = [];
    for (const st of states) {
      st.sseAbort?.abort();
      if (st.activeTurn) {
        st.activeTurn.settle({ ok: false, error: 'opencode server crashed mid-turn' });
        st.activeTurn = null;
      }
      if (st.watchdog) {
        clearTimeout(st.watchdog);
        st.watchdog = null;
      }
      crashedIds.push(st.agentSessionId);
    }

    // Drop the demux map: every session id is stale against a fresh server.
    this.byOpencodeId.clear();

    if (crashedIds.length > 0) {
      // Fire-and-forget: this hook is synchronous, and a failed write is only logged.
      this.sql`
        UPDATE agent_sessions SET status = 'crashed'
        WHERE agent_session_id = ANY(${crashedIds}) AND status <> 'closed'
      `.catch((err) => {
        this.log.warn({ err: errMsg(err) }, 'opencode-server: failed to mark crashed sessions (non-fatal)');
      });
    }
  }

  // ─── SSE loop wiring ─────────────────────────────────────────────────────────

  /** The dependency bundle the per-session SSE loop reads. */
  private sseDeps(): SseLoopDeps {
    return {
      isUp: () => this.supervisor.isUp(),
      getClient: () => this.supervisor.client,
      dispatchEvent: (ev) => this.dispatchEvent(ev),
      reconcile: (st) => this.reconcile(st),
      onReconnectGiveUp: (st) => this.onReconnectGiveUp(st),
      log: this.log,
    };
  }

  /**
   * Resolve the session that owns `sessionID` together with its in-flight turn,
   * and reset that session's inactivity watchdog. Returns `undefined` when the
   * session is unknown or has no active turn, in which case the caller drops
   * the event.
   */
  private liveTurn(sessionID: string): { st: SessionState; turn: TurnState } | undefined {
    const st = this.byOpencodeId.get(sessionID);
    const turn = st?.activeTurn;
    if (!st || !turn) return undefined;
    this.bumpActivity(st);
    return { st, turn };
  }

  /** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */
  private dispatchEvent(ev: Event): void {
    switch (ev.type) {
      // ─── session.next.* — live streaming events (the primary path) ─────────
      case 'session.next.text.delta': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        const cleaned = stripDcpTags(p.delta);
        // Skip deltas that are empty once stripDcpTags has run.
        if (cleaned) live.turn.onEvent({ type: 'text', text: cleaned });
        return;
      }
      case 'session.next.reasoning.delta': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        live.turn.onEvent({ type: 'reasoning', text: p.delta });
        return;
      }
      case 'session.next.tool.called': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        live.turn.onEvent({ type: 'tool_call', toolCall: toolCalledSnapshot(p) });
        return;
      }
      case 'session.next.tool.success': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        live.turn.onEvent({ type: 'tool_update', toolCall: toolSuccessSnapshot(p) });
        return;
      }
      case 'session.next.tool.failed': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        live.turn.onEvent({ type: 'tool_update', toolCall: toolFailedSnapshot(p) });
        return;
      }

      // ─── per-step usage (U.6) — token/cost accounting for opencode sessions ──
      case 'session.next.step.ended': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        // Accumulate this step's normalized usage onto the session's row.
        // Fire-and-forget: a DB hiccup must not stall the turn.
        void this.accumulateUsage(live.st, stepEndedToUsage(p));
        return;
      }

      // ─── message.part.* — terminal/post-hoc events (dedup gate) ────────────
      case 'message.part.delta': {
        const p = ev.properties;
        const live = this.liveTurn(p.sessionID);
        if (!live) return;
        const mapped = classifyPartDelta(p, live.st);
        if (mapped) live.turn.onEvent(mapped);
        return;
      }
      case 'message.part.updated': {
        const part = ev.properties.part;
        const live = this.liveTurn(part.sessionID);
        if (!live) return;
        const mapped = classifyUpdatedPart(part, live.st);
        if (mapped) live.turn.onEvent(mapped);
        return;
      }

      // ─── lifecycle ─────────────────────────────────────────────────────────
      case 'session.idle': {
        const st = this.byOpencodeId.get(ev.properties.sessionID);
        if (!st) return;
        if (consumeTerminal(st) === 'swallow') return; // F.1: drop the post-abort orphan
        st.activeTurn?.settle({ ok: true });
        return;
      }
      case 'session.error': {
        const sid = ev.properties.sessionID;
        if (!sid) return;
        const st = this.byOpencodeId.get(sid);
        if (!st) return;
        if (consumeTerminal(st) === 'swallow') return; // F.1: drop the post-abort orphan
        st.activeTurn?.settle({ ok: false, error: errToString(ev.properties.error) });
        return;
      }
      default:
        return;
    }
  }

  // ─── turn-completion resilience (watchdog + reconnect reconcile) ─────────────

  /** Reset the inactivity backstop on any event routed to a session's active turn. */
  private bumpActivity(st: SessionState): void {
    if (!st.activeTurn) return;
    // A live turn is producing → the post-abort orphan-terminal window is over.
    noteTurnActivity(st);
    if (st.watchdog) clearTimeout(st.watchdog);
    st.watchdog = setTimeout(() => {
      void this.onTurnStall(st);
    }, TURN_INACTIVITY_MS);
    // Don't let a pending watchdog keep the process alive on its own.
    st.watchdog.unref?.();
  }

  /**
   * Best-effort: flag one agent_sessions row as crashed so ensureSession creates a
   * fresh opencode session instead of resuming it. A failed write is logged, never
   * thrown.
   */
  private async markCrashed(agentSessionId: string): Promise<void> {
    try {
      await this.sql`
        UPDATE agent_sessions SET status = 'crashed'
        WHERE agent_session_id = ${agentSessionId}
      `;
    } catch (err) {
      this.log.warn(
        { err: errMsg(err), agentSessionId },
        'opencode-server: failed to mark session crashed (non-fatal)',
      );
    }
  }

  /**
   * Watchdog fired: reconcile once; if still-running we can't tell, so fail closed.
   * Also mark the agent_sessions row crashed so a stale session isn't resumed.
   *
   * The stalled turn is captured up front and re-checked after the awaited
   * reconcile: if it settled in the meantime (and possibly a new turn started),
   * neither the session nor the newer turn is touched.
   */
  private async onTurnStall(st: SessionState): Promise<void> {
    const stalled = st.activeTurn;
    if (!stalled) return;

    const settled = await this.reconcile(st);
    if (settled || st.activeTurn !== stalled) return;

    this.log.warn(
      { agentSessionId: st.agentSessionId },
      'opencode-server: turn stalled (no activity), failing + marking crashed',
    );
    await this.markCrashed(st.agentSessionId);
    // Settle the captured turn (not st.activeTurn) so a turn that started during
    // the DB write is never failed by mistake. The settle built in prompt() ignores
    // repeat calls.
    stalled.settle({ ok: false, error: 'turn timed out (no activity)' });
  }

  /**
   * SSE circuit-breaker fired (reconnect gave up): fail the active turn + mark the
   * session crashed so it isn't resumed. The next turn re-creates a fresh session.
   */
  private async onReconnectGiveUp(st: SessionState): Promise<void> {
    const turn = st.activeTurn;
    if (!turn) return;
    await this.markCrashed(st.agentSessionId);
    // Settle the turn that was live when the stream was lost, not whatever is
    // active after the await.
    turn.settle({ ok: false, error: 'opencode SSE stream lost (reconnect gave up)' });
  }

  /**
   * Ask the server whether this session's turn already finished — recovers a
   * session.idle/error lost during an SSE gap. Returns true if it settled the turn.
   */
  private async reconcile(st: SessionState): Promise<boolean> {
    const turn = st.activeTurn;
    const client = this.supervisor.client;
    if (!turn || !client) return false;

    try {
      const res = await client.session.messages({
        sessionID: st.agentSessionId,
        directory: st.worktreePath,
      });
      if (res.error || !res.data) return false;

      // Scan from the end for the most recent assistant message.
      let lastAssistant: AssistantMessage | undefined;
      for (let i = res.data.length - 1; i >= 0; i--) {
        const info = res.data[i]!.info;
        if (info.role === 'assistant') {
          lastAssistant = info;
          break;
        }
      }
      if (!lastAssistant) return false;

      if (lastAssistant.error != null) {
        turn.settle({ ok: false, error: errToString(lastAssistant.error) });
        return true;
      }
      if (lastAssistant.time.completed != null) {
        turn.settle({ ok: true });
        return true;
      }
      return false; // still running — the live stream will deliver session.idle
    } catch {
      return false; // inconclusive — watchdog backstop covers it
    }
  }

  // ─── per-step usage persistence (U.6) ────────────────────────────────────────

  /**
   * Accumulate one `session.next.step.ended`'s normalized usage onto the session's
   * agent_sessions row. Running totals for the whole conversation context. Zero-delta
   * steps are skipped. Errors are swallowed: usage telemetry must never fail a turn.
   */
  private async accumulateUsage(st: SessionState, u: StepUsage): Promise<void> {
    if (u.input === 0 && u.output === 0 && u.cost === 0) return;
    try {
      await this.sql`
        UPDATE agent_sessions
        SET input_tokens = input_tokens + ${u.input},
            output_tokens = output_tokens + ${u.output},
            cost = cost + ${u.cost}
        WHERE agent_session_id = ${st.agentSessionId}
      `;
    } catch (err) {
      this.log.warn(
        { err: errMsg(err), agentSessionId: st.agentSessionId },
        'opencode-server: failed to persist step usage (non-fatal)',
      );
    }
  }

  // ─── ensureSession: create-or-resume against agent_sessions (1.5) ────────────

  /**
   * Create or resume the opencode session for `(opts.chatId, opts.agent)` and
   * return its handle. Concurrent calls for the same key share one in-flight
   * promise.
   */
  async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
    // Coalesce concurrent first-turns for the same (chat, agent) so the SELECT…
    // create…upsert can't race into two opencode sessions (the second orphaning
    // the first). A single (non-concurrent) call is unaffected — the entry is set
    // and removed within this call. Defensive: the dispatcher already serializes
    // turns per (chat, agent) via its inflight map.
    const key = `${opts.chatId}:${opts.agent}`;
    const existing = this.ensuring.get(key);
    if (existing) return existing;

    const p = this.ensureSessionInner(sessionId, opts).finally(() => {
      if (this.ensuring.get(key) === p) this.ensuring.delete(key);
    });
    this.ensuring.set(key, p);
    return p;
  }

  /**
   * Uncoalesced body of ensureSession: ensure the server is up, decide between
   * resume and fresh-create from the agent_sessions row, persist the outcome,
   * register the demux entry and start the session's SSE loop.
   *
   * @throws Error when the client is not ready after ensureServer, or when
   *   `session.create` fails.
   */
  private async ensureSessionInner(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
    await this.supervisor.ensureServer();
    const client = this.supervisor.client;
    if (!client) throw new Error('opencode-server: client not ready after ensureServer');

    const configHash = sessionConfigHash(opts.model);

    // P1.5-b: agent_sessions is keyed (chat_id, agent) — the tab/chat is the
    // context unit (two tabs in one session = two contexts sharing one worktree).
    const [row] = await this.sql<{ agent_session_id: string | null; status: string; config_hash: string | null }[]>`
      SELECT agent_session_id, status, config_hash FROM agent_sessions
      WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
    `;

    // An empty id is treated like a missing one (matches the truthiness checks
    // this logic has always used).
    const priorId = row?.agent_session_id || null;

    // Don't resume crashed sessions or sessions whose config drifted (model change).
    const resumable =
      row !== undefined &&
      priorId !== null &&
      row.status !== 'crashed' &&
      (row.config_hash == null || row.config_hash === configHash);

    let ocSessionId: string;
    if (resumable && priorId !== null) {
      await this.sql`
        UPDATE agent_sessions
        SET status = 'active', last_active_at = clock_timestamp(),
            server_port = ${this.supervisor.port}, config_hash = ${configHash}
        WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
      `;
      ocSessionId = priorId;
    } else {
      if (row !== undefined && priorId !== null) {
        this.log.info(
          { sessionId, oldStatus: row.status, hashMatch: row.config_hash === configHash },
          'opencode-server: not resuming stale session, creating fresh',
        );
        // Stop the stale session's SSE loop before dropping its demux entry,
        // otherwise the loop outlives the entry (same order as closeSession).
        this.byOpencodeId.get(priorId)?.sseAbort?.abort();
        this.byOpencodeId.delete(priorId);
      }

      const created = await client.session.create({ directory: opts.worktreePath });
      if (created.error || !created.data) {
        throw new Error(`opencode-server: session.create failed: ${errToString(created.error)}`);
      }
      ocSessionId = created.data.id;

      await this.sql`
        INSERT INTO agent_sessions
          (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at, config_hash)
        VALUES
          (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'opencode_server',
           ${ocSessionId}, ${this.supervisor.port}, 'active', clock_timestamp(), ${configHash})
        ON CONFLICT (chat_id, agent) DO UPDATE SET
          session_id = EXCLUDED.session_id,
          worktree_id = EXCLUDED.worktree_id,
          backend = 'opencode_server',
          agent_session_id = EXCLUDED.agent_session_id,
          server_port = EXCLUDED.server_port,
          status = 'active',
          last_active_at = clock_timestamp(),
          config_hash = EXCLUDED.config_hash
      `;
    }

    // Register / refresh the demux entry the SSE loop keys on. Preserve an existing
    // entry (and any in-flight turn) — just refresh the routing fields.
    let state = this.byOpencodeId.get(ocSessionId);
    if (state) {
      state.boocodeSessionId = sessionId;
      state.worktreePath = opts.worktreePath;
    } else {
      state = this.makeSessionState(sessionId, ocSessionId, opts.worktreePath);
      this.byOpencodeId.set(ocSessionId, state);
    }

    // Start this session's own SSE loop, scoped to its worktree directory. Both
    // fresh-create and resume reach here; idempotent.
    startSessionEventLoop(state, this.sseDeps());

    return {
      sessionId,
      agent: opts.agent,
      backend: 'opencode_server',
      chatId: opts.chatId,
      worktreeId: opts.worktreeId,
      agentSessionId: ocSessionId,
      serverPort: this.supervisor.port,
    };
  }

  /** Fresh per-(opencode session) demux state. */
  private makeSessionState(boocodeSessionId: string, agentSessionId: string, worktreePath: string): SessionState {
    return {
      boocodeSessionId,
      agentSessionId,
      worktreePath,
      streamedPartKeys: new Set(),
      partTypeById: new Map(),
      activeTurn: null,
      watchdog: null,
      sseAbort: null,
      swallowNextTerminal: false,
    };
  }

  // ─── prompt: send one turn (1.6) ─────────────────────────────────────────────

  /**
   * Submit one turn to the opencode session behind `handle` and resolve once the
   * turn settles (terminal event, abort, submission error, watchdog or server
   * crash). Streamed output is delivered through `ctx.onEvent`.
   *
   * @returns `{ ok: true }` on completion, otherwise `{ ok: false, error }`; also
   *   `{ ok: false, ... }` immediately when the session already has a live turn.
   * @throws Error when the client is not ready or the handle has no agentSessionId.
   */
  async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
    const client = this.supervisor.client;
    if (!client) throw new Error('opencode-server: client not ready');
    const oc = handle.agentSessionId;
    if (!oc) throw new Error('opencode-server: handle has no agentSessionId');

    const known = this.byOpencodeId.get(oc);
    const session = known ?? this.makeSessionState(handle.sessionId, oc, ctx.worktreePath);
    if (!known) this.byOpencodeId.set(oc, session);

    // v2.7 busy-assert: one in-flight turn per session. The dispatcher serializes
    // turns per (chat, agent), so this never fires in normal dispatch — but if a
    // second prompt arrives while one is live it would silently overwrite the slot
    // and orphan the first turn, so reject instead.
    if (session.activeTurn) {
      return { ok: false, error: 'opencode-server: session already has an in-flight turn' };
    }

    // Authoritative per-turn directory for SDK routing + reconcile.
    session.worktreePath = ctx.worktreePath;

    // Defensive: ensureSession normally starts the loop, but if prompt is reached
    // with a freshly-created state (no loop yet), start it so the turn streams.
    startSessionEventLoop(session, this.sseDeps());

    return await new Promise<TurnResult>((resolve) => {
      let settled = false;

      // Release everything the turn holds on the session: the active slot, the
      // watchdog, the per-turn part-tracking collections and the abort listener.
      const cleanup = () => {
        session.activeTurn = null;
        if (session.watchdog) {
          clearTimeout(session.watchdog);
          session.watchdog = null;
        }
        session.streamedPartKeys.clear();
        session.partTypeById.clear();
        ctx.signal.removeEventListener('abort', onAbort);
      };

      // First call wins; later calls are ignored.
      const settle = (r: TurnResult) => {
        if (settled) return;
        settled = true;
        cleanup();
        resolve(r);
      };

      const onAbort = () => {
        // Abort the turn only — never the server. Best-effort: a failed abort
        // request is ignored because the turn is settled locally right below.
        client.session.abort({ sessionID: oc, directory: ctx.worktreePath }).catch(() => {});
        // F.1: opencode emits one trailing session.idle/error for the cancelled
        // turn — arm the guard so it's swallowed, not used to settle the next turn.
        armAbortGuard(session);
        settle({ ok: false, error: 'aborted' });
      };

      const turn: TurnState = { onEvent: ctx.onEvent, settle };
      session.activeTurn = turn;
      this.bumpActivity(session); // arm the inactivity backstop

      if (ctx.signal.aborted) {
        onAbort();
        return;
      }
      ctx.signal.addEventListener('abort', onAbort, { once: true });

      const model = parseModel(ctx.model);
      client.session
        .promptAsync({
          sessionID: oc,
          directory: ctx.worktreePath,
          parts: [{ type: 'text', text: input }],
          ...(model ? { model } : {}),
        })
        .then((res) => {
          // promptAsync is fire-and-forget (204); the turn completes via session.idle.
          // Only a submission error settles here.
          if (res.error) settle({ ok: false, error: errToString(res.error) });
        })
        .catch((err) => settle({ ok: false, error: errMsg(err) }));
    });
  }

  // ─── teardown ────────────────────────────────────────────────────────────────

  /**
   * Stop the session's SSE loop, drop its demux entry and mark the
   * agent_sessions row closed. The DB write is best-effort: a failure is logged,
   * never thrown.
   */
  async closeSession(handle: AgentSessionHandle): Promise<void> {
    if (handle.agentSessionId) {
      // Stop this session's SSE loop before dropping its demux entry.
      this.byOpencodeId.get(handle.agentSessionId)?.sseAbort?.abort();
      this.byOpencodeId.delete(handle.agentSessionId);
    }
    await this.sql`
      UPDATE agent_sessions SET status = 'closed'
      WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
    `.catch((err) => {
      this.log.warn(
        { err: errMsg(err), chatId: handle.chatId, agent: handle.agent },
        'opencode-server: failed to mark session closed (non-fatal)',
      );
    });
  }

  /** Abort all SSE loops, drop all demux state, then dispose the supervisor. */
  async dispose(): Promise<void> {
    // Abort every per-session SSE loop so none survive the teardown.
    for (const st of this.byOpencodeId.values()) st.sseAbort?.abort();
    this.byOpencodeId.clear();
    await this.supervisor.dispose();
  }
}

// ─── helpers ──────────────────────────────────────────────────────────────────

/**
 * BooCoder model string "provider/model" → opencode's structured {providerID, modelID}.
 *
 * The split is on the FIRST slash. A string with no slash is assigned to the
 * `llama-swap` provider. Empty/blank input, or a string whose first slash is its
 * first or last character, yields `undefined`.
 */
function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined {
  const trimmed = model?.trim();
  if (!trimmed) return undefined;

  const idx = trimmed.indexOf('/');
  if (idx < 0) {
    // No slash but non-empty → infer llama-swap (the only configured provider).
    return { providerID: 'llama-swap', modelID: trimmed };
  }
  if (idx > 0 && idx < trimmed.length - 1) {
    return { providerID: trimmed.slice(0, idx), modelID: trimmed.slice(idx + 1) };
  }
  return undefined;
}

/**
 * Hash of stable config — detects model changes across sessions without
 * invalidating on ephemeral state like the random server port.
 *
 * @returns the first 16 hex characters of SHA-256 over `opencode_server|<model>`.
 */
function sessionConfigHash(model: string): string {
  return createHash('sha256').update(`opencode_server|${model}`).digest('hex').slice(0, 16);
}