Compare commits
1 Commit
v1.12.2-li
...
v1.12.3-st
| Author | SHA1 | Date | |
|---|---|---|---|
| eef4782383 |
@@ -18,6 +18,12 @@ const ForkBody = z.object({
|
|||||||
name: z.string().min(1).max(200).optional(),
|
name: z.string().min(1).max(200).optional(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/** Request body schema for the discard_stale route: `message_id` must be a UUID string. */
const DiscardStaleBody = z.object({ message_id: z.string().uuid() });

/**
 * Minimum age, in seconds, that the discard_stale route requires of a
 * streaming message before it agrees to mark it as failed.
 */
const STALE_MIN_AGE_SECONDS = 60;
|
||||||
|
|
||||||
export function registerChatRoutes(
|
export function registerChatRoutes(
|
||||||
app: FastifyInstance,
|
app: FastifyInstance,
|
||||||
sql: Sql,
|
sql: Sql,
|
||||||
@@ -320,6 +326,73 @@ export function registerChatRoutes(
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// v1.12.3: explicit recovery from an assistant row stuck in 'streaming'.
// The frontend only offers this after 60s without token activity; the server
// independently re-validates the row's status and age before touching it.
//
// Responses:
//   400 - body failed DiscardStaleBody validation
//   404 - no message with that id inside this chat
//   409 - row is not streaming, is younger than STALE_MIN_AGE_SECONDS, or
//         changed status between the SELECT and the UPDATE (frontend race;
//         an idempotent retry is fine)
//   otherwise - the updated message row
app.post<{ Params: { id: string } }>(
  '/api/chats/:id/discard_stale',
  async (req, reply) => {
    const body = DiscardStaleBody.safeParse(req.body ?? {});
    if (!body.success) {
      reply.code(400);
      return { error: 'invalid body', details: body.error.flatten() };
    }

    // Look the row up scoped to the chat in the URL; age is measured from created_at.
    const [target] = await sql<{
      id: string;
      session_id: string;
      chat_id: string;
      status: string;
      age_seconds: number;
    }[]>`
      SELECT id, session_id, chat_id, status,
        EXTRACT(EPOCH FROM (clock_timestamp() - created_at))::int AS age_seconds
      FROM messages
      WHERE id = ${body.data.message_id} AND chat_id = ${req.params.id}
    `;
    if (target === undefined) {
      reply.code(404);
      return { error: 'message not found in chat' };
    }

    if (target.status !== 'streaming') {
      reply.code(409);
      return { error: 'message is no longer streaming', current_status: target.status };
    }
    if (target.age_seconds < STALE_MIN_AGE_SECONDS) {
      reply.code(409);
      return { error: 'message is not stale yet', age_seconds: target.age_seconds };
    }

    // The status guard in WHERE makes the flip conditional: if anything else
    // finished the row after the SELECT above, no row comes back.
    const [failed] = await sql<Message[]>`
      UPDATE messages
      SET status = 'failed',
        content = COALESCE(content, ''),
        finished_at = clock_timestamp()
      WHERE id = ${target.id} AND status = 'streaming'
      RETURNING id, session_id, chat_id, role, content, kind, tool_calls, tool_results,
        status, last_seq, tokens_used, ctx_used, ctx_max, started_at, finished_at,
        created_at, metadata, summary, tail_start_id, compacted_at
    `;
    if (failed === undefined) {
      // Race: the row flipped out of 'streaming' between our SELECT and UPDATE.
      reply.code(409);
      return { error: 'message status changed mid-request' };
    }

    // Announce the chat as idle, then signal completion of the message on its session.
    broker.publishUser('default', {
      type: 'chat_status',
      chat_id: target.chat_id,
      status: 'idle',
      at: new Date().toISOString(),
    });
    broker.publish(target.session_id, {
      type: 'message_complete',
      message_id: target.id,
      chat_id: target.chat_id,
    });

    return failed;
  }
);
|
||||||
|
|
||||||
app.get<{ Params: { id: string } }>(
|
app.get<{ Params: { id: string } }>(
|
||||||
'/api/chats/:id/messages',
|
'/api/chats/:id/messages',
|
||||||
async (req, reply) => {
|
async (req, reply) => {
|
||||||
|
|||||||
@@ -180,6 +180,11 @@ export const api = {
|
|||||||
request<{ ok: true }>(`/api/chats/${chatId}/compact`, { method: 'POST' }),
|
request<{ ok: true }>(`/api/chats/${chatId}/compact`, { method: 'POST' }),
|
||||||
stop: (chatId: string) =>
|
stop: (chatId: string) =>
|
||||||
request<{ stopped: boolean }>(`/api/chats/${chatId}/stop`, { method: 'POST' }),
|
request<{ stopped: boolean }>(`/api/chats/${chatId}/stop`, { method: 'POST' }),
|
||||||
|
// v1.12.3: POSTs `{ message_id }` to the chat's discard_stale endpoint; the
// response is typed as the Message row.
discardStale: (chatId: string, messageId: string) => {
  const payload = JSON.stringify({ message_id: messageId });
  return request<Message>(`/api/chats/${chatId}/discard_stale`, {
    method: 'POST',
    body: payload,
  });
},
|
||||||
forceSend: (chatId: string, content: string) =>
|
forceSend: (chatId: string, content: string) =>
|
||||||
request<{ user_message_id: string; assistant_message_id: string }>(
|
request<{ user_message_id: string; assistant_message_id: string }>(
|
||||||
`/api/chats/${chatId}/force_send`,
|
`/api/chats/${chatId}/force_send`,
|
||||||
|
|||||||
34
apps/web/src/components/StaleStreamBanner.tsx
Normal file
34
apps/web/src/components/StaleStreamBanner.tsx
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
/** Callbacks for the two recovery actions offered by the banner. */
interface Props {
  /** Called when the "Retry" button is clicked. */
  onRetry: () => void;
  /** Called when the "Discard" button is clicked. */
  onDiscard: () => void;
}

// Shared utility classes for both action buttons.
const ACTION_BUTTON_CLASS =
  'text-xs px-2 py-1 rounded border border-border hover:bg-accent max-md:min-h-[44px] max-md:px-3';

// v1.12.3: banner for an assistant message that has been 'streaming' for 60+
// seconds without new tokens. Purely presentational: it renders a notice plus
// "Retry" and "Discard" buttons, and what each button does is decided entirely
// by the callbacks the parent passes in.
export function StaleStreamBanner({ onRetry, onDiscard }: Props) {
  return (
    <div className="border border-amber-500/30 bg-amber-500/5 rounded-md p-3 mb-2 mx-4 flex items-center justify-between gap-2">
      <span className="text-sm text-muted-foreground">
        Previous response didn't complete.
      </span>
      <div className="flex gap-2">
        <button type="button" onClick={onRetry} className={ACTION_BUTTON_CLASS}>
          Retry
        </button>
        <button type="button" onClick={onDiscard} className={ACTION_BUTTON_CLASS}>
          Discard
        </button>
      </div>
    </div>
  );
}
|
||||||
@@ -5,6 +5,7 @@ import { api } from '@/api/client';
|
|||||||
import { useSessionStream } from '@/hooks/useSessionStream';
|
import { useSessionStream } from '@/hooks/useSessionStream';
|
||||||
import { MessageList } from '@/components/MessageList';
|
import { MessageList } from '@/components/MessageList';
|
||||||
import { ChatInput } from '@/components/ChatInput';
|
import { ChatInput } from '@/components/ChatInput';
|
||||||
|
import { StaleStreamBanner } from '@/components/StaleStreamBanner';
|
||||||
import {
|
import {
|
||||||
DropdownMenu,
|
DropdownMenu,
|
||||||
DropdownMenuContent,
|
DropdownMenuContent,
|
||||||
@@ -44,6 +45,38 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange,
|
|||||||
|
|
||||||
const chatMessages = stream.messages.filter((m) => m.chat_id === chatId);
|
const chatMessages = stream.messages.filter((m) => m.chat_id === chatId);
|
||||||
const streaming = chatMessages.some((m) => m.status === 'streaming');
|
const streaming = chatMessages.some((m) => m.status === 'streaming');
|
||||||
|
|
||||||
|
// v1.12.3: stale-stream detection. Tracks the (at most one) streaming
// assistant row and records the time of its last observed activity, where
// "activity" means the row's id or content length changed. Content length is
// the signal because every token delta extends it; last_seq isn't currently
// bumped per delta. Once STALE_THRESHOLD_MS passes with no activity, `stale`
// flips to true so the recovery banner can be shown.
const STALE_THRESHOLD_MS = 60_000;
const streamingMsg = chatMessages.find((m) => m.status === 'streaming' && m.role === 'assistant');
const streamingId = streamingMsg?.id ?? null;
const streamingLen = streamingMsg?.content.length ?? 0;
const lastActivityRef = useRef<{ id: string; len: number; at: number } | null>(null);
const [stale, setStale] = useState(false);
useEffect(() => {
  // Nothing is streaming: forget any tracked activity and hide the banner.
  if (!streamingId) {
    lastActivityRef.current = null;
    setStale(false);
    return;
  }

  // Restart the clock whenever a different row starts streaming or the
  // tracked row's content length changes.
  const seen = lastActivityRef.current;
  const unchanged = seen !== null && seen.id === streamingId && seen.len === streamingLen;
  if (!unchanged) {
    lastActivityRef.current = { id: streamingId, len: streamingLen, at: Date.now() };
    setStale(false);
  }

  // Poll every 5s; the ref is read at tick time, so a reset done elsewhere
  // (e.g. after a successful discard) is respected.
  const timer = setInterval(() => {
    const activity = lastActivityRef.current;
    if (activity && Date.now() - activity.at >= STALE_THRESHOLD_MS) {
      setStale(true);
    }
  }, 5_000);
  return () => clearInterval(timer);
}, [streamingId, streamingLen]);
|
||||||
// v1.11.5: per-chat model context limit comes from chat.model_context_limit
|
// v1.11.5: per-chat model context limit comes from chat.model_context_limit
|
||||||
// populated by GET /api/sessions/:id/chats. Threaded into ChatInput so
|
// populated by GET /api/sessions/:id/chats. Threaded into ChatInput so
|
||||||
// ContextBar can render a zero-state before the first assistant message.
|
// ContextBar can render a zero-state before the first assistant message.
|
||||||
@@ -87,6 +120,45 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v1.12.3: "Discard" action for the stale-stream banner. Asks the server to
// discard the stuck streaming row and hides the banner. Does nothing when no
// assistant row is currently streaming.
const handleDiscardStale = useCallback(async () => {
  if (!streamingId) return;
  try {
    await api.chats.discardStale(chatId, streamingId);
  } catch (err) {
    // 409 (race) is benign: the row already terminated some other way, so
    // only other failures are surfaced as a toast.
    const reason = err instanceof Error ? err.message : 'discard failed';
    if (!reason.includes('409')) toast.error(reason);
    setStale(false);
    return;
  }
  // Success: hide the banner and drop the tracked activity record.
  setStale(false);
  lastActivityRef.current = null;
}, [chatId, streamingId]);
|
||||||
|
|
||||||
|
// v1.12.3: "Retry" action for the stale-stream banner. Discards the stuck
// streaming row, then re-sends the content of the most recent user message
// in this chat. Does nothing when no assistant row is currently streaming.
const handleRetryStale = useCallback(async () => {
  if (!streamingId) return;

  // Most recent user-authored message of kind 'message'.
  const lastUser = chatMessages
    .slice()
    .reverse()
    .find((m) => m.role === 'user' && m.kind === 'message');
  if (!lastUser) {
    toast.error('no prior user message to retry');
    return;
  }

  try {
    await api.chats.discardStale(chatId, streamingId);
  } catch (err) {
    // A 409 is treated as benign and the retry continues; any other failure
    // is reported and aborts the retry.
    const reason = err instanceof Error ? err.message : 'discard failed';
    const benignRace = reason.includes('409');
    if (!benignRace) {
      toast.error(reason);
      return;
    }
  }

  // Hide the banner and reset activity tracking before re-sending.
  setStale(false);
  lastActivityRef.current = null;

  try {
    await api.messages.send(chatId, lastUser.content);
  } catch (err) {
    toast.error(err instanceof Error ? err.message : 'retry send failed');
  }
}, [chatId, streamingId, chatMessages]);
|
||||||
|
|
||||||
const handleForceSend = useCallback(async (content: string) => {
|
const handleForceSend = useCallback(async (content: string) => {
|
||||||
const trimmed = content.trim();
|
const trimmed = content.trim();
|
||||||
if (!trimmed) return;
|
if (!trimmed) return;
|
||||||
@@ -187,6 +259,13 @@ export function ChatPane({ sessionId, chatId, projectId, agentId, onAgentChange,
|
|||||||
</div>
|
</div>
|
||||||
)}
|
)}
|
||||||
|
|
||||||
|
{stale && streamingId && (
|
||||||
|
<StaleStreamBanner
|
||||||
|
onRetry={() => void handleRetryStale()}
|
||||||
|
onDiscard={() => void handleDiscardStale()}
|
||||||
|
/>
|
||||||
|
)}
|
||||||
|
|
||||||
<ChatInput
|
<ChatInput
|
||||||
disabled={false}
|
disabled={false}
|
||||||
projectId={projectId}
|
projectId={projectId}
|
||||||
|
|||||||
Reference in New Issue
Block a user