First half of the WebSocket-frame-typing batch (split per recon — total
scope was ~535 LoC, larger than the roadmap's ~300 estimate, so the
server-side publish-site conversion lands separately in v1.13.11-b).
Phase A scope:
(1) apps/server/src/types/ws-frames.ts (NEW) — Zod schemas for all 27
wire-format WS frame types. Discriminated union (WsFrameSchema) plus
KNOWN_FRAME_TYPES const for diagnostic lookup. UUIDs are z.string().uuid();
model-emitted tool_call_id stays z.string().min(1) since OpenAI-compatible
APIs emit "call_<random>", not UUIDs. Per-kind payload narrowing
(tool args, message_parts payloads) intentionally stays z.unknown() —
frame-level drift detection is the goal; deep payload validation is
follow-up work.
(2) apps/web/src/api/ws-frames.ts (NEW) — byte-identical mirror of the
authoritative server file. No path alias from web→server in the existing
tsconfig setup; sync-by-hand was chosen over a new packages/shared/ dir.
A ws-frames.test.ts test asserts the two files match.
(3) apps/server/src/services/broker.ts — adds publishFrame() and
publishUserFrame() methods to the Broker interface. Both validate via
WsFrameSchema and fail-closed: log + drop on invalid. createBroker now
accepts an optional FastifyBaseLogger so validation failures land in
the pino stream (with console.error fallback for unit tests). The
existing publish() / publishUser() raw methods stay legal — they get
converted to the typed variants in v1.13.11-b.
(4) apps/web/src/hooks/useSessionStream.ts + useUserEvents.ts — wrap
ws.onmessage with WsFrameSchema.safeParse. Fail-closed: invalid frames
log + return without dispatching. Hand-maintained WsFrame and
SessionEvent types stay in place; one cast bridges Zod-typed → narrowed
shape (Zod uses OpaqueObject for nested Message[] / WorkspacePane[] etc.,
which are dev-time-narrowed via the existing hand-maintained types).
(5) apps/web/package.json — adds zod ^3.23.8 as a direct dep. Was a
transitive dep via ai-sdk / postgres; promotion makes the import legal.
(6) Tests: 15 new in ws-frames.test.ts covering happy-path per major
frame type, drift-catchers (unknown type, invalid enum, non-UUID, negative
tokens), parts-authoritative read variants, the mirror-file diff check,
and four broker fail-closed scenarios. 219/219 server tests pass (was
204; +15 new).
Two recon corrections to the dispatch brief, both flagged before
implementation:
- No 'parts_appended' frame exists. The brief assumed one; the codebase
reads parts via the messages_with_parts view after message_complete
triggers a refetch. MessagePartSchema is therefore unused in this batch.
- No 'tool_running' frame exists. The brief listed it as standalone; it
is in fact a 'chat_status' variant ({ status: 'tool_running' }), already
covered by ChatStatusFrame.
Smoke: clean container boot, no validation errors in the server log. Real
production frames pass validation (the schemas were derived from the
existing hand-maintained types in api/types.ts and sessionEvents.ts).
v1.13.11-b will follow immediately: convert all ~85 raw broker.publish /
ctx.publish call sites across 11 server files to publishFrame /
publishUserFrame. Mechanical edit; the wiring done here means the diff
in -b is just the call-site swaps.
~310 LoC across 9 files (4 new + 5 modified).
294 lines · 10 KiB · TypeScript
import { useEffect, useRef, useState } from 'react';
|
|
import { toast } from 'sonner';
|
|
import type { Message, WsFrame } from '@/api/types';
|
|
import { WsFrameSchema } from '@/api/ws-frames';
|
|
import { api } from '@/api/client';
|
|
import { sessionEvents } from './sessionEvents';
|
|
import { recordUsage } from './useChatThroughput';
|
|
|
|
// session_renamed frame removed from WsFrame — it was declared but never
|
|
// published on the per-session WS channel (server publishes via broker.publishUser
|
|
// since v1.4). chat_renamed remains; auto_name.ts publishes it on session WS.
|
|
|
|
/**
 * Everything useSessionStream exposes for the currently followed session.
 */
type State = {
  /** Message rows assembled by applyFrame from snapshot + incremental frames. */
  messages: Message[];
  /** True while the session socket is open (set in onopen, cleared in onclose). */
  connected: boolean;
  /** Text of the last `error` frame; reset to null on (re)connect and session change. */
  error: string | null;
};
|
|
|
|
/**
 * Folds one session-channel WebSocket frame into the hook's state.
 *
 * Runs inside a `setState` updater, so it must always return a State:
 * a new object when the message list or error changes, or the same
 * `state` reference when the frame is handled only for its side effect.
 *
 * Side effects (kept here deliberately, not in the caller):
 * - `usage` forwards token/context counters to `recordUsage`.
 * - `chat_renamed` re-emits a `chat_updated` event on `sessionEvents`.
 *
 * @param state - Current hook state; never mutated.
 * @param frame - A validated frame. The caller bridges the Zod result to
 *   WsFrame with an unchecked cast, so see the `default` branch for frame
 *   types outside the WsFrame union.
 * @returns The next state (never undefined).
 */
function applyFrame(state: State, frame: WsFrame): State {
  switch (frame.type) {
    case 'snapshot': {
      // Full list (server snapshot or post-compaction refetch): replace wholesale.
      return { ...state, messages: frame.messages };
    }

    case 'message_started': {
      // A start frame for a message id we already hold is ignored.
      const exists = state.messages.some((m) => m.id === frame.message_id);
      if (exists) return state;
      const newMsg: Message = {
        id: frame.message_id,
        session_id: '',
        chat_id: frame.chat_id ?? '',
        role: frame.role,
        content: '',
        kind: 'message',
        tool_calls: null,
        tool_results: null,
        // v1.8.2: cap-hit sentinels arrive role='system' and are static, so
        // skipping the streaming dot for them keeps the UI accurate.
        status: frame.role === 'system' ? 'complete' : 'streaming',
        last_seq: 0,
        tokens_used: null,
        ctx_used: null,
        ctx_max: null,
        started_at: null,
        finished_at: null,
        created_at: new Date().toISOString(),
        metadata: null,
      };
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'delta': {
      // Append streamed text to the matching message.
      const next = state.messages.map((m) =>
        m.id === frame.message_id ? { ...m, content: m.content + frame.content } : m
      );
      return { ...state, messages: next };
    }

    case 'tool_call': {
      // Accumulate tool calls on the owning assistant message.
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
          : m
      );
      return { ...state, messages: next };
    }

    case 'tool_result': {
      // Identical payload whether we patch an existing row or create one;
      // the `error` key is present only when the frame carries a truthy error.
      const toolResults = {
        tool_call_id: frame.tool_call_id,
        output: frame.output,
        truncated: frame.truncated,
        ...(frame.error ? { error: frame.error } : {}),
      };
      const exists = state.messages.some((m) => m.id === frame.tool_message_id);
      if (exists) {
        const next = state.messages.map((m) =>
          m.id === frame.tool_message_id
            ? {
                ...m,
                role: 'tool' as const,
                tool_results: toolResults,
                status: 'complete' as const,
              }
            : m
        );
        return { ...state, messages: next };
      }
      const newMsg: Message = {
        id: frame.tool_message_id,
        session_id: '',
        chat_id: frame.chat_id ?? '',
        role: 'tool',
        content: '',
        kind: 'message',
        tool_calls: null,
        tool_results: toolResults,
        status: 'complete',
        last_seq: 0,
        tokens_used: null,
        ctx_used: null,
        ctx_max: null,
        started_at: null,
        finished_at: null,
        created_at: new Date().toISOString(),
        metadata: null,
      };
      return { ...state, messages: [...state.messages, newMsg] };
    }

    case 'message_complete': {
      // Mark complete; copy only the counters/timestamps the frame actually carries.
      const next = state.messages.map((m) =>
        m.id === frame.message_id
          ? {
              ...m,
              status: 'complete' as const,
              ...(frame.tokens_used !== undefined ? { tokens_used: frame.tokens_used } : {}),
              ...(frame.ctx_used !== undefined ? { ctx_used: frame.ctx_used } : {}),
              ...(frame.ctx_max !== undefined ? { ctx_max: frame.ctx_max } : {}),
              ...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}),
              ...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}),
              // v1.8.2: cap-hit sentinels (and future stamped metadata) ride
              // in on this terminal frame so the reducer can attach it
              // without waiting for a refetch.
              ...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}),
            }
          : m
      );
      return { ...state, messages: next };
    }

    case 'usage': {
      // v1.12.2: live throughput. Side-effects into the module-level
      // singleton consumed by ChatThroughput; no message-state mutation.
      // chat_id is the optional ws-frame field; usage frames always include it.
      if (frame.chat_id) {
        recordUsage(frame.chat_id, {
          completion_tokens: frame.completion_tokens,
          ctx_used: frame.ctx_used,
          ctx_max: frame.ctx_max,
        });
      }
      return state;
    }

    case 'messages_deleted': {
      const removeSet = new Set(frame.message_ids);
      return {
        ...state,
        messages: state.messages.filter((m) => !removeSet.has(m.id)),
      };
    }

    case 'chat_renamed': {
      // Relay to the app-wide event bus; message state is unaffected.
      sessionEvents.emit({
        type: 'chat_updated',
        chat_id: frame.chat_id,
        session_id: '',
        name: frame.name,
        updated_at: new Date().toISOString(),
      });
      return state;
    }

    case 'error': {
      // v1.8.2: when the frame carries a structured reason, stamp it onto the
      // failed message's metadata so the bubble can render specifics inline
      // (the WS error frame is one-shot; refresh-safe rendering needs the
      // value persisted on the message).
      const errorMeta = frame.reason
        ? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error }
        : null;
      const next = frame.message_id
        ? state.messages.map((m) =>
            m.id === frame.message_id
              ? {
                  ...m,
                  status: 'failed' as const,
                  ...(errorMeta ? { metadata: errorMeta } : {}),
                }
              : m
          )
        : state.messages;
      return { ...state, messages: next, error: frame.error };
    }

    case 'compacted': {
      // v1.11: side effects (refetch + toast) live in ws.onmessage; the
      // reducer just no-ops so TS exhaustiveness is satisfied without
      // duplicating async work inside a synchronous reducer.
      return state;
    }

    default: {
      // Compile time: `frame` is `never` here, so adding a WsFrame variant
      // without a matching case still fails type-checking.
      // Runtime: WsFrameSchema is the shared wire-level union and the caller
      // casts its output to WsFrame unchecked, so a frame type this reducer
      // does not know can still arrive. Keep the current state instead of
      // returning undefined into setState.
      const unhandled: never = frame;
      void unhandled;
      return state;
    }
  }
}
|
|
|
|
// Matches useUserEvents — exponential backoff with the same ceiling so the
|
|
// two channels reconnect on the same cadence after a network handoff.
|
|
const RECONNECT_INITIAL_MS = 1000;
|
|
const RECONNECT_MAX_MS = 30_000;
|
|
|
|
/**
 * Decodes one raw session-socket payload into a frame the reducer accepts.
 *
 * v1.13.11-a: every inbound frame is Zod-validated, fail-closed. Payloads
 * that are not JSON, and frames rejected by WsFrameSchema, are logged and
 * dropped (returns null). WsFrameSchema is the runtime guard; the
 * hand-maintained WsFrame type stays as the narrowed dev-time shape (Zod
 * uses OpaqueObject for nested types like Message[]), so one cast bridges
 * the two. Non-string payloads are parsed as '' and therefore rejected.
 *
 * @param data - `MessageEvent.data` from the session socket.
 * @returns The frame, or null when it was dropped.
 */
function parseSessionFrame(data: unknown): WsFrame | null {
  let raw: unknown;
  try {
    raw = JSON.parse(typeof data === 'string' ? data : '');
  } catch (err) {
    console.warn('bad ws frame (parse)', err);
    return null;
  }
  const validated = WsFrameSchema.safeParse(raw);
  if (!validated.success) {
    console.error('ws-frame-validation-failed (session channel)', {
      frame_type: (raw as { type?: unknown })?.type,
      errors: validated.error.flatten(),
    });
    return null;
  }
  return validated.data as unknown as WsFrame;
}

/**
 * Streams one session's messages over `/api/ws/sessions/:sessionId`.
 *
 * On every `sessionId` change the state is reset and a new socket opened.
 * On close the hook reconnects with exponential backoff: the delay starts
 * at RECONNECT_INITIAL_MS, doubles per attempt up to RECONNECT_MAX_MS, and
 * resets on a successful open. Effect cleanup stops reconnects and closes
 * the socket. Frames are folded into state via applyFrame; `compacted`
 * frames instead show a toast and refetch the full message list.
 *
 * @param sessionId - Session to follow. When undefined, no socket is opened
 *   and the current state is left untouched.
 * @returns `{ messages, connected, error }` for the followed session.
 */
export function useSessionStream(sessionId: string | undefined) {
  const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    if (!sessionId) return;

    setState({ messages: [], connected: false, error: null });

    // Set by cleanup; every async callback below checks it so nothing from
    // this effect run can touch state after the session changes or unmounts.
    let unmounted = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let reconnectDelay = RECONNECT_INITIAL_MS;

    const connect = () => {
      if (unmounted) return;
      const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
      const url = `${proto}://${window.location.host}/api/ws/sessions/${sessionId}`;
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        reconnectDelay = RECONNECT_INITIAL_MS;
        setState((s) => ({ ...s, connected: true, error: null }));
      };

      ws.onmessage = (ev) => {
        if (unmounted) return;
        const frame = parseSessionFrame(ev.data);
        if (!frame) return;
        try {
          // v1.11: on a compaction completion, re-fetch the message list so
          // the new summary row + the cohort of compacted_at-stamped older
          // rows render correctly. We dispatch the fresh list as a synthetic
          // 'snapshot' frame so the reducer's existing path handles state
          // replacement (no need for a parallel "refetched" path).
          // The toast is purely UX feedback; missing it would still leave
          // the chat in a valid state.
          if (frame.type === 'compacted') {
            toast.success('Context compacted to free space');
            void api.messages
              .list(frame.session_id)
              .then((messages) => {
                // The request may resolve after cleanup (unmount or a switch
                // to another session); applying it then would overwrite the
                // new session's freshly reset state with the old list.
                if (unmounted) return;
                setState((s) => applyFrame(s, { type: 'snapshot', messages }));
              })
              .catch((err: unknown) => {
                console.warn('compacted refetch failed', err);
              });
            return;
          }
          setState((s) => applyFrame(s, frame));
        } catch (err) {
          console.warn('bad ws frame', err);
        }
      };

      // v1.8.1: WS errors no longer surface as user-facing toasts here. The
      // user-channel hook (useUserEvents) owns the debounced "reconnecting…"
      // UI; this channel just reconnects silently on the same backoff.
      ws.onerror = () => {
        try { ws.close(); } catch {}
      };

      ws.onclose = () => {
        if (unmounted) return;
        setState((s) => ({ ...s, connected: false }));
        const delay = reconnectDelay;
        reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
        reconnectTimer = setTimeout(connect, delay);
      };
    };

    connect();

    return () => {
      unmounted = true;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      const ws = wsRef.current;
      wsRef.current = null;
      if (ws) try { ws.close(); } catch {}
    };
  }, [sessionId]);

  return state;
}
|