import { useEffect, useRef, useState } from 'react';
import { toast } from 'sonner';
import type { Message, WsFrame } from '@/api/types';
import { WsFrameSchema } from '@boocode/contracts/ws-frames';
import { api } from '@/api/client';
import { sessionEvents } from './sessionEvents';
import { recordUsage } from './useChatThroughput';

// session_renamed frame removed from WsFrame — it was declared but never
// published on the per-session WS channel (server publishes via broker.publishUser
// since v1.4). chat_renamed remains; auto_name.ts publishes it on session WS.

/** Value returned by the session stream hook. */
interface State {
  /** Messages accumulated from snapshots and streamed frames. */
  messages: Message[];
  /** True while the WebSocket is open. */
  connected: boolean;
  /** Text of the most recent error frame, or null. */
  error: string | null;
}

/** Channels a `channel_delta` frame can be sequenced on. */
type Channel = 'text' | 'tool_call' | 'tool_result' | 'status' | 'error';

/**
 * Per-channel out-of-order frame buffer with contiguous-seq flush logic.
 * Stores incoming channel_delta frames and releases them only when seq
 * becomes contiguous with the expected next value.
 */
class ChannelBuffer {
  private expectedSeq = 0;
  private readonly buffer = new Map<number, ChannelDeltaWsFrame>();

  /**
   * Offers a frame to the buffer.
   *
   * @param frame - Incoming delta; only its `seq` is inspected here.
   * @returns The frames that became deliverable, in seq order. Empty when the
   *   frame is stale (seq below the expected value) or was parked because it
   *   is ahead of the expected value.
   */
  push(frame: ChannelDeltaWsFrame): ChannelDeltaWsFrame[] {
    // Already delivered (or superseded by a reset): drop silently.
    if (frame.seq < this.expectedSeq) {
      return [];
    }

    if (frame.seq === this.expectedSeq) {
      this.expectedSeq++;
      const flushed = [frame];
      // Drain every parked frame that is now contiguous.
      let next = this.buffer.get(this.expectedSeq);
      while (next !== undefined) {
        this.buffer.delete(this.expectedSeq);
        this.expectedSeq++;
        flushed.push(next);
        next = this.buffer.get(this.expectedSeq);
      }
      return flushed;
    }

    // Ahead of the expected seq: park it until the gap is filled.
    this.buffer.set(frame.seq, frame);
    return [];
  }

  /** Seq the buffer will deliver next. */
  get expectedNextSeq(): number {
    return this.expectedSeq;
  }

  /** Number of frames currently parked behind a gap. */
  get bufferedCount(): number {
    return this.buffer.size;
  }

  /** Discards all parked frames and restarts sequencing at `seq`. */
  reset(seq = 0) {
    this.expectedSeq = seq;
    this.buffer.clear();
  }
}

type ChannelDeltaWsFrame = WsFrame & { type: 'channel_delta' };

// Converts a flushed channel_delta into the equivalent legacy frame so the
// existing applyFrame reducer handles the per-message mutation. Status
// deltas are handled separately (they may need to create the message first
// and apply throughput metadata independently of terminal status).
function channelDeltaToLegacyFrame(delta: ChannelDeltaWsFrame): WsFrame | null {
  // NOTE(review): the `!` assertions assume the frame schema guarantees these
  // fields for the given channel — confirm against WsFrameSchema.
  switch (delta.channel) {
    case 'text':
      return { type: 'delta', message_id: delta.message_id!, content: delta.content! };
    case 'tool_call':
      return { type: 'tool_call', message_id: delta.message_id!, tool_call: delta.tool_call! };
    case 'tool_result':
      return {
        type: 'tool_result',
        tool_message_id: delta.tool_message_id!,
        chat_id: delta.chat_id,
        tool_call_id: delta.tool_call_id!,
        output: delta.output,
        truncated: delta.truncated!,
        ...(delta.error ? { error: delta.error } : {}),
      };
    case 'error':
      return {
        type: 'error',
        message_id: delta.message_id,
        chat_id: delta.chat_id,
        error: delta.error!,
        ...(delta.reason ? { reason: delta.reason as never } : {}),
      };
    case 'status':
      // Status deltas are applied by applyStatusDelta, not by the reducer.
      return null;
    default:
      // Unknown channel: nothing to translate.
      return null;
  }
}

/** Per-message metadata fields shared by status deltas and message_complete frames. */
const STATUS_META_KEYS = [
  'tokens_used',
  'ctx_used',
  'ctx_max',
  'cache_tokens',
  'reasoning_tokens',
  'started_at',
  'finished_at',
  'model',
  'metadata',
] as const;

type StatusMetaKey = (typeof STATUS_META_KEYS)[number];

/** Copies every metadata field of `source` that is not `undefined` (null is kept). */
function pickDefinedMeta(
  source: Partial<Record<StatusMetaKey, unknown>>,
): Record<string, unknown> {
  const picked: Record<string, unknown> = {};
  for (const key of STATUS_META_KEYS) {
    const value = source[key];
    if (value !== undefined) picked[key] = value;
  }
  return picked;
}

/** Builds a client-side placeholder message with every optional field nulled. */
function createEmptyMessage(
  id: string,
  chatId: string | null | undefined,
  role: Message['role'],
  status: Message['status'],
): Message {
  return {
    id,
    session_id: '',
    chat_id: chatId ?? '',
    role,
    content: '',
    kind: 'message',
    tool_calls: null,
    tool_results: null,
    status,
    last_seq: 0,
    tokens_used: null,
    ctx_used: null,
    ctx_max: null,
    cache_tokens: null,
    reasoning_tokens: null,
    model: null,
    started_at: null,
    finished_at: null,
    created_at: new Date().toISOString(),
    metadata: null,
  };
}

// Apply a flushed status channel_delta to state. Status deltas carry both
// intermediate throughput metadata (tokens_used, ctx_used, model, etc.)
// and optional terminal transitions (complete / cancelled / failed).
function applyStatusDelta(state: State, delta: ChannelDeltaWsFrame): State {
  const { message_id, chat_id, status, channel: _c, seq: _s, type: _t, ...meta } = delta;
  if (!message_id) return state;

  let next = state;

  // A 'running' status for an unknown message creates the assistant message.
  const exists = next.messages.some((m) => m.id === message_id);
  if (!exists && status === 'running') {
    next = applyFrame(next, {
      type: 'message_started',
      message_id,
      chat_id,
      role: 'assistant',
    });
  }

  const metaFields = pickDefinedMeta(meta);
  if (Object.keys(metaFields).length > 0) {
    next = {
      ...next,
      messages: next.messages.map((m) => (m.id === message_id ? { ...m, ...metaFields } : m)),
    };
  }

  if (status === 'complete' || status === 'cancelled' || status === 'failed') {
    // NOTE(review): the message_complete reducer always stores status
    // 'complete', so 'cancelled' / 'failed' are not reflected on the message
    // here — confirm this is intended.
    next = applyFrame(next, {
      type: 'message_complete',
      message_id,
      chat_id,
      status,
    });
  }

  return next;
}

/**
 * Pure reducer from (state, frame) to the next state.
 *
 * Two cases have side effects outside the returned state: `usage` forwards to
 * `recordUsage`, and `chat_renamed` emits a `chat_updated` session event.
 * Frame types that do not affect the message list return `state` unchanged.
 */
function applyFrame(state: State, frame: WsFrame): State {
  switch (frame.type) {
    case 'snapshot':
      return { ...state, messages: frame.messages };

    case 'message_started': {
      if (state.messages.some((m) => m.id === frame.message_id)) return state;
      // System messages arrive whole; everything else starts out streaming.
      const newMsg = createEmptyMessage(
        frame.message_id,
        frame.chat_id,
        frame.role,
        frame.role === 'system' ? 'complete' : 'streaming',
      );
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'delta': {
      const chunk = frame.content ?? '';
      const messages = state.messages.map((m) => {
        if (m.id !== frame.message_id) return m;
        // User messages are replaced (when the chunk is non-empty), not appended to.
        if (m.role === 'user') return { ...m, content: chunk || m.content };
        return { ...m, content: m.content + chunk };
      });
      return { ...state, messages };
    }

    case 'tool_call': {
      const messages = state.messages.map((m) =>
        m.id === frame.message_id
          ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
          : m,
      );
      return { ...state, messages };
    }

    case 'tool_result': {
      const toolResults = {
        tool_call_id: frame.tool_call_id,
        output: frame.output,
        truncated: frame.truncated,
        ...(frame.error ? { error: frame.error } : {}),
      };
      const exists = state.messages.some((m) => m.id === frame.tool_message_id);
      if (exists) {
        const messages = state.messages.map((m) =>
          m.id === frame.tool_message_id
            ? {
                ...m,
                role: 'tool' as const,
                tool_results: toolResults,
                status: 'complete' as const,
              }
            : m,
        );
        return { ...state, messages };
      }
      const newMsg: Message = {
        ...createEmptyMessage(frame.tool_message_id, frame.chat_id, 'tool', 'complete'),
        tool_results: toolResults,
      };
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'message_complete': {
      const metaFields = pickDefinedMeta(frame);
      const messages = state.messages.map((m) =>
        m.id === frame.message_id ? { ...m, status: 'complete' as const, ...metaFields } : m,
      );
      return { ...state, messages };
    }

    case 'usage': {
      if (frame.chat_id) {
        recordUsage(frame.chat_id, {
          completion_tokens: frame.completion_tokens,
          ctx_used: frame.ctx_used,
          ctx_max: frame.ctx_max,
        });
      }
      return state;
    }

    case 'messages_deleted': {
      const removeSet = new Set(frame.message_ids);
      return {
        ...state,
        messages: state.messages.filter((m) => !removeSet.has(m.id)),
      };
    }

    case 'chat_renamed': {
      sessionEvents.emit({
        type: 'chat_updated',
        chat_id: frame.chat_id,
        session_id: '',
        name: frame.name,
        updated_at: new Date().toISOString(),
      });
      return state;
    }

    case 'error': {
      // Structured error metadata is attached only when the frame has a reason.
      const errorMeta = frame.reason
        ? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error }
        : null;
      const messages = frame.message_id
        ? state.messages.map((m) =>
            m.id === frame.message_id
              ? {
                  ...m,
                  status: 'failed' as const,
                  ...(errorMeta ? { metadata: errorMeta } : {}),
                }
              : m,
          )
        : state.messages;
      return { ...state, messages, error: frame.error };
    }

    // Frames below do not change the message list in this reducer.
    case 'compacted':
    case 'agent_snapshot':
    case 'agent_status_updated':
    case 'flow_run_started':
    case 'flow_run_step_updated':
    case 'battle_started':
    case 'contestant_updated':
    case 'battle_updated':
    case 'channel_delta':
      return state;

    default:
      return state;
  }
}

const RECONNECT_INITIAL_MS = 1000;
const RECONNECT_MAX_MS = 30_000;
const CHANNEL_STALL_MS = 5000;

/**
 * Subscribes to the per-session WebSocket and exposes the live message list.
 *
 * Opens `/api/ws/sessions/:sessionId`, validates every frame against
 * `WsFrameSchema`, re-orders `channel_delta` frames per channel, and reconnects
 * with exponential backoff (1s doubling up to 30s). State is cleared whenever
 * `sessionId` changes.
 *
 * @param sessionId - Session to stream; `undefined` leaves the hook idle.
 * @returns The current messages, connection flag and last error text.
 */
export function useSessionStream(sessionId: string | undefined) {
  const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
  const wsRef = useRef<WebSocket | null>(null);
  const channelBuffersRef = useRef<Map<Channel, ChannelBuffer>>(new Map());
  // Per channel: when it last delivered frames, or when its current gap opened.
  const lastFrameTimeRef = useRef<Partial<Record<Channel, number>>>({});

  // Reset channel buffers when session changes
  useEffect(() => {
    channelBuffersRef.current = new Map();
    lastFrameTimeRef.current = {};
  }, [sessionId]);

  useEffect(() => {
    if (!sessionId) return;
    setState({ messages: [], connected: false, error: null });

    let unmounted = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let stallTimer: ReturnType<typeof setInterval> | null = null;
    let reconnectDelay = RECONNECT_INITIAL_MS;

    // NOTE(review): despite the wire name `lastSeqPerChannel`, this reports the
    // NEXT expected seq per channel — confirm the server expects that.
    const getLastSeqPerChannel = () => {
      const seqs: Partial<Record<Channel, number>> = {};
      for (const [ch, buf] of channelBuffersRef.current) {
        seqs[ch] = buf.expectedNextSeq;
      }
      return seqs;
    };

    const flushDeltaToState = (delta: ChannelDeltaWsFrame) => {
      if (delta.channel === 'status') {
        setState((s) => applyStatusDelta(s, delta));
        return;
      }
      const legacy = channelDeltaToLegacyFrame(delta);
      if (legacy) {
        setState((s) => applyFrame(s, legacy));
      }
    };

    const handleChannelDelta = (frame: ChannelDeltaWsFrame) => {
      const buffers = channelBuffersRef.current;
      let buffer = buffers.get(frame.channel);
      if (!buffer) {
        buffer = new ChannelBuffer();
        buffers.set(frame.channel, buffer);
      }

      const hadGap = buffer.bufferedCount > 0;
      const flushed = buffer.push(frame);
      if (flushed.length === 0) {
        // Start the stall clock when a gap first opens. Without this stamp the
        // stall check compares against a stale (or missing) timestamp and
        // fires on its next tick instead of after CHANNEL_STALL_MS.
        if (!hadGap && buffer.bufferedCount > 0) {
          lastFrameTimeRef.current[frame.channel] = Date.now();
        }
        return;
      }

      for (const delta of flushed) {
        flushDeltaToState(delta);
      }

      const reachedTerminalStatus = flushed.some(
        (delta) =>
          delta.channel === 'status' &&
          (delta.status === 'complete' ||
            delta.status === 'cancelled' ||
            delta.status === 'failed'),
      );
      if (reachedTerminalStatus) {
        sessionEvents.emit({ type: 'git_diff_refresh' });
      }

      lastFrameTimeRef.current[frame.channel] = Date.now();
    };

    // Periodic channel stall check: if any channel has buffered frames
    // but no progress for 5s, force a snapshot refetch.
    const startStallTimer = () => {
      if (stallTimer) clearInterval(stallTimer);
      stallTimer = setInterval(() => {
        const now = Date.now();
        let stalled = false;
        for (const [channel, buffer] of channelBuffersRef.current) {
          if (buffer.bufferedCount === 0) continue;
          const lastTime = lastFrameTimeRef.current[channel] ?? 0;
          if (now - lastTime >= CHANNEL_STALL_MS) {
            // NOTE(review): reset() returns the expected seq to 0; if the
            // server keeps counting from where it was, later frames will be
            // parked again until the next stall — confirm the resync protocol.
            buffer.reset();
            stalled = true;
          }
        }
        // One refetch per tick is enough, however many channels stalled.
        if (stalled) {
          sessionEvents.emit({ type: 'refetch_messages' });
        }
      }, 1000);
    };

    const connect = () => {
      if (unmounted) return;
      const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
      const url = `${proto}://${window.location.host}/api/ws/sessions/${sessionId}`;
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        if (unmounted) return;
        reconnectDelay = RECONNECT_INITIAL_MS;
        setState((s) => ({ ...s, connected: true, error: null }));
        // Mid-stream reconnection protocol: send last known seq per channel
        // so the server can replay deltas or fall back to a full snapshot.
        const lastSeq = getLastSeqPerChannel();
        ws.send(JSON.stringify({ type: 'reconnect', lastSeqPerChannel: lastSeq }));
        startStallTimer();
      };

      ws.onmessage = (ev) => {
        // A socket that is being torn down can still deliver frames; they must
        // not leak into the state of the session that replaced it.
        if (unmounted) return;

        let raw: unknown;
        try {
          // Non-string payloads become '' so JSON.parse throws and we bail out.
          raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
        } catch (err) {
          console.warn('bad ws frame (parse)', err);
          return;
        }

        const validated = WsFrameSchema.safeParse(raw);
        if (!validated.success) {
          console.error('ws-frame-validation-failed (session channel)', {
            frame_type: (raw as { type?: unknown })?.type,
            errors: validated.error.flatten(),
          });
          return;
        }

        try {
          const frame = validated.data as unknown as WsFrame;

          if (frame.type === 'channel_delta') {
            handleChannelDelta(frame);
            return;
          }

          if (frame.type === 'compacted') {
            toast.success('Context compacted to free space');
            void api.messages
              .list(frame.session_id)
              .then((messages) => {
                // Ignore the result if the session changed while it was in flight.
                if (unmounted) return;
                setState((s) => applyFrame(s, { type: 'snapshot', messages }));
              })
              .catch((err: unknown) => {
                console.warn('compacted refetch failed', err);
              });
            return;
          }

          setState((s) => applyFrame(s, frame));
          if (frame.type === 'message_complete') {
            sessionEvents.emit({ type: 'git_diff_refresh' });
          }
        } catch (err) {
          console.warn('bad ws frame', err);
        }
      };

      ws.onerror = () => {
        // Force a close so onclose drives the reconnect path.
        try {
          ws.close();
        } catch {
          // Best effort: the socket may already be closing.
        }
      };

      ws.onclose = () => {
        if (unmounted) return;
        setState((s) => ({ ...s, connected: false }));
        if (stallTimer) {
          clearInterval(stallTimer);
          stallTimer = null;
        }
        // Exponential backoff, capped at RECONNECT_MAX_MS.
        const delay = reconnectDelay;
        reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
        reconnectTimer = setTimeout(connect, delay);
      };
    };

    connect();

    return () => {
      unmounted = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      if (stallTimer) clearInterval(stallTimer);
      const ws = wsRef.current;
      wsRef.current = null;
      if (ws) {
        try {
          ws.close();
        } catch {
          // Best effort: the socket may already be closed.
        }
      }
    };
  }, [sessionId]);

  // Replace the message list with a fresh snapshot whenever a refetch is requested.
  useEffect(() => {
    if (!sessionId) return;
    let active = true;
    const unsubscribe = sessionEvents.subscribe((event) => {
      if (event.type !== 'refetch_messages') return;
      void api.messages
        .list(sessionId)
        .then((messages) => {
          // Drop results that resolve after the session changed or unmounted.
          if (!active) return;
          setState((s) => applyFrame(s, { type: 'snapshot', messages }));
        })
        .catch((err: unknown) => {
          console.warn('refetch_messages failed', err);
        });
    });
    return () => {
      active = false;
      unsubscribe();
    };
  }, [sessionId]);

  return state;
}