batch3 T3: broker user channel + /api/ws/user + project/session/inference emits

- broker.subscribeUser/publishUser via separate user topics map
- /api/ws/user WS route subscribes to the user channel
- projects/sessions POST/DELETE handlers emit lifecycle frames
- inference 3 terminal-state sites emit session_updated with RETURNING

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-15 15:06:31 +00:00
parent d88b3348a2
commit 8fc525eab9
7 changed files with 132 additions and 50 deletions

View File

@@ -37,25 +37,31 @@ async function main() {
return { status: dbOk ? 'ok' : 'degraded', db: dbOk }; return { status: dbOk ? 'ok' : 'degraded', db: dbOk };
}); });
registerProjectRoutes(app, sql, config); const broker = createBroker();
registerSessionRoutes(app, sql, config);
registerProjectRoutes(app, sql, config, broker);
registerSessionRoutes(app, sql, config, broker);
registerSettingsRoutes(app, sql); registerSettingsRoutes(app, sql);
registerModelRoutes(app, config); registerModelRoutes(app, config);
registerSidebarRoutes(app, sql); registerSidebarRoutes(app, sql);
registerPaneRoutes(app, sql); registerPaneRoutes(app, sql);
const broker = createBroker(); const inference = createInferenceRunner(
const inference = createInferenceRunner({ {
sql, sql,
config, config,
log: app.log, log: app.log,
publish: (sessionId, frame) => { publish: (sessionId, frame) => {
broker.publish(sessionId, frame as unknown as Record<string, unknown> & { type: string }); broker.publish(sessionId, frame as unknown as Record<string, unknown> & { type: string });
},
}, },
}); (user, frame) => {
broker.publishUser(user, frame as unknown as Record<string, unknown> & { type: string });
}
);
registerMessageRoutes(app, sql, { registerMessageRoutes(app, sql, {
enqueueInference: (sessionId, assistantId) => { enqueueInference: (sessionId, assistantId, user) => {
inference.enqueue(sessionId, assistantId); inference.enqueue(sessionId, assistantId, user);
}, },
publishUserMessage: (sessionId, userMessageId, content) => { publishUserMessage: (sessionId, userMessageId, content) => {
broker.publish(sessionId, { broker.publish(sessionId, {

View File

@@ -8,7 +8,7 @@ const SendBody = z.object({
}); });
interface MessageHandlers { interface MessageHandlers {
enqueueInference: (sessionId: string, assistantMessageId: string) => void; enqueueInference: (sessionId: string, assistantMessageId: string, user: string) => void;
publishUserMessage: ( publishUserMessage: (
sessionId: string, sessionId: string,
userMessageId: string, userMessageId: string,
@@ -76,7 +76,7 @@ export function registerMessageRoutes(
result.user_message_id, result.user_message_id,
parsed.data.content parsed.data.content
); );
handlers.enqueueInference(req.params.id, result.assistant_message_id); handlers.enqueueInference(req.params.id, result.assistant_message_id, req.user!);
reply.code(202); reply.code(202);
return result; return result;
@@ -132,7 +132,7 @@ export function registerMessageRoutes(
}); });
handlers.publishMessagesDeleted(sessionId, deletedIds); handlers.publishMessagesDeleted(sessionId, deletedIds);
handlers.enqueueInference(sessionId, newAssistantId); handlers.enqueueInference(sessionId, newAssistantId, req.user!);
reply.code(202); reply.code(202);
return { assistant_message_id: newAssistantId }; return { assistant_message_id: newAssistantId };

View File

@@ -4,6 +4,7 @@ import { realpath, stat, readdir, access } from 'node:fs/promises';
import { basename, resolve, sep } from 'node:path'; import { basename, resolve, sep } from 'node:path';
import type { Sql } from '../db.js'; import type { Sql } from '../db.js';
import type { Config } from '../config.js'; import type { Config } from '../config.js';
import type { Broker } from '../services/broker.js';
import type { Project, AvailableProject } from '../types/api.js'; import type { Project, AvailableProject } from '../types/api.js';
const AddProjectBody = z.object({ const AddProjectBody = z.object({
@@ -42,7 +43,8 @@ async function resolveProjectPath(
export function registerProjectRoutes( export function registerProjectRoutes(
app: FastifyInstance, app: FastifyInstance,
sql: Sql, sql: Sql,
config: Config config: Config,
broker: Broker
): void { ): void {
app.get('/api/projects', async () => { app.get('/api/projects', async () => {
const rows = await sql<Project[]>` const rows = await sql<Project[]>`
@@ -71,6 +73,7 @@ export function registerProjectRoutes(
VALUES (${name}, ${resolved.real}) VALUES (${name}, ${resolved.real})
RETURNING id, name, path, added_at, last_session_id RETURNING id, name, path, added_at, last_session_id
`; `;
broker.publishUser(req.user!, { type: 'project_created', project: row as unknown as Project });
reply.code(201); reply.code(201);
return row; return row;
} catch (err) { } catch (err) {
@@ -89,6 +92,7 @@ export function registerProjectRoutes(
reply.code(404); reply.code(404);
return { error: 'not found' }; return { error: 'not found' };
} }
broker.publishUser(req.user!, { type: 'project_deleted', project_id: id });
reply.code(204); reply.code(204);
return null; return null;
}); });

View File

@@ -2,6 +2,7 @@ import type { FastifyInstance } from 'fastify';
import { z } from 'zod'; import { z } from 'zod';
import type { Sql } from '../db.js'; import type { Sql } from '../db.js';
import type { Config } from '../config.js'; import type { Config } from '../config.js';
import type { Broker } from '../services/broker.js';
import type { Session } from '../types/api.js'; import type { Session } from '../types/api.js';
import { getSetting } from './settings.js'; import { getSetting } from './settings.js';
@@ -26,7 +27,8 @@ async function resolveDefaultModel(sql: Sql, config: Config): Promise<string> {
export function registerSessionRoutes( export function registerSessionRoutes(
app: FastifyInstance, app: FastifyInstance,
sql: Sql, sql: Sql,
config: Config config: Config,
broker: Broker
): void { ): void {
app.get<{ Params: { id: string } }>( app.get<{ Params: { id: string } }>(
'/api/projects/:id/sessions', '/api/projects/:id/sessions',
@@ -86,6 +88,11 @@ export function registerSessionRoutes(
`; `;
return session!; return session!;
}); });
broker.publishUser(req.user!, {
type: 'session_created',
session: row,
project_id: row.project_id,
});
reply.code(201); reply.code(201);
return row; return row;
} }
@@ -133,11 +140,16 @@ export function registerSessionRoutes(
app.delete<{ Params: { id: string } }>( app.delete<{ Params: { id: string } }>(
'/api/sessions/:id', '/api/sessions/:id',
async (req, reply) => { async (req, reply) => {
const result = await sql`DELETE FROM sessions WHERE id = ${req.params.id}`; const id = req.params.id;
if (result.count === 0) { const deleted = await sql<{ project_id: string }[]>`
DELETE FROM sessions WHERE id = ${id} RETURNING project_id
`;
if (deleted.length === 0) {
reply.code(404); reply.code(404);
return { error: 'not found' }; return { error: 'not found' };
} }
const project_id = deleted[0]!.project_id;
broker.publishUser(req.user!, { type: 'session_deleted', session_id: id, project_id });
reply.code(204); reply.code(204);
return null; return null;
} }

View File

@@ -43,4 +43,23 @@ export function registerWebSocket(
socket.on('error', () => unsubscribe()); socket.on('error', () => unsubscribe());
} }
); );
app.get('/api/ws/user', { websocket: true }, async (socket, req) => {
const user = req.user;
if (!user) {
socket.close(1008, 'unauthenticated');
return;
}
// No snapshot — user channel is purely live updates.
const unsubscribe = broker.subscribeUser(user, (frame) => {
if (socket.readyState !== socket.OPEN) return;
try {
socket.send(JSON.stringify(frame));
} catch (err) {
app.log.warn({ err, user }, 'user ws send failed');
}
});
socket.on('close', () => unsubscribe());
socket.on('error', () => unsubscribe());
});
} }

View File

@@ -4,35 +4,53 @@ export type Listener = (frame: Frame) => void;
export interface Broker { export interface Broker {
publish(sessionId: string, frame: Frame): void; publish(sessionId: string, frame: Frame): void;
subscribe(sessionId: string, listener: Listener): () => void; subscribe(sessionId: string, listener: Listener): () => void;
publishUser(user: string, frame: Frame): void;
subscribeUser(user: string, listener: Listener): () => void;
} }
export function createBroker(): Broker { export function createBroker(): Broker {
const topics = new Map<string, Set<Listener>>(); const topics = new Map<string, Set<Listener>>();
const userTopics = new Map<string, Set<Listener>>();
function publishTo(map: Map<string, Set<Listener>>, key: string, frame: Frame): void {
const set = map.get(key);
if (!set) return;
for (const listener of set) {
try {
listener(frame);
} catch {
// ignore listener errors so one bad subscriber doesn't break the rest
}
}
}
function subscribeTo(map: Map<string, Set<Listener>>, key: string, listener: Listener): () => void {
let set = map.get(key);
if (!set) {
set = new Set();
map.set(key, set);
}
set.add(listener);
return () => {
const s = map.get(key);
if (!s) return;
s.delete(listener);
if (s.size === 0) map.delete(key);
};
}
return { return {
publish(sessionId, frame) { publish(sessionId, frame) {
const set = topics.get(sessionId); publishTo(topics, sessionId, frame);
if (!set) return;
for (const listener of set) {
try {
listener(frame);
} catch {
// ignore listener errors so one bad subscriber doesn't break the rest
}
}
}, },
subscribe(sessionId, listener) { subscribe(sessionId, listener) {
let set = topics.get(sessionId); return subscribeTo(topics, sessionId, listener);
if (!set) { },
set = new Set(); publishUser(user, frame) {
topics.set(sessionId, set); publishTo(userTopics, user, frame);
} },
set.add(listener); subscribeUser(user, listener) {
return () => { return subscribeTo(userTopics, user, listener);
const s = topics.get(sessionId);
if (!s) return;
s.delete(listener);
if (s.size === 0) topics.delete(sessionId);
};
}, },
}; };
} }

View File

@@ -1,7 +1,7 @@
import type { FastifyBaseLogger } from 'fastify'; import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../db.js'; import type { Sql } from '../db.js';
import type { Config } from '../config.js'; import type { Config } from '../config.js';
import type { Message, Project, Session, ToolCall } from '../types/api.js'; import type { Message, Project, Session, ToolCall, UserStreamFrame } from '../types/api.js';
import { ALL_TOOLS, TOOLS_BY_NAME, toolJsonSchemas } from './tools.js'; import { ALL_TOOLS, TOOLS_BY_NAME, toolJsonSchemas } from './tools.js';
import { PathScopeError, resolveProjectRoot } from './path_guard.js'; import { PathScopeError, resolveProjectRoot } from './path_guard.js';
import { maybeAutoNameSession } from './auto_name.js'; import { maybeAutoNameSession } from './auto_name.js';
@@ -86,6 +86,7 @@ export interface InferenceContext {
config: Config; config: Config;
log: FastifyBaseLogger; log: FastifyBaseLogger;
publish: FramePublisher; publish: FramePublisher;
publishUser: (frame: UserStreamFrame) => void;
} }
export function buildMessagesPayload( export function buildMessagesPayload(
@@ -426,7 +427,12 @@ async function runAssistantTurn(
finished_at = clock_timestamp() finished_at = clock_timestamp()
WHERE id = ${assistantMessageId} WHERE id = ${assistantMessageId}
`; `;
await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; const [failSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
UPDATE sessions SET updated_at = clock_timestamp()
WHERE id = ${sessionId}
RETURNING project_id, name, updated_at
`;
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
ctx.publish(sessionId, { ctx.publish(sessionId, {
type: 'error', type: 'error',
message_id: assistantMessageId, message_id: assistantMessageId,
@@ -459,7 +465,12 @@ async function runAssistantTurn(
WHERE id = ${assistantMessageId} WHERE id = ${assistantMessageId}
RETURNING tokens_used, ctx_used, ctx_max, finished_at RETURNING tokens_used, ctx_used, ctx_max, finished_at
`; `;
await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; const [toolSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
UPDATE sessions SET updated_at = clock_timestamp()
WHERE id = ${sessionId}
RETURNING project_id, name, updated_at
`;
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: toolSessRow!.project_id, name: toolSessRow!.name, updated_at: toolSessRow!.updated_at });
for (const tc of toolCalls) { for (const tc of toolCalls) {
ctx.publish(sessionId, { ctx.publish(sessionId, {
type: 'tool_call', type: 'tool_call',
@@ -531,7 +542,12 @@ async function runAssistantTurn(
WHERE id = ${assistantMessageId} WHERE id = ${assistantMessageId}
RETURNING tokens_used, ctx_used, ctx_max, finished_at RETURNING tokens_used, ctx_used, ctx_max, finished_at
`; `;
await ctx.sql`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`; const [completeSessRow] = await ctx.sql<{ project_id: string; name: string; updated_at: string }[]>`
UPDATE sessions SET updated_at = clock_timestamp()
WHERE id = ${sessionId}
RETURNING project_id, name, updated_at
`;
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: completeSessRow!.project_id, name: completeSessRow!.name, updated_at: completeSessRow!.updated_at });
ctx.publish(sessionId, { ctx.publish(sessionId, {
type: 'message_complete', type: 'message_complete',
message_id: assistantMessageId, message_id: assistantMessageId,
@@ -563,19 +579,26 @@ export async function runInference(
return runAssistantTurn(ctx, sessionId, assistantMessageId, 0); return runAssistantTurn(ctx, sessionId, assistantMessageId, 0);
} }
export function createInferenceRunner(ctx: InferenceContext) { export function createInferenceRunner(
ctx: Omit<InferenceContext, 'publishUser'>,
publishUserFn: (user: string, frame: UserStreamFrame) => void
) {
return { return {
enqueue(sessionId: string, assistantMessageId: string) { enqueue(sessionId: string, assistantMessageId: string, user: string) {
const callCtx: InferenceContext = {
...ctx,
publishUser: (frame) => publishUserFn(user, frame),
};
void (async () => { void (async () => {
try { try {
await runInference(ctx, sessionId, assistantMessageId); await runInference(callCtx, sessionId, assistantMessageId);
setImmediate(() => { setImmediate(() => {
void maybeAutoNameSession(ctx, sessionId).catch((err) => { void maybeAutoNameSession(callCtx, sessionId).catch((err) => {
ctx.log.warn({ err, sessionId }, 'auto-name failed'); callCtx.log.warn({ err, sessionId }, 'auto-name failed');
}); });
}); });
} catch (err) { } catch (err) {
ctx.log.error({ err }, 'unhandled inference error'); callCtx.log.error({ err }, 'unhandled inference error');
} }
})(); })();
}, },