batch4: chats-in-sessions, force-send, /compact, right-rail file browser
Session 1:N Chat data model with backfill. Workspace switches to client-side multi-tab pane management. Right-rail file browser with float-over viewer and click-drag line selection replaces FileBrowserPane. Adds /compact streaming summarizer (respects compact markers in context builder), force-send (cancels in-flight, persists partial as 'cancelled', awaits cancellation completion via deferred Promise + 5s timeout), message queue, stop generation, chat auto-rename, session archive/unarchive with Closed Sessions section on repo landing page. CHECK constraints on sessions.status, messages.role, messages.status with KEEP IN SYNC comments tying to MESSAGE_ROLES / MESSAGE_STATUSES const arrays. Deletes dead pane routes/hook and the api.panes.* client block. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,31 +0,0 @@
|
||||
import type { FastifyInstance, FastifyRequest } from 'fastify';
|
||||
|
||||
declare module 'fastify' {
|
||||
interface FastifyRequest {
|
||||
user?: string;
|
||||
}
|
||||
}
|
||||
|
||||
const PUBLIC_PATHS = new Set<string>(['/api/health']);
|
||||
|
||||
export function registerAuth(app: FastifyInstance): void {
|
||||
app.addHook('onRequest', async (req, reply) => {
|
||||
if (!req.url.startsWith('/api')) return;
|
||||
if (PUBLIC_PATHS.has(req.routeOptions.url ?? req.url.split('?')[0]!)) return;
|
||||
|
||||
const header = req.headers['remote-user'];
|
||||
const user = Array.isArray(header) ? header[0] : header;
|
||||
if (!user || user.trim() === '') {
|
||||
reply.code(401).send({ error: 'unauthenticated' });
|
||||
return reply;
|
||||
}
|
||||
req.user = user.trim();
|
||||
});
|
||||
}
|
||||
|
||||
export function requireUser(req: FastifyRequest): string {
|
||||
if (!req.user) {
|
||||
throw new Error('user not set on request — auth hook must run first');
|
||||
}
|
||||
return req.user;
|
||||
}
|
||||
@@ -5,15 +5,15 @@ import { existsSync } from 'node:fs';
|
||||
import { resolve } from 'node:path';
|
||||
import { loadConfig } from './config.js';
|
||||
import { getSql, applySchema, pingDb, closeDb } from './db.js';
|
||||
import { registerAuth } from './auth.js';
|
||||
|
||||
import { registerProjectRoutes } from './routes/projects.js';
|
||||
import { registerSessionRoutes } from './routes/sessions.js';
|
||||
import { registerSettingsRoutes } from './routes/settings.js';
|
||||
import { registerMessageRoutes } from './routes/messages.js';
|
||||
import { registerChatRoutes } from './routes/chats.js';
|
||||
import { registerSidebarRoutes } from './routes/sidebar.js';
|
||||
import { registerWebSocket } from './routes/ws.js';
|
||||
import { registerModelRoutes } from './routes/models.js';
|
||||
import { registerPaneRoutes } from './routes/panes.js';
|
||||
import { createInferenceRunner } from './services/inference.js';
|
||||
import { createBroker } from './services/broker.js';
|
||||
|
||||
@@ -30,8 +30,6 @@ async function main() {
|
||||
|
||||
await app.register(fastifyWebsocket);
|
||||
|
||||
registerAuth(app);
|
||||
|
||||
app.get('/api/health', async () => {
|
||||
const dbOk = await pingDb(sql);
|
||||
return { status: dbOk ? 'ok' : 'degraded', db: dbOk };
|
||||
@@ -44,7 +42,7 @@ async function main() {
|
||||
registerSettingsRoutes(app, sql);
|
||||
registerModelRoutes(app, config);
|
||||
registerSidebarRoutes(app, sql);
|
||||
registerPaneRoutes(app, sql);
|
||||
registerChatRoutes(app, sql, broker);
|
||||
|
||||
const inference = createInferenceRunner(
|
||||
{
|
||||
@@ -60,29 +58,39 @@ async function main() {
|
||||
}
|
||||
);
|
||||
registerMessageRoutes(app, sql, {
|
||||
enqueueInference: (sessionId, assistantId, user) => {
|
||||
inference.enqueue(sessionId, assistantId, user);
|
||||
enqueueInference: (sessionId, chatId, assistantId, user) => {
|
||||
inference.enqueue(sessionId, chatId, assistantId, user);
|
||||
},
|
||||
publishUserMessage: (sessionId, userMessageId, content) => {
|
||||
enqueueCompact: (sessionId, chatId, compactId, user) => {
|
||||
inference.enqueueCompact(sessionId, chatId, compactId, user);
|
||||
},
|
||||
cancelInference: async (sessionId, chatId) => {
|
||||
return inference.cancel(sessionId, chatId);
|
||||
},
|
||||
publishUserMessage: (sessionId, chatId, userMessageId, content) => {
|
||||
broker.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: userMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'user',
|
||||
});
|
||||
broker.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: userMessageId,
|
||||
chat_id: chatId,
|
||||
content,
|
||||
});
|
||||
broker.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: userMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
},
|
||||
publishMessagesDeleted: (sessionId, messageIds) => {
|
||||
publishMessagesDeleted: (sessionId, chatId, messageIds) => {
|
||||
broker.publish(sessionId, {
|
||||
type: 'messages_deleted',
|
||||
message_ids: messageIds,
|
||||
chat_id: chatId,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
147
apps/server/src/routes/chats.ts
Normal file
147
apps/server/src/routes/chats.ts
Normal file
@@ -0,0 +1,147 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Broker } from '../services/broker.js';
|
||||
import type { Chat, Message } from '../types/api.js';
|
||||
|
||||
/** Shared rule for a chat display name: a string of 1 to 200 characters. */
const chatNameSchema = z.string().min(1).max(200);

/** Request body for creating a chat; `name` may be omitted. */
const CreateBody = z.object({
  name: chatNameSchema.optional(),
});

/** Request body for patching a chat; `name` and `status` may each be omitted. */
const PatchBody = z.object({
  name: chatNameSchema.optional(),
  status: z.enum(['open', 'closed']).optional(),
});
|
||||
|
||||
/**
 * Registers the chat endpoints on `app`:
 *
 * - `GET    /api/sessions/:id/chats`  — list a session's chats, newest `updated_at` first
 * - `POST   /api/sessions/:id/chats`  — create an open chat in a session
 * - `PATCH  /api/chats/:id`           — rename and/or change status of a chat
 * - `DELETE /api/chats/:id`           — delete a chat
 * - `GET    /api/chats/:id/messages`  — list a chat's messages, oldest first
 *
 * @param app    Fastify instance the routes are attached to.
 * @param sql    postgres tagged-template client used for every query.
 * @param broker Receives `chat_created` / `chat_closed` / `chat_updated` events
 *               via `publishUser('default', …)`.
 */
export function registerChatRoutes(
  app: FastifyInstance,
  sql: Sql,
  broker: Broker
): void {
  /** Resolves to true when a `sessions` row with the given id exists. */
  const sessionExists = async (sessionId: string): Promise<boolean> => {
    const matches = await sql`SELECT id FROM sessions WHERE id = ${sessionId}`;
    return matches.length !== 0;
  };

  // List chats of a session (404 when the session is unknown).
  app.get<{ Params: { id: string } }>('/api/sessions/:id/chats', async (req, reply) => {
    const sessionId = req.params.id;
    if (!(await sessionExists(sessionId))) {
      reply.code(404);
      return { error: 'session not found' };
    }
    const chats = await sql<Chat[]>`
      SELECT id, session_id, name, status, created_at, updated_at
      FROM chats
      WHERE session_id = ${sessionId}
      ORDER BY updated_at DESC
    `;
    return chats;
  });

  // Create a chat; a missing body is treated as `{}` so the name stays optional.
  app.post<{ Params: { id: string } }>('/api/sessions/:id/chats', async (req, reply) => {
    const body = CreateBody.safeParse(req.body ?? {});
    if (!body.success) {
      reply.code(400);
      return { error: 'invalid body', details: body.error.flatten() };
    }
    const sessionId = req.params.id;
    if (!(await sessionExists(sessionId))) {
      reply.code(404);
      return { error: 'session not found' };
    }
    const inserted = await sql<Chat[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, ${body.data.name ?? null}, 'open')
      RETURNING id, session_id, name, status, created_at, updated_at
    `;
    const created = inserted[0];
    broker.publishUser('default', {
      type: 'chat_created',
      chat: created!,
      session_id: sessionId,
    });
    reply.code(201);
    return created;
  });

  // Patch name and/or status; at least one of the two must be present.
  app.patch<{ Params: { id: string } }>('/api/chats/:id', async (req, reply) => {
    const body = PatchBody.safeParse(req.body ?? {});
    if (!body.success) {
      reply.code(400);
      return { error: 'invalid body', details: body.error.flatten() };
    }
    const { name, status } = body.data;
    if (name === undefined && status === undefined) {
      reply.code(400);
      return { error: 'must provide name or status' };
    }
    // COALESCE keeps the stored value for whichever field was not supplied.
    const [chat] = await sql<Chat[]>`
      UPDATE chats
      SET
        name = COALESCE(${name ?? null}, name),
        status = COALESCE(${status ?? null}, status),
        updated_at = clock_timestamp()
      WHERE id = ${req.params.id}
      RETURNING id, session_id, name, status, created_at, updated_at
    `;
    if (chat === undefined) {
      reply.code(404);
      return { error: 'chat not found' };
    }
    // A request that sets status to 'closed' is announced as chat_closed;
    // every other patch is announced as chat_updated.
    if (status !== 'closed') {
      broker.publishUser('default', {
        type: 'chat_updated',
        chat_id: chat.id,
        session_id: chat.session_id,
        name: chat.name,
        updated_at: chat.updated_at,
      });
    } else {
      broker.publishUser('default', {
        type: 'chat_closed',
        chat_id: chat.id,
        session_id: chat.session_id,
      });
    }
    return chat;
  });

  // Delete a chat; replies 204 with no body, or 404 when nothing was deleted.
  app.delete<{ Params: { id: string } }>('/api/chats/:id', async (req, reply) => {
    const removed = await sql<{ id: string; session_id: string }[]>`
      DELETE FROM chats WHERE id = ${req.params.id}
      RETURNING id, session_id
    `;
    if (removed.length === 0) {
      reply.code(404);
      return { error: 'chat not found' };
    }
    reply.code(204);
    return null;
  });

  // List messages of a chat in creation order (id breaks created_at ties).
  app.get<{ Params: { id: string } }>('/api/chats/:id/messages', async (req, reply) => {
    const chatId = req.params.id;
    const matches = await sql`SELECT id FROM chats WHERE id = ${chatId}`;
    if (matches.length === 0) {
      reply.code(404);
      return { error: 'chat not found' };
    }
    const messages = await sql<Message[]>`
      SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
        tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at
      FROM messages
      WHERE chat_id = ${chatId}
      ORDER BY created_at ASC, id ASC
    `;
    return messages;
  });
}
|
||||
@@ -1,21 +1,23 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Message, Session } from '../types/api.js';
|
||||
import { requireUser } from '../auth.js';
|
||||
import type { Chat, Message, Session } from '../types/api.js';
|
||||
|
||||
const SendBody = z.object({
|
||||
content: z.string().min(1).max(64_000),
|
||||
});
|
||||
|
||||
interface MessageHandlers {
|
||||
enqueueInference: (sessionId: string, assistantMessageId: string, user: string) => void;
|
||||
enqueueInference: (sessionId: string, chatId: string, assistantMessageId: string, user: string) => void;
|
||||
enqueueCompact: (sessionId: string, chatId: string, compactMessageId: string, user: string) => void;
|
||||
publishUserMessage: (
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
userMessageId: string,
|
||||
content: string
|
||||
) => void;
|
||||
publishMessagesDeleted: (sessionId: string, messageIds: string[]) => void;
|
||||
publishMessagesDeleted: (sessionId: string, chatId: string, messageIds: string[]) => void;
|
||||
cancelInference: (sessionId: string, chatId: string) => Promise<boolean>;
|
||||
}
|
||||
|
||||
export function registerMessageRoutes(
|
||||
@@ -32,7 +34,7 @@ export function registerMessageRoutes(
|
||||
return { error: 'session not found' };
|
||||
}
|
||||
const rows = await sql<Message[]>`
|
||||
SELECT id, session_id, role, content, tool_calls, tool_results, status, last_seq,
|
||||
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
||||
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at
|
||||
FROM messages
|
||||
WHERE session_id = ${req.params.id}
|
||||
@@ -43,7 +45,7 @@ export function registerMessageRoutes(
|
||||
);
|
||||
|
||||
app.post<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id/messages',
|
||||
'/api/chats/:id/messages',
|
||||
async (req, reply) => {
|
||||
const parsed = SendBody.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
@@ -51,33 +53,39 @@ export function registerMessageRoutes(
|
||||
return { error: 'invalid body', details: parsed.error.flatten() };
|
||||
}
|
||||
|
||||
const session = await sql<Session[]>`SELECT id FROM sessions WHERE id = ${req.params.id}`;
|
||||
if (session.length === 0) {
|
||||
const chatRows = await sql<Chat[]>`
|
||||
SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
|
||||
`;
|
||||
if (chatRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'session not found' };
|
||||
return { error: 'chat not found' };
|
||||
}
|
||||
const chat = chatRows[0]!;
|
||||
const sessionId = chat.session_id;
|
||||
|
||||
const result = await sql.begin(async (tx) => {
|
||||
const [userMsg] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, role, content, status, created_at)
|
||||
VALUES (${req.params.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chat.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
const [assistantMsg] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, role, content, status, created_at)
|
||||
VALUES (${req.params.id}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
await tx`UPDATE sessions SET updated_at = NOW() WHERE id = ${req.params.id}`;
|
||||
await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
|
||||
await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
|
||||
return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
|
||||
});
|
||||
|
||||
handlers.publishUserMessage(
|
||||
req.params.id,
|
||||
sessionId,
|
||||
chat.id,
|
||||
result.user_message_id,
|
||||
parsed.data.content
|
||||
);
|
||||
handlers.enqueueInference(req.params.id, result.assistant_message_id, requireUser(req));
|
||||
handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');
|
||||
|
||||
reply.code(202);
|
||||
return result;
|
||||
@@ -85,14 +93,24 @@ export function registerMessageRoutes(
|
||||
);
|
||||
|
||||
app.post<{ Params: { id: string; message_id: string } }>(
|
||||
'/api/sessions/:id/messages/:message_id/regenerate',
|
||||
'/api/chats/:id/messages/:message_id/regenerate',
|
||||
async (req, reply) => {
|
||||
const { id: sessionId, message_id: targetId } = req.params;
|
||||
const { id: chatId, message_id: targetId } = req.params;
|
||||
|
||||
const chatRows = await sql<Chat[]>`
|
||||
SELECT id, session_id FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
if (chatRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'chat not found' };
|
||||
}
|
||||
const chat = chatRows[0]!;
|
||||
const sessionId = chat.session_id;
|
||||
|
||||
const target = await sql<{ id: string; role: string; status: string }[]>`
|
||||
SELECT id, role, status
|
||||
FROM messages
|
||||
WHERE session_id = ${sessionId} AND id = ${targetId}
|
||||
WHERE chat_id = ${chatId} AND id = ${targetId}
|
||||
`;
|
||||
if (target.length === 0) {
|
||||
reply.code(404);
|
||||
@@ -109,34 +127,141 @@ export function registerMessageRoutes(
|
||||
}
|
||||
|
||||
const { newAssistantId, deletedIds } = await sql.begin(async (tx) => {
|
||||
// Subquery keeps created_at in postgres at TIMESTAMPTZ µs precision.
|
||||
// Round-tripping through JS Date loses sub-ms precision and can pull
|
||||
// earlier rows (e.g. the triggering user message) into the >= bound.
|
||||
const deletedRows = await tx<{ id: string }[]>`
|
||||
DELETE FROM messages
|
||||
WHERE session_id = ${sessionId}
|
||||
WHERE chat_id = ${chatId}
|
||||
AND created_at >= (
|
||||
SELECT created_at FROM messages WHERE id = ${targetId}
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
const [row] = await tx<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
await tx`UPDATE sessions SET updated_at = NOW() WHERE id = ${sessionId}`;
|
||||
await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
|
||||
await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chatId}`;
|
||||
return {
|
||||
newAssistantId: row!.id,
|
||||
deletedIds: deletedRows.map((r) => r.id),
|
||||
};
|
||||
});
|
||||
|
||||
handlers.publishMessagesDeleted(sessionId, deletedIds);
|
||||
handlers.enqueueInference(sessionId, newAssistantId, requireUser(req));
|
||||
handlers.publishMessagesDeleted(sessionId, chatId, deletedIds);
|
||||
handlers.enqueueInference(sessionId, chatId, newAssistantId, 'default');
|
||||
|
||||
reply.code(202);
|
||||
return { assistant_message_id: newAssistantId };
|
||||
}
|
||||
);
|
||||
|
||||
// POST /api/chats/:id/compact
// Inserts an empty, 'streaming' system message of kind 'compact' into an open
// chat and passes its id to handlers.enqueueCompact. Replies 202 with that id,
// or 404 when no open chat has the given id.
app.post<{ Params: { id: string } }>('/api/chats/:id/compact', async (req, reply) => {
  const [openChat] = await sql<Chat[]>`
    SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
  `;
  if (openChat === undefined) {
    reply.code(404);
    return { error: 'chat not found' };
  }
  const { id: chatId, session_id: sessionId } = openChat;

  const inserted = await sql<{ id: string }[]>`
    INSERT INTO messages (session_id, chat_id, role, content, kind, status, created_at)
    VALUES (${sessionId}, ${chatId}, 'system', '', 'compact', 'streaming', clock_timestamp())
    RETURNING id
  `;
  const compactMessageId = inserted[0]!.id;

  handlers.enqueueCompact(sessionId, chatId, compactMessageId, 'default');

  reply.code(202);
  return { compact_message_id: compactMessageId };
});
|
||||
|
||||
// POST /api/chats/:id/stop
// Asks handlers.cancelInference to stop the chat's generation.
// 404: no chat with this id. 409: cancelInference reported nothing to stop.
// 200: { stopped: true }.
app.post<{ Params: { id: string } }>('/api/chats/:id/stop', async (req, reply) => {
  const matches = await sql<Chat[]>`
    SELECT id, session_id FROM chats WHERE id = ${req.params.id}
  `;
  const target = matches[0];
  if (target === undefined) {
    reply.code(404);
    return { error: 'chat not found' };
  }

  const stopped = await handlers.cancelInference(target.session_id, target.id);
  if (stopped) {
    reply.code(200);
    return { stopped: true };
  }

  reply.code(409);
  return { error: 'no active generation to stop' };
});
|
||||
|
||||
// POST /api/chats/:id/force_send
// Cancels whatever is generating in the chat, then stores the new user message
// plus an empty 'streaming' assistant message and enqueues inference for it.
// 400: body fails SendBody. 404: no open chat with this id. 202: both new ids.
app.post<{ Params: { id: string } }>(
  '/api/chats/:id/force_send',
  async (req, reply) => {
    const parsed = SendBody.safeParse(req.body);
    if (!parsed.success) {
      reply.code(400);
      return { error: 'invalid body', details: parsed.error.flatten() };
    }

    const chatRows = await sql<Chat[]>`
      SELECT id, session_id FROM chats WHERE id = ${req.params.id} AND status = 'open'
    `;
    if (chatRows.length === 0) {
      reply.code(404);
      return { error: 'chat not found' };
    }
    const chat = chatRows[0]!;
    const sessionId = chat.session_id;

    // Wait for the cancellation to finish before inserting the new messages.
    // (Original note: the runner's catch block persists the partial state —
    // that code is not visible from this route.)
    // The wait is capped at 5s to guard against llama-swap stalls; on timeout
    // we log and proceed anyway. The timer is always cleared so a fast
    // cancellation does not leave a pending 5s timer behind for every request.
    const cancelTimeoutMs = 5000;
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      const outcome = await Promise.race([
        handlers.cancelInference(sessionId, chat.id),
        new Promise<'timeout'>((resolve) => {
          timer = setTimeout(() => resolve('timeout'), cancelTimeoutMs);
        }),
      ]);
      if (outcome === 'timeout') {
        req.log.warn({ chatId: chat.id }, 'cancel timeout exceeded, proceeding with force-send');
      }
    } finally {
      // Runs on success, timeout, and when cancelInference rejects (the
      // rejection still propagates to the caller, as before).
      clearTimeout(timer);
    }

    // Insert both messages and bump the session/chat timestamps atomically.
    const result = await sql.begin(async (tx) => {
      const [userMsg] = await tx<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chat.id}, 'user', ${parsed.data.content}, 'complete', clock_timestamp())
        RETURNING id
      `;
      const [assistantMsg] = await tx<{ id: string }[]>`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chat.id}, 'assistant', '', 'streaming', clock_timestamp())
        RETURNING id
      `;
      await tx`UPDATE sessions SET updated_at = clock_timestamp() WHERE id = ${sessionId}`;
      await tx`UPDATE chats SET updated_at = clock_timestamp() WHERE id = ${chat.id}`;
      return { user_message_id: userMsg!.id, assistant_message_id: assistantMsg!.id };
    });

    // Publish and enqueue only after the transaction has committed.
    handlers.publishUserMessage(
      sessionId,
      chat.id,
      result.user_message_id,
      parsed.data.content
    );
    handlers.enqueueInference(sessionId, chat.id, result.assistant_message_id, 'default');

    reply.code(202);
    return result;
  }
);
|
||||
}
|
||||
|
||||
@@ -1,217 +0,0 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import type { TransactionSql } from 'postgres';
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Pane, PaneCreateRequest, PaneUpdateRequest } from '../types/api.js';
|
||||
|
||||
const VALID_KINDS = new Set(['chat', 'file_browser']);
|
||||
const MAX_PANES = 5;
|
||||
|
||||
async function movePane(
|
||||
tx: TransactionSql,
|
||||
paneId: string,
|
||||
sid: string,
|
||||
oldPos: number,
|
||||
newPos: number
|
||||
): Promise<void> {
|
||||
if (oldPos === newPos) return;
|
||||
// Move target pane to a sentinel well outside the negate range [-MAX_PANES, -1]
|
||||
// so it never collides with negated rows during the shift steps.
|
||||
await tx`UPDATE session_panes SET position = -100 WHERE id = ${paneId}`;
|
||||
if (newPos > oldPos) {
|
||||
await tx`UPDATE session_panes SET position = -position
|
||||
WHERE session_id = ${sid} AND position > ${oldPos} AND position <= ${newPos}`;
|
||||
await tx`UPDATE session_panes SET position = -position - 1
|
||||
WHERE session_id = ${sid} AND position < 0 AND id != ${paneId}`;
|
||||
} else {
|
||||
await tx`UPDATE session_panes SET position = -position - 2
|
||||
WHERE session_id = ${sid} AND position >= ${newPos} AND position < ${oldPos}`;
|
||||
await tx`UPDATE session_panes SET position = -position - 1
|
||||
WHERE session_id = ${sid} AND position < 0 AND id != ${paneId}`;
|
||||
}
|
||||
await tx`UPDATE session_panes SET position = ${newPos} WHERE id = ${paneId}`;
|
||||
}
|
||||
|
||||
export function registerPaneRoutes(app: FastifyInstance, sql: Sql): void {
|
||||
// GET /api/sessions/:id/panes — list panes ordered by position ASC
|
||||
app.get<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id/panes',
|
||||
async (req, reply) => {
|
||||
const sessionRows = await sql`SELECT id FROM sessions WHERE id = ${req.params.id}`;
|
||||
if (sessionRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'session not found' };
|
||||
}
|
||||
const panes = await sql<Pane[]>`
|
||||
SELECT id, session_id, position, kind, state, created_at
|
||||
FROM session_panes
|
||||
WHERE session_id = ${req.params.id}
|
||||
ORDER BY position ASC
|
||||
`;
|
||||
return { panes };
|
||||
}
|
||||
);
|
||||
|
||||
// POST /api/sessions/:id/panes — create a new pane
|
||||
app.post<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id/panes',
|
||||
async (req, reply) => {
|
||||
const body = (req.body ?? {}) as PaneCreateRequest;
|
||||
const { kind, position } = body;
|
||||
|
||||
if (!kind || !VALID_KINDS.has(kind)) {
|
||||
reply.code(400);
|
||||
return { error: 'kind must be "chat" or "file_browser"' };
|
||||
}
|
||||
|
||||
const sessionRows = await sql`SELECT id FROM sessions WHERE id = ${req.params.id}`;
|
||||
if (sessionRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'session not found' };
|
||||
}
|
||||
|
||||
const sid = req.params.id;
|
||||
const state = {};
|
||||
|
||||
let insertError: string | null = null;
|
||||
const inserted = await sql.begin(async (tx) => {
|
||||
const countResult = await tx<{ n: number }[]>`
|
||||
SELECT COUNT(*)::int AS n FROM session_panes WHERE session_id = ${sid}
|
||||
`;
|
||||
const n = countResult[0]!.n;
|
||||
if (n >= MAX_PANES) {
|
||||
throw new Error('MAX_PANES_EXCEEDED');
|
||||
}
|
||||
let insertPos: number;
|
||||
if (position === undefined || position === null) {
|
||||
insertPos = n;
|
||||
} else {
|
||||
if (position < 0 || position > n) {
|
||||
throw new Error('OUT_OF_BOUNDS');
|
||||
}
|
||||
insertPos = position;
|
||||
}
|
||||
await tx`UPDATE session_panes SET position = -position - 1
|
||||
WHERE session_id = ${sid} AND position >= ${insertPos}`;
|
||||
const [row] = await tx<Pane[]>`
|
||||
INSERT INTO session_panes (session_id, position, kind, state)
|
||||
VALUES (${sid}, ${insertPos}, ${kind}, ${JSON.stringify(state)}::jsonb)
|
||||
RETURNING id, session_id, position, kind, state, created_at
|
||||
`;
|
||||
await tx`UPDATE session_panes SET position = -position
|
||||
WHERE session_id = ${sid} AND position < 0`;
|
||||
return row;
|
||||
}).catch((err: Error) => {
|
||||
insertError = err.message;
|
||||
return null;
|
||||
});
|
||||
|
||||
if (insertError === 'MAX_PANES_EXCEEDED') {
|
||||
reply.code(400);
|
||||
return { error: `session already has ${MAX_PANES} panes (maximum)` };
|
||||
}
|
||||
if (insertError === 'OUT_OF_BOUNDS') {
|
||||
reply.code(400);
|
||||
return { error: `position out of bounds` };
|
||||
}
|
||||
if (insertError) {
|
||||
reply.code(500);
|
||||
return { error: 'internal error' };
|
||||
}
|
||||
|
||||
reply.code(201);
|
||||
return inserted as Pane;
|
||||
}
|
||||
);
|
||||
|
||||
// PATCH /api/panes/:id — update state and/or position
|
||||
app.patch<{ Params: { id: string } }>(
|
||||
'/api/panes/:id',
|
||||
async (req, reply) => {
|
||||
const body = (req.body ?? {}) as PaneUpdateRequest;
|
||||
const { state, position } = body;
|
||||
|
||||
if (state === undefined && position === undefined) {
|
||||
reply.code(400);
|
||||
return { error: 'must provide at least one of: state, position' };
|
||||
}
|
||||
|
||||
const paneRows = await sql<Pane[]>`
|
||||
SELECT id, session_id, position, kind, state, created_at
|
||||
FROM session_panes WHERE id = ${req.params.id}
|
||||
`;
|
||||
if (paneRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'pane not found' };
|
||||
}
|
||||
const pane = paneRows[0]!;
|
||||
const sid = pane.session_id;
|
||||
const oldPos = pane.position;
|
||||
|
||||
// Apply position and/or state changes atomically
|
||||
let patchError: string | null = null;
|
||||
await sql.begin(async (tx) => {
|
||||
if (position !== undefined) {
|
||||
const countRows = await tx<{ n: number }[]>`
|
||||
SELECT COUNT(*)::int AS n FROM session_panes WHERE session_id = ${sid}
|
||||
`;
|
||||
const count = countRows[0]?.n ?? 0;
|
||||
if (position < 0 || position >= count) {
|
||||
throw `position must be between 0 and ${count - 1}`;
|
||||
}
|
||||
}
|
||||
if (position !== undefined && position !== oldPos) {
|
||||
await movePane(tx, req.params.id, sid, oldPos, position);
|
||||
}
|
||||
if (state !== undefined) {
|
||||
await tx`
|
||||
UPDATE session_panes SET state = ${JSON.stringify(state)}::jsonb
|
||||
WHERE id = ${req.params.id}
|
||||
`;
|
||||
}
|
||||
}).catch((err: unknown) => {
|
||||
if (typeof err === 'string') {
|
||||
patchError = err;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
|
||||
if (patchError !== null) {
|
||||
reply.code(400);
|
||||
return { error: patchError };
|
||||
}
|
||||
|
||||
const [updated] = await sql<Pane[]>`
|
||||
SELECT id, session_id, position, kind, state, created_at
|
||||
FROM session_panes WHERE id = ${req.params.id}
|
||||
`;
|
||||
return updated as Pane;
|
||||
}
|
||||
);
|
||||
|
||||
// DELETE /api/panes/:id — delete a pane, shift remaining down
|
||||
app.delete<{ Params: { id: string } }>(
|
||||
'/api/panes/:id',
|
||||
async (req, reply) => {
|
||||
const paneRows = await sql<{ id: string; session_id: string; position: number }[]>`
|
||||
SELECT id, session_id, position FROM session_panes WHERE id = ${req.params.id}
|
||||
`;
|
||||
if (paneRows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'pane not found' };
|
||||
}
|
||||
const { session_id: sid, position: P } = paneRows[0]!;
|
||||
|
||||
await sql.begin(async (tx) => {
|
||||
await tx`DELETE FROM session_panes WHERE id = ${req.params.id}`;
|
||||
await tx`UPDATE session_panes SET position = -position
|
||||
WHERE session_id = ${sid} AND position > ${P}`;
|
||||
await tx`UPDATE session_panes SET position = -position - 1
|
||||
WHERE session_id = ${sid} AND position < 0`;
|
||||
});
|
||||
|
||||
reply.code(204);
|
||||
return null;
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import type { Sql } from '../db.js';
|
||||
import type { Config } from '../config.js';
|
||||
import type { Broker } from '../services/broker.js';
|
||||
import type { Project, AvailableProject } from '../types/api.js';
|
||||
import { requireUser } from '../auth.js';
|
||||
import { resolveProjectRoot, PathScopeError } from '../services/path_guard.js';
|
||||
import { listDir, viewFile } from '../services/file_ops.js';
|
||||
import { getProjectFiles } from '../services/file_index.js';
|
||||
@@ -77,7 +76,7 @@ export function registerProjectRoutes(
|
||||
VALUES (${name}, ${resolved.real})
|
||||
RETURNING id, name, path, added_at, last_session_id
|
||||
`;
|
||||
broker.publishUser(requireUser(req), { type: 'project_created', project: row as unknown as Project });
|
||||
broker.publishUser('default', { type: 'project_created', project: row as unknown as Project });
|
||||
reply.code(201);
|
||||
return row;
|
||||
} catch (err) {
|
||||
@@ -96,7 +95,7 @@ export function registerProjectRoutes(
|
||||
reply.code(404);
|
||||
return { error: 'not found' };
|
||||
}
|
||||
broker.publishUser(requireUser(req), { type: 'project_deleted', project_id: id });
|
||||
broker.publishUser('default', { type: 'project_deleted', project_id: id });
|
||||
reply.code(204);
|
||||
return null;
|
||||
});
|
||||
|
||||
@@ -5,7 +5,6 @@ import type { Config } from '../config.js';
|
||||
import type { Broker } from '../services/broker.js';
|
||||
import type { Session } from '../types/api.js';
|
||||
import { getSetting } from './settings.js';
|
||||
import { requireUser } from '../auth.js';
|
||||
|
||||
const CreateBody = z.object({
|
||||
name: z.string().min(1).max(200).optional(),
|
||||
@@ -31,7 +30,7 @@ export function registerSessionRoutes(
|
||||
config: Config,
|
||||
broker: Broker
|
||||
): void {
|
||||
app.get<{ Params: { id: string } }>(
|
||||
app.get<{ Params: { id: string }; Querystring: { status?: string } }>(
|
||||
'/api/projects/:id/sessions',
|
||||
async (req, reply) => {
|
||||
const project = await sql`SELECT id FROM projects WHERE id = ${req.params.id}`;
|
||||
@@ -39,10 +38,11 @@ export function registerSessionRoutes(
|
||||
reply.code(404);
|
||||
return { error: 'project not found' };
|
||||
}
|
||||
const status = req.query.status === 'archived' ? 'archived' : 'open';
|
||||
const rows = await sql<Session[]>`
|
||||
SELECT id, project_id, name, model, system_prompt, created_at, updated_at
|
||||
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
FROM sessions
|
||||
WHERE project_id = ${req.params.id}
|
||||
WHERE project_id = ${req.params.id} AND status = ${status}
|
||||
ORDER BY updated_at DESC
|
||||
`;
|
||||
return rows;
|
||||
@@ -81,15 +81,15 @@ export function registerSessionRoutes(
|
||||
const [session] = await tx<Session[]>`
|
||||
INSERT INTO sessions (project_id, name, model, system_prompt)
|
||||
VALUES (${req.params.id}, ${name}, ${model}, ${systemPrompt})
|
||||
RETURNING id, project_id, name, model, system_prompt, created_at, updated_at
|
||||
RETURNING id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
`;
|
||||
await tx`
|
||||
INSERT INTO session_panes (session_id, position, kind, state)
|
||||
VALUES (${session!.id}, 0, 'chat', '{}'::jsonb)
|
||||
INSERT INTO chats (session_id, name, status)
|
||||
VALUES (${session!.id}, NULL, 'open')
|
||||
`;
|
||||
return session!;
|
||||
});
|
||||
broker.publishUser(requireUser(req), {
|
||||
broker.publishUser('default', {
|
||||
type: 'session_created',
|
||||
session: row,
|
||||
project_id: row.project_id,
|
||||
@@ -101,7 +101,7 @@ export function registerSessionRoutes(
|
||||
|
||||
app.get<{ Params: { id: string } }>('/api/sessions/:id', async (req, reply) => {
|
||||
const rows = await sql<Session[]>`
|
||||
SELECT id, project_id, name, model, system_prompt, created_at, updated_at
|
||||
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
FROM sessions WHERE id = ${req.params.id}
|
||||
`;
|
||||
if (rows.length === 0) {
|
||||
@@ -128,7 +128,7 @@ export function registerSessionRoutes(
|
||||
system_prompt = COALESCE(${system_prompt ?? null}, system_prompt),
|
||||
updated_at = clock_timestamp()
|
||||
WHERE id = ${req.params.id}
|
||||
RETURNING id, project_id, name, model, system_prompt, created_at, updated_at
|
||||
RETURNING id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
`;
|
||||
if (rows.length === 0) {
|
||||
reply.code(404);
|
||||
@@ -138,6 +138,51 @@ export function registerSessionRoutes(
|
||||
}
|
||||
);
|
||||
|
||||
app.post<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id/archive',
|
||||
async (req, reply) => {
|
||||
const rows = await sql<{ id: string; project_id: string }[]>`
|
||||
UPDATE sessions SET status = 'archived', updated_at = clock_timestamp()
|
||||
WHERE id = ${req.params.id} AND status = 'open'
|
||||
RETURNING id, project_id
|
||||
`;
|
||||
if (rows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'session not found or already archived' };
|
||||
}
|
||||
broker.publishUser('default', {
|
||||
type: 'session_archived',
|
||||
session_id: rows[0]!.id,
|
||||
project_id: rows[0]!.project_id,
|
||||
});
|
||||
reply.code(204);
|
||||
return null;
|
||||
}
|
||||
);
|
||||
|
||||
app.post<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id/unarchive',
|
||||
async (req, reply) => {
|
||||
const rows = await sql<Session[]>`
|
||||
UPDATE sessions SET status = 'open', updated_at = clock_timestamp()
|
||||
WHERE id = ${req.params.id} AND status = 'archived'
|
||||
RETURNING id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
`;
|
||||
if (rows.length === 0) {
|
||||
reply.code(404);
|
||||
return { error: 'session not found or not archived' };
|
||||
}
|
||||
const session = rows[0]!;
|
||||
broker.publishUser('default', {
|
||||
type: 'session_created',
|
||||
session: session,
|
||||
project_id: session.project_id,
|
||||
});
|
||||
reply.code(200);
|
||||
return session;
|
||||
}
|
||||
);
|
||||
|
||||
app.delete<{ Params: { id: string } }>(
|
||||
'/api/sessions/:id',
|
||||
async (req, reply) => {
|
||||
@@ -150,7 +195,7 @@ export function registerSessionRoutes(
|
||||
return { error: 'not found' };
|
||||
}
|
||||
const project_id = deleted[0]!.project_id;
|
||||
broker.publishUser(requireUser(req), { type: 'session_deleted', session_id: id, project_id });
|
||||
broker.publishUser('default', { type: 'session_deleted', session_id: id, project_id });
|
||||
reply.code(204);
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -20,14 +20,14 @@ export function registerSidebarRoutes(app: FastifyInstance, sql: Sql): void {
|
||||
sql<SidebarSession[]>`
|
||||
SELECT id, project_id, name, model, updated_at
|
||||
FROM sessions
|
||||
WHERE project_id = ${p.id}
|
||||
WHERE project_id = ${p.id} AND status = 'open'
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT 6
|
||||
`,
|
||||
sql<{ n: number }[]>`
|
||||
SELECT COUNT(*)::int AS n
|
||||
FROM sessions
|
||||
WHERE project_id = ${p.id}
|
||||
WHERE project_id = ${p.id} AND status = 'open'
|
||||
`,
|
||||
]);
|
||||
return {
|
||||
|
||||
@@ -22,7 +22,7 @@ export function registerWebSocket(
|
||||
}
|
||||
|
||||
const messages = await sql<Message[]>`
|
||||
SELECT id, session_id, role, content, tool_calls, tool_results, status, last_seq,
|
||||
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
||||
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at
|
||||
FROM messages
|
||||
WHERE session_id = ${sessionId}
|
||||
@@ -44,15 +44,8 @@ export function registerWebSocket(
|
||||
}
|
||||
);
|
||||
|
||||
app.get('/api/ws/user', { websocket: true }, async (socket, req) => {
|
||||
const user = req.user;
|
||||
// defensive: global auth hook (auth.ts) already rejects unauthenticated /api/* requests;
|
||||
// keep the explicit check here to close the WS cleanly (1008) rather than throwing.
|
||||
if (!user) {
|
||||
socket.close(1008, 'unauthenticated');
|
||||
return;
|
||||
}
|
||||
// No snapshot — user channel is purely live updates.
|
||||
app.get('/api/ws/user', { websocket: true }, async (socket) => {
|
||||
const user = 'default';
|
||||
const unsubscribe = broker.subscribeUser(user, (frame) => {
|
||||
if (socket.readyState !== socket.OPEN) return;
|
||||
try {
|
||||
|
||||
@@ -21,11 +21,11 @@ CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions(project_id, updated_
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
|
||||
role TEXT NOT NULL CHECK (role IN ('user', 'assistant', 'tool')),
|
||||
role TEXT NOT NULL,
|
||||
content TEXT NOT NULL DEFAULT '',
|
||||
tool_calls JSONB,
|
||||
tool_results JSONB,
|
||||
status TEXT NOT NULL DEFAULT 'complete' CHECK (status IN ('streaming', 'complete', 'failed')),
|
||||
status TEXT NOT NULL DEFAULT 'complete',
|
||||
last_seq INT NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
@@ -47,6 +47,8 @@ CREATE TABLE IF NOT EXISTS settings (
|
||||
|
||||
INSERT INTO settings (key, value) VALUES ('default_model', '"qwen3.6-35b-a3b-mxfp4"') ON CONFLICT (key) DO NOTHING;
|
||||
|
||||
-- DEPRECATED: client-side pane state as of v1.2-batch4. Table retained per
|
||||
-- additive schema rule; no writes. Drop in a future destructive migration.
|
||||
CREATE TABLE IF NOT EXISTS session_panes (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
|
||||
@@ -66,3 +68,67 @@ FROM sessions s
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM session_panes p WHERE p.session_id = s.id
|
||||
);
|
||||
|
||||
-- v1.2: sessions.status (open | archived)
|
||||
ALTER TABLE sessions ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'open';
|
||||
|
||||
-- v1.2: chats table
|
||||
CREATE TABLE IF NOT EXISTS chats (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
|
||||
name TEXT,
|
||||
status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'closed')),
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_chats_session_status ON chats (session_id, status, updated_at DESC);
|
||||
|
||||
-- v1.2: messages.chat_id + messages.kind
|
||||
ALTER TABLE messages ADD COLUMN IF NOT EXISTS chat_id UUID REFERENCES chats(id) ON DELETE CASCADE;
|
||||
ALTER TABLE messages ADD COLUMN IF NOT EXISTS kind TEXT NOT NULL DEFAULT 'message';
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_chat ON messages (chat_id, created_at);
|
||||
|
||||
-- Backfill: one chat per existing session that has none yet
|
||||
INSERT INTO chats (session_id, name, status, created_at, updated_at)
|
||||
SELECT s.id, s.name, 'open', s.created_at, s.updated_at
|
||||
FROM sessions s
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM chats c WHERE c.session_id = s.id
|
||||
);
|
||||
|
||||
-- Backfill: link orphaned messages to their session's first chat
|
||||
UPDATE messages SET chat_id = (
|
||||
SELECT c.id FROM chats c WHERE c.session_id = messages.session_id ORDER BY c.created_at ASC LIMIT 1
|
||||
)
|
||||
WHERE chat_id IS NULL;
|
||||
|
||||
-- Enforce NOT NULL on chat_id once all rows are backfilled
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM information_schema.columns
|
||||
WHERE table_name = 'messages' AND column_name = 'chat_id' AND is_nullable = 'YES'
|
||||
) AND NOT EXISTS (
|
||||
SELECT 1 FROM messages WHERE chat_id IS NULL
|
||||
) THEN
|
||||
ALTER TABLE messages ALTER COLUMN chat_id SET NOT NULL;
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
-- v1.2.1: CHECK constraints for sessions.status and messages (role, status)
|
||||
-- KEEP IN SYNC: apps/server/src/types/api.ts (MESSAGE_ROLES, MESSAGE_STATUSES, SessionStatus)
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'sessions_status_chk') THEN
|
||||
ALTER TABLE sessions ADD CONSTRAINT sessions_status_chk
|
||||
CHECK (status IN ('open', 'archived'));
|
||||
END IF;
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'messages_role_chk') THEN
|
||||
ALTER TABLE messages ADD CONSTRAINT messages_role_chk
|
||||
CHECK (role IN ('user', 'assistant', 'system', 'tool'));
|
||||
END IF;
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'messages_status_chk') THEN
|
||||
ALTER TABLE messages ADD CONSTRAINT messages_status_chk
|
||||
CHECK (status IN ('streaming', 'complete', 'failed', 'cancelled'));
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -5,31 +5,12 @@ const NAMING_SYSTEM_PROMPT =
|
||||
|
||||
const MAX_TITLE_CHARS = 60;
|
||||
|
||||
// QWEN3 NON-STREAMING UTILITY-CALL PATTERN
|
||||
// ----------------------------------------
|
||||
// Qwen3-family chat templates default to chain-of-thought reasoning: the
|
||||
// model emits a long <think>…</think> block into `reasoning_content` and
|
||||
// only finalizes a real reply in `content`. For short utility calls
|
||||
// (naming, classification, routing, summarization) with a tight token
|
||||
// budget, the model burns the entire budget on reasoning and returns:
|
||||
// - content: ""
|
||||
// - reasoning_content: "Thinking Process: 1. ..." (mid-thought, truncated)
|
||||
// - finish_reason: "length"
|
||||
// Fix: pass `chat_template_kwargs: { enable_thinking: false }` to skip the
|
||||
// thinking block, and keep `max_tokens` low (~30 is plenty for a 4-word
|
||||
// title). The kwarg is a no-op for non-Qwen chat templates, so it's safe
|
||||
// to apply unconditionally for any short non-streaming model call.
|
||||
// Apply this same pattern to: fork-message (planned), agent-routing
|
||||
// (planned), web-search summarization (planned).
|
||||
|
||||
function cleanTitle(raw: string): string {
|
||||
let name = raw.trim();
|
||||
// Strip surrounding straight or smart quotes (one layer).
|
||||
const quotes = ['"', "'", '`', '‘', '’', '“', '”'];
|
||||
while (name.length >= 2 && quotes.includes(name[0]!) && quotes.includes(name[name.length - 1]!)) {
|
||||
name = name.slice(1, -1).trim();
|
||||
}
|
||||
// Drop a leading "Title:" prefix if the model added one despite instructions.
|
||||
name = name.replace(/^title\s*:\s*/i, '').trim();
|
||||
if (name.length > MAX_TITLE_CHARS) {
|
||||
name = name.slice(0, MAX_TITLE_CHARS).trim();
|
||||
@@ -46,13 +27,10 @@ interface NamingResponse {
|
||||
}>;
|
||||
}
|
||||
|
||||
// Some Qwen-family models emit "thinking" tokens into reasoning_content and
|
||||
// only finalize a real reply in content. Pull a sensible candidate string.
|
||||
function pickTitleSource(data: NamingResponse): string {
|
||||
const choice = data.choices?.[0]?.message;
|
||||
if (!choice) return '';
|
||||
if (choice.content && choice.content.trim().length > 0) return choice.content;
|
||||
// Fallback: try to extract a last-line title from reasoning, if present.
|
||||
const reasoning = choice.reasoning_content ?? '';
|
||||
if (reasoning.length === 0) return '';
|
||||
const lines = reasoning
|
||||
@@ -62,38 +40,44 @@ function pickTitleSource(data: NamingResponse): string {
|
||||
return lines[lines.length - 1] ?? '';
|
||||
}
|
||||
|
||||
export async function maybeAutoNameSession(
|
||||
export async function maybeAutoNameChat(
|
||||
ctx: InferenceContext,
|
||||
chatId: string,
|
||||
sessionId: string
|
||||
): Promise<void> {
|
||||
const counts = await ctx.sql<{ n: number }[]>`
|
||||
SELECT COUNT(*)::int AS n
|
||||
FROM messages
|
||||
WHERE session_id = ${sessionId}
|
||||
WHERE chat_id = ${chatId}
|
||||
AND role = 'assistant'
|
||||
AND status = 'complete'
|
||||
`;
|
||||
if (counts[0]?.n !== 1) return;
|
||||
|
||||
const sessionRows = await ctx.sql<
|
||||
{ id: string; name: string; model: string }[]
|
||||
const chatRows = await ctx.sql<
|
||||
{ id: string; name: string | null; session_id: string }[]
|
||||
>`
|
||||
SELECT id, name, model FROM sessions WHERE id = ${sessionId}
|
||||
SELECT id, name, session_id FROM chats WHERE id = ${chatId}
|
||||
`;
|
||||
const session = sessionRows[0];
|
||||
if (!session) return;
|
||||
const existingName = session.name ?? '';
|
||||
if (existingName !== '' && existingName !== 'New session') return;
|
||||
const chat = chatRows[0];
|
||||
if (!chat) return;
|
||||
if (chat.name !== null && chat.name !== '') return;
|
||||
|
||||
const sessionRows = await ctx.sql<{ model: string }[]>`
|
||||
SELECT model FROM sessions WHERE id = ${sessionId}
|
||||
`;
|
||||
const model = sessionRows[0]?.model;
|
||||
if (!model) return;
|
||||
|
||||
const userMsg = await ctx.sql<{ content: string }[]>`
|
||||
SELECT content FROM messages
|
||||
WHERE session_id = ${sessionId} AND role = 'user'
|
||||
WHERE chat_id = ${chatId} AND role = 'user'
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 1
|
||||
`;
|
||||
const assistantMsg = await ctx.sql<{ content: string }[]>`
|
||||
SELECT content FROM messages
|
||||
WHERE session_id = ${sessionId}
|
||||
WHERE chat_id = ${chatId}
|
||||
AND role = 'assistant'
|
||||
AND status = 'complete'
|
||||
ORDER BY created_at ASC
|
||||
@@ -105,7 +89,7 @@ export async function maybeAutoNameSession(
|
||||
const assistantText = assistantMsg[0].content.slice(0, 2000);
|
||||
|
||||
const body = {
|
||||
model: session.model,
|
||||
model,
|
||||
messages: [
|
||||
{ role: 'system', content: NAMING_SYSTEM_PROMPT },
|
||||
{
|
||||
@@ -116,9 +100,6 @@ export async function maybeAutoNameSession(
|
||||
max_tokens: 30,
|
||||
temperature: 0.3,
|
||||
stream: false,
|
||||
// Qwen-family models default to chain-of-thought; this template kwarg
|
||||
// tells llama.cpp's chat template renderer to skip the thinking block.
|
||||
// Harmless for non-Qwen models.
|
||||
chat_template_kwargs: { enable_thinking: false },
|
||||
};
|
||||
|
||||
@@ -135,23 +116,30 @@ export async function maybeAutoNameSession(
|
||||
const raw = pickTitleSource(data);
|
||||
const name = cleanTitle(raw);
|
||||
if (!name) {
|
||||
ctx.log.warn({ sessionId, raw }, 'auto-name: empty title from model');
|
||||
ctx.log.warn({ chatId, raw }, 'auto-name: empty title from model');
|
||||
return;
|
||||
}
|
||||
|
||||
const updated = await ctx.sql<{ id: string; name: string }[]>`
|
||||
UPDATE sessions
|
||||
SET name = ${name}, updated_at = NOW()
|
||||
WHERE id = ${sessionId}
|
||||
AND (name IS NULL OR name = '' OR name = 'New session')
|
||||
RETURNING id, name
|
||||
const updated = await ctx.sql<{ id: string; name: string; session_id: string; updated_at: string }[]>`
|
||||
UPDATE chats
|
||||
SET name = ${name}, updated_at = clock_timestamp()
|
||||
WHERE id = ${chatId}
|
||||
AND (name IS NULL OR name = '')
|
||||
RETURNING id, name, session_id, updated_at
|
||||
`;
|
||||
if (updated.length === 0) return;
|
||||
|
||||
ctx.publish(sessionId, {
|
||||
type: 'session_renamed',
|
||||
session_id: sessionId,
|
||||
type: 'chat_renamed',
|
||||
chat_id: chatId,
|
||||
name,
|
||||
});
|
||||
ctx.log.info({ sessionId, name }, 'session auto-named');
|
||||
ctx.publishUser({
|
||||
type: 'chat_updated',
|
||||
chat_id: chatId,
|
||||
session_id: sessionId,
|
||||
name,
|
||||
updated_at: updated[0]!.updated_at,
|
||||
});
|
||||
ctx.log.info({ chatId, name }, 'chat auto-named');
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import type { Config } from '../config.js';
|
||||
import type { Message, Project, Session, ToolCall, UserStreamFrame } from '../types/api.js';
|
||||
import { ALL_TOOLS, TOOLS_BY_NAME, toolJsonSchemas } from './tools.js';
|
||||
import { PathScopeError, resolveProjectRoot } from './path_guard.js';
|
||||
import { maybeAutoNameSession } from './auto_name.js';
|
||||
import { maybeAutoNameChat } from './auto_name.js';
|
||||
|
||||
const BASE_SYSTEM_PROMPT = (projectPath: string) =>
|
||||
`You are BooCode Chat, a code investigation assistant. The user is working on a project located at ${projectPath}. Use the file-read tools (view_file, list_dir, grep, find_files) to investigate code when needed. Be concise. Cite file paths and line numbers when discussing code. Do not hallucinate file contents — read the file first. Tool results may be truncated; if so, narrow your query rather than guessing.`;
|
||||
@@ -21,9 +21,11 @@ export interface InferenceFrame {
|
||||
| 'message_complete'
|
||||
| 'messages_deleted'
|
||||
| 'session_renamed'
|
||||
| 'chat_renamed'
|
||||
| 'error';
|
||||
message_id?: string;
|
||||
message_ids?: string[];
|
||||
chat_id?: string;
|
||||
tool_message_id?: string;
|
||||
tool_call_id?: string;
|
||||
role?: 'assistant' | 'tool' | 'user';
|
||||
@@ -101,8 +103,23 @@ export function buildMessagesPayload(
|
||||
}
|
||||
out.push({ role: 'system', content: systemPrompt });
|
||||
|
||||
for (const m of history) {
|
||||
// Find the latest compact marker — only send messages from that point onwards
|
||||
let startIdx = 0;
|
||||
for (let i = history.length - 1; i >= 0; i--) {
|
||||
if (history[i]!.kind === 'compact') {
|
||||
startIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (let i = startIdx; i < history.length; i++) {
|
||||
const m = history[i]!;
|
||||
if (m.kind === 'compact') {
|
||||
out.push({ role: 'system', content: m.content });
|
||||
continue;
|
||||
}
|
||||
if (m.role === 'assistant' && m.status === 'streaming') continue;
|
||||
if (m.role === 'assistant' && m.status === 'cancelled') continue;
|
||||
if (m.role === 'tool') {
|
||||
const tr = m.tool_results;
|
||||
if (!tr) continue;
|
||||
@@ -140,10 +157,11 @@ export function buildMessagesPayload(
|
||||
|
||||
async function loadContext(
|
||||
sql: Sql,
|
||||
sessionId: string
|
||||
sessionId: string,
|
||||
chatId: string
|
||||
): Promise<{ session: Session; project: Project; history: Message[] } | null> {
|
||||
const sessionRows = await sql<Session[]>`
|
||||
SELECT id, project_id, name, model, system_prompt, created_at, updated_at
|
||||
SELECT id, project_id, name, model, system_prompt, status, created_at, updated_at
|
||||
FROM sessions WHERE id = ${sessionId}
|
||||
`;
|
||||
if (sessionRows.length === 0) return null;
|
||||
@@ -157,10 +175,10 @@ async function loadContext(
|
||||
const project = projectRows[0]!;
|
||||
|
||||
const history = await sql<Message[]>`
|
||||
SELECT id, session_id, role, content, tool_calls, tool_results, status, last_seq,
|
||||
SELECT id, session_id, chat_id, role, content, kind, tool_calls, tool_results, status, last_seq,
|
||||
tokens_used, ctx_used, ctx_max, started_at, finished_at, created_at
|
||||
FROM messages
|
||||
WHERE session_id = ${sessionId}
|
||||
WHERE chat_id = ${chatId}
|
||||
ORDER BY created_at ASC, id ASC
|
||||
`;
|
||||
|
||||
@@ -204,7 +222,8 @@ async function streamCompletion(
|
||||
model: string,
|
||||
messages: OpenAiMessage[],
|
||||
includeTools: boolean,
|
||||
onDelta: (content: string) => void
|
||||
onDelta: (content: string) => void,
|
||||
signal?: AbortSignal
|
||||
): Promise<StreamResult> {
|
||||
const body: Record<string, unknown> = {
|
||||
model,
|
||||
@@ -221,6 +240,7 @@ async function streamCompletion(
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
});
|
||||
if (!res.ok || !res.body) {
|
||||
const text = await res.text().catch(() => '');
|
||||
@@ -331,8 +351,10 @@ async function executeToolCall(
|
||||
async function runAssistantTurn(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
assistantMessageId: string,
|
||||
depth: number
|
||||
depth: number,
|
||||
signal?: AbortSignal
|
||||
): Promise<void> {
|
||||
if (depth > MAX_TOOL_LOOP_DEPTH) {
|
||||
await ctx.sql`
|
||||
@@ -345,12 +367,13 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
error: 'tool loop depth exceeded',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const loaded = await loadContext(ctx.sql, sessionId);
|
||||
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
||||
if (!loaded) {
|
||||
ctx.log.warn({ sessionId }, 'inference: session or project missing');
|
||||
return;
|
||||
@@ -370,6 +393,7 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'assistant',
|
||||
});
|
||||
|
||||
@@ -408,21 +432,25 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
content: delta,
|
||||
});
|
||||
ctx.log.debug({ sessionId, delta }, 'inference delta');
|
||||
scheduleFlush();
|
||||
}
|
||||
},
|
||||
signal
|
||||
);
|
||||
} catch (err) {
|
||||
if (pendingFlushTimer) {
|
||||
clearTimeout(pendingFlushTimer);
|
||||
pendingFlushTimer = null;
|
||||
}
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
await flushPromise;
|
||||
const isAbort = err instanceof Error && err.name === 'AbortError';
|
||||
const finalStatus = isAbort ? 'cancelled' : 'failed';
|
||||
await ctx.sql`
|
||||
UPDATE messages
|
||||
SET status = 'failed',
|
||||
SET status = ${finalStatus},
|
||||
content = ${accumulated},
|
||||
finished_at = clock_timestamp()
|
||||
WHERE id = ${assistantMessageId}
|
||||
@@ -433,12 +461,23 @@ async function runAssistantTurn(
|
||||
RETURNING project_id, name, updated_at
|
||||
`;
|
||||
ctx.publishUser({ type: 'session_updated', session_id: sessionId, project_id: failSessRow!.project_id, name: failSessRow!.name, updated_at: failSessRow!.updated_at });
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
error: errMsg,
|
||||
});
|
||||
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
||||
if (isAbort) {
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
ctx.log.info({ sessionId, chatId, assistantMessageId }, 'inference cancelled');
|
||||
} else {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
error: errMsg,
|
||||
});
|
||||
ctx.log.error({ err, sessionId, assistantMessageId }, 'inference failed');
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -475,12 +514,14 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'tool_call',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tool_call: tc,
|
||||
});
|
||||
}
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
@@ -492,8 +533,8 @@ async function runAssistantTurn(
|
||||
await Promise.all(
|
||||
toolCalls.map(async (tc) => {
|
||||
const [toolRow] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, 'tool', '', 'complete', clock_timestamp())
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'tool', '', 'complete', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
const toolMessageId = toolRow!.id;
|
||||
@@ -512,6 +553,7 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'tool_result',
|
||||
tool_message_id: toolMessageId,
|
||||
chat_id: chatId,
|
||||
tool_call_id: tc.id,
|
||||
output: tres.output,
|
||||
truncated: tres.truncated,
|
||||
@@ -521,11 +563,11 @@ async function runAssistantTurn(
|
||||
);
|
||||
|
||||
const [nextAssistant] = await ctx.sql<{ id: string }[]>`
|
||||
INSERT INTO messages (session_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
|
||||
RETURNING id
|
||||
`;
|
||||
await runAssistantTurn(ctx, sessionId, nextAssistant!.id, depth + 1);
|
||||
await runAssistantTurn(ctx, sessionId, chatId, nextAssistant!.id, depth + 1, signal);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -551,6 +593,7 @@ async function runAssistantTurn(
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: assistantMessageId,
|
||||
chat_id: chatId,
|
||||
tokens_used: updated?.tokens_used ?? null,
|
||||
ctx_used: updated?.ctx_used ?? null,
|
||||
ctx_max: updated?.ctx_max ?? null,
|
||||
@@ -561,6 +604,7 @@ async function runAssistantTurn(
|
||||
ctx.log.info(
|
||||
{
|
||||
sessionId,
|
||||
chatId,
|
||||
assistantMessageId,
|
||||
finishReason,
|
||||
chars: content.length,
|
||||
@@ -574,36 +618,153 @@ async function runAssistantTurn(
|
||||
export async function runInference(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
assistantMessageId: string
|
||||
chatId: string,
|
||||
assistantMessageId: string,
|
||||
signal?: AbortSignal
|
||||
): Promise<void> {
|
||||
return runAssistantTurn(ctx, sessionId, assistantMessageId, 0);
|
||||
return runAssistantTurn(ctx, sessionId, chatId, assistantMessageId, 0, signal);
|
||||
}
|
||||
|
||||
const COMPACT_SYSTEM_PROMPT =
|
||||
'Summarize the preceding conversation into a dense but complete context paragraph. Preserve all key facts, decisions, file paths, code patterns, and action items. Do not add any new information. Output only the summary paragraph.';
|
||||
|
||||
async function runCompact(
|
||||
ctx: InferenceContext,
|
||||
sessionId: string,
|
||||
chatId: string,
|
||||
compactMessageId: string
|
||||
): Promise<void> {
|
||||
const loaded = await loadContext(ctx.sql, sessionId, chatId);
|
||||
if (!loaded) return;
|
||||
const { session, project, history } = loaded;
|
||||
|
||||
const messagesForSummary = buildMessagesPayload(session, project,
|
||||
history.filter((m) => m.id !== compactMessageId)
|
||||
);
|
||||
messagesForSummary.push({
|
||||
role: 'system',
|
||||
content: COMPACT_SYSTEM_PROMPT,
|
||||
});
|
||||
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_started',
|
||||
message_id: compactMessageId,
|
||||
chat_id: chatId,
|
||||
role: 'assistant',
|
||||
});
|
||||
|
||||
let content = '';
|
||||
try {
|
||||
const result = await streamCompletion(
|
||||
ctx,
|
||||
session.model,
|
||||
messagesForSummary,
|
||||
false,
|
||||
(delta) => {
|
||||
content += delta;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'delta',
|
||||
message_id: compactMessageId,
|
||||
chat_id: chatId,
|
||||
content: delta,
|
||||
});
|
||||
}
|
||||
);
|
||||
content = result.content;
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
await ctx.sql`
|
||||
UPDATE messages SET status = 'failed', content = ${content}, finished_at = clock_timestamp()
|
||||
WHERE id = ${compactMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'error',
|
||||
message_id: compactMessageId,
|
||||
chat_id: chatId,
|
||||
error: errMsg,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const preCompactCount = history.filter((m) => m.id !== compactMessageId && m.kind !== 'compact').length;
|
||||
const summary = `[Context compacted — ${preCompactCount} messages summarized]\n\n${content}`;
|
||||
|
||||
await ctx.sql`
|
||||
UPDATE messages SET content = ${summary}, status = 'complete', finished_at = clock_timestamp()
|
||||
WHERE id = ${compactMessageId}
|
||||
`;
|
||||
ctx.publish(sessionId, {
|
||||
type: 'message_complete',
|
||||
message_id: compactMessageId,
|
||||
chat_id: chatId,
|
||||
});
|
||||
}
|
||||
|
||||
interface InferenceRegistration {
|
||||
controller: AbortController;
|
||||
completed: Promise<void>;
|
||||
}
|
||||
|
||||
export function createInferenceRunner(
|
||||
ctx: Omit<InferenceContext, 'publishUser'>,
|
||||
publishUserFn: (user: string, frame: UserStreamFrame) => void
|
||||
) {
|
||||
const registry = new Map<string, InferenceRegistration>();
|
||||
|
||||
return {
|
||||
enqueue(sessionId: string, assistantMessageId: string, user: string) {
|
||||
enqueue(sessionId: string, chatId: string, assistantMessageId: string, user: string) {
|
||||
const callCtx: InferenceContext = {
|
||||
...ctx,
|
||||
publishUser: (frame) => publishUserFn(user, frame),
|
||||
};
|
||||
const controller = new AbortController();
|
||||
let resolveCompleted!: () => void;
|
||||
const completed = new Promise<void>((res) => { resolveCompleted = res; });
|
||||
const registration: InferenceRegistration = { controller, completed };
|
||||
registry.set(chatId, registration);
|
||||
void (async () => {
|
||||
try {
|
||||
await runInference(callCtx, sessionId, chatId, assistantMessageId, controller.signal);
|
||||
setImmediate(() => {
|
||||
void maybeAutoNameChat(callCtx, chatId, sessionId).catch((err: Error) => {
|
||||
callCtx.log.warn({ err, chatId }, 'auto-name failed');
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
callCtx.log.error({ err }, 'unhandled inference error');
|
||||
} finally {
|
||||
resolveCompleted();
|
||||
// Only clear our own registration; a force-send may have replaced it.
|
||||
if (registry.get(chatId) === registration) {
|
||||
registry.delete(chatId);
|
||||
}
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
||||
enqueueCompact(sessionId: string, chatId: string, compactMessageId: string, user: string) {
|
||||
const callCtx: InferenceContext = {
|
||||
...ctx,
|
||||
publishUser: (frame) => publishUserFn(user, frame),
|
||||
};
|
||||
void (async () => {
|
||||
try {
|
||||
await runInference(callCtx, sessionId, assistantMessageId);
|
||||
setImmediate(() => {
|
||||
void maybeAutoNameSession(callCtx, sessionId).catch((err) => {
|
||||
callCtx.log.warn({ err, sessionId }, 'auto-name failed');
|
||||
});
|
||||
});
|
||||
await runCompact(callCtx, sessionId, chatId, compactMessageId);
|
||||
} catch (err) {
|
||||
callCtx.log.error({ err }, 'unhandled inference error');
|
||||
callCtx.log.error({ err }, 'unhandled compact error');
|
||||
}
|
||||
})();
|
||||
},
|
||||
|
||||
/**
 * Abort the run registered for `chatId` and wait until it has finished.
 *
 * @param _sessionId - unused.
 * @param chatId - registry key of the run to cancel.
 * @returns `false` when nothing is registered for `chatId`; otherwise
 *   `true`, once the run's completion promise has settled.
 */
async cancel(_sessionId: string, chatId: string): Promise<boolean> {
  const active = registry.get(chatId);
  if (active) {
    const { controller, completed } = active;
    controller.abort();
    // Any rejection is ignored: the goal is only to wait for the run's
    // catch/finally to persist state.
    await completed.catch(() => {});
    return true;
  }
  return false;
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * The `name` of every entry in `ALL_TOOLS`, in order.
 * Exported so the `ALL_TOOLS` import stays referenced (kept for type checks if needed).
 */
export const _toolNames = ALL_TOOLS.map(({ name }) => name);
|
||||
|
||||
@@ -11,18 +11,39 @@ export interface AvailableProject {
|
||||
name: string;
|
||||
}
|
||||
|
||||
/** Allowed values of `Session.status`. */
export type SessionStatus = 'open' | 'archived';

/** Shape of a session record. */
export interface Session {
  id: string;
  project_id: string;
  name: string;
  model: string;
  system_prompt: string;
  /** Either `'open'` or `'archived'`. */
  status: SessionStatus;
  /** Timestamp carried as a string; the exact format is not defined here. */
  created_at: string;
  /** Timestamp carried as a string; the exact format is not defined here. */
  updated_at: string;
}
|
||||
|
||||
// MessageRole and MessageStatus are declared once, further down, derived from
// the MESSAGE_ROLES / MESSAGE_STATUSES const arrays.

/** Allowed values of `Chat.status`. */
export type ChatStatus = 'open' | 'closed';

/** Shape of a chat record; each chat references one session via `session_id`. */
export interface Chat {
  id: string;
  session_id: string;
  /** Nullable: a chat may have no name. */
  name: string | null;
  /** Either `'open'` or `'closed'`. */
  status: ChatStatus;
  /** Timestamp carried as a string; the exact format is not defined here. */
  created_at: string;
  /** Timestamp carried as a string; the exact format is not defined here. */
  updated_at: string;
}
|
||||
|
||||
// KEEP IN SYNC: apps/server/src/schema.sql messages_role_chk / messages_status_chk
/** Every allowed message role; `MessageRole` is derived from this tuple. */
export const MESSAGE_ROLES = [
  'user',
  'assistant',
  'system',
  'tool',
] as const;

/** Union of the literals in `MESSAGE_ROLES`. */
export type MessageRole = (typeof MESSAGE_ROLES)[number];
|
||||
|
||||
/**
 * Every allowed message status; `MessageStatus` is derived from this tuple.
 * Must match the messages_status_chk constraint in apps/server/src/schema.sql.
 */
export const MESSAGE_STATUSES = [
  'streaming',
  'complete',
  'failed',
  'cancelled',
] as const;

/** Union of the literals in `MESSAGE_STATUSES`. */
export type MessageStatus = (typeof MESSAGE_STATUSES)[number];
|
||||
|
||||
/** Every allowed message kind; `MessageKind` is derived from this tuple. */
export const MESSAGE_KINDS = [
  'message',
  'compact',
] as const;

/** Union of the literals in `MESSAGE_KINDS`. */
export type MessageKind = (typeof MESSAGE_KINDS)[number];
|
||||
|
||||
export interface ToolCall {
|
||||
id: string;
|
||||
@@ -40,8 +61,10 @@ export interface ToolResult {
|
||||
export interface Message {
|
||||
id: string;
|
||||
session_id: string;
|
||||
chat_id: string;
|
||||
role: MessageRole;
|
||||
content: string;
|
||||
kind: MessageKind;
|
||||
tool_calls: ToolCall[] | null;
|
||||
tool_results: ToolResult | null;
|
||||
status: MessageStatus;
|
||||
@@ -139,9 +162,35 @@ export interface SessionUpdatedFrame {
|
||||
name: string;
|
||||
updated_at: string;
|
||||
}
|
||||
/** Stream frame tagged `'session_archived'`; identifies the session and its project. */
export interface SessionArchivedFrame {
  /** Literal tag identifying this frame type. */
  type: 'session_archived';
  session_id: string;
  project_id: string;
}
|
||||
/** Stream frame tagged `'chat_created'`; carries the full `Chat` record. */
export interface ChatCreatedFrame {
  /** Literal tag identifying this frame type. */
  type: 'chat_created';
  chat: Chat;
  session_id: string;
}
|
||||
/** Stream frame tagged `'chat_updated'`; carries a chat's name and `updated_at`. */
export interface ChatUpdatedFrame {
  /** Literal tag identifying this frame type. */
  type: 'chat_updated';
  chat_id: string;
  session_id: string;
  /** Nullable, like `Chat.name`. */
  name: string | null;
  updated_at: string;
}
|
||||
/** Stream frame tagged `'chat_closed'`; identifies the chat and its session. */
export interface ChatClosedFrame {
  /** Literal tag identifying this frame type. */
  type: 'chat_closed';
  chat_id: string;
  session_id: string;
}
|
||||
/**
 * Union of all frame shapes listed below. The frame interfaces declared
 * alongside this alias each carry a distinct literal `type` tag.
 */
export type UserStreamFrame =
  | ProjectCreatedFrame
  | ProjectDeletedFrame
  | SessionCreatedFrame
  | SessionDeletedFrame
  | SessionUpdatedFrame
  | SessionArchivedFrame
  | ChatCreatedFrame
  | ChatUpdatedFrame
  | ChatClosedFrame;
|
||||
|
||||
Reference in New Issue
Block a user