/**
 * AgentEvent → WS-frame emitter + turn accumulators.
 *
 * Extracted (v2.7 audit reshape) from `AcpStreamContext.handleSessionUpdate` in
 * `acp-dispatch.ts` — the `AgentEvent → broker.publishFrame` switch that maps a
 * backend's normalized events onto the wire frames the UI consumes, while
 * accumulating the turn's text / reasoning / tool snapshots for persistence.
 *
 * The same shape backs the dispatcher's 4 inline `onEvent` copies (DEFERRED while
 * dispatcher.ts has uncommitted edits), hence the optional `dcp` stripper + the
 * `finalize()` flush: the opencode dispatch path strips dcp tags from text deltas,
 * the ACP path does not (passes no `dcp`, so text is emitted verbatim — identical
 * to the prior AcpStreamContext behavior).
 *
 * Publishing is gated on a complete stream target (all of
 * broker/sessionId/chatId/assistantId present and truthy) exactly as the
 * original — a one-shot dispatch with no broker accumulates but never publishes.
 */
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import type { AgentEvent } from './agent-backend.js';
import { type AcpToolSnapshot, snapshotToWireToolCall, mapToolLifecycleStatus } from './acp-tool-snapshot.js';
import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js';
import type { DcpStreamStripper } from './dcp-strip.js';
import { emitHook } from '../plugins/host.js';

/** Construction options for {@link makeFrameEmitter}. Every field is optional. */
export interface FrameEmitterOpts {
  /** Frame sink. Without it (or without any of the three ids below) nothing is published. */
  broker?: Broker;
  /** Session the frames are published to (first argument of `publishFrame`). */
  sessionId?: string;
  /** Chat id — the frames' `chat_id`. */
  chatId?: string;
  /** The assistant message id — the frames' `message_id`. */
  assistantId?: string;
  /** Per-turn task id, for the agent_commands frame + command cache. */
  taskId?: string;
  /**
   * Optional cross-chunk dcp stripper for text deltas (opencode path). When
   * provided, text is stripped before push/publish and `finalize()` flushes the
   * held-back tail. The ACP path passes none → text emitted verbatim.
   */
  dcp?: DcpStreamStripper;
}

/** Handle returned by {@link makeFrameEmitter}: event sink plus live views of the turn's accumulators. */
export interface FrameEmitter {
  /** Map one AgentEvent to its WS frame(s) + accumulate it. */
  onEvent: (e: AgentEvent) => void;
  /** Flush a dcp stripper's held-back tail at turn end (no-op without `dcp`). */
  finalize: () => void;
  /**
   * The accumulator for tool snapshots (toolCallId → snapshot). Each
   * `tool_call` / `tool_update` event overwrites the entry for its id with the
   * snapshot carried by that event. This is the live map, not a copy.
   */
  readonly toolSnapshots: Map<string, AcpToolSnapshot>;
  /** Accumulated assistant text (post-dcp-strip when a stripper is set). */
  readonly output: string;
  /** Accumulated reasoning text. */
  readonly reasoningText: string;
  /** Tool snapshots in insertion order (a fresh array on every read). */
  readonly snapshots: AcpToolSnapshot[];
}

/**
 * Build a per-turn emitter that accumulates text, reasoning and tool snapshots
 * from `AgentEvent`s and, when a full stream target is available, publishes
 * the matching WS frames through the broker.
 *
 * @param opts - Broker/ids used for publishing, the optional task id used for
 *   the command cache, and the optional dcp stripper for text deltas.
 * @returns The event sink, the `finalize()` flush, and read-only accumulator views.
 */
export function makeFrameEmitter(opts: FrameEmitterOpts): FrameEmitter {
  const { broker, sessionId, chatId, assistantId, taskId, dcp } = opts;

  const textChunks: string[] = [];
  const reasoningChunks: string[] = [];
  const toolSnapshots = new Map<string, AcpToolSnapshot>();

  // Resolve the publish target once. The inputs are destructured constants, so
  // this is equivalent to re-checking them at every publish site, and it lets
  // the compiler narrow the four values instead of relying on `!` assertions.
  // Truthiness (not just definedness) is the gate: an empty-string id disables
  // publishing.
  const stream =
    broker && sessionId && chatId && assistantId
      ? { broker, sessionId, chatId, assistantId }
      : undefined;

  /** Accumulate a text chunk and, when streaming, publish it as a `delta` frame. */
  const publishText = (content: string): void => {
    textChunks.push(content);
    if (!stream) return;
    stream.broker.publishFrame(stream.sessionId, {
      type: 'delta',
      message_id: stream.assistantId,
      chat_id: stream.chatId,
      content,
    } as WsFrame);
  };

  /**
   * Publish a `tool_call` frame for a snapshot. The wire conversion runs only
   * when a stream target exists, so non-streaming turns never invoke it.
   */
  const publishToolCall = (snapshot: AcpToolSnapshot): void => {
    if (!stream) return;
    stream.broker.publishFrame(stream.sessionId, {
      type: 'tool_call',
      message_id: stream.assistantId,
      chat_id: stream.chatId,
      tool_call: snapshotToWireToolCall(snapshot),
    } as WsFrame);
  };

  const onEvent = (e: AgentEvent): void => {
    // Event types without a case below are ignored (no accumulate, no publish).
    switch (e.type) {
      case 'text': {
        // With a stripper, only what `push` hands back is emitted; a falsy
        // result (e.g. an empty string) means there is nothing to emit for this chunk.
        const safe = dcp ? dcp.push(e.text) : e.text;
        if (safe) publishText(safe);
        break;
      }

      case 'reasoning':
        reasoningChunks.push(e.text);
        if (stream) {
          stream.broker.publishFrame(stream.sessionId, {
            type: 'reasoning_delta',
            message_id: stream.assistantId,
            chat_id: stream.chatId,
            content: e.text,
          } as WsFrame);
        }
        break;

      case 'tool_call':
        toolSnapshots.set(e.toolCall.toolCallId, e.toolCall);
        publishToolCall(e.toolCall);
        break;

      case 'tool_update': {
        toolSnapshots.set(e.toolCall.toolCallId, e.toolCall);

        // Notify plugins once the tool reaches a terminal lifecycle state.
        // This runs regardless of whether frames can be published.
        const lifecycle = mapToolLifecycleStatus(e.toolCall.status, e.toolCall.rawOutput);
        if (lifecycle === 'completed' || lifecycle === 'failed') {
          // The hook's result is deliberately discarded (not awaited); no
          // duration is measured here, so it is reported as undefined.
          void emitHook('tool.execute.after', {
            toolName: e.toolCall.title,
            args: e.toolCall.rawInput,
            result: e.toolCall.rawOutput,
            duration: undefined,
          });
        }

        publishToolCall(e.toolCall);
        break;
      }

      case 'commands':
        // Commands are cached per task; without a task id or with an empty
        // list the event is dropped entirely.
        if (taskId && e.commands.length > 0) {
          mergeTaskCommands(taskId, e.commands);
          if (stream) {
            // Publish the cached set for the task, falling back to this
            // event's own commands if the cache lookup returns null/undefined.
            const all = getTaskCommands(taskId) ?? e.commands;
            stream.broker.publishFrame(stream.sessionId, {
              type: 'agent_commands',
              task_id: taskId,
              session_id: stream.sessionId,
              commands: all,
            } as WsFrame);
          }
        }
        break;
    }
  };

  const finalize = (): void => {
    if (!dcp) return;
    const tail = dcp.flush();
    if (tail) publishText(tail);
  };

  return {
    onEvent,
    finalize,
    toolSnapshots,
    get output() {
      return textChunks.join('');
    },
    get reasoningText() {
      return reasoningChunks.join('');
    },
    get snapshots() {
      return [...toolSnapshots.values()];
    },
  };
}