import { useEffect, useRef, useState, useCallback } from 'react';
import type { Message, WsFrame, PendingChange } from '@/api/types';

/** Internal state of the session stream: the message list plus connection status. */
interface State {
  messages: Message[];
  connected: boolean;
  error: string | null;
}

/**
 * Builds a locally-synthesised message for a frame that refers to an id
 * that is not in the list yet. Every field the frame does not supply is
 * null; `session_id` is left empty and `created_at` is the client clock at
 * the time the frame is applied.
 */
function makePlaceholderMessage(
  fields: Pick<Message, 'id' | 'chat_id' | 'role' | 'status' | 'tool_results'>,
): Message {
  return {
    id: fields.id,
    session_id: '',
    chat_id: fields.chat_id,
    role: fields.role,
    content: '',
    kind: 'message',
    tool_calls: null,
    tool_results: fields.tool_results,
    status: fields.status,
    tokens_used: null,
    ctx_used: null,
    ctx_max: null,
    started_at: null,
    finished_at: null,
    created_at: new Date().toISOString(),
    metadata: null,
  };
}

/** Builds the `tool_results` payload of a `tool_result` frame; `error` is only included when truthy. */
function toolResultsFromFrame(frame: Extract<WsFrame, { type: 'tool_result' }>) {
  return {
    tool_call_id: frame.tool_call_id,
    output: frame.output,
    truncated: frame.truncated,
    ...(frame.error ? { error: frame.error } : {}),
  };
}

/**
 * Folds a single WebSocket frame into the stream state.
 *
 * The input state is never mutated: each branch returns either the same
 * `state` object (no change) or a new object with a new `messages` array.
 *
 * @param state - Current stream state.
 * @param frame - Frame received from the server.
 * @returns The state after applying `frame`.
 */
function applyFrame(state: State, frame: WsFrame): State {
  switch (frame.type) {
    case 'snapshot': {
      // A snapshot replaces the whole message list.
      return { ...state, messages: frame.messages };
    }

    case 'message_started': {
      // Ignore a start frame for a message that is already in the list.
      const exists = state.messages.some((m) => m.id === frame.message_id);
      if (exists) return state;
      const newMsg = makePlaceholderMessage({
        id: frame.message_id,
        chat_id: frame.chat_id,
        role: frame.role,
        // System messages start out complete; every other role starts streaming.
        status: frame.role === 'system' ? 'complete' : 'streaming',
        tool_results: null,
      });
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'delta': {
      // Append the new text chunk to the matching message's content.
      const next = state.messages.map((m) =>
        m.id === frame.message_id ? { ...m, content: m.content + frame.content } : m,
      );
      return { ...state, messages: next };
    }

    case 'tool_call': {
      // Append the tool call, starting a new array when none were recorded yet.
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
          : m,
      );
      return { ...state, messages: next };
    }

    case 'tool_result': {
      const toolResults = toolResultsFromFrame(frame);
      const exists = state.messages.some((m) => m.id === frame.tool_message_id);
      if (exists) {
        // Update the existing message in place: force the tool role, mark it complete.
        const next = state.messages.map((m) =>
          m.id === frame.tool_message_id
            ? {
                ...m,
                role: 'tool' as const,
                tool_results: toolResults,
                status: 'complete' as const,
              }
            : m,
        );
        return { ...state, messages: next };
      }
      // No message with that id yet: append an already-complete tool message.
      const newMsg = makePlaceholderMessage({
        id: frame.tool_message_id,
        chat_id: frame.chat_id,
        role: 'tool',
        status: 'complete',
        tool_results: toolResults,
      });
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'message_complete': {
      // Mark the message complete and copy over only the fields the frame
      // actually carries, so an absent field does not overwrite an existing value.
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? {
              ...m,
              status: 'complete' as const,
              ...(frame.tokens_used !== undefined ? { tokens_used: frame.tokens_used } : {}),
              ...(frame.ctx_used !== undefined ? { ctx_used: frame.ctx_used } : {}),
              ...(frame.ctx_max !== undefined ? { ctx_max: frame.ctx_max } : {}),
              ...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}),
              ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}),
              ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}),
            }
          : m,
      );
      return { ...state, messages: next };
    }

    case 'error': {
      // When the error names a message, mark that message failed; the error
      // text is recorded on the state either way.
      const next = frame.message_id
        ? state.messages.map((m) =>
            m.id === frame.message_id ? { ...m, status: 'failed' as const } : m,
          )
        : state.messages;
      return { ...state, messages: next, error: frame.error };
    }

    case 'pending_change_added':
    case 'pending_change_updated':
      // These are handled by the pending changes listener, not the message state
      return state;

    default:
      // A frame type this client does not know must leave the state untouched;
      // without this branch the function would return undefined and that
      // value would be installed as the state.
      return state;
  }
}

/** Delay before the first reconnect attempt, in milliseconds. */
const RECONNECT_INITIAL_MS = 1000;
/** Upper bound for the doubling reconnect delay, in milliseconds. */
const RECONNECT_MAX_MS = 30_000;

/** Value returned by {@link useSessionStream}. */
interface SessionStreamResult {
  messages: Message[];
  connected: boolean;
  error: string | null;
  /** True while at least one message has status `'streaming'`. */
  isStreaming: boolean;
  /** Listeners for pending change frames */
  onPendingChange: (cb: (change: PendingChange) => void) => () => void;
}

/**
 * Subscribes to the WebSocket stream of a session and exposes its messages.
 *
 * While `sessionId` is defined, a socket is opened to
 * `/api/ws/sessions/<sessionId>` on the current host (`wss` when the page is
 * served over https, `ws` otherwise). Whenever the socket closes before the
 * effect is cleaned up, a reconnect is scheduled; the delay starts at
 * `RECONNECT_INITIAL_MS`, doubles after every close up to `RECONNECT_MAX_MS`,
 * and is reset once a connection opens.
 *
 * @param sessionId - Session to stream; no connection is made while undefined.
 * @returns Messages, connection status, last error, a streaming flag and a
 *   function to subscribe to pending-change frames.
 */
export function useSessionStream(sessionId: string | undefined): SessionStreamResult {
  const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
  const wsRef = useRef<WebSocket | null>(null);
  const pendingListenersRef = useRef<Set<(change: PendingChange) => void>>(new Set());

  useEffect(() => {
    if (!sessionId) return;

    // Start from a clean slate for the new session.
    setState({ messages: [], connected: false, error: null });

    let unmounted = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let reconnectDelay = RECONNECT_INITIAL_MS;

    const connect = () => {
      if (unmounted) return;
      const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
      const url = `${proto}://${window.location.host}/api/ws/sessions/${sessionId}`;
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        // A successful connection resets the backoff and clears any stale error.
        reconnectDelay = RECONNECT_INITIAL_MS;
        setState((s) => ({ ...s, connected: true, error: null }));
      };

      ws.onmessage = (ev) => {
        // Only text frames are understood; anything else is dropped.
        if (typeof ev.data !== 'string') return;
        let parsed: unknown;
        try {
          parsed = JSON.parse(ev.data);
        } catch {
          // Malformed JSON is ignored rather than tearing down the stream.
          return;
        }
        // Valid JSON that is not an object (e.g. `null`) has no `type` to
        // dispatch on; reading it would throw inside the event handler.
        if (typeof parsed !== 'object' || parsed === null) return;
        const frame = parsed as WsFrame;

        // Notify pending change listeners
        if (frame.type === 'pending_change_added' || frame.type === 'pending_change_updated') {
          for (const cb of pendingListenersRef.current) {
            cb(frame.change);
          }
        }
        setState((s) => applyFrame(s, frame));
      };

      ws.onerror = () => {
        // Close on error; reconnect scheduling lives in the close handler.
        try {
          ws.close();
        } catch {}
      };

      ws.onclose = () => {
        if (unmounted) return;
        setState((s) => ({ ...s, connected: false }));
        // Use the current delay for this attempt, then double it (capped) for the next.
        const delay = reconnectDelay;
        reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
        reconnectTimer = setTimeout(connect, delay);
      };
    };

    connect();

    return () => {
      // Setting the flag first makes the close handler below a no-op, so the
      // deliberate close does not schedule a reconnect.
      unmounted = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      const ws = wsRef.current;
      wsRef.current = null;
      if (ws) {
        try {
          ws.close();
        } catch {}
      }
    };
  }, [sessionId]);

  const isStreaming = state.messages.some((m) => m.status === 'streaming');

  // Stable identity: the listener set lives in a ref, so no dependencies are needed.
  const onPendingChange = useCallback((cb: (change: PendingChange) => void) => {
    pendingListenersRef.current.add(cb);
    return () => {
      pendingListenersRef.current.delete(cb);
    };
  }, []);

  return {
    messages: state.messages,
    connected: state.connected,
    error: state.error,
    isStreaming,
    onPendingChange,
  };
}