/**
 * Per-session SSE subscribe loop + reconnect/backoff + eventSessionId demux.
 *
 * Extracted (v2.7 audit reshape) from `OpenCodeServerBackend.startSessionEventLoop`
 * / `runSessionEventLoop`. opencode scopes events by the `directory` query param, so
 * each session runs its own dir-scoped stream and never drops a sibling's events.
 *
 * The loop is intentionally thin: it owns subscribe + the demux filter + reconnect
 * timing only. Translating an event into turn side effects (watchdog, usage,
 * settle) stays on the backend via the injected `dispatchEvent` / `reconcile`
 * callbacks — `opencode-sse` knows nothing about turns or the DB.
 *
 * v2.7 concurrency hardening: the throw-driven reconnect path now backs off
 * exponentially and trips a circuit-breaker (`onReconnectGiveUp`) after a bounded
 * number of consecutive failures, instead of looping forever at a flat 1s. The
 * HAPPY PATH is unchanged — a clean stream end (server still up) reconnects after
 * `baseMs` (1s, as before) and resets the failure counter, so a long-lived session
 * that re-subscribes normally never backs off.
 */
import type { FastifyBaseLogger } from 'fastify';
import type { Event, OpencodeClient } from '@opencode-ai/sdk/v2/client';
import type { AgentEvent } from '../agent-backend.js';
import type { TurnResult } from '../agent-backend.js';
import { eventSessionId, errMsg } from './opencode-event-map.js';

/** Base (and steady-state clean-reconnect) delay between subscribe attempts. */
export const SSE_RECONNECT_DELAY_MS = 1_000;

/** One in-flight turn's emitter + completion settler. */
export interface TurnState {
  onEvent: (e: AgentEvent) => void;
  settle: (r: TurnResult) => void;
}

/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */
export interface SessionState {
  boocodeSessionId: string;
  agentSessionId: string;
  /** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */
  worktreePath: string;
  /** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */
  streamedPartKeys: Set<string>;
  /** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */
  partTypeById: Map<string, 'text' | 'reasoning'>;
  activeTurn: TurnState | null;
  /** Inactivity backstop timer for the active turn; null when no turn in flight. */
  watchdog: ReturnType<typeof setTimeout> | null;
  /** Per-session SSE subscription handle. Non-null while the loop is running;
   *  aborting it tears down the underlying fetch and exits the loop. */
  sseAbort: AbortController | null;
  /** F.1 post-abort orphan-terminal guard: swallow the one session.idle/error
   *  opencode emits for an aborted turn so it can't settle the next turn. */
  swallowNextTerminal: boolean;
}

// ─── reconnect backoff (pure) ────────────────────────────────────────────────

export interface ReconnectPolicy {
  /** First retry delay (and the steady-state clean-reconnect delay). */
  baseMs: number;
  /** Cap on the exponential delay. */
  maxMs: number;
  /** Consecutive failures tolerated before the breaker trips (give up). */
  maxAttempts: number;
}

export const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
  baseMs: SSE_RECONNECT_DELAY_MS,
  maxMs: 30_000,
  maxAttempts: 6,
};

export type ReconnectDecision =
  | { action: 'reconnect'; delayMs: number }
  | { action: 'give-up' };

/**
 * Pure backoff decision after `failures` consecutive throwing reconnect attempts
 * (1-based: the first failure passes `failures=1`). Returns an exponentially
 * growing delay capped at `maxMs`, or `give-up` once the count exceeds
 * `maxAttempts`. `failures=1` yields `baseMs`, so the very first retry matches the
 * pre-hardening flat delay (happy-path-preserving).
 *
 * @param failures Number of consecutive failed attempts so far (1-based).
 * @param policy Backoff parameters; defaults to `DEFAULT_RECONNECT_POLICY`.
 * @returns `give-up` when `failures > policy.maxAttempts`, otherwise a
 *   `reconnect` decision whose delay is `baseMs * 2^(failures-1)` capped at `maxMs`.
 */
export function reconnectDecision(
  failures: number,
  policy: ReconnectPolicy = DEFAULT_RECONNECT_POLICY,
): ReconnectDecision {
  if (failures > policy.maxAttempts) return { action: 'give-up' };
  // Clamp the exponent at 0 so an out-of-contract `failures < 1` still waits at
  // least `baseMs` rather than a fraction of it.
  const exponent = Math.max(0, failures - 1);
  const exp = policy.baseMs * 2 ** exponent;
  return { action: 'reconnect', delayMs: Math.min(policy.maxMs, exp) };
}

// ─── the loop ────────────────────────────────────────────────────────────────

export interface SseLoopDeps {
  /** Live iff the server is up (read each iteration so a crash stops the loop). */
  isUp: () => boolean;
  /** The current opencode client (null between server restarts). */
  getClient: () => OpencodeClient | null;
  /** Route one demuxed event to its turn (backend side effects live here). */
  dispatchEvent: (ev: Event) => void;
  /** Recover an idle/error lost during an SSE gap. Returns true if it settled. */
  reconcile: (state: SessionState) => Promise<boolean>;
  /** Circuit-breaker: called once the backoff gives up; fail the active turn. */
  onReconnectGiveUp: (state: SessionState) => Promise<void> | void;
  log: FastifyBaseLogger;
  /** Injectable for tests; defaults to a real timer sleep. */
  sleep?: (ms: number) => Promise<void>;
  policy?: ReconnectPolicy;
}

/** Resolve after `ms` milliseconds using a real timer. */
function defaultSleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Per-session SSE subscription, scoped to the session's worktree directory.
 * Idempotent: a no-op if this session's loop is already running.
 *
 * The loop runs fire-and-forget; anything it throws is logged here so it cannot
 * surface as an unhandled promise rejection.
 *
 * @param state Session whose `sseAbort` handle tracks the running loop.
 * @param deps Injected backend callbacks, logger and timing policy.
 */
export function startSessionEventLoop(state: SessionState, deps: SseLoopDeps): void {
  if (state.sseAbort) return; // already running
  const abort = new AbortController();
  state.sseAbort = abort;
  void runSessionEventLoop(state, abort, deps)
    .catch((err: unknown) => {
      // Nothing awaits this promise, so an escaped throw (e.g. from
      // `onReconnectGiveUp`) must be handled here.
      deps.log.error(
        { err: errMsg(err), agentSessionId: state.agentSessionId },
        'opencode-server: session event loop crashed',
      );
    })
    .finally(() => {
      // Only clear if this controller is still the live one (a later restart may
      // have already installed a new one).
      if (state.sseAbort === abort) state.sseAbort = null;
    });
}

/**
 * Subscribe to the session's dir-scoped event stream and dispatch its events
 * until the server goes down, the client disappears, `abort` fires, or the
 * reconnect circuit breaker trips.
 *
 * NOTE(review): the failure counter is reset only on a clean stream end; a
 * stream that delivered events and then threw still increments it — confirm
 * that is the intended meaning of "consecutive failures".
 *
 * @param state Session being served; `worktreePath` and `agentSessionId` are
 *   re-read on every (re)subscribe / event.
 * @param abort Controller whose signal tears down the stream and ends the loop.
 * @param deps Injected backend callbacks, logger and timing policy.
 * @returns Resolves when the loop exits. Rejects only if `onReconnectGiveUp`
 *   (or the logger/sleep) throws.
 */
export async function runSessionEventLoop(
  state: SessionState,
  abort: AbortController,
  deps: SseLoopDeps,
): Promise<void> {
  const signal = abort.signal;
  const sleep = deps.sleep ?? defaultSleep;
  const policy = deps.policy ?? DEFAULT_RECONNECT_POLICY;
  let failures = 0;

  while (deps.isUp() && !signal.aborted) {
    // Read the client once per iteration and guard it, instead of a second
    // call with a non-null assertion.
    const client = deps.getClient();
    if (!client) break;

    try {
      // Re-read worktreePath each (re)subscribe so a directory refresh is picked
      // up on reconnect. Passing `signal` lets close/dispose tear down a stream
      // that's parked in `for await` between events.
      const sub = await client.event.subscribe({ directory: state.worktreePath }, { signal });
      for await (const ev of sub.stream) {
        if (signal.aborted) break;
        // Dir-scoped streams should only carry this session's events, but two
        // sessions sharing a worktree (possible post-P1.5-b) each receive BOTH
        // sessions' events — so drop anything that isn't ours, else the other
        // session's deltas get processed twice (once per loop).
        const sid = eventSessionId(ev);
        if (sid != null && sid !== state.agentSessionId) continue;
        deps.dispatchEvent(ev);
      }
      // Clean stream end — a healthy reconnect, NOT a failure: recover any lost
      // terminal then re-subscribe at the base delay (pre-hardening behavior).
      failures = 0;
      if (deps.isUp() && !signal.aborted) {
        await deps.reconcile(state); // recover an idle/error lost during the gap
        await sleep(policy.baseMs);
      }
    } catch (err) {
      // A throw caused by shutdown or an explicit abort is expected; just exit.
      if (!deps.isUp() || signal.aborted) break;
      failures += 1;
      const decision = reconnectDecision(failures, policy);
      deps.log.warn(
        { err: errMsg(err), agentSessionId: state.agentSessionId, failures, action: decision.action },
        'opencode-server: session event loop error; reconnecting',
      );
      try {
        await deps.reconcile(state);
      } catch (reconcileErr) {
        // A failing reconcile must not escape the loop: that would skip the
        // backoff and the circuit breaker below.
        deps.log.warn(
          { err: errMsg(reconcileErr), agentSessionId: state.agentSessionId, failures },
          'opencode-server: reconcile failed after session event loop error',
        );
      }
      if (decision.action === 'give-up') {
        deps.log.warn(
          { agentSessionId: state.agentSessionId, failures },
          'opencode-server: SSE reconnect gave up (circuit breaker) — failing active turn',
        );
        await deps.onReconnectGiveUp(state);
        break;
      }
      await sleep(decision.delayMs);
    }
  }
}