import type { FastifyInstance } from 'fastify';
import WebSocket from 'ws';
import type { FleetState, HostState } from '../services/fleet-state.js';
import type { DeltaEmitter } from '../index.js';
import type { LogRelay } from '../services/log-relay.js';

/** Interval between application-level `ping` frames sent to each connected client. */
const HEARTBEAT_INTERVAL_MS = 30_000;

/**
 * WS endpoint: /api/ws/control
 *
 * On join: send snapshot carrying current fleet state + seqs.
 * B6: After snapshot, replay in-memory log tail for late joiners.
 * On delta: forward seq-stamped deltas to subscribers.
 *
 * Client rule: buffer pre-snapshot deltas, replay after snapshot applying only
 * seq > snapshot_seq. On service restart, rebuild fleet state from DB before
 * serving snapshots.
 *
 * @param app      Fastify instance the websocket route is registered on.
 * @param fleet    In-memory fleet state used to build the join snapshot.
 * @param emitter  Delta source; `subscribe` must return an unsubscribe function.
 * @param logRelay Optional log relay whose tails are replayed after the
 *                 snapshot. When `null`, no log replay is sent.
 */
export function registerControlWebSocket(
  app: FastifyInstance,
  fleet: FleetState,
  emitter: DeltaEmitter,
  logRelay: LogRelay | null = null,
): void {
  app.get('/api/ws/control', { websocket: true }, (socket, req) => {
    const snapshot = buildSnapshot(fleet);

    // B4 fix: send snapshot at top level matching ControlFleetFrame Zod schema.
    // The frame-level seq is the highest per-host seq in the snapshot (0 when
    // there are no hosts).
    const maxSeq = snapshot.hosts.reduce((max, h) => Math.max(max, h.seq), 0);
    socket.send(
      JSON.stringify({
        type: 'control_fleet' as const,
        seq: maxSeq,
        hosts: snapshot.hosts,
      }),
    );

    // B6: Replay in-memory log tail for late joiners.
    if (logRelay && socket.readyState === WebSocket.OPEN) {
      for (const entry of logRelay.getAllTails()) {
        socket.send(
          JSON.stringify({
            type: 'control_log' as const,
            seq: maxSeq, // tail lines don't carry per-host seq; use snapshot seq
            providerId: entry.providerId,
            source: entry.source,
            line: entry.line,
          }),
        );
      }
    }

    // B3 fix: subscribe to delta emitter so WS clients receive live updates.
    // Deltas are dropped (not queued) while the socket is not OPEN.
    const unsubscribe = emitter.subscribe((delta: unknown) => {
      if (socket.readyState === WebSocket.OPEN) {
        socket.send(JSON.stringify(delta));
      }
    });

    const heartbeat = setInterval(() => {
      if (socket.readyState !== WebSocket.OPEN) {
        // Stop pinging a socket that is no longer open; the close/error
        // handlers below release the delta subscription.
        clearInterval(heartbeat);
        return;
      }
      socket.send(JSON.stringify({ type: 'ping' as const }));
    }, HEARTBEAT_INTERVAL_MS);

    // Both 'error' and 'close' can fire for the same socket, so teardown is
    // guarded to run exactly once. This avoids calling the emitter's
    // unsubscribe function twice, which is only safe if it is idempotent.
    let cleanedUp = false;
    const cleanup = (): void => {
      if (cleanedUp) {
        return;
      }
      cleanedUp = true;
      clearInterval(heartbeat);
      unsubscribe();
    };

    socket.on('close', cleanup);
    socket.on('error', (err: Error) => {
      // Surface the failure instead of swallowing it silently.
      req.log.warn({ err }, 'control websocket error');
      cleanup();
    });
  });
}

/**
 * Build a snapshot from the in-memory fleet state.
 * On restart, this is rebuilt from DB before serving snapshots.
 *
 * Dates are serialized to ISO-8601 strings; absent `lastSeenAt` /
 * `ttlDeadline` values become `null` so the result is JSON-safe.
 *
 * @param fleet Fleet state whose `hosts` map (and each host's `models` map)
 *              is flattened into arrays.
 * @returns An object with one entry per host, each carrying its models.
 */
function buildSnapshot(fleet: FleetState): {
  hosts: Array<{
    providerId: string;
    liveness: 'connected' | 'reconnecting' | 'down';
    lastSeenAt: string | null;
    seq: number;
    models: Array<{
      model: string;
      state: string;
      ts: string;
      ttlDeadline: string | null;
      inflight: number;
    }>;
  }>;
} {
  const hosts = Array.from(fleet.hosts.values()).map((h) => ({
    providerId: h.providerId,
    liveness: h.liveness,
    lastSeenAt: h.lastSeenAt?.toISOString() ?? null,
    seq: h.seq,
    models: Array.from(h.models.values()).map((m) => ({
      model: m.model,
      state: m.state,
      ts: m.ts.toISOString(),
      ttlDeadline: m.ttlDeadline?.toISOString() ?? null,
      inflight: m.inflight,
    })),
  }));

  return { hosts };
}