v1.13.11-a: WS frame schemas + frontend receive validation

First half of the WebSocket-frame-typing batch (split per recon — total
scope was ~535 LoC, larger than the roadmap's ~300 estimate, so the
server-side publish-site conversion lands separately in v1.13.11-b).

Phase A scope:

(1) apps/server/src/types/ws-frames.ts (NEW) — Zod schemas for all 27
wire-format WS frame types. Discriminated union (WsFrameSchema) plus
KNOWN_FRAME_TYPES const for diagnostic lookup. UUIDs are z.string().
uuid(); model-emitted tool_call_id stays z.string().min(1) since OpenAI-
compatible APIs emit "call_<random>" not UUID. Per-kind payload narrowing
(tool args, message_parts payloads) intentionally stays z.unknown() —
frame-level drift detection is the goal; deep payload validation is
follow-up work.

(2) apps/web/src/api/ws-frames.ts (NEW) — byte-identical mirror of the
authoritative server file. No path alias from web→server in the existing
tsconfig setup; sync-by-hand was chosen over a new packages/shared/ dir.
A ws-frames.test.ts test asserts the two files match.

(3) apps/server/src/services/broker.ts — adds publishFrame() and
publishUserFrame() methods to the Broker interface. Both validate via
WsFrameSchema and fail-closed: log + drop on invalid. createBroker now
accepts an optional FastifyBaseLogger so validation failures land in
the pino stream (with console.error fallback for unit tests). The
existing publish() / publishUser() raw methods stay legal — they get
converted to the typed variants in v1.13.11-b.

(4) apps/web/src/hooks/useSessionStream.ts + useUserEvents.ts — wrap
ws.onmessage with WsFrameSchema.safeParse. Fail-closed: invalid frames
log + return without dispatching. Hand-maintained WsFrame and
SessionEvent types stay in place; one cast bridges Zod-typed → narrowed
shape (Zod uses OpaqueObject for nested Message[] / WorkspacePane[] etc.,
which are dev-time-narrowed via the existing hand-maintained types).

(5) apps/web/package.json — adds zod ^3.23.8 as a direct dep. Was a
transitive dep via ai-sdk / postgres; promotion makes the import legal.

(6) Tests: 15 new in ws-frames.test.ts covering happy-path per major
frame type, drift-catchers (unknown type, invalid enum, non-UUID, negative
tokens), parts-authoritative read variants, the mirror-file diff check,
and four broker fail-closed scenarios. 219/219 server tests pass (was
204; +15 new).

Two recon corrections to the dispatch brief, both flagged before
implementation:

- No 'parts_appended' frame exists. The brief assumed one; the codebase
  reads parts via the messages_with_parts view after message_complete
  triggers a refetch. MessagePartSchema is therefore unused this batch.
- No 'tool_running' frame exists. The brief listed it as standalone; it
  is in fact a 'chat_status' variant ({ status: 'tool_running' }), already
  covered by ChatStatusFrame.

Smoke: clean container boot, no validation errors in the server log. Real
production frames pass validation (the schemas were derived from the
existing hand-maintained types in api/types.ts and sessionEvents.ts).

v1.13.11-b will follow immediately: convert all ~85 raw broker.publish /
ctx.publish call sites across 11 server files to publishFrame /
publishUserFrame. Mechanical edit; the wiring done here means the diff
in -b is just the call-site swaps.

~310 LoC across 9 files (4 new + 5 modified).
This commit is contained in:
2026-05-22 15:48:32 +00:00
parent 34cbecf975
commit 8b568b36d3
9 changed files with 940 additions and 8 deletions

View File

@@ -0,0 +1,314 @@
// v1.13.11-a: Zod schemas for every WebSocket frame published by the server.
// Validation runs both on send (broker.publishFrame / publishUserFrame) and
// on receive (apps/web/src/hooks/useSessionStream + useUserEvents). Catches
// silent protocol drift between publisher and consumer.
//
// IMPORTANT: This file is duplicated byte-identical at
// apps/web/src/api/ws-frames.ts. The two apps have separate tsconfigs and
// no path alias; the duplication is sync-by-hand. A test asserts the two
// files match. If you change one, change the other.
//
// Per-kind payload schemas (tool_call args, message_parts payloads, etc.)
// stay z.unknown() in v1.13.11. Frame-level drift detection is the goal;
// deep payload validation is follow-up work.
import { z } from 'zod';
// ---- shared primitives -----------------------------------------------------
const Uuid = z.string().uuid();
// Tool call IDs are model-emitted (e.g. "call_abc123") — not UUIDs.
const ToolCallId = z.string().min(1);
const IsoTimestamp = z.string().min(1);
const ChatStatusValue = z.enum([
'streaming',
'tool_running',
'waiting_for_input',
'idle',
'error',
]);
const ErrorReasonValue = z.enum([
'llm_provider_error',
'doom_loop',
'doom_loop_summary_failed',
'cap_hit',
'cap_hit_summary_failed',
]);
const MessageRoleValue = z.enum(['user', 'assistant', 'system', 'tool']);
const ToolCallShape = z.object({
id: ToolCallId,
name: z.string().min(1),
args: z.record(z.string(), z.unknown()),
});
// Free-form bags: opaque to the frame schema; deep validation is out of
// scope. passthrough preserves unknown keys so the consumer sees the full
// shape even when this schema doesn't enumerate every field.
const OpaqueObject = z.object({}).passthrough();
// ---- per-session channel frames --------------------------------------------
export const SnapshotFrame = z.object({
type: z.literal('snapshot'),
messages: z.array(OpaqueObject),
});
export const MessageStartedFrame = z.object({
type: z.literal('message_started'),
message_id: Uuid,
chat_id: Uuid.optional(),
role: MessageRoleValue,
});
export const DeltaFrame = z.object({
type: z.literal('delta'),
message_id: Uuid,
chat_id: Uuid.optional(),
content: z.string(),
});
export const ToolCallFrame = z.object({
type: z.literal('tool_call'),
message_id: Uuid,
chat_id: Uuid.optional(),
tool_call: ToolCallShape,
});
export const ToolResultFrame = z.object({
type: z.literal('tool_result'),
tool_message_id: Uuid,
chat_id: Uuid.optional(),
tool_call_id: ToolCallId,
output: z.unknown(),
truncated: z.boolean(),
error: z.string().optional(),
});
export const MessageCompleteFrame = z.object({
type: z.literal('message_complete'),
message_id: Uuid,
chat_id: Uuid.optional(),
tokens_used: z.number().int().nonnegative().nullable().optional(),
ctx_used: z.number().int().nonnegative().nullable().optional(),
ctx_max: z.number().int().positive().nullable().optional(),
started_at: IsoTimestamp.nullable().optional(),
finished_at: IsoTimestamp.nullable().optional(),
model: z.string().optional(),
metadata: OpaqueObject.nullable().optional(),
});
export const UsageFrame = z.object({
type: z.literal('usage'),
message_id: Uuid,
chat_id: Uuid.optional(),
completion_tokens: z.number().int().nonnegative().nullable(),
ctx_used: z.number().int().nonnegative().nullable(),
ctx_max: z.number().int().positive().nullable(),
});
export const MessagesDeletedFrame = z.object({
type: z.literal('messages_deleted'),
message_ids: z.array(Uuid),
chat_id: Uuid.optional(),
});
export const ChatRenamedFrame = z.object({
type: z.literal('chat_renamed'),
chat_id: Uuid,
name: z.string(),
});
export const CompactedFrame = z.object({
type: z.literal('compacted'),
session_id: Uuid,
chat_id: Uuid,
summary_message_id: Uuid,
});
export const ErrorFrame = z.object({
type: z.literal('error'),
message_id: Uuid.optional(),
chat_id: Uuid.optional(),
error: z.string(),
reason: ErrorReasonValue.optional(),
});
// ---- per-user channel frames (sidebar refresh) -----------------------------
export const ChatStatusFrame = z.object({
type: z.literal('chat_status'),
chat_id: Uuid,
status: ChatStatusValue,
at: IsoTimestamp,
reason: ErrorReasonValue.optional(),
});
export const SessionUpdatedFrame = z.object({
type: z.literal('session_updated'),
session_id: Uuid,
project_id: Uuid,
name: z.string(),
updated_at: IsoTimestamp,
});
export const SessionRenamedFrame = z.object({
type: z.literal('session_renamed'),
session_id: Uuid,
name: z.string(),
});
export const SessionCreatedFrame = z.object({
type: z.literal('session_created'),
session: OpaqueObject,
project_id: Uuid,
});
export const SessionArchivedFrame = z.object({
type: z.literal('session_archived'),
session_id: Uuid,
project_id: Uuid,
});
export const SessionDeletedFrame = z.object({
type: z.literal('session_deleted'),
session_id: Uuid,
project_id: Uuid,
});
export const SessionWorkspaceUpdatedFrame = z.object({
type: z.literal('session_workspace_updated'),
session_id: Uuid,
workspace_panes: z.array(OpaqueObject),
});
export const ChatCreatedFrame = z.object({
type: z.literal('chat_created'),
chat: OpaqueObject,
session_id: Uuid,
});
export const ChatUpdatedFrame = z.object({
type: z.literal('chat_updated'),
chat_id: Uuid,
session_id: Uuid,
name: z.string().nullable(),
updated_at: IsoTimestamp,
});
export const ChatArchivedFrame = z.object({
type: z.literal('chat_archived'),
chat_id: Uuid,
session_id: Uuid,
});
export const ChatUnarchivedFrame = z.object({
type: z.literal('chat_unarchived'),
chat: OpaqueObject,
});
export const ChatDeletedFrame = z.object({
type: z.literal('chat_deleted'),
chat_id: Uuid,
session_id: Uuid,
});
export const ProjectCreatedFrame = z.object({
type: z.literal('project_created'),
project: OpaqueObject,
});
export const ProjectArchivedFrame = z.object({
type: z.literal('project_archived'),
project_id: Uuid,
});
export const ProjectUnarchivedFrame = z.object({
type: z.literal('project_unarchived'),
project: OpaqueObject,
});
export const ProjectUpdatedFrame = z.object({
type: z.literal('project_updated'),
project_id: Uuid,
name: z.string(),
});
export const ProjectDeletedFrame = z.object({
type: z.literal('project_deleted'),
project_id: Uuid,
});
// ---- discriminated union ---------------------------------------------------
export const WsFrameSchema = z.discriminatedUnion('type', [
// per-session
SnapshotFrame,
MessageStartedFrame,
DeltaFrame,
ToolCallFrame,
ToolResultFrame,
MessageCompleteFrame,
UsageFrame,
MessagesDeletedFrame,
ChatRenamedFrame,
CompactedFrame,
ErrorFrame,
// per-user
ChatStatusFrame,
SessionUpdatedFrame,
SessionRenamedFrame,
SessionCreatedFrame,
SessionArchivedFrame,
SessionDeletedFrame,
SessionWorkspaceUpdatedFrame,
ChatCreatedFrame,
ChatUpdatedFrame,
ChatArchivedFrame,
ChatUnarchivedFrame,
ChatDeletedFrame,
ProjectCreatedFrame,
ProjectArchivedFrame,
ProjectUnarchivedFrame,
ProjectUpdatedFrame,
ProjectDeletedFrame,
]);
export type WsFrame = z.infer<typeof WsFrameSchema>;
// Convenience: the set of known frame types. Useful for the publishFrame
// helper to log the offending type name when validation fails. Kept in sync
// by hand with the discriminated union above.
export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [
'snapshot',
'message_started',
'delta',
'tool_call',
'tool_result',
'message_complete',
'usage',
'messages_deleted',
'chat_renamed',
'compacted',
'error',
'chat_status',
'session_updated',
'session_renamed',
'session_created',
'session_archived',
'session_deleted',
'session_workspace_updated',
'chat_created',
'chat_updated',
'chat_archived',
'chat_unarchived',
'chat_deleted',
'project_created',
'project_archived',
'project_unarchived',
'project_updated',
'project_deleted',
] as const;

View File

@@ -1,6 +1,7 @@
import { useEffect, useRef, useState } from 'react';
import { toast } from 'sonner';
import type { Message, WsFrame } from '@/api/types';
import { WsFrameSchema } from '@/api/ws-frames';
import { api } from '@/api/client';
import { sessionEvents } from './sessionEvents';
import { recordUsage } from './useChatThroughput';
@@ -216,8 +217,28 @@ export function useSessionStream(sessionId: string | undefined) {
setState((s) => ({ ...s, connected: true, error: null }));
};
ws.onmessage = (ev) => {
// v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
// frames are logged and dropped. WsFrameSchema is the runtime guard;
// the hand-maintained WsFrame type stays as the narrowed dev-time
// shape (Zod uses OpaqueObject for nested types like Message[]). One
// cast bridges the two.
let raw: unknown;
try {
const frame = JSON.parse(typeof ev.data === 'string' ? ev.data : '') as WsFrame;
raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
} catch (err) {
console.warn('bad ws frame (parse)', err);
return;
}
const validated = WsFrameSchema.safeParse(raw);
if (!validated.success) {
console.error('ws-frame-validation-failed (session channel)', {
frame_type: (raw as { type?: unknown })?.type,
errors: validated.error.flatten(),
});
return;
}
try {
const frame = validated.data as unknown as WsFrame;
// v1.11: on a compaction completion, re-fetch the message list so
// the new summary row + the cohort of compacted_at-stamped older
// rows render correctly. We dispatch the fresh list as a synthetic

View File

@@ -1,4 +1,5 @@
import { useEffect } from 'react';
import { WsFrameSchema } from '@/api/ws-frames';
import { sessionEvents } from './sessionEvents';
import { createWsReconnectToast } from './wsReconnectToast';
@@ -38,14 +39,33 @@ export function useUserEvents(): void {
};
ws.onmessage = (ev) => {
// v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
// frames are logged and dropped instead of dispatched onto the
// sessionEvents bus where a stale or wrong shape would silently
// corrupt sidebar / chat state.
let raw: unknown;
try {
const parsed: unknown = JSON.parse(ev.data);
if (parsed && typeof (parsed as { type?: unknown }).type === 'string') {
sessionEvents.emit(parsed as import('./sessionEvents').SessionEvent);
}
raw = JSON.parse(ev.data);
} catch (err) {
console.warn('useUserEvents: failed to parse frame', err);
return;
}
const validated = WsFrameSchema.safeParse(raw);
if (!validated.success) {
console.error('ws-frame-validation-failed (user channel)', {
frame_type: (raw as { type?: unknown })?.type,
errors: validated.error.flatten(),
});
return;
}
// Bridge cast: Zod's union is broader than SessionEvent (it includes
// per-session-channel frames too, which never arrive on the user
// channel). sessionEvents.emit only dispatches frames whose type
// appears in SessionEvent; the narrowing happens via the existing
// useSidebar.ts applyEvent switch.
sessionEvents.emit(
validated.data as unknown as import('./sessionEvents').SessionEvent,
);
};
ws.onclose = () => {