Files
boocode/apps/web/src/hooks/useSessionStream.ts
indifferentketchup 93d3f86c2b v2.2-paseo-providers: Paseo provider stack + v2.2.1 pane-scoped chat fixes
Ship Paseo-equivalent provider snapshot, AgentComposerBar, ACP dispatch
rewrite with streaming/persist, permission prompts, and agent commands.
Follow-up: pane-scoped chat resolution, CoderMessageList tool timeline,
WS user-delta replace, and inference orphan tool_call stripping.
Archive openspec v2-2; update CHANGELOG and CURRENT.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-26 15:18:31 +00:00

299 lines
11 KiB
TypeScript

import { useEffect, useRef, useState } from 'react';
import { toast } from 'sonner';
import type { Message, WsFrame } from '@/api/types';
import { WsFrameSchema } from '@/api/ws-frames';
import { api } from '@/api/client';
import { sessionEvents } from './sessionEvents';
import { recordUsage } from './useChatThroughput';
// session_renamed frame removed from WsFrame — it was declared but never
// published on the per-session WS channel (server publishes via broker.publishUser
// since v1.4). chat_renamed remains; auto_name.ts publishes it on session WS.
/** Client-side view of one session's stream; this is the value useSessionStream returns. */
interface State {
// Messages currently known for the session. Replaced wholesale by a
// 'snapshot' frame and patched incrementally by the other frame types.
messages: Message[];
// True after the socket's open event, false again once it closes.
connected: boolean;
// Text of the most recent 'error' frame; cleared when a socket (re)opens.
error: string | null;
}
/**
 * Reducer for the session stream: folds one inbound WS frame into the
 * current state and returns the next state.
 *
 * Message-shaped frames produce a new `messages` array; frames that do not
 * touch message state return `state` unchanged. Two cases have side effects
 * outside the returned state: 'usage' forwards to `recordUsage` and
 * 'chat_renamed' emits on `sessionEvents`.
 *
 * @param state Current stream state.
 * @param frame Frame to apply.
 * @returns The next state. Never `undefined`, even for a frame type this
 *   reducer does not know about.
 */
function applyFrame(state: State, frame: WsFrame): State {
  switch (frame.type) {
    case 'snapshot': {
      return { ...state, messages: frame.messages };
    }
    case 'message_started': {
      // A message already present under this id is left untouched.
      const exists = state.messages.some((m) => m.id === frame.message_id);
      if (exists) return state;
      const newMsg: Message = {
        id: frame.message_id,
        session_id: '',
        chat_id: frame.chat_id ?? '',
        role: frame.role,
        content: '',
        kind: 'message',
        tool_calls: null,
        tool_results: null,
        // v1.8.2: cap-hit sentinels arrive role='system' and are static, so
        // skipping the streaming dot for them keeps the UI accurate.
        status: frame.role === 'system' ? 'complete' : 'streaming',
        last_seq: 0,
        tokens_used: null,
        ctx_used: null,
        ctx_max: null,
        started_at: null,
        finished_at: null,
        created_at: new Date().toISOString(),
        metadata: null,
      };
      return { ...state, messages: [...state.messages, newMsg] };
    }
    case 'delta': {
      const next = state.messages.map((m) => {
        if (m.id !== frame.message_id) return m;
        const chunk = frame.content ?? '';
        if (m.role === 'user') {
          // User rows: a non-empty delta replaces the content; an empty one
          // keeps what is already there.
          return { ...m, content: chunk || m.content };
        }
        // Every other role accumulates streamed chunks.
        return { ...m, content: m.content + chunk };
      });
      return { ...state, messages: next };
    }
    case 'tool_call': {
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
          : m
      );
      return { ...state, messages: next };
    }
    case 'tool_result': {
      // Built once and shared by the update and insert paths below. The
      // `error` key is only present when the frame carries a truthy error.
      const toolResults = {
        tool_call_id: frame.tool_call_id,
        output: frame.output,
        truncated: frame.truncated,
        ...(frame.error ? { error: frame.error } : {}),
      };
      const exists = state.messages.some((m) => m.id === frame.tool_message_id);
      if (exists) {
        const next = state.messages.map((m) =>
          m.id === frame.tool_message_id
            ? {
                ...m,
                role: 'tool' as const,
                tool_results: toolResults,
                status: 'complete' as const,
              }
            : m
        );
        return { ...state, messages: next };
      }
      const newMsg: Message = {
        id: frame.tool_message_id,
        session_id: '',
        chat_id: frame.chat_id ?? '',
        role: 'tool',
        content: '',
        kind: 'message',
        tool_calls: null,
        tool_results: toolResults,
        status: 'complete',
        last_seq: 0,
        tokens_used: null,
        ctx_used: null,
        ctx_max: null,
        started_at: null,
        finished_at: null,
        created_at: new Date().toISOString(),
        metadata: null,
      };
      return { ...state, messages: [...state.messages, newMsg] };
    }
    case 'message_complete': {
      // Optional fields are only copied when the frame actually carries
      // them, so an absent field never overwrites an existing value.
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? {
              ...m,
              status: 'complete' as const,
              ...(frame.tokens_used !== undefined ? { tokens_used: frame.tokens_used } : {}),
              ...(frame.ctx_used !== undefined ? { ctx_used: frame.ctx_used } : {}),
              ...(frame.ctx_max !== undefined ? { ctx_max: frame.ctx_max } : {}),
              ...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}),
              ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}),
              // v1.8.2: cap-hit sentinels (and future stamped metadata) ride
              // in on this terminal frame so the reducer can attach it
              // without waiting for a refetch.
              ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}),
            }
          : m
      );
      return { ...state, messages: next };
    }
    case 'usage': {
      // v1.12.2: live throughput. Side-effects into the module-level
      // singleton consumed by ChatThroughput; no message-state mutation.
      // chat_id is the optional ws-frame field; usage frames always include it.
      if (frame.chat_id) {
        recordUsage(frame.chat_id, {
          completion_tokens: frame.completion_tokens,
          ctx_used: frame.ctx_used,
          ctx_max: frame.ctx_max,
        });
      }
      return state;
    }
    case 'messages_deleted': {
      const removeSet = new Set(frame.message_ids);
      return {
        ...state,
        messages: state.messages.filter((m) => !removeSet.has(m.id)),
      };
    }
    case 'chat_renamed': {
      sessionEvents.emit({
        type: 'chat_updated',
        chat_id: frame.chat_id,
        session_id: '',
        name: frame.name,
        updated_at: new Date().toISOString(),
      });
      return state;
    }
    case 'error': {
      // v1.8.2: when the frame carries a structured reason, stamp it onto the
      // failed message's metadata so the bubble can render specifics inline
      // (the WS error frame is one-shot; refresh-safe rendering needs the
      // value persisted on the message).
      const errorMeta = frame.reason
        ? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error }
        : null;
      const next = frame.message_id
        ? state.messages.map((m) =>
            m.id === frame.message_id
              ? {
                  ...m,
                  status: 'failed' as const,
                  ...(errorMeta ? { metadata: errorMeta } : {}),
                }
              : m
          )
        : state.messages;
      return { ...state, messages: next, error: frame.error };
    }
    case 'compacted': {
      // v1.11: side effects (refetch + toast) live in ws.onmessage; the
      // reducer just no-ops so TS exhaustiveness is satisfied without
      // duplicating async work inside a synchronous reducer.
      return state;
    }
    default: {
      // Runtime guard. The frame reaches this reducer through a cast from the
      // schema-validated value, so a frame type the switch does not handle
      // can still arrive. Without this branch the function would fall off
      // the end and return undefined, which React would store as the state.
      // NOTE(review): this branch also swallows the compile-time "missing
      // return" error a newly added frame type would otherwise trigger — add
      // new frame types as explicit cases above.
      return state;
    }
  }
}
// Matches useUserEvents — exponential backoff with the same ceiling so the
// two channels reconnect on the same cadence after a network handoff.
// Delay before the first reconnect attempt; the backoff is reset to this
// value whenever a socket opens successfully.
const RECONNECT_INITIAL_MS = 1000;
// Ceiling for the reconnect delay, which doubles after every close.
const RECONNECT_MAX_MS = 30_000;
/**
 * Subscribes to the per-session WebSocket channel and keeps a live view of
 * the session's messages.
 *
 * While `sessionId` is set, a socket is opened to
 * `/api/ws/sessions/<sessionId>`; every inbound frame is schema-validated and
 * folded into state through `applyFrame`. A closed socket is reopened with
 * exponential backoff (RECONNECT_INITIAL_MS doubling up to RECONNECT_MAX_MS).
 * Changing `sessionId` or unmounting tears the socket down and cancels any
 * pending reconnect.
 *
 * @param sessionId Session to stream, or `undefined` for "no session".
 * @returns The current `{ messages, connected, error }` state. It is empty
 *   and disconnected whenever no session is selected.
 */
export function useSessionStream(sessionId: string | undefined) {
  const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    if (!sessionId) {
      // No session selected: drop whatever the previous session left behind.
      // The previous effect's cleanup closes the socket, but its onclose
      // handler bails out once torn down, so nothing else would clear
      // `connected` or the old messages. Returning the same object when the
      // state is already empty avoids a redundant render.
      setState((s) =>
        s.messages.length === 0 && !s.connected && s.error === null
          ? s
          : { messages: [], connected: false, error: null }
      );
      return;
    }
    setState({ messages: [], connected: false, error: null });

    // Set by the cleanup below; closes over this effect run only, so it also
    // flips when the hook switches to a different session.
    let unmounted = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let reconnectDelay = RECONNECT_INITIAL_MS;

    const connect = () => {
      if (unmounted) return;
      const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
      const url = `${proto}://${window.location.host}/api/ws/sessions/${sessionId}`;
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        // A successful open restarts the backoff schedule.
        reconnectDelay = RECONNECT_INITIAL_MS;
        setState((s) => ({ ...s, connected: true, error: null }));
      };

      ws.onmessage = (ev) => {
        // v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
        // frames are logged and dropped. WsFrameSchema is the runtime guard;
        // the hand-maintained WsFrame type stays as the narrowed dev-time
        // shape (Zod uses OpaqueObject for nested types like Message[]). One
        // cast bridges the two.
        let raw: unknown;
        try {
          // Non-string payloads are parsed as '' so they fail here and are dropped.
          raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
        } catch (err) {
          console.warn('bad ws frame (parse)', err);
          return;
        }
        const validated = WsFrameSchema.safeParse(raw);
        if (!validated.success) {
          console.error('ws-frame-validation-failed (session channel)', {
            frame_type: (raw as { type?: unknown })?.type,
            errors: validated.error.flatten(),
          });
          return;
        }
        try {
          const frame = validated.data as unknown as WsFrame;
          // v1.11: on a compaction completion, re-fetch the message list so
          // the new summary row + the cohort of compacted_at-stamped older
          // rows render correctly. We dispatch the fresh list as a synthetic
          // 'snapshot' frame so the reducer's existing path handles state
          // replacement (no need for a parallel "refetched" path).
          // The toast is purely UX feedback; missing it would still leave
          // the chat in a valid state.
          if (frame.type === 'compacted') {
            toast.success('Context compacted to free space');
            void api.messages
              .list(frame.session_id)
              .then((messages) => {
                // The request can outlive this effect run. If the hook was
                // unmounted or moved to another session in the meantime,
                // applying the result would overwrite the current session's
                // messages with the old session's list.
                if (unmounted) return;
                setState((s) => applyFrame(s, { type: 'snapshot', messages }));
              })
              .catch((err: unknown) => {
                console.warn('compacted refetch failed', err);
              });
            return;
          }
          setState((s) => applyFrame(s, frame));
        } catch (err) {
          console.warn('bad ws frame', err);
        }
      };

      // v1.8.1: WS errors no longer surface as user-facing toasts here. The
      // user-channel hook (useUserEvents) owns the debounced "reconnecting…"
      // UI; this channel just reconnects silently on the same backoff.
      ws.onerror = () => {
        // Best-effort close; the reconnect itself is scheduled by onclose.
        try { ws.close(); } catch {}
      };

      ws.onclose = () => {
        if (unmounted) return;
        setState((s) => ({ ...s, connected: false }));
        // Wait the current delay, then double it (capped) for the next attempt.
        const delay = reconnectDelay;
        reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
        reconnectTimer = setTimeout(connect, delay);
      };
    };

    connect();

    return () => {
      unmounted = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      const ws = wsRef.current;
      wsRef.current = null;
      if (ws) try { ws.close(); } catch {}
    };
  }, [sessionId]);

  return state;
}