// P5: Optional declarative state graph engine for the inference turn loop.
//
// Replaces the procedural `while (stepNumber < effectiveCap)` in turn.ts
// with a node-based execution model. Default OFF via
// session.state_graph_enabled — zero behavior change when disabled.
//
// Nodes wrap EXISTING infrastructure (no new I/O patterns):
//   PLAN       → top-of-loop gate, compaction, loadContext, buildMessagesPayload,
//                executeStreamPhase
//   CALL_TOOL  → executeToolPhase
//   OBSERVE    → process tool results, update loop locals
//   REFLECT    → decidePostToolAction, sentinel insertion, mistake tracker
//   SYNTHESIZE → terminal (graph loop exits)

import type { Agent, Project, Session, ToolCall } from '../../types/api.js';
import { resolveProjectRoot } from '../path_guard.js';
import { rewriteSearchQuery } from '../task-search-rewrite.js';
import * as compaction from '../compaction.js';
import { decideStep, decidePostToolAction } from './step-decision.js';
import {
  recordStep,
  MISTAKE_RECOVERY_NOTE,
  type MistakeState,
} from './mistake-tracker.js';
import {
  buildMessagesPayload,
  loadContext,
} from './payload.js';
import { toDcpMessages, transformMessages, fromDcpMessages } from './dcp/index.js';
import {
  finalizeCompletion,
  finalizeEmpty,
  handleAbortOrError,
} from './error-handler.js';
import {
  executeStreamPhase,
} from './stream-phase.js';
import { executeToolPhase, type ToolPhaseResult } from './tool-phase.js';
import type {
  InferenceContext,
  StreamPhaseState,
  StreamResult,
  TurnArgs,
} from './types.js';
import {
  runCapHitSummary,
  runDoomLoopSummary,
  insertMistakeRecoverySentinel,
} from './sentinel-summaries.js';
import { execFile } from 'node:child_process';
import { readFileSync, existsSync } from 'node:fs';
import { join } from 'node:path';

/** Wall-clock limit for the auto-fix build command. */
const BUILD_TIMEOUT_MS = 60_000;
/** Maximum number of build-output characters injected into a recovery note. */
const BUILD_OUTPUT_CAP = 8_000;
/**
 * stdout/stderr buffer limit for the build child process. Kept far above
 * BUILD_OUTPUT_CAP: when the limit is exceeded Node terminates the child and
 * reports an error, which would make a verbose but successful build look failed.
 */
const BUILD_MAX_BUFFER_BYTES = 16 * 1024 * 1024;

/**
 * Runs the project's build-like npm script and, when it FAILS, returns a
 * recovery note containing the (truncated) build output.
 *
 * Only active for models whose id starts with `deepseek-` and for projects
 * with a `package.json` that declares a `build`, `compile`, or `typecheck`
 * script (checked in that order). The package manager is chosen from the
 * lockfile present in `projectRoot` (pnpm, then yarn, else npm).
 *
 * @param ctx          Inference context; only `ctx.log` is used here.
 * @param projectRoot  Directory containing `package.json`; used as the cwd.
 * @param sessionId    Included in the log record only.
 * @param chatId       Included in the log record only.
 * @param model        Model id used for the `deepseek-` gate.
 * @param existingNote Note to prepend to the build-error section, if any.
 * @returns The combined note, or `undefined` when the check is skipped, the
 *          build succeeds, the package manager is missing, the failure
 *          produced no output, or anything throws.
 */
async function detectAndRunBuild(
  ctx: InferenceContext,
  projectRoot: string,
  sessionId: string,
  chatId: string,
  model: string,
  existingNote: string | undefined,
): Promise<string | undefined> {
  if (!model.startsWith('deepseek-')) return undefined;

  const pkgPath = join(projectRoot, 'package.json');
  if (!existsSync(pkgPath)) return undefined;

  let detected: string | undefined;
  try {
    const pkg = JSON.parse(readFileSync(pkgPath, 'utf8')) as { scripts?: Record<string, string> };
    detected = ['build', 'compile', 'typecheck'].find((name) => pkg.scripts?.[name]);
  } catch {
    // Unreadable or malformed package.json: skip the check (best effort).
    return undefined;
  }
  if (!detected) return undefined;
  const buildCmd = detected;

  const hasPnpm = existsSync(join(projectRoot, 'pnpm-lock.yaml'));
  const hasYarn = existsSync(join(projectRoot, 'yarn.lock'));
  const pm = hasPnpm ? 'pnpm' : hasYarn ? 'yarn' : 'npm';

  try {
    const out = await new Promise<string>((resolve) => {
      execFile(
        pm,
        ['run', buildCmd],
        { cwd: projectRoot, timeout: BUILD_TIMEOUT_MS, maxBuffer: BUILD_MAX_BUFFER_BYTES },
        (err, stdout, stderr) => {
          // No error means the build exited 0 — there is nothing to report.
          // ENOENT means the package manager binary is not installed; the
          // check simply cannot run, which is not a build failure either.
          if (!err || (err as NodeJS.ErrnoException).code === 'ENOENT') {
            resolve('');
            return;
          }
          const merged = (stdout + '\n' + stderr).trim();
          resolve(merged.slice(0, BUILD_OUTPUT_CAP));
        },
      );
    });
    if (!out) return undefined;

    ctx.log.info({ sessionId, chatId, buildCmd, outputLen: out.length }, 'auto-fix: build failed');

    const header = '--- Build error ---\n';
    if (!existingNote) return header + out;

    // Clamp at zero: a negative end index would make slice() count from the
    // END of the string instead of producing an empty excerpt.
    const room = Math.max(0, BUILD_OUTPUT_CAP - existingNote.length);
    return existingNote + '\n\n' + header + out.slice(0, room);
  } catch {
    return undefined;
  }
}

// -- Types ----------------------------------------------------------------

/** Identifier of a node in the turn-loop state graph. */
export type GraphNodeType = 'PLAN' | 'CALL_TOOL' | 'OBSERVE' | 'REFLECT' | 'SYNTHESIZE';

/** Mutable state shared by all nodes during one `runGraph` invocation. */
export interface GraphState {
  stepNumber: number;
  toolsUsed: number;
  recentToolCalls: ToolCall[];
  assistantMessageId: string;
  mistakeTracker: MistakeState;
  /** One-shot system note appended to the next PLAN payload, then cleared. */
  pendingRecoveryNote?: string;
  effectiveCap: number;
  budget: number;
  projectRoot: string;
  /** Session as loaded by the most recent PLAN (with model override applied). */
  iterSession?: Session;
  /** Project as loaded by the most recent PLAN. */
  iterProject?: Project;
  /** Result of the most recent stream phase. */
  streamResult?: StreamResult;
  /** `startedAt` reported by the most recent stream phase. */
  startedAt?: string | null;
  /** Result of the most recent tool phase. */
  toolPhaseResult?: ToolPhaseResult;
  /** Set by any node to end the graph loop immediately. */
  shouldStop: boolean;
}

/** A graph node: an action plus ordered outgoing edges (first match wins). */
interface GraphNode {
  type: GraphNodeType;
  edges: Array<{ to: GraphNodeType; condition: (state: GraphState) => boolean }>;
  execute: (
    ctx: InferenceContext,
    args: TurnArgs,
    state: GraphState,
    agent: Agent | null,
  ) => Promise<void>;
}

/** Loop locals handed back to the caller once the graph has finished. */
export interface GraphResult {
  stepNumber: number;
  assistantMessageId: string;
  toolsUsed: number;
  recentToolCalls: ToolCall[];
  mistakeTracker: MistakeState;
}

// -- Default graph --------------------------------------------------------

/**
 * Builds the default PLAN → CALL_TOOL → OBSERVE → REFLECT → PLAN cycle.
 * Edges are evaluated in array order; each node ends with an unconditional
 * fallback edge to SYNTHESIZE (except CALL_TOOL, which always goes to OBSERVE).
 */
export function createDefaultGraph(): GraphNode[] {
  return [
    {
      type: 'PLAN',
      edges: [
        { to: 'CALL_TOOL', condition: (s) => !!s.streamResult && s.streamResult.toolCalls.length > 0 },
        { to: 'SYNTHESIZE', condition: () => true },
      ],
      execute: planNode,
    },
    {
      type: 'CALL_TOOL',
      edges: [
        { to: 'OBSERVE', condition: () => true },
      ],
      execute: callToolNode,
    },
    {
      type: 'OBSERVE',
      edges: [
        { to: 'REFLECT', condition: (s) => s.toolPhaseResult?.action === 'continue' },
        { to: 'SYNTHESIZE', condition: () => true },
      ],
      execute: observeNode,
    },
    {
      type: 'REFLECT',
      edges: [
        { to: 'PLAN', condition: (s) => s.stepNumber < s.effectiveCap },
        { to: 'SYNTHESIZE', condition: () => true },
      ],
      execute: reflectNode,
    },
    {
      type: 'SYNTHESIZE',
      edges: [],
      execute: async () => {},
    },
  ];
}

// -- Graph runner ---------------------------------------------------------

/**
 * Executes the default graph starting at PLAN until SYNTHESIZE is reached,
 * a node sets `shouldStop`, or no outgoing edge matches.
 *
 * @param ctx   Inference context passed through to every node.
 * @param args  Turn arguments; seed the initial loop locals.
 * @param extra Step cap, tool budget, agent and initial project root.
 * @returns The final loop locals (step count, assistant message id, tool
 *          usage, recent tool calls, mistake tracker).
 */
export async function runGraph(
  ctx: InferenceContext,
  args: TurnArgs,
  extra: { effectiveCap: number; budget: number; agent: Agent | null; projectRoot: string },
): Promise<GraphResult> {
  const { effectiveCap, budget, agent } = extra;

  const state: GraphState = {
    stepNumber: 0,
    toolsUsed: args.toolsUsed,
    recentToolCalls: args.recentToolCalls,
    assistantMessageId: args.assistantMessageId,
    mistakeTracker: args.mistakeTracker,
    pendingRecoveryNote: args.pendingRecoveryNote,
    effectiveCap,
    budget,
    projectRoot: extra.projectRoot,
    shouldStop: false,
  };

  const graph = createDefaultGraph();
  let currentNode: GraphNodeType = 'PLAN';

  while (currentNode !== 'SYNTHESIZE' && !state.shouldStop) {
    const node = graph.find((n) => n.type === currentNode);
    if (!node) {
      // Cannot happen with the default graph; guard instead of asserting.
      ctx.log.error({ sessionId: args.sessionId, node: currentNode }, 'state-graph: unknown node type');
      break;
    }

    await node.execute(ctx, args, state, agent);
    if (state.shouldStop) break;

    const nextEdge = node.edges.find((e) => e.condition(state));
    if (!nextEdge) break;
    currentNode = nextEdge.to;
  }

  return {
    stepNumber: state.stepNumber,
    assistantMessageId: state.assistantMessageId,
    toolsUsed: state.toolsUsed,
    recentToolCalls: state.recentToolCalls,
    mistakeTracker: state.mistakeTracker,
  };
}

/** Builds per-iteration TurnArgs from the identifiers in `args` and the current loop locals. */
function buildIterArgs(args: TurnArgs, state: GraphState): TurnArgs {
  return {
    sessionId: args.sessionId,
    chatId: args.chatId,
    assistantMessageId: state.assistantMessageId,
    toolsUsed: state.toolsUsed,
    recentToolCalls: state.recentToolCalls,
    mistakeTracker: state.mistakeTracker,
    signal: args.signal,
  };
}

// -- PLAN node ------------------------------------------------------------
// Top-of-loop gate → compaction → loadContext → DCP → buildPayload → stream

/**
 * PLAN node. Runs the top-of-loop gate (doom-loop / budget), then compaction,
 * context load, DCP transform, payload build and the stream phase.
 *
 * Sets `state.shouldStop` when the gate fires, when the session/project is
 * missing, or when the stream phase throws. On a tool-less stream result the
 * turn is finalized here; the PLAN → SYNTHESIZE edge then ends the loop.
 */
async function planNode(
  ctx: InferenceContext,
  args: TurnArgs,
  state: GraphState,
  agent: Agent | null,
): Promise<void> {
  const { sessionId, chatId } = args;

  // 1. Top-of-loop gate: doom-loop, then budget (pure decisions)
  const decision = decideStep({
    recentToolCalls: state.recentToolCalls,
    toolsUsed: state.toolsUsed,
    budget: state.budget,
  });

  if (decision.kind === 'doom') {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      const dlSession = args.modelOverride
        ? { ...loaded.session, model: args.modelOverride }
        : loaded.session;
      await runDoomLoopSummary(
        ctx, buildIterArgs(args, state), dlSession, loaded.project, loaded.history, agent, decision.loop,
      );
    }
    state.shouldStop = true;
    return;
  }

  if (decision.kind === 'budget') {
    const loaded = await loadContext(ctx.sql, sessionId, chatId);
    if (loaded) {
      const bhSession = args.modelOverride
        ? { ...loaded.session, model: args.modelOverride }
        : loaded.session;
      await runCapHitSummary(
        ctx, buildIterArgs(args, state), bhSession, loaded.project, loaded.history, agent, state.budget,
      );
    }
    state.shouldStop = true;
    return;
  }
  // decision.kind === 'stream' → proceed.

  // 2. Compaction check
  const chatFlag = await ctx.sql<{ needs_compaction: boolean }[]>`
    SELECT needs_compaction FROM chats WHERE id = ${chatId}
  `;
  if (chatFlag[0]?.needs_compaction) {
    try {
      await compaction.process({
        sql: ctx.sql,
        config: ctx.config,
        log: ctx.log,
        broker: ctx.broker,
        chatId,
        hooks: ctx.hooks,
      });
    } catch (err) {
      // Clear the flag so a failing compaction is not retried on every step.
      ctx.log.warn({ err, chatId }, 'auto-compaction failed; clearing flag and proceeding');
      await ctx.sql`UPDATE chats SET needs_compaction = false WHERE id = ${chatId}`;
    }
  }

  // 3. Load context (must re-load each iteration — new messages)
  const loaded = await loadContext(ctx.sql, sessionId, chatId);
  if (!loaded) {
    ctx.log.warn({ sessionId }, 'inference: session or project missing mid-loop');
    state.shouldStop = true;
    return;
  }
  const iterProject = loaded.project;
  const iterSession = args.modelOverride
    ? { ...loaded.session, model: args.modelOverride }
    : loaded.session;
  let history = loaded.history;

  state.iterSession = iterSession;
  state.iterProject = iterProject;
  const projectRoot = await resolveProjectRoot(iterProject.path);
  state.projectRoot = projectRoot;

  // 4. DCP transform
  try {
    const dcpMsgs = toDcpMessages(history);
    const { messages: pruned, stats } = transformMessages(chatId, dcpMsgs);
    if (stats.removedCount > 0) {
      ctx.log.info({ chatId, ...stats }, 'dcp: transform removed messages');
      history = fromDcpMessages(pruned) as typeof history;
    }
  } catch (err) {
    ctx.log.warn({ err: err instanceof Error ? err.message : String(err), chatId }, 'dcp: transform skipped');
  }

  // 5. Log step boundary
  ctx.log.info(
    { sessionId, chatId, step: state.stepNumber, assistantMessageId: state.assistantMessageId },
    'step_start',
  );

  // 6. Build messages + stream phase
  const messages = await buildMessagesPayload(iterSession, iterProject, history, agent, ctx.log);
  const webToolsEnabled = iterSession.web_search_enabled ?? iterProject.default_web_search_enabled ?? false;

  // First step only: append a rewritten search-intent hint to the system prompt.
  if (state.stepNumber === 0 && webToolsEnabled && messages.length >= 2) {
    const lastUserMsg = [...messages].reverse().find((m) => m.role === 'user');
    if (lastUserMsg?.content) {
      const hint = await rewriteSearchQuery(lastUserMsg.content);
      const systemMsg = messages[0];
      if (hint && systemMsg?.role === 'system' && systemMsg.content) {
        systemMsg.content += `\n\nThe user's search intent can be summarized as: "${hint}"`;
      }
    }
  }

  // The recovery note is one-shot: inject it, then clear it.
  if (state.pendingRecoveryNote) {
    messages.push({ role: 'system', content: state.pendingRecoveryNote });
    state.pendingRecoveryNote = undefined;
  }

  // 7. Stream phase
  const iterArgs = buildIterArgs(args, state);
  const streamState: StreamPhaseState = { accumulated: '', startedAt: null };

  try {
    const result = await executeStreamPhase(ctx, iterArgs, iterSession, messages, streamState, agent, webToolsEnabled);
    state.streamResult = result;
    state.startedAt = streamState.startedAt;

    // Non-tool finish: Stop hook + finalize here (edge from PLAN → SYNTHESIZE
    // will break the graph loop after this node returns).
    if (result.toolCalls.length === 0) {
      if (ctx.hooks) {
        // Hook failures are deliberately ignored; the hook is not awaited.
        ctx.hooks.run('Stop', {
          event: 'Stop',
          session_id: sessionId,
          chat_id: chatId,
          last_assistant_text: result.content.slice(0, 500),
          turn: state.stepNumber,
        }).catch(() => {});
      }
      await finalizeCompletion(ctx, iterArgs, result, streamState.startedAt, iterSession);
    }
  } catch (err) {
    await handleAbortOrError(ctx, iterArgs, streamState.accumulated, err);
    state.shouldStop = true;
  }
}

// -- CALL_TOOL node -------------------------------------------------------
// Executes the tool phase and stores the result for OBSERVE.

/**
 * CALL_TOOL node. Runs `executeToolPhase` for the stream result produced by
 * PLAN and stores the outcome in `state.toolPhaseResult`. Stops the graph if
 * PLAN left no stream result or session, or if the tool phase throws.
 */
async function callToolNode(
  ctx: InferenceContext,
  args: TurnArgs,
  state: GraphState,
  agent: Agent | null,
): Promise<void> {
  const { sessionId, chatId } = args;

  const result = state.streamResult;
  if (!result) {
    ctx.log.warn({ sessionId }, 'state-graph: CALL_TOOL without stream result');
    state.shouldStop = true;
    return;
  }

  const session = state.iterSession;
  if (!session) {
    ctx.log.warn({ sessionId }, 'state-graph: CALL_TOOL without iterSession');
    state.shouldStop = true;
    return;
  }

  // Use the CURRENT loop locals. `args` still carries the values from the
  // start of the turn, so from the second step on its assistantMessageId,
  // toolsUsed and recentToolCalls no longer match what the stream phase used.
  const iterArgs: TurnArgs = {
    ...args,
    assistantMessageId: state.assistantMessageId,
    toolsUsed: state.toolsUsed,
    recentToolCalls: state.recentToolCalls,
    mistakeTracker: state.mistakeTracker,
  };

  try {
    state.toolPhaseResult = await executeToolPhase(
      ctx,
      iterArgs,
      result,
      state.startedAt ?? null,
      session,
      state.projectRoot,
      agent,
      state.stepNumber,
    );
  } catch (err) {
    ctx.log.error({ err, sessionId, chatId, step: state.stepNumber }, 'tool phase threw unexpectedly');
    state.shouldStop = true;
  }
}

// -- OBSERVE node ---------------------------------------------------------
// Processes tool results: updates loop locals, mistake tracking, build errors.

/**
 * OBSERVE node. Folds the tool-phase result into the loop locals (tool count,
 * recent calls, step number), records outcomes in the mistake tracker, and —
 * after write tools — starts a background build check whose failure output
 * becomes part of the pending recovery note.
 */
async function observeNode(
  ctx: InferenceContext,
  args: TurnArgs,
  state: GraphState,
  _agent: Agent | null,
): Promise<void> {
  const { sessionId, chatId } = args;
  const tpr = state.toolPhaseResult;
  if (!tpr) {
    state.shouldStop = true;
    return;
  }

  // Update loop locals (mirrors the existing while-loop post-tool logic)
  state.toolsUsed += tpr.toolCallCount;
  state.recentToolCalls = [...state.recentToolCalls, ...tpr.toolCalls];
  state.stepNumber++;

  // Fold tool outcomes into the mistake tracker
  for (const o of tpr.outcomes) {
    recordStep(state.mistakeTracker, o);
  }

  // Auto-fix: after write tools, attempt build and inject errors.
  const WRITE_TOOLS = new Set(['edit_file', 'create_file', 'delete_file', 'apply_pending']);
  const hasWriteTools = tpr.toolCalls.some((tc) => WRITE_TOOLS.has(tc.name));
  if (hasWriteTools && state.iterSession) {
    // Fire-and-forget so the loop is not blocked for up to BUILD_TIMEOUT_MS.
    // The note is merged when the build FINISHES rather than from a value
    // captured now: REFLECT may set a nudge note, or PLAN may consume the
    // pending note, while the build is still running.
    void detectAndRunBuild(ctx, state.projectRoot, sessionId, chatId, state.iterSession.model, undefined)
      .then((buildError) => {
        if (!buildError) return;
        const pending = state.pendingRecoveryNote;
        state.pendingRecoveryNote = pending ? pending + '\n\n' + buildError : buildError;
      })
      .catch((err: unknown) => {
        ctx.log.warn({ err, sessionId, chatId }, 'auto-fix: build check failed');
      });
  }
}

// -- REFLECT node ---------------------------------------------------------
// Post-tool decision: decidePostToolAction, nudge/escalate/continue handling.
/**
 * REFLECT node. Asks `decidePostToolAction` what to do after a tool phase and
 * applies the verdict:
 *   - 'stop'     → end the graph loop.
 *   - 'nudge'    → queue MISTAKE_RECOVERY_NOTE for the next PLAN, record a
 *                  sentinel, bump the nudge counter and reset the failure run.
 *   - 'escalate' → finalize the (next) assistant message as empty, record an
 *                  escalated sentinel, publish an idle status and end the loop.
 *   - otherwise  → adopt the next assistant message id and continue.
 * Stops the graph when no tool-phase result is present.
 */
async function reflectNode(
  ctx: InferenceContext,
  args: TurnArgs,
  state: GraphState,
  _agent: Agent | null,
): Promise<void> {
  const { sessionId, chatId, signal } = args;

  const toolResult = state.toolPhaseResult;
  if (!toolResult) {
    state.shouldStop = true;
    return;
  }

  const tracker = state.mistakeTracker;

  // Switch to the assistant message created for the next step, if there is one.
  const adoptNextAssistantMessage = (): void => {
    if (toolResult.nextAssistantId) {
      state.assistantMessageId = toolResult.nextAssistantId;
    }
  };

  switch (decidePostToolAction(toolResult.action, tracker)) {
    case 'stop': {
      state.shouldStop = true;
      return;
    }

    case 'nudge': {
      state.pendingRecoveryNote = MISTAKE_RECOVERY_NOTE;

      // Snapshot the failure run before it is reset below.
      const failureKinds = [...tracker.run];
      await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
        failureKinds,
        count: failureKinds.length,
        escalated: false,
        canContinue: true,
      });

      tracker.nudges += 1;
      tracker.run = [];

      ctx.log.info(
        { sessionId, chatId, step: state.stepNumber, nudges: tracker.nudges, failureKinds },
        'mistake_recovery nudge',
      );

      // Continue to next PLAN node — edges check step < cap.
      adoptNextAssistantMessage();
      return;
    }

    case 'escalate': {
      const failureKinds = [...tracker.run];
      adoptNextAssistantMessage();

      const escalateArgs: TurnArgs = {
        sessionId,
        chatId,
        assistantMessageId: state.assistantMessageId,
        toolsUsed: state.toolsUsed,
        recentToolCalls: state.recentToolCalls,
        mistakeTracker: tracker,
        signal,
      };
      await finalizeEmpty(ctx, escalateArgs);

      await insertMistakeRecoverySentinel(ctx, sessionId, chatId, {
        failureKinds,
        count: failureKinds.length,
        escalated: true,
        canContinue: true,
      });

      ctx.publishUser({ type: 'chat_status', chat_id: chatId, status: 'idle', at: new Date().toISOString() });
      ctx.log.info(
        { sessionId, chatId, step: state.stepNumber, failureKinds },
        'mistake_recovery escalate — stopping turn',
      );

      state.shouldStop = true;
      return;
    }

    default: {
      // 'continue' — advance to next assistant message.
      adoptNextAssistantMessage();
    }
  }
}