Compare commits

...

1 Commits

Author SHA1 Message Date
bc376c878d v1.13.11-b: convert raw broker.publish call sites to typed publishFrame
Second half of the WebSocket-frame-typing batch. Phase A (8b568b3)
landed the schemas + frontend receive validation + publishFrame /
publishUserFrame wrappers. This commit converts the existing publish
call sites so every server-emitted WS frame now goes through Zod
validation at the broker boundary.

Conversion strategy: change once in the inference / skills adapters in
index.ts (so ctx.publish / ctx.publishUser propagate to publishFrame /
publishUserFrame for ALL ~50 inference + auto_name call sites in one
move), then bulk-replace the ~30 direct broker.publish* call sites in
the routes + compaction.

Files touched:
- index.ts: inference + skills route adapters now call publishFrame /
  publishUserFrame internally; raw broker.publishUser('default', ...)
  call in the stale-row sweeper also converted.
- routes/projects.ts (7 sites), routes/chats.ts (9 sites),
  routes/sessions.ts (8 sites): all broker.publishUser(...) → broker.
  publishUserFrame(...).
- services/compaction.ts (3 sites): 2 publishUser, 1 publish.

Real protocol drift surfaced by Zod, fixed in the same commit:

  services/compaction.ts:442 was publishing chat_status with status:
  'working' — the v1.12.1 chat_status widening (CLAUDE.md:55) dropped
  this enum value in favor of streaming|tool_running|waiting_for_input|
  idle|error. The compaction.ts site was missed during v1.12.1; the
  frame had been published with an unknown enum value ever since (the
  frontend useChatStatus quietly ignored it). Corrected to 'streaming'
  — compaction's LLM call has the same dot-state semantic as an
  inference turn. This is exactly the class of bug v1.13.11 exists to
  catch.

Schema relaxation: OpaqueObject (the bag type for nested entities like
Project / Chat / Session / WorkspacePane embedded in WS frames) was
z.object({}).passthrough(), which Zod outputs as {} & {[k:string]:
unknown}. The strict-typed entities don't have index signatures so
TypeScript rejected them at publishFrame call sites. Relaxed to
z.unknown() — runtime validation still accepts the value, dev-time
narrowing happens via the existing hand-maintained types. Trade-off:
frame-level drift detection stays sharp; nested-payload validation
goes to follow-up work as the brief intended.

Schema audit:
  grep -rn "broker\.publish(\|broker\.publishUser(" apps/server/src \
    --include="*.ts" | grep -v "broker.ts\|__tests__\|.bak"
  → 0 results. Every server publish goes through publishFrame /
  publishUserFrame. The remaining ctx.publish / ctx.publishUser sites
  in services/inference/* + services/auto_name.ts route through the
  index.ts adapter, which calls publishFrame internally.

Tests: 219/219 pass (unchanged from v1.13.11-a; the Phase B conversion
is mechanical and doesn't add test cases).

Smoke: clean container boot, no ws-frame-validation-failed entries
under normal traffic. Sidebar list refresh + agent picker open both
pass through useUserEvents without drops.

~70 LoC across 7 files. v1.13.11 closed.
2026-05-22 15:54:00 +00:00
7 changed files with 62 additions and 51 deletions

View File

@@ -101,7 +101,9 @@ async function main() {
config,
log: app.log,
publish: (sessionId, frame) => {
broker.publish(sessionId, frame as unknown as Record<string, unknown> & { type: string });
// v1.13.11-b: route through the typed publishFrame so the broker's
// Zod gate validates every inference frame before delivery.
broker.publishFrame(sessionId, frame as unknown as import('./types/ws-frames.js').WsFrame);
},
// v1.11: broker handle for compaction.process to publish 'compacted'
// frames on the per-session channel. Inference's regular publish path
@@ -110,7 +112,7 @@ async function main() {
broker,
},
(user, frame) => {
broker.publishUser(user, frame as unknown as Record<string, unknown> & { type: string });
broker.publishUserFrame(user, frame as unknown as import('./types/ws-frames.js').WsFrame);
}
);
registerMessageRoutes(app, sql, {
@@ -129,33 +131,33 @@ async function main() {
},
hasActiveInference: (chatId) => inference.hasActive(chatId),
publishUserMessage: (sessionId, chatId, userMessageId, content) => {
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'message_started',
message_id: userMessageId,
chat_id: chatId,
role: 'user',
});
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'delta',
message_id: userMessageId,
chat_id: chatId,
content,
});
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'message_complete',
message_id: userMessageId,
chat_id: chatId,
});
},
publishMessagesDeleted: (sessionId, chatId, messageIds) => {
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'messages_deleted',
message_ids: messageIds,
chat_id: chatId,
});
},
publishSessionFrame: (sessionId, frame) => {
broker.publish(sessionId, frame);
broker.publishFrame(sessionId, frame as import('./types/ws-frames.js').WsFrame);
},
});
registerSkillsRoutes(app, sql, {
@@ -163,26 +165,26 @@ async function main() {
inference.enqueue(sessionId, chatId, assistantId, user);
},
publishUserMessage: (sessionId, chatId, userMessageId, content) => {
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'message_started',
message_id: userMessageId,
chat_id: chatId,
role: 'user',
});
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'delta',
message_id: userMessageId,
chat_id: chatId,
content,
});
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'message_complete',
message_id: userMessageId,
chat_id: chatId,
});
},
publishSessionFrame: (sessionId, frame) => {
broker.publish(sessionId, frame);
broker.publishFrame(sessionId, frame as import('./types/ws-frames.js').WsFrame);
},
});
registerWebSocket(app, sql, broker);
@@ -230,7 +232,7 @@ async function main() {
for (const row of rows) {
if (seenChats.has(row.chat_id)) continue;
seenChats.add(row.chat_id);
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_status',
chat_id: row.chat_id,
status: 'idle',

View File

@@ -102,7 +102,7 @@ export function registerChatRoutes(
VALUES (${req.params.id}, ${parsed.data.name ?? null}, 'open')
RETURNING id, session_id, name, status, created_at, updated_at
`;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_created',
chat: chat!,
session_id: req.params.id,
@@ -132,7 +132,7 @@ export function registerChatRoutes(
return { error: 'chat not found' };
}
const chat = rows[0]!;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_updated',
chat_id: chat.id,
session_id: chat.session_id,
@@ -162,7 +162,7 @@ export function registerChatRoutes(
`;
const ids = rows.map((r) => r.id);
for (const id of ids) {
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_archived',
chat_id: id,
session_id: req.params.id,
@@ -203,7 +203,7 @@ export function registerChatRoutes(
return { error: 'chat not found or already archived' };
}
const row = rows[0]!;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_archived',
chat_id: row.id,
session_id: row.session_id,
@@ -226,7 +226,7 @@ export function registerChatRoutes(
return { error: 'chat not found or not archived' };
}
const chat = rows[0]!;
broker.publishUser('default', { type: 'chat_unarchived', chat });
broker.publishUserFrame('default', { type: 'chat_unarchived', chat });
return chat;
}
);
@@ -243,7 +243,7 @@ export function registerChatRoutes(
return { error: 'chat not found' };
}
const row = result[0]!;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_deleted',
chat_id: row.id,
session_id: row.session_id,
@@ -338,7 +338,7 @@ export function registerChatRoutes(
return chat!;
});
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_created',
chat: newChat,
session_id: source.session_id,
@@ -400,13 +400,13 @@ export function registerChatRoutes(
reply.code(409);
return { error: 'message status changed mid-request' };
}
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_status',
chat_id: msg.chat_id,
status: 'idle',
at: new Date().toISOString(),
});
broker.publish(msg.session_id, {
broker.publishFrame(msg.session_id, {
type: 'message_complete',
message_id: msg.id,
chat_id: msg.chat_id,

View File

@@ -129,7 +129,7 @@ export function registerProjectRoutes(
RETURNING id, name, path, added_at, last_session_id, status, gitea_remote,
default_system_prompt, default_web_search_enabled
`;
broker.publishUser('default', { type: 'project_created', project: row as unknown as Project });
broker.publishUserFrame('default', { type: 'project_created', project: row as unknown as Project });
reply.code(201);
return {
project: row,
@@ -186,11 +186,11 @@ export function registerProjectRoutes(
`;
if (existing.length === 0) {
broker.publishUser('default', { type: 'project_created', project: row as unknown as Project });
broker.publishUserFrame('default', { type: 'project_created', project: row as unknown as Project });
reply.code(201);
} else {
// existing.status was 'archived' — row has been restored.
broker.publishUser('default', { type: 'project_unarchived', project: row as unknown as Project });
broker.publishUserFrame('default', { type: 'project_unarchived', project: row as unknown as Project });
reply.code(200);
}
return row;
@@ -243,7 +243,7 @@ export function registerProjectRoutes(
// v1.9: the project_updated frame still only carries id + name. Clients
// that need the new fields refetch via api.projects.list() — keeps the
// frame payload lean, per the locked recon decision (d).
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'project_updated',
project_id: project.id,
name: project.name,
@@ -260,7 +260,7 @@ export function registerProjectRoutes(
reply.code(404);
return { error: 'not found or already archived' };
}
broker.publishUser('default', { type: 'project_archived', project_id: req.params.id });
broker.publishUserFrame('default', { type: 'project_archived', project_id: req.params.id });
reply.code(204);
return null;
});
@@ -277,7 +277,7 @@ export function registerProjectRoutes(
return { error: 'not found or not archived' };
}
const project = rows[0]!;
broker.publishUser('default', { type: 'project_unarchived', project });
broker.publishUserFrame('default', { type: 'project_unarchived', project });
return project;
});
@@ -288,7 +288,7 @@ export function registerProjectRoutes(
reply.code(404);
return { error: 'not found' };
}
broker.publishUser('default', { type: 'project_deleted', project_id: id });
broker.publishUserFrame('default', { type: 'project_deleted', project_id: id });
reply.code(204);
return null;
});

View File

@@ -112,7 +112,7 @@ export function registerSessionRoutes(
`;
return session!;
});
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_created',
session: row,
project_id: row.project_id,
@@ -178,7 +178,7 @@ export function registerSessionRoutes(
}
const session = rows[0]!;
if (name !== undefined && session.name !== priorName) {
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_renamed',
session_id: session.id,
name: session.name,
@@ -188,7 +188,7 @@ export function registerSessionRoutes(
// (notably the SettingsPane open in another tab) can refetch and pick
// up the new fields. Frame stays lean (decision d) — payload is just
// ids + name + updated_at, the client refetches via api.sessions.get.
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_updated',
session_id: session.id,
project_id: session.project_id,
@@ -220,7 +220,7 @@ export function registerSessionRoutes(
return { error: 'session not found' };
}
const session = rows[0]!;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_workspace_updated',
session_id: session.id,
workspace_panes: session.workspace_panes,
@@ -248,7 +248,7 @@ export function registerSessionRoutes(
`;
const ids = rows.map((r) => r.id);
for (const id of ids) {
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_archived',
session_id: id,
project_id: req.params.id,
@@ -289,7 +289,7 @@ export function registerSessionRoutes(
reply.code(404);
return { error: 'session not found or already archived' };
}
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_archived',
session_id: rows[0]!.id,
project_id: rows[0]!.project_id,
@@ -312,7 +312,7 @@ export function registerSessionRoutes(
return { error: 'session not found or not archived' };
}
const session = rows[0]!;
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'session_created',
session: session,
project_id: session.project_id,
@@ -334,7 +334,7 @@ export function registerSessionRoutes(
return { error: 'not found' };
}
const project_id = deleted[0]!.project_id;
broker.publishUser('default', { type: 'session_deleted', session_id: id, project_id });
broker.publishUserFrame('default', { type: 'session_deleted', session_id: id, project_id });
reply.code(204);
return null;
}

View File

@@ -431,15 +431,16 @@ export async function process(input: ProcessInput): Promise<void> {
'compaction: invoking model',
);
// 6a. Flip the chat dot amber for the duration of the LLM call + DB writes.
// Same { type: 'chat_status', status: 'working', at } shape inference.ts
// emits at runner enqueue. publishUser → broadcasts on the per-user channel
// (all devices / tabs see it) since chat_status is a user-channel frame in
// BooCode (see useChatStatus.ts, which is the consumer).
broker.publishUser('default', {
// 6a. Flip the chat dot for the duration of the LLM call + DB writes.
// v1.13.11-b: publish status='streaming' (the v1.12.1-widened replacement
// for the dropped 'working' value). Compaction's LLM call has the same
// semantic as an inference turn for dot-state purposes. The v1.12.1
// chat_status widening missed this site; v1.13.11's WsFrame Zod schema
// surfaced the drift via the unknown-enum-value check.
broker.publishUserFrame('default', {
type: 'chat_status',
chat_id: chatId,
status: 'working',
status: 'streaming',
at: new Date().toISOString(),
});
@@ -508,7 +509,7 @@ export async function process(input: ProcessInput): Promise<void> {
// Always restore the dot. Status='idle' (not 'error') even on failure —
// the caller logs/re-surfaces the error separately; the dot doesn't
// need to stay red across reloads for a transient compaction blip.
broker.publishUser('default', {
broker.publishUserFrame('default', {
type: 'chat_status',
chat_id: chatId,
status: 'idle',
@@ -522,7 +523,7 @@ export async function process(input: ProcessInput): Promise<void> {
// toast. Order matters: idle must precede 'compacted' so the dot is
// already green by the time the refetch toast appears.
if (succeeded) {
broker.publish(sessionId, {
broker.publishFrame(sessionId, {
type: 'compacted',
session_id: sessionId,
chat_id: chatId,

View File

@@ -46,9 +46,13 @@ const ToolCallShape = z.object({
});
// Free-form bags: opaque to the frame schema; deep validation is out of
// scope. passthrough preserves unknown keys so the consumer sees the full
// shape even when this schema doesn't enumerate every field.
const OpaqueObject = z.object({}).passthrough();
// scope for v1.13.11 (frame-level drift detection is the goal; per-kind
// payload narrowing is follow-up work). z.unknown() means the consumer
// must narrow before reading — TypeScript-side this is fine because every
// consumer already operates on the hand-maintained Project / Chat / Session
// / WorkspacePane types (the brief's "Don't strip existing types yet"
// rule), and the Zod-typed shape is only used at the publishFrame boundary.
const OpaqueObject = z.unknown();
// ---- per-session channel frames --------------------------------------------

View File

@@ -46,9 +46,13 @@ const ToolCallShape = z.object({
});
// Free-form bags: opaque to the frame schema; deep validation is out of
// scope. passthrough preserves unknown keys so the consumer sees the full
// shape even when this schema doesn't enumerate every field.
const OpaqueObject = z.object({}).passthrough();
// scope for v1.13.11 (frame-level drift detection is the goal; per-kind
// payload narrowing is follow-up work). z.unknown() means the consumer
// must narrow before reading — TypeScript-side this is fine because every
// consumer already operates on the hand-maintained Project / Chat / Session
// / WorkspacePane types (the brief's "Don't strip existing types yet"
// rule), and the Zod-typed shape is only used at the publishFrame boundary.
const OpaqueObject = z.unknown();
// ---- per-session channel frames --------------------------------------------