diff --git a/apps/server/src/routes/chats.ts b/apps/server/src/routes/chats.ts index c7a072d..df601a5 100644 --- a/apps/server/src/routes/chats.ts +++ b/apps/server/src/routes/chats.ts @@ -18,6 +18,12 @@ const ForkBody = z.object({ name: z.string().min(1).max(200).optional(), }); +const DiscardStaleBody = z.object({ + message_id: z.string().uuid(), +}); + +const STALE_MIN_AGE_SECONDS = 60; + export function registerChatRoutes( app: FastifyInstance, sql: Sql, @@ -320,6 +326,73 @@ export function registerChatRoutes( } ); + // v1.12.3: explicit recovery from a stuck-streaming assistant row. The + // frontend gates this behind a 60s no-token-activity timer; the server + // re-checks the age and current status for safety. Non-streaming rows + // return 409 (frontend race; idempotent retry is fine). + app.post<{ Params: { id: string } }>( + '/api/chats/:id/discard_stale', + async (req, reply) => { + const parsed = DiscardStaleBody.safeParse(req.body ?? {}); + if (!parsed.success) { + reply.code(400); + return { error: 'invalid body', details: parsed.error.flatten() }; + } + const rows = await sql<{ + id: string; + session_id: string; + chat_id: string; + status: string; + age_seconds: number; + }[]>` + SELECT id, session_id, chat_id, status, + EXTRACT(EPOCH FROM (clock_timestamp() - created_at))::int AS age_seconds + FROM messages + WHERE id = ${parsed.data.message_id} AND chat_id = ${req.params.id} + `; + if (rows.length === 0) { + reply.code(404); + return { error: 'message not found in chat' }; + } + const msg = rows[0]!; + if (msg.status !== 'streaming') { + reply.code(409); + return { error: 'message is no longer streaming', current_status: msg.status }; + } + if (msg.age_seconds < STALE_MIN_AGE_SECONDS) { + reply.code(409); + return { error: 'message is not stale yet', age_seconds: msg.age_seconds }; + } + const updated = await sql` + UPDATE messages + SET status = 'failed', + content = COALESCE(content, ''), + finished_at = clock_timestamp() + WHERE id = ${msg.id} AND status = 'streaming' + RETURNING id, session_id, chat_id, role, content, kind, tool_calls, tool_results, + status, last_seq, tokens_used, ctx_used, ctx_max, started_at, finished_at, + created_at, metadata, summary, tail_start_id, compacted_at + `; + if (updated.length === 0) { + // Race: the row flipped out of 'streaming' between our SELECT and UPDATE. + reply.code(409); + return { error: 'message status changed mid-request' }; + } + broker.publishUser('default', { + type: 'chat_status', + chat_id: msg.chat_id, + status: 'idle', + at: new Date().toISOString(), + }); + broker.publish(msg.session_id, { + type: 'message_complete', + message_id: msg.id, + chat_id: msg.chat_id, + }); + return updated[0]; + } + ); + app.get<{ Params: { id: string } }>( '/api/chats/:id/messages', async (req, reply) => { diff --git a/apps/web/src/api/client.ts b/apps/web/src/api/client.ts index 504be11..f3f70ab 100644 --- a/apps/web/src/api/client.ts +++ b/apps/web/src/api/client.ts @@ -180,6 +180,11 @@ export const api = { request<{ ok: true }>(`/api/chats/${chatId}/compact`, { method: 'POST' }), stop: (chatId: string) => request<{ stopped: boolean }>(`/api/chats/${chatId}/stop`, { method: 'POST' }), + discardStale: (chatId: string, messageId: string) => + request(`/api/chats/${chatId}/discard_stale`, { + method: 'POST', + body: JSON.stringify({ message_id: messageId }), + }), forceSend: (chatId: string, content: string) => request<{ user_message_id: string; assistant_message_id: string }>( `/api/chats/${chatId}/force_send`, diff --git a/apps/web/src/components/StaleStreamBanner.tsx b/apps/web/src/components/StaleStreamBanner.tsx new file mode 100644 index 0000000..39124c1 --- /dev/null +++ b/apps/web/src/components/StaleStreamBanner.tsx @@ -0,0 +1,34 @@ +interface Props { + onRetry: () => void; + onDiscard: () => void; +} + +// v1.12.3: shown when an assistant message has been 'streaming' for 60+ +// seconds without new tokens. Lives above ChatInput in ChatPane. Retry +// discards the stuck row then resends the last user message; Discard just +// clears the row and drops the dot to idle. +export function StaleStreamBanner({ onRetry, onDiscard }: Props) { + return ( +
+ + Previous response didn't complete. + +
+ + +
+
+ ); +} diff --git a/apps/web/src/components/panes/ChatPane.tsx b/apps/web/src/components/panes/ChatPane.tsx index 547245b..1f9f6a7 100644 --- a/apps/web/src/components/panes/ChatPane.tsx +++ b/apps/web/src/components/panes/ChatPane.tsx @@ -5,6 +5,7 @@ import { api } from '@/api/client'; import { useSessionStream } from '@/hooks/useSessionStream'; import { MessageList } from '@/components/MessageList'; import { ChatInput } from '@/components/ChatInput'; +import { StaleStreamBanner } from '@/components/StaleStreamBanner'; import { DropdownMenu, DropdownMenuContent, @@ -44,6 +45,38 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, const chatMessages = stream.messages.filter((m) => m.chat_id === chatId); const streaming = chatMessages.some((m) => m.status === 'streaming'); + + // v1.12.3: stale-stream detection. Watches the (at most one) streaming + // assistant row. If its content length doesn't grow for STALE_THRESHOLD_MS, + // assume the upstream call is dead and surface the recovery banner. We use + // content length as the activity signal because every token delta extends + // it; last_seq isn't currently bumped per delta. + const STALE_THRESHOLD_MS = 60_000; + const streamingMsg = chatMessages.find((m) => m.status === 'streaming' && m.role === 'assistant'); + const streamingId = streamingMsg?.id ?? null; + const streamingLen = streamingMsg?.content.length ?? 0; + const lastActivityRef = useRef<{ id: string; len: number; at: number } | null>(null); + const [stale, setStale] = useState(false); + useEffect(() => { + if (!streamingId) { + lastActivityRef.current = null; + setStale(false); + return; + } + const prev = lastActivityRef.current; + if (!prev || prev.id !== streamingId || prev.len !== streamingLen) { + lastActivityRef.current = { id: streamingId, len: streamingLen, at: Date.now() }; + setStale(false); + } + const interval = setInterval(() => { + const a = lastActivityRef.current; + if (!a) return; + if (Date.now() - a.at >= STALE_THRESHOLD_MS) { + setStale(true); + } + }, 5_000); + return () => clearInterval(interval); + }, [streamingId, streamingLen]); // v1.11.5: per-chat model context limit comes from chat.model_context_limit // populated by GET /api/sessions/:id/chats. Threaded into ChatInput so // ContextBar can render a zero-state before the first assistant message. @@ -87,6 +120,45 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, } } + const handleDiscardStale = useCallback(async () => { + if (!streamingId) return; + try { + await api.chats.discardStale(chatId, streamingId); + setStale(false); + lastActivityRef.current = null; + } catch (err) { + // 409 (race) is benign — the row already terminated some other way. + const msg = err instanceof Error ? err.message : 'discard failed'; + if (!msg.includes('409')) toast.error(msg); + setStale(false); + } + }, [chatId, streamingId]); + + const handleRetryStale = useCallback(async () => { + if (!streamingId) return; + const lastUser = [...chatMessages].reverse().find((m) => m.role === 'user' && m.kind === 'message'); + if (!lastUser) { + toast.error('no prior user message to retry'); + return; + } + try { + await api.chats.discardStale(chatId, streamingId); + } catch (err) { + const msg = err instanceof Error ? err.message : 'discard failed'; + if (!msg.includes('409')) { + toast.error(msg); + return; + } + } + setStale(false); + lastActivityRef.current = null; + try { + await api.messages.send(chatId, lastUser.content); + } catch (err) { + toast.error(err instanceof Error ? err.message : 'retry send failed'); + } + }, [chatId, streamingId, chatMessages]); + const handleForceSend = useCallback(async (content: string) => { const trimmed = content.trim(); if (!trimmed) return; @@ -187,6 +259,13 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange, )} + {stale && streamingId && ( + void handleRetryStale()} + onDiscard={() => void handleDiscardStale()} + /> + )} +