diff --git a/apps/coder/src/conductor/types.ts b/apps/coder/src/conductor/types.ts index 8da53a9..4231dab 100644 --- a/apps/coder/src/conductor/types.ts +++ b/apps/coder/src/conductor/types.ts @@ -42,6 +42,14 @@ export type StepKind = 'agent' | 'code' | 'approval'; export type TriggerRule = 'all_success' | 'one_success' | 'all_done'; +/** Possible statuses for a flow step (persisted in flow_steps.status). */ +export type StepStatus = 'pending' | 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'timed_out'; + +/** Retry policy for a step that times out. */ +export interface RetryConfig { + maxRetries: number; +} + export interface Step { /** unique id within the flow; other steps depend on it by this id */ id: string; @@ -59,6 +67,8 @@ export interface Step { run: (ctx: StepContext) => string | Promise; /** optional guard — when it returns false the step is skipped (e.g. no repo) */ when?: (ctx: StepContext) => boolean; + /** max retries on timeout (0 or unset = no retry) */ + maxRetries?: number; } export interface Flow { diff --git a/apps/coder/src/config.ts b/apps/coder/src/config.ts index f588d52..63eb371 100644 --- a/apps/coder/src/config.ts +++ b/apps/coder/src/config.ts @@ -52,6 +52,9 @@ const ConfigSchema = z.object({ ORPHAN_WORKTREE_GRACE_MS: z.coerce.number().int().positive().default(3_600_000), DEEPSEEK_API_KEY: z.string().optional(), DEEPSEEK_BASE_URL: z.string().url().default('https://api.deepseek.com'), + // v2.9.x: flow step timeout (default 5 min). When a 'running' step exceeds + // this duration, it is marked 'timed_out' and may be retried. + FLOW_STEP_TIMEOUT_MS: z.coerce.number().int().positive().default(300_000), }); export type Config = z.infer; diff --git a/apps/coder/src/schema.sql b/apps/coder/src/schema.sql index 6caca52..cdac12d 100644 --- a/apps/coder/src/schema.sql +++ b/apps/coder/src/schema.sql @@ -266,7 +266,7 @@ CREATE INDEX IF NOT EXISTS claude_session_entries_key_idx ON claude_session_entr -- replaces it with the three-value list). ALTER TABLE agent_sessions DROP CONSTRAINT IF EXISTS agent_sessions_backend_chk; ALTER TABLE agent_sessions ADD CONSTRAINT agent_sessions_backend_chk - CHECK (backend IN ('opencode_server', 'acp_warm', 'claude_sdk')); + CHECK (backend IN ('opencode_server', 'acp_warm', 'claude_sdk', 'paseo')); -- LISTEN/NOTIFY fast path: every tasks INSERT (from any call site — routes, -- new_task tool, MCP server) fires pg_notify('tasks_new') in the same @@ -340,11 +340,12 @@ CREATE INDEX IF NOT EXISTS flow_steps_task_id_idx ON flow_steps(task_id); -- edits above are no-ops on the existing DB (CREATE TABLE IF NOT EXISTS skips an -- existing table) — widen via the repo's DROP-IF-EXISTS → guarded-ADD discipline. -- Pure ADD of a new allowed value, so no row UPDATE is needed (no value renamed). +-- v2.9.x: widen status CHECKs to include 'timed_out' for Task State Machine. ALTER TABLE flow_runs DROP CONSTRAINT IF EXISTS flow_runs_status_chk; DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'flow_runs_status_chk') THEN ALTER TABLE flow_runs ADD CONSTRAINT flow_runs_status_chk - CHECK (status IN ('running', 'completed', 'failed', 'cancelled')); + CHECK (status IN ('running', 'completed', 'failed', 'cancelled', 'timed_out')); END IF; END $$; @@ -352,10 +353,14 @@ ALTER TABLE flow_steps DROP CONSTRAINT IF EXISTS flow_steps_status_chk; DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'flow_steps_status_chk') THEN ALTER TABLE flow_steps ADD CONSTRAINT flow_steps_status_chk - CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'cancelled')); + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'cancelled', 'timed_out')); END IF; END $$; +-- Task State Machine: retry columns for flow_steps. +ALTER TABLE flow_steps ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0; +ALTER TABLE flow_steps ADD COLUMN IF NOT EXISTS max_retries INTEGER; + -- Arena: battles + contestants + cross_examinations. -- project_id carries no FK (matches tasks.project_id + flow_runs.project_id convention). -- winner_contestant_id FK is deferred (forward reference): added via guarded ALTER below. diff --git a/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts b/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts index acb613b..1b0a23d 100644 --- a/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts +++ b/apps/coder/src/services/__tests__/flow-runner-decisions.test.ts @@ -52,6 +52,7 @@ const emptyState = (over: Partial = {}): SchedulerState => ({ skipped: new Set(), inFlight: new Set(), excluded: new Set(), + timedOut: new Set(), ...over, }); diff --git a/apps/coder/src/services/flow-runner-decisions.ts b/apps/coder/src/services/flow-runner-decisions.ts index 91c3548..3b9f811 100644 --- a/apps/coder/src/services/flow-runner-decisions.ts +++ b/apps/coder/src/services/flow-runner-decisions.ts @@ -33,11 +33,13 @@ export interface SchedulerState { readonly inFlight: ReadonlySet; /** step ids pre-skipped at launch (band/when gating) — never given a row */ readonly excluded: ReadonlySet; + /** step ids that timed out (terminal — no retries remaining or not retriable) */ + readonly timedOut: ReadonlySet; } -/** A dependency is satisfied once it is done, skipped, or excluded. */ +/** A dependency is satisfied once it is done, skipped, excluded, or timed out. */ function isSatisfied(state: SchedulerState, id: string): boolean { - return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id); + return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id) || state.timedOut.has(id); } /** @@ -118,25 +120,50 @@ export function isStuck(flow: Flow, state: SchedulerState): boolean { * - 'mark-cancelled': task was cancelled before the callback ran; propagate so * advance() cancels the run. */ +/** + * True when the step definition allows retries on timeout. + * Pure — no IO. + */ +export function isRetriable(step: { maxRetries?: number }): boolean { + return (step.maxRetries ?? 0) > 0; +} + +/** + * True when the step has retries remaining. + * Pure — no IO. + */ +export function shouldRetry(maxRetries: number | undefined | null, retryCount: number): boolean { + return retryCount < (maxRetries ?? 0); +} + export type ResumeAction = | 'keep' | 're-dispatch' | 'mark-done' | 'mark-failed' - | 'mark-cancelled'; + | 'mark-cancelled' + | 'retry'; /** * Decide what to do with ONE flow step during startup resume (D-9). Pure. * - * @param status - flow_steps.status - * @param taskId - flow_steps.task_id (null for code steps or unstarted agent steps) - * @param taskState - tasks.state for taskId, or null if the task row is absent + * @param status - flow_steps.status + * @param taskId - flow_steps.task_id (null for code steps or unstarted agent steps) + * @param taskState - tasks.state for taskId, or null if the task row is absent + * @param retryCount - flow_steps.retry_count (default 0) + * @param maxRetries - flow_steps.max_retries (null = no retry) */ export function reconcileResumeStep( status: string, taskId: string | null, taskState: string | null, + retryCount?: number, + maxRetries?: number | null, ): ResumeAction { + if (status === 'timed_out') { + if (shouldRetry(maxRetries, retryCount ?? 0)) return 'retry'; + return 'mark-failed'; + } if (status !== 'running') return 'keep'; // Running step: decide by its task's current state. if (!taskId || taskState === null) return 're-dispatch'; // task gone or never created @@ -198,7 +225,7 @@ export function evaluateTriggerRule( * decision per step. Pure — no IO. */ export function reconcileRun( - steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string }>, + steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string; retryCount?: number; maxRetries?: number | null }>, taskStates: ReadonlyMap, ): StepResumeDecision[] { return steps.map((step) => ({ @@ -207,6 +234,8 @@ export function reconcileRun( step.status, step.taskId, step.taskId ? (taskStates.get(step.taskId) ?? null) : null, + step.retryCount, + step.maxRetries, ), })); } diff --git a/apps/coder/src/services/flow-runner.ts b/apps/coder/src/services/flow-runner.ts index 1347d80..a4bda15 100644 --- a/apps/coder/src/services/flow-runner.ts +++ b/apps/coder/src/services/flow-runner.ts @@ -100,6 +100,9 @@ interface FlowStepRow { status: string; chat_id: string | null; output: string | null; + updated_at: string | null; + retry_count: number | null; + max_retries: number | null; } export function createFlowRunner(deps: Deps): FlowRunner { @@ -263,7 +266,8 @@ export function createFlowRunner(deps: Deps): FlowRunner { const dispatch: DispatchFn = (agent, task) => dispatchSubAgent(run.project_id, model, agent, task); const rows = await sql` - SELECT step_id, kind, agent, status, chat_id, output FROM flow_steps WHERE run_id = ${runId} + SELECT step_id, kind, agent, status, chat_id, output, updated_at, retry_count, max_retries + FROM flow_steps WHERE run_id = ${runId} `; // Re-derive the excluded set (band/when pre-skips) from the flow def + input — @@ -275,6 +279,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { const done = new Set(); const skipped = new Set(); const inFlight = new Set(); + const timedOut = new Set(); const results: Record = {}; for (const r of rows) { switch (r.status) { @@ -288,6 +293,9 @@ export function createFlowRunner(deps: Deps): FlowRunner { case 'running': inFlight.add(r.step_id); break; + case 'timed_out': + timedOut.add(r.step_id); + break; case 'failed': // A failed worker makes the deterministic report untrustworthy — fail the // whole run (matches the Phase-1 CLI, which throws on a dispatch failure). @@ -300,10 +308,68 @@ export function createFlowRunner(deps: Deps): FlowRunner { } } + // ─── Timeout detection ─────────────────────────────────────────────────────── + // Check running steps. If a step has been 'running' longer than + // FLOW_STEP_TIMEOUT_MS, mark it timed_out or re-dispatch if retriable. + const timeoutMs = config.FLOW_STEP_TIMEOUT_MS; + const nowDate = new Date(); + let detectedTimedOut = false; + for (const r of rows) { + if (r.status !== 'running') continue; + if (!r.updated_at) continue; + const elapsed = nowDate.getTime() - new Date(r.updated_at).getTime(); + if (elapsed <= timeoutMs) continue; + + // Step has exceeded the timeout + detectedTimedOut = true; + const retryCount = r.retry_count ?? 0; + const maxRetries = r.max_retries ?? 0; + + if (maxRetries > 0 && retryCount < maxRetries) { + // Retriable: re-dispatch the step with an incremented retry_count + const step = flow.steps.find((s) => s.id === r.step_id); + if (!step || step.kind !== 'agent') { + // Non-agent steps can't be retried via dispatch + inFlight.delete(r.step_id); + await failRun(runId, flow, input, model, + `step '${r.step_id}' timed out (non-retriable kind)`, r.step_id); + return; + } + inFlight.delete(r.step_id); + await sql` + UPDATE flow_steps + SET retry_count = ${retryCount + 1}, updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${r.step_id} AND status = 'running' + `; + await dispatchAgentStep(runId, run.project_id, model, step, ctx); + inFlight.add(r.step_id); + log.warn({ runId, stepId: r.step_id, retry: retryCount + 1, maxRetries }, + 'flow-runner: step timed out, retrying'); + } else { + // Not retriable — mark as timed_out, fail the run + inFlight.delete(r.step_id); + await sql` + UPDATE flow_steps SET status = 'timed_out', updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${r.step_id} AND status = 'running' + `; + timedOut.add(r.step_id); + publishStep(runId, r.step_id, 'timed_out'); + await failRun(runId, flow, input, model, + `step '${r.step_id}' timed out`, r.step_id); + return; + } + } + + // If we modified any steps, re-query so the state sets reflect the latest DB. + if (detectedTimedOut) { + // Continue with the in-memory state we already adjusted above (inFlight/timedOut + // were mutated directly). No re-query needed. + } + // Drain ready skips + code steps (synchronous), re-evaluating after each batch, // then dispatch the full ready agent wave and wait for their terminal callbacks. for (;;) { - const state: SchedulerState = { done, skipped, inFlight, excluded }; + const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut }; if (isRunComplete(flow, state)) { await finishRun(runId, flow, input, results, model, dispatch); @@ -545,7 +611,7 @@ export function createFlowRunner(deps: Deps): FlowRunner { function publishStep( runId: string, stepId: string, - status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked', + status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked' | 'timed_out', extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string }, ): void { publishUser({ @@ -683,6 +749,38 @@ export function createFlowRunner(deps: Deps): FlowRunner { log.info({ runId, stepId: step.step_id, taskId: task!.id }, 'flow-runner: step re-dispatched on resume'); break; } + + case 'retry': { + // Like re-dispatch but increments retry_count and sets status to 'running'. + if (!step.input) { + await sql` + UPDATE flow_steps + SET status = 'failed', error = 'retry: no stored prompt', + updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${step.step_id} + `; + break; + } + const chatIdR = step.chat_id; + const [chatR] = chatIdR + ? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatIdR}` + : []; + const sessionIdR = chatR?.session_id ?? null; + const [taskR] = await sql<{ id: string }[]>` + INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id) + VALUES (${projectId}, ${step.input}, 'qwen', ${model}, 'plan', ${sessionIdR}, ${chatIdR}) + RETURNING id + `; + await sql` + UPDATE flow_steps + SET task_id = ${taskR!.id}, retry_count = retry_count + 1, status = 'running', + updated_at = clock_timestamp() + WHERE run_id = ${runId} AND step_id = ${step.step_id} + `; + log.info({ runId, stepId: step.step_id, taskId: taskR!.id }, + 'flow-runner: step retried on resume'); + break; + } } } @@ -697,7 +795,9 @@ export function createFlowRunner(deps: Deps): FlowRunner { status: string; chat_id: string | null; input: string | null; - }[]>`SELECT step_id, task_id, status, chat_id, input FROM flow_steps WHERE run_id = ${run.id}`; + retry_count: number | null; + max_retries: number | null; + }[]>`SELECT step_id, task_id, status, chat_id, input, retry_count, max_retries FROM flow_steps WHERE run_id = ${run.id}`; // Load task states for all referenced tasks in one query. const taskIds = rows.map((r) => r.task_id).filter((id): id is string => id !== null); @@ -710,7 +810,13 @@ export function createFlowRunner(deps: Deps): FlowRunner { } const decisions = reconcileRun( - rows.map((r) => ({ stepId: r.step_id, taskId: r.task_id, status: r.status })), + rows.map((r) => ({ + stepId: r.step_id, + taskId: r.task_id, + status: r.status, + retryCount: r.retry_count ?? undefined, + maxRetries: r.max_retries, + })), taskStates, ); @@ -752,13 +858,13 @@ export function createFlowRunner(deps: Deps): FlowRunner { // Mark all non-terminal steps cancelled and collect in-flight task_ids. const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>` SELECT step_id, task_id, kind FROM flow_steps - WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped') + WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped', 'timed_out') `; if (steps.length > 0) { await sql` UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp() - WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped') + WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped', 'timed_out') `; for (const s of steps) { if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' }); diff --git a/apps/server/src/services/memory/index.ts b/apps/server/src/services/memory/index.ts index 9b1fae1..b0c85cb 100644 --- a/apps/server/src/services/memory/index.ts +++ b/apps/server/src/services/memory/index.ts @@ -3,4 +3,9 @@ export { formatMemoryBlock } from './prompt.js'; export { scanMemoryScopes } from './scan.js'; export { parseMemoryEntries } from './entries.js'; export { ensureMemoryScaffold, getMemoryRoot } from './paths.js'; +export { ContextTier } from './context-tier.js'; +export { DeepDream } from './deep-dream.js'; +export { CoreTier } from './core-tier.js'; export type { MemoryEntry } from './entries.js'; +export type { ContextTierConfig, ConversationTurn } from './context-tier.js'; +export type { CoreTierEntry, CoreTierSearchResult, CoreTierSearchOptions } from './core-tier.js'; diff --git a/packages/contracts/src/ws-frames.ts b/packages/contracts/src/ws-frames.ts index ff575ff..a0f4e81 100644 --- a/packages/contracts/src/ws-frames.ts +++ b/packages/contracts/src/ws-frames.ts @@ -355,7 +355,7 @@ export const FlowRunStepUpdatedFrame = z.object({ type: z.literal('flow_run_step_updated'), run_id: Uuid, step_id: z.string().min(1), - status: z.enum(['pending', 'running', 'completed', 'failed', 'skipped', 'cancelled']), + status: z.enum(['pending', 'running', 'completed', 'failed', 'skipped', 'cancelled', 'timed_out']), run_status: z.enum(['running', 'completed', 'failed', 'cancelled']).optional(), report: z.string().optional(), }); @@ -446,6 +446,21 @@ export const ToolTraceFinishFrame = z.object({ finished_at: z.string().datetime(), }); +// ---- collision warning frame (v2.8) ---------------------------------------- +// +// Published when the BooCoder detects that multiple worktrees/agents are editing +// the same file concurrently. Advisory only — writes are not blocked. + +const ConflictSeverityValue = z.enum(['same_line', 'adjacent_line', 'different_area']); + +export const CollisionWarningFrame = z.object({ + type: z.literal('collision_warning'), + file_path: z.string().min(1), + worktrees: z.array(z.string().min(1)), + agents: z.array(z.string().min(1)), + severity: ConflictSeverityValue, +}); + // ---- channel-delta frames (streaming v2) ---------------------------------- // // Each channel frame carries a monotonic `seq` counter so the client can @@ -507,7 +522,18 @@ const ChannelDeltaPayload = z.discriminatedUnion('channel', [ export const ChannelDeltaFrame = z.object({ type: z.literal('channel_delta'), seq: z.number().int().nonnegative(), - ...ChannelDeltaPayload.shape, + channel: z.union([ + z.literal('text'), z.literal('tool_call'), + z.literal('tool_result'), z.literal('status'), z.literal('error'), + ]), + message_id: Uuid.optional(), + chat_id: Uuid.optional(), + content: z.string().optional(), + tool_call: ToolCallShape.optional(), + tool_message_id: Uuid.optional(), + tool_call_id: ToolCallId.optional(), + output: z.unknown().optional(), + truncated: z.boolean().optional(), }); // ---- discriminated union --------------------------------------------------- @@ -541,6 +567,8 @@ export const WsFrameSchema = z.discriminatedUnion('type', [ // tool trace ToolTraceStartFrame, ToolTraceFinishFrame, + // collision warning + CollisionWarningFrame, // channel-delta (streaming v2) ChannelDeltaFrame, // per-user @@ -593,6 +621,7 @@ export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [ 'battle_updated', 'tool_trace_start', 'tool_trace_finish', + 'collision_warning', 'channel_delta', 'chat_status', 'session_updated',