worktree-risk.ts now returns the package's WorktreeRiskReport (local RiskReport interface removed); frame-emitter.ts imports WsFrame from @boocode/contracts/ws-frames (the deleted @boocode/server/ws-frames subpath). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
143 lines
4.8 KiB
TypeScript
143 lines
4.8 KiB
TypeScript
/**
 * AgentEvent → WS-frame emitter + turn accumulators.
 *
 * Extracted (v2.7 audit reshape) from `AcpStreamContext.handleSessionUpdate` in
 * `acp-dispatch.ts` — the `AgentEvent → broker.publishFrame` switch that maps a
 * backend's normalized events onto the wire frames the UI consumes, while
 * accumulating the turn's text / reasoning / tool snapshots for persistence.
 *
 * The same shape backs the dispatcher's 4 inline `onEvent` copies (DEFERRED while
 * dispatcher.ts has uncommitted edits), hence the optional `dcp` stripper + the
 * `finalize()` flush: the opencode dispatch path strips dcp tags from text deltas,
 * the ACP path does not (passes no `dcp`, so text is emitted verbatim — identical
 * to the prior AcpStreamContext behavior).
 *
 * Publishing is gated on `canStream()` (all of broker/sessionId/chatId/assistantId
 * present) exactly as the original — a one-shot dispatch with no broker accumulates
 * but never publishes.
 */
|
|
import type { Broker } from '@boocode/server/broker';
|
|
import type { WsFrame } from '@boocode/contracts/ws-frames';
|
|
import type { AgentEvent } from './agent-backend.js';
|
|
import { type AcpToolSnapshot, snapshotToWireToolCall } from './acp-tool-snapshot.js';
|
|
import { mergeTaskCommands, getTaskCommands } from './agent-commands-cache.js';
|
|
import type { DcpStreamStripper } from './dcp-strip.js';
|
|
|
|
/**
 * Construction options for {@link makeFrameEmitter}.
 *
 * Every field is optional. Frames are only published when `broker`,
 * `sessionId`, `chatId` and `assistantId` are all supplied; otherwise the
 * emitter just accumulates.
 */
export interface FrameEmitterOpts {
  /** Frame sink; the emitter calls `publishFrame(sessionId, frame)` on it. */
  broker?: Broker;
  /** Session the frames are published to (also the `agent_commands` frame's `session_id`). */
  sessionId?: string;
  /** Value written to each message frame's `chat_id`. */
  chatId?: string;
  /** The assistant message id — written to each message frame's `message_id`. */
  assistantId?: string;
  /** Per-turn task id; keys the command cache and fills the `agent_commands` frame's `task_id`. */
  taskId?: string;
  /**
   * Optional cross-chunk dcp stripper applied to text deltas (opencode path).
   * When set, each text delta goes through `dcp.push()` before it is
   * accumulated/published, and `finalize()` emits whatever `dcp.flush()`
   * returns. The ACP path passes none, so text is emitted verbatim.
   */
  dcp?: DcpStreamStripper;
}
|
|
|
|
/**
 * Handle returned by {@link makeFrameEmitter}: an event sink plus read-only
 * views over what the turn has accumulated so far.
 */
export interface FrameEmitter {
  /** Accumulate one AgentEvent and publish its WS frame(s) when streaming is possible. */
  onEvent: (e: AgentEvent) => void;
  /** Turn-end hook: emit the dcp stripper's held-back tail. Does nothing without `dcp`. */
  finalize: () => void;
  /** Live map of tool snapshots keyed by `toolCallId`; a later event for the same id replaces the entry. */
  readonly toolSnapshots: Map<string, AcpToolSnapshot>;
  /** All assistant text accumulated so far (post-dcp-strip when a stripper is set). */
  readonly output: string;
  /** All reasoning text accumulated so far. */
  readonly reasoningText: string;
  /** The current tool snapshots as an array, in the map's insertion order. */
  readonly snapshots: AcpToolSnapshot[];
}
|
|
|
|
/**
 * Create a per-turn emitter that turns normalized `AgentEvent`s into WS frames
 * and accumulates the turn's text, reasoning and tool snapshots.
 *
 * Accumulation always happens. Publishing only happens when `broker`,
 * `sessionId`, `chatId` and `assistantId` are all truthy.
 *
 * @param opts Streaming target ids, optional task id and optional dcp stripper.
 * @returns The event sink, the `finalize()` flush hook and live accumulator views.
 */
export function makeFrameEmitter(opts: FrameEmitterOpts): FrameEmitter {
  const { broker, sessionId, chatId, assistantId, taskId, dcp } = opts;
  const textChunks: string[] = [];
  const reasoningChunks: string[] = [];
  const toolSnapshots = new Map<string, AcpToolSnapshot>();

  // The `canStream()` gate the file header refers to, resolved once: the
  // destructured options never change, and capturing them in a narrowed object
  // lets every publish site drop its non-null assertions.
  const stream =
    broker && sessionId && chatId && assistantId
      ? { broker, sessionId, chatId, assistantId }
      : undefined;

  /** Accumulate a chunk of assistant text and, when streaming, publish it as a `delta`. */
  const publishText = (content: string): void => {
    textChunks.push(content);
    if (!stream) return;
    stream.broker.publishFrame(stream.sessionId, {
      type: 'delta',
      message_id: stream.assistantId,
      chat_id: stream.chatId,
      content,
    } as WsFrame);
  };

  const onEvent = (e: AgentEvent): void => {
    switch (e.type) {
      case 'text': {
        // With a stripper, only the part it releases is emitted; an empty
        // result (everything held back, or an empty delta) emits nothing.
        const safe = dcp ? dcp.push(e.text) : e.text;
        if (safe) publishText(safe);
        break;
      }
      case 'reasoning': {
        reasoningChunks.push(e.text);
        if (stream) {
          stream.broker.publishFrame(stream.sessionId, {
            type: 'reasoning_delta',
            message_id: stream.assistantId,
            chat_id: stream.chatId,
            content: e.text,
          } as WsFrame);
        }
        break;
      }
      case 'tool_call':
      case 'tool_update': {
        // Latest snapshot wins; both event kinds go out as a `tool_call` frame.
        toolSnapshots.set(e.toolCall.toolCallId, e.toolCall);
        if (stream) {
          stream.broker.publishFrame(stream.sessionId, {
            type: 'tool_call',
            message_id: stream.assistantId,
            chat_id: stream.chatId,
            tool_call: snapshotToWireToolCall(e.toolCall),
          } as WsFrame);
        }
        break;
      }
      case 'commands': {
        // Without a task id or any commands there is nothing to cache or announce.
        if (!taskId || e.commands.length === 0) break;
        mergeTaskCommands(taskId, e.commands);
        if (stream) {
          // Publish the merged cache contents, falling back to this event's
          // commands when the cache lookup returns null/undefined.
          const all = getTaskCommands(taskId) ?? e.commands;
          stream.broker.publishFrame(stream.sessionId, {
            type: 'agent_commands',
            task_id: taskId,
            session_id: stream.sessionId,
            commands: all,
          } as WsFrame);
        }
        break;
      }
      // Any other event type is intentionally ignored.
    }
  };

  const finalize = (): void => {
    if (!dcp) return;
    const tail = dcp.flush();
    if (tail) publishText(tail);
  };

  return {
    onEvent,
    finalize,
    toolSnapshots,
    get output() {
      return textChunks.join('');
    },
    get reasoningText() {
      return reasoningChunks.join('');
    },
    get snapshots() {
      return [...toolSnapshots.values()];
    },
  };
}
|