import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { readFileSync } from 'node:fs';
import { resolve } from 'node:path';
import postgres from 'postgres';
import type { WsFrame } from '@boocode/contracts/ws-frames';
import { classifyTerminalStatus, finalizeStreamingMessage } from '../finalize-message.js';

/**
 * F1 (D-7 / OCE-001 / OCE-002) — finalizing a Stop'd or errored external turn.
 *
 * `classifyTerminalStatus` is the pure D-7 decision (user Stop / AbortError →
 * cancelled, genuine error → failed). `finalizeStreamingMessage` writes that
 * terminal state onto the streaming assistant row and publishes the matching
 * message_complete frame — idempotently, guarded by `WHERE status='streaming'`,
 * so a double-Stop or an abort-then-catch settles the message exactly once and
 * never clobbers a row that already finished cleanly.
 */

// Pure classification: no database needed, so this suite always runs.
describe('classifyTerminalStatus (pure, D-7)', () => {
  it('maps a fired abort signal to cancelled (user Stop)', () => {
    expect(classifyTerminalStatus({ aborted: true })).toBe('cancelled');
  });

  it('maps a thrown AbortError to cancelled', () => {
    const e = new Error('the operation was aborted');
    e.name = 'AbortError';
    expect(classifyTerminalStatus({ aborted: false, error: e })).toBe('cancelled');
  });

  it('maps a genuine thrown error to failed', () => {
    expect(classifyTerminalStatus({ aborted: false, error: new Error('boom') })).toBe('failed');
  });

  it('defaults a no-abort / no-error catch to failed', () => {
    expect(classifyTerminalStatus({ aborted: false })).toBe('failed');
  });
});

// DB-backed suite: only runs when DATABASE_URL is set, skipped otherwise.
describe.runIf(!!process.env.DATABASE_URL)('finalizeStreamingMessage (DB)', () => {
  // Shared connection and fixture ids, populated once in beforeAll.
  let sql: ReturnType<typeof postgres>;
  let projectId: string;
  let sessionId: string;
  let chatId: string;

  // Applies both schema files, then creates the project → session → chat
  // fixture chain that every test's message rows hang off.
  beforeAll(async () => {
    // The non-null assertion is safe here: runIf above gates on DATABASE_URL.
    sql = postgres(process.env.DATABASE_URL!, { max: 3 });

    // Server schema owns messages/sessions/chats (FK targets); coder schema after.
    const serverSchema = resolve(__dirname, '../../../../server/src/schema.sql');
    const coderSchema = resolve(__dirname, '../../schema.sql');
    await sql.unsafe(readFileSync(serverSchema, 'utf8'));
    await sql.unsafe(readFileSync(coderSchema, 'utf8'));

    const [p] = await sql<{ id: string }[]>`
      INSERT INTO projects (name, path, status)
      VALUES ('f1-finalize', '/tmp/f1-finalize', 'open')
      RETURNING id
    `;
    projectId = p!.id;

    const [s] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${projectId}, 'f1', 'm', 'open')
      RETURNING id
    `;
    sessionId = s!.id;

    const [c] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'tab', 'open')
      RETURNING id
    `;
    chatId = c!.id;
  });

  // Best-effort teardown in child-to-parent order. Each delete swallows its own
  // error so a partially-built fixture still reaches sql.end().
  afterAll(async () => {
    if (!sql) return;
    await sql`DELETE FROM messages WHERE session_id = ${sessionId}`.catch(() => {});
    await sql`DELETE FROM chats WHERE id = ${chatId}`.catch(() => {});
    await sql`DELETE FROM sessions WHERE id = ${sessionId}`.catch(() => {});
    await sql`DELETE FROM projects WHERE id = ${projectId}`.catch(() => {});
    await sql.end({ timeout: 5 });
  });

  /**
   * Inserts an empty assistant message in `streaming` status under the shared
   * session/chat fixture.
   *
   * @returns the id of the newly inserted message row.
   */
  async function insertStreaming(): Promise<string> {
    const [m] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming')
      RETURNING id
    `;
    return m!.id;
  }

  it('finalizes a streaming row to cancelled, persists partial content, publishes one frame', async () => {
    const id = await insertStreaming();
    const frames: WsFrame[] = [];

    const did = await finalizeStreamingMessage(sql, (_s, f) => frames.push(f), {
      sessionId,
      chatId,
      assistantId: id,
      status: 'cancelled',
      model: 'qwen',
      content: 'partial answer',
    });
    expect(did).toBe(true);

    const [row] = await sql<{ status: string; content: string; finished_at: Date | null }[]>`
      SELECT status, content, finished_at FROM messages WHERE id = ${id}
    `;
    expect(row!.status).toBe('cancelled');
    expect(row!.content).toBe('partial answer');
    expect(row!.finished_at).not.toBeNull();

    expect(frames).toHaveLength(1);
    expect(frames[0]!.type).toBe('message_complete');
    expect((frames[0] as { status?: string }).status).toBe('cancelled');
  });

  it('is idempotent for a double-Stop: second call updates nothing and re-publishes nothing', async () => {
    const id = await insertStreaming();
    const frames: WsFrame[] = [];
    const push = (_s: string, f: WsFrame): void => {
      frames.push(f);
    };

    // First call settles the row; the identical second call must be a no-op.
    expect(
      await finalizeStreamingMessage(sql, push, { sessionId, chatId, assistantId: id, status: 'cancelled', model: null }),
    ).toBe(true);
    expect(
      await finalizeStreamingMessage(sql, push, { sessionId, chatId, assistantId: id, status: 'cancelled', model: null }),
    ).toBe(false);

    expect(frames).toHaveLength(1);
    const [row] = await sql<{ status: string }[]>`SELECT status FROM messages WHERE id = ${id}`;
    expect(row!.status).toBe('cancelled');
  });

  it('never clobbers a row that already finished cleanly (abort raced a clean finish)', async () => {
    // Insert directly as 'complete' to simulate the clean finish winning the race.
    const [m] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status)
      VALUES (${sessionId}, ${chatId}, 'assistant', 'done', 'complete')
      RETURNING id
    `;
    const id = m!.id;
    const frames: WsFrame[] = [];

    const did = await finalizeStreamingMessage(sql, (_s, f) => frames.push(f), {
      sessionId,
      chatId,
      assistantId: id,
      status: 'cancelled',
      model: null,
    });
    expect(did).toBe(false);
    expect(frames).toHaveLength(0);

    const [row] = await sql<{ status: string; content: string }[]>`
      SELECT status, content FROM messages WHERE id = ${id}
    `;
    expect(row!.status).toBe('complete');
    expect(row!.content).toBe('done');
  });

  it('no-ops on an empty assistantId (throw happened before the row was created)', async () => {
    const frames: WsFrame[] = [];

    const did = await finalizeStreamingMessage(sql, (_s, f) => frames.push(f), {
      sessionId,
      chatId,
      assistantId: '',
      status: 'failed',
      model: null,
    });
    expect(did).toBe(false);
    expect(frames).toHaveLength(0);
  });
});