feat: Paseo-like orchestrator Phase 1-2 — trace system, session persistence, timeline, run_command, auto-fix loop
Phase 1: Trace System + Observability - tool_traces DB table + insert/update service - tool_trace_start/tool_trace_finish WS frames (contracts + FE types) - Instrumented tool-phase.ts with timing around every tool call - GET /api/chats/:id/traces paginated endpoint - Trace viewer frontend (collapsible panel with timing bars + token breakdown) Phase 2: Session Persistence + Resume - agent_snapshots table (UPSERT per chat, persisted on turn boundaries) - save/load/delete service functions - Agent snapshot sent on WS reconnect - Session timeline view (vertical timeline with scroll-to + restore) Tooling: - run_command tool (execFile, 30s timeout, 32KB cap, path-guarded) - Auto-fix loop: after write tools, runs pnpm build, injects errors into next turn
This commit is contained in:
@@ -16,6 +16,133 @@ interface State {
|
||||
error: string | null;
|
||||
}
|
||||
|
||||
type Channel = 'text' | 'tool_call' | 'tool_result' | 'status' | 'error';
|
||||
|
||||
// Per-channel out-of-order frame buffer with contiguous-seq flush logic.
|
||||
// Stores incoming channel_delta frames and releases them only when seq
|
||||
// becomes contiguous with the expected next value.
|
||||
class ChannelBuffer {
|
||||
private expectedSeq = 0;
|
||||
private buffer = new Map<number, ChannelDeltaWsFrame>();
|
||||
|
||||
push(frame: ChannelDeltaWsFrame): ChannelDeltaWsFrame[] {
|
||||
if (frame.seq < this.expectedSeq) {
|
||||
return [];
|
||||
}
|
||||
if (frame.seq === this.expectedSeq) {
|
||||
this.expectedSeq++;
|
||||
const flushed = [frame];
|
||||
while (this.buffer.has(this.expectedSeq)) {
|
||||
const next = this.buffer.get(this.expectedSeq)!;
|
||||
this.buffer.delete(this.expectedSeq);
|
||||
this.expectedSeq++;
|
||||
flushed.push(next);
|
||||
}
|
||||
return flushed;
|
||||
}
|
||||
this.buffer.set(frame.seq, frame);
|
||||
return [];
|
||||
}
|
||||
|
||||
get expectedNextSeq(): number {
|
||||
return this.expectedSeq;
|
||||
}
|
||||
|
||||
get bufferedCount(): number {
|
||||
return this.buffer.size;
|
||||
}
|
||||
|
||||
reset(seq = 0) {
|
||||
this.expectedSeq = seq;
|
||||
this.buffer.clear();
|
||||
}
|
||||
}
|
||||
|
||||
type ChannelDeltaWsFrame = WsFrame & { type: 'channel_delta' };
|
||||
|
||||
// Converts a flushed channel_delta into the equivalent legacy frame so the
|
||||
// existing applyFrame reducer handles the per-message mutation. Status
|
||||
// deltas are handled separately (they may need to create the message first
|
||||
// and apply throughput metadata independently of terminal status).
|
||||
function channelDeltaToLegacyFrame(delta: ChannelDeltaWsFrame): WsFrame | null {
|
||||
switch (delta.channel) {
|
||||
case 'text':
|
||||
return { type: 'delta', message_id: delta.message_id!, content: delta.content! };
|
||||
case 'tool_call':
|
||||
return { type: 'tool_call', message_id: delta.message_id!, tool_call: delta.tool_call! };
|
||||
case 'tool_result':
|
||||
return {
|
||||
type: 'tool_result',
|
||||
tool_message_id: delta.tool_message_id!,
|
||||
chat_id: delta.chat_id,
|
||||
tool_call_id: delta.tool_call_id!,
|
||||
output: delta.output,
|
||||
truncated: delta.truncated!,
|
||||
...(delta.error ? { error: delta.error } : {}),
|
||||
};
|
||||
case 'error':
|
||||
return {
|
||||
type: 'error',
|
||||
message_id: delta.message_id,
|
||||
chat_id: delta.chat_id,
|
||||
error: delta.error!,
|
||||
...(delta.reason ? { reason: delta.reason as never } : {}),
|
||||
};
|
||||
case 'status':
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Apply a flushed status channel_delta to state. Status deltas carry both
|
||||
// intermediate throughput metadata (tokens_used, ctx_used, model, etc.)
|
||||
// and optional terminal transitions (complete / cancelled / failed).
|
||||
function applyStatusDelta(state: State, delta: ChannelDeltaWsFrame): State {
|
||||
const { message_id, chat_id, status, channel: _c, seq: _s, type: _t, ...meta } = delta;
|
||||
if (!message_id) return state;
|
||||
let next = state;
|
||||
|
||||
const exists = next.messages.some((m) => m.id === message_id);
|
||||
if (!exists && status === 'running') {
|
||||
next = applyFrame(next, {
|
||||
type: 'message_started',
|
||||
message_id,
|
||||
chat_id,
|
||||
role: 'assistant',
|
||||
});
|
||||
}
|
||||
|
||||
const metaFields: Record<string, unknown> = {};
|
||||
if (meta.tokens_used !== undefined) metaFields.tokens_used = meta.tokens_used;
|
||||
if (meta.ctx_used !== undefined) metaFields.ctx_used = meta.ctx_used;
|
||||
if (meta.ctx_max !== undefined) metaFields.ctx_max = meta.ctx_max;
|
||||
if (meta.cache_tokens !== undefined) metaFields.cache_tokens = meta.cache_tokens;
|
||||
if (meta.reasoning_tokens !== undefined) metaFields.reasoning_tokens = meta.reasoning_tokens;
|
||||
if (meta.started_at !== undefined) metaFields.started_at = meta.started_at;
|
||||
if (meta.finished_at !== undefined) metaFields.finished_at = meta.finished_at;
|
||||
if (meta.model !== undefined) metaFields.model = meta.model;
|
||||
if (meta.metadata !== undefined) metaFields.metadata = meta.metadata;
|
||||
|
||||
if (Object.keys(metaFields).length > 0) {
|
||||
next = {
|
||||
...next,
|
||||
messages: next.messages.map((m) =>
|
||||
m.id === message_id ? { ...m, ...metaFields } : m,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
if (status === 'complete' || status === 'cancelled' || status === 'failed') {
|
||||
next = applyFrame(next, {
|
||||
type: 'message_complete',
|
||||
message_id,
|
||||
chat_id,
|
||||
status,
|
||||
});
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
function applyFrame(state: State, frame: WsFrame): State {
|
||||
switch (frame.type) {
|
||||
case 'snapshot': {
|
||||
@@ -33,8 +160,6 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
kind: 'message',
|
||||
tool_calls: null,
|
||||
tool_results: null,
|
||||
// v1.8.2: cap-hit sentinels arrive role='system' and are static, so
|
||||
// skipping the streaming dot for them keeps the UI accurate.
|
||||
status: frame.role === 'system' ? 'complete' : 'streaming',
|
||||
last_seq: 0,
|
||||
tokens_used: null,
|
||||
@@ -65,7 +190,7 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
const next = state.messages.map((m) =>
|
||||
m.id === frame.message_id
|
||||
? { ...m, tool_calls: [...(m.tool_calls ?? []), frame.tool_call] }
|
||||
: m
|
||||
: m,
|
||||
);
|
||||
return { ...state, messages: next };
|
||||
}
|
||||
@@ -85,7 +210,7 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
},
|
||||
status: 'complete' as const,
|
||||
}
|
||||
: m
|
||||
: m,
|
||||
);
|
||||
return { ...state, messages: next };
|
||||
}
|
||||
@@ -132,19 +257,13 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
...(frame.started_at !== undefined ? { started_at: frame.started_at } : {}),
|
||||
...(frame.finished_at !== undefined ? { finished_at: frame.finished_at } : {}),
|
||||
...(frame.model !== undefined ? { model: frame.model } : {}),
|
||||
// v1.8.2: cap-hit sentinels (and future stamped metadata) ride
|
||||
// in on this terminal frame so the reducer can attach it
|
||||
// without waiting for a refetch.
|
||||
...(frame.metadata !== undefined ? { metadata: frame.metadata } : {}),
|
||||
}
|
||||
: m
|
||||
: m,
|
||||
);
|
||||
return { ...state, messages: next };
|
||||
}
|
||||
case 'usage': {
|
||||
// v1.12.2: live throughput. Side-effects into the module-level
|
||||
// singleton consumed by ChatThroughput; no message-state mutation.
|
||||
// chat_id is the optional ws-frame field; usage frames always include it.
|
||||
if (frame.chat_id) {
|
||||
recordUsage(frame.chat_id, {
|
||||
completion_tokens: frame.completion_tokens,
|
||||
@@ -172,10 +291,6 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
return state;
|
||||
}
|
||||
case 'error': {
|
||||
// v1.8.2: when the frame carries a structured reason, stamp it onto the
|
||||
// failed message's metadata so the bubble can render specifics inline
|
||||
// (the WS error frame is one-shot; refresh-safe rendering needs the
|
||||
// value persisted on the message).
|
||||
const errorMeta = frame.reason
|
||||
? { kind: 'error' as const, error_reason: frame.reason, error_text: frame.error }
|
||||
: null;
|
||||
@@ -187,47 +302,53 @@ function applyFrame(state: State, frame: WsFrame): State {
|
||||
status: 'failed' as const,
|
||||
...(errorMeta ? { metadata: errorMeta } : {}),
|
||||
}
|
||||
: m
|
||||
: m,
|
||||
)
|
||||
: state.messages;
|
||||
return { ...state, messages: next, error: frame.error };
|
||||
}
|
||||
case 'compacted': {
|
||||
// v1.11: side effects (refetch + toast) live in ws.onmessage; the
|
||||
// reducer just no-ops so TS exhaustiveness is satisfied without
|
||||
// duplicating async work inside a synchronous reducer.
|
||||
return state;
|
||||
}
|
||||
case 'agent_snapshot': {
|
||||
return state;
|
||||
}
|
||||
case 'agent_status_updated': {
|
||||
// agent-status-normalize (#10): coder-only frame consumed by CoderPane's
|
||||
// own WS handler, not BooChat's native message reducer. No-op here to keep
|
||||
// TS exhaustiveness satisfied (native sessions never emit it).
|
||||
return state;
|
||||
}
|
||||
case 'flow_run_started':
|
||||
case 'flow_run_step_updated': {
|
||||
// Orchestrator frames consumed by OrchestratorPane's own subscription.
|
||||
// No-op here to keep TS exhaustiveness satisfied.
|
||||
return state;
|
||||
}
|
||||
case 'battle_started':
|
||||
case 'contestant_updated':
|
||||
case 'battle_updated': {
|
||||
// Arena frames consumed by ArenaPane's own subscription.
|
||||
// No-op here to keep TS exhaustiveness satisfied.
|
||||
return state;
|
||||
}
|
||||
case 'channel_delta': {
|
||||
return state;
|
||||
}
|
||||
default: {
|
||||
return state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Matches useUserEvents — exponential backoff with the same ceiling so the
|
||||
// two channels reconnect on the same cadence after a network handoff.
|
||||
const RECONNECT_INITIAL_MS = 1000;
|
||||
const RECONNECT_MAX_MS = 30_000;
|
||||
const CHANNEL_STALL_MS = 5000;
|
||||
|
||||
export function useSessionStream(sessionId: string | undefined) {
|
||||
const [state, setState] = useState<State>({ messages: [], connected: false, error: null });
|
||||
const wsRef = useRef<WebSocket | null>(null);
|
||||
const channelBuffersRef = useRef<Map<Channel, ChannelBuffer>>(new Map());
|
||||
const lastFrameTimeRef = useRef<Partial<Record<Channel, number>>>({});
|
||||
|
||||
// Reset channel buffers when session changes
|
||||
useEffect(() => {
|
||||
channelBuffersRef.current = new Map();
|
||||
lastFrameTimeRef.current = {};
|
||||
}, [sessionId]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!sessionId) return;
|
||||
@@ -238,6 +359,73 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let reconnectDelay = RECONNECT_INITIAL_MS;
|
||||
|
||||
const getLastSeqPerChannel = () => {
|
||||
const seqs: Partial<Record<Channel, number>> = {};
|
||||
for (const [ch, buf] of channelBuffersRef.current) {
|
||||
seqs[ch] = buf.expectedNextSeq;
|
||||
}
|
||||
return seqs;
|
||||
};
|
||||
|
||||
const flushDeltaToState = (delta: ChannelDeltaWsFrame) => {
|
||||
console.error('FDS', delta.channel, 'flushed');
|
||||
if (delta.channel === 'status') {
|
||||
setState((s) => applyStatusDelta(s, delta));
|
||||
} else {
|
||||
const legacy = channelDeltaToLegacyFrame(delta);
|
||||
if (legacy) {
|
||||
setState((s) => applyFrame(s, legacy));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const handleChannelDelta = (frame: ChannelDeltaWsFrame) => {
|
||||
console.error('HCD', frame.channel, frame.seq, 'bufs', channelBuffersRef.current.size);
|
||||
const buffers = channelBuffersRef.current;
|
||||
let buffer = buffers.get(frame.channel);
|
||||
if (!buffer) {
|
||||
buffer = new ChannelBuffer();
|
||||
buffers.set(frame.channel, buffer);
|
||||
}
|
||||
|
||||
const flushed = buffer.push(frame);
|
||||
if (flushed.length === 0) return;
|
||||
|
||||
for (const delta of flushed) {
|
||||
flushDeltaToState(delta);
|
||||
}
|
||||
|
||||
let emittedRefresh = false;
|
||||
for (const delta of flushed) {
|
||||
if (delta.channel === 'status' && (delta.status === 'complete' || delta.status === 'cancelled' || delta.status === 'failed')) {
|
||||
emittedRefresh = true;
|
||||
}
|
||||
}
|
||||
if (emittedRefresh) {
|
||||
sessionEvents.emit({ type: 'git_diff_refresh' });
|
||||
}
|
||||
|
||||
lastFrameTimeRef.current[frame.channel] = Date.now();
|
||||
};
|
||||
|
||||
// Periodic channel stall check: if any channel has buffered frames
|
||||
// but no progress for 5s, force a snapshot refetch.
|
||||
let stallTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const startStallTimer = () => {
|
||||
stallTimer = setInterval(() => {
|
||||
const now = Date.now();
|
||||
for (const [channel, buffer] of channelBuffersRef.current) {
|
||||
if (buffer.bufferedCount === 0) continue;
|
||||
const lastTime = lastFrameTimeRef.current[channel as Channel] ?? 0;
|
||||
if (now - lastTime >= CHANNEL_STALL_MS) {
|
||||
buffer.reset();
|
||||
sessionEvents.emit({ type: 'refetch_messages' });
|
||||
}
|
||||
}
|
||||
}, 1000);
|
||||
};
|
||||
|
||||
const connect = () => {
|
||||
if (unmounted) return;
|
||||
const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
|
||||
@@ -248,13 +436,16 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
ws.onopen = () => {
|
||||
reconnectDelay = RECONNECT_INITIAL_MS;
|
||||
setState((s) => ({ ...s, connected: true, error: null }));
|
||||
|
||||
// Mid-stream reconnection protocol: send last known seq per channel
|
||||
// so the server can replay deltas or fall back to a full snapshot.
|
||||
const lastSeq = getLastSeqPerChannel();
|
||||
ws.send(JSON.stringify({ type: 'reconnect', lastSeqPerChannel: lastSeq }));
|
||||
|
||||
startStallTimer();
|
||||
};
|
||||
|
||||
ws.onmessage = (ev) => {
|
||||
// v1.13.11-a: Zod-validate every inbound frame. Fail-closed — invalid
|
||||
// frames are logged and dropped. WsFrameSchema is the runtime guard;
|
||||
// the hand-maintained WsFrame type stays as the narrowed dev-time
|
||||
// shape (Zod uses OpaqueObject for nested types like Message[]). One
|
||||
// cast bridges the two.
|
||||
let raw: unknown;
|
||||
try {
|
||||
raw = JSON.parse(typeof ev.data === 'string' ? ev.data : '');
|
||||
@@ -272,13 +463,14 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
}
|
||||
try {
|
||||
const frame = validated.data as unknown as WsFrame;
|
||||
// v1.11: on a compaction completion, re-fetch the message list so
|
||||
// the new summary row + the cohort of compacted_at-stamped older
|
||||
// rows render correctly. We dispatch the fresh list as a synthetic
|
||||
// 'snapshot' frame so the reducer's existing path handles state
|
||||
// replacement (no need for a parallel "refetched" path).
|
||||
// The toast is purely UX feedback; missing it would still leave
|
||||
// the chat in a valid state.
|
||||
|
||||
if (frame.type === 'channel_delta') {
|
||||
console.error('RAW_PARSE', JSON.stringify(validated.data).slice(0, 200));
|
||||
console.error('CD', frame.channel, frame.seq, JSON.stringify(frame).slice(0, 80));
|
||||
handleChannelDelta(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.type === 'compacted') {
|
||||
toast.success('Context compacted to free space');
|
||||
void api.messages
|
||||
@@ -291,8 +483,9 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
setState((s) => applyFrame(s, frame));
|
||||
// Trigger git diff refresh after each completed assistant turn.
|
||||
|
||||
if (frame.type === 'message_complete') {
|
||||
sessionEvents.emit({ type: 'git_diff_refresh' });
|
||||
}
|
||||
@@ -300,15 +493,18 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
console.warn('bad ws frame', err);
|
||||
}
|
||||
};
|
||||
// v1.8.1: WS errors no longer surface as user-facing toasts here. The
|
||||
// user-channel hook (useUserEvents) owns the debounced "reconnecting…"
|
||||
// UI; this channel just reconnects silently on the same backoff.
|
||||
|
||||
ws.onerror = () => {
|
||||
try { ws.close(); } catch {}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
if (unmounted) return;
|
||||
setState((s) => ({ ...s, connected: false }));
|
||||
if (stallTimer) {
|
||||
clearInterval(stallTimer);
|
||||
stallTimer = null;
|
||||
}
|
||||
const delay = reconnectDelay;
|
||||
reconnectDelay = Math.min(reconnectDelay * 2, RECONNECT_MAX_MS);
|
||||
reconnectTimer = setTimeout(connect, delay);
|
||||
@@ -320,6 +516,7 @@ export function useSessionStream(sessionId: string | undefined) {
|
||||
return () => {
|
||||
unmounted = true;
|
||||
if (reconnectTimer) clearTimeout(reconnectTimer);
|
||||
if (stallTimer) clearInterval(stallTimer);
|
||||
const ws = wsRef.current;
|
||||
wsRef.current = null;
|
||||
if (ws) try { ws.close(); } catch {}
|
||||
|
||||
Reference in New Issue
Block a user