feat(coder): per-session SSE subscriptions (P1.5-a concurrency prereq)

Replace the single global SSE loop (scoped to the most-recently-used worktree directory) with one subscription per live opencode session, each scoped to that session's worktree dir. Two sessions in different worktrees now stream concurrently instead of the second silently dropping the first's events. Each session owns an AbortController (SessionState.sseAbort) wired into subscribe(..., {signal}); the loop reconnects, reconciles (per-session), and is torn down on closeSession/dispose by aborting the signal — which also fixes a latent Phase-1 bug where switching directories left the old runEventLoop parked forever in its for-await (zombie loops). A sessionID demux guard (eventSessionId) drops events that aren't this loop's own, so two sessions sharing a worktree (possible after P1.5-b) don't double-process each other's deltas. Removed sseRunning/sseDirectory/startEventLoop/runEventLoop/reconcileInFlight and the 'SSE directory changed' collision warning. dispatchEvent/handleUpdatedPart (translation, dedup, dcp-strip) and the watchdog are unchanged — only the subscription topology changed. SDK confirmed: @opencode-ai/sdk Event.subscribe opens an independent SSE connection per call, so N concurrent dir-scoped streams are supported. No schema/dispatcher/frontend changes; runExternalAgent untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 22:15:55 +00:00
parent 3a26563be2
commit f69ea5f494

View File

@@ -3,7 +3,9 @@
* *
* Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP * Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP
* server per BooCoder process; one opencode session per BooCode session (resumed * server per BooCoder process; one opencode session per BooCode session (resumed
* on switch-back); a single SSE read loop demuxes all sessions' events. * on switch-back); one SSE read loop PER session, each scoped to that session's
* worktree directory so sessions in different directories stream concurrently
* (P1.5-a — replaced the Phase-1 single-stream-last-directory model).
* *
* Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic * Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic
* `AgentEvent`s — the dispatcher (Phase 1.7, NOT wired in this batch) maps them * `AgentEvent`s — the dispatcher (Phase 1.7, NOT wired in this batch) maps them
@@ -73,6 +75,9 @@ interface SessionState {
activeTurn: TurnState | null; activeTurn: TurnState | null;
/** Inactivity backstop timer for the active turn; null when no turn in flight. */ /** Inactivity backstop timer for the active turn; null when no turn in flight. */
watchdog: ReturnType<typeof setTimeout> | null; watchdog: ReturnType<typeof setTimeout> | null;
/** Per-session SSE subscription handle. Non-null while the loop is running;
* aborting it tears down the underlying fetch and exits the loop. */
sseAbort: AbortController | null;
} }
export interface OpenCodeServerBackendDeps { export interface OpenCodeServerBackendDeps {
@@ -94,7 +99,6 @@ export class OpenCodeServerBackend implements AgentBackend {
private port: number | null = null; private port: number | null = null;
private up = false; private up = false;
private serverStarting: Promise<void> | null = null; private serverStarting: Promise<void> | null = null;
private sseRunning = false;
/** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */ /** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */
private readonly byOpencodeId = new Map<string, SessionState>(); private readonly byOpencodeId = new Map<string, SessionState>();
@@ -150,37 +154,58 @@ export class OpenCodeServerBackend implements AgentBackend {
// ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ─────────────────── // ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ───────────────────
/** Per-directory SSE subscription. opencode scopes events by directory (defaults /** Per-session SSE subscription, scoped to the session's worktree directory.
* to process.cwd if omitted) — so we must subscribe with the same directory used * opencode scopes events by the `directory` query param (defaults to the
* to create the session. Called from ensureSession; reconnects while up. */ * server's cwd if omitted), so two sessions in different worktrees each get
private startEventLoop(directory: string): void { * their own dir-scoped stream and never drop each other's events. Idempotent:
if (this.sseRunning) return; * a no-op if this session's loop is already running. Started from ensureSession
this.sseRunning = true; * (and defensively from prompt) once worktreePath is known. */
this.sseDirectory = directory; private startSessionEventLoop(state: SessionState): void {
void this.runEventLoop(directory); if (state.sseAbort) return; // already running
const abort = new AbortController();
state.sseAbort = abort;
void this.runSessionEventLoop(state, abort).finally(() => {
// Only clear if this controller is still the live one (a later restart may
// have already installed a new one).
if (state.sseAbort === abort) state.sseAbort = null;
});
} }
private sseDirectory: string | null = null; private async runSessionEventLoop(state: SessionState, abort: AbortController): Promise<void> {
const signal = abort.signal;
private async runEventLoop(directory: string): Promise<void> { while (this.up && this.client && !signal.aborted) {
while (this.up && this.client) {
try { try {
const sub = await this.client.event.subscribe({ directory }); // Re-read worktreePath each (re)subscribe so a directory refresh is picked
// up on reconnect. Passing `signal` lets close/dispose tear down a stream
// that's parked in `for await` between events.
const sub = await this.client.event.subscribe(
{ directory: state.worktreePath },
{ signal },
);
for await (const ev of sub.stream) { for await (const ev of sub.stream) {
if (signal.aborted) break;
// Dir-scoped streams should only carry this session's events, but two
// sessions sharing a worktree (possible post-P1.5-b) each receive BOTH
// sessions' events — so drop anything that isn't ours, else the other
// session's deltas get processed twice (once per loop).
const sid = eventSessionId(ev);
if (sid != null && sid !== state.agentSessionId) continue;
this.dispatchEvent(ev); this.dispatchEvent(ev);
} }
if (this.up) { if (this.up && !signal.aborted) {
await this.reconcileInFlight(); await this.reconcile(state); // recover an idle/error lost during the gap
await sleep(SSE_RECONNECT_DELAY_MS); await sleep(SSE_RECONNECT_DELAY_MS);
} }
} catch (err) { } catch (err) {
if (!this.up) break; if (!this.up || signal.aborted) break;
this.log.warn({ err: errMsg(err) }, 'opencode-server: event loop error; reconnecting'); this.log.warn(
await this.reconcileInFlight(); { err: errMsg(err), agentSessionId: state.agentSessionId },
'opencode-server: session event loop error; reconnecting',
);
await this.reconcile(state);
await sleep(SSE_RECONNECT_DELAY_MS); await sleep(SSE_RECONNECT_DELAY_MS);
} }
} }
this.sseRunning = false;
} }
/** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */ /** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */
@@ -354,13 +379,6 @@ export class OpenCodeServerBackend implements AgentBackend {
} }
} }
/** Reconcile every in-flight turn against the server (called after an SSE drop). */
private async reconcileInFlight(): Promise<void> {
const states = [...this.byOpencodeId.values()].filter((s) => s.activeTurn);
if (states.length === 0) return;
await Promise.allSettled(states.map((s) => this.reconcile(s)));
}
/** /**
* Ask the server whether this session's turn already finished — recovers a * Ask the server whether this session's turn already finished — recovers a
* session.idle/error lost during an SSE gap. Returns true if it settled the turn. * session.idle/error lost during an SSE gap. Returns true if it settled the turn.
@@ -451,35 +469,14 @@ export class OpenCodeServerBackend implements AgentBackend {
// Both branches above guarantee agentSessionId is non-null. // Both branches above guarantee agentSessionId is non-null.
const ocSessionId = agentSessionId!; const ocSessionId = agentSessionId!;
// Start (or re-start) the SSE event loop scoped to this session's directory.
// opencode scopes events by the `directory` query param; without it events
// default to the server's CWD which doesn't match our worktree paths.
//
// KNOWN Phase 1 LIMITATION: one SSE stream at a time, scoped to a single
// directory. Under 1.9 concurrency, if two opencode sessions use different
// worktree directories simultaneously, re-subscribing for the second drops
// the first session's events (the watchdog backstop prevents a full hang,
// but streamed content is lost). Phase 2 should move to per-session SSE
// subscriptions or a directory-agnostic event path.
if (!this.sseRunning || this.sseDirectory !== opts.worktreePath) {
if (this.sseRunning && this.sseDirectory && this.sseDirectory !== opts.worktreePath) {
this.log.warn(
{ prev: this.sseDirectory, next: opts.worktreePath },
'opencode-server: SSE directory changed — concurrent sessions will lose events from the previous directory',
);
}
this.sseRunning = false;
this.startEventLoop(opts.worktreePath);
}
// Register / refresh the demux entry the SSE loop keys on. Preserve an existing // Register / refresh the demux entry the SSE loop keys on. Preserve an existing
// entry (and any in-flight turn) — just refresh the routing fields. // entry (and any in-flight turn) — just refresh the routing fields.
const existing = this.byOpencodeId.get(ocSessionId); let state = this.byOpencodeId.get(ocSessionId);
if (existing) { if (state) {
existing.boocodeSessionId = sessionId; state.boocodeSessionId = sessionId;
existing.worktreePath = opts.worktreePath; state.worktreePath = opts.worktreePath;
} else { } else {
this.byOpencodeId.set(ocSessionId, { state = {
boocodeSessionId: sessionId, boocodeSessionId: sessionId,
agentSessionId: ocSessionId, agentSessionId: ocSessionId,
worktreePath: opts.worktreePath, worktreePath: opts.worktreePath,
@@ -487,9 +484,16 @@ export class OpenCodeServerBackend implements AgentBackend {
partTypeById: new Map(), partTypeById: new Map(),
activeTurn: null, activeTurn: null,
watchdog: null, watchdog: null,
}); sseAbort: null,
};
this.byOpencodeId.set(ocSessionId, state);
} }
// Start this session's own SSE loop, scoped to its worktree directory. Both
// fresh-create and resume reach here; idempotent, so a re-ensure (e.g. a
// second turn) won't spawn a duplicate loop.
this.startSessionEventLoop(state);
return { return {
sessionId, sessionId,
agent: opts.agent, agent: opts.agent,
@@ -516,12 +520,17 @@ export class OpenCodeServerBackend implements AgentBackend {
partTypeById: new Map(), partTypeById: new Map(),
activeTurn: null, activeTurn: null,
watchdog: null, watchdog: null,
sseAbort: null,
}; };
this.byOpencodeId.set(oc, state); this.byOpencodeId.set(oc, state);
} }
const session = state; const session = state;
// Authoritative per-turn directory for SDK routing + reconcile. // Authoritative per-turn directory for SDK routing + reconcile.
session.worktreePath = ctx.worktreePath; session.worktreePath = ctx.worktreePath;
// Defensive: ensureSession normally starts the loop, but if prompt is reached
// with a freshly-created state (no loop yet), start it so the turn streams.
// Idempotent when ensureSession already started one.
this.startSessionEventLoop(session);
const client = this.client; const client = this.client;
return await new Promise<TurnResult>((resolve) => { return await new Promise<TurnResult>((resolve) => {
@@ -577,7 +586,11 @@ export class OpenCodeServerBackend implements AgentBackend {
// ─── teardown ──────────────────────────────────────────────────────────────── // ─── teardown ────────────────────────────────────────────────────────────────
async closeSession(handle: AgentSessionHandle): Promise<void> { async closeSession(handle: AgentSessionHandle): Promise<void> {
if (handle.agentSessionId) this.byOpencodeId.delete(handle.agentSessionId); if (handle.agentSessionId) {
// Stop this session's SSE loop before dropping its demux entry.
this.byOpencodeId.get(handle.agentSessionId)?.sseAbort?.abort();
this.byOpencodeId.delete(handle.agentSessionId);
}
await this.sql` await this.sql`
UPDATE agent_sessions SET status = 'closed' UPDATE agent_sessions SET status = 'closed'
WHERE session_id = ${handle.sessionId} AND agent = ${handle.agent} WHERE session_id = ${handle.sessionId} AND agent = ${handle.agent}
@@ -586,6 +599,8 @@ export class OpenCodeServerBackend implements AgentBackend {
async dispose(): Promise<void> { async dispose(): Promise<void> {
this.up = false; this.up = false;
// Abort every per-session SSE loop so none survive the teardown.
for (const st of this.byOpencodeId.values()) st.sseAbort?.abort();
const child = this.child; const child = this.child;
this.child = null; this.child = null;
this.client = null; this.client = null;
@@ -602,6 +617,20 @@ export class OpenCodeServerBackend implements AgentBackend {
// ─── helpers ────────────────────────────────────────────────────────────────── // ─── helpers ──────────────────────────────────────────────────────────────────
/** Extract the opencode sessionID an event belongs to, across event shapes.
* Most carry `properties.sessionID`; `message.part.updated` nests it under
* `properties.part.sessionID`. Returns null when the event has no session
* (the per-session loop then leaves it to dispatchEvent, which drops it). */
function eventSessionId(ev: Event): string | null {
const props = (ev as { properties?: unknown }).properties;
if (!props || typeof props !== 'object') return null;
if (ev.type === 'message.part.updated') {
const part = (props as { part?: { sessionID?: string } }).part;
return part?.sessionID ?? null;
}
return (props as { sessionID?: string }).sessionID ?? null;
}
/** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */ /** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */
function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined { function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined {
if (!model || !model.trim()) return undefined; if (!model || !model.trim()) return undefined;