Compare commits
4 Commits
v2.8.19-pa
...
v2.8.23-wa
| Author | SHA1 | Date | |
|---|---|---|---|
| 74da084521 | |||
| c860b6c4b7 | |||
| c4ee377dbc | |||
| f2401352a8 |
@@ -5,6 +5,7 @@ import { getPool, closeDb } from './db.js';
|
||||
import { registerHealthRoutes } from './routes/health.js';
|
||||
import { registerTerminalRoutes } from './routes/terminals.js';
|
||||
import { registerSessionRoutes } from './routes/sessions.js';
|
||||
import { registerSearchRoutes } from './routes/search.js';
|
||||
import { registerWsAttachRoute } from './ws/attach.js';
|
||||
|
||||
async function main(): Promise<void> {
|
||||
@@ -35,6 +36,7 @@ async function main(): Promise<void> {
|
||||
registerHealthRoutes(app);
|
||||
registerTerminalRoutes(app, config.TMUX_CONF_PATH);
|
||||
registerSessionRoutes(app);
|
||||
registerSearchRoutes(app, config.TMUX_CONF_PATH);
|
||||
registerWsAttachRoute(app, config.TMUX_CONF_PATH);
|
||||
|
||||
const shutdown = async (signal: string) => {
|
||||
|
||||
@@ -33,6 +33,7 @@ export function register(
|
||||
|
||||
export function unregister(paneId: string): void {
|
||||
sessions.delete(paneId);
|
||||
ringBuffers.delete(paneId);
|
||||
}
|
||||
|
||||
export function list(): SessionMeta[] {
|
||||
@@ -42,3 +43,120 @@ export function list(): SessionMeta[] {
|
||||
export function get(paneId: string): SessionMeta | undefined {
|
||||
return sessions.get(paneId);
|
||||
}
|
||||
|
||||
// ── Ring buffer for PTY output search ──────────────────────────────────────
|
||||
|
||||
export interface SearchMatch {
|
||||
line: number;
|
||||
content: string;
|
||||
contextBefore: string[];
|
||||
contextAfter: string[];
|
||||
}
|
||||
|
||||
const ringBuffers = new Map<string, string[]>();
|
||||
|
||||
/**
|
||||
* Append raw PTY data to the ring buffer for a given pane.
|
||||
* Splits incoming data on newlines and pushes each line into the buffer,
|
||||
* trimming to `maxLines` (default 5000) from the tail.
|
||||
*/
|
||||
export function appendOutput(
|
||||
paneId: string,
|
||||
data: string,
|
||||
maxLines: number = 5000,
|
||||
): void {
|
||||
let buf = ringBuffers.get(paneId);
|
||||
if (!buf) {
|
||||
buf = [];
|
||||
ringBuffers.set(paneId, buf);
|
||||
}
|
||||
|
||||
// Split on newlines — each chunk may contain multiple complete lines and
|
||||
// potentially a trailing partial line (which we store as-is; the next chunk
|
||||
// will either complete it or be another partial).
|
||||
const lines = data.split('\n');
|
||||
|
||||
// The first element of `lines` may be a continuation of the last partial
|
||||
// line from the previous append. If the buffer is non-empty and the last
|
||||
// stored entry is a partial (no trailing newline previously), glue them.
|
||||
// We detect "partial" by checking whether `data` ended with '\n' — if it
|
||||
// did, the last element after split is '' (empty) which we drop.
|
||||
const endedWithNewline = data.endsWith('\n');
|
||||
if (endedWithNewline) {
|
||||
// The final empty-string element is discarded.
|
||||
lines.pop();
|
||||
}
|
||||
|
||||
if (buf.length > 0 && lines.length > 0) {
|
||||
// Concatenate the last partial line in the buffer with the first split
|
||||
// segment. This avoids splitting ANSI sequences or text across chunks.
|
||||
buf[buf.length - 1] = (buf[buf.length - 1] ?? '') + (lines[0] ?? '');
|
||||
lines.shift();
|
||||
}
|
||||
|
||||
for (const line of lines) {
|
||||
buf.push(line);
|
||||
}
|
||||
|
||||
// Trim from head if over maxLines
|
||||
if (buf.length > maxLines) {
|
||||
buf = buf.slice(buf.length - maxLines);
|
||||
ringBuffers.set(paneId, buf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Search the ring buffer for a pane using a regex pattern.
|
||||
* Returns matches with optional context lines before and after each match.
|
||||
*/
|
||||
export function searchRingBuffer(
|
||||
paneId: string,
|
||||
pattern: string,
|
||||
opts?: { limit?: number; context?: number },
|
||||
): SearchMatch[] {
|
||||
const buf = ringBuffers.get(paneId);
|
||||
if (!buf || buf.length === 0) return [];
|
||||
|
||||
const limit = opts?.limit ?? 50;
|
||||
const context = opts?.context ?? 0;
|
||||
|
||||
let re: RegExp;
|
||||
try {
|
||||
re = new RegExp(pattern, 'u');
|
||||
} catch {
|
||||
return []; // invalid regex — caller should validate, but be defensive
|
||||
}
|
||||
|
||||
const results: SearchMatch[] = [];
|
||||
|
||||
for (let i = 0; i < buf.length; i++) {
|
||||
if (results.length >= limit) break;
|
||||
if (re.test(buf[i]!)) {
|
||||
const contextBefore: string[] = [];
|
||||
const contextAfter: string[] = [];
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i - c;
|
||||
if (ci >= 0) contextBefore.unshift(buf[ci]!);
|
||||
}
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i + c;
|
||||
if (ci < buf.length) contextAfter.push(buf[ci]!);
|
||||
}
|
||||
results.push({
|
||||
line: i + 1, // 1-based line number for display
|
||||
content: buf[i]!,
|
||||
contextBefore,
|
||||
contextAfter,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the ring buffer for a pane. Called on session kill / pane close.
|
||||
*/
|
||||
export function clearBuffer(paneId: string): void {
|
||||
ringBuffers.delete(paneId);
|
||||
}
|
||||
|
||||
167
apps/booterm/src/routes/search.ts
Normal file
167
apps/booterm/src/routes/search.ts
Normal file
@@ -0,0 +1,167 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import { sanitizeId, tmuxSessionName, capturePane } from '../pty/manager.js';
|
||||
import { searchRingBuffer, clearBuffer } from '../pty/registry.js';
|
||||
|
||||
const ParamsSchema = z.object({
|
||||
sid: z.string(),
|
||||
pid: z.string(),
|
||||
});
|
||||
|
||||
const MAX_PATTERN_LENGTH = 200;
|
||||
|
||||
// Zod-refined string: reject empty and overly-long patterns to prevent ReDoS
|
||||
const PatternQuerySchema = z
|
||||
.string()
|
||||
.min(1, 'pattern is required')
|
||||
.max(MAX_PATTERN_LENGTH, `pattern must not exceed ${MAX_PATTERN_LENGTH} characters`);
|
||||
|
||||
const QuerySchema = z.object({
|
||||
pattern: PatternQuerySchema,
|
||||
limit: z.coerce.number().int().min(1).max(500).default(50),
|
||||
context: z.coerce.number().int().min(0).max(50).default(0),
|
||||
});
|
||||
|
||||
interface SearchMatch {
|
||||
line: number;
|
||||
content: string;
|
||||
contextBefore: string[];
|
||||
contextAfter: string[];
|
||||
}
|
||||
|
||||
interface SearchResponse {
|
||||
matches: SearchMatch[];
|
||||
total: number;
|
||||
truncated: boolean;
|
||||
source: 'ring' | 'capture';
|
||||
}
|
||||
|
||||
/**
|
||||
* Search a captured pane buffer using a regex. This is the fallback path
|
||||
* when the ring buffer doesn't have enough matches.
|
||||
*/
|
||||
function grepBuffer(
|
||||
text: string,
|
||||
pattern: string,
|
||||
limit: number,
|
||||
context: number,
|
||||
): SearchMatch[] {
|
||||
let re: RegExp;
|
||||
try {
|
||||
re = new RegExp(pattern, 'u');
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
|
||||
const lines = text.split('\n');
|
||||
const results: SearchMatch[] = [];
|
||||
|
||||
for (let i = 0; i < lines.length; i++) {
|
||||
if (results.length >= limit) break;
|
||||
if (re.test(lines[i]!)) {
|
||||
const contextBefore: string[] = [];
|
||||
const contextAfter: string[] = [];
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i - c;
|
||||
if (ci >= 0) contextBefore.unshift(lines[ci]!);
|
||||
}
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i + c;
|
||||
if (ci < lines.length) contextAfter.push(lines[ci]!);
|
||||
}
|
||||
results.push({
|
||||
line: i + 1,
|
||||
content: lines[i]!,
|
||||
contextBefore,
|
||||
contextAfter,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
export function registerSearchRoutes(app: FastifyInstance, tmuxConfPath: string): void {
|
||||
app.get<{
|
||||
Params: { sid: string; pid: string };
|
||||
Querystring: { pattern?: string; limit?: string; context?: string };
|
||||
}>(
|
||||
'/api/term/sessions/:sid/panes/:pid/search',
|
||||
async (req, reply) => {
|
||||
const p = ParamsSchema.safeParse(req.params);
|
||||
if (!p.success) return reply.code(400).send({ error: 'bad_params' });
|
||||
|
||||
const sid = sanitizeId(p.data.sid);
|
||||
const pid = sanitizeId(p.data.pid);
|
||||
if (!sid || !pid) return reply.code(400).send({ error: 'bad_id_format' });
|
||||
|
||||
const q = QuerySchema.safeParse(req.query);
|
||||
if (!q.success) {
|
||||
return reply.code(400).send({
|
||||
error: 'bad_query',
|
||||
details: q.error.flatten().fieldErrors,
|
||||
});
|
||||
}
|
||||
|
||||
const { pattern, limit, context } = q.data;
|
||||
|
||||
// ── Path 1: ring buffer search (fast, no tmux interaction) ──
|
||||
const ringMatches = searchRingBuffer(pid, pattern, { limit, context });
|
||||
if (ringMatches.length >= limit) {
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: ringMatches.length >= limit,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
// ── Path 2: capture-pane + grep fallback (10s timeout) ──
|
||||
const sessionName = tmuxSessionName(pid);
|
||||
|
||||
let capture: string;
|
||||
try {
|
||||
capture = await withTimeout(
|
||||
capturePane(tmuxConfPath, sessionName, 5000),
|
||||
10_000,
|
||||
);
|
||||
} catch (err) {
|
||||
req.log.warn({ err, pid }, 'capture-pane timed out or failed');
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: false,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
if (!capture) {
|
||||
// tmux pane may no longer exist — return whatever ring had
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: false,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
const captureMatches = grepBuffer(capture, pattern, limit, context);
|
||||
|
||||
return reply.code(200).send({
|
||||
matches: captureMatches,
|
||||
total: captureMatches.length,
|
||||
truncated: captureMatches.length >= limit,
|
||||
source: 'capture' as const,
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
|
||||
return Promise.race([
|
||||
promise,
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('timeout')), ms),
|
||||
),
|
||||
]);
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
} from '../pty/manager.js';
|
||||
import { attachPty } from '../pty/pty.js';
|
||||
import { getUser } from '../auth.js';
|
||||
import { register, unregister } from '../pty/registry.js';
|
||||
import { register, unregister, appendOutput } from '../pty/registry.js';
|
||||
|
||||
export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string): void {
|
||||
app.get<{
|
||||
@@ -106,6 +106,8 @@ export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string
|
||||
} catch (err) {
|
||||
req.log.warn({ err }, 'ws send failed');
|
||||
}
|
||||
// Feed the ring buffer for pattern-based search
|
||||
appendOutput(pid, data);
|
||||
};
|
||||
handle.onData(onData);
|
||||
|
||||
|
||||
@@ -38,10 +38,31 @@ export interface StepContext {
|
||||
readonly model?: string;
|
||||
}
|
||||
|
||||
export type StepKind = 'agent' | 'code' | 'approval';
|
||||
export type StepKind = 'agent' | 'code' | 'approval' | 'switch';
|
||||
|
||||
/**
|
||||
* One branch of a SWITCH step. The first case whose condition evaluates to true
|
||||
* is selected; all other branches' stepIds are excluded from execution.
|
||||
*/
|
||||
export interface SwitchCase {
|
||||
/** Human-readable label for this branch (reported in switch output). */
|
||||
label: string;
|
||||
/** Pure guard — called with the current step context to decide this branch. */
|
||||
condition: (ctx: StepContext) => boolean;
|
||||
/** stepIds belonging to this branch. */
|
||||
stepIds: string[];
|
||||
}
|
||||
|
||||
export type TriggerRule = 'all_success' | 'one_success' | 'all_done';
|
||||
|
||||
/** Possible statuses for a flow step (persisted in flow_steps.status). */
|
||||
export type StepStatus = 'pending' | 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'timed_out';
|
||||
|
||||
/** Retry policy for a step that times out. */
|
||||
export interface RetryConfig {
|
||||
maxRetries: number;
|
||||
}
|
||||
|
||||
export interface Step {
|
||||
/** unique id within the flow; other steps depend on it by this id */
|
||||
id: string;
|
||||
@@ -55,10 +76,19 @@ export interface Step {
|
||||
/**
|
||||
* For kind:'agent', returns the worker PROMPT (task + any prior outputs).
|
||||
* For kind:'code', returns the step RESULT directly (the fold/transform).
|
||||
* For kind:'switch', unused (the runner evaluates cases internally).
|
||||
*/
|
||||
run: (ctx: StepContext) => string | Promise<string>;
|
||||
/** optional guard — when it returns false the step is skipped (e.g. no repo) */
|
||||
when?: (ctx: StepContext) => boolean;
|
||||
/** max retries on timeout (0 or unset = no retry) */
|
||||
maxRetries?: number;
|
||||
/** batch group id; steps sharing the same batch are gated by batchConfig.maxConcurrent */
|
||||
batch?: string;
|
||||
/** for kind:'switch' — ordered list of branches evaluated in declaration order */
|
||||
cases?: SwitchCase[];
|
||||
/** for kind:'switch' — fallback step ids when no case matches */
|
||||
defaultBranch?: string[];
|
||||
}
|
||||
|
||||
export interface Flow {
|
||||
@@ -69,6 +99,8 @@ export interface Flow {
|
||||
render: (ctx: StepContext) => string;
|
||||
/** optional output filename for the artifact, derived from input */
|
||||
output?: (ctx: StepContext) => string;
|
||||
/** batch parallelism control — gates concurrent dispatch of steps sharing the same batch id */
|
||||
batchConfig?: { maxConcurrent: number; timeoutMs?: number; joinRule?: TriggerRule };
|
||||
}
|
||||
|
||||
export interface RunResult {
|
||||
|
||||
@@ -52,6 +52,9 @@ const ConfigSchema = z.object({
|
||||
ORPHAN_WORKTREE_GRACE_MS: z.coerce.number().int().positive().default(3_600_000),
|
||||
DEEPSEEK_API_KEY: z.string().optional(),
|
||||
DEEPSEEK_BASE_URL: z.string().url().default('https://api.deepseek.com'),
|
||||
// v2.9.x: flow step timeout (default 5 min). When a 'running' step exceeds
|
||||
// this duration, it is marked 'timed_out' and may be retried.
|
||||
FLOW_STEP_TIMEOUT_MS: z.coerce.number().int().positive().default(300_000),
|
||||
});
|
||||
|
||||
export type Config = z.infer<typeof ConfigSchema>;
|
||||
|
||||
@@ -266,7 +266,7 @@ CREATE INDEX IF NOT EXISTS claude_session_entries_key_idx ON claude_session_entr
|
||||
-- replaces it with the three-value list).
|
||||
ALTER TABLE agent_sessions DROP CONSTRAINT IF EXISTS agent_sessions_backend_chk;
|
||||
ALTER TABLE agent_sessions ADD CONSTRAINT agent_sessions_backend_chk
|
||||
CHECK (backend IN ('opencode_server', 'acp_warm', 'claude_sdk'));
|
||||
CHECK (backend IN ('opencode_server', 'acp_warm', 'claude_sdk', 'paseo'));
|
||||
|
||||
-- LISTEN/NOTIFY fast path: every tasks INSERT (from any call site — routes,
|
||||
-- new_task tool, MCP server) fires pg_notify('tasks_new') in the same
|
||||
@@ -340,11 +340,12 @@ CREATE INDEX IF NOT EXISTS flow_steps_task_id_idx ON flow_steps(task_id);
|
||||
-- edits above are no-ops on the existing DB (CREATE TABLE IF NOT EXISTS skips an
|
||||
-- existing table) — widen via the repo's DROP-IF-EXISTS → guarded-ADD discipline.
|
||||
-- Pure ADD of a new allowed value, so no row UPDATE is needed (no value renamed).
|
||||
-- v2.9.x: widen status CHECKs to include 'timed_out' for Task State Machine.
|
||||
ALTER TABLE flow_runs DROP CONSTRAINT IF EXISTS flow_runs_status_chk;
|
||||
DO $$ BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'flow_runs_status_chk') THEN
|
||||
ALTER TABLE flow_runs ADD CONSTRAINT flow_runs_status_chk
|
||||
CHECK (status IN ('running', 'completed', 'failed', 'cancelled'));
|
||||
CHECK (status IN ('running', 'completed', 'failed', 'cancelled', 'timed_out'));
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -352,10 +353,14 @@ ALTER TABLE flow_steps DROP CONSTRAINT IF EXISTS flow_steps_status_chk;
|
||||
DO $$ BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'flow_steps_status_chk') THEN
|
||||
ALTER TABLE flow_steps ADD CONSTRAINT flow_steps_status_chk
|
||||
CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'cancelled'));
|
||||
CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'cancelled', 'timed_out'));
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
-- Task State Machine: retry columns for flow_steps.
|
||||
ALTER TABLE flow_steps ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE flow_steps ADD COLUMN IF NOT EXISTS max_retries INTEGER;
|
||||
|
||||
-- Arena: battles + contestants + cross_examinations.
|
||||
-- project_id carries no FK (matches tasks.project_id + flow_runs.project_id convention).
|
||||
-- winner_contestant_id FK is deferred (forward reference): added via guarded ALTER below.
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import type { Flow, Step, StepContext } from '../../conductor/types.js';
|
||||
import {
|
||||
buildBatchState,
|
||||
getReadyInBatch,
|
||||
manifestSteps,
|
||||
readySteps,
|
||||
partitionReady,
|
||||
readySteps,
|
||||
isRunComplete,
|
||||
isStuck,
|
||||
reconcileResumeStep,
|
||||
reconcileRun,
|
||||
resolveSwitch,
|
||||
shouldFailOnMissingAgent,
|
||||
type SchedulerState,
|
||||
} from '../flow-runner-decisions.js';
|
||||
import type { StepContext } from '../../conductor/types.js';
|
||||
|
||||
/**
|
||||
* The DB-driven flow-runner replaces the Phase-1 in-memory wave scheduler
|
||||
@@ -52,6 +56,8 @@ const emptyState = (over: Partial<SchedulerState> = {}): SchedulerState => ({
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
...over,
|
||||
});
|
||||
|
||||
@@ -237,6 +243,442 @@ describe('isRunComplete / isStuck', () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ─── SWITCH branching (v2.9) ─────────────────────────────────────────────────
|
||||
|
||||
describe('resolveSwitch', () => {
|
||||
const baseCtx: StepContext = { input: { question: 'q', band: 'small' }, results: {} };
|
||||
|
||||
it('selects the first matching case and excludes other branches', () => {
|
||||
const step: Step = {
|
||||
id: 'router',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
cases: [
|
||||
{ label: 'a', condition: () => false, stepIds: ['a1', 'a2'] },
|
||||
{ label: 'b', condition: () => true, stepIds: ['b1', 'b2'] },
|
||||
{ label: 'c', condition: () => true, stepIds: ['c1', 'c2'] },
|
||||
],
|
||||
};
|
||||
const result = resolveSwitch(step, baseCtx);
|
||||
expect(result.chosenCase).toBe('b');
|
||||
expect(result.excluded).toEqual(['a1', 'a2', 'c1', 'c2']);
|
||||
});
|
||||
|
||||
it('falls back to defaultBranch when no case matches', () => {
|
||||
const step: Step = {
|
||||
id: 'router',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
cases: [
|
||||
{ label: 'x', condition: () => false, stepIds: ['x1'] },
|
||||
{ label: 'y', condition: () => false, stepIds: ['y1'] },
|
||||
],
|
||||
defaultBranch: ['z1', 'z2'],
|
||||
};
|
||||
const result = resolveSwitch(step, baseCtx);
|
||||
expect(result.chosenCase).toBeNull();
|
||||
// Only case branch steps are excluded; default steps are not.
|
||||
expect(result.excluded).toEqual(['x1', 'y1']);
|
||||
});
|
||||
|
||||
it('excludes all branch steps when no case matches and no default', () => {
|
||||
const step: Step = {
|
||||
id: 'router',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
cases: [
|
||||
{ label: 'p', condition: () => false, stepIds: ['p1'] },
|
||||
{ label: 'q', condition: () => false, stepIds: ['q1', 'q2'] },
|
||||
],
|
||||
};
|
||||
const result = resolveSwitch(step, baseCtx);
|
||||
expect(result.chosenCase).toBeNull();
|
||||
expect(result.excluded).toEqual(['p1', 'q1', 'q2']);
|
||||
});
|
||||
|
||||
it('excludes defaultBranch when a case matched', () => {
|
||||
const step: Step = {
|
||||
id: 'router',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
cases: [
|
||||
{ label: 'hit', condition: () => true, stepIds: ['h1'] },
|
||||
{ label: 'miss', condition: () => false, stepIds: ['m1'] },
|
||||
],
|
||||
defaultBranch: ['d1'],
|
||||
};
|
||||
const result = resolveSwitch(step, baseCtx);
|
||||
expect(result.chosenCase).toBe('hit');
|
||||
expect(result.excluded).toEqual(['m1', 'd1']);
|
||||
});
|
||||
|
||||
it('returns empty excluded for a degenerate switch with no cases and no default', () => {
|
||||
const step: Step = {
|
||||
id: 'noop',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
};
|
||||
const result = resolveSwitch(step, baseCtx);
|
||||
expect(result.chosenCase).toBeNull();
|
||||
expect(result.excluded).toEqual([]);
|
||||
});
|
||||
|
||||
it('uses ctx.results in condition evaluation', () => {
|
||||
const step: Step = {
|
||||
id: 'router',
|
||||
kind: 'switch',
|
||||
run: () => '',
|
||||
cases: [
|
||||
{ label: 'has', condition: (ctx) => ctx.results['prev'] === 'yes', stepIds: ['yes-branch'] },
|
||||
{ label: 'no', condition: () => true, stepIds: ['no-branch'] },
|
||||
],
|
||||
};
|
||||
const ctxWithResult: StepContext = { input: { question: 'q', band: 'small' }, results: { prev: 'yes' } };
|
||||
const result = resolveSwitch(step, ctxWithResult);
|
||||
expect(result.chosenCase).toBe('has');
|
||||
expect(result.excluded).toEqual(['no-branch']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('readySteps with switch-excluded steps', () => {
|
||||
// Flow: switch router → branch-a/branch-b → fold
|
||||
function switchFlow(): Flow {
|
||||
const steps: Step[] = [
|
||||
{
|
||||
id: 'switch', kind: 'switch', run: () => '',
|
||||
cases: [
|
||||
{ label: 'a', condition: () => true, stepIds: ['branch-a'] },
|
||||
{ label: 'b', condition: () => false, stepIds: ['branch-b'] },
|
||||
],
|
||||
},
|
||||
{ id: 'branch-a', kind: 'agent', agent: 'x', deps: ['switch'], run: () => 'p' },
|
||||
{ id: 'branch-b', kind: 'agent', agent: 'y', deps: ['switch'], run: () => 'q' },
|
||||
{ id: 'fold', kind: 'code', deps: ['branch-a', 'branch-b'], run: () => 'r' },
|
||||
];
|
||||
return { name: 'switch-demo', description: '', steps, render: () => '' };
|
||||
}
|
||||
|
||||
it('excludes non-selected branch steps and treats them as satisfied deps', () => {
|
||||
const flow = switchFlow();
|
||||
// switch completed, branch-b excluded by switch (branch-a selected)
|
||||
const switchResult = new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
|
||||
['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
|
||||
]);
|
||||
const state: SchedulerState = {
|
||||
done: new Set(['switch']),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: switchResult,
|
||||
};
|
||||
const ready = readySteps(flow, state).map((s) => s.id);
|
||||
// branch-a is ready (dep switch is done), branch-b is excluded
|
||||
expect(ready).toContain('branch-a');
|
||||
expect(ready).not.toContain('branch-b');
|
||||
});
|
||||
|
||||
it('fold unblocks once selected branch completes (excluded branch satisfied)', () => {
|
||||
const flow = switchFlow();
|
||||
const switchResult = new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
|
||||
['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
|
||||
]);
|
||||
const state: SchedulerState = {
|
||||
done: new Set(['switch', 'branch-a']),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: switchResult,
|
||||
};
|
||||
const ready = readySteps(flow, state).map((s) => s.id);
|
||||
// fold's deps: branch-a done, branch-b excluded (via switch) → satisfied
|
||||
expect(ready).toContain('fold');
|
||||
});
|
||||
|
||||
it('fold stays blocked until selected branch completes, even with excluded dep', () => {
|
||||
const flow = switchFlow();
|
||||
const switchResult = new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
|
||||
['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
|
||||
]);
|
||||
const state: SchedulerState = {
|
||||
done: new Set(['switch']),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(['branch-a']),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: switchResult,
|
||||
};
|
||||
const ready = readySteps(flow, state).map((s) => s.id);
|
||||
// branch-a in flight, branch-b excluded — only branch-a offered
|
||||
expect(ready).not.toContain('fold');
|
||||
});
|
||||
|
||||
it('isRunComplete returns true when switch-excluded steps are the only unsettled', () => {
|
||||
const flow = switchFlow();
|
||||
// All non-excluded steps done; branch-b is excluded via switch
|
||||
const switchResult = new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
|
||||
['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
|
||||
]);
|
||||
const state: SchedulerState = {
|
||||
done: new Set(['switch', 'branch-a', 'fold']),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: switchResult,
|
||||
};
|
||||
expect(isRunComplete(flow, state)).toBe(true);
|
||||
expect(isStuck(flow, state)).toBe(false);
|
||||
});
|
||||
|
||||
it('combines static excluded with switch-excluded', () => {
|
||||
const flow = switchFlow();
|
||||
// band gating excludes branch-b at launch, AND switch also excludes it
|
||||
const switchResult = new Map<string, { chosenCase: string | null; excluded: Set<string> }>([
|
||||
['switch', { chosenCase: 'a', excluded: new Set(['branch-b']) }],
|
||||
]);
|
||||
const state: SchedulerState = {
|
||||
done: new Set(['switch', 'branch-a']),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(['branch-b']),
|
||||
timedOut: new Set(),
|
||||
switchResults: switchResult,
|
||||
};
|
||||
// branch-b excluded both ways; fold sees branch-a done, branch-b excluded
|
||||
const ready = readySteps(flow, state).map((s) => s.id);
|
||||
expect(ready).toContain('fold');
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Batch parallelism (v2.8.22) ─────────────────────────────────────────────
|
||||
|
||||
describe('buildBatchState', () => {
|
||||
it('returns empty map when flow has no batchConfig', () => {
|
||||
const flow: Flow = {
|
||||
name: 'no-batch',
|
||||
description: '',
|
||||
steps: [
|
||||
{ id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
|
||||
{ id: 'b', kind: 'code', deps: ['a'], run: () => 'r' },
|
||||
],
|
||||
render: () => '',
|
||||
};
|
||||
const bs = buildBatchState(flow, new Set());
|
||||
expect(bs.size).toBe(0);
|
||||
});
|
||||
|
||||
it('maps each batch group to its running set and config', () => {
|
||||
const flow: Flow = {
|
||||
name: 'batched',
|
||||
description: '',
|
||||
steps: [
|
||||
{ id: 'a1', kind: 'agent', agent: 'x', batch: 'review', run: () => 'p' },
|
||||
{ id: 'a2', kind: 'agent', agent: 'y', batch: 'review', run: () => 'q' },
|
||||
{ id: 'b1', kind: 'agent', agent: 'z', batch: 'check', run: () => 'r' },
|
||||
{ id: 'fold', kind: 'code', deps: ['a1', 'a2', 'b1'], run: () => 's' },
|
||||
],
|
||||
render: () => '',
|
||||
batchConfig: { maxConcurrent: 2 },
|
||||
};
|
||||
// a1 is in flight → review batch has 1 running, check has 0.
|
||||
const bs = buildBatchState(flow, new Set(['a1']));
|
||||
expect(bs.size).toBe(2);
|
||||
|
||||
const review = bs.get('review');
|
||||
expect(review).toBeDefined();
|
||||
expect([...review!.running]).toEqual(['a1']);
|
||||
expect(review!.maxConcurrent).toBe(2);
|
||||
expect(review!.joinRule).toBe('all_success');
|
||||
|
||||
const check = bs.get('check');
|
||||
expect(check).toBeDefined();
|
||||
expect(check!.running.size).toBe(0);
|
||||
expect(check!.maxConcurrent).toBe(2);
|
||||
});
|
||||
|
||||
it('uses joinRule from batchConfig when provided', () => {
|
||||
const flow: Flow = {
|
||||
name: 'join',
|
||||
description: '',
|
||||
steps: [
|
||||
{ id: 'x', kind: 'agent', agent: 'a', batch: 'g1', run: () => 'p' },
|
||||
],
|
||||
render: () => '',
|
||||
batchConfig: { maxConcurrent: 1, joinRule: 'one_success' },
|
||||
};
|
||||
const bs = buildBatchState(flow, new Set());
|
||||
expect(bs.get('g1')!.joinRule).toBe('one_success');
|
||||
});
|
||||
|
||||
it('ignores steps without a batch field', () => {
|
||||
const flow: Flow = {
|
||||
name: 'mixed',
|
||||
description: '',
|
||||
steps: [
|
||||
{ id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
|
||||
{ id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
|
||||
],
|
||||
render: () => '',
|
||||
batchConfig: { maxConcurrent: 3 },
|
||||
};
|
||||
const bs = buildBatchState(flow, new Set(['a', 'b']));
|
||||
// a is inFlight but has no batch — it does not create an entry
|
||||
expect(bs.size).toBe(1);
|
||||
expect(bs.has('g1')).toBe(true);
|
||||
expect(bs.get('g1')!.running.has('b')).toBe(true);
|
||||
// a is not in any batch entry
|
||||
for (const entry of bs.values()) {
|
||||
expect(entry.running.has('a')).toBe(false);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('getReadyInBatch', () => {
|
||||
function makeBatchState(
|
||||
overrides?: Map<string, { running: Set<string>; maxConcurrent: number; joinRule: TriggerRule }>,
|
||||
): Map<string, { running: Set<string>; maxConcurrent: number; joinRule: TriggerRule }> {
|
||||
return overrides ?? new Map();
|
||||
}
|
||||
|
||||
it('passes all steps through when batchState is empty', () => {
|
||||
const steps: Step[] = [
|
||||
{ id: 'a', kind: 'agent', agent: 'x', run: () => 'p' },
|
||||
{ id: 'b', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState: makeBatchState(),
|
||||
};
|
||||
const result = getReadyInBatch(steps, state, {} as Flow);
|
||||
expect(result.map((s) => s.id)).toEqual(['a', 'b']);
|
||||
});
|
||||
|
||||
it('passes non-batched steps through regardless of batch capacity', () => {
|
||||
const batchState = new Map();
|
||||
batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' });
|
||||
const steps: Step[] = [
|
||||
{ id: 'nobatch', kind: 'agent', agent: 'z', run: () => 'r' },
|
||||
{ id: 'batched', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(['a']),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState,
|
||||
};
|
||||
const result = getReadyInBatch(steps, state, {} as Flow);
|
||||
// nobatch passes, batched is at maxConcurrent=1 with a already running → blocked
|
||||
expect(result.map((s) => s.id)).toEqual(['nobatch']);
|
||||
});
|
||||
|
||||
it('allows batch steps up to maxConcurrent', () => {
|
||||
const batchState = new Map();
|
||||
batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' });
|
||||
const steps: Step[] = [
|
||||
{ id: 's1', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
|
||||
{ id: 's2', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
|
||||
{ id: 's3', kind: 'agent', agent: 'z', batch: 'g1', run: () => 'r' },
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState,
|
||||
};
|
||||
// All 0 running, maxConcurrent=2 → all 3 pass through (readySteps would return them,
|
||||
// but the flow-runner dispatches them one-by-one in the agent dispatch loop; getReadyInBatch
|
||||
// is called each tick to allow up to maxConcurrent. Since batch is empty on this tick,
|
||||
// all are allowed — the runner's dispatch loop will put 2 in flight, then next tick blocks.)
|
||||
const result = getReadyInBatch(steps, state, {} as Flow);
|
||||
expect(result.map((s) => s.id)).toEqual(['s1', 's2', 's3']);
|
||||
});
|
||||
|
||||
it('blocks batch steps when at capacity', () => {
|
||||
const batchState = new Map();
|
||||
batchState.set('g1', { running: new Set(['a', 'b']), maxConcurrent: 2, joinRule: 'all_success' });
|
||||
const steps: Step[] = [
|
||||
{ id: 'c', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
|
||||
{ id: 'd', kind: 'agent', agent: 'y', batch: 'g1', run: () => 'q' },
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(['a', 'b']),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState,
|
||||
};
|
||||
// Both batches at capacity → everything filtered out
|
||||
expect(getReadyInBatch(steps, state, {} as Flow)).toEqual([]);
|
||||
});
|
||||
|
||||
it('handles multiple independent batch groups', () => {
|
||||
const batchState = new Map();
|
||||
batchState.set('g1', { running: new Set(['a']), maxConcurrent: 1, joinRule: 'all_success' });
|
||||
batchState.set('g2', { running: new Set(), maxConcurrent: 5, joinRule: 'all_success' });
|
||||
const steps: Step[] = [
|
||||
{ id: 'b', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' }, // g1 at capacity → blocked
|
||||
{ id: 'c', kind: 'agent', agent: 'y', batch: 'g2', run: () => 'q' }, // g2 has room → passes
|
||||
{ id: 'd', kind: 'agent', agent: 'z', batch: 'g2', run: () => 'r' }, // g2 has room → passes
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(['a']),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState,
|
||||
};
|
||||
expect(getReadyInBatch(steps, state, {} as Flow).map((s) => s.id)).toEqual(['c', 'd']);
|
||||
});
|
||||
|
||||
it('lets a step pass when its batch group is known but has no running steps yet', () => {
|
||||
const batchState = new Map();
|
||||
batchState.set('g1', { running: new Set(), maxConcurrent: 2, joinRule: 'all_success' });
|
||||
const steps: Step[] = [
|
||||
{ id: 'first', kind: 'agent', agent: 'x', batch: 'g1', run: () => 'p' },
|
||||
];
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState,
|
||||
};
|
||||
expect(getReadyInBatch(steps, state, {} as Flow).map((s) => s.id)).toEqual(['first']);
|
||||
});
|
||||
|
||||
it('handles empty step list gracefully', () => {
|
||||
const state: SchedulerState = {
|
||||
done: new Set(),
|
||||
skipped: new Set(),
|
||||
inFlight: new Set(),
|
||||
excluded: new Set(),
|
||||
timedOut: new Set(),
|
||||
switchResults: new Map(),
|
||||
batchState: makeBatchState(),
|
||||
};
|
||||
expect(getReadyInBatch([], state, {} as Flow)).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────
|
||||
|
||||
describe('reconcileResumeStep', () => {
|
||||
|
||||
195
apps/coder/src/services/__tests__/paseo-client.test.ts
Normal file
195
apps/coder/src/services/__tests__/paseo-client.test.ts
Normal file
@@ -0,0 +1,195 @@
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import { PaseoClient, PaseoClientError } from '../paseo-client.js';
|
||||
|
||||
/**
|
||||
* Create a PaseoClient whose runCli method is replaced with a mock.
|
||||
* The mock is returned as the second tuple element so tests can
|
||||
* control and inspect it directly.
|
||||
*/
|
||||
function makeClient(config?: { paseoBin?: string; cliHost?: string }): {
|
||||
client: PaseoClient;
|
||||
mockRunCli: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const client = new PaseoClient(config);
|
||||
const mockRunCli = vi.fn();
|
||||
(client as any).runCli = mockRunCli;
|
||||
return { client, mockRunCli };
|
||||
}
|
||||
|
||||
describe('PaseoClient', () => {
|
||||
describe('listAgents', () => {
|
||||
it('returns parsed agent list from paseo ls --json', async () => {
|
||||
const agents = [
|
||||
{ id: 'abc-123', shortId: 'abc', name: 'Agent 1', provider: 'opencode', status: 'running' },
|
||||
{ id: 'def-456', shortId: 'def', name: 'Agent 2', provider: 'claude', status: 'idle' },
|
||||
];
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue(JSON.stringify(agents));
|
||||
|
||||
const result = await client.listAgents();
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith(['ls', '--json']);
|
||||
expect(result).toEqual(agents);
|
||||
});
|
||||
|
||||
it('throws PaseoClientError on non-JSON output', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('not json');
|
||||
|
||||
await expect(client.listAgents()).rejects.toThrow(PaseoClientError);
|
||||
await expect(client.listAgents()).rejects.toThrow(/invalid JSON/);
|
||||
});
|
||||
|
||||
it('propagates runCli rejection as-is', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
const err = new PaseoClientError('ls failed: connection refused', 'ls', 1, 'connection refused');
|
||||
mockRunCli.mockRejectedValue(err);
|
||||
|
||||
await expect(client.listAgents()).rejects.toThrow(PaseoClientError);
|
||||
await expect(client.listAgents()).rejects.toThrow(/ls failed/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getAgentStatus', () => {
|
||||
it('returns parsed agent detail from paseo inspect --json', async () => {
|
||||
const detail = {
|
||||
Id: 'abc-123', Name: 'Agent 1', Provider: 'opencode',
|
||||
Status: 'idle', Archived: false,
|
||||
CreatedAt: '2026-01-01T00:00:00Z', UpdatedAt: '2026-01-01T01:00:00Z',
|
||||
};
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue(JSON.stringify(detail));
|
||||
|
||||
const result = await client.getAgentStatus('abc-123');
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith(['inspect', '--json', 'abc-123']);
|
||||
expect(result.Id).toBe('abc-123');
|
||||
expect(result.Status).toBe('idle');
|
||||
});
|
||||
});
|
||||
|
||||
describe('health', () => {
|
||||
it('returns ok when paseo ls succeeds', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('[]');
|
||||
|
||||
const result = await client.health();
|
||||
|
||||
expect(result).toEqual({ status: 'ok' });
|
||||
});
|
||||
|
||||
it('returns error when runCli throws', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockRejectedValue(new Error('connection refused'));
|
||||
|
||||
const result = await client.health();
|
||||
|
||||
expect(result).toEqual({ status: 'error' });
|
||||
});
|
||||
});
|
||||
|
||||
describe('importAgent', () => {
|
||||
it('calls paseo import with provider and labels', async () => {
|
||||
const agentResult = { Id: 'new-789', Name: 'Imported', Provider: 'opencode', Status: 'idle' };
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue(JSON.stringify(agentResult));
|
||||
|
||||
const result = await client.importAgent('ses-001', 'opencode', {
|
||||
origin: 'boocode',
|
||||
project: 'proj-1',
|
||||
});
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith([
|
||||
'import', '--json',
|
||||
'--provider', 'opencode',
|
||||
'--label', 'origin=boocode',
|
||||
'--label', 'project=proj-1',
|
||||
'ses-001',
|
||||
]);
|
||||
expect(result.Id).toBe('new-789');
|
||||
});
|
||||
|
||||
it('works without labels', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue(JSON.stringify({ Id: 'new-789' }));
|
||||
|
||||
const result = await client.importAgent('ses-001', 'claude');
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith([
|
||||
'import', '--json',
|
||||
'--provider', 'claude',
|
||||
'ses-001',
|
||||
]);
|
||||
expect(result.Id).toBe('new-789');
|
||||
});
|
||||
});
|
||||
|
||||
describe('archiveAgent', () => {
|
||||
it('calls paseo archive --json', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('{}');
|
||||
|
||||
await client.archiveAgent('abc-123');
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith(['archive', '--json', 'abc-123']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sendPrompt', () => {
|
||||
it('sends prompt and parses JSON result', async () => {
|
||||
const sendResult = { text: 'Hello!', ok: true };
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue(JSON.stringify(sendResult));
|
||||
|
||||
const result = await client.sendPrompt('abc-123', 'Hello');
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith(['send', '--json', 'abc-123', 'Hello'], undefined);
|
||||
expect(result).toEqual(sendResult);
|
||||
});
|
||||
|
||||
it('falls back to plain text on non-JSON output', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('plain text response');
|
||||
|
||||
const result = await client.sendPrompt('abc-123', 'Hi');
|
||||
|
||||
expect(result).toEqual({ text: 'plain text response', ok: true });
|
||||
});
|
||||
|
||||
it('supports --no-wait flag', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('{}');
|
||||
|
||||
await client.sendPrompt('abc-123', 'Hi', { noWait: true });
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith([
|
||||
'send', '--json', '--no-wait',
|
||||
'abc-123', 'Hi',
|
||||
], undefined);
|
||||
});
|
||||
});
|
||||
|
||||
describe('stopAgent', () => {
|
||||
it('calls paseo stop', async () => {
|
||||
const { client, mockRunCli } = makeClient();
|
||||
mockRunCli.mockResolvedValue('');
|
||||
|
||||
await client.stopAgent('abc-123');
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith(['stop', 'abc-123']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('cliHost config', () => {
|
||||
it('includes --host flag in args when cliHost is set', async () => {
|
||||
const { client, mockRunCli } = makeClient({ cliHost: 'tcp://localhost:6767?ssl=true' });
|
||||
mockRunCli.mockResolvedValue('[]');
|
||||
|
||||
await client.listAgents();
|
||||
|
||||
expect(mockRunCli).toHaveBeenCalledWith([
|
||||
'ls', '--json', '--host', 'tcp://localhost:6767?ssl=true',
|
||||
]);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -13,7 +13,7 @@ import type { AcpToolSnapshot } from './acp-tool-snapshot.js';
|
||||
import type { AgentCommand } from './provider-types.js';
|
||||
|
||||
/** Backend transport kind. Mirrors `agent_sessions.backend` CHECK in schema.sql. */
|
||||
export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk';
|
||||
export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk' | 'paseo';
|
||||
|
||||
/**
|
||||
* Normalized, transport-agnostic events a backend emits during a turn (§2).
|
||||
|
||||
254
apps/coder/src/services/backends/paseo.ts
Normal file
254
apps/coder/src/services/backends/paseo.ts
Normal file
@@ -0,0 +1,254 @@
|
||||
/**
|
||||
* v2.10 — PaseoBackend: Paseo agent integration for the agent-pool.
|
||||
*
|
||||
* Wraps the Paseo CLI daemon as an AgentBackend. Each Paseo agent maps to one
|
||||
* (chat_id, agent) pair and is persisted via `paseo import` (which registers
|
||||
* an agent with the Paseo daemon). Prompts are sent via `paseo send`, and
|
||||
* the session is cleaned up via `paseo archive`.
|
||||
*
|
||||
* Paseo is a meta-agent hub — it wraps provider sessions (opencode, claude,
|
||||
* acp, etc.). The `provider` option in `EnsureSessionOpts` selects which
|
||||
* provider Paseo delegates to.
|
||||
*
|
||||
* Backend kind: 'paseo' (must be added to agent_sessions_backend_chk).
|
||||
*
|
||||
* Spec: openspec/changes/v2-10-paseo-integration/design.md.
|
||||
*/
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { Sql } from '../../db.js';
|
||||
import { PaseoClient, type PaseoSendResult } from '../paseo-client.js';
|
||||
import type {
|
||||
AgentBackend,
|
||||
AgentSessionHandle,
|
||||
EnsureSessionOpts,
|
||||
PromptCtx,
|
||||
TurnResult,
|
||||
} from '../agent-backend.js';
|
||||
|
||||
/** Default provider to use when Paseo wraps a generic agent. */
|
||||
const DEFAULT_PASEO_PROVIDER = 'opencode';
|
||||
|
||||
export interface PaseoBackendDeps {
|
||||
sql: Sql;
|
||||
log: FastifyBaseLogger;
|
||||
/** The (chat, agent) this backend serves — its pool identity + DB key. */
|
||||
chatId: string;
|
||||
/** Agent name (e.g. 'opencode', 'claude', 'paseo'). */
|
||||
agent: string;
|
||||
/** Resolved PaseoClient instance. */
|
||||
client: PaseoClient;
|
||||
/** Provider string to pass to `paseo import --provider`. */
|
||||
provider: string;
|
||||
}
|
||||
|
||||
export class PaseoBackend implements AgentBackend {
|
||||
readonly backend = 'paseo' as const;
|
||||
|
||||
private readonly sql: Sql;
|
||||
private readonly log: FastifyBaseLogger;
|
||||
private readonly chatId: string;
|
||||
private readonly agent: string;
|
||||
private readonly client: PaseoClient;
|
||||
private readonly provider: string;
|
||||
|
||||
/** Map of BooCode sessionId → Paseo agent ID. */
|
||||
private readonly agentIds = new Map<string, string>();
|
||||
/** True between prompt() start and settle. */
|
||||
private busy = false;
|
||||
private up = false;
|
||||
|
||||
constructor(deps: PaseoBackendDeps) {
|
||||
this.sql = deps.sql;
|
||||
this.log = deps.log;
|
||||
this.chatId = deps.chatId;
|
||||
this.agent = deps.agent;
|
||||
this.client = deps.client;
|
||||
this.provider = deps.provider || DEFAULT_PASEO_PROVIDER;
|
||||
}
|
||||
|
||||
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
|
||||
health(): 'up' | 'down' {
|
||||
return this.up ? 'up' : 'down';
|
||||
}
|
||||
|
||||
/** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */
|
||||
isBusy(): boolean {
|
||||
return this.busy;
|
||||
}
|
||||
|
||||
// ─── ensureSession: create/import a Paseo agent ─────────────────────────────
|
||||
|
||||
async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
|
||||
// Check if we already have a Paseo agent ID for this session.
|
||||
let paseoId = this.agentIds.get(sessionId);
|
||||
|
||||
if (!paseoId) {
|
||||
// Resolve existing agent_session_id from DB (e.g. after a restart).
|
||||
const [row] = await this.sql<{ agent_session_id: string | null }[]>`
|
||||
SELECT agent_session_id FROM agent_sessions
|
||||
WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent} AND backend = 'paseo'
|
||||
`;
|
||||
if (row?.agent_session_id) {
|
||||
paseoId = row.agent_session_id;
|
||||
this.agentIds.set(sessionId, paseoId);
|
||||
}
|
||||
}
|
||||
|
||||
if (!paseoId) {
|
||||
// Import a new Paseo agent. Use the session UUID as the provider session id.
|
||||
const labels: Record<string, string> = {
|
||||
origin: 'boocode',
|
||||
project: opts.projectId,
|
||||
chat: opts.chatId,
|
||||
worktree: opts.worktreeId,
|
||||
agent: this.agent,
|
||||
};
|
||||
|
||||
try {
|
||||
const agent = await this.client.importAgent(sessionId, this.provider, labels);
|
||||
paseoId = agent.Id;
|
||||
this.agentIds.set(sessionId, paseoId);
|
||||
this.log.info(
|
||||
{ paseoId, agent: this.agent, chatId: this.chatId },
|
||||
'paseo: imported agent',
|
||||
);
|
||||
} catch (err) {
|
||||
this.log.error(
|
||||
{ err: String(err), agent: this.agent, chatId: this.chatId },
|
||||
'paseo: importAgent failed',
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert the agent_sessions row.
|
||||
await this.sql`
|
||||
INSERT INTO agent_sessions
|
||||
(chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
|
||||
VALUES
|
||||
(${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'paseo', ${paseoId}, NULL, 'active', clock_timestamp())
|
||||
ON CONFLICT (chat_id, agent) DO UPDATE SET
|
||||
session_id = EXCLUDED.session_id,
|
||||
worktree_id = EXCLUDED.worktree_id,
|
||||
backend = 'paseo',
|
||||
agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id),
|
||||
server_port = NULL,
|
||||
status = 'active',
|
||||
last_active_at = clock_timestamp()
|
||||
`.catch((err) => {
|
||||
this.log.warn(
|
||||
{ err: String(err), chatId: opts.chatId, agent: opts.agent },
|
||||
'paseo: agent_sessions upsert failed (non-fatal)',
|
||||
);
|
||||
});
|
||||
|
||||
this.up = true;
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
agent: opts.agent,
|
||||
backend: 'paseo',
|
||||
chatId: opts.chatId,
|
||||
worktreeId: opts.worktreeId,
|
||||
agentSessionId: paseoId,
|
||||
serverPort: null,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── prompt: send a message to the Paseo agent ─────────────────────────────
|
||||
|
||||
async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
|
||||
const paseoId = handle.agentSessionId;
|
||||
if (!paseoId) {
|
||||
return { ok: false, error: 'paseo: no agent session id in handle' };
|
||||
}
|
||||
|
||||
this.busy = true;
|
||||
try {
|
||||
// Use streamSend for real-time text output via onEvent.
|
||||
const result: PaseoSendResult = await this.client.streamSend(
|
||||
paseoId,
|
||||
input,
|
||||
(event) => {
|
||||
ctx.onEvent(event);
|
||||
},
|
||||
ctx.signal,
|
||||
);
|
||||
|
||||
// Update last_active_at.
|
||||
await this.sql`
|
||||
UPDATE agent_sessions
|
||||
SET last_active_at = clock_timestamp()
|
||||
WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
|
||||
`.catch(() => { /* non-fatal */ });
|
||||
|
||||
if (result.error) {
|
||||
return { ok: false, error: result.error };
|
||||
}
|
||||
|
||||
return { ok: true };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
// Check if abortion
|
||||
if (ctx.signal.aborted) {
|
||||
return { ok: false, error: 'cancelled' };
|
||||
}
|
||||
return { ok: false, error: `paseo: ${msg}` };
|
||||
} finally {
|
||||
this.busy = false;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── closeSession: archive the Paseo agent ─────────────────────────────────
|
||||
|
||||
async closeSession(handle: AgentSessionHandle): Promise<void> {
|
||||
const paseoId = handle.agentSessionId;
|
||||
if (!paseoId) return;
|
||||
|
||||
try {
|
||||
await this.client.archiveAgent(paseoId);
|
||||
this.log.info({ paseoId, agent: handle.agent }, 'paseo: archived agent');
|
||||
} catch (err) {
|
||||
this.log.warn(
|
||||
{ err: String(err), paseoId, agent: handle.agent },
|
||||
'paseo: archiveAgent failed (non-fatal)',
|
||||
);
|
||||
}
|
||||
|
||||
this.agentIds.delete(handle.sessionId);
|
||||
|
||||
// Update DB row.
|
||||
await this.sql`
|
||||
UPDATE agent_sessions
|
||||
SET status = 'closed', last_active_at = clock_timestamp()
|
||||
WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
|
||||
`.catch(() => { /* non-fatal */ });
|
||||
}
|
||||
|
||||
// ─── dispose: archive all tracked agents ───────────────────────────────────
|
||||
|
||||
async dispose(): Promise<void> {
|
||||
const ids = [...this.agentIds.values()];
|
||||
this.agentIds.clear();
|
||||
|
||||
for (const paseoId of ids) {
|
||||
try {
|
||||
await this.client.archiveAgent(paseoId);
|
||||
} catch {
|
||||
// Best-effort cleanup during shutdown.
|
||||
}
|
||||
}
|
||||
|
||||
this.up = false;
|
||||
}
|
||||
|
||||
/** Phase 3: periodic health tick — probes the Paseo daemon. */
|
||||
async tickHealth(_now?: number): Promise<void> {
|
||||
try {
|
||||
const h = await this.client.health();
|
||||
this.up = h.status === 'ok';
|
||||
} catch {
|
||||
this.up = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
115
apps/coder/src/services/collision-detector.ts
Normal file
115
apps/coder/src/services/collision-detector.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
// v2.8 Collision detection — pure functions that find file overlaps between
|
||||
// worktrees/agents editing the same files concurrently. Advisory only; writes
|
||||
// are never blocked, but the collision info surfaces in the UI and logs.
|
||||
//
|
||||
// Severity levels:
|
||||
// same_line — the same file, exact same line region
|
||||
// adjacent_line — the same file, lines touch or are within 5 lines
|
||||
// different_area — the same file, distant lines
|
||||
//
|
||||
// Pure functions, no side effects. Testable in isolation.
|
||||
|
||||
export type ConflictSeverity = 'same_line' | 'adjacent_line' | 'different_area';
|
||||
|
||||
export interface ConflictVerdict {
|
||||
filePath: string;
|
||||
worktrees: string[];
|
||||
severity: ConflictSeverity;
|
||||
agents: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Registry entry for a single file change recorded by a worktree.
|
||||
* Stored in the ConflictIndex Map value for each file path.
|
||||
*/
|
||||
export interface ConflictEntry {
|
||||
worktreeId: string;
|
||||
agent: string;
|
||||
/**
|
||||
* Approximate line range touched by the change. undefined when the change
|
||||
* creates or deletes the file (full-file collision vs. same-line).
|
||||
*/
|
||||
lineRange?: { start: number; end: number };
|
||||
status: 'pending' | 'applied' | 'reverted';
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shape of the conflict index consumed by findConflicts.
|
||||
* File path → set of entries from different worktrees/agents.
|
||||
*/
|
||||
export type ConflictIndexData = ReadonlyMap<string, ReadonlySet<ConflictEntry>>;
|
||||
|
||||
/**
|
||||
* Find file overlaps between `changedFiles` and the conflict index, excluding
|
||||
* the caller's own worktree.
|
||||
*
|
||||
* Returns one ConflictVerdict per file that has entries from other worktrees.
|
||||
* Severity is the highest found (same_line > adjacent_line > different_area).
|
||||
*/
|
||||
export function findConflicts(
|
||||
changedFiles: string[],
|
||||
worktreeId: string,
|
||||
/** Approximate line range for the proposed changes, keyed by file path */
|
||||
changedRanges: Map<string, { start: number; end: number }>,
|
||||
conflictIndex: ConflictIndexData,
|
||||
): ConflictVerdict[] {
|
||||
const verdicts: ConflictVerdict[] = [];
|
||||
|
||||
for (const filePath of changedFiles) {
|
||||
const entries = conflictIndex.get(filePath);
|
||||
if (!entries || entries.size === 0) continue;
|
||||
|
||||
// Filter to entries from OTHER worktrees
|
||||
const otherEntries = [...entries].filter((e) => e.worktreeId !== worktreeId);
|
||||
if (otherEntries.length === 0) continue;
|
||||
|
||||
const myRange = changedRanges.get(filePath);
|
||||
let severity: ConflictSeverity = 'different_area';
|
||||
|
||||
for (const entry of otherEntries) {
|
||||
if (!myRange || !entry.lineRange) {
|
||||
// Full-file changes (create/delete) always hit at least different_area
|
||||
continue;
|
||||
}
|
||||
const sev = lineOverlapSeverity(myRange, entry.lineRange);
|
||||
if (sev === 'same_line') {
|
||||
severity = 'same_line';
|
||||
break; // Can't get higher than this
|
||||
}
|
||||
if (sev === 'adjacent_line' && severity === 'different_area') {
|
||||
severity = 'adjacent_line';
|
||||
}
|
||||
}
|
||||
|
||||
const worktrees = [...new Set(otherEntries.map((e) => e.worktreeId))];
|
||||
const agents = [...new Set(otherEntries.map((e) => e.agent))];
|
||||
|
||||
verdicts.push({ filePath, worktrees, severity, agents });
|
||||
}
|
||||
|
||||
return verdicts;
|
||||
}
|
||||
|
||||
const ADJACENT_LINE_THRESHOLD = 5;
|
||||
|
||||
/**
|
||||
* Determine severity of overlap between two line ranges.
|
||||
*/
|
||||
function lineOverlapSeverity(
|
||||
a: { start: number; end: number },
|
||||
b: { start: number; end: number },
|
||||
): ConflictSeverity {
|
||||
// Same_line: ranges intersect
|
||||
if (a.start <= b.end && b.start <= a.end) {
|
||||
return 'same_line';
|
||||
}
|
||||
|
||||
// Adjacent: ranges are within ADJACENT_LINE_THRESHOLD lines of each other
|
||||
const gap = a.start > b.end ? a.start - b.end : b.start - a.end;
|
||||
if (gap <= ADJACENT_LINE_THRESHOLD) {
|
||||
return 'adjacent_line';
|
||||
}
|
||||
|
||||
return 'different_area';
|
||||
}
|
||||
151
apps/coder/src/services/conflict-index.ts
Normal file
151
apps/coder/src/services/conflict-index.ts
Normal file
@@ -0,0 +1,151 @@
|
||||
// v2.8 In-memory conflict index — tracks which worktrees/agents are editing
|
||||
// which files so the collision detector can find overlaps.
|
||||
//
|
||||
// Singleton exported as `conflictIndex`; imported by pending_changes.ts to
|
||||
// register changes at queue time and unregister on worktree teardown.
|
||||
//
|
||||
// NOT persisted — survives only as long as the BooCoder process. Postgres
|
||||
// is the durable record (pending_changes table); this is the hot in-memory
|
||||
// probe for concurrent edit warnings.
|
||||
|
||||
import type { ConflictEntry, ConflictVerdict } from './collision-detector.js';
|
||||
import { findConflicts } from './collision-detector.js';
|
||||
|
||||
export class ConflictIndex {
|
||||
/**
|
||||
* filePath → Set of ConflictEntry from various worktrees.
|
||||
* A single worktree may have multiple entries for the same file
|
||||
* (several pending edits to the same file in one session).
|
||||
*/
|
||||
#map = new Map<string, Set<ConflictEntry>>();
|
||||
|
||||
// ---- mutation -------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Register that `worktreeId` (agent) is touching `filePath`.
|
||||
* Creates an entry in the index so subsequent callers see it as a conflict.
|
||||
*/
|
||||
registerChange(
|
||||
filePath: string,
|
||||
worktreeId: string,
|
||||
agent: string,
|
||||
lineRange?: { start: number; end: number },
|
||||
): void {
|
||||
let entries = this.#map.get(filePath);
|
||||
if (!entries) {
|
||||
entries = new Set();
|
||||
this.#map.set(filePath, entries);
|
||||
}
|
||||
entries.add({
|
||||
worktreeId,
|
||||
agent,
|
||||
lineRange,
|
||||
status: 'pending' as const,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all entries for a given worktree. Called on worktree teardown
|
||||
* so stale entries don't trigger false warnings.
|
||||
*/
|
||||
removeWorktree(worktreeId: string): void {
|
||||
for (const [filePath, entries] of this.#map) {
|
||||
const before = entries.size;
|
||||
for (const entry of entries) {
|
||||
if (entry.worktreeId === worktreeId) {
|
||||
entries.delete(entry);
|
||||
}
|
||||
}
|
||||
if (entries.size === 0) {
|
||||
this.#map.delete(filePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove entries older than `maxAgeMs`. Useful as a periodic cleanup
|
||||
* when worktree teardown was missed (crash, unclean exit).
|
||||
*/
|
||||
sweepStale(maxAgeMs: number): number {
|
||||
const cutoff = Date.now() - maxAgeMs;
|
||||
let removed = 0;
|
||||
|
||||
for (const [filePath, entries] of this.#map) {
|
||||
for (const entry of entries) {
|
||||
if (entry.timestamp < cutoff) {
|
||||
entries.delete(entry);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (entries.size === 0) {
|
||||
this.#map.delete(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
// ---- query ----------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Query the raw ConflictEntry set for a file path. Returns empty set
|
||||
* when there are no entries (never mutated the file).
|
||||
*/
|
||||
getEntriesFor(filePath: string): ReadonlySet<ConflictEntry> {
|
||||
return this.#map.get(filePath) ?? new Set();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all conflict verdicts for a given file path — which other
|
||||
* worktrees are touching it. Returns empty when only one worktree
|
||||
* has entries (no actual conflict).
|
||||
*/
|
||||
getConflictsFor(filePath: string): ConflictVerdict[] {
|
||||
const entries = this.#map.get(filePath);
|
||||
if (!entries || entries.size === 0) return [];
|
||||
|
||||
// Determine distinct worktree IDs. If only one, no conflict.
|
||||
const worktreeIds = new Set<string>();
|
||||
for (const e of entries) worktreeIds.add(e.worktreeId);
|
||||
if (worktreeIds.size <= 1) return [];
|
||||
|
||||
// Use the first worktree as the "caller" so findConflicts excludes
|
||||
// its entries and returns only entries from OTHER worktrees.
|
||||
const caller = [...worktreeIds][0]!;
|
||||
return findConflicts(
|
||||
[filePath],
|
||||
caller,
|
||||
new Map(),
|
||||
this.#toIndexData(),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get conflicts for a set of file changes from a specific worktree.
|
||||
* Delegates to the pure findConflicts function.
|
||||
*/
|
||||
query(
|
||||
changedFiles: string[],
|
||||
worktreeId: string,
|
||||
changedRanges: Map<string, { start: number; end: number }>,
|
||||
): ConflictVerdict[] {
|
||||
return findConflicts(changedFiles, worktreeId, changedRanges, this.#toIndexData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot the current map for testing/inspection.
|
||||
*/
|
||||
snapshot(): Map<string, ReadonlySet<ConflictEntry>> {
|
||||
return new Map(this.#map);
|
||||
}
|
||||
|
||||
// ---- private --------------------------------------------------------
|
||||
|
||||
#toIndexData(): ReadonlyMap<string, ReadonlySet<ConflictEntry>> {
|
||||
return this.#map as ReadonlyMap<string, ReadonlySet<ConflictEntry>>;
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton — the whole BooCoder process shares one conflict index.
|
||||
export const conflictIndex = new ConflictIndex();
|
||||
@@ -33,11 +33,43 @@ export interface SchedulerState {
|
||||
readonly inFlight: ReadonlySet<string>;
|
||||
/** step ids pre-skipped at launch (band/when gating) — never given a row */
|
||||
readonly excluded: ReadonlySet<string>;
|
||||
/** step ids that timed out (terminal — no retries remaining or not retriable) */
|
||||
readonly timedOut: ReadonlySet<string>;
|
||||
/**
|
||||
* Per-batch running sets, populated by buildBatchState from the flow definition
|
||||
* and the current inFlight set. Only read by getReadyInBatch; never mutated by
|
||||
* decision functions (the caller maintains it across ticks).
|
||||
*/
|
||||
readonly batchState?: Map<string, { running: Set<string>; maxConcurrent: number; joinRule: TriggerRule }>;
|
||||
/**
|
||||
* Per-switch-step routing results. Populated when a SWITCH step completes.
|
||||
* Step ids in any result's `excluded` set are treated as excluded for the
|
||||
* remainder of the run — they won't execute and won't block dependents.
|
||||
*/
|
||||
readonly switchResults: ReadonlyMap<string, { chosenCase: string | null; excluded: ReadonlySet<string> }>;
|
||||
}
|
||||
|
||||
/** A dependency is satisfied once it is done, skipped, or excluded. */
|
||||
/** A dependency is satisfied once it is done, skipped, excluded, or timed out. */
|
||||
function isSatisfied(state: SchedulerState, id: string): boolean {
|
||||
return state.done.has(id) || state.skipped.has(id) || state.excluded.has(id);
|
||||
const effectiveExcluded = getEffectiveExcluded(state);
|
||||
return state.done.has(id) || state.skipped.has(id) || effectiveExcluded.has(id) || state.timedOut.has(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* The union of the static `excluded` set and every switch result's excluded
|
||||
* step ids. Steps excluded by a SWITCH evaluation act exactly like launch-time
|
||||
* excluded steps: they never run and they don't block dependents.
|
||||
*/
|
||||
function getEffectiveExcluded(state: SchedulerState): ReadonlySet<string> {
|
||||
// Fast path: no switch results → static excluded only.
|
||||
if (state.switchResults.size === 0) return state.excluded;
|
||||
const combined = new Set(state.excluded);
|
||||
for (const result of state.switchResults.values()) {
|
||||
for (const id of result.excluded) {
|
||||
combined.add(id);
|
||||
}
|
||||
}
|
||||
return combined;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -56,13 +88,14 @@ export function manifestSteps(flow: Flow, launchCtx: StepContext): Step[] {
|
||||
* Faithful to `conductor/flow.ts:27-36`. Pure.
|
||||
*/
|
||||
export function readySteps(flow: Flow, state: SchedulerState): Step[] {
|
||||
const effectiveExcluded = getEffectiveExcluded(state);
|
||||
return flow.steps.filter(
|
||||
(s) =>
|
||||
!state.done.has(s.id) &&
|
||||
!state.skipped.has(s.id) &&
|
||||
!state.inFlight.has(s.id) &&
|
||||
!state.excluded.has(s.id) &&
|
||||
((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, state.excluded, s.trigger_rule)),
|
||||
!effectiveExcluded.has(s.id) &&
|
||||
((s.deps ?? []).length === 0 || evaluateTriggerRule(s.deps ?? [], state.done, state.skipped, effectiveExcluded, s.trigger_rule)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -102,6 +135,57 @@ export function isStuck(flow: Flow, state: SchedulerState): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
// ─── Batch parallelism (v2.8.22) ─────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Build the batchState Map from the flow definition and the current inFlight set.
|
||||
* Only steps with a `batch` field are tracked. Empty map when `flow.batchConfig`
|
||||
* is absent or no steps belong to a batch. Pure — no IO.
|
||||
*/
|
||||
export function buildBatchState(
|
||||
flow: Flow,
|
||||
inFlight: ReadonlySet<string>,
|
||||
): Map<string, { running: Set<string>; maxConcurrent: number; joinRule: TriggerRule }> {
|
||||
const result = new Map<string, { running: Set<string>; maxConcurrent: number; joinRule: TriggerRule }>();
|
||||
if (!flow.batchConfig) return result;
|
||||
|
||||
// Collect every unique batch group referenced by the flow's steps.
|
||||
const groups = new Set<string>();
|
||||
for (const s of flow.steps) {
|
||||
if (s.batch) groups.add(s.batch);
|
||||
}
|
||||
|
||||
const { maxConcurrent, joinRule } = flow.batchConfig;
|
||||
for (const batch of groups) {
|
||||
const running = new Set<string>(
|
||||
flow.steps.filter((s) => s.batch === batch && inFlight.has(s.id)).map((s) => s.id),
|
||||
);
|
||||
result.set(batch, { running, maxConcurrent, joinRule: joinRule ?? 'all_success' });
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gate a ready step list by batch parallelism limits. Steps without a `batch`
|
||||
* field always pass through. Steps belonging to a batch are only included if
|
||||
* that batch's currently-running count is below its `maxConcurrent` cap.
|
||||
*
|
||||
* This is ADDITIVE to the existing wave scheduler: pure dep-based readiness
|
||||
* is computed first (readySteps), then this function applies the batch ceiling.
|
||||
* Steps excluded here remain pending and will be picked up on the next tick
|
||||
* when a running batch step completes.
|
||||
*/
|
||||
export function getReadyInBatch(ready: readonly Step[], state: SchedulerState, _flow: Flow): Step[] {
|
||||
const batchState = state.batchState;
|
||||
if (!batchState || batchState.size === 0) return [...ready];
|
||||
return ready.filter((s) => {
|
||||
if (!s.batch) return true;
|
||||
const bs = batchState.get(s.batch);
|
||||
if (!bs) return true;
|
||||
return bs.running.size < bs.maxConcurrent;
|
||||
});
|
||||
}
|
||||
|
||||
// ─── Resume reconciliation (D-9) ─────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
@@ -118,12 +202,29 @@ export function isStuck(flow: Flow, state: SchedulerState): boolean {
|
||||
* - 'mark-cancelled': task was cancelled before the callback ran; propagate so
|
||||
* advance() cancels the run.
|
||||
*/
|
||||
/**
|
||||
* True when the step definition allows retries on timeout.
|
||||
* Pure — no IO.
|
||||
*/
|
||||
export function isRetriable(step: { maxRetries?: number }): boolean {
|
||||
return (step.maxRetries ?? 0) > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* True when the step has retries remaining.
|
||||
* Pure — no IO.
|
||||
*/
|
||||
export function shouldRetry(maxRetries: number | undefined | null, retryCount: number): boolean {
|
||||
return retryCount < (maxRetries ?? 0);
|
||||
}
|
||||
|
||||
export type ResumeAction =
|
||||
| 'keep'
|
||||
| 're-dispatch'
|
||||
| 'mark-done'
|
||||
| 'mark-failed'
|
||||
| 'mark-cancelled';
|
||||
| 'mark-cancelled'
|
||||
| 'retry';
|
||||
|
||||
/**
|
||||
* Decide what to do with ONE flow step during startup resume (D-9). Pure.
|
||||
@@ -131,12 +232,20 @@ export type ResumeAction =
|
||||
* @param status - flow_steps.status
|
||||
* @param taskId - flow_steps.task_id (null for code steps or unstarted agent steps)
|
||||
* @param taskState - tasks.state for taskId, or null if the task row is absent
|
||||
* @param retryCount - flow_steps.retry_count (default 0)
|
||||
* @param maxRetries - flow_steps.max_retries (null = no retry)
|
||||
*/
|
||||
export function reconcileResumeStep(
|
||||
status: string,
|
||||
taskId: string | null,
|
||||
taskState: string | null,
|
||||
retryCount?: number,
|
||||
maxRetries?: number | null,
|
||||
): ResumeAction {
|
||||
if (status === 'timed_out') {
|
||||
if (shouldRetry(maxRetries, retryCount ?? 0)) return 'retry';
|
||||
return 'mark-failed';
|
||||
}
|
||||
if (status !== 'running') return 'keep';
|
||||
// Running step: decide by its task's current state.
|
||||
if (!taskId || taskState === null) return 're-dispatch'; // task gone or never created
|
||||
@@ -167,6 +276,60 @@ export function shouldFailOnMissingAgent(agent: string, modeId: string | null):
|
||||
return agent === 'qwen' && modeId === 'plan';
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate a SWITCH step: iterate cases in declaration order and return the
|
||||
* label of the first matching case plus every step id that belongs to a
|
||||
* non-selected branch. When no case matches, the defaultBranch (if present)
|
||||
* is the effective choice. If there is no default, all branch steps are
|
||||
* excluded and the switch returns `chosenCase: null`.
|
||||
*
|
||||
* Pure — no IO. The caller adds the returned `excluded` ids to the scheduler
|
||||
* state's switchResults so downstream decision functions see them as excluded.
|
||||
*/
|
||||
export function resolveSwitch(
|
||||
step: Step,
|
||||
ctx: StepContext,
|
||||
): { chosenCase: string | null; excluded: string[] } {
|
||||
const cases = step.cases;
|
||||
if (!cases || cases.length === 0) {
|
||||
// Degenerate switch — nothing to evaluate.
|
||||
return { chosenCase: null, excluded: [] };
|
||||
}
|
||||
|
||||
// Evaluate conditions in order.
|
||||
for (const c of cases) {
|
||||
if (c.condition(ctx)) {
|
||||
// This case matches — exclude all OTHER branches.
|
||||
const excluded: string[] = [];
|
||||
for (const other of cases) {
|
||||
if (other.label !== c.label) {
|
||||
excluded.push(...other.stepIds);
|
||||
}
|
||||
}
|
||||
// The default branch is also excluded when a case matched.
|
||||
if (step.defaultBranch) excluded.push(...step.defaultBranch);
|
||||
return { chosenCase: c.label, excluded };
|
||||
}
|
||||
}
|
||||
|
||||
// No case matched — use default branch if present.
|
||||
if (step.defaultBranch) {
|
||||
// Default is the chosen branch: exclude all explicit case branches.
|
||||
const excluded: string[] = [];
|
||||
for (const c of cases) {
|
||||
excluded.push(...c.stepIds);
|
||||
}
|
||||
return { chosenCase: null, excluded };
|
||||
}
|
||||
|
||||
// No case matched and no default — exclude everything.
|
||||
const excluded: string[] = [];
|
||||
for (const c of cases) {
|
||||
excluded.push(...c.stepIds);
|
||||
}
|
||||
return { chosenCase: null, excluded };
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate a trigger rule against dependency results.
|
||||
* - all_success: every dep must be done (not skipped/failed)
|
||||
@@ -198,7 +361,7 @@ export function evaluateTriggerRule(
|
||||
* decision per step. Pure — no IO.
|
||||
*/
|
||||
export function reconcileRun(
|
||||
steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string }>,
|
||||
steps: ReadonlyArray<{ stepId: string; taskId: string | null; status: string; retryCount?: number; maxRetries?: number | null }>,
|
||||
taskStates: ReadonlyMap<string, string>,
|
||||
): StepResumeDecision[] {
|
||||
return steps.map((step) => ({
|
||||
@@ -207,6 +370,8 @@ export function reconcileRun(
|
||||
step.status,
|
||||
step.taskId,
|
||||
step.taskId ? (taskStates.get(step.taskId) ?? null) : null,
|
||||
step.retryCount,
|
||||
step.maxRetries,
|
||||
),
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -40,11 +40,14 @@ import { getFlow } from '../conductor/flows/index.js';
|
||||
import { loadPersona } from '../conductor/persona-loader.js';
|
||||
import type { Band, DispatchFn, Flow, FlowInput, Step, StepContext } from '../conductor/types.js';
|
||||
import {
|
||||
buildBatchState,
|
||||
getReadyInBatch,
|
||||
isRunComplete,
|
||||
manifestSteps,
|
||||
partitionReady,
|
||||
readySteps,
|
||||
reconcileRun,
|
||||
resolveSwitch,
|
||||
type SchedulerState,
|
||||
type StepResumeDecision,
|
||||
} from './flow-runner-decisions.js';
|
||||
@@ -95,11 +98,14 @@ interface Deps {
|
||||
|
||||
interface FlowStepRow {
|
||||
step_id: string;
|
||||
kind: 'agent' | 'code';
|
||||
kind: 'agent' | 'code' | 'switch';
|
||||
agent: string | null;
|
||||
status: string;
|
||||
chat_id: string | null;
|
||||
output: string | null;
|
||||
updated_at: string | null;
|
||||
retry_count: number | null;
|
||||
max_retries: number | null;
|
||||
}
|
||||
|
||||
export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
@@ -263,7 +269,8 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
const dispatch: DispatchFn = (agent, task) => dispatchSubAgent(run.project_id, model, agent, task);
|
||||
|
||||
const rows = await sql<FlowStepRow[]>`
|
||||
SELECT step_id, kind, agent, status, chat_id, output FROM flow_steps WHERE run_id = ${runId}
|
||||
SELECT step_id, kind, agent, status, chat_id, output, updated_at, retry_count, max_retries
|
||||
FROM flow_steps WHERE run_id = ${runId}
|
||||
`;
|
||||
|
||||
// Re-derive the excluded set (band/when pre-skips) from the flow def + input —
|
||||
@@ -275,6 +282,9 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
const done = new Set<string>();
|
||||
const skipped = new Set<string>();
|
||||
const inFlight = new Set<string>();
|
||||
const timedOut = new Set<string>();
|
||||
/** Per-switch routing results — maps switch step id → resolved branch details */
|
||||
const switchExcluded = new Map<string, { chosenCase: string | null; excluded: Set<string> }>();
|
||||
const results: Record<string, string> = {};
|
||||
for (const r of rows) {
|
||||
switch (r.status) {
|
||||
@@ -288,6 +298,9 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
case 'running':
|
||||
inFlight.add(r.step_id);
|
||||
break;
|
||||
case 'timed_out':
|
||||
timedOut.add(r.step_id);
|
||||
break;
|
||||
case 'failed':
|
||||
// A failed worker makes the deterministic report untrustworthy — fail the
|
||||
// whole run (matches the Phase-1 CLI, which throws on a dispatch failure).
|
||||
@@ -300,17 +313,79 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Timeout detection ───────────────────────────────────────────────────────
|
||||
// Check running steps. If a step has been 'running' longer than
|
||||
// FLOW_STEP_TIMEOUT_MS, mark it timed_out or re-dispatch if retriable.
|
||||
// Build a context here so the timeout retry path can re-dispatch the step.
|
||||
const timeoutCtx = buildCtx(input, results, model, dispatch);
|
||||
const timeoutMs = config.FLOW_STEP_TIMEOUT_MS;
|
||||
const nowDate = new Date();
|
||||
let detectedTimedOut = false;
|
||||
for (const r of rows) {
|
||||
if (r.status !== 'running') continue;
|
||||
if (!r.updated_at) continue;
|
||||
const elapsed = nowDate.getTime() - new Date(r.updated_at).getTime();
|
||||
if (elapsed <= timeoutMs) continue;
|
||||
|
||||
// Step has exceeded the timeout
|
||||
detectedTimedOut = true;
|
||||
const retryCount = r.retry_count ?? 0;
|
||||
const maxRetries = r.max_retries ?? 0;
|
||||
|
||||
if (maxRetries > 0 && retryCount < maxRetries) {
|
||||
// Retriable: re-dispatch the step with an incremented retry_count
|
||||
const step = flow.steps.find((s) => s.id === r.step_id);
|
||||
if (!step || step.kind !== 'agent') {
|
||||
// Non-agent steps can't be retried via dispatch
|
||||
inFlight.delete(r.step_id);
|
||||
await failRun(runId, flow, input, model,
|
||||
`step '${r.step_id}' timed out (non-retriable kind)`, r.step_id);
|
||||
return;
|
||||
}
|
||||
inFlight.delete(r.step_id);
|
||||
await sql`
|
||||
UPDATE flow_steps
|
||||
SET retry_count = ${retryCount + 1}, updated_at = clock_timestamp()
|
||||
WHERE run_id = ${runId} AND step_id = ${r.step_id} AND status = 'running'
|
||||
`;
|
||||
await dispatchAgentStep(runId, run.project_id, model, step, timeoutCtx);
|
||||
inFlight.add(r.step_id);
|
||||
log.warn({ runId, stepId: r.step_id, retry: retryCount + 1, maxRetries },
|
||||
'flow-runner: step timed out, retrying');
|
||||
} else {
|
||||
// Not retriable — mark as timed_out, fail the run
|
||||
inFlight.delete(r.step_id);
|
||||
await sql`
|
||||
UPDATE flow_steps SET status = 'timed_out', updated_at = clock_timestamp()
|
||||
WHERE run_id = ${runId} AND step_id = ${r.step_id} AND status = 'running'
|
||||
`;
|
||||
timedOut.add(r.step_id);
|
||||
publishStep(runId, r.step_id, 'timed_out');
|
||||
await failRun(runId, flow, input, model,
|
||||
`step '${r.step_id}' timed out`, r.step_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// If we modified any steps, re-query so the state sets reflect the latest DB.
|
||||
if (detectedTimedOut) {
|
||||
// Continue with the in-memory state we already adjusted above (inFlight/timedOut
|
||||
// were mutated directly). No re-query needed.
|
||||
}
|
||||
|
||||
// Drain ready skips + code steps (synchronous), re-evaluating after each batch,
|
||||
// then dispatch the full ready agent wave and wait for their terminal callbacks.
|
||||
for (;;) {
|
||||
const state: SchedulerState = { done, skipped, inFlight, excluded };
|
||||
// Build per-batch state from the current inFlight set for batch parallelism gating.
|
||||
const batchState = buildBatchState(flow, inFlight);
|
||||
const state: SchedulerState = { done, skipped, inFlight, excluded, timedOut, batchState, switchResults: switchExcluded };
|
||||
|
||||
if (isRunComplete(flow, state)) {
|
||||
await finishRun(runId, flow, input, results, model, dispatch);
|
||||
return;
|
||||
}
|
||||
|
||||
const ready = readySteps(flow, state);
|
||||
const ready = getReadyInBatch(readySteps(flow, state), state, flow);
|
||||
if (ready.length === 0) {
|
||||
if (inFlight.size > 0) return; // agents in flight will re-enter via the hook
|
||||
await failRun(runId, flow, input, model, 'unsatisfiable dependencies / cycle');
|
||||
@@ -329,6 +404,31 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
continue; // re-evaluate — a skip can settle a fan-in step's deps
|
||||
}
|
||||
|
||||
// SWITCH steps run synchronously — evaluate conditions, update the excluded
|
||||
// set in SchedulerState, and mark themselves complete. Non-selected branch
|
||||
// step ids are excluded from ever running.
|
||||
const switchReady = toRun.filter((s) => s.kind === 'switch');
|
||||
if (switchReady.length > 0) {
|
||||
for (const s of switchReady) {
|
||||
let result: { chosenCase: string | null; excluded: string[] };
|
||||
try {
|
||||
result = resolveSwitch(s, buildCtx(input, results, model, dispatch));
|
||||
} catch (err) {
|
||||
await failRun(runId, flow, input, model, `switch step '${s.id}' threw: ${errMsg(err)}`, s.id);
|
||||
return;
|
||||
}
|
||||
switchExcluded.set(s.id, {
|
||||
chosenCase: result.chosenCase,
|
||||
excluded: new Set(result.excluded),
|
||||
});
|
||||
const outputText = result.chosenCase ? `branch:${result.chosenCase}` : '';
|
||||
await markStep(runId, s.id, 'completed', outputText);
|
||||
results[s.id] = outputText;
|
||||
done.add(s.id);
|
||||
}
|
||||
continue; // re-evaluate — excluded steps may unblock dependents
|
||||
}
|
||||
|
||||
const codeReady = toRun.filter((s) => s.kind === 'code');
|
||||
if (codeReady.length > 0) {
|
||||
for (const s of codeReady) {
|
||||
@@ -545,7 +645,7 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
function publishStep(
|
||||
runId: string,
|
||||
stepId: string,
|
||||
status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked',
|
||||
status: 'running' | 'completed' | 'failed' | 'skipped' | 'cancelled' | 'blocked' | 'timed_out',
|
||||
extra?: { run_status?: 'running' | 'completed' | 'failed' | 'cancelled'; report?: string },
|
||||
): void {
|
||||
publishUser({
|
||||
@@ -683,6 +783,38 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
log.info({ runId, stepId: step.step_id, taskId: task!.id }, 'flow-runner: step re-dispatched on resume');
|
||||
break;
|
||||
}
|
||||
|
||||
case 'retry': {
|
||||
// Like re-dispatch but increments retry_count and sets status to 'running'.
|
||||
if (!step.input) {
|
||||
await sql`
|
||||
UPDATE flow_steps
|
||||
SET status = 'failed', error = 'retry: no stored prompt',
|
||||
updated_at = clock_timestamp()
|
||||
WHERE run_id = ${runId} AND step_id = ${step.step_id}
|
||||
`;
|
||||
break;
|
||||
}
|
||||
const chatIdR = step.chat_id;
|
||||
const [chatR] = chatIdR
|
||||
? await sql<{ session_id: string }[]>`SELECT session_id FROM chats WHERE id = ${chatIdR}`
|
||||
: [];
|
||||
const sessionIdR = chatR?.session_id ?? null;
|
||||
const [taskR] = await sql<{ id: string }[]>`
|
||||
INSERT INTO tasks (project_id, input, agent, model, mode_id, session_id, chat_id)
|
||||
VALUES (${projectId}, ${step.input}, 'qwen', ${model}, 'plan', ${sessionIdR}, ${chatIdR})
|
||||
RETURNING id
|
||||
`;
|
||||
await sql`
|
||||
UPDATE flow_steps
|
||||
SET task_id = ${taskR!.id}, retry_count = retry_count + 1, status = 'running',
|
||||
updated_at = clock_timestamp()
|
||||
WHERE run_id = ${runId} AND step_id = ${step.step_id}
|
||||
`;
|
||||
log.info({ runId, stepId: step.step_id, taskId: taskR!.id },
|
||||
'flow-runner: step retried on resume');
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -697,7 +829,9 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
status: string;
|
||||
chat_id: string | null;
|
||||
input: string | null;
|
||||
}[]>`SELECT step_id, task_id, status, chat_id, input FROM flow_steps WHERE run_id = ${run.id}`;
|
||||
retry_count: number | null;
|
||||
max_retries: number | null;
|
||||
}[]>`SELECT step_id, task_id, status, chat_id, input, retry_count, max_retries FROM flow_steps WHERE run_id = ${run.id}`;
|
||||
|
||||
// Load task states for all referenced tasks in one query.
|
||||
const taskIds = rows.map((r) => r.task_id).filter((id): id is string => id !== null);
|
||||
@@ -710,7 +844,13 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
}
|
||||
|
||||
const decisions = reconcileRun(
|
||||
rows.map((r) => ({ stepId: r.step_id, taskId: r.task_id, status: r.status })),
|
||||
rows.map((r) => ({
|
||||
stepId: r.step_id,
|
||||
taskId: r.task_id,
|
||||
status: r.status,
|
||||
retryCount: r.retry_count ?? undefined,
|
||||
maxRetries: r.max_retries,
|
||||
})),
|
||||
taskStates,
|
||||
);
|
||||
|
||||
@@ -752,13 +892,13 @@ export function createFlowRunner(deps: Deps): FlowRunner {
|
||||
// Mark all non-terminal steps cancelled and collect in-flight task_ids.
|
||||
const steps = await sql<{ step_id: string; task_id: string | null; kind: string }[]>`
|
||||
SELECT step_id, task_id, kind FROM flow_steps
|
||||
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
|
||||
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped', 'timed_out')
|
||||
`;
|
||||
|
||||
if (steps.length > 0) {
|
||||
await sql`
|
||||
UPDATE flow_steps SET status = 'cancelled', updated_at = clock_timestamp()
|
||||
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped')
|
||||
WHERE run_id = ${runId} AND status NOT IN ('completed', 'failed', 'cancelled', 'skipped', 'timed_out')
|
||||
`;
|
||||
for (const s of steps) {
|
||||
if (s.kind === 'agent') publishStep(runId, s.step_id, 'cancelled', { run_status: 'cancelled' });
|
||||
|
||||
341
apps/coder/src/services/paseo-client.ts
Normal file
341
apps/coder/src/services/paseo-client.ts
Normal file
@@ -0,0 +1,341 @@
|
||||
/**
|
||||
* v2.10 — PaseoClient: thin CLI-based client for the Paseo daemon.
|
||||
*
|
||||
* Paseo is a multi-agent hub daemon running at a configurable address
|
||||
* (default Unix socket / localhost:6767). This client wraps the `paseo` CLI
|
||||
* via child_process spawn for all operations (the daemon does not expose a
|
||||
* separate REST API for write operations). Read operations (listAgents,
|
||||
* getAgentStatus) use `paseo ls --json` / `paseo inspect --json`; write
|
||||
* operations (import, archive, send) use the corresponding subcommands.
|
||||
*
|
||||
* Spec: openspec/changes/v2-10-paseo-integration/design.md.
|
||||
*/
|
||||
import { spawn } from 'node:child_process';
|
||||
import { once } from 'node:events';
|
||||
import { createInterface } from 'node:readline';
|
||||
|
||||
// ─── Types ───────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Listing entry from `paseo ls --json`. Fields are lowercase. */
|
||||
export interface PaseoAgentListItem {
|
||||
id: string;
|
||||
shortId: string;
|
||||
name: string;
|
||||
provider: string;
|
||||
status: string;
|
||||
cwd?: string;
|
||||
created?: string;
|
||||
thinking?: string;
|
||||
}
|
||||
|
||||
/** Detailed agent info from `paseo inspect --json`. Fields are PascalCase. */
|
||||
export interface PaseoAgentDetail {
|
||||
Id: string;
|
||||
Name: string;
|
||||
Provider: string;
|
||||
Model?: string;
|
||||
Status: string;
|
||||
Thinking?: string;
|
||||
Archived: boolean;
|
||||
ArchivedAt?: string | null;
|
||||
Cwd?: string;
|
||||
CreatedAt: string;
|
||||
UpdatedAt: string;
|
||||
Mode?: string;
|
||||
AvailableModes?: Array<{ id: string; label: string }>;
|
||||
Capabilities?: {
|
||||
Streaming?: boolean;
|
||||
Persistence?: boolean;
|
||||
DynamicModes?: boolean;
|
||||
McpServers?: boolean;
|
||||
};
|
||||
Labels?: Record<string, string>;
|
||||
Worktree?: string | null;
|
||||
ParentAgentId?: string | null;
|
||||
}
|
||||
|
||||
/** Result of `paseo send --json`. */
|
||||
export interface PaseoSendResult {
|
||||
/** The agent's textual response. */
|
||||
text?: string;
|
||||
/** Structured output if the agent produced any. */
|
||||
output?: unknown;
|
||||
/** Error message if the turn failed. */
|
||||
error?: string;
|
||||
/** True if the turn completed successfully. */
|
||||
ok?: boolean;
|
||||
}
|
||||
|
||||
export interface PaseoClientConfig {
|
||||
/** Path to the paseo binary. Default: auto-resolved from PATH. */
|
||||
paseoBin: string;
|
||||
/**
|
||||
* Explicit `--host <host>` value for CLI calls.
|
||||
* Format: `host:port` or `tcp://host:port?ssl=true&password=secret`.
|
||||
* Omit to use the CLI default (Unix socket, fallback localhost:6767).
|
||||
*/
|
||||
cliHost?: string;
|
||||
}
|
||||
|
||||
const DEFAULT_PASEO_BIN = 'paseo';
|
||||
|
||||
// ─── Client ──────────────────────────────────────────────────────────────────
|
||||
|
||||
export class PaseoClientError extends Error {
|
||||
constructor(
|
||||
message: string,
|
||||
public readonly command: string,
|
||||
public readonly exitCode: number | null,
|
||||
public readonly stderr: string,
|
||||
) {
|
||||
super(message);
|
||||
this.name = 'PaseoClientError';
|
||||
}
|
||||
}
|
||||
|
||||
export class PaseoClient {
|
||||
/** @internal visible for testing */
|
||||
readonly bin: string;
|
||||
private readonly hostArgs: string[];
|
||||
|
||||
constructor(config?: Partial<PaseoClientConfig>) {
|
||||
this.bin = config?.paseoBin ?? DEFAULT_PASEO_BIN;
|
||||
this.hostArgs = config?.cliHost ? ['--host', config.cliHost] : [];
|
||||
}
|
||||
|
||||
// ─── Read operations (CLI `ls --json`, `inspect --json`) ──────────────────
|
||||
|
||||
/** List all non-archived agents. */
|
||||
async listAgents(): Promise<PaseoAgentListItem[]> {
|
||||
const raw = await this.runJson(['ls', '--json', ...this.hostArgs]);
|
||||
return raw as PaseoAgentListItem[];
|
||||
}
|
||||
|
||||
/** Get detailed status for a single agent by ID or prefix. */
|
||||
async getAgentStatus(agentId: string): Promise<PaseoAgentDetail> {
|
||||
const raw = await this.runJson(['inspect', '--json', agentId, ...this.hostArgs]);
|
||||
return raw as PaseoAgentDetail;
|
||||
}
|
||||
|
||||
/**
|
||||
* Quick liveness check — runs `paseo ls --json --limit 1` and returns success.
|
||||
* The daemon is healthy if the CLI exits 0.
|
||||
*/
|
||||
async health(): Promise<{ status: string }> {
|
||||
try {
|
||||
await this.runCli(['ls', '--json', '--limit', '1', ...this.hostArgs]);
|
||||
return { status: 'ok' };
|
||||
} catch {
|
||||
return { status: 'error' };
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Write operations (CLI subcommands) ───────────────────────────────────
|
||||
|
||||
/**
|
||||
* Import a provider session as a Paseo agent.
|
||||
* Uses `paseo import <sessionId> --provider <provider> [--label k=v]`.
|
||||
*/
|
||||
async importAgent(
|
||||
sessionId: string,
|
||||
provider: string,
|
||||
labels?: Record<string, string>,
|
||||
): Promise<PaseoAgentDetail> {
|
||||
const args: string[] = ['import', '--json', ...this.hostArgs];
|
||||
|
||||
if (provider) {
|
||||
args.push('--provider', provider);
|
||||
}
|
||||
if (labels) {
|
||||
for (const [k, v] of Object.entries(labels)) {
|
||||
args.push('--label', `${k}=${v}`);
|
||||
}
|
||||
}
|
||||
args.push(sessionId);
|
||||
|
||||
const raw = await this.runJson(args);
|
||||
return raw as PaseoAgentDetail;
|
||||
}
|
||||
|
||||
/** Archive (soft-delete) a Paseo agent by ID or prefix. */
|
||||
async archiveAgent(agentId: string): Promise<void> {
|
||||
await this.runCli(['archive', '--json', ...this.hostArgs, agentId]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a prompt to an existing agent.
|
||||
*
|
||||
* By default waits for the agent to complete the turn (streams text events
|
||||
* via the optional `onEvent` callback) and returns the structured result.
|
||||
* Pass `noWait: true` to fire-and-forget.
|
||||
*/
|
||||
async sendPrompt(
|
||||
agentId: string,
|
||||
prompt: string,
|
||||
options?: {
|
||||
noWait?: boolean;
|
||||
onEvent?: (event: { type: 'text' | 'reasoning'; text: string }) => void;
|
||||
signal?: AbortSignal;
|
||||
},
|
||||
): Promise<PaseoSendResult> {
|
||||
const args: string[] = ['send', '--json', ...this.hostArgs];
|
||||
|
||||
if (options?.noWait) {
|
||||
args.push('--no-wait');
|
||||
}
|
||||
|
||||
args.push(agentId, prompt);
|
||||
|
||||
// With --json and no --no-wait, the output is JSON after completion.
|
||||
// For streaming, we read stderr without --json for real-time text.
|
||||
const raw = await this.runCli(args, options?.signal);
|
||||
try {
|
||||
return JSON.parse(raw) as PaseoSendResult;
|
||||
} catch {
|
||||
return { text: raw, ok: true };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream-send: runs `paseo send` WITHOUT `--json`, forward text/reasoning
|
||||
* lines to onEvent in real time. Use when the caller wants to stream agent
|
||||
* output as it arrives rather than wait for the full JSON result.
|
||||
*/
|
||||
async streamSend(
|
||||
agentId: string,
|
||||
prompt: string,
|
||||
onEvent: (event: { type: 'text' | 'reasoning'; text: string }) => void,
|
||||
signal?: AbortSignal,
|
||||
): Promise<PaseoSendResult> {
|
||||
return new Promise<PaseoSendResult>((resolve, reject) => {
|
||||
const args = ['send', ...this.hostArgs, agentId, prompt];
|
||||
|
||||
const child = spawn(this.bin, args, {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
signal,
|
||||
});
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
|
||||
if (child.stdout) {
|
||||
const rl = createInterface({ input: child.stdout });
|
||||
rl.on('line', (line: string) => {
|
||||
stdout += line + '\n';
|
||||
// Forward as text event for real-time display
|
||||
onEvent({ type: 'text', text: line + '\n' });
|
||||
});
|
||||
}
|
||||
|
||||
if (child.stderr) {
|
||||
child.stderr.on('data', (chunk: Buffer) => {
|
||||
stderr += chunk.toString();
|
||||
});
|
||||
}
|
||||
|
||||
once(child, 'close').then((raw) => {
|
||||
const exitCode = (raw[0] as number | null) ?? 0;
|
||||
if (exitCode !== 0) {
|
||||
reject(
|
||||
new PaseoClientError(
|
||||
`paseo send failed (exit ${exitCode}): ${stderr.trim()}`,
|
||||
'send',
|
||||
exitCode,
|
||||
stderr,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
resolve({ text: stdout, ok: true });
|
||||
});
|
||||
|
||||
child.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
/** Interrupt/stop a running agent. */
|
||||
async stopAgent(agentId: string): Promise<void> {
|
||||
await this.runCli(['stop', ...this.hostArgs, agentId]);
|
||||
}
|
||||
|
||||
// ─── Private helpers ───────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Run a CLI command and return stdout as a string.
|
||||
* Throws PaseoClientError on non-zero exit.
|
||||
*/
|
||||
private async runCli(
|
||||
args: string[],
|
||||
signal?: AbortSignal,
|
||||
): Promise<string> {
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
const child = spawn(this.bin, args, {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
signal,
|
||||
});
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
|
||||
if (child.stdout) {
|
||||
child.stdout.on('data', (chunk: Buffer) => {
|
||||
stdout += chunk.toString();
|
||||
});
|
||||
}
|
||||
|
||||
if (child.stderr) {
|
||||
child.stderr.on('data', (chunk: Buffer) => {
|
||||
stderr += chunk.toString();
|
||||
});
|
||||
}
|
||||
|
||||
child.on('error', (err: Error) => {
|
||||
// If signal aborted, treat as cancellation not error
|
||||
if (signal?.aborted) {
|
||||
resolve('');
|
||||
return;
|
||||
}
|
||||
reject(err);
|
||||
});
|
||||
|
||||
once(child, 'close').then((raw) => {
|
||||
const exitCode = (raw[0] as number | null) ?? 0;
|
||||
if (signal?.aborted) {
|
||||
resolve('');
|
||||
return;
|
||||
}
|
||||
if (exitCode !== 0) {
|
||||
const msg = stderr.trim() || `exit code ${exitCode}`;
|
||||
reject(
|
||||
new PaseoClientError(
|
||||
`paseo ${args[0] ?? '?'} failed: ${msg}`,
|
||||
args[0] ?? '?',
|
||||
exitCode,
|
||||
stderr,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
resolve(stdout);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a CLI command and parse stdout as JSON.
|
||||
* Throws PaseoClientError on non-zero exit or parse failure.
|
||||
*/
|
||||
private async runJson(args: string[]): Promise<unknown> {
|
||||
const stdout = await this.runCli(args);
|
||||
try {
|
||||
return JSON.parse(stdout);
|
||||
} catch (err) {
|
||||
throw new PaseoClientError(
|
||||
`paseo ${args[0] ?? '?'} returned invalid JSON: ${(stdout || '<empty>').slice(0, 200)}`,
|
||||
args[0] ?? '?',
|
||||
0,
|
||||
stdout,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,4 +3,9 @@ export { formatMemoryBlock } from './prompt.js';
|
||||
export { scanMemoryScopes } from './scan.js';
|
||||
export { parseMemoryEntries } from './entries.js';
|
||||
export { ensureMemoryScaffold, getMemoryRoot } from './paths.js';
|
||||
export { ContextTier } from './context-tier.js';
|
||||
export { DeepDream } from './deep-dream.js';
|
||||
export { CoreTier } from './core-tier.js';
|
||||
export type { MemoryEntry } from './entries.js';
|
||||
export type { ContextTierConfig, ConversationTurn } from './context-tier.js';
|
||||
export type { CoreTierEntry, CoreTierSearchResult, CoreTierSearchOptions } from './core-tier.js';
|
||||
|
||||
@@ -355,7 +355,7 @@ export const FlowRunStepUpdatedFrame = z.object({
|
||||
type: z.literal('flow_run_step_updated'),
|
||||
run_id: Uuid,
|
||||
step_id: z.string().min(1),
|
||||
status: z.enum(['pending', 'running', 'completed', 'failed', 'skipped', 'cancelled']),
|
||||
status: z.enum(['pending', 'running', 'completed', 'failed', 'skipped', 'cancelled', 'timed_out']),
|
||||
run_status: z.enum(['running', 'completed', 'failed', 'cancelled']).optional(),
|
||||
report: z.string().optional(),
|
||||
});
|
||||
@@ -446,6 +446,21 @@ export const ToolTraceFinishFrame = z.object({
|
||||
finished_at: z.string().datetime(),
|
||||
});
|
||||
|
||||
// ---- collision warning frame (v2.8) ----------------------------------------
|
||||
//
|
||||
// Published when the BooCoder detects that multiple worktrees/agents are editing
|
||||
// the same file concurrently. Advisory only — writes are not blocked.
|
||||
|
||||
const ConflictSeverityValue = z.enum(['same_line', 'adjacent_line', 'different_area']);
|
||||
|
||||
export const CollisionWarningFrame = z.object({
|
||||
type: z.literal('collision_warning'),
|
||||
file_path: z.string().min(1),
|
||||
worktrees: z.array(z.string().min(1)),
|
||||
agents: z.array(z.string().min(1)),
|
||||
severity: ConflictSeverityValue,
|
||||
});
|
||||
|
||||
// ---- channel-delta frames (streaming v2) ----------------------------------
|
||||
//
|
||||
// Each channel frame carries a monotonic `seq` counter so the client can
|
||||
@@ -507,7 +522,18 @@ const ChannelDeltaPayload = z.discriminatedUnion('channel', [
|
||||
export const ChannelDeltaFrame = z.object({
|
||||
type: z.literal('channel_delta'),
|
||||
seq: z.number().int().nonnegative(),
|
||||
...ChannelDeltaPayload.shape,
|
||||
channel: z.union([
|
||||
z.literal('text'), z.literal('tool_call'),
|
||||
z.literal('tool_result'), z.literal('status'), z.literal('error'),
|
||||
]),
|
||||
message_id: Uuid.optional(),
|
||||
chat_id: Uuid.optional(),
|
||||
content: z.string().optional(),
|
||||
tool_call: ToolCallShape.optional(),
|
||||
tool_message_id: Uuid.optional(),
|
||||
tool_call_id: ToolCallId.optional(),
|
||||
output: z.unknown().optional(),
|
||||
truncated: z.boolean().optional(),
|
||||
});
|
||||
|
||||
// ---- discriminated union ---------------------------------------------------
|
||||
@@ -541,6 +567,8 @@ export const WsFrameSchema = z.discriminatedUnion('type', [
|
||||
// tool trace
|
||||
ToolTraceStartFrame,
|
||||
ToolTraceFinishFrame,
|
||||
// collision warning
|
||||
CollisionWarningFrame,
|
||||
// channel-delta (streaming v2)
|
||||
ChannelDeltaFrame,
|
||||
// per-user
|
||||
@@ -593,6 +621,7 @@ export const KNOWN_FRAME_TYPES: readonly WsFrame['type'][] = [
|
||||
'battle_updated',
|
||||
'tool_trace_start',
|
||||
'tool_trace_finish',
|
||||
'collision_warning',
|
||||
'channel_delta',
|
||||
'chat_status',
|
||||
'session_updated',
|
||||
|
||||
15
pnpm-lock.yaml
generated
15
pnpm-lock.yaml
generated
@@ -97,6 +97,9 @@ importers:
|
||||
|
||||
apps/server:
|
||||
dependencies:
|
||||
'@ai-sdk/deepseek':
|
||||
specifier: ^2.0.35
|
||||
version: 2.0.35(zod@3.25.76)
|
||||
'@ai-sdk/openai-compatible':
|
||||
specifier: ^2.0.47
|
||||
version: 2.0.47(zod@3.25.76)
|
||||
@@ -302,6 +305,12 @@ packages:
|
||||
peerDependencies:
|
||||
zod: ^3.25.0 || ^4.0.0
|
||||
|
||||
'@ai-sdk/deepseek@2.0.35':
|
||||
resolution: {integrity: sha512-9DhYurbAvcurOEGN6u2myYDybrrzGfcrkG8hwmFjwTrePW6KCMggm0YxP7e8RkLYcQKqCEMgFlyEB4BM6EmiKg==}
|
||||
engines: {node: '>=18'}
|
||||
peerDependencies:
|
||||
zod: ^3.25.76 || ^4.1.8
|
||||
|
||||
'@ai-sdk/gateway@3.0.119':
|
||||
resolution: {integrity: sha512-VAhfRWC+JexZakkVfmjaJKaTj00x7/UHdE8kMWL3NhuQAlf8oXtg9r4dfvFZrByXxchGRBvYE3biEUyibkg0xg==}
|
||||
engines: {node: '>=18'}
|
||||
@@ -4363,6 +4372,12 @@ snapshots:
|
||||
dependencies:
|
||||
zod: 3.25.76
|
||||
|
||||
'@ai-sdk/deepseek@2.0.35(zod@3.25.76)':
|
||||
dependencies:
|
||||
'@ai-sdk/provider': 3.0.10
|
||||
'@ai-sdk/provider-utils': 4.0.27(zod@3.25.76)
|
||||
zod: 3.25.76
|
||||
|
||||
'@ai-sdk/gateway@3.0.119(zod@3.25.76)':
|
||||
dependencies:
|
||||
'@ai-sdk/provider': 3.0.10
|
||||
|
||||
Reference in New Issue
Block a user