import { useEffect, useRef, useState } from 'react';
import { toast } from 'sonner';
import type { Message, WsFrame } from '@/api/types';
import { WsFrameSchema } from '@/api/ws-frames';
import { api } from '@/api/client';
import { sessionEvents } from './sessionEvents';
import { recordUsage } from './useChatThroughput';

// session_renamed frame removed from WsFrame — it was declared but never
// published on the per-session WS channel (server publishes via broker.publishUser
// since v1.4). chat_renamed remains; auto_name.ts publishes it on session WS.

interface State {
  messages: Message[];
  connected: boolean;
  error: string | null;
}

type ToolResultFrame = Extract<WsFrame, { type: 'tool_result' }>;

/**
 * Builds the `tool_results` payload carried by a `tool_result` frame.
 * `error` is only present on the result when the frame supplied one.
 */
function toolResultsOf(frame: ToolResultFrame): Message['tool_results'] {
  return {
    tool_call_id: frame.tool_call_id,
    output: frame.output,
    truncated: frame.truncated,
    ...(frame.error ? { error: frame.error } : {}),
  };
}

/**
 * Builds a client-side row for a message first seen over the socket.
 * Everything not supplied by the caller is empty/zeroed; `session_id` is left
 * '' (it is not populated from these frames) and `created_at` is the local
 * clock at insertion time.
 */
function placeholderMessage(
  fields: Pick<Message, 'id' | 'chat_id' | 'role' | 'status' | 'tool_results'>
): Message {
  return {
    id: fields.id,
    session_id: '',
    chat_id: fields.chat_id,
    role: fields.role,
    content: '',
    kind: 'message',
    tool_calls: null,
    tool_results: fields.tool_results,
    status: fields.status,
    last_seq: 0,
    tokens_used: null,
    ctx_used: null,
    ctx_max: null,
    started_at: null,
    finished_at: null,
    created_at: new Date().toISOString(),
    metadata: null,
  };
}

/**
 * Pure reducer: folds one session-channel frame into the hook state.
 *
 * This runs inside `setState` updater functions, which React executes during
 * render and may invoke more than once (Strict Mode always double-invokes them
 * in development). It must therefore have no side effects: frames whose job is
 * a side effect (`usage`, `chat_renamed`) are handled by `runFrameSideEffects`
 * in `ws.onmessage`, and `compacted` is handled there as well. All three are
 * state no-ops here.
 *
 * @param state - current hook state (never mutated)
 * @param frame - a validated session-channel frame
 * @returns the next state; the same object when the frame changes nothing
 */
function applyFrame(state: State, frame: WsFrame): State {
  switch (frame.type) {
    case 'snapshot':
      return { ...state, messages: frame.messages };

    case 'message_started': {
      // Duplicate start frames (e.g. after a reconnect) must not add a second row.
      if (state.messages.some((m) => m.id === frame.message_id)) return state;
      const newMsg = placeholderMessage({
        id: frame.message_id,
        chat_id: frame.chat_id ?? '',
        role: frame.role,
        tool_results: null,
        // v1.8.2: cap-hit sentinels arrive role='system' and are static, so
        // skipping the streaming dot for them keeps the UI accurate.
        status: frame.role === 'system' ? 'complete' : 'streaming',
      });
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'delta': {
      const chunk = frame.content ?? '';
      const next = state.messages.map((m) => {
        if (m.id !== frame.message_id) return m;
        // For user rows a non-empty chunk replaces the content instead of
        // appending to it; other roles accumulate streamed chunks.
        if (m.role === 'user') return { ...m, content: chunk || m.content };
        return { ...m, content: m.content + chunk };
      });
      return { ...state, messages: next };
    }

    case 'tool_call': {
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
          : m
      );
      return { ...state, messages: next };
    }

    case 'tool_result': {
      const toolResults = toolResultsOf(frame);
      if (state.messages.some((m) => m.id === frame.tool_message_id)) {
        const next = state.messages.map((m) =>
          m.id === frame.tool_message_id
            ? {
                ...m,
                role: 'tool' as const,
                tool_results: toolResults,
                status: 'complete' as const,
              }
            : m
        );
        return { ...state, messages: next };
      }
      const newMsg = placeholderMessage({
        id: frame.tool_message_id,
        chat_id: frame.chat_id ?? '',
        role: 'tool',
        tool_results: toolResults,
        status: 'complete',
      });
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'message_complete': {
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? {
              ...m,
              status: 'complete' as const,
              ...(frame.tokens_used !== undefined ? { tokens_used: frame.tokens_used } : {}),
              ...(frame.ctx_used !== undefined ? { ctx_used: frame.ctx_used } : {}),
              ...(frame.ctx_max !== undefined ? { ctx_max: frame.ctx_max } : {}),
              ...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}),
              ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}),
              // v1.8.2: cap-hit sentinels (and future stamped metadata) ride
              // in on this terminal frame so the reducer can attach it
              // without waiting for a refetch.
              ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}),
            }
          : m
      );
      return { ...state, messages: next };
    }

    case 'usage':
      // v1.12.2: live throughput is recorded by runFrameSideEffects; usage
      // frames never touch message state.
      return state;

    case 'messages_deleted': {
      const removeSet = new Set(frame.message_ids);
      return {
        ...state,
        messages: state.messages.filter((m) => !removeSet.has(m.id)),
      };
    }

    case 'chat_renamed':
      // The rename is broadcast by runFrameSideEffects; no message-state change.
      return state;

    case 'error': {
      // v1.8.2: when the frame carries a structured reason, stamp it onto the
      // failed message's metadata so the bubble can render specifics inline
      // (the WS error frame is one-shot; refresh-safe rendering needs the
      // value persisted on the message).
      const errorMeta = frame.reason
        ? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error }
        : null;
      const next = frame.message_id
        ? state.messages.map((m) =>
            m.id === frame.message_id
              ? {
                  ...m,
                  status: 'failed' as const,
                  ...(errorMeta ? { metadata: errorMeta } : {}),
                }
              : m
          )
        : state.messages;
      return { ...state, messages: next, error: frame.error };
    }

    case 'compacted':
      // v1.11: side effects (refetch + toast) live in ws.onmessage; the
      // reducer just no-ops.
      return state;

    default: {
      // Compile-time exhaustiveness guard. At runtime the Zod schema and the
      // hand-maintained WsFrame type can drift, so an unrecognised frame keeps
      // the current state instead of replacing it with undefined.
      const unhandled: never = frame;
      void unhandled;
      return state;
    }
  }
}

/**
 * Performs the non-state side effects a frame triggers. Called exactly once
 * per inbound frame from `ws.onmessage`, outside any React state updater, so
 * repeated updater invocations cannot double-record or double-emit.
 */
function runFrameSideEffects(frame: WsFrame): void {
  switch (frame.type) {
    case 'usage':
      // v1.12.2: live throughput. Side-effects into the module-level
      // singleton consumed by ChatThroughput.
      // chat_id is the optional ws-frame field; usage frames always include it.
      if (frame.chat_id) {
        recordUsage(frame.chat_id, {
          completion_tokens: frame.completion_tokens,
          ctx_used: frame.ctx_used,
          ctx_max: frame.ctx_max,
        });
      }
      return;
    case 'chat_renamed':
      sessionEvents.emit({
        type: 'chat_updated',
        chat_id: frame.chat_id,
        session_id: '',
        name: frame.name,
        updated_at: new Date().toISOString(),
      });
      return;
    default:
      return;
  }
}

// Matches useUserEvents — exponential backoff with the same ceiling so the
// two channels reconnect on the same cadence after a network handoff.
const RECONNECT_INITIAL_MS = 1000;
const RECONNECT_MAX_MS = 30_000;

/**
 * Subscribes to the per-session WebSocket channel and exposes the live message
 * list plus connection status.
 *
 * State is reset whenever `sessionId` changes. The socket reconnects with
 * exponential backoff (RECONNECT_INITIAL_MS doubling up to RECONNECT_MAX_MS)
 * and the delay resets on a successful open. Asynchronous refetches started
 * for one session are discarded if they resolve after the hook has moved to a
 * different session or unmounted.
 *
 * @param sessionId - session to stream; when undefined nothing connects
 * @returns `{ messages, connected, error }`
 */
export function useSessionStream(sessionId: string | undefined) {
  const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    if (!sessionId) return;
    setState({ messages: [], connected: false, error: null });

    // Scoped to this effect run: set on session change or unmount so late
    // socket callbacks, pending reconnects and in-flight fetches from this run
    // are ignored.
    let disposed = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let reconnectDelay = RECONNECT_INITIAL_MS;

    const connect = () => {
      if (disposed) return;
      const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
      const url = `${proto}://${window.location.host}/api/ws/sessions/${sessionId}`;
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        reconnectDelay = RECONNECT_INITIAL_MS;
        setState((s) => ({ ...s, connected: true, error: null }));
      };

      ws.onmessage = (ev) => {
        if (disposed) return;
        // v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
        // frames are logged and dropped. WsFrameSchema is the runtime guard;
        // the hand-maintained WsFrame type stays as the narrowed dev-time
        // shape (Zod uses OpaqueObject for nested types like Message[]). One
        // cast bridges the two.
        let raw: unknown;
        try {
          raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
        } catch (err) {
          console.warn('bad ws frame (parse)', err);
          return;
        }
        const validated = WsFrameSchema.safeParse(raw);
        if (!validated.success) {
          console.error('ws-frame-validation-failed (session channel)', {
            frame_type: (raw as { type?: unknown })?.type,
            errors: validated.error.flatten(),
          });
          return;
        }
        try {
          const frame = validated.data as unknown as WsFrame;
          // v1.11: on a compaction completion, re-fetch the message list so
          // the new summary row + the cohort of compacted_at-stamped older
          // rows render correctly. We dispatch the fresh list as a synthetic
          // 'snapshot' frame so the reducer's existing path handles state
          // replacement (no need for a parallel "refetched" path).
          // The toast is purely UX feedback; missing it would still leave
          // the chat in a valid state.
          if (frame.type === 'compacted') {
            toast.success('Context compacted to free space');
            void api.messages
              .list(frame.session_id)
              .then((messages) => {
                // The fetch may resolve after a session switch; applying it
                // then would overwrite the new session's messages.
                if (disposed) return;
                setState((s) => applyFrame(s, { type: 'snapshot', messages }));
              })
              .catch((err: unknown) => {
                console.warn('compacted refetch failed', err);
              });
            return;
          }
          // Side effects run here, once per frame — never inside the updater,
          // which React may invoke more than once.
          runFrameSideEffects(frame);
          setState((s) => applyFrame(s, frame));
        } catch (err) {
          console.warn('bad ws frame', err);
        }
      };

      // v1.8.1: WS errors no longer surface as user-facing toasts here. The
      // user-channel hook (useUserEvents) owns the debounced "reconnecting…"
      // UI; this channel just reconnects silently on the same backoff.
      ws.onerror = () => {
        try {
          ws.close();
        } catch {}
      };

      ws.onclose = () => {
        if (disposed) return;
        setState((s) => ({ ...s, connected: false }));
        const delay = reconnectDelay;
        reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
        reconnectTimer = setTimeout(connect, delay);
      };
    };

    connect();

    return () => {
      disposed = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      const ws = wsRef.current;
      wsRef.current = null;
      if (ws) {
        try {
          ws.close();
        } catch {}
      }
    };
  }, [sessionId]);

  useEffect(() => {
    if (!sessionId) return;
    // Set on cleanup so a refetch for the previous session that resolves late
    // cannot replace the current session's messages.
    let cancelled = false;
    const unsubscribe = sessionEvents.subscribe((event) => {
      if (event.type === 'refetch_messages') {
        void api.messages
          .list(sessionId)
          .then((messages) => {
            if (cancelled) return;
            setState((s) => applyFrame(s, { type: 'snapshot', messages }));
          })
          .catch((err: unknown) => {
            console.warn('refetch_messages failed', err);
          });
      }
    });
    return () => {
      cancelled = true;
      unsubscribe();
    };
  }, [sessionId]);

  return state;
}