v1.13.11-a: WS frame schemas + frontend receive validation
First half of the WebSocket-frame-typing batch (split per recon — total
scope was ~535 LoC, larger than the roadmap's ~300 estimate, so the
server-side publish-site conversion lands separately in v1.13.11-b).
Phase A scope:
(1) apps/server/src/types/ws-frames.ts (NEW) — Zod schemas for all 27
wire-format WS frame types. Discriminated union (WsFrameSchema) plus
KNOWN_FRAME_TYPES const for diagnostic lookup. UUIDs are z.string().
uuid(); model-emitted tool_call_id stays z.string().min(1) since OpenAI-
compatible APIs emit "call_<random>" not UUID. Per-kind payload narrowing
(tool args, message_parts payloads) intentionally stays z.unknown() —
frame-level drift detection is the goal; deep payload validation is
follow-up work.
(2) apps/web/src/api/ws-frames.ts (NEW) — byte-identical mirror of the
authoritative server file. No path alias from web→server in the existing
tsconfig setup; sync-by-hand was chosen over a new packages/shared/ dir.
A ws-frames.test.ts test asserts the two files match.
(3) apps/server/src/services/broker.ts — adds publishFrame() and
publishUserFrame() methods to the Broker interface. Both validate via
WsFrameSchema and fail-closed: log + drop on invalid. createBroker now
accepts an optional FastifyBaseLogger so validation failures land in
the pino stream (with console.error fallback for unit tests). The
existing publish() / publishUser() raw methods stay legal — they get
converted to the typed variants in v1.13.11-b.
(4) apps/web/src/hooks/useSessionStream.ts + useUserEvents.ts — wrap
ws.onmessage with WsFrameSchema.safeParse. Fail-closed: invalid frames
log + return without dispatching. Hand-maintained WsFrame and
SessionEvent types stay in place; one cast bridges Zod-typed → narrowed
shape (Zod uses OpaqueObject for nested Message[] / WorkspacePane[] etc.,
which are dev-time-narrowed via the existing hand-maintained types).
(5) apps/web/package.json — adds zod ^3.23.8 as a direct dep. Was a
transitive dep via ai-sdk / postgres; promotion makes the import legal.
(6) Tests: 15 new in ws-frames.test.ts covering happy-path per major
frame type, drift-catchers (unknown type, invalid enum, non-UUID, negative
tokens), parts-authoritative read variants, the mirror-file diff check,
and four broker fail-closed scenarios. 219/219 server tests pass (was
204; +15 new).
Two recon corrections to the dispatch brief, both flagged before
implementation:
- No 'parts_appended' frame exists. The brief assumed one; the codebase
reads parts via the messages_with_parts view after message_complete
triggers a refetch. MessagePartSchema is therefore unused this batch.
- No 'tool_running' frame exists. The brief listed it as standalone; it
is in fact a 'chat_status' variant ({ status: 'tool_running' }), already
covered by ChatStatusFrame.
Smoke: clean container boot, no validation errors in the server log. Real
production frames pass validation (the schemas were derived from the
existing hand-maintained types in api/types.ts and sessionEvents.ts).
v1.13.11-b will follow immediately: convert all ~85 raw broker.publish /
ctx.publish call sites across 11 server files to publishFrame /
publishUserFrame. Mechanical edit; the wiring done here means the diff
in -b is just the call-site swaps.
~310 LoC across 9 files (4 new + 5 modified).
This commit is contained in:
@@ -75,7 +75,7 @@ async function main() {
|
||||
return { status: dbOk ? 'ok' : 'degraded', db: dbOk };
|
||||
});
|
||||
|
||||
const broker = createBroker();
|
||||
const broker = createBroker(app.log);
|
||||
|
||||
registerProjectRoutes(app, sql, config, broker);
|
||||
registerSessionRoutes(app, sql, config, broker);
|
||||
|
||||
218
apps/server/src/services/__tests__/ws-frames.test.ts
Normal file
218
apps/server/src/services/__tests__/ws-frames.test.ts
Normal file
@@ -0,0 +1,218 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
import { readFileSync } from 'node:fs';
|
||||
import { resolve } from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import {
|
||||
WsFrameSchema,
|
||||
KNOWN_FRAME_TYPES,
|
||||
type WsFrame,
|
||||
} from '../../types/ws-frames.js';
|
||||
import { createBroker } from '../broker.js';
|
||||
|
||||
const VALID_UUID_A = '00000000-0000-0000-0000-000000000001';
|
||||
const VALID_UUID_B = '00000000-0000-0000-0000-000000000002';
|
||||
const VALID_UUID_C = '00000000-0000-0000-0000-000000000003';
|
||||
const VALID_TIMESTAMP = '2026-05-22T14:30:00.000Z';
|
||||
|
||||
describe('WsFrameSchema (v1.13.11-a)', () => {
|
||||
it('accepts a well-formed chat_status frame', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'chat_status',
|
||||
chat_id: VALID_UUID_A,
|
||||
status: 'streaming',
|
||||
at: VALID_TIMESTAMP,
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('rejects an unknown frame type', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'cosmic_ray_strike',
|
||||
chat_id: VALID_UUID_A,
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it('rejects a chat_status frame with invalid status enum', () => {
|
||||
// v1.12.1 dropped the legacy 'working' status. Any frame still emitting it
|
||||
// should fail validation — that's a drift catcher.
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'chat_status',
|
||||
chat_id: VALID_UUID_A,
|
||||
status: 'working',
|
||||
at: VALID_TIMESTAMP,
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it('rejects a UUID field with a non-UUID string', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'chat_status',
|
||||
chat_id: 'not-a-uuid',
|
||||
status: 'idle',
|
||||
at: VALID_TIMESTAMP,
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it('rejects negative token counts in usage frame', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'usage',
|
||||
message_id: VALID_UUID_A,
|
||||
chat_id: VALID_UUID_B,
|
||||
completion_tokens: -1,
|
||||
ctx_used: 100,
|
||||
ctx_max: 1000,
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it('accepts a usage frame with nullable token counts (pre-v1.13.7 history)', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'usage',
|
||||
message_id: VALID_UUID_A,
|
||||
chat_id: VALID_UUID_B,
|
||||
completion_tokens: null,
|
||||
ctx_used: null,
|
||||
ctx_max: null,
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('accepts a tool_result frame with non-UUID tool_call_id (model-emitted)', () => {
|
||||
// Model-emitted tool_call_ids look like "call_abc123", not UUIDs.
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'tool_result',
|
||||
tool_message_id: VALID_UUID_A,
|
||||
chat_id: VALID_UUID_B,
|
||||
tool_call_id: 'call_abc123',
|
||||
output: { whatever: true },
|
||||
truncated: false,
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('accepts a compacted frame', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'compacted',
|
||||
session_id: VALID_UUID_A,
|
||||
chat_id: VALID_UUID_B,
|
||||
summary_message_id: VALID_UUID_C,
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('accepts a session_workspace_updated frame', () => {
|
||||
const result = WsFrameSchema.safeParse({
|
||||
type: 'session_workspace_updated',
|
||||
session_id: VALID_UUID_A,
|
||||
workspace_panes: [{ id: 'p1', kind: 'chat', chatIds: [], activeChatIdx: 0 }],
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('every KNOWN_FRAME_TYPES entry has a discriminated branch', () => {
|
||||
// Probe each known type by attempting a minimal valid construction.
|
||||
// Failure here means the union and the KNOWN_FRAME_TYPES list drifted.
|
||||
for (const type of KNOWN_FRAME_TYPES) {
|
||||
const probe = WsFrameSchema.safeParse({ type, __dummy__: true });
|
||||
// We expect FAILURE on every type because we're missing required fields,
|
||||
// but the failure must be ABOUT the missing fields, not about an unknown
|
||||
// type. A "Invalid discriminator value" error means the type isn't in
|
||||
// the union — that's a drift.
|
||||
if (probe.success) continue;
|
||||
const issues = probe.error.issues;
|
||||
const hasInvalidDiscriminator = issues.some(
|
||||
(i) => i.code === 'invalid_union_discriminator',
|
||||
);
|
||||
expect(hasInvalidDiscriminator, `frame type '${type}' is missing from the discriminated union`).toBe(false);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('ws-frames.ts file mirror parity', () => {
|
||||
it('apps/server and apps/web copies are byte-identical', () => {
|
||||
const here = fileURLToPath(import.meta.url);
|
||||
const serverPath = resolve(here, '../../../types/ws-frames.ts');
|
||||
const webPath = resolve(here, '../../../../../web/src/api/ws-frames.ts');
|
||||
const serverContent = readFileSync(serverPath, 'utf8');
|
||||
const webContent = readFileSync(webPath, 'utf8');
|
||||
expect(webContent, 'apps/web/src/api/ws-frames.ts must be byte-identical to apps/server/src/types/ws-frames.ts').toBe(serverContent);
|
||||
});
|
||||
});
|
||||
|
||||
describe('broker.publishFrame / publishUserFrame fail-closed behavior', () => {
|
||||
let logErrors: Array<{ obj: unknown; msg: string }>;
|
||||
let mockLog: Parameters<typeof createBroker>[0];
|
||||
|
||||
beforeEach(() => {
|
||||
logErrors = [];
|
||||
mockLog = {
|
||||
error: (obj: unknown, msg: string) => {
|
||||
logErrors.push({ obj, msg });
|
||||
},
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
debug: () => {},
|
||||
trace: () => {},
|
||||
fatal: () => {},
|
||||
child: () => mockLog as never,
|
||||
level: 'info',
|
||||
silent: () => {},
|
||||
} as unknown as Parameters<typeof createBroker>[0];
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('publishFrame delivers a valid frame to subscribers', () => {
|
||||
const broker = createBroker(mockLog);
|
||||
const received: WsFrame[] = [];
|
||||
broker.subscribe('sess-1', (f) => received.push(f as WsFrame));
|
||||
broker.publishFrame('sess-1', {
|
||||
type: 'delta',
|
||||
message_id: VALID_UUID_A,
|
||||
chat_id: VALID_UUID_B,
|
||||
content: 'hello',
|
||||
});
|
||||
expect(received).toHaveLength(1);
|
||||
expect((received[0] as { type: string }).type).toBe('delta');
|
||||
expect(logErrors).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('publishFrame drops + logs an invalid frame instead of delivering it', () => {
|
||||
const broker = createBroker(mockLog);
|
||||
const received: WsFrame[] = [];
|
||||
broker.subscribe('sess-1', (f) => received.push(f as WsFrame));
|
||||
broker.publishFrame('sess-1', {
|
||||
type: 'delta',
|
||||
message_id: 'not-a-uuid',
|
||||
content: 'hello',
|
||||
} as never);
|
||||
expect(received).toHaveLength(0);
|
||||
expect(logErrors).toHaveLength(1);
|
||||
expect(logErrors[0]!.msg).toMatch(/ws-frame-validation-failed/);
|
||||
});
|
||||
|
||||
it('publishUserFrame drops + logs an invalid user-channel frame', () => {
|
||||
const broker = createBroker(mockLog);
|
||||
const received: WsFrame[] = [];
|
||||
broker.subscribeUser('default', (f) => received.push(f as WsFrame));
|
||||
broker.publishUserFrame('default', {
|
||||
type: 'chat_status',
|
||||
chat_id: VALID_UUID_A,
|
||||
status: 'working', // v1.12.1 dropped this enum value
|
||||
at: VALID_TIMESTAMP,
|
||||
} as never);
|
||||
expect(received).toHaveLength(0);
|
||||
expect(logErrors).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('publishFrame validation failure does not throw (no cascade into stream-phase)', () => {
|
||||
const broker = createBroker(mockLog);
|
||||
expect(() =>
|
||||
broker.publishFrame('sess-1', { type: 'unknown_type' } as never),
|
||||
).not.toThrow();
|
||||
});
|
||||
});
|
||||
@@ -1,3 +1,6 @@
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import { WsFrameSchema, type WsFrame } from '../types/ws-frames.js';
|
||||
|
||||
export type Frame = Record<string, unknown> & { type: string };
|
||||
export type Listener = (frame: Frame) => void;
|
||||
|
||||
@@ -6,9 +9,15 @@ export interface Broker {
|
||||
subscribe(sessionId: string, listener: Listener): () => void;
|
||||
publishUser(user: string, frame: Frame): void;
|
||||
subscribeUser(user: string, listener: Listener): () => void;
|
||||
// v1.13.11-a: typed publish wrappers. Validate against WsFrameSchema and
|
||||
// delegate to publish / publishUser on success; log + drop on failure
|
||||
// (fail-closed). Existing publish / publishUser callers stay legal — they
|
||||
// get converted to the typed variant in v1.13.11-b.
|
||||
publishFrame(sessionId: string, frame: WsFrame): void;
|
||||
publishUserFrame(user: string, frame: WsFrame): void;
|
||||
}
|
||||
|
||||
export function createBroker(): Broker {
|
||||
export function createBroker(log?: FastifyBaseLogger): Broker {
|
||||
const topics = new Map<string, Set<Listener>>();
|
||||
const userTopics = new Map<string, Set<Listener>>();
|
||||
|
||||
@@ -39,6 +48,28 @@ export function createBroker(): Broker {
|
||||
};
|
||||
}
|
||||
|
||||
// v1.13.11-a: shared validation guard. Returns the parsed/typed frame on
|
||||
// success, or null on failure (after logging). Brief mandates fail-closed
|
||||
// semantics: invalid frames don't reach subscribers; throwing here could
|
||||
// cascade into stream-phase aborts which v1.13.7 already had to defend
|
||||
// against, so log + drop is the right shape.
|
||||
function validate(channel: 'session' | 'user', key: string, frame: WsFrame): WsFrame | null {
|
||||
const parsed = WsFrameSchema.safeParse(frame);
|
||||
if (parsed.success) return parsed.data;
|
||||
const frameType = (frame as { type?: unknown })?.type;
|
||||
const errors = parsed.error.flatten();
|
||||
if (log) {
|
||||
log.error(
|
||||
{ channel, key, frame_type: frameType, errors },
|
||||
'ws-frame-validation-failed: dropping invalid frame',
|
||||
);
|
||||
} else {
|
||||
// Fallback for callers that didn't pass a logger (e.g. unit tests).
|
||||
console.error('ws-frame-validation-failed', { channel, key, frame_type: frameType, errors });
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
publish(sessionId, frame) {
|
||||
publishTo(topics, sessionId, frame);
|
||||
@@ -52,5 +83,15 @@ export function createBroker(): Broker {
|
||||
subscribeUser(user, listener) {
|
||||
return subscribeTo(userTopics, user, listener);
|
||||
},
|
||||
publishFrame(sessionId, frame) {
|
||||
const valid = validate('session', sessionId, frame);
|
||||
if (!valid) return;
|
||||
publishTo(topics, sessionId, valid as Frame);
|
||||
},
|
||||
publishUserFrame(user, frame) {
|
||||
const valid = validate('user', user, frame);
|
||||
if (!valid) return;
|
||||
publishTo(userTopics, user, valid as Frame);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
314
apps/server/src/types/ws-frames.ts
Normal file
314
apps/server/src/types/ws-frames.ts
Normal file
@@ -0,0 +1,314 @@
|
||||
// v1.13.11-a: Zod schemas for every WebSocket frame published by the server.
|
||||
// Validation runs both on send (broker.publishFrame / publishUserFrame) and
|
||||
// on receive (apps/web/src/hooks/useSessionStream + useUserEvents). Catches
|
||||
// silent protocol drift between publisher and consumer.
|
||||
//
|
||||
// IMPORTANT: This file is duplicated byte-identical at
|
||||
// apps/web/src/api/ws-frames.ts. The two apps have separate tsconfigs and
|
||||
// no path alias; the duplication is sync-by-hand. A test asserts the two
|
||||
// files match. If you change one, change the other.
|
||||
//
|
||||
// Per-kind payload schemas (tool_call args, message_parts payloads, etc.)
|
||||
// stay z.unknown() in v1.13.11. Frame-level drift detection is the goal;
|
||||
// deep payload validation is follow-up work.
|
||||
|
||||
import { z } from 'zod';
|
||||
|
||||
// ---- shared primitives -----------------------------------------------------
|
||||
|
||||
const Uuid = z.string().uuid();
|
||||
// Tool call IDs are model-emitted (e.g. "call_abc123") — not UUIDs.
|
||||
const ToolCallId = z.string().min(1);
|
||||
const IsoTimestamp = z.string().min(1);
|
||||
|
||||
const ChatStatusValue = z.enum([
|
||||
'streaming',
|
||||
'tool_running',
|
||||
'waiting_for_input',
|
||||
'idle',
|
||||
'error',
|
||||
]);
|
||||
|
||||
const ErrorReasonValue = z.enum([
|
||||
'llm_provider_error',
|
||||
'doom_loop',
|
||||
'doom_loop_summary_failed',
|
||||
'cap_hit',
|
||||
'cap_hit_summary_failed',
|
||||
]);
|
||||
|
||||
const MessageRoleValue = z.enum(['user', 'assistant', 'system', 'tool']);
|
||||
|
||||
const ToolCallShape = z.object({
|
||||
id: ToolCallId,
|
||||
name: z.string().min(1),
|
||||
args: z.record(z.string(), z.unknown()),
|
||||
});
|
||||
|
||||
// Free-form bags: opaque to the frame schema; deep validation is out of
|
||||
// scope. passthrough preserves unknown keys so the consumer sees the full
|
||||
// shape even when this schema doesn't enumerate every field.
|
||||
const OpaqueObject = z.object({}).passthrough();
|
||||
|
||||
// ---- per-session channel frames --------------------------------------------
|
||||
|
||||
export const SnapshotFrame = z.object({
|
||||
type: z.literal('snapshot'),
|
||||
messages: z.array(OpaqueObject),
|
||||
});
|
||||
|
||||
export const MessageStartedFrame = z.object({
|
||||
type: z.literal('message_started'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
role: MessageRoleValue,
|
||||
});
|
||||
|
||||
export const DeltaFrame = z.object({
|
||||
type: z.literal('delta'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
content: z.string(),
|
||||
});
|
||||
|
||||
export const ToolCallFrame = z.object({
|
||||
type: z.literal('tool_call'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tool_call: ToolCallShape,
|
||||
});
|
||||
|
||||
export const ToolResultFrame = z.object({
|
||||
type: z.literal('tool_result'),
|
||||
tool_message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tool_call_id: ToolCallId,
|
||||
output: z.unknown(),
|
||||
truncated: z.boolean(),
|
||||
error: z.string().optional(),
|
||||
});
|
||||
|
||||
export const MessageCompleteFrame = z.object({
|
||||
type: z.literal('message_complete'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tokens_used: z.number().int().nonnegative().nullable().optional(),
|
||||
ctx_used: z.number().int().nonnegative().nullable().optional(),
|
||||
ctx_max: z.number().int().positive().nullable().optional(),
|
||||
started_at: IsoTimestamp.nullable().optional(),
|
||||
finished_at: IsoTimestamp.nullable().optional(),
|
||||
model: z.string().optional(),
|
||||
metadata: OpaqueObject.nullable().optional(),
|
||||
});
|
||||
|
||||
export const UsageFrame = z.object({
|
||||
type: z.literal('usage'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
completion_tokens: z.number().int().nonnegative().nullable(),
|
||||
ctx_used: z.number().int().nonnegative().nullable(),
|
||||
ctx_max: z.number().int().positive().nullable(),
|
||||
});
|
||||
|
||||
export const MessagesDeletedFrame = z.object({
|
||||
type: z.literal('messages_deleted'),
|
||||
message_ids: z.array(Uuid),
|
||||
chat_id: Uuid.optional(),
|
||||
});
|
||||
|
||||
export const ChatRenamedFrame = z.object({
|
||||
type: z.literal('chat_renamed'),
|
||||
chat_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const CompactedFrame = z.object({
|
||||
type: z.literal('compacted'),
|
||||
session_id: Uuid,
|
||||
chat_id: Uuid,
|
||||
summary_message_id: Uuid,
|
||||
});
|
||||
|
||||
export const ErrorFrame = z.object({
|
||||
type: z.literal('error'),
|
||||
message_id: Uuid.optional(),
|
||||
chat_id: Uuid.optional(),
|
||||
error: z.string(),
|
||||
reason: ErrorReasonValue.optional(),
|
||||
});
|
||||
|
||||
// ---- per-user channel frames (sidebar refresh) -----------------------------
|
||||
|
||||
export const ChatStatusFrame = z.object({
|
||||
type: z.literal('chat_status'),
|
||||
chat_id: Uuid,
|
||||
status: ChatStatusValue,
|
||||
at: IsoTimestamp,
|
||||
reason: ErrorReasonValue.optional(),
|
||||
});
|
||||
|
||||
export const SessionUpdatedFrame = z.object({
|
||||
type: z.literal('session_updated'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
name: z.string(),
|
||||
updated_at: IsoTimestamp,
|
||||
});
|
||||
|
||||
export const SessionRenamedFrame = z.object({
|
||||
type: z.literal('session_renamed'),
|
||||
session_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const SessionCreatedFrame = z.object({
|
||||
type: z.literal('session_created'),
|
||||
session: OpaqueObject,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionArchivedFrame = z.object({
|
||||
type: z.literal('session_archived'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionDeletedFrame = z.object({
|
||||
type: z.literal('session_deleted'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionWorkspaceUpdatedFrame = z.object({
|
||||
type: z.literal('session_workspace_updated'),
|
||||
session_id: Uuid,
|
||||
workspace_panes: z.array(OpaqueObject),
|
||||
});
|
||||
|
||||
export const ChatCreatedFrame = z.object({
|
||||
type: z.literal('chat_created'),
|
||||
chat: OpaqueObject,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ChatUpdatedFrame = z.object({
|
||||
type: z.literal('chat_updated'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
name: z.string().nullable(),
|
||||
updated_at: IsoTimestamp,
|
||||
});
|
||||
|
||||
export const ChatArchivedFrame = z.object({
|
||||
type: z.literal('chat_archived'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ChatUnarchivedFrame = z.object({
|
||||
type: z.literal('chat_unarchived'),
|
||||
chat: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ChatDeletedFrame = z.object({
|
||||
type: z.literal('chat_deleted'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ProjectCreatedFrame = z.object({
|
||||
type: z.literal('project_created'),
|
||||
project: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ProjectArchivedFrame = z.object({
|
||||
type: z.literal('project_archived'),
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const ProjectUnarchivedFrame = z.object({
|
||||
type: z.literal('project_unarchived'),
|
||||
project: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ProjectUpdatedFrame = z.object({
|
||||
type: z.literal('project_updated'),
|
||||
project_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const ProjectDeletedFrame = z.object({
|
||||
type: z.literal('project_deleted'),
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
// ---- discriminated union ---------------------------------------------------
|
||||
|
||||
export const WsFrameSchema = z.discriminatedUnion('type', [
|
||||
// per-session
|
||||
SnapshotFrame,
|
||||
MessageStartedFrame,
|
||||
DeltaFrame,
|
||||
ToolCallFrame,
|
||||
ToolResultFrame,
|
||||
MessageCompleteFrame,
|
||||
UsageFrame,
|
||||
MessagesDeletedFrame,
|
||||
ChatRenamedFrame,
|
||||
CompactedFrame,
|
||||
ErrorFrame,
|
||||
// per-user
|
||||
ChatStatusFrame,
|
||||
SessionUpdatedFrame,
|
||||
SessionRenamedFrame,
|
||||
SessionCreatedFrame,
|
||||
SessionArchivedFrame,
|
||||
SessionDeletedFrame,
|
||||
SessionWorkspaceUpdatedFrame,
|
||||
ChatCreatedFrame,
|
||||
ChatUpdatedFrame,
|
||||
ChatArchivedFrame,
|
||||
ChatUnarchivedFrame,
|
||||
ChatDeletedFrame,
|
||||
ProjectCreatedFrame,
|
||||
ProjectArchivedFrame,
|
||||
ProjectUnarchivedFrame,
|
||||
ProjectUpdatedFrame,
|
||||
ProjectDeletedFrame,
|
||||
]);
|
||||
|
||||
export type WsFrame = z.infer<typeof WsFrameSchema>;
|
||||
|
||||
// Convenience: the set of known frame types. Useful for the publishFrame
|
||||
// helper to log the offending type name when validation fails. Kept in sync
|
||||
// by hand with the discriminated union above.
|
||||
export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [
|
||||
'snapshot',
|
||||
'message_started',
|
||||
'delta',
|
||||
'tool_call',
|
||||
'tool_result',
|
||||
'message_complete',
|
||||
'usage',
|
||||
'messages_deleted',
|
||||
'chat_renamed',
|
||||
'compacted',
|
||||
'error',
|
||||
'chat_status',
|
||||
'session_updated',
|
||||
'session_renamed',
|
||||
'session_created',
|
||||
'session_archived',
|
||||
'session_deleted',
|
||||
'session_workspace_updated',
|
||||
'chat_created',
|
||||
'chat_updated',
|
||||
'chat_archived',
|
||||
'chat_unarchived',
|
||||
'chat_deleted',
|
||||
'project_created',
|
||||
'project_archived',
|
||||
'project_unarchived',
|
||||
'project_updated',
|
||||
'project_deleted',
|
||||
] as const;
|
||||
@@ -31,7 +31,8 @@
|
||||
"shiki": "^1.29.2",
|
||||
"sonner": "^2.0.7",
|
||||
"tailwind-merge": "^3.6.0",
|
||||
"tw-animate-css": "^1.4.0"
|
||||
"tw-animate-css": "^1.4.0",
|
||||
"zod": "^3.23.8"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tailwindcss/postcss": "^4.3.0",
|
||||
|
||||
314
apps/web/src/api/ws-frames.ts
Normal file
314
apps/web/src/api/ws-frames.ts
Normal file
@@ -0,0 +1,314 @@
|
||||
// v1.13.11-a: Zod schemas for every WebSocket frame published by the server.
|
||||
// Validation runs both on send (broker.publishFrame / publishUserFrame) and
|
||||
// on receive (apps/web/src/hooks/useSessionStream + useUserEvents). Catches
|
||||
// silent protocol drift between publisher and consumer.
|
||||
//
|
||||
// IMPORTANT: This file is duplicated byte-identical at
|
||||
// apps/web/src/api/ws-frames.ts. The two apps have separate tsconfigs and
|
||||
// no path alias; the duplication is sync-by-hand. A test asserts the two
|
||||
// files match. If you change one, change the other.
|
||||
//
|
||||
// Per-kind payload schemas (tool_call args, message_parts payloads, etc.)
|
||||
// stay z.unknown() in v1.13.11. Frame-level drift detection is the goal;
|
||||
// deep payload validation is follow-up work.
|
||||
|
||||
import { z } from 'zod';
|
||||
|
||||
// ---- shared primitives -----------------------------------------------------
|
||||
|
||||
const Uuid = z.string().uuid();
|
||||
// Tool call IDs are model-emitted (e.g. "call_abc123") — not UUIDs.
|
||||
const ToolCallId = z.string().min(1);
|
||||
const IsoTimestamp = z.string().min(1);
|
||||
|
||||
const ChatStatusValue = z.enum([
|
||||
'streaming',
|
||||
'tool_running',
|
||||
'waiting_for_input',
|
||||
'idle',
|
||||
'error',
|
||||
]);
|
||||
|
||||
const ErrorReasonValue = z.enum([
|
||||
'llm_provider_error',
|
||||
'doom_loop',
|
||||
'doom_loop_summary_failed',
|
||||
'cap_hit',
|
||||
'cap_hit_summary_failed',
|
||||
]);
|
||||
|
||||
const MessageRoleValue = z.enum(['user', 'assistant', 'system', 'tool']);
|
||||
|
||||
const ToolCallShape = z.object({
|
||||
id: ToolCallId,
|
||||
name: z.string().min(1),
|
||||
args: z.record(z.string(), z.unknown()),
|
||||
});
|
||||
|
||||
// Free-form bags: opaque to the frame schema; deep validation is out of
|
||||
// scope. passthrough preserves unknown keys so the consumer sees the full
|
||||
// shape even when this schema doesn't enumerate every field.
|
||||
const OpaqueObject = z.object({}).passthrough();
|
||||
|
||||
// ---- per-session channel frames --------------------------------------------
|
||||
|
||||
export const SnapshotFrame = z.object({
|
||||
type: z.literal('snapshot'),
|
||||
messages: z.array(OpaqueObject),
|
||||
});
|
||||
|
||||
export const MessageStartedFrame = z.object({
|
||||
type: z.literal('message_started'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
role: MessageRoleValue,
|
||||
});
|
||||
|
||||
export const DeltaFrame = z.object({
|
||||
type: z.literal('delta'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
content: z.string(),
|
||||
});
|
||||
|
||||
export const ToolCallFrame = z.object({
|
||||
type: z.literal('tool_call'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tool_call: ToolCallShape,
|
||||
});
|
||||
|
||||
export const ToolResultFrame = z.object({
|
||||
type: z.literal('tool_result'),
|
||||
tool_message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tool_call_id: ToolCallId,
|
||||
output: z.unknown(),
|
||||
truncated: z.boolean(),
|
||||
error: z.string().optional(),
|
||||
});
|
||||
|
||||
export const MessageCompleteFrame = z.object({
|
||||
type: z.literal('message_complete'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
tokens_used: z.number().int().nonnegative().nullable().optional(),
|
||||
ctx_used: z.number().int().nonnegative().nullable().optional(),
|
||||
ctx_max: z.number().int().positive().nullable().optional(),
|
||||
started_at: IsoTimestamp.nullable().optional(),
|
||||
finished_at: IsoTimestamp.nullable().optional(),
|
||||
model: z.string().optional(),
|
||||
metadata: OpaqueObject.nullable().optional(),
|
||||
});
|
||||
|
||||
export const UsageFrame = z.object({
|
||||
type: z.literal('usage'),
|
||||
message_id: Uuid,
|
||||
chat_id: Uuid.optional(),
|
||||
completion_tokens: z.number().int().nonnegative().nullable(),
|
||||
ctx_used: z.number().int().nonnegative().nullable(),
|
||||
ctx_max: z.number().int().positive().nullable(),
|
||||
});
|
||||
|
||||
export const MessagesDeletedFrame = z.object({
|
||||
type: z.literal('messages_deleted'),
|
||||
message_ids: z.array(Uuid),
|
||||
chat_id: Uuid.optional(),
|
||||
});
|
||||
|
||||
export const ChatRenamedFrame = z.object({
|
||||
type: z.literal('chat_renamed'),
|
||||
chat_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const CompactedFrame = z.object({
|
||||
type: z.literal('compacted'),
|
||||
session_id: Uuid,
|
||||
chat_id: Uuid,
|
||||
summary_message_id: Uuid,
|
||||
});
|
||||
|
||||
export const ErrorFrame = z.object({
|
||||
type: z.literal('error'),
|
||||
message_id: Uuid.optional(),
|
||||
chat_id: Uuid.optional(),
|
||||
error: z.string(),
|
||||
reason: ErrorReasonValue.optional(),
|
||||
});
|
||||
|
||||
// ---- per-user channel frames (sidebar refresh) -----------------------------
|
||||
|
||||
export const ChatStatusFrame = z.object({
|
||||
type: z.literal('chat_status'),
|
||||
chat_id: Uuid,
|
||||
status: ChatStatusValue,
|
||||
at: IsoTimestamp,
|
||||
reason: ErrorReasonValue.optional(),
|
||||
});
|
||||
|
||||
export const SessionUpdatedFrame = z.object({
|
||||
type: z.literal('session_updated'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
name: z.string(),
|
||||
updated_at: IsoTimestamp,
|
||||
});
|
||||
|
||||
export const SessionRenamedFrame = z.object({
|
||||
type: z.literal('session_renamed'),
|
||||
session_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const SessionCreatedFrame = z.object({
|
||||
type: z.literal('session_created'),
|
||||
session: OpaqueObject,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionArchivedFrame = z.object({
|
||||
type: z.literal('session_archived'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionDeletedFrame = z.object({
|
||||
type: z.literal('session_deleted'),
|
||||
session_id: Uuid,
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const SessionWorkspaceUpdatedFrame = z.object({
|
||||
type: z.literal('session_workspace_updated'),
|
||||
session_id: Uuid,
|
||||
workspace_panes: z.array(OpaqueObject),
|
||||
});
|
||||
|
||||
export const ChatCreatedFrame = z.object({
|
||||
type: z.literal('chat_created'),
|
||||
chat: OpaqueObject,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ChatUpdatedFrame = z.object({
|
||||
type: z.literal('chat_updated'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
name: z.string().nullable(),
|
||||
updated_at: IsoTimestamp,
|
||||
});
|
||||
|
||||
export const ChatArchivedFrame = z.object({
|
||||
type: z.literal('chat_archived'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ChatUnarchivedFrame = z.object({
|
||||
type: z.literal('chat_unarchived'),
|
||||
chat: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ChatDeletedFrame = z.object({
|
||||
type: z.literal('chat_deleted'),
|
||||
chat_id: Uuid,
|
||||
session_id: Uuid,
|
||||
});
|
||||
|
||||
export const ProjectCreatedFrame = z.object({
|
||||
type: z.literal('project_created'),
|
||||
project: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ProjectArchivedFrame = z.object({
|
||||
type: z.literal('project_archived'),
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
export const ProjectUnarchivedFrame = z.object({
|
||||
type: z.literal('project_unarchived'),
|
||||
project: OpaqueObject,
|
||||
});
|
||||
|
||||
export const ProjectUpdatedFrame = z.object({
|
||||
type: z.literal('project_updated'),
|
||||
project_id: Uuid,
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const ProjectDeletedFrame = z.object({
|
||||
type: z.literal('project_deleted'),
|
||||
project_id: Uuid,
|
||||
});
|
||||
|
||||
// ---- discriminated union ---------------------------------------------------
|
||||
|
||||
export const WsFrameSchema = z.discriminatedUnion('type', [
|
||||
// per-session
|
||||
SnapshotFrame,
|
||||
MessageStartedFrame,
|
||||
DeltaFrame,
|
||||
ToolCallFrame,
|
||||
ToolResultFrame,
|
||||
MessageCompleteFrame,
|
||||
UsageFrame,
|
||||
MessagesDeletedFrame,
|
||||
ChatRenamedFrame,
|
||||
CompactedFrame,
|
||||
ErrorFrame,
|
||||
// per-user
|
||||
ChatStatusFrame,
|
||||
SessionUpdatedFrame,
|
||||
SessionRenamedFrame,
|
||||
SessionCreatedFrame,
|
||||
SessionArchivedFrame,
|
||||
SessionDeletedFrame,
|
||||
SessionWorkspaceUpdatedFrame,
|
||||
ChatCreatedFrame,
|
||||
ChatUpdatedFrame,
|
||||
ChatArchivedFrame,
|
||||
ChatUnarchivedFrame,
|
||||
ChatDeletedFrame,
|
||||
ProjectCreatedFrame,
|
||||
ProjectArchivedFrame,
|
||||
ProjectUnarchivedFrame,
|
||||
ProjectUpdatedFrame,
|
||||
ProjectDeletedFrame,
|
||||
]);
|
||||
|
||||
export type WsFrame = z.infer<typeof WsFrameSchema>;
|
||||
|
||||
// Convenience: the set of known frame types. Useful for the publishFrame
|
||||
// helper to log the offending type name when validation fails. Kept in sync
|
||||
// by hand with the discriminated union above.
|
||||
export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [
|
||||
'snapshot',
|
||||
'message_started',
|
||||
'delta',
|
||||
'tool_call',
|
||||
'tool_result',
|
||||
'message_complete',
|
||||
'usage',
|
||||
'messages_deleted',
|
||||
'chat_renamed',
|
||||
'compacted',
|
||||
'error',
|
||||
'chat_status',
|
||||
'session_updated',
|
||||
'session_renamed',
|
||||
'session_created',
|
||||
'session_archived',
|
||||
'session_deleted',
|
||||
'session_workspace_updated',
|
||||
'chat_created',
|
||||
'chat_updated',
|
||||
'chat_archived',
|
||||
'chat_unarchived',
|
||||
'chat_deleted',
|
||||
'project_created',
|
||||
'project_archived',
|
||||
'project_unarchived',
|
||||
'project_updated',
|
||||
'project_deleted',
|
||||
] as const;
|
||||
@@ -1,6 +1,7 @@
|
||||
import { useEffect, useRef, useState } from 'react';
|
||||
import { toast } from 'sonner';
|
||||
import type { Message, WsFrame } from '@/api/types';
|
||||
import { WsFrameSchema } from '@/api/ws-frames';
|
||||
import { api } from '@/api/client';
|
||||
import { sessionEvents } from './sessionEvents';
|
||||
import { recordUsage } from './useChatThroughput';
|
||||
@@ -216,8 +217,28 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
setState((s) => ({ ...s, connected: true, error: null }));
|
||||
};
|
||||
ws.onmessage = (ev) => {
|
||||
// v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
|
||||
// frames are logged and dropped. WsFrameSchema is the runtime guard;
|
||||
// the hand-maintained WsFrame type stays as the narrowed dev-time
|
||||
// shape (Zod uses OpaqueObject for nested types like Message[]). One
|
||||
// cast bridges the two.
|
||||
let raw: unknown;
|
||||
try {
|
||||
const frame = JSON.parse(typeof ev.data === 'string' ? ev.data : '') as WsFrame;
|
||||
raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
|
||||
} catch (err) {
|
||||
console.warn('bad ws frame (parse)', err);
|
||||
return;
|
||||
}
|
||||
const validated = WsFrameSchema.safeParse(raw);
|
||||
if (!validated.success) {
|
||||
console.error('ws-frame-validation-failed (session channel)', {
|
||||
frame_type: (raw as { type?: unknown })?.type,
|
||||
errors: validated.error.flatten(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const frame = validated.data as unknown as WsFrame;
|
||||
// v1.11: on a compaction completion, re-fetch the message list so
|
||||
// the new summary row + the cohort of compacted_at-stamped older
|
||||
// rows render correctly. We dispatch the fresh list as a synthetic
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { useEffect } from 'react';
|
||||
import { WsFrameSchema } from '@/api/ws-frames';
|
||||
import { sessionEvents } from './sessionEvents';
|
||||
import { createWsReconnectToast } from './wsReconnectToast';
|
||||
|
||||
@@ -38,14 +39,33 @@ export function useUserEvents(): void {
|
||||
};
|
||||
|
||||
ws.onmessage = (ev) => {
|
||||
// v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
|
||||
// frames are logged and dropped instead of dispatched onto the
|
||||
// sessionEvents bus where a stale or wrong shape would silently
|
||||
// corrupt sidebar / chat state.
|
||||
let raw: unknown;
|
||||
try {
|
||||
const parsed: unknown = JSON.parse(ev.data);
|
||||
if (parsed && typeof (parsed as { type?: unknown }).type === 'string') {
|
||||
sessionEvents.emit(parsed as import('./sessionEvents').SessionEvent);
|
||||
}
|
||||
raw = JSON.parse(ev.data);
|
||||
} catch (err) {
|
||||
console.warn('useUserEvents: failed to parse frame', err);
|
||||
return;
|
||||
}
|
||||
const validated = WsFrameSchema.safeParse(raw);
|
||||
if (!validated.success) {
|
||||
console.error('ws-frame-validation-failed (user channel)', {
|
||||
frame_type: (raw as { type?: unknown })?.type,
|
||||
errors: validated.error.flatten(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Bridge cast: Zod's union is broader than SessionEvent (it includes
|
||||
// per-session-channel frames too, which never arrive on the user
|
||||
// channel). sessionEvents.emit only dispatches frames whose type
|
||||
// appears in SessionEvent; the narrowing happens via the existing
|
||||
// useSidebar.ts applyEvent switch.
|
||||
sessionEvents.emit(
|
||||
validated.data as unknown as import('./sessionEvents').SessionEvent,
|
||||
);
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
|
||||
3
pnpm-lock.yaml
generated
3
pnpm-lock.yaml
generated
@@ -157,6 +157,9 @@ importers:
|
||||
tw-animate-css:
|
||||
specifier: ^1.4.0
|
||||
version: 1.4.0
|
||||
zod:
|
||||
specifier: ^3.23.8
|
||||
version: 3.25.76
|
||||
devDependencies:
|
||||
'@tailwindcss/postcss':
|
||||
specifier: ^4.3.0
|
||||
|
||||
Reference in New Issue
Block a user