import type { FastifyBaseLogger } from 'fastify';
import { WsFrameSchema, type WsFrame } from '../types/ws-frames.js';

/** Untyped frame: an arbitrary object that carries a string `type` field. */
export type Frame = Record<string, unknown> & { type: string };

/** Callback invoked with each frame published to the key it is subscribed to. */
export type Listener = (frame: Frame) => void;

/**
 * In-memory publish/subscribe hub with two independent key spaces: one keyed
 * by session id and one keyed by user.
 */
export interface Broker {
  /** Deliver `frame`, unvalidated, to every listener subscribed to `sessionId`. */
  publish(sessionId: string, frame: Frame): void;

  /** Register `listener` for `sessionId`. The returned function unsubscribes it. */
  subscribe(sessionId: string, listener: Listener): () => void;

  /** Deliver `frame`, unvalidated, to every listener subscribed to `user`. */
  publishUser(user: string, frame: Frame): void;

  /** Register `listener` for `user`. The returned function unsubscribes it. */
  subscribeUser(user: string, listener: Listener): () => void;

  // v1.13.11-a: typed publish wrappers. Validate against WsFrameSchema and
  // delegate to publish / publishUser on success; log + drop on failure
  // (fail-closed). Existing publish / publishUser callers stay legal — they
  // get converted to the typed variant in v1.13.11-b.

  /** Validate `frame` against `WsFrameSchema`, then deliver it to `sessionId` listeners; invalid frames are logged and dropped. */
  publishFrame(sessionId: string, frame: WsFrame): void;

  /** Validate `frame` against `WsFrameSchema`, then deliver it to `user` listeners; invalid frames are logged and dropped. */
  publishUserFrame(user: string, frame: WsFrame): void;
}

/** Listener registry: key (session id or user) -> set of subscribed listeners. */
type TopicMap = Map<string, Set<Listener>>;

/**
 * Create a broker whose subscriptions live in two in-memory maps private to
 * the returned instance.
 *
 * @param log Optional logger used to report frames that fail schema
 *   validation. When omitted, those failures are written with `console.error`.
 * @returns A `Broker` backed by the maps created in this call.
 */
export function createBroker(log?: FastifyBaseLogger): Broker {
  const topics: TopicMap = new Map<string, Set<Listener>>();
  const userTopics: TopicMap = new Map<string, Set<Listener>>();

  /** Invoke every listener registered under `key` in `map`; no-op when there are none. */
  function publishTo(map: TopicMap, key: string, frame: Frame): void {
    const set = map.get(key);
    if (!set) return;
    for (const listener of set) {
      try {
        listener(frame);
      } catch {
        // ignore listener errors so one bad subscriber doesn't break the rest
      }
    }
  }

  /**
   * Add `listener` under `key` in `map`, creating the set on first use.
   * Listeners are held in a `Set`, so registering the same function twice for
   * one key keeps a single entry.
   *
   * @returns An unsubscribe function. It removes the listener and deletes the
   *   key once its set is empty; it does nothing if the key is already gone.
   */
  function subscribeTo(map: TopicMap, key: string, listener: Listener): () => void {
    let set = map.get(key);
    if (!set) {
      set = new Set<Listener>();
      map.set(key, set);
    }
    set.add(listener);
    return () => {
      // Look the set up again: the key may have been removed (and possibly
      // re-created) since this subscription was made.
      const s = map.get(key);
      if (!s) return;
      s.delete(listener);
      if (s.size === 0) map.delete(key);
    };
  }

  // v1.13.11-a: shared validation guard. Returns the parsed/typed frame on
  // success, or null on failure (after logging). Brief mandates fail-closed
  // semantics: invalid frames don't reach subscribers; throwing here could
  // cascade into stream-phase aborts which v1.13.7 already had to defend
  // against, so log + drop is the right shape.
  /**
   * @param channel Which key space the frame was headed for; used for log context only.
   * @param key Session id or user the frame was addressed to; used for log context only.
   * @param frame Candidate frame to check against `WsFrameSchema`.
   * @returns The schema's parsed output on success, otherwise `null`.
   */
  function validate(channel: 'session' | 'user', key: string, frame: WsFrame): WsFrame | null {
    const parsed = WsFrameSchema.safeParse(frame);
    if (parsed.success) return parsed.data;

    // The input failed validation, so read `type` defensively for the log entry.
    const frameType = (frame as { type?: unknown })?.type;
    const errors = parsed.error.flatten();
    if (log) {
      log.error(
        { channel, key, frame_type: frameType, errors },
        'ws-frame-validation-failed: dropping invalid frame',
      );
    } else {
      // Fallback for callers that didn't pass a logger (e.g. unit tests).
      console.error('ws-frame-validation-failed', { channel, key, frame_type: frameType, errors });
    }
    return null;
  }

  return {
    publish(sessionId, frame) {
      publishTo(topics, sessionId, frame);
    },
    subscribe(sessionId, listener) {
      return subscribeTo(topics, sessionId, listener);
    },
    publishUser(user, frame) {
      publishTo(userTopics, user, frame);
    },
    subscribeUser(user, listener) {
      return subscribeTo(userTopics, user, listener);
    },
    publishFrame(sessionId, frame) {
      const valid = validate('session', sessionId, frame);
      if (!valid) return;
      // Subscribers receive the schema's parsed output, not the caller's object.
      publishTo(topics, sessionId, valid as Frame);
    },
    publishUserFrame(user, frame) {
      const valid = validate('user', user, frame);
      if (!valid) return;
      // Subscribers receive the schema's parsed output, not the caller's object.
      publishTo(userTopics, user, valid as Frame);
    },
  };
}