// Gate test: pins the -as-text fallback in the stream-phase text-delta // path. This test will fail if extractToolCallBlocks is ever removed from the // text-delta branch of streamCompletion, which is the only guard for models // that emit tool calls as inline XML rather than structured tool_calls. import { describe, expect, it, vi, afterEach } from 'vitest'; import type { FastifyBaseLogger } from 'fastify'; // vi.mock is hoisted before all module imports. Spread the original so all // other ai exports (tool, jsonSchema, types, …) remain real; only streamText // is replaced with a controllable spy. vi.mock('ai', async (importOriginal) => { const actual = await importOriginal(); return { ...actual, streamText: vi.fn() }; }); import { streamText } from 'ai'; import { streamCompletion, STALL_TIMEOUT_MS } from '../inference/stream-phase-adapter.js'; import type { StreamAdapterContext } from '../inference/stream-phase-adapter.js'; const INVOKE_BLOCK = '/tmp/test.ts'; // One-shot async generator that yields a single text-delta carrying a complete // block, simulating a model that emits its tool call as plain XML text. async function* makeInvokeTextDeltaStream() { yield { type: 'text-delta' as const, text: INVOKE_BLOCK }; } const fakeLog = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), fatal: vi.fn(), trace: vi.fn(), child: vi.fn(), } as unknown as FastifyBaseLogger; const fakeCtx: StreamAdapterContext = { config: { LLAMA_SWAP_URL: 'http://localhost:11434' } as StreamAdapterContext['config'], log: fakeLog, }; describe('-as-text fallback gate (stream-phase text-delta path)', () => { it('surfaces a plain-text block as a toolCall and strips markup from content and deltas', async () => { vi.mocked(streamText).mockReturnValue({ fullStream: makeInvokeTextDeltaStream(), usage: Promise.resolve({ inputTokens: 1, outputTokens: 1 }), } as unknown as ReturnType); const deltas: string[] = []; const result = await streamCompletion( fakeCtx, 'test-model', [{ role: 'user', content: 'call a tool' }], { tools: null }, (d) => deltas.push(d), undefined, ); // The block must surface as a structured tool call expect(result.toolCalls).toHaveLength(1); expect(result.toolCalls[0]).toMatchObject({ id: 'xml_call_0', name: 'view_file', args: { path: '/tmp/test.ts' }, }); // The XML markup must not appear in the saved content or any flushed delta expect(result.content).not.toContain(''); expect(deltas.join('')).not.toContain(' { afterEach(() => { vi.useRealTimers(); }); it(`aborts the stream after ${STALL_TIMEOUT_MS}ms with no chunks (stall path)`, async () => { vi.useFakeTimers(); // Capture the effectiveSignal the adapter passes to streamText so the fake // generator can unblock when the stall fires (matching real ReadableStream // abort behavior: the stream ends rather than throwing into the generator). let capturedSignal: AbortSignal | undefined; vi.mocked(streamText).mockImplementation((opts: Parameters[0]) => { capturedSignal = opts.abortSignal as AbortSignal | undefined; return { // Hang until the effective signal fires, then return without emitting // any parts — mirrors how a real fetch stream ends when aborted. fullStream: (async function* () { await new Promise((resolve) => { if (capturedSignal?.aborted) { resolve(); return; } capturedSignal?.addEventListener('abort', () => resolve(), { once: true }); }); })(), // Never resolves; the stall throw happens before usage is awaited. usage: new Promise(() => {}), } as unknown as ReturnType; }); const streamPromise = streamCompletion( fakeCtx, 'test-model', [{ role: 'user', content: 'hang' }], { tools: null }, () => {}, undefined, ); // Attach the rejection handler BEFORE advancing timers so the rejection is // never unhandled (Node emits PromiseRejectionHandledWarning otherwise). const assertion = expect(streamPromise).rejects.toMatchObject({ name: 'AbortError' }); // Advance past the stall deadline — the stallAc fires, the hanging generator // resolves, the post-loop check sees stallAc.signal.aborted and throws. await vi.advanceTimersByTimeAsync(STALL_TIMEOUT_MS); await assertion; }); // T10: regression pin — the original post-loop signal check for user-initiated // abort must still fire correctly after the stall logic was added. it('throws AbortError when the inbound signal is aborted (user-abort regression pin)', async () => { const ac = new AbortController(); ac.abort(); vi.mocked(streamText).mockReturnValue({ fullStream: (async function* () { // Yield nothing — stream ends immediately after user abort is already set })(), usage: Promise.resolve({ inputTokens: 0, outputTokens: 0 }), } as unknown as ReturnType); await expect( streamCompletion( fakeCtx, 'test-model', [{ role: 'user', content: 'aborted' }], { tools: null }, () => {}, undefined, ac.signal, ), ).rejects.toMatchObject({ name: 'AbortError' }); }); });