From 8fc525eab9fc845fd761706a8f85ea2760b0de48 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Fri, 15 May 2026 15:06:31 +0000 Subject: [PATCH] batch3 T3: broker user channel + /api/ws/user + project/session/inference emits - broker.subscribeUser/publishUser via separate user topics map - /api/ws/user WS route subscribes to the user channel - projects/sessions POST/DELETE handlers emit lifecycle frames - inference 3 terminal-state sites emit session_updated with RETURNING Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/server/src/index.ts | 30 ++++++++------ apps/server/src/routes/messages.ts | 6 +-- apps/server/src/routes/projects.ts | 6 ++- apps/server/src/routes/sessions.ts | 18 ++++++-- apps/server/src/routes/ws.ts | 19 +++++++++ apps/server/src/services/broker.ts | 60 +++++++++++++++++---------- apps/server/src/services/inference.ts | 43 ++++++++++++++----- 7 files changed, 132 insertions(+), 50 deletions(-) diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 2aaa1ff..3e7fa66 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -37,25 +37,31 @@ async function main() { return { status: dbOk ? 'ok' : 'degraded', db: dbOk }; }); - registerProjectRoutes(app, sql, config); - registerSessionRoutes(app, sql, config); + const broker = createBroker(); + + registerProjectRoutes(app, sql, config, broker); + registerSessionRoutes(app, sql, config, broker); registerSettingsRoutes(app, sql); registerModelRoutes(app, config); registerSidebarRoutes(app, sql); registerPaneRoutes(app, sql); - const broker = createBroker(); - const inference = createInferenceRunner({ - sql, - config, - log: app.log, - publish: (sessionId, frame) => { - broker.publish(sessionId, frame as unknown as Record & { type: string }); + const inference = createInferenceRunner( + { + sql, + config, + log: app.log, + publish: (sessionId, frame) => { + broker.publish(sessionId, frame as unknown as Record & { type: string }); + }, }, - }); + (user, frame) => { + broker.publishUser(user, frame as unknown as Record & { type: string }); + } + ); registerMessageRoutes(app, sql, { - enqueueInference: (sessionId, assistantId) => { - inference.enqueue(sessionId, assistantId); + enqueueInference: (sessionId, assistantId, user) => { + inference.enqueue(sessionId, assistantId, user); }, publishUserMessage: (sessionId, userMessageId, content) => { broker.publish(sessionId, { diff --git a/apps/server/src/routes/messages.ts b/apps/server/src/routes/messages.ts index 0c9f9e0..59f97f4 100644 --- a/apps/server/src/routes/messages.ts +++ b/apps/server/src/routes/messages.ts @@ -8,7 +8,7 @@ const SendBody = z.object({ }); interface MessageHandlers { - enqueueInference: (sessionId: string, assistantMessageId: string) => void; + enqueueInference: (sessionId: string, assistantMessageId: string, user: string) => void; publishUserMessage: ( sessionId: string, userMessageId: string, @@ -76,7 +76,7 @@ export function registerMessageRoutes( result.user_message_id, parsed.data.content ); - handlers.enqueueInference(req.params.id, result.assistant_message_id); + handlers.enqueueInference(req.params.id, result.assistant_message_id, req.user!); reply.code(202); return result; @@ -132,7 +132,7 @@ export function registerMessageRoutes( }); handlers.publishMessagesDeleted(sessionId, deletedIds); - handlers.enqueueInference(sessionId, newAssistantId); + handlers.enqueueInference(sessionId, newAssistantId, req.user!); reply.code(202); return { assistant_message_id: newAssistantId }; diff --git a/apps/server/src/routes/projects.ts b/apps/server/src/routes/projects.ts index 41c0844..6b34e75 100644 --- a/apps/server/src/routes/projects.ts +++ b/apps/server/src/routes/projects.ts @@ -4,6 +4,7 @@ import { realpath, stat, readdir, access } from 'node:fs/promises'; import { basename, resolve, sep } from 'node:path'; import type { Sql } from '../db.js'; import type { Config } from '../config.js'; +import type { Broker } from '../services/broker.js'; import type { Project, AvailableProject } from '../types/api.js'; const AddProjectBody = z.object({ @@ -42,7 +43,8 @@ async function resolveProjectPath( export function registerProjectRoutes( app: FastifyInstance, sql: Sql, - config: Config + config: Config, + broker: Broker ): void { app.get('/api/projects', async () => { const rows = await sql` @@ -71,6 +73,7 @@ export function registerProjectRoutes( VALUES (${name}, ${resolved.real}) RETURNING id, name, path, added_at, last_session_id `; + broker.publishUser(req.user!, { type: 'project_created', project: row as unknown as Project }); reply.code(201); return row; } catch (err) { @@ -89,6 +92,7 @@ export function registerProjectRoutes( reply.code(404); return { error: 'not found' }; } + broker.publishUser(req.user!, { type: 'project_deleted', project_id: id }); reply.code(204); return null; }); diff --git a/apps/server/src/routes/sessions.ts b/apps/server/src/routes/sessions.ts index 13cb747..4c34fb0 100644 --- a/apps/server/src/routes/sessions.ts +++ b/apps/server/src/routes/sessions.ts @@ -2,6 +2,7 @@ import type { FastifyInstance } from 'fastify'; import { z } from 'zod'; import type { Sql } from '../db.js'; import type { Config } from '../config.js'; +import type { Broker } from '../services/broker.js'; import type { Session } from '../types/api.js'; import { getSetting } from './settings.js'; @@ -26,7 +27,8 @@ async function resolveDefaultModel(sql: Sql, config: Config): Promise { export function registerSessionRoutes( app: FastifyInstance, sql: Sql, - config: Config + config: Config, + broker: Broker ): void { app.get<{ Params: { id: string } }>( '/api/projects/:id/sessions', @@ -86,6 +88,11 @@ export function registerSessionRoutes( `; return session!; }); + broker.publishUser(req.user!, { + type: 'session_created', + session: row, + project_id: row.project_id, + }); reply.code(201); return row; } @@ -133,11 +140,16 @@ export function registerSessionRoutes( app.delete<{ Params: { id: string } }>( '/api/sessions/:id', async (req, reply) => { - const result = await sql`DELETE FROM sessions WHERE id = ${req.params.id}`; - if (result.count === 0) { + const id = req.params.id; + const deleted = await sql<{ project_id: string }[]>` + DELETE FROM sessions WHERE id = ${id} RETURNING project_id + `; + if (deleted.length === 0) { reply.code(404); return { error: 'not found' }; } + const project_id = deleted[0]!.project_id; + broker.publishUser(req.user!, { type: 'session_deleted', session_id: id, project_id }); reply.code(204); return null; } diff --git a/apps/server/src/routes/ws.ts b/apps/server/src/routes/ws.ts index c5b1a00..994e5ec 100644 --- a/apps/server/src/routes/ws.ts +++ b/apps/server/src/routes/ws.ts @@ -43,4 +43,23 @@ export function registerWebSocket( socket.on('error', () => unsubscribe()); } ); + + app.get('/api/ws/user', { websocket: true }, async (socket, req) => { + const user = req.user; + if (!user) { + socket.close(1008, 'unauthenticated'); + return; + } + // No snapshot — user channel is purely live updates. + const unsubscribe = broker.subscribeUser(user, (frame) => { + if (socket.readyState !== socket.OPEN) return; + try { + socket.send(JSON.stringify(frame)); + } catch (err) { + app.log.warn({ err, user }, 'user ws send failed'); + } + }); + socket.on('close', () => unsubscribe()); + socket.on('error', () => unsubscribe()); + }); } diff --git a/apps/server/src/services/broker.ts b/apps/server/src/services/broker.ts index e9c0005..1cf9d96 100644 --- a/apps/server/src/services/broker.ts +++ b/apps/server/src/services/broker.ts @@ -4,35 +4,53 @@ export type Listener = (frame: Frame) => void; export interface Broker { publish(sessionId: string, frame: Frame): void; subscribe(sessionId: string, listener: Listener): () => void; + publishUser(user: string, frame: Frame): void; + subscribeUser(user: string, listener: Listener): () => void; } export function createBroker(): Broker { const topics = new Map>(); + const userTopics = new Map>(); + + function publishTo(map: Map>, key: string, frame: Frame): void { + const set = map.get(key); + if (!set) return; + for (const listener of set) { + try { + listener(frame); + } catch { + // ignore listener errors so one bad subscriber doesn't break the rest + } + } + } + + function subscribeTo(map: Map>, key: string, listener: Listener): () => void { + let set = map.get(key); + if (!set) { + set = new Set(); + map.set(key, set); + } + set.add(listener); + return () => { + const s = map.get(key); + if (!s) return; + s.delete(listener); + if (s.size === 0) map.delete(key); + }; + } + return { publish(sessionId, frame) { - const set = topics.get(sessionId); - if (!set) return; - for (const listener of set) { - try { - listener(frame); - } catch { - // ignore listener errors so one bad subscriber doesn't break the rest - } - } + publishTo(topics, sessionId, frame); }, subscribe(sessionId, listener) { - let set = topics.get(sessionId); - if (!set) { - set = new Set(); - topics.set(sessionId, set); - } - set.add(listener); - return () => { - const s = topics.get(sessionId); - if (!s) return; - s.delete(listener); - if (s.size === 0) topics.delete(sessionId); - }; + return subscribeTo(topics, sessionId, listener); + }, + publishUser(user, frame) { + publishTo(userTopics, user, frame); + }, + subscribeUser(user, listener) { + return subscribeTo(userTopics, user, listener); }, }; } diff --git a/apps/server/src/services/inference.ts b/apps/server/src/services/inference.ts index 363646e..4737fe8 100644 --- a/apps/server/src/services/inference.ts +++ b/apps/server/src/services/inference.ts @@ -1,7 +1,7 @@ import type { FastifyBaseLogger } from 'fastify'; import type { Sql } from '../db.js'; import type { Config } from '../config.js'; -import type { Message, Project, Session, ToolCall } from '../types/api.js'; +import type { Message, Project, Session, ToolCall, UserStreamFrame } from '../types/api.js'; import { ALL_TOOLS, TOOLS_BY_NAME, toolJsonSchemas } from './tools.js'; import { PathScopeError, resolveProjectRoot } from './path_guard.js'; import { maybeAutoNameSession } from './auto_name.js'; @@ -86,6 +86,7 @@ export interface InferenceContext { config: Config; log: FastifyBaseLogger; publish: FramePublisher; + publishUser: (frame: UserStreamFrame) => void; } export function buildMessagesPayload( @@ -426,7 +427,12 @@ async function runAssistantTurn( finished_at = clock_timestamp() WHERE id = ${assistantMessageId} `; - await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; + const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` + UPDATE sessions SET updated_at = clock_timestamp() + WHERE id = ${sessionId} + RETURNING project_id, name, updated_at + `; + ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at }); ctx.publish(sessionId, { type: 'error', message_id: assistantMessageId, @@ -459,7 +465,12 @@ async function runAssistantTurn( WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; - await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; + const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` + UPDATE sessions SET updated_at = clock_timestamp() + WHERE id = ${sessionId} + RETURNING project_id, name, updated_at + `; + ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at }); for (const tc of toolCalls) { ctx.publish(sessionId, { type: 'tool_call', @@ -531,7 +542,12 @@ async function runAssistantTurn( WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at `; - await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; + const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>` + UPDATE sessions SET updated_at = clock_timestamp() + WHERE id = ${sessionId} + RETURNING project_id, name, updated_at + `; + ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at }); ctx.publish(sessionId, { type: 'message_complete', message_id: assistantMessageId, @@ -563,19 +579,26 @@ export async function runInference( return runAssistantTurn(ctx, sessionId, assistantMessageId, 0); } -export function createInferenceRunner(ctx: InferenceContext) { +export function createInferenceRunner( + ctx: Omit, + publishUserFn: (user: string, frame: UserStreamFrame) => void +) { return { - enqueue(sessionId: string, assistantMessageId: string) { + enqueue(sessionId: string, assistantMessageId: string, user: string) { + const callCtx: InferenceContext = { + ...ctx, + publishUser: (frame) => publishUserFn(user, frame), + }; void (async () => { try { - await runInference(ctx, sessionId, assistantMessageId); + await runInference(callCtx, sessionId, assistantMessageId); setImmediate(() => { - void maybeAutoNameSession(ctx, sessionId).catch((err) => { - ctx.log.warn({ err, sessionId }, 'auto-name failed'); + void maybeAutoNameSession(callCtx, sessionId).catch((err) => { + callCtx.log.warn({ err, sessionId }, 'auto-name failed'); }); }); } catch (err) { - ctx.log.error({ err }, 'unhandled inference error'); + callCtx.log.error({ err }, 'unhandled inference error'); } })(); },