diff --git a/apps/coder/src/services/backends/opencode-server.ts b/apps/coder/src/services/backends/opencode-server.ts index babb29c..f056941 100644 --- a/apps/coder/src/services/backends/opencode-server.ts +++ b/apps/coder/src/services/backends/opencode-server.ts @@ -3,7 +3,9 @@ * * Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP * server per BooCoder process; one opencode session per BooCode session (resumed - * on switch-back); a single SSE read loop demuxes all sessions' events. + * on switch-back); one SSE read loop PER session, each scoped to that session's + * worktree directory so sessions in different directories stream concurrently + * (P1.5-a — replaced the Phase-1 single-stream-last-directory model). * * Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic * `AgentEvent`s — the dispatcher (Phase 1.7, NOT wired in this batch) maps them @@ -73,6 +75,9 @@ interface SessionState { activeTurn: TurnState | null; /** Inactivity backstop timer for the active turn; null when no turn in flight. */ watchdog: ReturnType | null; + /** Per-session SSE subscription handle. Non-null while the loop is running; + * aborting it tears down the underlying fetch and exits the loop. */ + sseAbort: AbortController | null; } export interface OpenCodeServerBackendDeps { @@ -94,7 +99,6 @@ export class OpenCodeServerBackend implements AgentBackend { private port: number | null = null; private up = false; private serverStarting: Promise | null = null; - private sseRunning = false; /** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */ private readonly byOpencodeId = new Map(); @@ -150,37 +154,58 @@ export class OpenCodeServerBackend implements AgentBackend { // ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ─────────────────── - /** Per-directory SSE subscription. opencode scopes events by directory (defaults - * to process.cwd if omitted) — so we must subscribe with the same directory used - * to create the session. Called from ensureSession; reconnects while up. */ - private startEventLoop(directory: string): void { - if (this.sseRunning) return; - this.sseRunning = true; - this.sseDirectory = directory; - void this.runEventLoop(directory); + /** Per-session SSE subscription, scoped to the session's worktree directory. + * opencode scopes events by the `directory` query param (defaults to the + * server's cwd if omitted), so two sessions in different worktrees each get + * their own dir-scoped stream and never drop each other's events. Idempotent: + * a no-op if this session's loop is already running. Started from ensureSession + * (and defensively from prompt) once worktreePath is known. */ + private startSessionEventLoop(state: SessionState): void { + if (state.sseAbort) return; // already running + const abort = new AbortController(); + state.sseAbort = abort; + void this.runSessionEventLoop(state, abort).finally(() => { + // Only clear if this controller is still the live one (a later restart may + // have already installed a new one). + if (state.sseAbort === abort) state.sseAbort = null; + }); } - private sseDirectory: string | null = null; - - private async runEventLoop(directory: string): Promise { - while (this.up && this.client) { + private async runSessionEventLoop(state: SessionState, abort: AbortController): Promise { + const signal = abort.signal; + while (this.up && this.client && !signal.aborted) { try { - const sub = await this.client.event.subscribe({ directory }); + // Re-read worktreePath each (re)subscribe so a directory refresh is picked + // up on reconnect. Passing `signal` lets close/dispose tear down a stream + // that's parked in `for await` between events. + const sub = await this.client.event.subscribe( + { directory: state.worktreePath }, + { signal }, + ); for await (const ev of sub.stream) { + if (signal.aborted) break; + // Dir-scoped streams should only carry this session's events, but two + // sessions sharing a worktree (possible post-P1.5-b) each receive BOTH + // sessions' events — so drop anything that isn't ours, else the other + // session's deltas get processed twice (once per loop). + const sid = eventSessionId(ev); + if (sid != null && sid !== state.agentSessionId) continue; this.dispatchEvent(ev); } - if (this.up) { - await this.reconcileInFlight(); + if (this.up && !signal.aborted) { + await this.reconcile(state); // recover an idle/error lost during the gap await sleep(SSE_RECONNECT_DELAY_MS); } } catch (err) { - if (!this.up) break; - this.log.warn({ err: errMsg(err) }, 'opencode-server: event loop error; reconnecting'); - await this.reconcileInFlight(); + if (!this.up || signal.aborted) break; + this.log.warn( + { err: errMsg(err), agentSessionId: state.agentSessionId }, + 'opencode-server: session event loop error; reconnecting', + ); + await this.reconcile(state); await sleep(SSE_RECONNECT_DELAY_MS); } } - this.sseRunning = false; } /** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */ @@ -354,13 +379,6 @@ export class OpenCodeServerBackend implements AgentBackend { } } - /** Reconcile every in-flight turn against the server (called after an SSE drop). */ - private async reconcileInFlight(): Promise { - const states = [...this.byOpencodeId.values()].filter((s) => s.activeTurn); - if (states.length === 0) return; - await Promise.allSettled(states.map((s) => this.reconcile(s))); - } - /** * Ask the server whether this session's turn already finished — recovers a * session.idle/error lost during an SSE gap. Returns true if it settled the turn. @@ -451,35 +469,14 @@ export class OpenCodeServerBackend implements AgentBackend { // Both branches above guarantee agentSessionId is non-null. const ocSessionId = agentSessionId!; - // Start (or re-start) the SSE event loop scoped to this session's directory. - // opencode scopes events by the `directory` query param; without it events - // default to the server's CWD which doesn't match our worktree paths. - // - // KNOWN Phase 1 LIMITATION: one SSE stream at a time, scoped to a single - // directory. Under 1.9 concurrency, if two opencode sessions use different - // worktree directories simultaneously, re-subscribing for the second drops - // the first session's events (the watchdog backstop prevents a full hang, - // but streamed content is lost). Phase 2 should move to per-session SSE - // subscriptions or a directory-agnostic event path. - if (!this.sseRunning || this.sseDirectory !== opts.worktreePath) { - if (this.sseRunning && this.sseDirectory && this.sseDirectory !== opts.worktreePath) { - this.log.warn( - { prev: this.sseDirectory, next: opts.worktreePath }, - 'opencode-server: SSE directory changed — concurrent sessions will lose events from the previous directory', - ); - } - this.sseRunning = false; - this.startEventLoop(opts.worktreePath); - } - // Register / refresh the demux entry the SSE loop keys on. Preserve an existing // entry (and any in-flight turn) — just refresh the routing fields. - const existing = this.byOpencodeId.get(ocSessionId); - if (existing) { - existing.boocodeSessionId = sessionId; - existing.worktreePath = opts.worktreePath; + let state = this.byOpencodeId.get(ocSessionId); + if (state) { + state.boocodeSessionId = sessionId; + state.worktreePath = opts.worktreePath; } else { - this.byOpencodeId.set(ocSessionId, { + state = { boocodeSessionId: sessionId, agentSessionId: ocSessionId, worktreePath: opts.worktreePath, @@ -487,9 +484,16 @@ export class OpenCodeServerBackend implements AgentBackend { partTypeById: new Map(), activeTurn: null, watchdog: null, - }); + sseAbort: null, + }; + this.byOpencodeId.set(ocSessionId, state); } + // Start this session's own SSE loop, scoped to its worktree directory. Both + // fresh-create and resume reach here; idempotent, so a re-ensure (e.g. a + // second turn) won't spawn a duplicate loop. + this.startSessionEventLoop(state); + return { sessionId, agent: opts.agent, @@ -516,12 +520,17 @@ export class OpenCodeServerBackend implements AgentBackend { partTypeById: new Map(), activeTurn: null, watchdog: null, + sseAbort: null, }; this.byOpencodeId.set(oc, state); } const session = state; // Authoritative per-turn directory for SDK routing + reconcile. session.worktreePath = ctx.worktreePath; + // Defensive: ensureSession normally starts the loop, but if prompt is reached + // with a freshly-created state (no loop yet), start it so the turn streams. + // Idempotent when ensureSession already started one. + this.startSessionEventLoop(session); const client = this.client; return await new Promise((resolve) => { @@ -577,7 +586,11 @@ export class OpenCodeServerBackend implements AgentBackend { // ─── teardown ──────────────────────────────────────────────────────────────── async closeSession(handle: AgentSessionHandle): Promise { - if (handle.agentSessionId) this.byOpencodeId.delete(handle.agentSessionId); + if (handle.agentSessionId) { + // Stop this session's SSE loop before dropping its demux entry. + this.byOpencodeId.get(handle.agentSessionId)?.sseAbort?.abort(); + this.byOpencodeId.delete(handle.agentSessionId); + } await this.sql` UPDATE agent_sessions SET status = 'closed' WHERE session_id = ${handle.sessionId} AND agent = ${handle.agent} @@ -586,6 +599,8 @@ export class OpenCodeServerBackend implements AgentBackend { async dispose(): Promise { this.up = false; + // Abort every per-session SSE loop so none survive the teardown. + for (const st of this.byOpencodeId.values()) st.sseAbort?.abort(); const child = this.child; this.child = null; this.client = null; @@ -602,6 +617,20 @@ export class OpenCodeServerBackend implements AgentBackend { // ─── helpers ────────────────────────────────────────────────────────────────── +/** Extract the opencode sessionID an event belongs to, across event shapes. + * Most carry `properties.sessionID`; `message.part.updated` nests it under + * `properties.part.sessionID`. Returns null when the event has no session + * (the per-session loop then leaves it to dispatchEvent, which drops it). */ +function eventSessionId(ev: Event): string | null { + const props = (ev as { properties?: unknown }).properties; + if (!props || typeof props !== 'object') return null; + if (ev.type === 'message.part.updated') { + const part = (props as { part?: { sessionID?: string } }).part; + return part?.sessionID ?? null; + } + return (props as { sessionID?: string }).sessionID ?? null; +} + /** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */ function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined { if (!model || !model.trim()) return undefined;