First half of the WebSocket-frame-typing batch (split per recon — total
scope was ~535 LoC, larger than the roadmap's ~300 estimate, so the
server-side publish-site conversion lands separately in v1.13.11-b).
Phase A scope:
(1) apps/server/src/types/ws-frames.ts (NEW) — Zod schemas for all 27
wire-format WS frame types. Discriminated union (WsFrameSchema) plus
KNOWN_FRAME_TYPES const for diagnostic lookup. UUIDs are
z.string().uuid(); model-emitted tool_call_id stays z.string().min(1)
since OpenAI-compatible APIs emit "call_<random>", not a UUID. Per-kind payload narrowing
(tool args, message_parts payloads) intentionally stays z.unknown() —
frame-level drift detection is the goal; deep payload validation is
follow-up work.
(2) apps/web/src/api/ws-frames.ts (NEW) — byte-identical mirror of the
authoritative server file. No path alias from web→server in the existing
tsconfig setup; sync-by-hand was chosen over a new packages/shared/ dir.
A ws-frames.test.ts test asserts the two files match.
(3) apps/server/src/services/broker.ts — adds publishFrame() and
publishUserFrame() methods to the Broker interface. Both validate via
WsFrameSchema and fail-closed: log + drop on invalid. createBroker now
accepts an optional FastifyBaseLogger so validation failures land in
the pino stream (with console.error fallback for unit tests). The
existing publish() / publishUser() raw methods stay legal — they get
converted to the typed variants in v1.13.11-b.
(4) apps/web/src/hooks/useSessionStream.ts + useUserEvents.ts — wrap
ws.onmessage with WsFrameSchema.safeParse. Fail-closed: invalid frames
log + return without dispatching. Hand-maintained WsFrame and
SessionEvent types stay in place; one cast bridges Zod-typed → narrowed
shape (Zod uses OpaqueObject for nested Message[] / WorkspacePane[] etc.,
which are dev-time-narrowed via the existing hand-maintained types).
(5) apps/web/package.json — adds zod ^3.23.8 as a direct dep. It was
previously only a transitive dep via ai-sdk / postgres; promoting it makes
the direct import legal.
(6) Tests: 15 new in ws-frames.test.ts covering happy-path per major
frame type, drift-catchers (unknown type, invalid enum, non-UUID, negative
tokens), parts-authoritative read variants, the mirror-file diff check,
and four broker fail-closed scenarios. 219/219 server tests pass (was
204; +15 new).
Two recon corrections to the dispatch brief, both flagged before
implementation:
- No 'parts_appended' frame exists. The brief assumed one; the codebase
reads parts via the messages_with_parts view after message_complete
triggers a refetch. MessagePartSchema is therefore unused this batch.
- No 'tool_running' frame exists. The brief listed it as standalone; it
is in fact a 'chat_status' variant ({ status: 'tool_running' }), already
covered by ChatStatusFrame.
Smoke: clean container boot, no validation errors in the server log. Real
production frames pass validation (the schemas were derived from the
existing hand-maintained types in api/types.ts and sessionEvents.ts).
v1.13.11-b will follow immediately: convert all ~85 raw broker.publish /
ctx.publish call sites across 11 server files to publishFrame /
publishUserFrame. Mechanical edit; the wiring done here means the diff
in -b is just the call-site swaps.
~310 LoC across 9 files (4 new + 5 modified).
98 lines
3.4 KiB
TypeScript
98 lines
3.4 KiB
TypeScript
import type { FastifyBaseLogger } from 'fastify';
|
|
import { WsFrameSchema, type WsFrame } from '../types/ws-frames.js';
|
|
|
|
/** Loosely-typed frame: any string-keyed record that carries a string `type` field. */
export type Frame = Record<string, unknown> & { type: string };
|
|
/** Subscriber callback; it is handed one frame per call and its return value is ignored. */
export type Listener = (frame: Frame) => void;
|
|
|
|
/**
 * Pub/sub contract with two independent channels: one keyed by session id
 * and one keyed by user. Each channel offers a raw publish, a
 * schema-validated publish, and a subscribe that returns its own disposer.
 */
export interface Broker {
  // ---- session channel ----

  /** Registers `listener` under `sessionId`; call the returned function to unsubscribe. */
  subscribe(sessionId: string, listener: Listener): () => void;

  /** Hands `frame` to the listeners registered under `sessionId`, without validation. */
  publish(sessionId: string, frame: Frame): void;

  /**
   * v1.13.11-a typed variant of `publish`: the frame is checked against
   * WsFrameSchema first. A frame that fails validation is logged and
   * dropped (fail-closed) instead of reaching subscribers. Raw `publish`
   * callers remain legal until they are converted in v1.13.11-b.
   */
  publishFrame(sessionId: string, frame: WsFrame): void;

  // ---- user channel ----

  /** Registers `listener` under `user`; call the returned function to unsubscribe. */
  subscribeUser(user: string, listener: Listener): () => void;

  /** Hands `frame` to the listeners registered under `user`, without validation. */
  publishUser(user: string, frame: Frame): void;

  /**
   * v1.13.11-a typed variant of `publishUser`, with the same
   * validate-then-deliver, log-and-drop-on-failure behaviour as
   * `publishFrame`.
   */
  publishUserFrame(user: string, frame: WsFrame): void;
}
|
|
|
|
/**
 * Builds an in-memory pub/sub broker with two independent topic maps: one
 * keyed by session id and one keyed by user.
 *
 * @param log Optional logger. When supplied, frame-validation failures and
 *   exceptions thrown by subscribers are reported through `log.error`.
 *   Without it, validation failures fall back to `console.error` and
 *   subscriber exceptions are isolated silently.
 * @returns A broker whose raw `publish` / `publishUser` deliver frames
 *   as-is, and whose `publishFrame` / `publishUserFrame` validate against
 *   WsFrameSchema first and drop (never throw on) invalid frames.
 */
export function createBroker(log?: FastifyBaseLogger): Broker {
  const topics = new Map<string, Set<Listener>>();
  const userTopics = new Map<string, Set<Listener>>();

  /** Delivers `frame` to every listener registered under `key` in `map`. */
  function publishTo(map: Map<string, Set<Listener>>, key: string, frame: Frame): void {
    const set = map.get(key);
    if (!set) return;
    for (const listener of set) {
      try {
        listener(frame);
      } catch (err: unknown) {
        // One bad subscriber must not break delivery to the rest, so the
        // exception is contained here. It is still surfaced through the
        // injected logger (when there is one) so the failure is not lost.
        log?.error(
          { channel: map === topics ? 'session' : 'user', key, frame_type: frame.type, err },
          'ws-broker-listener-failed: subscriber threw while handling frame',
        );
      }
    }
  }

  /**
   * Adds `listener` under `key` in `map` and returns its disposer. The
   * disposer removes the listener and prunes the key once its set is empty.
   */
  function subscribeTo(map: Map<string, Set<Listener>>, key: string, listener: Listener): () => void {
    let set = map.get(key);
    if (!set) {
      set = new Set();
      map.set(key, set);
    }
    set.add(listener);

    // Guard against repeated disposer calls: without it, a stale second call
    // made after the same listener was re-subscribed to this key would remove
    // that newer, still-live subscription.
    let active = true;
    return () => {
      if (!active) return;
      active = false;
      const s = map.get(key);
      if (!s) return;
      s.delete(listener);
      if (s.size === 0) map.delete(key);
    };
  }

  // v1.13.11-a: shared validation guard. Returns the parsed/typed frame on
  // success, or null on failure (after logging). Brief mandates fail-closed
  // semantics: invalid frames don't reach subscribers; throwing here could
  // cascade into stream-phase aborts which v1.13.7 already had to defend
  // against, so log + drop is the right shape.
  function validate(channel: 'session' | 'user', key: string, frame: WsFrame): WsFrame | null {
    const parsed = WsFrameSchema.safeParse(frame);
    if (parsed.success) return parsed.data;

    // The input failed validation, so read `type` defensively for the log.
    const frameType = (frame as { type?: unknown })?.type;
    const errors = parsed.error.flatten();
    if (log) {
      log.error(
        { channel, key, frame_type: frameType, errors },
        'ws-frame-validation-failed: dropping invalid frame',
      );
    } else {
      // Fallback for callers that didn't pass a logger (e.g. unit tests).
      console.error('ws-frame-validation-failed', { channel, key, frame_type: frameType, errors });
    }
    return null;
  }

  return {
    publish(sessionId, frame) {
      publishTo(topics, sessionId, frame);
    },
    subscribe(sessionId, listener) {
      return subscribeTo(topics, sessionId, listener);
    },
    publishUser(user, frame) {
      publishTo(userTopics, user, frame);
    },
    subscribeUser(user, listener) {
      return subscribeTo(userTopics, user, listener);
    },
    publishFrame(sessionId, frame) {
      const valid = validate('session', sessionId, frame);
      if (!valid) return;
      // Subscribers receive the schema's parsed output, not the caller's object.
      publishTo(topics, sessionId, valid as Frame);
    },
    publishUserFrame(user, frame) {
      const valid = validate('user', user, frame);
      if (!valid) return;
      publishTo(userTopics, user, valid as Frame);
    },
  };
}
|