feat(coder): v2.6 Phase 1 — OpenCode warm server backend
Persistent multi-turn opencode backend: one `opencode serve` HTTP server per BooCoder process, one opencode session per BooCode session (resumed on switch-back), single SSE read loop demuxed by session id. - backends/opencode-server.ts: AgentBackend implementation — spawn with waitForReady, session.next.* SSE event translation (text/reasoning/tool deltas), Paseo-ported reasoning dedup (streamedPartKeys), promptAsync fire-and-forget settled by session.idle, per-turn inactivity watchdog (180s) + reconnect reconciliation via session.messages, stale-session guard (crashed-not-resumed + config_hash fingerprint on model). - dispatcher.ts: opencode routes to pool backend (ensureSession→prompt); per-session concurrency Map replaces global running boolean (1.9); model coalesce (empty→DEFAULT_MODEL) + llama-swap/ prefix for opencode; diff-supersede (DELETE+INSERT pending_changes by session, stamp agent). - worktrees.ts: ensureSessionWorktree (session-keyed, captures base_commit, persists to session_worktrees); diffWorktree gains optional baseRef. - agent-probe.ts: mergeLlamaSwap branch fetches /v1/models, prefixes with llama-swap/, populates opencode's available_agents.models (was 0). - provider-snapshot.ts: export fetchLlamaSwapModels for probe reuse. - schema.sql: session_worktrees + agent_sessions tables (Phase 0) + config_hash column on agent_sessions, pending_changes.agent column. - package.json: @opencode-ai/sdk ~1.15.0 (resolved 1.15.12). Known Phase 1 limitation: single SSE stream scoped to most-recent session's directory; concurrent opencode sessions in different worktrees collide (warning logged, watchdog prevents hang). Phase 2 moves to per-session SSE. Smoke 1 verified: two turns in one session, both produce real tokens, same agent_session_id reused, same server port, turn 2 is 9x faster (no spawn). goose/qwen/claude paths untouched (runExternalAgent md5 identical). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
740
apps/coder/src/services/backends/opencode-server.ts
Normal file
740
apps/coder/src/services/backends/opencode-server.ts
Normal file
@@ -0,0 +1,740 @@
|
||||
/**
|
||||
* v2.6 Phase 1 — OpenCodeServerBackend.
|
||||
*
|
||||
* Warm, multi-turn backend for the `opencode` agent. One `opencode serve` HTTP
|
||||
* server per BooCoder process; one opencode session per BooCode session (resumed
|
||||
* on switch-back); a single SSE read loop demuxes all sessions' events.
|
||||
*
|
||||
* Implements the Phase 0 `AgentBackend` interface. Emits transport-agnostic
|
||||
* `AgentEvent`s — the dispatcher (Phase 1.7, NOT wired in this batch) maps them
|
||||
* to WS frames. No dispatcher/route references this file yet.
|
||||
*
|
||||
* Spec: openspec/changes/v2-6-persistent-agent-sessions/design.md §2 / §2a.
|
||||
* SDK shapes verified by direct read of @opencode-ai/sdk@1.15.12 dist .d.ts:
|
||||
* - client methods take FLATTENED params (sessionID/directory/body all inline),
|
||||
* not {path,query,body}. create→{directory}, promptAsync→{sessionID,directory,
|
||||
* parts,model}, abort→{sessionID,directory}. model is {providerID,modelID}.
|
||||
* - client.event() resolves to { stream: AsyncGenerator<GlobalEvent> }; the
|
||||
* real event is chunk.payload (discriminate on chunk.payload.type).
|
||||
* - promptAsync is fire-and-forget (204); the turn completes via a
|
||||
* 'session.idle' event for that opencode session id.
|
||||
*/
|
||||
import { spawn, type ChildProcess } from 'node:child_process';
|
||||
import { createHash } from 'node:crypto';
|
||||
import { createServer } from 'node:net';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import {
|
||||
createOpencodeClient,
|
||||
type OpencodeClient,
|
||||
type Event,
|
||||
type Part,
|
||||
type ToolPart,
|
||||
type ToolState,
|
||||
type AssistantMessage,
|
||||
} from '@opencode-ai/sdk/v2/client';
|
||||
import type { ToolCallStatus } from '@agentclientprotocol/sdk';
|
||||
import type { Sql } from '../../db.js';
|
||||
import type { AcpToolSnapshot } from '../acp-tool-snapshot.js';
|
||||
import type {
|
||||
AgentBackend,
|
||||
AgentEvent,
|
||||
AgentSessionHandle,
|
||||
EnsureSessionOpts,
|
||||
PromptCtx,
|
||||
TurnResult,
|
||||
} from '../agent-backend.js';
|
||||
|
||||
const READY_TIMEOUT_MS = 30_000;
|
||||
const SSE_RECONNECT_DELAY_MS = 1_000;
|
||||
/**
|
||||
* No-activity backstop for an in-flight turn. opencode streams reasoning/text/tool
|
||||
* deltas continuously while working, so "zero events for this long" means the turn
|
||||
* is wedged or its terminal event (session.idle) was lost (see the reconnect race
|
||||
* below). Generous so a legitimately slow turn never trips it.
|
||||
*/
|
||||
const TURN_INACTIVITY_MS = 180_000;
|
||||
|
||||
/** One in-flight turn's emitter + completion settler. */
|
||||
interface TurnState {
|
||||
onEvent: (e: AgentEvent) => void;
|
||||
settle: (r: TurnResult) => void;
|
||||
}
|
||||
|
||||
/** Per-(opencode session) demux state. dedup sets scoped here, cleared per turn. */
|
||||
interface SessionState {
|
||||
boocodeSessionId: string;
|
||||
agentSessionId: string;
|
||||
/** Worktree directory for SDK `directory` routing; refreshed each turn from ctx. */
|
||||
worktreePath: string;
|
||||
/** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. Cleared at turn end. */
|
||||
streamedPartKeys: Set<string>;
|
||||
/** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. Cleared at turn end. */
|
||||
partTypeById: Map<string, string>;
|
||||
activeTurn: TurnState | null;
|
||||
/** Inactivity backstop timer for the active turn; null when no turn in flight. */
|
||||
watchdog: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export interface OpenCodeServerBackendDeps {
|
||||
sql: Sql;
|
||||
log: FastifyBaseLogger;
|
||||
/** Absolute path to the opencode binary (resolved from available_agents at wiring time, Phase 1.7). */
|
||||
opencodeBinary: string;
|
||||
}
|
||||
|
||||
export class OpenCodeServerBackend implements AgentBackend {
|
||||
readonly backend = 'opencode_server' as const;
|
||||
|
||||
private readonly sql: Sql;
|
||||
private readonly log: FastifyBaseLogger;
|
||||
private readonly opencodeBinary: string;
|
||||
|
||||
private child: ChildProcess | null = null;
|
||||
private client: OpencodeClient | null = null;
|
||||
private port: number | null = null;
|
||||
private up = false;
|
||||
private serverStarting: Promise<void> | null = null;
|
||||
private sseRunning = false;
|
||||
|
||||
/** opencode session id → demux state. Maintained by ensureSession; read by the SSE loop. */
|
||||
private readonly byOpencodeId = new Map<string, SessionState>();
|
||||
|
||||
constructor(deps: OpenCodeServerBackendDeps) {
|
||||
this.sql = deps.sql;
|
||||
this.log = deps.log;
|
||||
this.opencodeBinary = deps.opencodeBinary;
|
||||
}
|
||||
|
||||
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
|
||||
health(): 'up' | 'down' {
|
||||
return this.up ? 'up' : 'down';
|
||||
}
|
||||
|
||||
// ─── Server lifecycle (1.2: spawn once + client + ready) ─────────────────────
|
||||
|
||||
/** Lazy: start the single server on first use. Idempotent — one server per backend. */
|
||||
private ensureServer(): Promise<void> {
|
||||
if (!this.serverStarting) this.serverStarting = this.startServer();
|
||||
return this.serverStarting;
|
||||
}
|
||||
|
||||
private async startServer(): Promise<void> {
|
||||
const port = await freePort();
|
||||
|
||||
// Phase 1: run unsecured on loopback (opencode's documented default — serve.ts
|
||||
// only WARNS when OPENCODE_SERVER_PASSWORD is unset). The real boundary is the
|
||||
// 127.0.0.1 bind. Defense-in-depth basic-auth is deferred: the hey-api client's
|
||||
// auth wiring + opencode's exact scheme must be confirmed against a live server
|
||||
// first, else every request 401s. Recon explicitly said "do NOT block on it".
|
||||
const child = spawn(this.opencodeBinary, ['serve', '--hostname', '127.0.0.1', '--port', String(port)], {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
env: { ...process.env },
|
||||
});
|
||||
this.child = child;
|
||||
this.port = port;
|
||||
|
||||
// Child lifetime is the backend's (the pool's), NOT a request's. We never tie
|
||||
// it to a per-turn abort signal. On unexpected exit we mark down + log; crash
|
||||
// recovery is Phase 3.
|
||||
child.on('exit', (code, signal) => {
|
||||
this.up = false;
|
||||
this.log.warn({ code, signal, port }, 'opencode-server: child exited (recovery is Phase 3)');
|
||||
});
|
||||
|
||||
await waitForReady(child, READY_TIMEOUT_MS);
|
||||
|
||||
this.client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` });
|
||||
this.up = true;
|
||||
this.log.info({ port }, 'opencode-server: ready');
|
||||
}
|
||||
|
||||
// ─── SSE read loop + demux + translate (1.3) + dedup (1.4) ───────────────────
|
||||
|
||||
/** Per-directory SSE subscription. opencode scopes events by directory (defaults
|
||||
* to process.cwd if omitted) — so we must subscribe with the same directory used
|
||||
* to create the session. Called from ensureSession; reconnects while up. */
|
||||
private startEventLoop(directory: string): void {
|
||||
if (this.sseRunning) return;
|
||||
this.sseRunning = true;
|
||||
this.sseDirectory = directory;
|
||||
void this.runEventLoop(directory);
|
||||
}
|
||||
|
||||
private sseDirectory: string | null = null;
|
||||
|
||||
private async runEventLoop(directory: string): Promise<void> {
|
||||
while (this.up && this.client) {
|
||||
try {
|
||||
const sub = await this.client.event.subscribe({ directory });
|
||||
for await (const ev of sub.stream) {
|
||||
this.dispatchEvent(ev);
|
||||
}
|
||||
if (this.up) {
|
||||
await this.reconcileInFlight();
|
||||
await sleep(SSE_RECONNECT_DELAY_MS);
|
||||
}
|
||||
} catch (err) {
|
||||
if (!this.up) break;
|
||||
this.log.warn({ err: errMsg(err) }, 'opencode-server: event loop error; reconnecting');
|
||||
await this.reconcileInFlight();
|
||||
await sleep(SSE_RECONNECT_DELAY_MS);
|
||||
}
|
||||
}
|
||||
this.sseRunning = false;
|
||||
}
|
||||
|
||||
/** Demux one event to the owning session's active turn. Unknown/between-turns → drop. */
|
||||
private dispatchEvent(ev: Event): void {
|
||||
switch (ev.type) {
|
||||
// ─── session.next.* — live streaming events (the primary path) ─────────
|
||||
case 'session.next.text.delta': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
st.activeTurn.onEvent({ type: 'text', text: p.delta });
|
||||
return;
|
||||
}
|
||||
case 'session.next.reasoning.delta': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
st.activeTurn.onEvent({ type: 'reasoning', text: p.delta });
|
||||
return;
|
||||
}
|
||||
case 'session.next.tool.called': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
const snap: AcpToolSnapshot = {
|
||||
toolCallId: p.callID,
|
||||
title: p.tool,
|
||||
kind: null,
|
||||
status: 'in_progress',
|
||||
rawInput: p.input,
|
||||
rawOutput: undefined,
|
||||
};
|
||||
st.activeTurn.onEvent({ type: 'tool_call', toolCall: snap });
|
||||
return;
|
||||
}
|
||||
case 'session.next.tool.success': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
const output = p.content?.map((c) => ('text' in c ? (c as { text: string }).text : '')).join('') ?? '';
|
||||
const snap: AcpToolSnapshot = {
|
||||
toolCallId: p.callID,
|
||||
title: p.callID,
|
||||
kind: null,
|
||||
status: 'completed',
|
||||
rawInput: undefined,
|
||||
rawOutput: output,
|
||||
};
|
||||
st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap });
|
||||
return;
|
||||
}
|
||||
case 'session.next.tool.failed': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
const snap: AcpToolSnapshot = {
|
||||
toolCallId: p.callID,
|
||||
title: p.callID,
|
||||
kind: null,
|
||||
status: 'failed',
|
||||
rawInput: undefined,
|
||||
rawOutput: errToString(p.error),
|
||||
};
|
||||
st.activeTurn.onEvent({ type: 'tool_update', toolCall: snap });
|
||||
return;
|
||||
}
|
||||
// ─── message.part.* — terminal/post-hoc events (dedup gate) ────────────
|
||||
case 'message.part.delta': {
|
||||
const p = ev.properties;
|
||||
const st = this.byOpencodeId.get(p.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
const isReasoning = p.field === 'reasoning' || st.partTypeById.get(p.partID) === 'reasoning';
|
||||
if (isReasoning) {
|
||||
st.streamedPartKeys.add(`reasoning:${p.partID}`);
|
||||
st.activeTurn.onEvent({ type: 'reasoning', text: p.delta });
|
||||
} else if (p.field === 'text') {
|
||||
st.streamedPartKeys.add(`text:${p.partID}`);
|
||||
st.activeTurn.onEvent({ type: 'text', text: p.delta });
|
||||
}
|
||||
return;
|
||||
}
|
||||
case 'message.part.updated': {
|
||||
const part = ev.properties.part;
|
||||
const st = this.byOpencodeId.get(part.sessionID);
|
||||
if (!st?.activeTurn) return;
|
||||
this.bumpActivity(st);
|
||||
this.handleUpdatedPart(part, st);
|
||||
return;
|
||||
}
|
||||
// ─── lifecycle ─────────────────────────────────────────────────────────
|
||||
case 'session.idle': {
|
||||
this.byOpencodeId.get(ev.properties.sessionID)?.activeTurn?.settle({ ok: true });
|
||||
return;
|
||||
}
|
||||
case 'session.error': {
|
||||
const sid = ev.properties.sessionID;
|
||||
if (!sid) return;
|
||||
this.byOpencodeId.get(sid)?.activeTurn?.settle({ ok: false, error: errToString(ev.properties.error) });
|
||||
return;
|
||||
}
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/** Terminal part: dedup gate for text/reasoning; tool parts → tool_call/tool_update. */
|
||||
private handleUpdatedPart(part: Part, st: SessionState): void {
|
||||
const turn = st.activeTurn;
|
||||
if (!turn) return;
|
||||
|
||||
if (part.type === 'text' || part.type === 'reasoning') {
|
||||
st.partTypeById.set(part.id, part.type);
|
||||
const key = resolvePartDedupeKey(part, part.type);
|
||||
if (key && st.streamedPartKeys.delete(key)) return; // already streamed via delta
|
||||
const text = part.text ?? '';
|
||||
if (text && part.time?.end != null) {
|
||||
turn.onEvent({ type: part.type, text });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (part.type === 'tool') {
|
||||
const snap = toolPartToSnapshot(part);
|
||||
const status = part.state?.status;
|
||||
// tool_call on start (pending/running), tool_update on terminal (completed/error).
|
||||
// The current ACP path merges both into one frame; the contract keeps them
|
||||
// distinct because opencode's SSE distinguishes start from result.
|
||||
const event: AgentEvent =
|
||||
status === 'completed' || status === 'error'
|
||||
? { type: 'tool_update', toolCall: snap }
|
||||
: { type: 'tool_call', toolCall: snap };
|
||||
turn.onEvent(event);
|
||||
return;
|
||||
}
|
||||
// NOTE: opencode's SSE payload union carries no available-commands event, so the
|
||||
// AgentEvent 'commands' arm is intentionally never emitted here (1.3).
|
||||
}
|
||||
|
||||
// ─── turn-completion resilience (watchdog + reconnect reconcile) ─────────────
|
||||
|
||||
/** Reset the inactivity backstop on any event routed to a session's active turn. */
|
||||
private bumpActivity(st: SessionState): void {
|
||||
if (!st.activeTurn) return;
|
||||
if (st.watchdog) clearTimeout(st.watchdog);
|
||||
st.watchdog = setTimeout(() => {
|
||||
void this.onTurnStall(st);
|
||||
}, TURN_INACTIVITY_MS);
|
||||
st.watchdog.unref?.();
|
||||
}
|
||||
|
||||
/** Watchdog fired: reconcile once; if the server says still-running we can't tell, so fail closed.
|
||||
* Also mark the agent_sessions row crashed so a stale session isn't resumed next turn. */
|
||||
private async onTurnStall(st: SessionState): Promise<void> {
|
||||
const settled = await this.reconcile(st);
|
||||
if (!settled) {
|
||||
this.log.warn({ agentSessionId: st.agentSessionId }, 'opencode-server: turn stalled (no activity), failing + marking crashed');
|
||||
await this.sql`
|
||||
UPDATE agent_sessions SET status = 'crashed'
|
||||
WHERE agent_session_id = ${st.agentSessionId}
|
||||
`.catch(() => {});
|
||||
st.activeTurn?.settle({ ok: false, error: 'turn timed out (no activity)' });
|
||||
}
|
||||
}
|
||||
|
||||
/** Reconcile every in-flight turn against the server (called after an SSE drop). */
|
||||
private async reconcileInFlight(): Promise<void> {
|
||||
const states = [...this.byOpencodeId.values()].filter((s) => s.activeTurn);
|
||||
if (states.length === 0) return;
|
||||
await Promise.allSettled(states.map((s) => this.reconcile(s)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Ask the server whether this session's turn already finished — recovers a
|
||||
* session.idle/error lost during an SSE gap. Returns true if it settled the turn.
|
||||
* Inconclusive (still running / call failed) → false; the watchdog covers that.
|
||||
*/
|
||||
private async reconcile(st: SessionState): Promise<boolean> {
|
||||
const turn = st.activeTurn;
|
||||
if (!turn || !this.client) return false;
|
||||
try {
|
||||
const res = await this.client.session.messages({
|
||||
sessionID: st.agentSessionId,
|
||||
directory: st.worktreePath,
|
||||
});
|
||||
if (res.error || !res.data) return false;
|
||||
let lastAssistant: AssistantMessage | undefined;
|
||||
for (let i = res.data.length - 1; i >= 0; i--) {
|
||||
const info = res.data[i]!.info;
|
||||
if (info.role === 'assistant') {
|
||||
lastAssistant = info;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!lastAssistant) return false;
|
||||
if (lastAssistant.error != null) {
|
||||
turn.settle({ ok: false, error: errToString(lastAssistant.error) });
|
||||
return true;
|
||||
}
|
||||
if (lastAssistant.time.completed != null) {
|
||||
turn.settle({ ok: true });
|
||||
return true;
|
||||
}
|
||||
return false; // still running — the live stream will deliver session.idle
|
||||
} catch {
|
||||
return false; // inconclusive — watchdog backstop covers it
|
||||
}
|
||||
}
|
||||
|
||||
// ─── ensureSession: create-or-resume against agent_sessions (1.5) ────────────
|
||||
|
||||
async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
|
||||
await this.ensureServer();
|
||||
if (!this.client) throw new Error('opencode-server: client not ready after ensureServer');
|
||||
|
||||
const configHash = sessionConfigHash(opts.model);
|
||||
const [row] = await this.sql<{ agent_session_id: string | null; status: string; config_hash: string | null }[]>`
|
||||
SELECT agent_session_id, status, config_hash FROM agent_sessions
|
||||
WHERE session_id = ${sessionId} AND agent = ${opts.agent}
|
||||
`;
|
||||
let agentSessionId = row?.agent_session_id ?? null;
|
||||
|
||||
// Don't resume crashed sessions or sessions whose config drifted (model change).
|
||||
const shouldResume = agentSessionId
|
||||
&& row!.status !== 'crashed'
|
||||
&& (row!.config_hash == null || row!.config_hash === configHash);
|
||||
|
||||
if (!shouldResume) {
|
||||
if (agentSessionId) {
|
||||
this.log.info({ sessionId, oldStatus: row!.status, hashMatch: row!.config_hash === configHash },
|
||||
'opencode-server: not resuming stale session, creating fresh');
|
||||
this.byOpencodeId.delete(agentSessionId);
|
||||
}
|
||||
const created = await this.client.session.create({ directory: opts.worktreePath });
|
||||
if (created.error || !created.data) {
|
||||
throw new Error(`opencode-server: session.create failed: ${errToString(created.error)}`);
|
||||
}
|
||||
agentSessionId = created.data.id;
|
||||
await this.sql`
|
||||
INSERT INTO agent_sessions
|
||||
(session_id, agent, backend, agent_session_id, server_port, status, last_active_at, config_hash)
|
||||
VALUES
|
||||
(${sessionId}, ${opts.agent}, 'opencode_server', ${agentSessionId}, ${this.port}, 'active', clock_timestamp(), ${configHash})
|
||||
ON CONFLICT (session_id, agent) DO UPDATE SET
|
||||
backend = 'opencode_server',
|
||||
agent_session_id = EXCLUDED.agent_session_id,
|
||||
server_port = EXCLUDED.server_port,
|
||||
status = 'active',
|
||||
last_active_at = clock_timestamp(),
|
||||
config_hash = EXCLUDED.config_hash
|
||||
`;
|
||||
} else {
|
||||
await this.sql`
|
||||
UPDATE agent_sessions
|
||||
SET status = 'active', last_active_at = clock_timestamp(), server_port = ${this.port}, config_hash = ${configHash}
|
||||
WHERE session_id = ${sessionId} AND agent = ${opts.agent}
|
||||
`;
|
||||
}
|
||||
|
||||
// Both branches above guarantee agentSessionId is non-null.
|
||||
const ocSessionId = agentSessionId!;
|
||||
|
||||
// Start (or re-start) the SSE event loop scoped to this session's directory.
|
||||
// opencode scopes events by the `directory` query param; without it events
|
||||
// default to the server's CWD which doesn't match our worktree paths.
|
||||
//
|
||||
// KNOWN Phase 1 LIMITATION: one SSE stream at a time, scoped to a single
|
||||
// directory. Under 1.9 concurrency, if two opencode sessions use different
|
||||
// worktree directories simultaneously, re-subscribing for the second drops
|
||||
// the first session's events (the watchdog backstop prevents a full hang,
|
||||
// but streamed content is lost). Phase 2 should move to per-session SSE
|
||||
// subscriptions or a directory-agnostic event path.
|
||||
if (!this.sseRunning || this.sseDirectory !== opts.worktreePath) {
|
||||
if (this.sseRunning && this.sseDirectory && this.sseDirectory !== opts.worktreePath) {
|
||||
this.log.warn(
|
||||
{ prev: this.sseDirectory, next: opts.worktreePath },
|
||||
'opencode-server: SSE directory changed — concurrent sessions will lose events from the previous directory',
|
||||
);
|
||||
}
|
||||
this.sseRunning = false;
|
||||
this.startEventLoop(opts.worktreePath);
|
||||
}
|
||||
|
||||
// Register / refresh the demux entry the SSE loop keys on. Preserve an existing
|
||||
// entry (and any in-flight turn) — just refresh the routing fields.
|
||||
const existing = this.byOpencodeId.get(ocSessionId);
|
||||
if (existing) {
|
||||
existing.boocodeSessionId = sessionId;
|
||||
existing.worktreePath = opts.worktreePath;
|
||||
} else {
|
||||
this.byOpencodeId.set(ocSessionId, {
|
||||
boocodeSessionId: sessionId,
|
||||
agentSessionId: ocSessionId,
|
||||
worktreePath: opts.worktreePath,
|
||||
streamedPartKeys: new Set(),
|
||||
partTypeById: new Map(),
|
||||
activeTurn: null,
|
||||
watchdog: null,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
agent: opts.agent,
|
||||
backend: 'opencode_server',
|
||||
agentSessionId: ocSessionId,
|
||||
serverPort: this.port,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── prompt: send one turn (1.6) ─────────────────────────────────────────────
|
||||
|
||||
async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
|
||||
if (!this.client) throw new Error('opencode-server: client not ready');
|
||||
const oc = handle.agentSessionId;
|
||||
if (!oc) throw new Error('opencode-server: handle has no agentSessionId');
|
||||
|
||||
let state = this.byOpencodeId.get(oc);
|
||||
if (!state) {
|
||||
state = {
|
||||
boocodeSessionId: handle.sessionId,
|
||||
agentSessionId: oc,
|
||||
worktreePath: ctx.worktreePath,
|
||||
streamedPartKeys: new Set(),
|
||||
partTypeById: new Map(),
|
||||
activeTurn: null,
|
||||
watchdog: null,
|
||||
};
|
||||
this.byOpencodeId.set(oc, state);
|
||||
}
|
||||
const session = state;
|
||||
// Authoritative per-turn directory for SDK routing + reconcile.
|
||||
session.worktreePath = ctx.worktreePath;
|
||||
const client = this.client;
|
||||
|
||||
return await new Promise<TurnResult>((resolve) => {
|
||||
let settled = false;
|
||||
const cleanup = () => {
|
||||
session.activeTurn = null;
|
||||
if (session.watchdog) {
|
||||
clearTimeout(session.watchdog);
|
||||
session.watchdog = null;
|
||||
}
|
||||
session.streamedPartKeys.clear();
|
||||
session.partTypeById.clear();
|
||||
ctx.signal.removeEventListener('abort', onAbort);
|
||||
};
|
||||
const settle = (r: TurnResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
cleanup();
|
||||
resolve(r);
|
||||
};
|
||||
const onAbort = () => {
|
||||
// Abort the turn only — never the server.
|
||||
client.session.abort({ sessionID: oc, directory: ctx.worktreePath }).catch(() => {});
|
||||
settle({ ok: false, error: 'aborted' });
|
||||
};
|
||||
|
||||
session.activeTurn = { onEvent: ctx.onEvent, settle };
|
||||
this.bumpActivity(session); // arm the inactivity backstop
|
||||
|
||||
if (ctx.signal.aborted) {
|
||||
onAbort();
|
||||
return;
|
||||
}
|
||||
ctx.signal.addEventListener('abort', onAbort, { once: true });
|
||||
|
||||
const model = parseModel(ctx.model);
|
||||
client.session
|
||||
.promptAsync({
|
||||
sessionID: oc,
|
||||
directory: ctx.worktreePath,
|
||||
parts: [{ type: 'text', text: input }],
|
||||
...(model ? { model } : {}),
|
||||
})
|
||||
.then((res) => {
|
||||
// promptAsync is fire-and-forget (204); the turn completes via session.idle.
|
||||
// Only a submission error settles here.
|
||||
if (res.error) settle({ ok: false, error: errToString(res.error) });
|
||||
})
|
||||
.catch((err) => settle({ ok: false, error: errMsg(err) }));
|
||||
});
|
||||
}
|
||||
|
||||
// ─── teardown ────────────────────────────────────────────────────────────────
|
||||
|
||||
async closeSession(handle: AgentSessionHandle): Promise<void> {
|
||||
if (handle.agentSessionId) this.byOpencodeId.delete(handle.agentSessionId);
|
||||
await this.sql`
|
||||
UPDATE agent_sessions SET status = 'closed'
|
||||
WHERE session_id = ${handle.sessionId} AND agent = ${handle.agent}
|
||||
`.catch(() => {});
|
||||
}
|
||||
|
||||
async dispose(): Promise<void> {
|
||||
this.up = false;
|
||||
const child = this.child;
|
||||
this.child = null;
|
||||
this.client = null;
|
||||
this.byOpencodeId.clear();
|
||||
if (child && !child.killed) {
|
||||
child.kill('SIGTERM');
|
||||
const t = setTimeout(() => {
|
||||
if (!child.killed) child.kill('SIGKILL');
|
||||
}, 5_000);
|
||||
t.unref();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/** BooCoder model string "provider/model" → opencode's structured {providerID, modelID}. */
|
||||
function parseModel(model: string | undefined): { providerID: string; modelID: string } | undefined {
|
||||
if (!model || !model.trim()) return undefined;
|
||||
const trimmed = model.trim();
|
||||
const idx = trimmed.indexOf('/');
|
||||
if (idx > 0 && idx < trimmed.length - 1) {
|
||||
return { providerID: trimmed.slice(0, idx), modelID: trimmed.slice(idx + 1) };
|
||||
}
|
||||
// No slash but non-empty → infer llama-swap (the only configured provider).
|
||||
// Guard against bare '/' or trailing/leading slash.
|
||||
if (idx < 0 && trimmed.length > 0) {
|
||||
return { providerID: 'llama-swap', modelID: trimmed };
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Ported verbatim from Paseo opencode-agent.ts: id → message-id fallback → null. */
|
||||
function resolvePartDedupeKey(part: { id: string; messageID: string }, type: string): string | null {
|
||||
if (part.id.trim().length > 0) return `${type}:${part.id}`;
|
||||
if (part.messageID.trim().length > 0) return `${type}:message:${part.messageID}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
/** opencode ToolPart → ACP-shaped snapshot (reuses the existing persist/render path). */
|
||||
function toolPartToSnapshot(part: ToolPart): AcpToolSnapshot {
|
||||
const state = part.state;
|
||||
let rawInput: unknown;
|
||||
let rawOutput: unknown;
|
||||
let title: string | undefined;
|
||||
if (state) {
|
||||
if ('input' in state) rawInput = (state as { input?: unknown }).input;
|
||||
if ('output' in state) rawOutput = (state as { output?: unknown }).output;
|
||||
else if ('error' in state) rawOutput = (state as { error?: unknown }).error;
|
||||
if ('title' in state) title = (state as { title?: string }).title;
|
||||
}
|
||||
return {
|
||||
toolCallId: part.callID,
|
||||
title: title ?? part.tool,
|
||||
kind: null,
|
||||
status: mapToolStatus(state?.status),
|
||||
rawInput,
|
||||
rawOutput,
|
||||
};
|
||||
}
|
||||
|
||||
function mapToolStatus(s: ToolState['status'] | undefined): ToolCallStatus | null {
|
||||
switch (s) {
|
||||
case 'pending':
|
||||
return 'pending';
|
||||
case 'running':
|
||||
return 'in_progress';
|
||||
case 'completed':
|
||||
return 'completed';
|
||||
case 'error':
|
||||
return 'failed';
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/** Bind-probe an ephemeral port on loopback. */
|
||||
function freePort(): Promise<number> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const srv = createServer();
|
||||
srv.unref();
|
||||
srv.on('error', reject);
|
||||
srv.listen(0, '127.0.0.1', () => {
|
||||
const addr = srv.address();
|
||||
if (addr && typeof addr === 'object') {
|
||||
const { port } = addr;
|
||||
srv.close(() => resolve(port));
|
||||
} else {
|
||||
srv.close(() => reject(new Error('opencode-server: could not determine a free port')));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/** Resolve when the child prints the ready line; reject on timeout or early exit. */
|
||||
function waitForReady(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let done = false;
|
||||
let stderrBuf = '';
|
||||
|
||||
const finish = (err?: Error) => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
child.stdout?.off('data', onOut);
|
||||
child.stderr?.off('data', onErr);
|
||||
child.off('exit', onExit);
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
};
|
||||
|
||||
const onOut = (buf: Buffer) => {
|
||||
if (buf.toString().includes('opencode server listening on')) finish();
|
||||
};
|
||||
const onErr = (buf: Buffer) => {
|
||||
stderrBuf += buf.toString();
|
||||
};
|
||||
const onExit = (code: number | null) =>
|
||||
finish(new Error(`opencode serve exited before ready (code ${code}); stderr: ${stderrBuf.slice(-2000)}`));
|
||||
const timer = setTimeout(
|
||||
() => finish(new Error(`opencode serve not ready in ${timeoutMs}ms; stderr: ${stderrBuf.slice(-2000)}`)),
|
||||
timeoutMs,
|
||||
);
|
||||
|
||||
child.stdout?.on('data', onOut);
|
||||
child.stderr?.on('data', onErr);
|
||||
child.on('exit', onExit);
|
||||
});
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((r) => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
function errMsg(e: unknown): string {
|
||||
return e instanceof Error ? e.message : String(e);
|
||||
}
|
||||
|
||||
function errToString(e: unknown): string {
|
||||
if (e == null) return 'unknown error';
|
||||
if (typeof e === 'string') return e;
|
||||
if (e instanceof Error) return e.message;
|
||||
try {
|
||||
return JSON.stringify(e);
|
||||
} catch {
|
||||
return String(e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Hash of stable config — detects model changes across sessions without
|
||||
* invalidating on ephemeral state like the random server port (which changes
|
||||
* every BooCoder restart). */
|
||||
function sessionConfigHash(model: string): string {
|
||||
return createHash('sha256').update(`opencode_server|${model}`).digest('hex').slice(0, 16);
|
||||
}
|
||||
Reference in New Issue
Block a user