refactor: codebase audit cleanup — dead code, dedup, module splits
Multi-agent audit + aggressive cleanup across server/web/coder/booterm, delivered behind a DEFER discipline so none of the in-flight files were touched. Removes dead code/deps/columns, dedups server + coder helpers, and splits the oversized modules (tools.ts, opencode-server.ts, sentinel-summaries, turn.ts, TerminalPane.tsx) behind stable contracts. Adds 78 parity/unit tests (server 587, coder 323); fixes two latent bugs (ChatPane queue keys, FileViewerOverlay blank-line parity). Intended tag: v2.7.12-audit-cleanup. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
181
apps/coder/src/services/backends/opencode-sse.ts
Normal file
181
apps/coder/src/services/backends/opencode-sse.ts
Normal file
@@ -0,0 +1,181 @@
|
||||
/**
|
||||
* Per-session SSE subscribe loop + reconnect/backoff + eventSessionId demux.
|
||||
*
|
||||
* Extracted (v2.7 audit reshape) from `OpenCodeServerBackend.startSessionEventLoop`
|
||||
* / `runSessionEventLoop`. opencode scopes events by the `directory` query param, so
|
||||
* each session runs its own dir-scoped stream and never drops a sibling's events.
|
||||
*
|
||||
* The loop is intentionally thin: it owns subscribe + the demux filter + reconnect
|
||||
* timing only. Translating an event into turn side effects (watchdog, usage,
|
||||
* settle) stays on the backend via the injected `dispatchEvent` / `reconcile`
|
||||
* callbacks — `opencode-sse` knows nothing about turns or the DB.
|
||||
*
|
||||
* v2.7 concurrency hardening: the throw-driven reconnect path now backs off
|
||||
* exponentially and trips a circuit-breaker (`onReconnectGiveUp`) after a bounded
|
||||
* number of consecutive failures, instead of looping forever at a flat 1s. The
|
||||
* HAPPY PATH is unchanged — a clean stream end (server still up) reconnects after
|
||||
* `baseMs` (1s, as before) and resets the failure counter, so a long-lived session
|
||||
* that re-subscribes normally never backs off.
|
||||
*/
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Event, OpencodeClient } from '@opencode-ai/sdk/v2/client';
|
||||
import type { AgentEvent } from '../agent-backend.js';
|
||||
import type { TurnResult } from '../agent-backend.js';
|
||||
import { eventSessionId, errMsg } from './opencode-event-map.js';
|
||||
|
||||
export const SSE_RECONNECT_DELAY_MS = 1_000;
|
||||
|
||||
/** One in-flight turn's emitter + completion settler. */
|
||||
export interface TurnState {
|
||||
onEvent: (e: AgentEvent) => void;
|
||||
settle: (r: TurnResult) => void;
|
||||
}
|
||||
|
||||
/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */
|
||||
export interface SessionState {
|
||||
boocodeSessionId: string;
|
||||
agentSessionId: string;
|
||||
/** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */
|
||||
worktreePath: string;
|
||||
/** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */
|
||||
streamedPartKeys: Set<string>;
|
||||
/** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */
|
||||
partTypeById: Map<string, string>;
|
||||
activeTurn: TurnState | null;
|
||||
/** Inactivity backstop timer for the active turn; null when no turn in flight. */
|
||||
watchdog: ReturnType<typeof setTimeout> | null;
|
||||
/** Per-session SSE subscription handle. Non-null while the loop is running;
|
||||
* aborting it tears down the underlying fetch and exits the loop. */
|
||||
sseAbort: AbortController | null;
|
||||
/** F.1 post-abort orphan-terminal guard: swallow the one session.idle/error
|
||||
* opencode emits for an aborted turn so it can't settle the next turn. */
|
||||
swallowNextTerminal: boolean;
|
||||
}
|
||||
|
||||
// ─── reconnect backoff (pure) ────────────────────────────────────────────────
|
||||
|
||||
export interface ReconnectPolicy {
|
||||
/** First retry delay (and the steady-state clean-reconnect delay). */
|
||||
baseMs: number;
|
||||
/** Cap on the exponential delay. */
|
||||
maxMs: number;
|
||||
/** Consecutive failures tolerated before the breaker trips (give up). */
|
||||
maxAttempts: number;
|
||||
}
|
||||
|
||||
export const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
|
||||
baseMs: SSE_RECONNECT_DELAY_MS,
|
||||
maxMs: 30_000,
|
||||
maxAttempts: 6,
|
||||
};
|
||||
|
||||
export type ReconnectDecision =
|
||||
| { action: 'reconnect'; delayMs: number }
|
||||
| { action: 'give-up' };
|
||||
|
||||
/**
|
||||
* Pure backoff decision after `failures` consecutive throwing reconnect attempts
|
||||
* (1-based: the first failure passes `failures=1`). Returns an exponentially
|
||||
* growing delay capped at `maxMs`, or `give-up` once the count exceeds
|
||||
* `maxAttempts`. `failures=1` yields `baseMs`, so the very first retry matches the
|
||||
* pre-hardening flat delay (happy-path-preserving).
|
||||
*/
|
||||
export function reconnectDecision(
|
||||
failures: number,
|
||||
policy: ReconnectPolicy = DEFAULT_RECONNECT_POLICY,
|
||||
): ReconnectDecision {
|
||||
if (failures > policy.maxAttempts) return { action: 'give-up' };
|
||||
const exp = policy.baseMs * 2 ** (failures - 1);
|
||||
return { action: 'reconnect', delayMs: Math.min(policy.maxMs, exp) };
|
||||
}
|
||||
|
||||
// ─── the loop ────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface SseLoopDeps {
|
||||
/** Live iff the server is up (read each iteration so a crash stops the loop). */
|
||||
isUp: () => boolean;
|
||||
/** The current opencode client (null between server restarts). */
|
||||
getClient: () => OpencodeClient | null;
|
||||
/** Route one demuxed event to its turn (backend side effects live here). */
|
||||
dispatchEvent: (ev: Event) => void;
|
||||
/** Recover an idle/error lost during an SSE gap. Returns true if it settled. */
|
||||
reconcile: (state: SessionState) => Promise<boolean>;
|
||||
/** Circuit-breaker: called once the backoff gives up; fail the active turn. */
|
||||
onReconnectGiveUp: (state: SessionState) => Promise<void> | void;
|
||||
log: FastifyBaseLogger;
|
||||
/** Injectable for tests; defaults to a real timer sleep. */
|
||||
sleep?: (ms: number) => Promise<void>;
|
||||
policy?: ReconnectPolicy;
|
||||
}
|
||||
|
||||
function defaultSleep(ms: number): Promise<void> {
|
||||
return new Promise((r) => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
/** Per-session SSE subscription, scoped to the session's worktree directory.
|
||||
* Idempotent: a no-op if this session's loop is already running. */
|
||||
export function startSessionEventLoop(state: SessionState, deps: SseLoopDeps): void {
|
||||
if (state.sseAbort) return; // already running
|
||||
const abort = new AbortController();
|
||||
state.sseAbort = abort;
|
||||
void runSessionEventLoop(state, abort, deps).finally(() => {
|
||||
// Only clear if this controller is still the live one (a later restart may
|
||||
// have already installed a new one).
|
||||
if (state.sseAbort === abort) state.sseAbort = null;
|
||||
});
|
||||
}
|
||||
|
||||
export async function runSessionEventLoop(
|
||||
state: SessionState,
|
||||
abort: AbortController,
|
||||
deps: SseLoopDeps,
|
||||
): Promise<void> {
|
||||
const signal = abort.signal;
|
||||
const sleep = deps.sleep ?? defaultSleep;
|
||||
const policy = deps.policy ?? DEFAULT_RECONNECT_POLICY;
|
||||
let failures = 0;
|
||||
while (deps.isUp() && deps.getClient() && !signal.aborted) {
|
||||
const client = deps.getClient()!;
|
||||
try {
|
||||
// Re-read worktreePath each (re)subscribe so a directory refresh is picked
|
||||
// up on reconnect. Passing `signal` lets close/dispose tear down a stream
|
||||
// that's parked in `for await` between events.
|
||||
const sub = await client.event.subscribe({ directory: state.worktreePath }, { signal });
|
||||
for await (const ev of sub.stream) {
|
||||
if (signal.aborted) break;
|
||||
// Dir-scoped streams should only carry this session's events, but two
|
||||
// sessions sharing a worktree (possible post-P1.5-b) each receive BOTH
|
||||
// sessions' events — so drop anything that isn't ours, else the other
|
||||
// session's deltas get processed twice (once per loop).
|
||||
const sid = eventSessionId(ev);
|
||||
if (sid != null && sid !== state.agentSessionId) continue;
|
||||
deps.dispatchEvent(ev);
|
||||
}
|
||||
// Clean stream end — a healthy reconnect, NOT a failure: recover any lost
|
||||
// terminal then re-subscribe at the base delay (pre-hardening behavior).
|
||||
failures = 0;
|
||||
if (deps.isUp() && !signal.aborted) {
|
||||
await deps.reconcile(state); // recover an idle/error lost during the gap
|
||||
await sleep(policy.baseMs);
|
||||
}
|
||||
} catch (err) {
|
||||
if (!deps.isUp() || signal.aborted) break;
|
||||
failures += 1;
|
||||
const decision = reconnectDecision(failures, policy);
|
||||
deps.log.warn(
|
||||
{ err: errMsg(err), agentSessionId: state.agentSessionId, failures, action: decision.action },
|
||||
'opencode-server: session event loop error; reconnecting',
|
||||
);
|
||||
await deps.reconcile(state);
|
||||
if (decision.action === 'give-up') {
|
||||
deps.log.warn(
|
||||
{ agentSessionId: state.agentSessionId, failures },
|
||||
'opencode-server: SSE reconnect gave up (circuit breaker) — failing active turn',
|
||||
);
|
||||
await deps.onReconnectGiveUp(state);
|
||||
break;
|
||||
}
|
||||
await sleep(decision.delayMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user