import { describe, it, expect, vi } from 'vitest';
import type { Event, OpencodeClient } from '@opencode-ai/sdk/v2/client';
import {
  reconnectDecision,
  runSessionEventLoop,
  DEFAULT_RECONNECT_POLICY,
  type SessionState,
  type SseLoopDeps,
} from '../opencode-sse.js';
import { shouldStartServer } from '../opencode-server-process.js';

/**
 * v2.7 concurrency hardening (Phase 7): the pure decision cores for SSE reconnect
 * backoff + the ensureServer double-spawn guard, plus a deterministic exercise of
 * the loop's breaker (injected sleep, fake client). Happy path is asserted to be
 * unchanged (clean stream end → reset → base-delay reconnect).
 *
 * NOTE: keep one statement per line in this file. Several statements carry
 * trailing `//` comments; if lines are joined, the comment swallows the code
 * that follows it (including assertions and closing braces).
 */

/**
 * Builds a fresh `SessionState` fixture: fixed ids and worktree path, empty
 * part-tracking collections, an active turn with no-op callbacks, and no
 * watchdog or SSE abort handle.
 */
function freshState(): SessionState {
  return {
    boocodeSessionId: 'boo1',
    agentSessionId: 'oc1',
    worktreePath: '/wt',
    streamedPartKeys: new Set(),
    partTypeById: new Map(),
    activeTurn: { onEvent: () => {}, settle: () => {} },
    watchdog: null,
    sseAbort: null,
    swallowNextTerminal: false,
  };
}

/** No-op logger, cast to the loop's logger type, so the tests produce no output. */
const silentLog = {
  warn: () => {},
  info: () => {},
  error: () => {},
  debug: () => {},
} as unknown as SseLoopDeps['log'];

// Pure decision function: failure count (+ optional policy) → reconnect delay or give-up.
describe('reconnectDecision (pure backoff + breaker)', () => {
  it('first failure uses the base delay (matches pre-hardening flat delay)', () => {
    expect(reconnectDecision(1)).toEqual({
      action: 'reconnect',
      delayMs: DEFAULT_RECONNECT_POLICY.baseMs,
    });
  });

  it('grows exponentially and caps at maxMs', () => {
    const policy = { baseMs: 1000, maxMs: 30_000, maxAttempts: 10 };
    expect(reconnectDecision(2, policy)).toEqual({ action: 'reconnect', delayMs: 2000 });
    expect(reconnectDecision(3, policy)).toEqual({ action: 'reconnect', delayMs: 4000 });
    // 32000 capped
    expect(reconnectDecision(6, policy)).toEqual({ action: 'reconnect', delayMs: 30_000 });
    expect(reconnectDecision(9, policy)).toEqual({ action: 'reconnect', delayMs: 30_000 });
  });

  it('gives up once failures exceed maxAttempts', () => {
    const policy = { baseMs: 1, maxMs: 8, maxAttempts: 3 };
    // Exactly maxAttempts failures still reconnects; one more trips the breaker.
    expect(reconnectDecision(3, policy).action).toBe('reconnect');
    expect(reconnectDecision(4, policy)).toEqual({ action: 'give-up' });
  });
});

// Pure guard: given a snapshot of server flags, should a new server be spawned?
describe('shouldStartServer (double-spawn guard)', () => {
  it('does not start when the server is live', () => {
    expect(
      shouldStartServer({
        up: true,
        hasClient: true,
        serverStarting: true,
        childDead: false,
        startInFlight: false,
      }),
    ).toBe(false);
  });

  it('starts on a fresh process (no start in flight)', () => {
    expect(
      shouldStartServer({
        up: false,
        hasClient: false,
        serverStarting: false,
        childDead: false,
        startInFlight: false,
      }),
    ).toBe(true);
  });

  it('re-spawns after a crash once the prior start finished', () => {
    expect(
      shouldStartServer({
        up: false,
        hasClient: false,
        serverStarting: true,
        childDead: true,
        startInFlight: false,
      }),
    ).toBe(true);
  });

  it('does NOT double-spawn while a start is already in flight (the race fix)', () => {
    expect(
      shouldStartServer({
        up: false,
        hasClient: false,
        serverStarting: true,
        childDead: true,
        startInFlight: true,
      }),
    ).toBe(false);
  });

  it('does NOT double-spawn when a crash nulled serverStarting mid-start', () => {
    // The narrow window: a crash during the in-flight start (await freePort) nulls
    // serverStarting while startInFlight is still true. The startInFlight guard must
    // win over the !serverStarting branch, else a second server spawns on a new port.
    expect(
      shouldStartServer({
        up: false,
        hasClient: false,
        serverStarting: false,
        childDead: true,
        startInFlight: true,
      }),
    ).toBe(false);
  });

  it('waits (no spawn) when a cached start exists and the child is still alive', () => {
    expect(
      shouldStartServer({
        up: false,
        hasClient: false,
        serverStarting: true,
        childDead: false,
        startInFlight: false,
      }),
    ).toBe(false);
  });
});

describe('runSessionEventLoop — happy path (unchanged)', () => {
  it('dispatches streamed events, reconciles on clean end, reconnects at base delay', async () => {
    const state = freshState();
    const abort = new AbortController();
    const events = [
      { type: 'session.next.text.delta', properties: { sessionID: 'oc1', delta: 'hi' } },
      { type: 'session.idle', properties: { sessionID: 'oc1' } },
    ] as unknown as Event[];

    // Fake client whose subscribe() yields the two events, then ends the stream cleanly.
    const client = {
      event: {
        subscribe: vi.fn(async () => ({
          stream: (async function* () {
            for (const ev of events) yield ev;
          })(),
        })),
      },
    } as unknown as OpencodeClient;

    const dispatched: Event[] = [];
    const sleeps: number[] = [];
    let reconciles = 0;

    const deps: SseLoopDeps = {
      isUp: () => true,
      getClient: () => client,
      dispatchEvent: (ev) => dispatched.push(ev),
      reconcile: async () => {
        reconciles += 1;
        abort.abort(); // stop the loop after the first clean cycle
        return false;
      },
      onReconnectGiveUp: () => {
        throw new Error('should not give up on the happy path');
      },
      log: silentLog,
      // Injected sleep: record the requested delay instead of waiting.
      sleep: async (ms) => {
        sleeps.push(ms);
      },
    };

    await runSessionEventLoop(state, abort, deps);

    expect(dispatched).toHaveLength(2);
    expect(reconciles).toBe(1);
    expect(sleeps).toEqual([DEFAULT_RECONNECT_POLICY.baseMs]); // base delay, not backed off
  });
});

describe('runSessionEventLoop — circuit breaker', () => {
  it('backs off on repeated throws then gives up + fails the turn', async () => {
    const state = freshState();
    const abort = new AbortController();
    const policy = { baseMs: 1, maxMs: 8, maxAttempts: 3 };

    // Every subscribe attempt fails, so the loop only ever sees connection errors.
    const subscribe = vi.fn(async () => {
      throw new Error('connection refused');
    });
    const client = { event: { subscribe } } as unknown as OpencodeClient;

    const sleeps: number[] = [];
    const gaveUp = vi.fn();

    const deps: SseLoopDeps = {
      isUp: () => true,
      getClient: () => client,
      dispatchEvent: () => {},
      reconcile: async () => false,
      onReconnectGiveUp: gaveUp,
      log: silentLog,
      // Injected sleep: record the requested delay instead of waiting.
      sleep: async (ms) => {
        sleeps.push(ms);
      },
      policy,
    };

    await runSessionEventLoop(state, abort, deps);

    // 3 backoff sleeps (1, 2, 4), then the 4th failure trips the breaker.
    expect(sleeps).toEqual([1, 2, 4]);
    expect(subscribe).toHaveBeenCalledTimes(4);
    expect(gaveUp).toHaveBeenCalledTimes(1);
    expect(gaveUp).toHaveBeenCalledWith(state);
  });
});