fix(coder): strip dcp-message-id tags split across stream chunks
The dcp tag (<dcp-message-id>mNNNN</dcp-message-id>) is streamed token-by-token, so it arrives split across SSE deltas. The existing per-chunk stripDcpTags never sees a complete tag in any single fragment, so fragments pass through and the dispatcher reassembles the tag in textChunks (persisted + shown) — and the terminal message.part.updated path that would strip the full text is suppressed by the dedup gate. Add a stateful cross-chunk stripper (dcp-strip.ts: makeDcpStreamStripper) at the dispatcher's opencode frame boundary: it emits text that cannot be part of a forming tag, holds back only a trailing partial-tag prefix (without swallowing legitimate <…> content), and flushes at turn end. Fixes both live delta frames and persisted content. 11 unit tests incl. split-at-every-boundary and the documented per-chunk-fails case. opencode path only; ACP (goose/qwen/claude) untouched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
73
apps/coder/src/services/__tests__/dcp-strip.test.ts
Normal file
73
apps/coder/src/services/__tests__/dcp-strip.test.ts
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { stripDcpTags, makeDcpStreamStripper } from '../dcp-strip.js';
|
||||||
|
|
||||||
|
/**
 * Test helper: streams `chunks` through a brand-new stripper and returns the
 * concatenation of every `push` result followed by the final `flush` — the
 * same text the dispatcher would end up persisting as the message content.
 */
function run(chunks: string[]): string {
  const stripper = makeDcpStreamStripper();
  const streamed = chunks.map((piece) => stripper.push(piece)).join('');
  return streamed + stripper.flush();
}
|
||||||
|
|
||||||
|
// One-shot stripping: only COMPLETE tags present in a single string are removed.
describe('stripDcpTags (one-shot)', () => {
  it('removes a complete tag', () => {
    const withTag = 'Yes — "Test".\n\n<dcp-message-id>m0019</dcp-message-id>';
    const stripped = stripDcpTags(withTag);
    expect(stripped).toBe('Yes — "Test".\n\n');
  });

  it('leaves text without a tag untouched', () => {
    const stripped = stripDcpTags('no tag here');
    expect(stripped).toBe('no tag here');
  });
});
|
||||||
|
|
||||||
|
// Regression documentation: stripping each fragment independently cannot work,
// because no single fragment ever contains the whole tag.
describe('per-chunk strip is INSUFFICIENT (documents the bug)', () => {
  it('a tag split across chunks survives a naive per-chunk .replace()', () => {
    const fragments = ['Yes.\n\n<dcp', '-message', '-id>m0019</dcp', '-message-id>'];
    let reassembled = '';
    for (const fragment of fragments) {
      reassembled += stripDcpTags(fragment);
    }
    // Joining the individually "stripped" fragments rebuilds the full tag.
    expect(reassembled).toContain('<dcp-message-id>m0019</dcp-message-id>');
  });
});
|
||||||
|
|
||||||
|
// Behaviour of the stateful stripper, exercised through the `run` helper
// (all pushes followed by a flush).
describe('makeDcpStreamStripper (cross-chunk fix)', () => {
  it('strips a tag split across chunks (the real opencode case)', () => {
    const deltas = ['Yes.\n\n<dcp', '-message', '-id>m0019</dcp', '-message-id>'];
    expect(run(deltas)).toBe('Yes.\n\n');
  });

  it('strips a tag split at EVERY character boundary', () => {
    const full = 'Answer.<dcp-message-id>m0019</dcp-message-id>';
    const oneCharPerChunk = Array.from(full);
    expect(run(oneCharPerChunk)).toBe('Answer.');
  });

  it('strips a tag delivered whole in one chunk', () => {
    const single = ['Answer.<dcp-message-id>m0019</dcp-message-id>'];
    expect(run(single)).toBe('Answer.');
  });

  it('passes through text with no tag', () => {
    const deltas = ['hello ', 'world'];
    expect(run(deltas)).toBe('hello world');
  });

  it('does NOT swallow legitimate < content (code/HTML/generics)', () => {
    const deltas = ['use ', '<div>', ' and ', 'Array<', 'string>'];
    expect(run(deltas)).toBe('use <div> and Array<string>');
  });

  it('handles a lone < that is not a dcp tag, split across chunks', () => {
    const deltas = ['a <', 'b c'];
    expect(run(deltas)).toBe('a <b c');
  });

  it('emits surrounding text and strips a mid-text tag', () => {
    const deltas = ['before ', '<dcp-message-id>', 'm1', '</dcp-message-id>', ' after'];
    expect(run(deltas)).toBe('before  after');
  });

  it('flushes a truncated/never-closed partial tag without leaking it as a complete tag', () => {
    // When the stream stops mid-tag, flush only removes complete tags; the
    // unfinished remnant comes back unchanged, so no closing marker can appear.
    const truncated = run(['done.<dcp-message-id>m00']);
    expect(truncated).not.toContain('</dcp-message-id>');
  });
});
|
||||||
77
apps/coder/src/services/dcp-strip.ts
Normal file
77
apps/coder/src/services/dcp-strip.ts
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
/**
|
||||||
|
* Strip opencode-dcp plugin tags (`<dcp-message-id>mNNNN</dcp-message-id>`) that
|
||||||
|
* the @tarquinen/opencode-dcp plugin appends to assistant text and which
|
||||||
|
* otherwise render as literal text in the UI.
|
||||||
|
*
|
||||||
|
* Why a streaming stripper and not a per-chunk `.replace()`: opencode streams
|
||||||
|
* assistant text token-by-token, so the tag arrives SPLIT across many SSE deltas
|
||||||
|
* (`<dcp`, `-message`, `-id>`, `m0019`, `</dcp`, …). A per-chunk regex never sees
|
||||||
|
* a complete tag in any single fragment, so the fragments pass through and the
|
||||||
|
* dispatcher reassembles the full tag in the persisted/displayed content. The
|
||||||
|
* stripper below buffers across chunks: it emits everything that cannot be part
|
||||||
|
* of a forming tag and holds back only a trailing partial-tag prefix until the
|
||||||
|
* next chunk resolves it — without holding back legitimate `<…>` content.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Matches one COMPLETE dcp tag; the content run may not contain `<`. */
const DCP_TAG_RE = /<dcp-message-id>[^<]*<\/dcp-message-id>/g;
/** Literal opening marker of a dcp tag. */
const OPEN = '<dcp-message-id>';
/** Literal closing marker of a dcp tag. */
const CLOSE = '</dcp-message-id>';

/**
 * Default upper bound on how many content characters may follow a complete
 * opening marker while the text is still held back as a "forming tag".
 *
 * Without a bound, prose that merely mentions a literal `<dcp-message-id>`
 * would be withheld until the next `<` or the end of the turn, stalling live
 * output and buffering an arbitrary amount of text. Real ids look like
 * `m0019`, so this cap is far above anything a genuine tag needs.
 */
const DEFAULT_MAX_HELD_CONTENT = 256;

/** One-shot strip of COMPLETE tags. Safe for non-streaming / final content. */
export function stripDcpTags(s: string): string {
  return s.replace(DCP_TAG_RE, '');
}

/**
 * Could `tail` (a substring starting at a `<`) still grow into a complete dcp
 * tag on a future chunk? If so the caller must hold it back rather than emit it.
 * Returns false for unrelated `<` content (`<div>`, `<T>`, …) so those stream
 * normally.
 *
 * @param tail       Buffer suffix beginning at the candidate `<`.
 * @param maxContent Maximum length of the content run after the opening marker
 *                   that is still treated as a forming tag. `Infinity` disables
 *                   the bound.
 * @returns true when `tail` must be held back, false when it can be emitted.
 */
function isPartialDcp(tail: string, maxContent: number = DEFAULT_MAX_HELD_CONTENT): boolean {
  // A prefix of the opening marker: '<', '<d', …, '<dcp-message-id>'.
  if (OPEN.startsWith(tail)) return true;
  // Anything else that does not begin with the full opening marker is unrelated.
  if (!tail.startsWith(OPEN)) return false;

  // Opening marker fully seen — content (and maybe a forming close) still streaming.
  const rest = tail.slice(OPEN.length);
  const lt = rest.indexOf('<');
  const contentLength = lt === -1 ? rest.length : lt;
  // Too much content for a plausible id: stop holding so the text streams on.
  if (contentLength > maxContent) return false;
  if (lt === -1) return true; // still inside the [^<]* content run
  return CLOSE.startsWith(rest.slice(lt)); // a partial close marker forming
}

export interface DcpStreamStripper {
  /** Feed one text chunk; returns the portion safe to emit now (may be ''). */
  push(chunk: string): string;
  /** Stream end: returns whatever was held back, with complete tags stripped. */
  flush(): string;
}

/**
 * Stateful, cross-chunk-safe dcp stripper. One instance per turn.
 *
 * @param maxHeldContent Optional cap on the number of content characters held
 *                       back after a complete opening marker while waiting for
 *                       the closing marker. Defaults to 256. A tag whose
 *                       content exceeds the cap and is split across chunks is
 *                       released as plain text instead of being withheld
 *                       indefinitely.
 * @returns An object whose `push` emits the text that can no longer be part of
 *          a forming tag and whose `flush` releases the held remainder.
 */
export function makeDcpStreamStripper(
  maxHeldContent: number = DEFAULT_MAX_HELD_CONTENT,
): DcpStreamStripper {
  // Text received but not yet emitted: either empty or starting at a `<` that
  // may still become a dcp tag.
  let buf = '';
  return {
    push(chunk: string): string {
      buf += chunk;
      buf = buf.replace(DCP_TAG_RE, ''); // drop any now-complete tags
      // Find the earliest `<` whose suffix is a forming dcp tag; hold from there,
      // emit everything before it (real text, including unrelated `<…>`).
      for (let i = buf.indexOf('<'); i !== -1; i = buf.indexOf('<', i + 1)) {
        if (isPartialDcp(buf.slice(i), maxHeldContent)) {
          const emit = buf.slice(0, i);
          buf = buf.slice(i);
          return emit;
        }
      }
      // Nothing can still form a tag: release the whole buffer.
      const emit = buf;
      buf = '';
      return emit;
    },
    flush(): string {
      // An incomplete remnant is returned as-is; only complete tags are removed.
      const out = stripDcpTags(buf);
      buf = '';
      return out;
    },
  };
}
|
||||||
@@ -4,6 +4,7 @@ import type { Broker } from '@boocode/server/broker';
|
|||||||
import type { WsFrame } from '@boocode/server/ws-frames';
|
import type { WsFrame } from '@boocode/server/ws-frames';
|
||||||
import type { Config } from '../config.js';
|
import type { Config } from '../config.js';
|
||||||
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
|
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
|
||||||
|
import { makeDcpStreamStripper } from './dcp-strip.js';
|
||||||
import { dispatchViaAcp } from './acp-dispatch.js';
|
import { dispatchViaAcp } from './acp-dispatch.js';
|
||||||
import { getResolvedRegistry } from './provider-config-registry.js';
|
import { getResolvedRegistry } from './provider-config-registry.js';
|
||||||
import { dispatchViaPty } from './pty-dispatch.js';
|
import { dispatchViaPty } from './pty-dispatch.js';
|
||||||
@@ -620,21 +621,30 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
|||||||
const textChunks: string[] = [];
|
const textChunks: string[] = [];
|
||||||
const reasoningChunks: string[] = [];
|
const reasoningChunks: string[] = [];
|
||||||
const toolSnaps = new Map<string, AcpToolSnapshot>();
|
const toolSnaps = new Map<string, AcpToolSnapshot>();
|
||||||
|
// opencode's dcp plugin appends <dcp-message-id>…</dcp-message-id> to the
|
||||||
|
// text, streamed split across deltas — a per-chunk regex misses it (see
|
||||||
|
// dcp-strip.ts). Buffer text through a cross-chunk stripper so neither the
|
||||||
|
// live `delta` frames nor the persisted content ever carry the tag.
|
||||||
|
const dcp = makeDcpStreamStripper();
|
||||||
|
|
||||||
// Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits.
|
// Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits.
|
||||||
// This boundary is where message_id/chat_id get attached (the backend never
|
// This boundary is where message_id/chat_id get attached (the backend never
|
||||||
// owns them).
|
// owns them).
|
||||||
const onEvent = (e: AgentEvent): void => {
|
const onEvent = (e: AgentEvent): void => {
|
||||||
switch (e.type) {
|
switch (e.type) {
|
||||||
case 'text':
|
case 'text': {
|
||||||
textChunks.push(e.text);
|
const safe = dcp.push(e.text);
|
||||||
|
if (safe) {
|
||||||
|
textChunks.push(safe);
|
||||||
broker.publishFrame(sessionId, {
|
broker.publishFrame(sessionId, {
|
||||||
type: 'delta',
|
type: 'delta',
|
||||||
message_id: assistantId,
|
message_id: assistantId,
|
||||||
chat_id: chatId,
|
chat_id: chatId,
|
||||||
content: e.text,
|
content: safe,
|
||||||
} as WsFrame);
|
} as WsFrame);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case 'reasoning':
|
case 'reasoning':
|
||||||
reasoningChunks.push(e.text);
|
reasoningChunks.push(e.text);
|
||||||
broker.publishFrame(sessionId, {
|
broker.publishFrame(sessionId, {
|
||||||
@@ -680,6 +690,18 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
|||||||
onEvent,
|
onEvent,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Flush any text held back mid-tag at stream end (complete tags stripped).
|
||||||
|
const dcpTail = dcp.flush();
|
||||||
|
if (dcpTail) {
|
||||||
|
textChunks.push(dcpTail);
|
||||||
|
broker.publishFrame(sessionId, {
|
||||||
|
type: 'delta',
|
||||||
|
message_id: assistantId,
|
||||||
|
chat_id: chatId,
|
||||||
|
content: dcpTail,
|
||||||
|
} as WsFrame);
|
||||||
|
}
|
||||||
|
|
||||||
const assistantContent = textChunks.join('').slice(0, 50_000);
|
const assistantContent = textChunks.join('').slice(0, 50_000);
|
||||||
const reasoningText = reasoningChunks.join('').slice(0, 200_000);
|
const reasoningText = reasoningChunks.join('').slice(0, 200_000);
|
||||||
const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'opencode turn failed').slice(0, 500);
|
const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'opencode turn failed').slice(0, 500);
|
||||||
|
|||||||
Reference in New Issue
Block a user