refactor: codebase audit cleanup — dead code, dedup, module splits

Multi-agent audit + aggressive cleanup across server/web/coder/booterm,
delivered behind a DEFER discipline so none of the in-flight files were
touched. Removes dead code/deps/columns, dedups server + coder helpers,
and splits the oversized modules (tools.ts, opencode-server.ts,
sentinel-summaries, turn.ts, TerminalPane.tsx) behind stable contracts.
Adds 78 parity/unit tests (server 587, coder 323); fixes two latent bugs
(ChatPane queue keys, FileViewerOverlay blank-line parity).

Intended tag: v2.7.12-audit-cleanup.
This commit is contained in:
2026-06-02 21:10:06 +00:00
parent 6560398285
commit abe1a311d0
143 changed files with 6729 additions and 6087 deletions

View File

@@ -0,0 +1,173 @@
import { describe, it, expect, vi } from 'vitest';
import type { Event, OpencodeClient } from '@opencode-ai/sdk/v2/client';
import {
reconnectDecision,
runSessionEventLoop,
DEFAULT_RECONNECT_POLICY,
type SessionState,
type SseLoopDeps,
} from '../opencode-sse.js';
import { shouldStartServer } from '../opencode-server-process.js';
/**
* v2.7 concurrency hardening (Phase 7): the pure decision cores for SSE reconnect
* backoff + the ensureServer double-spawn guard, plus a deterministic exercise of
* the loop's breaker (injected sleep, fake client). Happy path is asserted to be
* unchanged (clean stream end → reset → base-delay reconnect).
*/
function freshState(): SessionState {
return {
boocodeSessionId: 'boo1',
agentSessionId: 'oc1',
worktreePath: '/wt',
streamedPartKeys: new Set(),
partTypeById: new Map(),
activeTurn: { onEvent: () => {}, settle: () => {} },
watchdog: null,
sseAbort: null,
swallowNextTerminal: false,
};
}
const silentLog = {
warn: () => {},
info: () => {},
error: () => {},
debug: () => {},
} as unknown as SseLoopDeps['log'];
describe('reconnectDecision (pure backoff + breaker)', () => {
it('first failure uses the base delay (matches pre-hardening flat delay)', () => {
expect(reconnectDecision(1)).toEqual({ action: 'reconnect', delayMs: DEFAULT_RECONNECT_POLICY.baseMs });
});
it('grows exponentially and caps at maxMs', () => {
const policy = { baseMs: 1000, maxMs: 30_000, maxAttempts: 10 };
expect(reconnectDecision(2, policy)).toEqual({ action: 'reconnect', delayMs: 2000 });
expect(reconnectDecision(3, policy)).toEqual({ action: 'reconnect', delayMs: 4000 });
expect(reconnectDecision(6, policy)).toEqual({ action: 'reconnect', delayMs: 30_000 }); // 32000 capped
expect(reconnectDecision(9, policy)).toEqual({ action: 'reconnect', delayMs: 30_000 });
});
it('gives up once failures exceed maxAttempts', () => {
const policy = { baseMs: 1, maxMs: 8, maxAttempts: 3 };
expect(reconnectDecision(3, policy).action).toBe('reconnect');
expect(reconnectDecision(4, policy)).toEqual({ action: 'give-up' });
});
});
describe('shouldStartServer (double-spawn guard)', () => {
it('does not start when the server is live', () => {
expect(shouldStartServer({ up: true, hasClient: true, serverStarting: true, childDead: false, startInFlight: false })).toBe(false);
});
it('starts on a fresh process (no start in flight)', () => {
expect(shouldStartServer({ up: false, hasClient: false, serverStarting: false, childDead: false, startInFlight: false })).toBe(true);
});
it('re-spawns after a crash once the prior start finished', () => {
expect(shouldStartServer({ up: false, hasClient: false, serverStarting: true, childDead: true, startInFlight: false })).toBe(true);
});
it('does NOT double-spawn while a start is already in flight (the race fix)', () => {
expect(shouldStartServer({ up: false, hasClient: false, serverStarting: true, childDead: true, startInFlight: true })).toBe(false);
});
it('does NOT double-spawn when a crash nulled serverStarting mid-start', () => {
// The narrow window: a crash during the in-flight start (await freePort) nulls
// serverStarting while startInFlight is still true. The startInFlight guard must
// win over the !serverStarting branch, else a second server spawns on a new port.
expect(shouldStartServer({ up: false, hasClient: false, serverStarting: false, childDead: true, startInFlight: true })).toBe(false);
});
it('waits (no spawn) when a cached start exists and the child is still alive', () => {
expect(shouldStartServer({ up: false, hasClient: false, serverStarting: true, childDead: false, startInFlight: false })).toBe(false);
});
});
describe('runSessionEventLoop — happy path (unchanged)', () => {
it('dispatches streamed events, reconciles on clean end, reconnects at base delay', async () => {
const state = freshState();
const abort = new AbortController();
const events = [
{ type: 'session.next.text.delta', properties: { sessionID: 'oc1', delta: 'hi' } },
{ type: 'session.idle', properties: { sessionID: 'oc1' } },
] as unknown as Event[];
const client = {
event: {
subscribe: vi.fn(async () => ({
stream: (async function* () {
for (const ev of events) yield ev;
})(),
})),
},
} as unknown as OpencodeClient;
const dispatched: Event[] = [];
const sleeps: number[] = [];
let reconciles = 0;
const deps: SseLoopDeps = {
isUp: () => true,
getClient: () => client,
dispatchEvent: (ev) => dispatched.push(ev),
reconcile: async () => {
reconciles += 1;
abort.abort(); // stop the loop after the first clean cycle
return false;
},
onReconnectGiveUp: () => {
throw new Error('should not give up on the happy path');
},
log: silentLog,
sleep: async (ms) => {
sleeps.push(ms);
},
};
await runSessionEventLoop(state, abort, deps);
expect(dispatched).toHaveLength(2);
expect(reconciles).toBe(1);
expect(sleeps).toEqual([DEFAULT_RECONNECT_POLICY.baseMs]); // base delay, not backed off
});
});
describe('runSessionEventLoop — circuit breaker', () => {
it('backs off on repeated throws then gives up + fails the turn', async () => {
const state = freshState();
const abort = new AbortController();
const policy = { baseMs: 1, maxMs: 8, maxAttempts: 3 };
const subscribe = vi.fn(async () => {
throw new Error('connection refused');
});
const client = { event: { subscribe } } as unknown as OpencodeClient;
const sleeps: number[] = [];
const gaveUp = vi.fn();
const deps: SseLoopDeps = {
isUp: () => true,
getClient: () => client,
dispatchEvent: () => {},
reconcile: async () => false,
onReconnectGiveUp: gaveUp,
log: silentLog,
sleep: async (ms) => {
sleeps.push(ms);
},
policy,
};
await runSessionEventLoop(state, abort, deps);
// 3 backoff sleeps (1, 2, 4), then the 4th failure trips the breaker.
expect(sleeps).toEqual([1, 2, 4]);
expect(subscribe).toHaveBeenCalledTimes(4);
expect(gaveUp).toHaveBeenCalledTimes(1);
expect(gaveUp).toHaveBeenCalledWith(state);
});
});

View File

@@ -0,0 +1,226 @@
import { describe, it, expect } from 'vitest';
import type { Event, Part } from '@opencode-ai/sdk/v2/client';
import {
stripDcpTags,
eventSessionId,
resolvePartDedupeKey,
mapToolStatus,
toolPartToSnapshot,
toolCalledSnapshot,
toolSuccessSnapshot,
toolFailedSnapshot,
classifyPartDelta,
classifyUpdatedPart,
errToString,
errMsg,
type DedupState,
} from '../opencode-event-map.js';
/**
* Pure opencode Event → AgentEvent translation + dedup gate (v2.7 audit reshape).
* Mirrors the original `dispatchEvent` / `handleUpdatedPart` arms verbatim — no
* I/O, so it's unit-testable. The slimmed backend keeps the routing + side effects.
*/
function freshDedup(): DedupState {
return { streamedPartKeys: new Set(), partTypeById: new Map() };
}
describe('stripDcpTags', () => {
it('removes a complete dcp tag', () => {
expect(stripDcpTags('hi <dcp-message-id>m1</dcp-message-id> there')).toBe('hi there');
});
it('leaves untagged text untouched', () => {
expect(stripDcpTags('plain text <div>')).toBe('plain text <div>');
});
});
describe('eventSessionId', () => {
it('reads properties.sessionID for a normal event', () => {
const ev = { type: 'session.idle', properties: { sessionID: 's1' } } as unknown as Event;
expect(eventSessionId(ev)).toBe('s1');
});
it('reads properties.part.sessionID for message.part.updated', () => {
const ev = {
type: 'message.part.updated',
properties: { part: { sessionID: 's2' } },
} as unknown as Event;
expect(eventSessionId(ev)).toBe('s2');
});
it('returns null when there is no session', () => {
const ev = { type: 'server.connected', properties: {} } as unknown as Event;
expect(eventSessionId(ev)).toBeNull();
});
});
describe('resolvePartDedupeKey', () => {
it('prefers the part id', () => {
expect(resolvePartDedupeKey({ id: 'p1', messageID: 'm1' }, 'text')).toBe('text:p1');
});
it('falls back to the message id', () => {
expect(resolvePartDedupeKey({ id: ' ', messageID: 'm1' }, 'reasoning')).toBe('reasoning:message:m1');
});
it('returns null when neither is present', () => {
expect(resolvePartDedupeKey({ id: '', messageID: '' }, 'text')).toBeNull();
});
});
describe('mapToolStatus', () => {
it('maps the opencode tool states to ACP statuses', () => {
expect(mapToolStatus('pending')).toBe('pending');
expect(mapToolStatus('running')).toBe('in_progress');
expect(mapToolStatus('completed')).toBe('completed');
expect(mapToolStatus('error')).toBe('failed');
expect(mapToolStatus(undefined)).toBeNull();
});
});
describe('session.next.tool.* snapshot builders', () => {
it('toolCalledSnapshot → in_progress with tool title + raw input', () => {
expect(toolCalledSnapshot({ callID: 'c1', tool: 'read_file', input: { path: 'a.ts' } })).toEqual({
toolCallId: 'c1',
title: 'read_file',
kind: null,
status: 'in_progress',
rawInput: { path: 'a.ts' },
rawOutput: undefined,
});
});
it('toolSuccessSnapshot → completed with joined text content', () => {
const snap = toolSuccessSnapshot({ callID: 'c1', content: [{ text: 'foo' }, { text: 'bar' }, { other: 1 }] });
expect(snap.status).toBe('completed');
expect(snap.title).toBe('c1');
expect(snap.rawOutput).toBe('foobar');
});
it('toolSuccessSnapshot → empty output when content is missing', () => {
expect(toolSuccessSnapshot({ callID: 'c1' }).rawOutput).toBe('');
});
it('toolFailedSnapshot → failed with stringified error', () => {
const snap = toolFailedSnapshot({ callID: 'c1', error: 'boom' });
expect(snap.status).toBe('failed');
expect(snap.title).toBe('c1');
expect(snap.rawOutput).toBe('boom');
});
});
describe('toolPartToSnapshot', () => {
it('extracts input/output/title/status from the tool state', () => {
const part = {
type: 'tool',
callID: 'c1',
tool: 'grep',
state: { status: 'completed', input: { q: 'x' }, output: 'result', title: 'Grep run' },
} as unknown as Parameters<typeof toolPartToSnapshot>[0];
expect(toolPartToSnapshot(part)).toEqual({
toolCallId: 'c1',
title: 'Grep run',
kind: null,
status: 'completed',
rawInput: { q: 'x' },
rawOutput: 'result',
});
});
it('falls back to the tool name and uses error as output', () => {
const part = {
type: 'tool',
callID: 'c2',
tool: 'edit',
state: { status: 'error', error: 'nope' },
} as unknown as Parameters<typeof toolPartToSnapshot>[0];
const snap = toolPartToSnapshot(part);
expect(snap.title).toBe('edit');
expect(snap.status).toBe('failed');
expect(snap.rawOutput).toBe('nope');
});
});
describe('classifyPartDelta (message.part.delta dedup recording)', () => {
it('records a reasoning key and emits a reasoning event', () => {
const st = freshDedup();
const e = classifyPartDelta({ partID: 'p1', field: 'reasoning', delta: 'thinking' }, st);
expect(e).toEqual({ type: 'reasoning', text: 'thinking' });
expect(st.streamedPartKeys.has('reasoning:p1')).toBe(true);
});
it('records a text key, strips dcp, and emits text', () => {
const st = freshDedup();
const e = classifyPartDelta({ partID: 'p2', field: 'text', delta: 'hi <dcp-message-id>m</dcp-message-id>' }, st);
expect(e).toEqual({ type: 'text', text: 'hi ' });
expect(st.streamedPartKeys.has('text:p2')).toBe(true);
});
it('still records the text key even when the cleaned delta is empty', () => {
const st = freshDedup();
const e = classifyPartDelta({ partID: 'p3', field: 'text', delta: '<dcp-message-id>m</dcp-message-id>' }, st);
expect(e).toBeNull();
expect(st.streamedPartKeys.has('text:p3')).toBe(true);
});
it('uses the recorded part type when the field is absent', () => {
const st = freshDedup();
st.partTypeById.set('p4', 'reasoning');
const e = classifyPartDelta({ partID: 'p4', delta: 'more' }, st);
expect(e).toEqual({ type: 'reasoning', text: 'more' });
});
it('returns null for an unknown field', () => {
expect(classifyPartDelta({ partID: 'p5', field: 'other', delta: 'x' }, freshDedup())).toBeNull();
});
});
describe('classifyUpdatedPart (message.part.updated dedup gate)', () => {
function textPart(over: Partial<Part> = {}): Part {
return {
type: 'text',
id: 'p1',
messageID: 'm1',
sessionID: 's1',
text: 'final text',
time: { start: 1, end: 2 },
...over,
} as unknown as Part;
}
it('drops a terminal part already streamed via deltas', () => {
const st = freshDedup();
st.streamedPartKeys.add('text:p1');
expect(classifyUpdatedPart(textPart(), st)).toBeNull();
// the key is consumed
expect(st.streamedPartKeys.has('text:p1')).toBe(false);
});
it('emits a finished (ended) text part not seen via deltas', () => {
const st = freshDedup();
expect(classifyUpdatedPart(textPart(), st)).toEqual({ type: 'text', text: 'final text' });
expect(st.partTypeById.get('p1')).toBe('text');
});
it('does not emit a part that has not ended yet', () => {
const st = freshDedup();
expect(classifyUpdatedPart(textPart({ time: { start: 1 } as never }), st)).toBeNull();
});
it('strips dcp tags from the finished text', () => {
const st = freshDedup();
const part = textPart({ text: 'a <dcp-message-id>m</dcp-message-id>b' });
expect(classifyUpdatedPart(part, st)).toEqual({ type: 'text', text: 'a b' });
});
it('maps a running tool part to tool_call', () => {
const st = freshDedup();
const part = { type: 'tool', callID: 'c1', tool: 'grep', state: { status: 'running' } } as unknown as Part;
const e = classifyUpdatedPart(part, st);
expect(e?.type).toBe('tool_call');
});
it('maps a completed tool part to tool_update', () => {
const st = freshDedup();
const part = { type: 'tool', callID: 'c1', tool: 'grep', state: { status: 'completed', output: 'x' } } as unknown as Part;
const e = classifyUpdatedPart(part, st);
expect(e?.type).toBe('tool_update');
});
});
describe('error formatters', () => {
it('errMsg unwraps Error.message', () => {
expect(errMsg(new Error('x'))).toBe('x');
expect(errMsg('plain')).toBe('plain');
});
it('errToString handles null/string/Error/object', () => {
expect(errToString(null)).toBe('unknown error');
expect(errToString('s')).toBe('s');
expect(errToString(new Error('e'))).toBe('e');
expect(errToString({ a: 1 })).toBe('{"a":1}');
});
});

View File

@@ -0,0 +1,203 @@
/**
* Pure opencode `Event` → normalized `AgentEvent` translation.
*
* Extracted (v2.7 audit reshape) from `OpenCodeServerBackend.dispatchEvent` /
* `handleUpdatedPart` and the file-local helpers. NO I/O, no timers, no DB, no
* `byOpencodeId` — every function here is a deterministic transform over its
* arguments (the dedup state is caller-owned and mutated in place, mirroring the
* `acp-event-map.ts` `priorSnapshots` pattern). This is the unit-testable core; the
* backend keeps the routing + side effects (watchdog, usage persistence, settle).
*
* Depends only on SDK TYPES + AcpToolSnapshot — safe to import anywhere.
*/
import type { Event, Part, ToolPart, ToolState } from '@opencode-ai/sdk/v2/client';
import type { ToolCallStatus } from '@agentclientprotocol/sdk';
import type { AcpToolSnapshot } from '../acp-tool-snapshot.js';
import type { AgentEvent } from '../agent-backend.js';
/** Per-(opencode session) dedup state the part-stream classifiers read + mutate. */
export interface DedupState {
/** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. */
streamedPartKeys: Set<string>;
/** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. */
partTypeById: Map<string, string>;
}
/** Strip opencode-dcp plugin tags that render as literal text in the UI. */
export function stripDcpTags(s: string): string {
return s.replace(/<dcp-message-id>[^<]*<\/dcp-message-id>/g, '');
}
/** Extract the opencode sessionID an event belongs to, across event shapes.
* Most carry `properties.sessionID`; `message.part.updated` nests it under
* `properties.part.sessionID`. Returns null when the event has no session
* (the per-session loop then leaves it to dispatchEvent, which drops it). */
export function eventSessionId(ev: Event): string | null {
const props = (ev as { properties?: unknown }).properties;
if (!props || typeof props !== 'object') return null;
if (ev.type === 'message.part.updated') {
const part = (props as { part?: { sessionID?: string } }).part;
return part?.sessionID ?? null;
}
return (props as { sessionID?: string }).sessionID ?? null;
}
/** Ported verbatim from Paseo opencode-agent.ts: id → message-id fallback → null. */
export function resolvePartDedupeKey(part: { id: string; messageID: string }, type: string): string | null {
if (part.id.trim().length > 0) return `${type}:${part.id}`;
if (part.messageID.trim().length > 0) return `${type}:message:${part.messageID}`;
return null;
}
export function mapToolStatus(s: ToolState['status'] | undefined): ToolCallStatus | null {
switch (s) {
case 'pending':
return 'pending';
case 'running':
return 'in_progress';
case 'completed':
return 'completed';
case 'error':
return 'failed';
default:
return null;
}
}
/** opencode ToolPart → ACP-shaped snapshot (reuses the existing persist/render path). */
export function toolPartToSnapshot(part: ToolPart): AcpToolSnapshot {
const state = part.state;
let rawInput: unknown;
let rawOutput: unknown;
let title: string | undefined;
if (state) {
if ('input' in state) rawInput = (state as { input?: unknown }).input;
if ('output' in state) rawOutput = (state as { output?: unknown }).output;
else if ('error' in state) rawOutput = (state as { error?: unknown }).error;
if ('title' in state) title = (state as { title?: string }).title;
}
return {
toolCallId: part.callID,
title: title ?? part.tool,
kind: null,
status: mapToolStatus(state?.status),
rawInput,
rawOutput,
};
}
// ─── session.next.tool.* snapshot builders ───────────────────────────────────
/** `session.next.tool.called` → an in-progress tool_call snapshot. */
export function toolCalledSnapshot(p: { callID: string; tool: string; input: unknown }): AcpToolSnapshot {
return {
toolCallId: p.callID,
title: p.tool,
kind: null,
status: 'in_progress',
rawInput: p.input,
rawOutput: undefined,
};
}
/** `session.next.tool.success` → a completed tool snapshot (text content joined). */
export function toolSuccessSnapshot(p: { callID: string; content?: ReadonlyArray<unknown> | null }): AcpToolSnapshot {
const output = p.content?.map((c) => (c && typeof c === 'object' && 'text' in c ? (c as { text: string }).text : '')).join('') ?? '';
return {
toolCallId: p.callID,
title: p.callID,
kind: null,
status: 'completed',
rawInput: undefined,
rawOutput: output,
};
}
/** `session.next.tool.failed` → a failed tool snapshot (error stringified). */
export function toolFailedSnapshot(p: { callID: string; error: unknown }): AcpToolSnapshot {
return {
toolCallId: p.callID,
title: p.callID,
kind: null,
status: 'failed',
rawInput: undefined,
rawOutput: errToString(p.error),
};
}
// ─── message.part.* dedup gate ────────────────────────────────────────────────
/**
* `message.part.delta`: mark the part as streamed (so a later `message.part.updated`
* for the same part is deduped) and return the AgentEvent to emit, or null when the
* field is neither reasoning nor text, or a text delta strips down to empty. Mutates
* `st.streamedPartKeys` exactly as the original inline arm did (the key is recorded
* for text even when the cleaned delta is empty).
*/
export function classifyPartDelta(
p: { partID: string; field?: string; delta: string },
st: DedupState,
): AgentEvent | null {
const isReasoning = p.field === 'reasoning' || st.partTypeById.get(p.partID) === 'reasoning';
if (isReasoning) {
st.streamedPartKeys.add(`reasoning:${p.partID}`);
return { type: 'reasoning', text: p.delta };
}
if (p.field === 'text') {
st.streamedPartKeys.add(`text:${p.partID}`);
const cleaned = stripDcpTags(p.delta);
return cleaned ? { type: 'text', text: cleaned } : null;
}
return null;
}
/**
* `message.part.updated` terminal part: the dedup gate for text/reasoning (drop a
* part already streamed via deltas; otherwise emit the finished text) plus the
* tool-part → tool_call/tool_update mapping. Returns null when nothing should be
* emitted. Mutates `st.partTypeById` / `st.streamedPartKeys` like the original.
*/
export function classifyUpdatedPart(part: Part, st: DedupState): AgentEvent | null {
if (part.type === 'text' || part.type === 'reasoning') {
st.partTypeById.set(part.id, part.type);
const key = resolvePartDedupeKey(part, part.type);
if (key && st.streamedPartKeys.delete(key)) return null; // already streamed via delta
const raw = part.text ?? '';
const text = part.type === 'text' ? stripDcpTags(raw) : raw;
if (text && part.time?.end != null) {
return { type: part.type, text };
}
return null;
}
if (part.type === 'tool') {
const snap = toolPartToSnapshot(part);
const status = part.state?.status;
// tool_call on start (pending/running), tool_update on terminal (completed/error).
// The current ACP path merges both into one frame; the contract keeps them
// distinct because opencode's SSE distinguishes start from result.
return status === 'completed' || status === 'error'
? { type: 'tool_update', toolCall: snap }
: { type: 'tool_call', toolCall: snap };
}
// NOTE: opencode's SSE payload union carries no available-commands event, so the
// AgentEvent 'commands' arm is intentionally never emitted here.
return null;
}
// ─── shared error formatters (pure) ───────────────────────────────────────────
export function errMsg(e: unknown): string {
return e instanceof Error ? e.message : String(e);
}
export function errToString(e: unknown): string {
if (e == null) return 'unknown error';
if (typeof e === 'string') return e;
if (e instanceof Error) return e.message;
try {
return JSON.stringify(e);
} catch {
return String(e);
}
}

View File

@@ -0,0 +1,325 @@
/**
* OpenCodeServerSupervisor — the opencode `serve` child + HTTP client + port +
* health-counter lifecycle, extracted (v2.7 audit reshape) from the backend
* god-class. Owns spawn / ready / crash / proactive-health restart / dispose and
* exposes `client` / `port` / `health()` / `tickHealth()` to the backend.
*
* Session-level recovery (failing in-flight turns, marking agent_sessions crashed,
* tearing down SSE loops) is NOT a process concern — it's delegated back to the
* backend through the injected `hooks.onServerDown` callback, keeping this module
* free of the demux map / SQL / turn state.
*
* v2.7 concurrency hardening: `ensureServer` is guarded against the crash-window
* double-spawn (two concurrent callers each re-spawning on different ports) via a
* synchronous `startInFlight` flag — see `shouldStartServer`.
*/
import { spawn, type ChildProcess } from 'node:child_process';
import { createOpencodeClient, type OpencodeClient } from '@opencode-ai/sdk/v2/client';
import type { FastifyBaseLogger } from 'fastify';
import { decideRestart, DEFAULT_HEALTH_FAILURE_THRESHOLD } from './lifecycle-decisions.js';
import { reclaimPort, waitForPortRelease, freePort } from '../net/port-utils.js';
const READY_TIMEOUT_MS = 30_000;
/** Info handed to the backend when the server goes down (crash or forced restart). */
export interface ServerDownInfo {
code: number | null;
signal: NodeJS.Signals | null;
port: number;
}
export interface SupervisorHooks {
/** True iff ANY pooled session has an in-flight turn (defers a busy restart). */
isBusy: () => boolean;
/** Session-level recovery: fail in-flight turns, mark crashed, drop demux state. */
onServerDown: (info: ServerDownInfo) => void;
}
export interface OpenCodeServerSupervisorDeps {
/** Absolute path to the opencode binary (resolved from available_agents). */
opencodeBinary: string;
log: FastifyBaseLogger;
hooks: SupervisorHooks;
}
/**
* Pure decision for `ensureServer`: should we (re)spawn the server right now?
*
* - A live, ready server (`up && client`) → no.
* - A start already in flight (`startInFlight`) → no, NEVER double-spawn — join the
* running start instead. This is checked BEFORE `serverStarting` because the crash
* handler can null `serverStarting` mid-start (a crash during `await freePort()`),
* and without this guard the `!serverStarting` branch would spawn a second server
* on a different port while the first is still coming up.
* - No start cached/running → yes (fresh start or post-crash re-spawn, since the
* crash handler nulls `serverStarting`).
* - A cached start that already finished, but the child has since died and the crash
* handler hasn't reset us yet → yes.
*/
export function shouldStartServer(s: {
up: boolean;
hasClient: boolean;
serverStarting: boolean;
childDead: boolean;
startInFlight: boolean;
}): boolean {
if (s.up && s.hasClient) return false;
if (s.startInFlight) return false;
if (!s.serverStarting) return true;
if (!s.up && s.childDead) return true;
return false;
}
export class OpenCodeServerSupervisor {
private readonly opencodeBinary: string;
private readonly log: FastifyBaseLogger;
private readonly hooks: SupervisorHooks;
private childProc: ChildProcess | null = null;
private opencodeClient: OpencodeClient | null = null;
private serverPort: number | null = null;
private up = false;
private serverStarting: Promise<void> | null = null;
/** True from the synchronous head of startServer() until it settles — the
* double-spawn guard reads it so a concurrent ensureServer joins instead of
* kicking a second spawn. */
private startInFlight = false;
// Phase 3 busy-aware health monitor (openchamber lift): consecutive failed
// probes + the start of an unhealthy-while-busy window feed `decideRestart`.
private consecutiveHealthFailures = 0;
private unhealthyBusySince = 0;
private restarting: Promise<void> | null = null;
constructor(deps: OpenCodeServerSupervisorDeps) {
this.opencodeBinary = deps.opencodeBinary;
this.log = deps.log;
this.hooks = deps.hooks;
}
/** The live opencode HTTP client, or null between (re)starts. */
get client(): OpencodeClient | null {
return this.opencodeClient;
}
/** The current server port, or null before the first start. */
get port(): number | null {
return this.serverPort;
}
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
health(): 'up' | 'down' {
return this.up ? 'up' : 'down';
}
isUp(): boolean {
return this.up;
}
// ─── lifecycle (spawn once + client + ready; crash-restart) ──────────────────
/**
* Lazy: start the single server on first use; re-spawn after a crash. Idempotent
* within one live server — `serverStarting` caches the in-flight start, reset to
* null by the crash handler so the NEXT ensureServer re-spawns. A dead-but-not-
* yet-reaped child (exit handler raced) is also treated as needing a restart.
* Concurrent callers in a crash window are coalesced via `startInFlight`.
*/
ensureServer(): Promise<void> {
if (this.up && this.opencodeClient) return Promise.resolve();
const childDead =
this.childProc != null && (this.childProc.exitCode !== null || this.childProc.signalCode !== null);
if (
shouldStartServer({
up: this.up,
hasClient: this.opencodeClient != null,
serverStarting: this.serverStarting != null,
childDead,
startInFlight: this.startInFlight,
})
) {
this.serverStarting = this.startServer();
}
return this.serverStarting ?? Promise.resolve();
}
private async startServer(): Promise<void> {
// Set synchronously (before the first await) so a concurrent ensureServer sees
// the in-flight start and joins `serverStarting` instead of double-spawning.
this.startInFlight = true;
try {
const port = await freePort();
// Phase 1: run unsecured on loopback (opencode's documented default — serve.ts
// only WARNS when OPENCODE_SERVER_PASSWORD is unset). The real boundary is the
// 127.0.0.1 bind.
const child = spawn(this.opencodeBinary, ['serve', '--hostname', '127.0.0.1', '--port', String(port)], {
stdio: ['ignore', 'pipe', 'pipe'],
env: { ...process.env },
});
this.childProc = child;
this.serverPort = port;
// Child lifetime is the backend's (the pool's), NOT a request's. On unexpected
// exit we recover: settle in-flight turns, mark sessions crashed (the backend's
// onServerDown), reclaim the port, and reset state so the next ensureServer
// re-spawns.
child.on('exit', (code, signal) => {
// Only react to THIS child's exit (a restart may have swapped in a new one).
if (this.childProc !== child) return;
this.handleCrash(code, signal, port);
});
await waitForReady(child, READY_TIMEOUT_MS);
this.opencodeClient = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` });
this.up = true;
this.log.info({ port }, 'opencode-server: ready');
} finally {
this.startInFlight = false;
}
}
/**
* Server down (crash-exit or forced restart): reset process/port state, delegate
* session-level recovery to the backend, and reclaim the port. Mirrors the
* original `handleServerCrash` ordering (up=false → session cleanup → client/
* serverStarting null → reclaimPort).
*/
private handleCrash(code: number | null, signal: NodeJS.Signals | null, port: number): void {
this.up = false;
this.hooks.onServerDown({ code, signal, port });
this.opencodeClient = null;
this.serverStarting = null; // force a re-spawn on the next ensureServer
// Reclaim the port so a re-spawn on a fixed/leaked port isn't blocked. Best
// effort; the next start uses a fresh ephemeral port anyway.
reclaimPort(port);
}
/**
* Phase 3 proactive health monitor (openchamber `runHealthCheckCycle` lift,
* busy-aware). Probes /global/health; on a sustained failure of a NON-busy server,
* force a restart so the next turn isn't blocked by a wedged process. Busy servers
* are deferred via the stale-grace in `decideRestart`. No-op when never started or
* a restart is already in flight.
*/
async tickHealth(now: number = Date.now()): Promise<void> {
if (!this.childProc || this.restarting) return;
const childExited = this.childProc.exitCode !== null || this.childProc.signalCode !== null;
// An exited child is recovered lazily by ensureServer; don't double-restart it.
if (childExited) return;
const healthy = await this.probeHealth();
if (healthy) {
this.consecutiveHealthFailures = 0;
this.unhealthyBusySince = 0;
return;
}
this.consecutiveHealthFailures += 1;
const busy = this.hooks.isBusy();
const decision = decideRestart({
processExited: false,
consecutiveFailures: this.consecutiveHealthFailures,
busy,
unhealthyBusySince: this.unhealthyBusySince,
now,
failureThreshold: DEFAULT_HEALTH_FAILURE_THRESHOLD,
});
// Stamp the start of an unhealthy-while-busy window so the stale-grace can fire.
if (busy && this.unhealthyBusySince === 0) this.unhealthyBusySince = now;
if (decision.action === 'restart') {
this.log.warn(
{ failures: this.consecutiveHealthFailures, busy, reason: decision.reason },
'opencode-server: health monitor forcing restart',
);
this.consecutiveHealthFailures = 0;
this.unhealthyBusySince = 0;
await this.restartServer();
}
}
private async probeHealth(): Promise<boolean> {
if (!this.opencodeClient) return false;
try {
const res = await this.opencodeClient.global.health();
return !res.error;
} catch {
return false;
}
}
/** Force-kill the current server + reclaim its port; the next ensureServer
* re-spawns (lazy). Mirrors handleCrash's state reset but is initiated by the
* health monitor rather than the OS. */
private async restartServer(): Promise<void> {
if (this.restarting) return this.restarting;
this.restarting = (async () => {
const child = this.childProc;
const port = this.serverPort;
this.up = false;
// Fail in-flight turns + mark sessions crashed via the same path as a crash.
if (child) {
this.handleCrash(null, null, port ?? 0);
if (!child.killed) child.kill('SIGTERM');
}
if (port) {
reclaimPort(port);
await waitForPortRelease(port, 3_000);
}
this.childProc = null;
})().finally(() => {
this.restarting = null;
});
return this.restarting;
}
/** Full teardown of the child + client + port state. */
async dispose(): Promise<void> {
this.up = false;
const child = this.childProc;
this.childProc = null;
this.opencodeClient = null;
if (child && !child.killed) {
child.kill('SIGTERM');
const t = setTimeout(() => {
if (!child.killed) child.kill('SIGKILL');
}, 5_000);
t.unref();
}
}
}
/** Resolve when the child prints the ready line; reject on timeout or early exit. */
function waitForReady(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve, reject) => {
let done = false;
let stderrBuf = '';
const finish = (err?: Error) => {
if (done) return;
done = true;
clearTimeout(timer);
child.stdout?.off('data', onOut);
child.stderr?.off('data', onErr);
child.off('exit', onExit);
if (err) reject(err);
else resolve();
};
const onOut = (buf: Buffer) => {
if (buf.toString().includes('opencode server listening on')) finish();
};
const onErr = (buf: Buffer) => {
stderrBuf += buf.toString();
};
const onExit = (code: number | null) =>
finish(new Error(`opencode serve exited before ready (code ${code}); stderr: ${stderrBuf.slice(-2000)}`));
const timer = setTimeout(
() => finish(new Error(`opencode serve not ready in ${timeoutMs}ms; stderr: ${stderrBuf.slice(-2000)}`)),
timeoutMs,
);
child.stdout?.on('data', onOut);
child.stderr?.on('data', onErr);
child.on('exit', onExit);
});
}

View File

@@ -1,91 +1,64 @@
/**
* v2.6 Phase 1 — OpenCodeServerBackend.
* v2.6 Phase 1 — OpenCodeServerBackend (slimmed, v2.7 audit reshape).
*
* Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP
* server per BooCoder process; one opencode session per BooCode session (resumed
* on switch-back); one SSE read loop PER session, each scoped to that session's
* worktree directory so sessions in different directories stream concurrently
* (P1.5-a — replaced the Phase-1 single-stream-last-directory model).
* worktree directory so sessions in different directories stream concurrently.
*
* This file is now just the `AgentBackend` SURFACE — ensureSession / prompt /
* accumulateUsage / closeSession + the per-session demux side effects (watchdog,
* reconcile, usage). It composes three extracted collaborators:
* - `OpenCodeServerSupervisor` (opencode-server-process.ts) — child/client/port/
* health lifecycle, spawn/crash/restart/dispose.
* - the per-session SSE loop (opencode-sse.ts) — subscribe + reconnect/backoff.
* - the pure event map (opencode-event-map.ts) — Event → AgentEvent translation,
* dedup gate, dcp-strip, tool-snapshot.
*
* Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic
* `AgentEvent`s the dispatcher (Phase 1.7, NOT wired in this batch) maps them
* to WS frames. No dispatcher/route references this file yet.
* `AgentEvent`s; the dispatcher maps them to WS frames.
*
* Spec: openspec/changes/v2-6-persistent-agent-sessions/design.md §2 / §2a.
* SDK shapes verified by direct read of @opencode-ai/sdk@1.15.12 dist .d.ts:
* - client methods take FLATTENED params (sessionID/directory/body all inline),
* not {path,query,body}. create→{directory}, promptAsync→{sessionID,directory,
* parts,model}, abort→{sessionID,directory}. model is {providerID,modelID}.
* - client.event() resolves to { stream: AsyncGenerator<GlobalEvent> }; the
* real event is chunk.payload (discriminate on chunk.payload.type).
* - promptAsync is fire-and-forget (204); the turn completes via a
* 'session.idle' event for that opencode session id.
*/
import { spawn, spawnSync, type ChildProcess } from 'node:child_process';
import { createHash } from 'node:crypto';
import { createServer, connect as netConnect } from 'node:net';
import type { FastifyBaseLogger } from 'fastify';
import {
createOpencodeClient,
type OpencodeClient,
type Event,
type Part,
type ToolPart,
type ToolState,
type AssistantMessage,
} from '@opencode-ai/sdk/v2/client';
import type { ToolCallStatus } from '@agentclientprotocol/sdk';
import type { Event, AssistantMessage } from '@opencode-ai/sdk/v2/client';
import type { Sql } from '../../db.js';
import type { AcpToolSnapshot } from '../acp-tool-snapshot.js';
import { armAbortGuard, noteTurnActivity, consumeTerminal } from './turn-guard.js';
import { stepEndedToUsage, type StepUsage } from './opencode-usage.js';
import { decideRestart, DEFAULT_HEALTH_FAILURE_THRESHOLD } from './lifecycle-decisions.js';
import { OpenCodeServerSupervisor, type ServerDownInfo } from './opencode-server-process.js';
import {
startSessionEventLoop,
type SessionState,
type TurnState,
type SseLoopDeps,
} from './opencode-sse.js';
import {
classifyPartDelta,
classifyUpdatedPart,
toolCalledSnapshot,
toolSuccessSnapshot,
toolFailedSnapshot,
stripDcpTags,
errMsg,
errToString,
} from './opencode-event-map.js';
import type {
AgentBackend,
AgentEvent,
AgentSessionHandle,
EnsureSessionOpts,
PromptCtx,
TurnResult,
} from '../agent-backend.js';
const READY_TIMEOUT_MS = 30_000;
const SSE_RECONNECT_DELAY_MS = 1_000;
/**
* No-activity backstop for an in-flight turn. opencode streams reasoning/text/tool
* deltas continuously while working, so "zero events for this long" means the turn
* is wedged or its terminal event (session.idle) was lost (see the reconnect race
* below). Generous so a legitimately slow turn never trips it.
* is wedged or its terminal event (session.idle) was lost. Generous so a
* legitimately slow turn never trips it.
*/
const TURN_INACTIVITY_MS = 180_000;
/** One in-flight turn's emitter + completion settler. */
interface TurnState {
onEvent: (e: AgentEvent) => void;
settle: (r: TurnResult) => void;
}
/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */
interface SessionState {
boocodeSessionId: string;
agentSessionId: string;
/** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */
worktreePath: string;
/** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */
streamedPartKeys: Set<string>;
/** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */
partTypeById: Map<string, string>;
activeTurn: TurnState | null;
/** Inactivity backstop timer for the active turn; null when no turn in flight. */
watchdog: ReturnType<typeof setTimeout> | null;
/** Per-session SSE subscription handle. Non-null while the loop is running;
* aborting it tears down the underlying fetch and exits the loop. */
sseAbort: AbortController | null;
/** F.1 post-abort orphan-terminal guard: swallow the one session.idle/error
* opencode emits for an aborted turn so it can't settle the next turn. */
swallowNextTerminal: boolean;
}
export interface OpenCodeServerBackendDeps {
sql: Sql;
log: FastifyBaseLogger;
@@ -98,36 +71,32 @@ export class OpenCodeServerBackend implements AgentBackend {
private readonly sql: Sql;
private readonly log: FastifyBaseLogger;
private readonly opencodeBinary: string;
private child: ChildProcess | null = null;
private client: OpencodeClient | null = null;
private port: number | null = null;
private up = false;
private serverStarting: Promise<void> | null = null;
// Phase 3 busy-aware health monitor (openchamber lift): consecutive failed
// probes + the start of an unhealthy-while-busy window feed `decideRestart`.
private consecutiveHealthFailures = 0;
private unhealthyBusySince = 0;
private restarting: Promise<void> | null = null;
private readonly supervisor: OpenCodeServerSupervisor;
/** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */
private readonly byOpencodeId = new Map<string, SessionState>();
/** Coalesces concurrent ensureSession calls for the same (chat, agent) key. */
private readonly ensuring = new Map<string, Promise<AgentSessionHandle>>();
constructor(deps: OpenCodeServerBackendDeps) {
this.sql = deps.sql;
this.log = deps.log;
this.opencodeBinary = deps.opencodeBinary;
this.supervisor = new OpenCodeServerSupervisor({
opencodeBinary: deps.opencodeBinary,
log: deps.log,
hooks: {
isBusy: () => this.isBusy(),
onServerDown: (info) => this.onServerDown(info),
},
});
}
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
health(): 'up' | 'down' {
return this.up ? 'up' : 'down';
return this.supervisor.health();
}
/** Phase 3: busy iff ANY pooled opencode session has an in-flight turn. The
* pool reads this to skip idle/LRU eviction and the health-monitor to defer a
* restart (never tear down a session mid-stream). */
/** Phase 3: busy iff ANY pooled opencode session has an in-flight turn. */
isBusy(): boolean {
for (const st of this.byOpencodeId.values()) {
if (st.activeTurn) return true;
@@ -135,72 +104,23 @@ export class OpenCodeServerBackend implements AgentBackend {
return false;
}
// ─── Server lifecycle (1.2: spawn once + client + ready; Phase 3 crash-restart) ──
/**
* Lazy: start the single server on first use; re-spawn after a crash. Idempotent
* within one live server — `serverStarting` caches the in-flight start, and is
* reset to null by the crash handler so the NEXT ensureServer re-spawns a fresh
* server (Phase 3 crash recovery). A dead-but-not-yet-reaped child (exit handler
* raced) is also treated as needing a restart.
*/
private ensureServer(): Promise<void> {
const childDead = this.child != null && (this.child.exitCode !== null || this.child.signalCode !== null);
if (!this.serverStarting || (!this.up && childDead)) {
this.serverStarting = this.startServer();
}
return this.serverStarting;
}
private async startServer(): Promise<void> {
const port = await freePort();
// Phase 1: run unsecured on loopback (opencode's documented default — serve.ts
// only WARNS when OPENCODE_SERVER_PASSWORD is unset). The real boundary is the
// 127.0.0.1 bind. Defense-in-depth basic-auth is deferred: the hey-api client's
// auth wiring + opencode's exact scheme must be confirmed against a live server
// first, else every request 401s. Recon explicitly said "do NOT block on it".
const child = spawn(this.opencodeBinary, ['serve', '--hostname', '127.0.0.1', '--port', String(port)], {
stdio: ['ignore', 'pipe', 'pipe'],
env: { ...process.env },
});
this.child = child;
this.port = port;
// Child lifetime is the backend's (the pool's), NOT a request's. We never tie
// it to a per-turn abort signal. Phase 3: on unexpected exit we recover —
// settle any in-flight turns as failed, mark their agent_sessions rows crashed,
// and reset `serverStarting` so the next ensureServer re-spawns. opencode keeps
// sessions on disk, but a fresh server's in-memory state is gone, so the next
// turn's ensureSession (rows now 'crashed') creates fresh opencode sessions.
child.on('exit', (code, signal) => {
// Only react to THIS child's exit (a restart may have swapped in a new one).
if (this.child !== child) return;
this.handleServerCrash(code, signal, port);
});
await waitForReady(child, READY_TIMEOUT_MS);
this.client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` });
this.up = true;
this.log.info({ port }, 'opencode-server: ready');
/** Phase 3 proactive health probe + busy-aware self-restart, run by the pool's
* periodic sweep. Delegates to the supervisor. */
async tickHealth(now: number = Date.now()): Promise<void> {
await this.supervisor.tickHealth(now);
}
/**
* Crash handler (Phase 3, lift of openchamber's restart-on-exit path). The
* server died with N live opencode sessions; we can't restart it here (the next
* turn does, lazily — avoids a restart storm if the binary is broken). We:
* 1. fail every in-flight turn so its dispatcher unblocks + publishes an error,
* 2. mark each session's agent_sessions row 'crashed' so ensureSession won't
* resume a now-dead native session id (it creates fresh),
* 3. tear down the SSE loops + demux state (stale against the dead server),
* 4. reclaim the port + reset state so the next ensureServer re-spawns.
* Server down (crash-exit or forced restart): fail every in-flight turn so its
* dispatcher unblocks, mark each session crashed so ensureSession won't resume a
* now-dead native id, and tear down the SSE loops + demux state. Invoked by the
* supervisor (it owns the process/port reset). Mirrors the original
* handleServerCrash session-half byte-for-byte.
*/
private handleServerCrash(code: number | null, signal: NodeJS.Signals | null, port: number): void {
this.up = false;
private onServerDown(info: ServerDownInfo): void {
const states = [...this.byOpencodeId.values()];
this.log.warn(
{ code, signal, port, liveSessions: states.length },
{ code: info.code, signal: info.signal, port: info.port, liveSessions: states.length },
'opencode-server: child exited — recovering (fail in-flight, mark crashed, re-spawn next turn)',
);
@@ -219,8 +139,6 @@ export class OpenCodeServerBackend implements AgentBackend {
}
// Drop the demux map: every session id is stale against a fresh server.
this.byOpencodeId.clear();
this.client = null;
this.serverStarting = null; // force a re-spawn on the next ensureServer
if (crashedIds.length > 0) {
this.sql`
@@ -230,146 +148,20 @@ export class OpenCodeServerBackend implements AgentBackend {
this.log.warn({ err: errMsg(err) }, 'opencode-server: failed to mark crashed sessions (non-fatal)');
});
}
// Reclaim the port so a re-spawn on a fixed/leaked port isn't blocked. Best
// effort; the next start uses a fresh ephemeral port anyway.
reclaimPort(port);
}
/**
* Phase 3 proactive health monitor (openchamber `runHealthCheckCycle` lift,
* busy-aware). Probes the server's /global/health; on a sustained failure of a
* NON-busy server, force a restart so the next turn isn't blocked by a wedged
* (hung-but-not-exited) process. Busy servers are deferred via the stale-grace in
* `decideRestart` — never tear down live work. Driven by the pool's periodic
* sweep (best-effort; a crash-exit is already handled by `handleServerCrash` +
* lazy `ensureServer` re-spawn, so this only catches the hung case). No-op when
* the server was never started or a restart is already in flight.
*/
async tickHealth(now: number = Date.now()): Promise<void> {
if (!this.child || this.restarting) return;
const childExited = this.child.exitCode !== null || this.child.signalCode !== null;
// An exited child is recovered lazily by ensureServer; don't double-restart it.
if (childExited) return;
// ─── SSE loop wiring ─────────────────────────────────────────────────────────
const healthy = await this.probeHealth();
if (healthy) {
this.consecutiveHealthFailures = 0;
this.unhealthyBusySince = 0;
return;
}
this.consecutiveHealthFailures += 1;
const busy = this.isBusy();
const decision = decideRestart({
processExited: false,
consecutiveFailures: this.consecutiveHealthFailures,
busy,
unhealthyBusySince: this.unhealthyBusySince,
now,
failureThreshold: DEFAULT_HEALTH_FAILURE_THRESHOLD,
});
// Stamp the start of an unhealthy-while-busy window so the stale-grace can fire.
if (busy && this.unhealthyBusySince === 0) this.unhealthyBusySince = now;
if (decision.action === 'restart') {
this.log.warn(
{ failures: this.consecutiveHealthFailures, busy, reason: decision.reason },
'opencode-server: health monitor forcing restart',
);
this.consecutiveHealthFailures = 0;
this.unhealthyBusySince = 0;
await this.restartServer();
}
}
private async probeHealth(): Promise<boolean> {
if (!this.client) return false;
try {
const res = await this.client.global.health();
return !res.error;
} catch {
return false;
}
}
/** Force-kill the current server + reclaim its port; the next ensureServer
* re-spawns (lazy). Mirrors handleServerCrash's state reset but is initiated by
* the health monitor rather than the OS. */
private async restartServer(): Promise<void> {
if (this.restarting) return this.restarting;
this.restarting = (async () => {
const child = this.child;
const port = this.port;
this.up = false;
// Fail in-flight turns + mark sessions crashed via the same path as a crash.
if (child) {
this.handleServerCrash(null, null, port ?? 0);
if (!child.killed) child.kill('SIGTERM');
}
if (port) {
reclaimPort(port);
await waitForPortRelease(port, 3_000);
}
this.child = null;
})().finally(() => {
this.restarting = null;
});
return this.restarting;
}
// ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ───────────────────
/** Per-session SSE subscription, scoped to the session's worktree directory.
* opencode scopes events by the `directory` query param (defaults to the
* server's cwd if omitted), so two sessions in different worktrees each get
* their own dir-scoped stream and never drop each other's events. Idempotent:
* a no-op if this session's loop is already running. Started from ensureSession
* (and defensively from prompt) once worktreePath is known. */
private startSessionEventLoop(state: SessionState): void {
if (state.sseAbort) return; // already running
const abort = new AbortController();
state.sseAbort = abort;
void this.runSessionEventLoop(state, abort).finally(() => {
// Only clear if this controller is still the live one (a later restart may
// have already installed a new one).
if (state.sseAbort === abort) state.sseAbort = null;
});
}
private async runSessionEventLoop(state: SessionState, abort: AbortController): Promise<void> {
const signal = abort.signal;
while (this.up && this.client && !signal.aborted) {
try {
// Re-read worktreePath each (re)subscribe so a directory refresh is picked
// up on reconnect. Passing `signal` lets close/dispose tear down a stream
// that's parked in `for await` between events.
const sub = await this.client.event.subscribe(
{ directory: state.worktreePath },
{ signal },
);
for await (const ev of sub.stream) {
if (signal.aborted) break;
// Dir-scoped streams should only carry this session's events, but two
// sessions sharing a worktree (possible post-P1.5-b) each receive BOTH
// sessions' events — so drop anything that isn't ours, else the other
// session's deltas get processed twice (once per loop).
const sid = eventSessionId(ev);
if (sid != null && sid !== state.agentSessionId) continue;
this.dispatchEvent(ev);
}
if (this.up && !signal.aborted) {
await this.reconcile(state); // recover an idle/error lost during the gap
await sleep(SSE_RECONNECT_DELAY_MS);
}
} catch (err) {
if (!this.up || signal.aborted) break;
this.log.warn(
{ err: errMsg(err), agentSessionId: state.agentSessionId },
'opencode-server: session event loop error; reconnecting',
);
await this.reconcile(state);
await sleep(SSE_RECONNECT_DELAY_MS);
}
}
/** The dependency bundle the per-session SSE loop reads. */
private sseDeps(): SseLoopDeps {
return {
isUp: () => this.supervisor.isUp(),
getClient: () => this.supervisor.client,
dispatchEvent: (ev) => this.dispatchEvent(ev),
reconcile: (st) => this.reconcile(st),
onReconnectGiveUp: (st) => this.onReconnectGiveUp(st),
log: this.log,
};
}
/** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */
@@ -398,15 +190,7 @@ export class OpenCodeServerBackend implements AgentBackend {
const st = this.byOpencodeId.get(p.sessionID);
if (!st?.activeTurn) return;
this.bumpActivity(st);
const snap: AcpToolSnapshot = {
toolCallId: p.callID,
title: p.tool,
kind: null,
status: 'in_progress',
rawInput: p.input,
rawOutput: undefined,
};
st.activeTurn.onEvent({ type: 'tool_call', toolCall: snap });
st.activeTurn.onEvent({ type: 'tool_call', toolCall: toolCalledSnapshot(p) });
return;
}
case 'session.next.tool.success': {
@@ -414,16 +198,7 @@ export class OpenCodeServerBackend implements AgentBackend {
const st = this.byOpencodeId.get(p.sessionID);
if (!st?.activeTurn) return;
this.bumpActivity(st);
const output = p.content?.map((c) => ('text' in c ? (c as { text: string }).text : '')).join('') ?? '';
const snap: AcpToolSnapshot = {
toolCallId: p.callID,
title: p.callID,
kind: null,
status: 'completed',
rawInput: undefined,
rawOutput: output,
};
st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap });
st.activeTurn.onEvent({ type: 'tool_update', toolCall: toolSuccessSnapshot(p) });
return;
}
case 'session.next.tool.failed': {
@@ -431,15 +206,7 @@ export class OpenCodeServerBackend implements AgentBackend {
const st = this.byOpencodeId.get(p.sessionID);
if (!st?.activeTurn) return;
this.bumpActivity(st);
const snap: AcpToolSnapshot = {
toolCallId: p.callID,
title: p.callID,
kind: null,
status: 'failed',
rawInput: undefined,
rawOutput: errToString(p.error),
};
st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap });
st.activeTurn.onEvent({ type: 'tool_update', toolCall: toolFailedSnapshot(p) });
return;
}
// ─── per-step usage (U.6) — token/cost accounting for opencode sessions ──
@@ -449,8 +216,7 @@ export class OpenCodeServerBackend implements AgentBackend {
if (!st?.activeTurn) return;
this.bumpActivity(st);
// Accumulate this step's normalized usage onto the (chat_id, agent) row.
// Fire-and-forget: a DB hiccup must not stall the turn. opencode emits this
// once per LLM step, so a multi-tool turn sums several deltas.
// Fire-and-forget: a DB hiccup must not stall the turn.
const usage = stepEndedToUsage(p);
void this.accumulateUsage(st, usage);
return;
@@ -461,15 +227,8 @@ export class OpenCodeServerBackend implements AgentBackend {
const st = this.byOpencodeId.get(p.sessionID);
if (!st?.activeTurn) return;
this.bumpActivity(st);
const isReasoning = p.field === 'reasoning' || st.partTypeById.get(p.partID) === 'reasoning';
if (isReasoning) {
st.streamedPartKeys.add(`reasoning:${p.partID}`);
st.activeTurn.onEvent({ type: 'reasoning', text: p.delta });
} else if (p.field === 'text') {
st.streamedPartKeys.add(`text:${p.partID}`);
const cleaned = stripDcpTags(p.delta);
if (cleaned) st.activeTurn.onEvent({ type: 'text', text: cleaned });
}
const e = classifyPartDelta(p, st);
if (e) st.activeTurn.onEvent(e);
return;
}
case 'message.part.updated': {
@@ -477,7 +236,8 @@ export class OpenCodeServerBackend implements AgentBackend {
const st = this.byOpencodeId.get(part.sessionID);
if (!st?.activeTurn) return;
this.bumpActivity(st);
this.handleUpdatedPart(part, st);
const e = classifyUpdatedPart(part, st);
if (e) st.activeTurn.onEvent(e);
return;
}
// ─── lifecycle ─────────────────────────────────────────────────────────
@@ -502,40 +262,6 @@ export class OpenCodeServerBackend implements AgentBackend {
}
}
/** Terminal part: dedup gate for text/reasoning; tool parts → tool_call/tool_update. */
private handleUpdatedPart(part: Part, st: SessionState): void {
const turn = st.activeTurn;
if (!turn) return;
if (part.type === 'text' || part.type === 'reasoning') {
st.partTypeById.set(part.id, part.type);
const key = resolvePartDedupeKey(part, part.type);
if (key && st.streamedPartKeys.delete(key)) return; // already streamed via delta
const raw = part.text ?? '';
const text = part.type === 'text' ? stripDcpTags(raw) : raw;
if (text && part.time?.end != null) {
turn.onEvent({ type: part.type, text });
}
return;
}
if (part.type === 'tool') {
const snap = toolPartToSnapshot(part);
const status = part.state?.status;
// tool_call on start (pending/running), tool_update on terminal (completed/error).
// The current ACP path merges both into one frame; the contract keeps them
// distinct because opencode's SSE distinguishes start from result.
const event: AgentEvent =
status === 'completed' || status === 'error'
? { type: 'tool_update', toolCall: snap }
: { type: 'tool_call', toolCall: snap };
turn.onEvent(event);
return;
}
// NOTE: opencode's SSE payload union carries no available-commands event, so the
// AgentEvent 'commands' arm is intentionally never emitted here (1.3).
}
// ─── turn-completion resilience (watchdog + reconnect reconcile) ─────────────
/** Reset the inactivity backstop on any event routed to a session's active turn. */
@@ -550,8 +276,8 @@ export class OpenCodeServerBackend implements AgentBackend {
st.watchdog.unref?.();
}
/** Watchdog fired: reconcile once; if the server says still-running we can't tell, so fail closed.
* Also mark the agent_sessions row crashed so a stale session isn't resumed next turn. */
/** Watchdog fired: reconcile once; if still-running we can't tell, so fail closed.
* Also mark the agent_sessions row crashed so a stale session isn't resumed. */
private async onTurnStall(st: SessionState): Promise<void> {
const settled = await this.reconcile(st);
if (!settled) {
@@ -564,16 +290,27 @@ export class OpenCodeServerBackend implements AgentBackend {
}
}
/** SSE circuit-breaker fired (reconnect gave up): fail the active turn + mark the
* session crashed so it isn't resumed. The next turn re-creates a fresh session. */
private async onReconnectGiveUp(st: SessionState): Promise<void> {
if (!st.activeTurn) return;
await this.sql`
UPDATE agent_sessions SET status = 'crashed'
WHERE agent_session_id = ${st.agentSessionId}
`.catch(() => {});
st.activeTurn?.settle({ ok: false, error: 'opencode SSE stream lost (reconnect gave up)' });
}
/**
* Ask the server whether this session's turn already finished — recovers a
* session.idle/error lost during an SSE gap. Returns true if it settled the turn.
* Inconclusive (still running / call failed) → false; the watchdog covers that.
*/
private async reconcile(st: SessionState): Promise<boolean> {
const turn = st.activeTurn;
if (!turn || !this.client) return false;
const client = this.supervisor.client;
if (!turn || !client) return false;
try {
const res = await this.client.session.messages({
const res = await client.session.messages({
sessionID: st.agentSessionId,
directory: st.worktreePath,
});
@@ -605,10 +342,8 @@ export class OpenCodeServerBackend implements AgentBackend {
/**
* Accumulate one `session.next.step.ended`'s normalized usage onto the session's
* agent_sessions row, keyed by the resumed `agent_session_id` (unique per active
* row — the dispatcher's `(chat_id, agent)` lookup wrote it). Running totals for
* the whole conversation context (not last-step). Zero-delta steps are skipped to
* avoid a no-op write. Errors are swallowed: usage telemetry must never fail a turn.
* agent_sessions row. Running totals for the whole conversation context. Zero-delta
* steps are skipped. Errors are swallowed: usage telemetry must never fail a turn.
*/
private async accumulateUsage(st: SessionState, u: StepUsage): Promise<void> {
if (u.input === 0 && u.output === 0 && u.cost === 0) return;
@@ -631,13 +366,29 @@ export class OpenCodeServerBackend implements AgentBackend {
// ─── ensureSession: create-or-resume against agent_sessions (1.5) ────────────
async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
await this.ensureServer();
if (!this.client) throw new Error('opencode-server: client not ready after ensureServer');
// Coalesce concurrent first-turns for the same (chat, agent) so the SELECT…
// create…upsert can't race into two opencode sessions (the second orphaning
// the first). A single (non-concurrent) call is unaffected — the entry is set
// and removed within this call. Defensive: the dispatcher already serializes
// turns per (chat, agent) via its inflight map.
const key = `${opts.chatId}:${opts.agent}`;
const existing = this.ensuring.get(key);
if (existing) return existing;
const p = this.ensureSessionInner(sessionId, opts).finally(() => {
if (this.ensuring.get(key) === p) this.ensuring.delete(key);
});
this.ensuring.set(key, p);
return p;
}
private async ensureSessionInner(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
await this.supervisor.ensureServer();
const client = this.supervisor.client;
if (!client) throw new Error('opencode-server: client not ready after ensureServer');
const configHash = sessionConfigHash(opts.model);
// P1.5-b: agent_sessions is keyed (chat_id, agent) — the tab/chat is the
// context unit (two tabs in one session = two contexts sharing one worktree).
// session_id + worktree_id are retained as informational (SET NULL) columns.
const [row] = await this.sql<{ agent_session_id: string | null; status: string; config_hash: string | null }[]>`
SELECT agent_session_id, status, config_hash FROM agent_sessions
WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
@@ -655,7 +406,7 @@ export class OpenCodeServerBackend implements AgentBackend {
'opencode-server: not resuming stale session, creating fresh');
this.byOpencodeId.delete(agentSessionId);
}
const created = await this.client.session.create({ directory: opts.worktreePath });
const created = await client.session.create({ directory: opts.worktreePath });
if (created.error || !created.data) {
throw new Error(`opencode-server: session.create failed: ${errToString(created.error)}`);
}
@@ -664,7 +415,7 @@ export class OpenCodeServerBackend implements AgentBackend {
INSERT INTO agent_sessions
(chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at, config_hash)
VALUES
(${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'opencode_server', ${agentSessionId}, ${this.port}, 'active', clock_timestamp(), ${configHash})
(${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'opencode_server', ${agentSessionId}, ${this.supervisor.port}, 'active', clock_timestamp(), ${configHash})
ON CONFLICT (chat_id, agent) DO UPDATE SET
session_id = EXCLUDED.session_id,
worktree_id = EXCLUDED.worktree_id,
@@ -678,7 +429,7 @@ export class OpenCodeServerBackend implements AgentBackend {
} else {
await this.sql`
UPDATE agent_sessions
SET status = 'active', last_active_at = clock_timestamp(), server_port = ${this.port}, config_hash = ${configHash}
SET status = 'active', last_active_at = clock_timestamp(), server_port = ${this.supervisor.port}, config_hash = ${configHash}
WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent}
`;
}
@@ -693,24 +444,13 @@ export class OpenCodeServerBackend implements AgentBackend {
state.boocodeSessionId = sessionId;
state.worktreePath = opts.worktreePath;
} else {
state = {
boocodeSessionId: sessionId,
agentSessionId: ocSessionId,
worktreePath: opts.worktreePath,
streamedPartKeys: new Set(),
partTypeById: new Map(),
activeTurn: null,
watchdog: null,
sseAbort: null,
swallowNextTerminal: false,
};
state = this.makeSessionState(sessionId, ocSessionId, opts.worktreePath);
this.byOpencodeId.set(ocSessionId, state);
}
// Start this session's own SSE loop, scoped to its worktree directory. Both
// fresh-create and resume reach here; idempotent, so a re-ensure (e.g. a
// second turn) won't spawn a duplicate loop.
this.startSessionEventLoop(state);
// fresh-create and resume reach here; idempotent.
startSessionEventLoop(state, this.sseDeps());
return {
sessionId,
@@ -719,40 +459,53 @@ export class OpenCodeServerBackend implements AgentBackend {
chatId: opts.chatId,
worktreeId: opts.worktreeId,
agentSessionId: ocSessionId,
serverPort: this.port,
serverPort: this.supervisor.port,
};
}
/** Fresh per-(opencode session) demux state. */
private makeSessionState(boocodeSessionId: string, agentSessionId: string, worktreePath: string): SessionState {
return {
boocodeSessionId,
agentSessionId,
worktreePath,
streamedPartKeys: new Set(),
partTypeById: new Map(),
activeTurn: null,
watchdog: null,
sseAbort: null,
swallowNextTerminal: false,
};
}
// ─── prompt: send one turn (1.6) ─────────────────────────────────────────────
async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
if (!this.client) throw new Error('opencode-server: client not ready');
const client = this.supervisor.client;
if (!client) throw new Error('opencode-server: client not ready');
const oc = handle.agentSessionId;
if (!oc) throw new Error('opencode-server: handle has no agentSessionId');
let state = this.byOpencodeId.get(oc);
if (!state) {
state = {
boocodeSessionId: handle.sessionId,
agentSessionId: oc,
worktreePath: ctx.worktreePath,
streamedPartKeys: new Set(),
partTypeById: new Map(),
activeTurn: null,
watchdog: null,
sseAbort: null,
swallowNextTerminal: false,
};
state = this.makeSessionState(handle.sessionId, oc, ctx.worktreePath);
this.byOpencodeId.set(oc, state);
}
const session = state;
// v2.7 busy-assert: one in-flight turn per session. The dispatcher serializes
// turns per (chat, agent), so this never fires in normal dispatch — but if a
// second prompt arrives while one is live it would silently overwrite the slot
// and orphan the first turn, so reject instead.
if (session.activeTurn) {
return { ok: false, error: 'opencode-server: session already has an in-flight turn' };
}
// Authoritative per-turn directory for SDK routing + reconcile.
session.worktreePath = ctx.worktreePath;
// Defensive: ensureSession normally starts the loop, but if prompt is reached
// with a freshly-created state (no loop yet), start it so the turn streams.
// Idempotent when ensureSession already started one.
this.startSessionEventLoop(session);
const client = this.client;
startSessionEventLoop(session, this.sseDeps());
return await new Promise<TurnResult>((resolve) => {
let settled = false;
@@ -781,7 +534,8 @@ export class OpenCodeServerBackend implements AgentBackend {
settle({ ok: false, error: 'aborted' });
};
session.activeTurn = { onEvent: ctx.onEvent, settle };
const turn: TurnState = { onEvent: ctx.onEvent, settle };
session.activeTurn = turn;
this.bumpActivity(session); // arm the inactivity backstop
if (ctx.signal.aborted) {
@@ -822,39 +576,15 @@ export class OpenCodeServerBackend implements AgentBackend {
}
async dispose(): Promise<void> {
this.up = false;
// Abort every per-session SSE loop so none survive the teardown.
for (const st of this.byOpencodeId.values()) st.sseAbort?.abort();
const child = this.child;
this.child = null;
this.client = null;
this.byOpencodeId.clear();
if (child && !child.killed) {
child.kill('SIGTERM');
const t = setTimeout(() => {
if (!child.killed) child.kill('SIGKILL');
}, 5_000);
t.unref();
}
await this.supervisor.dispose();
}
}
// ─── helpers ──────────────────────────────────────────────────────────────────
/** Extract the opencode sessionID an event belongs to, across event shapes.
* Most carry `properties.sessionID`; `message.part.updated` nests it under
* `properties.part.sessionID`. Returns null when the event has no session
* (the per-session loop then leaves it to dispatchEvent, which drops it). */
function eventSessionId(ev: Event): string | null {
const props = (ev as { properties?: unknown }).properties;
if (!props || typeof props !== 'object') return null;
if (ev.type === 'message.part.updated') {
const part = (props as { part?: { sessionID?: string } }).part;
return part?.sessionID ?? null;
}
return (props as { sessionID?: string }).sessionID ?? null;
}
/** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */
function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined {
if (!model || !model.trim()) return undefined;
@@ -864,199 +594,14 @@ function parseModel(model: string | undefined): { providerID: string; modelID: s
return { providerID: trimmed.slice(0, idx), modelID: trimmed.slice(idx + 1) };
}
// No slash but non-empty → infer llama-swap (the only configured provider).
// Guard against bare '/' or trailing/leading slash.
if (idx < 0 && trimmed.length > 0) {
return { providerID: 'llama-swap', modelID: trimmed };
}
return undefined;
}
/** Ported verbatim from Paseo opencode-agent.ts: id → message-id fallback → null. */
function resolvePartDedupeKey(part: { id: string; messageID: string }, type: string): string | null {
if (part.id.trim().length > 0) return `${type}:${part.id}`;
if (part.messageID.trim().length > 0) return `${type}:message:${part.messageID}`;
return null;
}
/** opencode ToolPart → ACP-shaped snapshot (reuses the existing persist/render path). */
function toolPartToSnapshot(part: ToolPart): AcpToolSnapshot {
const state = part.state;
let rawInput: unknown;
let rawOutput: unknown;
let title: string | undefined;
if (state) {
if ('input' in state) rawInput = (state as { input?: unknown }).input;
if ('output' in state) rawOutput = (state as { output?: unknown }).output;
else if ('error' in state) rawOutput = (state as { error?: unknown }).error;
if ('title' in state) title = (state as { title?: string }).title;
}
return {
toolCallId: part.callID,
title: title ?? part.tool,
kind: null,
status: mapToolStatus(state?.status),
rawInput,
rawOutput,
};
}
function mapToolStatus(s: ToolState['status'] | undefined): ToolCallStatus | null {
switch (s) {
case 'pending':
return 'pending';
case 'running':
return 'in_progress';
case 'completed':
return 'completed';
case 'error':
return 'failed';
default:
return null;
}
}
/**
* Reclaim a loopback port a dead opencode child may still hold (lift of
* openchamber `killProcessOnPort`). Best-effort, POSIX-only (`lsof`/`kill`); a
* failure is harmless because the next spawn allocates a fresh ephemeral port.
* Never kills this process. Synchronous + short-timeout so the crash handler
* doesn't block.
*/
function reclaimPort(port: number | null): void {
if (!port || process.platform === 'win32') return;
try {
const res = spawnSync('lsof', ['-ti', `:${port}`], { encoding: 'utf8', timeout: 3_000, windowsHide: true });
const out = res.stdout || '';
const myPid = process.pid;
for (const pidStr of out.split(/\s+/)) {
const pid = parseInt(pidStr.trim(), 10);
if (pid && pid !== myPid) {
try {
spawnSync('kill', ['-9', String(pid)], { stdio: 'ignore', timeout: 2_000 });
} catch {
// ignore — best effort
}
}
}
} catch {
// lsof absent or failed — the fresh-ephemeral-port spawn doesn't need this.
}
}
/**
* Resolve true once nothing is listening on `port` (lift of openchamber
* `waitForPortRelease`). Used before re-spawning on a fixed port; with ephemeral
* ports it's a fast no-op. Probes 127.0.0.1; resolves false at the deadline.
*/
function waitForPortRelease(port: number, timeoutMs: number): Promise<boolean> {
const deadline = Date.now() + timeoutMs;
return new Promise((resolve) => {
const attempt = () => {
const socket = netConnect({ port, host: '127.0.0.1' });
let settled = false;
const finish = (released: boolean) => {
if (settled) return;
settled = true;
socket.removeAllListeners();
socket.destroy();
if (released || Date.now() >= deadline) {
resolve(released);
return;
}
setTimeout(attempt, 150);
};
socket.once('connect', () => finish(false));
socket.once('error', (err: NodeJS.ErrnoException) => {
if (err && (err.code === 'ECONNREFUSED' || err.code === 'EHOSTUNREACH')) finish(true);
else finish(false);
});
socket.setTimeout(500, () => finish(true));
};
attempt();
});
}
/** Bind-probe an ephemeral port on loopback. */
function freePort(): Promise<number> {
return new Promise((resolve, reject) => {
const srv = createServer();
srv.unref();
srv.on('error', reject);
srv.listen(0, '127.0.0.1', () => {
const addr = srv.address();
if (addr && typeof addr === 'object') {
const { port } = addr;
srv.close(() => resolve(port));
} else {
srv.close(() => reject(new Error('opencode-server: could not determine a free port')));
}
});
});
}
/** Resolve when the child prints the ready line; reject on timeout or early exit. */
function waitForReady(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve, reject) => {
let done = false;
let stderrBuf = '';
const finish = (err?: Error) => {
if (done) return;
done = true;
clearTimeout(timer);
child.stdout?.off('data', onOut);
child.stderr?.off('data', onErr);
child.off('exit', onExit);
if (err) reject(err);
else resolve();
};
const onOut = (buf: Buffer) => {
if (buf.toString().includes('opencode server listening on')) finish();
};
const onErr = (buf: Buffer) => {
stderrBuf += buf.toString();
};
const onExit = (code: number | null) =>
finish(new Error(`opencode serve exited before ready (code ${code}); stderr: ${stderrBuf.slice(-2000)}`));
const timer = setTimeout(
() => finish(new Error(`opencode serve not ready in ${timeoutMs}ms; stderr: ${stderrBuf.slice(-2000)}`)),
timeoutMs,
);
child.stdout?.on('data', onOut);
child.stderr?.on('data', onErr);
child.on('exit', onExit);
});
}
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
/** Strip opencode-dcp plugin tags that render as literal text in the UI. */
function stripDcpTags(s: string): string {
return s.replace(/<dcp-message-id>[^<]*<\/dcp-message-id>/g, '');
}
function errMsg(e: unknown): string {
return e instanceof Error ? e.message : String(e);
}
function errToString(e: unknown): string {
if (e == null) return 'unknown error';
if (typeof e === 'string') return e;
if (e instanceof Error) return e.message;
try {
return JSON.stringify(e);
} catch {
return String(e);
}
}
/** Hash of stable config — detects model changes across sessions without
* invalidating on ephemeral state like the random server port (which changes
* every BooCoder restart). */
* invalidating on ephemeral state like the random server port. */
function sessionConfigHash(model: string): string {
return createHash('sha256').update(`opencode_server|${model}`).digest('hex').slice(0, 16);
}

View File

@@ -0,0 +1,181 @@
/**
* Per-session SSE subscribe loop + reconnect/backoff + eventSessionId demux.
*
* Extracted (v2.7 audit reshape) from `OpenCodeServerBackend.startSessionEventLoop`
* / `runSessionEventLoop`. opencode scopes events by the `directory` query param, so
* each session runs its own dir-scoped stream and never drops a sibling's events.
*
* The loop is intentionally thin: it owns subscribe + the demux filter + reconnect
* timing only. Translating an event into turn side effects (watchdog, usage,
* settle) stays on the backend via the injected `dispatchEvent` / `reconcile`
* callbacks — `opencode-sse` knows nothing about turns or the DB.
*
* v2.7 concurrency hardening: the throw-driven reconnect path now backs off
* exponentially and trips a circuit-breaker (`onReconnectGiveUp`) after a bounded
* number of consecutive failures, instead of looping forever at a flat 1s. The
* HAPPY PATH is unchanged — a clean stream end (server still up) reconnects after
* `baseMs` (1s, as before) and resets the failure counter, so a long-lived session
* that re-subscribes normally never backs off.
*/
import type { FastifyBaseLogger } from 'fastify';
import type { Event, OpencodeClient } from '@opencode-ai/sdk/v2/client';
import type { AgentEvent } from '../agent-backend.js';
import type { TurnResult } from '../agent-backend.js';
import { eventSessionId, errMsg } from './opencode-event-map.js';
export const SSE_RECONNECT_DELAY_MS = 1_000;
/** One in-flight turn's emitter + completion settler. */
export interface TurnState {
onEvent: (e: AgentEvent) => void;
settle: (r: TurnResult) => void;
}
/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */
export interface SessionState {
boocodeSessionId: string;
agentSessionId: string;
/** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */
worktreePath: string;
/** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */
streamedPartKeys: Set<string>;
/** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */
partTypeById: Map<string, string>;
activeTurn: TurnState | null;
/** Inactivity backstop timer for the active turn; null when no turn in flight. */
watchdog: ReturnType<typeof setTimeout> | null;
/** Per-session SSE subscription handle. Non-null while the loop is running;
* aborting it tears down the underlying fetch and exits the loop. */
sseAbort: AbortController | null;
/** F.1 post-abort orphan-terminal guard: swallow the one session.idle/error
* opencode emits for an aborted turn so it can't settle the next turn. */
swallowNextTerminal: boolean;
}
// ─── reconnect backoff (pure) ────────────────────────────────────────────────
export interface ReconnectPolicy {
/** First retry delay (and the steady-state clean-reconnect delay). */
baseMs: number;
/** Cap on the exponential delay. */
maxMs: number;
/** Consecutive failures tolerated before the breaker trips (give up). */
maxAttempts: number;
}
export const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
baseMs: SSE_RECONNECT_DELAY_MS,
maxMs: 30_000,
maxAttempts: 6,
};
export type ReconnectDecision =
| { action: 'reconnect'; delayMs: number }
| { action: 'give-up' };
/**
* Pure backoff decision after `failures` consecutive throwing reconnect attempts
* (1-based: the first failure passes `failures=1`). Returns an exponentially
* growing delay capped at `maxMs`, or `give-up` once the count exceeds
* `maxAttempts`. `failures=1` yields `baseMs`, so the very first retry matches the
* pre-hardening flat delay (happy-path-preserving).
*/
export function reconnectDecision(
failures: number,
policy: ReconnectPolicy = DEFAULT_RECONNECT_POLICY,
): ReconnectDecision {
if (failures > policy.maxAttempts) return { action: 'give-up' };
const exp = policy.baseMs * 2 ** (failures - 1);
return { action: 'reconnect', delayMs: Math.min(policy.maxMs, exp) };
}
// ─── the loop ────────────────────────────────────────────────────────────────
export interface SseLoopDeps {
/** Live iff the server is up (read each iteration so a crash stops the loop). */
isUp: () => boolean;
/** The current opencode client (null between server restarts). */
getClient: () => OpencodeClient | null;
/** Route one demuxed event to its turn (backend side effects live here). */
dispatchEvent: (ev: Event) => void;
/** Recover an idle/error lost during an SSE gap. Returns true if it settled. */
reconcile: (state: SessionState) => Promise<boolean>;
/** Circuit-breaker: called once the backoff gives up; fail the active turn. */
onReconnectGiveUp: (state: SessionState) => Promise<void> | void;
log: FastifyBaseLogger;
/** Injectable for tests; defaults to a real timer sleep. */
sleep?: (ms: number) => Promise<void>;
policy?: ReconnectPolicy;
}
function defaultSleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
/** Per-session SSE subscription, scoped to the session's worktree directory.
* Idempotent: a no-op if this session's loop is already running. */
export function startSessionEventLoop(state: SessionState, deps: SseLoopDeps): void {
if (state.sseAbort) return; // already running
const abort = new AbortController();
state.sseAbort = abort;
void runSessionEventLoop(state, abort, deps).finally(() => {
// Only clear if this controller is still the live one (a later restart may
// have already installed a new one).
if (state.sseAbort === abort) state.sseAbort = null;
});
}
export async function runSessionEventLoop(
state: SessionState,
abort: AbortController,
deps: SseLoopDeps,
): Promise<void> {
const signal = abort.signal;
const sleep = deps.sleep ?? defaultSleep;
const policy = deps.policy ?? DEFAULT_RECONNECT_POLICY;
let failures = 0;
while (deps.isUp() && deps.getClient() && !signal.aborted) {
const client = deps.getClient()!;
try {
// Re-read worktreePath each (re)subscribe so a directory refresh is picked
// up on reconnect. Passing `signal` lets close/dispose tear down a stream
// that's parked in `for await` between events.
const sub = await client.event.subscribe({ directory: state.worktreePath }, { signal });
for await (const ev of sub.stream) {
if (signal.aborted) break;
// Dir-scoped streams should only carry this session's events, but two
// sessions sharing a worktree (possible post-P1.5-b) each receive BOTH
// sessions' events — so drop anything that isn't ours, else the other
// session's deltas get processed twice (once per loop).
const sid = eventSessionId(ev);
if (sid != null && sid !== state.agentSessionId) continue;
deps.dispatchEvent(ev);
}
// Clean stream end — a healthy reconnect, NOT a failure: recover any lost
// terminal then re-subscribe at the base delay (pre-hardening behavior).
failures = 0;
if (deps.isUp() && !signal.aborted) {
await deps.reconcile(state); // recover an idle/error lost during the gap
await sleep(policy.baseMs);
}
} catch (err) {
if (!deps.isUp() || signal.aborted) break;
failures += 1;
const decision = reconnectDecision(failures, policy);
deps.log.warn(
{ err: errMsg(err), agentSessionId: state.agentSessionId, failures, action: decision.action },
'opencode-server: session event loop error; reconnecting',
);
await deps.reconcile(state);
if (decision.action === 'give-up') {
deps.log.warn(
{ agentSessionId: state.agentSessionId, failures },
'opencode-server: SSE reconnect gave up (circuit breaker) — failing active turn',
);
await deps.onReconnectGiveUp(state);
break;
}
await sleep(decision.delayMs);
}
}
}

View File

@@ -36,29 +36,15 @@
*/
import { spawn, type ChildProcess } from 'node:child_process';
import type { FastifyBaseLogger } from 'fastify';
import {
ClientSideConnection,
type Client,
type SessionNotification,
type RequestPermissionRequest,
type RequestPermissionResponse,
type ReadTextFileRequest,
type ReadTextFileResponse,
type WriteTextFileRequest,
type WriteTextFileResponse,
type CreateTerminalRequest,
type CreateTerminalResponse,
type CreateElicitationRequest,
type CreateElicitationResponse,
} from '@agentclientprotocol/sdk';
import { ClientSideConnection, type Client } from '@agentclientprotocol/sdk';
import type { Sql } from '../../db.js';
import { resolveLaunchSpec } from '../acp-spawn.js';
import { isTurnOkForStopReason } from './warm-acp-routing.js';
import { getResolvedRegistry, type ResolvedProviderDef } from '../provider-config-registry.js';
import { createAcpNdJsonStream } from '../acp-stream.js';
import { mapSessionUpdate } from '../acp-event-map.js';
import { readWorktreeTextFile, writeWorktreeTextFile } from '../acp-client-fs.js';
import { waitForPermissionResponse, waitForElicitationResponse, cancelPendingPermission } from '../permission-waiter.js';
import { buildAcpClient } from '../acp-client.js';
import { cancelPendingPermission } from '../permission-waiter.js';
import { type AcpToolSnapshot, synthesizeCanceledSnapshots } from '../acp-tool-snapshot.js';
import type {
AgentBackend,
@@ -211,47 +197,25 @@ export class WarmAcpBackend implements AgentBackend {
);
}
/** Build the ACP Client callbacks ONCE per connection. They read `this.activeTurn`
* so each turn's events/permissions route to the right place — exactly the
* opencode-server `activeTurn` pattern. Worktree-scoped FS like AcpStreamContext. */
/** Build the ACP Client callbacks ONCE per connection (shared `buildAcpClient`).
* `resolveTurn` reads `this.activeTurn` at each callback so events/permissions
* route to the live turn — exactly the prior behavior. The warm session always
* has a non-empty `sessionId`, so the shared `taskId && sessionId` permission
* gate is equivalent to the old `turn?.taskId` gate. */
private buildClient(worktreePath: string): Client {
return {
sessionUpdate: async (params: SessionNotification): Promise<void> => {
const turn = this.activeTurn;
if (!turn) return; // between turns — drop (no orphan settles a future turn)
for (const event of mapSessionUpdate(params, turn.snapshots)) {
turn.onEvent(event);
}
},
requestPermission: async (params: RequestPermissionRequest): Promise<RequestPermissionResponse> => {
const turn = this.activeTurn;
if (turn?.taskId) {
// Route to the UI via the per-turn task id (same as the one-shot path).
return waitForPermissionResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params);
}
const firstOption = params.options[0];
if (firstOption) return { outcome: { outcome: 'selected', optionId: firstOption.optionId } };
return { outcome: { outcome: 'cancelled' } };
},
readTextFile: async (params: ReadTextFileRequest): Promise<ReadTextFileResponse> => {
const content = await readWorktreeTextFile(worktreePath, params.path, params.line, params.limit);
return { content };
},
writeTextFile: async (params: WriteTextFileRequest): Promise<WriteTextFileResponse> => {
await writeWorktreeTextFile(worktreePath, params.path, params.content);
return {};
},
createTerminal: async (_params: CreateTerminalRequest): Promise<CreateTerminalResponse> => {
return { terminalId: 'noop' };
},
unstable_createElicitation: async (params: CreateElicitationRequest): Promise<CreateElicitationResponse> => {
const turn = this.activeTurn;
if (turn?.taskId) {
return waitForElicitationResponse(turn.taskId, turn.sessionId, this.agent, turn.modeId, params);
}
return { action: 'decline' };
},
};
return buildAcpClient(worktreePath, () => {
const turn = this.activeTurn;
if (!turn) return null;
return {
taskId: turn.taskId,
sessionId: turn.sessionId,
modeId: turn.modeId,
agent: this.agent,
onSessionUpdate: (params) => {
for (const event of mapSessionUpdate(params, turn.snapshots)) turn.onEvent(event);
},
};
});
}
// ─── ensureSession: create-or-reuse the warm session (2.1) ───────────────────
@@ -303,6 +267,14 @@ export class WarmAcpBackend implements AgentBackend {
return { ok: false, error: 'warm-acp: no live ACP connection' };
}
// v2.7 busy-assert: one in-flight turn per warm session. The dispatcher
// serializes turns per (chat, agent), so this never fires in normal dispatch —
// but a second concurrent prompt would silently overwrite `activeTurn` and
// orphan the first turn, so reject instead.
if (this.activeTurn) {
return { ok: false, error: 'warm-acp: session already has an in-flight turn' };
}
const snapshots = new Map<string, AcpToolSnapshot>();
// taskId routes permission/elicitation prompts back to the UI. The dispatcher
// passes it (plus mode) on the per-turn PromptCtx; permission-waiter keys on it.