/**
 * Pure opencode `Event` → normalized `AgentEvent` translation.
 *
 * Extracted (v2.7 audit reshape) from `OpenCodeServerBackend.dispatchEvent` /
 * `handleUpdatedPart` and the file-local helpers. NO I/O, no timers, no DB, no
 * `byOpencodeId` — every function here is a deterministic transform over its
 * arguments (the dedup state is caller-owned and mutated in place, mirroring the
 * `acp-event-map.ts` `priorSnapshots` pattern). This is the unit-testable core; the
 * backend keeps the routing + side effects (watchdog, usage persistence, settle).
 *
 * Depends only on SDK TYPES + AcpToolSnapshot — safe to import anywhere.
 */
import type { Event, Part, ToolPart, ToolState } from '@opencode-ai/sdk/v2/client';
import type { ToolCallStatus } from '@agentclientprotocol/sdk';
import type { AcpToolSnapshot } from '../acp-tool-snapshot.js';
import type { AgentEvent } from '../agent-backend.js';

/** Per-(opencode session) dedup state the part-stream classifiers read + mutate. */
export interface DedupState {
  /** dedup gate: `${type}:${id}` added on delta, deleted-and-tested on updated. */
  streamedPartKeys: Set<string>;
  /** partID → 'text' | 'reasoning', so a delta with a non-'reasoning' field is still classed right. */
  partTypeById: Map<string, 'text' | 'reasoning'>;
}

/**
 * Strip opencode-dcp plugin tags that render as literal text in the UI.
 *
 * Removes every complete `<dcp-message-id>…</dcp-message-id>` pair (opening tag,
 * tag-free payload, closing tag). Text outside such a pair is left untouched.
 *
 * @param s Raw text that may contain dcp tags.
 * @returns `s` with all complete dcp-message-id elements removed.
 */
export function stripDcpTags(s: string): string {
  // The opening tag must be part of the match; otherwise it would survive as
  // literal text and unrelated text before a closing tag would be swallowed.
  return s.replace(/<dcp-message-id>[^<]*<\/dcp-message-id>/g, '');
}

/**
 * Extract the opencode sessionID an event belongs to, across event shapes.
 * Most carry `properties.sessionID`; `message.part.updated` nests it under
 * `properties.part.sessionID`. Returns null when the event has no session
 * (the per-session loop then leaves it to dispatchEvent, which drops it).
 *
 * @param ev Any opencode SSE event.
 * @returns The session id, or null when `properties` is missing / not an object
 *   or carries no session id in the expected place.
 */
export function eventSessionId(ev: Event): string | null {
  const props = (ev as { properties?: unknown }).properties;
  if (!props || typeof props !== 'object') return null;
  if (ev.type === 'message.part.updated') {
    const part = (props as { part?: { sessionID?: string } }).part;
    return part?.sessionID ?? null;
  }
  return (props as { sessionID?: string }).sessionID ?? null;
}

/**
 * Ported verbatim from Paseo opencode-agent.ts: id → message-id fallback → null.
 *
 * @param part Object carrying the part id and its owning message id.
 * @param type Part type used as the key prefix (e.g. 'text', 'reasoning').
 * @returns `${type}:${id}` when the id is non-blank, else
 *   `${type}:message:${messageID}` when the message id is non-blank, else null.
 */
export function resolvePartDedupeKey(
  part: { id: string; messageID: string },
  type: string,
): string | null {
  if (part.id.trim().length > 0) return `${type}:${part.id}`;
  if (part.messageID.trim().length > 0) return `${type}:message:${part.messageID}`;
  return null;
}

/**
 * Map an opencode tool-state status onto the ACP `ToolCallStatus` vocabulary.
 *
 * @param s opencode status, or undefined when the part has no state.
 * @returns 'pending' → 'pending', 'running' → 'in_progress',
 *   'completed' → 'completed', 'error' → 'failed'; null for anything else.
 */
export function mapToolStatus(s: ToolState['status'] | undefined): ToolCallStatus | null {
  switch (s) {
    case 'pending':
      return 'pending';
    case 'running':
      return 'in_progress';
    case 'completed':
      return 'completed';
    case 'error':
      return 'failed';
    default:
      return null;
  }
}

/**
 * opencode ToolPart → ACP-shaped snapshot (reuses the existing persist/render path).
 *
 * `rawOutput` prefers `state.output` and falls back to `state.error`; the title
 * prefers `state.title` and falls back to the tool name.
 */
export function toolPartToSnapshot(part: ToolPart): AcpToolSnapshot {
  const state = part.state;
  let rawInput: unknown;
  let rawOutput: unknown;
  let title: string | undefined;
  if (state) {
    // Not every state variant carries every field, hence the `in` probes.
    if ('input' in state) rawInput = (state as { input?: unknown }).input;
    if ('output' in state) rawOutput = (state as { output?: unknown }).output;
    else if ('error' in state) rawOutput = (state as { error?: unknown }).error;
    if ('title' in state) title = (state as { title?: string }).title;
  }
  return {
    toolCallId: part.callID,
    title: title ?? part.tool,
    kind: null,
    status: mapToolStatus(state?.status),
    rawInput,
    rawOutput,
  };
}

// ─── session.next.tool.* snapshot builders ───────────────────────────────────

/** `session.next.tool.called` → an in-progress tool_call snapshot. */
export function toolCalledSnapshot(p: {
  callID: string;
  tool: string;
  input: unknown;
}): AcpToolSnapshot {
  return {
    toolCallId: p.callID,
    title: p.tool,
    kind: null,
    status: 'in_progress',
    rawInput: p.input,
    rawOutput: undefined,
  };
}

/**
 * `session.next.tool.success` → a completed tool snapshot (text content joined).
 *
 * Content items that are not objects with a `text` property contribute an empty
 * string; a missing or null `content` yields an empty output.
 */
export function toolSuccessSnapshot(p: {
  callID: string;
  content?: ReadonlyArray<unknown> | null;
}): AcpToolSnapshot {
  const output =
    p.content
      ?.map((c) => (c && typeof c === 'object' && 'text' in c ? (c as { text: string }).text : ''))
      .join('') ?? '';
  return {
    toolCallId: p.callID,
    title: p.callID,
    kind: null,
    status: 'completed',
    rawInput: undefined,
    rawOutput: output,
  };
}

/** `session.next.tool.failed` → a failed tool snapshot (error stringified). */
export function toolFailedSnapshot(p: { callID: string; error: unknown }): AcpToolSnapshot {
  return {
    toolCallId: p.callID,
    title: p.callID,
    kind: null,
    status: 'failed',
    rawInput: undefined,
    rawOutput: errToString(p.error),
  };
}

// ─── message.part.* dedup gate ────────────────────────────────────────────────

/**
 * `message.part.delta`: mark the part as streamed (so a later `message.part.updated`
 * for the same part is deduped) and return the AgentEvent to emit, or null when the
 * field is neither reasoning nor text, or a text delta strips down to empty. Mutates
 * `st.streamedPartKeys` exactly as the original inline arm did (the key is recorded
 * for text even when the cleaned delta is empty).
 */
export function classifyPartDelta(
  p: { partID: string; field?: string; delta: string },
  st: DedupState,
): AgentEvent | null {
  // A part already known to be reasoning stays reasoning whatever `field` says.
  const isReasoning = p.field === 'reasoning' || st.partTypeById.get(p.partID) === 'reasoning';
  if (isReasoning) {
    st.streamedPartKeys.add(`reasoning:${p.partID}`);
    return { type: 'reasoning', text: p.delta };
  }
  if (p.field === 'text') {
    // Record the key before cleaning: the part counts as streamed even if this
    // particular delta consisted only of dcp tags.
    st.streamedPartKeys.add(`text:${p.partID}`);
    const cleaned = stripDcpTags(p.delta);
    return cleaned ? { type: 'text', text: cleaned } : null;
  }
  return null;
}

/**
 * `message.part.updated` terminal part: the dedup gate for text/reasoning (drop a
 * part already streamed via deltas; otherwise emit the finished text) plus the
 * tool-part → tool_call/tool_update mapping. Returns null when nothing should be
 * emitted. Mutates `st.partTypeById` / `st.streamedPartKeys` like the original.
 */
export function classifyUpdatedPart(part: Part, st: DedupState): AgentEvent | null {
  if (part.type === 'text' || part.type === 'reasoning') {
    st.partTypeById.set(part.id, part.type);
    const key = resolvePartDedupeKey(part, part.type);
    if (key && st.streamedPartKeys.delete(key)) return null; // already streamed via delta
    const raw = part.text ?? '';
    const text = part.type === 'text' ? stripDcpTags(raw) : raw;
    // Only a finished part (end timestamp present) with non-empty text is emitted.
    if (text && part.time?.end != null) {
      return { type: part.type, text };
    }
    return null;
  }
  if (part.type === 'tool') {
    const snap = toolPartToSnapshot(part);
    const status = part.state?.status;
    // tool_call on start (pending/running), tool_update on terminal (completed/error).
    // The current ACP path merges both into one frame; the contract keeps them
    // distinct because opencode's SSE distinguishes start from result.
    return status === 'completed' || status === 'error'
      ? { type: 'tool_update', toolCall: snap }
      : { type: 'tool_call', toolCall: snap };
  }
  // NOTE: opencode's SSE payload union carries no available-commands event, so the
  // AgentEvent 'commands' arm is intentionally never emitted here.
  return null;
}

// ─── shared error formatters (pure) ───────────────────────────────────────────

/**
 * Short message for a caught value.
 *
 * @returns `e.message` for `Error` instances, otherwise `String(e)`.
 */
export function errMsg(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

/**
 * Best-effort stringification of an arbitrary error payload.
 *
 * @returns 'unknown error' for null/undefined, the value itself for strings,
 *   `message` for `Error` instances, otherwise its JSON form — falling back to
 *   `String(e)` when JSON serialization throws or produces no string.
 */
export function errToString(e: unknown): string {
  if (e == null) return 'unknown error';
  if (typeof e === 'string') return e;
  if (e instanceof Error) return e.message;
  try {
    // JSON.stringify yields undefined (not a string) for functions and symbols.
    const json: string | undefined = JSON.stringify(e);
    return json ?? String(e);
  } catch {
    // Circular structures and BigInt values make JSON.stringify throw.
    return String(e);
  }
}