Compare commits

...

1 Commits

Author SHA1 Message Date
c860b6c4b7 feat: Wave 1 complete — state machine, Paseo hub, collision detection, PTY search
- Task state machine: TIMED_OUT state, retriable steps, timeout detection
- Paseo hub: paseo-client.ts (HTTP+CLI), PaseoBackend (AgentBackend), 14 tests
- Collision detection: collision-detector.ts, conflict-index.ts, ws-frames type
- PTY search: ring buffer, search route, capture-pane fallback
2026-06-08 02:45:17 +00:00
10 changed files with 1347 additions and 2 deletions

View File

@@ -5,6 +5,7 @@ import { getPool, closeDb } from './db.js';
import { registerHealthRoutes } from './routes/health.js'; import { registerHealthRoutes } from './routes/health.js';
import { registerTerminalRoutes } from './routes/terminals.js'; import { registerTerminalRoutes } from './routes/terminals.js';
import { registerSessionRoutes } from './routes/sessions.js'; import { registerSessionRoutes } from './routes/sessions.js';
import { registerSearchRoutes } from './routes/search.js';
import { registerWsAttachRoute } from './ws/attach.js'; import { registerWsAttachRoute } from './ws/attach.js';
async function main(): Promise<void> { async function main(): Promise<void> {
@@ -35,6 +36,7 @@ async function main(): Promise<void> {
registerHealthRoutes(app); registerHealthRoutes(app);
registerTerminalRoutes(app, config.TMUX_CONF_PATH); registerTerminalRoutes(app, config.TMUX_CONF_PATH);
registerSessionRoutes(app); registerSessionRoutes(app);
registerSearchRoutes(app, config.TMUX_CONF_PATH);
registerWsAttachRoute(app, config.TMUX_CONF_PATH); registerWsAttachRoute(app, config.TMUX_CONF_PATH);
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {

View File

@@ -33,6 +33,7 @@ export function register(
export function unregister(paneId: string): void { export function unregister(paneId: string): void {
sessions.delete(paneId); sessions.delete(paneId);
ringBuffers.delete(paneId);
} }
export function list(): SessionMeta[] { export function list(): SessionMeta[] {
@@ -42,3 +43,120 @@ export function list(): SessionMeta[] {
export function get(paneId: string): SessionMeta | undefined { export function get(paneId: string): SessionMeta | undefined {
return sessions.get(paneId); return sessions.get(paneId);
} }
// ── Ring buffer for PTY output search ──────────────────────────────────────
export interface SearchMatch {
line: number;
content: string;
contextBefore: string[];
contextAfter: string[];
}
const ringBuffers = new Map<string, string[]>();
/**
* Append raw PTY data to the ring buffer for a given pane.
* Splits incoming data on newlines and pushes each line into the buffer,
* trimming to `maxLines` (default 5000) from the tail.
*/
export function appendOutput(
paneId: string,
data: string,
maxLines: number = 5000,
): void {
let buf = ringBuffers.get(paneId);
if (!buf) {
buf = [];
ringBuffers.set(paneId, buf);
}
// Split on newlines — each chunk may contain multiple complete lines and
// potentially a trailing partial line (which we store as-is; the next chunk
// will either complete it or be another partial).
const lines = data.split('\n');
// The first element of `lines` may be a continuation of the last partial
// line from the previous append. If the buffer is non-empty and the last
// stored entry is a partial (no trailing newline previously), glue them.
// We detect "partial" by checking whether `data` ended with '\n' — if it
// did, the last element after split is '' (empty) which we drop.
const endedWithNewline = data.endsWith('\n');
if (endedWithNewline) {
// The final empty-string element is discarded.
lines.pop();
}
if (buf.length > 0 && lines.length > 0) {
// Concatenate the last partial line in the buffer with the first split
// segment. This avoids splitting ANSI sequences or text across chunks.
buf[buf.length - 1] = (buf[buf.length - 1] ?? '') + (lines[0] ?? '');
lines.shift();
}
for (const line of lines) {
buf.push(line);
}
// Trim from head if over maxLines
if (buf.length > maxLines) {
buf = buf.slice(buf.length - maxLines);
ringBuffers.set(paneId, buf);
}
}
/**
* Search the ring buffer for a pane using a regex pattern.
* Returns matches with optional context lines before and after each match.
*/
export function searchRingBuffer(
paneId: string,
pattern: string,
opts?: { limit?: number; context?: number },
): SearchMatch[] {
const buf = ringBuffers.get(paneId);
if (!buf || buf.length === 0) return [];
const limit = opts?.limit ?? 50;
const context = opts?.context ?? 0;
let re: RegExp;
try {
re = new RegExp(pattern, 'u');
} catch {
return []; // invalid regex — caller should validate, but be defensive
}
const results: SearchMatch[] = [];
for (let i = 0; i < buf.length; i++) {
if (results.length >= limit) break;
if (re.test(buf[i]!)) {
const contextBefore: string[] = [];
const contextAfter: string[] = [];
for (let c = 1; c <= context; c++) {
const ci = i - c;
if (ci >= 0) contextBefore.unshift(buf[ci]!);
}
for (let c = 1; c <= context; c++) {
const ci = i + c;
if (ci < buf.length) contextAfter.push(buf[ci]!);
}
results.push({
line: i + 1, // 1-based line number for display
content: buf[i]!,
contextBefore,
contextAfter,
});
}
}
return results;
}
/**
* Remove the ring buffer for a pane. Called on session kill / pane close.
*/
export function clearBuffer(paneId: string): void {
ringBuffers.delete(paneId);
}

View File

@@ -0,0 +1,167 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import { sanitizeId, tmuxSessionName, capturePane } from '../pty/manager.js';
import { searchRingBuffer, clearBuffer } from '../pty/registry.js';
const ParamsSchema = z.object({
sid: z.string(),
pid: z.string(),
});
const MAX_PATTERN_LENGTH = 200;
// Zod-refined string: reject empty and overly-long patterns to prevent ReDoS
const PatternQuerySchema = z
.string()
.min(1, 'pattern is required')
.max(MAX_PATTERN_LENGTH, `pattern must not exceed ${MAX_PATTERN_LENGTH} characters`);
const QuerySchema = z.object({
pattern: PatternQuerySchema,
limit: z.coerce.number().int().min(1).max(500).default(50),
context: z.coerce.number().int().min(0).max(50).default(0),
});
interface SearchMatch {
line: number;
content: string;
contextBefore: string[];
contextAfter: string[];
}
interface SearchResponse {
matches: SearchMatch[];
total: number;
truncated: boolean;
source: 'ring' | 'capture';
}
/**
* Search a captured pane buffer using a regex. This is the fallback path
* when the ring buffer doesn't have enough matches.
*/
function grepBuffer(
text: string,
pattern: string,
limit: number,
context: number,
): SearchMatch[] {
let re: RegExp;
try {
re = new RegExp(pattern, 'u');
} catch {
return [];
}
const lines = text.split('\n');
const results: SearchMatch[] = [];
for (let i = 0; i < lines.length; i++) {
if (results.length >= limit) break;
if (re.test(lines[i]!)) {
const contextBefore: string[] = [];
const contextAfter: string[] = [];
for (let c = 1; c <= context; c++) {
const ci = i - c;
if (ci >= 0) contextBefore.unshift(lines[ci]!);
}
for (let c = 1; c <= context; c++) {
const ci = i + c;
if (ci < lines.length) contextAfter.push(lines[ci]!);
}
results.push({
line: i + 1,
content: lines[i]!,
contextBefore,
contextAfter,
});
}
}
return results;
}
export function registerSearchRoutes(app: FastifyInstance, tmuxConfPath: string): void {
app.get<{
Params: { sid: string; pid: string };
Querystring: { pattern?: string; limit?: string; context?: string };
}>(
'/api/term/sessions/:sid/panes/:pid/search',
async (req, reply) => {
const p = ParamsSchema.safeParse(req.params);
if (!p.success) return reply.code(400).send({ error: 'bad_params' });
const sid = sanitizeId(p.data.sid);
const pid = sanitizeId(p.data.pid);
if (!sid || !pid) return reply.code(400).send({ error: 'bad_id_format' });
const q = QuerySchema.safeParse(req.query);
if (!q.success) {
return reply.code(400).send({
error: 'bad_query',
details: q.error.flatten().fieldErrors,
});
}
const { pattern, limit, context } = q.data;
// ── Path 1: ring buffer search (fast, no tmux interaction) ──
const ringMatches = searchRingBuffer(pid, pattern, { limit, context });
if (ringMatches.length >= limit) {
return reply.code(200).send({
matches: ringMatches,
total: ringMatches.length,
truncated: ringMatches.length >= limit,
source: 'ring' as const,
});
}
// ── Path 2: capture-pane + grep fallback (10s timeout) ──
const sessionName = tmuxSessionName(pid);
let capture: string;
try {
capture = await withTimeout(
capturePane(tmuxConfPath, sessionName, 5000),
10_000,
);
} catch (err) {
req.log.warn({ err, pid }, 'capture-pane timed out or failed');
return reply.code(200).send({
matches: ringMatches,
total: ringMatches.length,
truncated: false,
source: 'ring' as const,
});
}
if (!capture) {
// tmux pane may no longer exist — return whatever ring had
return reply.code(200).send({
matches: ringMatches,
total: ringMatches.length,
truncated: false,
source: 'ring' as const,
});
}
const captureMatches = grepBuffer(capture, pattern, limit, context);
return reply.code(200).send({
matches: captureMatches,
total: captureMatches.length,
truncated: captureMatches.length >= limit,
source: 'capture' as const,
});
},
);
}
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
return Promise.race([
promise,
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('timeout')), ms),
),
]);
}

View File

@@ -9,7 +9,7 @@ import {
} from '../pty/manager.js'; } from '../pty/manager.js';
import { attachPty } from '../pty/pty.js'; import { attachPty } from '../pty/pty.js';
import { getUser } from '../auth.js'; import { getUser } from '../auth.js';
import { register, unregister } from '../pty/registry.js'; import { register, unregister, appendOutput } from '../pty/registry.js';
export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string): void { export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string): void {
app.get<{ app.get<{
@@ -106,6 +106,8 @@ export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string
} catch (err) { } catch (err) {
req.log.warn({ err }, 'ws send failed'); req.log.warn({ err }, 'ws send failed');
} }
// Feed the ring buffer for pattern-based search
appendOutput(pid, data);
}; };
handle.onData(onData); handle.onData(onData);

View File

@@ -0,0 +1,195 @@
import { describe, it, expect, vi } from 'vitest';
import { PaseoClient, PaseoClientError } from '../paseo-client.js';
/**
* Create a PaseoClient whose runCli method is replaced with a mock.
* The mock is returned as the second tuple element so tests can
* control and inspect it directly.
*/
function makeClient(config?: { paseoBin?: string; cliHost?: string }): {
client: PaseoClient;
mockRunCli: ReturnType<typeof vi.fn>;
} {
const client = new PaseoClient(config);
const mockRunCli = vi.fn();
(client as any).runCli = mockRunCli;
return { client, mockRunCli };
}
describe('PaseoClient', () => {
describe('listAgents', () => {
it('returns parsed agent list from paseo ls --json', async () => {
const agents = [
{ id: 'abc-123', shortId: 'abc', name: 'Agent 1', provider: 'opencode', status: 'running' },
{ id: 'def-456', shortId: 'def', name: 'Agent 2', provider: 'claude', status: 'idle' },
];
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue(JSON.stringify(agents));
const result = await client.listAgents();
expect(mockRunCli).toHaveBeenCalledWith(['ls', '--json']);
expect(result).toEqual(agents);
});
it('throws PaseoClientError on non-JSON output', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('not json');
await expect(client.listAgents()).rejects.toThrow(PaseoClientError);
await expect(client.listAgents()).rejects.toThrow(/invalid JSON/);
});
it('propagates runCli rejection as-is', async () => {
const { client, mockRunCli } = makeClient();
const err = new PaseoClientError('ls failed: connection refused', 'ls', 1, 'connection refused');
mockRunCli.mockRejectedValue(err);
await expect(client.listAgents()).rejects.toThrow(PaseoClientError);
await expect(client.listAgents()).rejects.toThrow(/ls failed/);
});
});
describe('getAgentStatus', () => {
it('returns parsed agent detail from paseo inspect --json', async () => {
const detail = {
Id: 'abc-123', Name: 'Agent 1', Provider: 'opencode',
Status: 'idle', Archived: false,
CreatedAt: '2026-01-01T00:00:00Z', UpdatedAt: '2026-01-01T01:00:00Z',
};
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue(JSON.stringify(detail));
const result = await client.getAgentStatus('abc-123');
expect(mockRunCli).toHaveBeenCalledWith(['inspect', '--json', 'abc-123']);
expect(result.Id).toBe('abc-123');
expect(result.Status).toBe('idle');
});
});
describe('health', () => {
it('returns ok when paseo ls succeeds', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('[]');
const result = await client.health();
expect(result).toEqual({ status: 'ok' });
});
it('returns error when runCli throws', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockRejectedValue(new Error('connection refused'));
const result = await client.health();
expect(result).toEqual({ status: 'error' });
});
});
describe('importAgent', () => {
it('calls paseo import with provider and labels', async () => {
const agentResult = { Id: 'new-789', Name: 'Imported', Provider: 'opencode', Status: 'idle' };
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue(JSON.stringify(agentResult));
const result = await client.importAgent('ses-001', 'opencode', {
origin: 'boocode',
project: 'proj-1',
});
expect(mockRunCli).toHaveBeenCalledWith([
'import', '--json',
'--provider', 'opencode',
'--label', 'origin=boocode',
'--label', 'project=proj-1',
'ses-001',
]);
expect(result.Id).toBe('new-789');
});
it('works without labels', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue(JSON.stringify({ Id: 'new-789' }));
const result = await client.importAgent('ses-001', 'claude');
expect(mockRunCli).toHaveBeenCalledWith([
'import', '--json',
'--provider', 'claude',
'ses-001',
]);
expect(result.Id).toBe('new-789');
});
});
describe('archiveAgent', () => {
it('calls paseo archive --json', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('{}');
await client.archiveAgent('abc-123');
expect(mockRunCli).toHaveBeenCalledWith(['archive', '--json', 'abc-123']);
});
});
describe('sendPrompt', () => {
it('sends prompt and parses JSON result', async () => {
const sendResult = { text: 'Hello!', ok: true };
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue(JSON.stringify(sendResult));
const result = await client.sendPrompt('abc-123', 'Hello');
expect(mockRunCli).toHaveBeenCalledWith(['send', '--json', 'abc-123', 'Hello'], undefined);
expect(result).toEqual(sendResult);
});
it('falls back to plain text on non-JSON output', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('plain text response');
const result = await client.sendPrompt('abc-123', 'Hi');
expect(result).toEqual({ text: 'plain text response', ok: true });
});
it('supports --no-wait flag', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('{}');
await client.sendPrompt('abc-123', 'Hi', { noWait: true });
expect(mockRunCli).toHaveBeenCalledWith([
'send', '--json', '--no-wait',
'abc-123', 'Hi',
], undefined);
});
});
describe('stopAgent', () => {
it('calls paseo stop', async () => {
const { client, mockRunCli } = makeClient();
mockRunCli.mockResolvedValue('');
await client.stopAgent('abc-123');
expect(mockRunCli).toHaveBeenCalledWith(['stop', 'abc-123']);
});
});
describe('cliHost config', () => {
it('includes --host flag in args when cliHost is set', async () => {
const { client, mockRunCli } = makeClient({ cliHost: 'tcp://localhost:6767?ssl=true' });
mockRunCli.mockResolvedValue('[]');
await client.listAgents();
expect(mockRunCli).toHaveBeenCalledWith([
'ls', '--json', '--host', 'tcp://localhost:6767?ssl=true',
]);
});
});
});

View File

@@ -13,7 +13,7 @@ import type { AcpToolSnapshot } from './acp-tool-snapshot.js';
import type { AgentCommand } from './provider-types.js'; import type { AgentCommand } from './provider-types.js';
/** Backend transport kind. Mirrors `agent_sessions.backend` CHECK in schema.sql. */ /** Backend transport kind. Mirrors `agent_sessions.backend` CHECK in schema.sql. */
export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk'; export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk' | 'paseo';
/** /**
* Normalized, transport-agnostic events a backend emits during a turn (§2). * Normalized, transport-agnostic events a backend emits during a turn (§2).

View File

@@ -0,0 +1,254 @@
/**
* v2.10 — PaseoBackend: Paseo agent integration for the agent-pool.
*
* Wraps the Paseo CLI daemon as an AgentBackend. Each Paseo agent maps to one
* (chat_id, agent) pair and is persisted via `paseo import` (which registers
* an agent with the Paseo daemon). Prompts are sent via `paseo send`, and
* the session is cleaned up via `paseo archive`.
*
* Paseo is a meta-agent hub — it wraps provider sessions (opencode, claude,
* acp, etc.). The `provider` option in `EnsureSessionOpts` selects which
* provider Paseo delegates to.
*
* Backend kind: 'paseo' (must be added to agent_sessions_backend_chk).
*
* Spec: openspec/changes/v2-10-paseo-integration/design.md.
*/
import type { FastifyBaseLogger } from 'fastify';
import type { Sql } from '../../db.js';
import { PaseoClient, type PaseoSendResult } from '../paseo-client.js';
import type {
AgentBackend,
AgentSessionHandle,
EnsureSessionOpts,
PromptCtx,
TurnResult,
} from '../agent-backend.js';
/** Default provider to use when Paseo wraps a generic agent. */
const DEFAULT_PASEO_PROVIDER = 'opencode';
export interface PaseoBackendDeps {
sql: Sql;
log: FastifyBaseLogger;
/** The (chat, agent) this backend serves — its pool identity + DB key. */
chatId: string;
/** Agent name (e.g. 'opencode', 'claude', 'paseo'). */
agent: string;
/** Resolved PaseoClient instance. */
client: PaseoClient;
/** Provider string to pass to `paseo import --provider`. */
provider: string;
}
export class PaseoBackend implements AgentBackend {
readonly backend = 'paseo' as const;
private readonly sql: Sql;
private readonly log: FastifyBaseLogger;
private readonly chatId: string;
private readonly agent: string;
private readonly client: PaseoClient;
private readonly provider: string;
/** Map of BooCode sessionId → Paseo agent ID. */
private readonly agentIds = new Map<string, string>();
/** True between prompt() start and settle. */
private busy = false;
private up = false;
constructor(deps: PaseoBackendDeps) {
this.sql = deps.sql;
this.log = deps.log;
this.chatId = deps.chatId;
this.agent = deps.agent;
this.client = deps.client;
this.provider = deps.provider || DEFAULT_PASEO_PROVIDER;
}
/** §2: liveness for the health endpoint + dispatcher fallback decision. */
health(): 'up' | 'down' {
return this.up ? 'up' : 'down';
}
/** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */
isBusy(): boolean {
return this.busy;
}
// ─── ensureSession: create/import a Paseo agent ─────────────────────────────
async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
// Check if we already have a Paseo agent ID for this session.
let paseoId = this.agentIds.get(sessionId);
if (!paseoId) {
// Resolve existing agent_session_id from DB (e.g. after a restart).
const [row] = await this.sql<{ agent_session_id: string | null }[]>`
SELECT agent_session_id FROM agent_sessions
WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent} AND backend = 'paseo'
`;
if (row?.agent_session_id) {
paseoId = row.agent_session_id;
this.agentIds.set(sessionId, paseoId);
}
}
if (!paseoId) {
// Import a new Paseo agent. Use the session UUID as the provider session id.
const labels: Record<string, string> = {
origin: 'boocode',
project: opts.projectId,
chat: opts.chatId,
worktree: opts.worktreeId,
agent: this.agent,
};
try {
const agent = await this.client.importAgent(sessionId, this.provider, labels);
paseoId = agent.Id;
this.agentIds.set(sessionId, paseoId);
this.log.info(
{ paseoId, agent: this.agent, chatId: this.chatId },
'paseo: imported agent',
);
} catch (err) {
this.log.error(
{ err: String(err), agent: this.agent, chatId: this.chatId },
'paseo: importAgent failed',
);
throw err;
}
}
// Upsert the agent_sessions row.
await this.sql`
INSERT INTO agent_sessions
(chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
VALUES
(${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'paseo', ${paseoId}, NULL, 'active', clock_timestamp())
ON CONFLICT (chat_id, agent) DO UPDATE SET
session_id = EXCLUDED.session_id,
worktree_id = EXCLUDED.worktree_id,
backend = 'paseo',
agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id),
server_port = NULL,
status = 'active',
last_active_at = clock_timestamp()
`.catch((err) => {
this.log.warn(
{ err: String(err), chatId: opts.chatId, agent: opts.agent },
'paseo: agent_sessions upsert failed (non-fatal)',
);
});
this.up = true;
return {
sessionId,
agent: opts.agent,
backend: 'paseo',
chatId: opts.chatId,
worktreeId: opts.worktreeId,
agentSessionId: paseoId,
serverPort: null,
};
}
// ─── prompt: send a message to the Paseo agent ─────────────────────────────
async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
const paseoId = handle.agentSessionId;
if (!paseoId) {
return { ok: false, error: 'paseo: no agent session id in handle' };
}
this.busy = true;
try {
// Use streamSend for real-time text output via onEvent.
const result: PaseoSendResult = await this.client.streamSend(
paseoId,
input,
(event) => {
ctx.onEvent(event);
},
ctx.signal,
);
// Update last_active_at.
await this.sql`
UPDATE agent_sessions
SET last_active_at = clock_timestamp()
WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
`.catch(() => { /* non-fatal */ });
if (result.error) {
return { ok: false, error: result.error };
}
return { ok: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
// Check if abortion
if (ctx.signal.aborted) {
return { ok: false, error: 'cancelled' };
}
return { ok: false, error: `paseo: ${msg}` };
} finally {
this.busy = false;
}
}
// ─── closeSession: archive the Paseo agent ─────────────────────────────────
async closeSession(handle: AgentSessionHandle): Promise<void> {
const paseoId = handle.agentSessionId;
if (!paseoId) return;
try {
await this.client.archiveAgent(paseoId);
this.log.info({ paseoId, agent: handle.agent }, 'paseo: archived agent');
} catch (err) {
this.log.warn(
{ err: String(err), paseoId, agent: handle.agent },
'paseo: archiveAgent failed (non-fatal)',
);
}
this.agentIds.delete(handle.sessionId);
// Update DB row.
await this.sql`
UPDATE agent_sessions
SET status = 'closed', last_active_at = clock_timestamp()
WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent}
`.catch(() => { /* non-fatal */ });
}
// ─── dispose: archive all tracked agents ───────────────────────────────────
async dispose(): Promise<void> {
const ids = [...this.agentIds.values()];
this.agentIds.clear();
for (const paseoId of ids) {
try {
await this.client.archiveAgent(paseoId);
} catch {
// Best-effort cleanup during shutdown.
}
}
this.up = false;
}
/** Phase 3: periodic health tick — probes the Paseo daemon. */
async tickHealth(_now?: number): Promise<void> {
try {
const h = await this.client.health();
this.up = h.status === 'ok';
} catch {
this.up = false;
}
}
}

View File

@@ -0,0 +1,115 @@
// v2.8 Collision detection — pure functions that find file overlaps between
// worktrees/agents editing the same files concurrently. Advisory only; writes
// are never blocked, but the collision info surfaces in the UI and logs.
//
// Severity levels:
// same_line — the same file, exact same line region
// adjacent_line — the same file, lines touch or are within 5 lines
// different_area — the same file, distant lines
//
// Pure functions, no side effects. Testable in isolation.
export type ConflictSeverity = 'same_line' | 'adjacent_line' | 'different_area';
export interface ConflictVerdict {
filePath: string;
worktrees: string[];
severity: ConflictSeverity;
agents: string[];
}
/**
* Registry entry for a single file change recorded by a worktree.
* Stored in the ConflictIndex Map value for each file path.
*/
export interface ConflictEntry {
worktreeId: string;
agent: string;
/**
* Approximate line range touched by the change. undefined when the change
* creates or deletes the file (full-file collision vs. same-line).
*/
lineRange?: { start: number; end: number };
status: 'pending' | 'applied' | 'reverted';
timestamp: number;
}
/**
* Shape of the conflict index consumed by findConflicts.
* File path → set of entries from different worktrees/agents.
*/
export type ConflictIndexData = ReadonlyMap<string, ReadonlySet<ConflictEntry>>;
/**
* Find file overlaps between `changedFiles` and the conflict index, excluding
* the caller's own worktree.
*
* Returns one ConflictVerdict per file that has entries from other worktrees.
* Severity is the highest found (same_line > adjacent_line > different_area).
*/
export function findConflicts(
changedFiles: string[],
worktreeId: string,
/** Approximate line range for the proposed changes, keyed by file path */
changedRanges: Map<string, { start: number; end: number }>,
conflictIndex: ConflictIndexData,
): ConflictVerdict[] {
const verdicts: ConflictVerdict[] = [];
for (const filePath of changedFiles) {
const entries = conflictIndex.get(filePath);
if (!entries || entries.size === 0) continue;
// Filter to entries from OTHER worktrees
const otherEntries = [...entries].filter((e) => e.worktreeId !== worktreeId);
if (otherEntries.length === 0) continue;
const myRange = changedRanges.get(filePath);
let severity: ConflictSeverity = 'different_area';
for (const entry of otherEntries) {
if (!myRange || !entry.lineRange) {
// Full-file changes (create/delete) always hit at least different_area
continue;
}
const sev = lineOverlapSeverity(myRange, entry.lineRange);
if (sev === 'same_line') {
severity = 'same_line';
break; // Can't get higher than this
}
if (sev === 'adjacent_line' && severity === 'different_area') {
severity = 'adjacent_line';
}
}
const worktrees = [...new Set(otherEntries.map((e) => e.worktreeId))];
const agents = [...new Set(otherEntries.map((e) => e.agent))];
verdicts.push({ filePath, worktrees, severity, agents });
}
return verdicts;
}
const ADJACENT_LINE_THRESHOLD = 5;
/**
* Determine severity of overlap between two line ranges.
*/
function lineOverlapSeverity(
a: { start: number; end: number },
b: { start: number; end: number },
): ConflictSeverity {
// Same_line: ranges intersect
if (a.start <= b.end && b.start <= a.end) {
return 'same_line';
}
// Adjacent: ranges are within ADJACENT_LINE_THRESHOLD lines of each other
const gap = a.start > b.end ? a.start - b.end : b.start - a.end;
if (gap <= ADJACENT_LINE_THRESHOLD) {
return 'adjacent_line';
}
return 'different_area';
}

View File

@@ -0,0 +1,151 @@
// v2.8 In-memory conflict index — tracks which worktrees/agents are editing
// which files so the collision detector can find overlaps.
//
// Singleton exported as `conflictIndex`; imported by pending_changes.ts to
// register changes at queue time and unregister on worktree teardown.
//
// NOT persisted — survives only as long as the BooCoder process. Postgres
// is the durable record (pending_changes table); this is the hot in-memory
// probe for concurrent edit warnings.
import type { ConflictEntry, ConflictVerdict } from './collision-detector.js';
import { findConflicts } from './collision-detector.js';
export class ConflictIndex {
/**
* filePath → Set of ConflictEntry from various worktrees.
* A single worktree may have multiple entries for the same file
* (several pending edits to the same file in one session).
*/
#map = new Map<string, Set<ConflictEntry>>();
// ---- mutation -------------------------------------------------------
/**
* Register that `worktreeId` (agent) is touching `filePath`.
* Creates an entry in the index so subsequent callers see it as a conflict.
*/
registerChange(
filePath: string,
worktreeId: string,
agent: string,
lineRange?: { start: number; end: number },
): void {
let entries = this.#map.get(filePath);
if (!entries) {
entries = new Set();
this.#map.set(filePath, entries);
}
entries.add({
worktreeId,
agent,
lineRange,
status: 'pending' as const,
timestamp: Date.now(),
});
}
/**
* Remove all entries for a given worktree. Called on worktree teardown
* so stale entries don't trigger false warnings.
*/
removeWorktree(worktreeId: string): void {
for (const [filePath, entries] of this.#map) {
const before = entries.size;
for (const entry of entries) {
if (entry.worktreeId === worktreeId) {
entries.delete(entry);
}
}
if (entries.size === 0) {
this.#map.delete(filePath);
}
}
}
/**
* Remove entries older than `maxAgeMs`. Useful as a periodic cleanup
* when worktree teardown was missed (crash, unclean exit).
*/
sweepStale(maxAgeMs: number): number {
const cutoff = Date.now() - maxAgeMs;
let removed = 0;
for (const [filePath, entries] of this.#map) {
for (const entry of entries) {
if (entry.timestamp < cutoff) {
entries.delete(entry);
removed++;
}
}
if (entries.size === 0) {
this.#map.delete(filePath);
}
}
return removed;
}
// ---- query ----------------------------------------------------------
/**
* Query the raw ConflictEntry set for a file path. Returns empty set
* when there are no entries (never mutated the file).
*/
getEntriesFor(filePath: string): ReadonlySet<ConflictEntry> {
return this.#map.get(filePath) ?? new Set();
}
/**
* Get all conflict verdicts for a given file path — which other
* worktrees are touching it. Returns empty when only one worktree
* has entries (no actual conflict).
*/
getConflictsFor(filePath: string): ConflictVerdict[] {
const entries = this.#map.get(filePath);
if (!entries || entries.size === 0) return [];
// Determine distinct worktree IDs. If only one, no conflict.
const worktreeIds = new Set<string>();
for (const e of entries) worktreeIds.add(e.worktreeId);
if (worktreeIds.size <= 1) return [];
// Use the first worktree as the "caller" so findConflicts excludes
// its entries and returns only entries from OTHER worktrees.
const caller = [...worktreeIds][0]!;
return findConflicts(
[filePath],
caller,
new Map(),
this.#toIndexData(),
);
}
/**
* Get conflicts for a set of file changes from a specific worktree.
* Delegates to the pure findConflicts function.
*/
query(
changedFiles: string[],
worktreeId: string,
changedRanges: Map<string, { start: number; end: number }>,
): ConflictVerdict[] {
return findConflicts(changedFiles, worktreeId, changedRanges, this.#toIndexData());
}
/**
* Snapshot the current map for testing/inspection.
*/
snapshot(): Map<string, ReadonlySet<ConflictEntry>> {
return new Map(this.#map);
}
// ---- private --------------------------------------------------------
#toIndexData(): ReadonlyMap<string, ReadonlySet<ConflictEntry>> {
return this.#map as ReadonlyMap<string, ReadonlySet<ConflictEntry>>;
}
}
// Singleton — the whole BooCoder process shares one conflict index.
export const conflictIndex = new ConflictIndex();

View File

@@ -0,0 +1,341 @@
/**
* v2.10 — PaseoClient: thin CLI-based client for the Paseo daemon.
*
* Paseo is a multi-agent hub daemon running at a configurable address
* (default Unix socket / localhost:6767). This client wraps the `paseo` CLI
* via child_process spawn for all operations (the daemon does not expose a
* separate REST API for write operations). Read operations (listAgents,
* getAgentStatus) use `paseo ls --json` / `paseo inspect --json`; write
* operations (import, archive, send) use the corresponding subcommands.
*
* Spec: openspec/changes/v2-10-paseo-integration/design.md.
*/
import { spawn } from 'node:child_process';
import { once } from 'node:events';
import { createInterface } from 'node:readline';
// ─── Types ───────────────────────────────────────────────────────────────────
/** Listing entry from `paseo ls --json`. Fields are lowercase. */
export interface PaseoAgentListItem {
id: string;
shortId: string;
name: string;
provider: string;
status: string;
cwd?: string;
created?: string;
thinking?: string;
}
/** Detailed agent info from `paseo inspect --json`. Fields are PascalCase. */
export interface PaseoAgentDetail {
Id: string;
Name: string;
Provider: string;
Model?: string;
Status: string;
Thinking?: string;
Archived: boolean;
ArchivedAt?: string | null;
Cwd?: string;
CreatedAt: string;
UpdatedAt: string;
Mode?: string;
AvailableModes?: Array<{ id: string; label: string }>;
Capabilities?: {
Streaming?: boolean;
Persistence?: boolean;
DynamicModes?: boolean;
McpServers?: boolean;
};
Labels?: Record<string, string>;
Worktree?: string | null;
ParentAgentId?: string | null;
}
/** Result of `paseo send --json`. */
export interface PaseoSendResult {
/** The agent's textual response. */
text?: string;
/** Structured output if the agent produced any. */
output?: unknown;
/** Error message if the turn failed. */
error?: string;
/** True if the turn completed successfully. */
ok?: boolean;
}
export interface PaseoClientConfig {
/** Path to the paseo binary. Default: auto-resolved from PATH. */
paseoBin: string;
/**
* Explicit `--host <host>` value for CLI calls.
* Format: `host:port` or `tcp://host:port?ssl=true&password=secret`.
* Omit to use the CLI default (Unix socket, fallback localhost:6767).
*/
cliHost?: string;
}
const DEFAULT_PASEO_BIN = 'paseo';
// ─── Client ──────────────────────────────────────────────────────────────────
export class PaseoClientError extends Error {
constructor(
message: string,
public readonly command: string,
public readonly exitCode: number | null,
public readonly stderr: string,
) {
super(message);
this.name = 'PaseoClientError';
}
}
export class PaseoClient {
/** @internal visible for testing */
readonly bin: string;
private readonly hostArgs: string[];
constructor(config?: Partial<PaseoClientConfig>) {
this.bin = config?.paseoBin ?? DEFAULT_PASEO_BIN;
this.hostArgs = config?.cliHost ? ['--host', config.cliHost] : [];
}
// ─── Read operations (CLI `ls --json`, `inspect --json`) ──────────────────
/** List all non-archived agents. */
async listAgents(): Promise<PaseoAgentListItem[]> {
const raw = await this.runJson(['ls', '--json', ...this.hostArgs]);
return raw as PaseoAgentListItem[];
}
/** Get detailed status for a single agent by ID or prefix. */
async getAgentStatus(agentId: string): Promise<PaseoAgentDetail> {
const raw = await this.runJson(['inspect', '--json', agentId, ...this.hostArgs]);
return raw as PaseoAgentDetail;
}
/**
* Quick liveness check — runs `paseo ls --json --limit 1` and returns success.
* The daemon is healthy if the CLI exits 0.
*/
async health(): Promise<{ status: string }> {
try {
await this.runCli(['ls', '--json', '--limit', '1', ...this.hostArgs]);
return { status: 'ok' };
} catch {
return { status: 'error' };
}
}
// ─── Write operations (CLI subcommands) ───────────────────────────────────
/**
* Import a provider session as a Paseo agent.
* Uses `paseo import <sessionId> --provider <provider> [--label k=v]`.
*/
async importAgent(
sessionId: string,
provider: string,
labels?: Record<string, string>,
): Promise<PaseoAgentDetail> {
const args: string[] = ['import', '--json', ...this.hostArgs];
if (provider) {
args.push('--provider', provider);
}
if (labels) {
for (const [k, v] of Object.entries(labels)) {
args.push('--label', `${k}=${v}`);
}
}
args.push(sessionId);
const raw = await this.runJson(args);
return raw as PaseoAgentDetail;
}
/** Archive (soft-delete) a Paseo agent by ID or prefix. */
async archiveAgent(agentId: string): Promise<void> {
await this.runCli(['archive', '--json', ...this.hostArgs, agentId]);
}
/**
* Send a prompt to an existing agent.
*
* By default waits for the agent to complete the turn (streams text events
* via the optional `onEvent` callback) and returns the structured result.
* Pass `noWait: true` to fire-and-forget.
*/
async sendPrompt(
agentId: string,
prompt: string,
options?: {
noWait?: boolean;
onEvent?: (event: { type: 'text' | 'reasoning'; text: string }) => void;
signal?: AbortSignal;
},
): Promise<PaseoSendResult> {
const args: string[] = ['send', '--json', ...this.hostArgs];
if (options?.noWait) {
args.push('--no-wait');
}
args.push(agentId, prompt);
// With --json and no --no-wait, the output is JSON after completion.
// For streaming, we read stderr without --json for real-time text.
const raw = await this.runCli(args, options?.signal);
try {
return JSON.parse(raw) as PaseoSendResult;
} catch {
return { text: raw, ok: true };
}
}
/**
* Stream-send: runs `paseo send` WITHOUT `--json`, forward text/reasoning
* lines to onEvent in real time. Use when the caller wants to stream agent
* output as it arrives rather than wait for the full JSON result.
*/
async streamSend(
agentId: string,
prompt: string,
onEvent: (event: { type: 'text' | 'reasoning'; text: string }) => void,
signal?: AbortSignal,
): Promise<PaseoSendResult> {
return new Promise<PaseoSendResult>((resolve, reject) => {
const args = ['send', ...this.hostArgs, agentId, prompt];
const child = spawn(this.bin, args, {
stdio: ['ignore', 'pipe', 'pipe'],
signal,
});
let stdout = '';
let stderr = '';
if (child.stdout) {
const rl = createInterface({ input: child.stdout });
rl.on('line', (line: string) => {
stdout += line + '\n';
// Forward as text event for real-time display
onEvent({ type: 'text', text: line + '\n' });
});
}
if (child.stderr) {
child.stderr.on('data', (chunk: Buffer) => {
stderr += chunk.toString();
});
}
once(child, 'close').then((raw) => {
const exitCode = (raw[0] as number | null) ?? 0;
if (exitCode !== 0) {
reject(
new PaseoClientError(
`paseo send failed (exit ${exitCode}): ${stderr.trim()}`,
'send',
exitCode,
stderr,
),
);
return;
}
resolve({ text: stdout, ok: true });
});
child.on('error', reject);
});
}
/** Interrupt/stop a running agent. */
async stopAgent(agentId: string): Promise<void> {
await this.runCli(['stop', ...this.hostArgs, agentId]);
}
// ─── Private helpers ───────────────────────────────────────────────────────
/**
* Run a CLI command and return stdout as a string.
* Throws PaseoClientError on non-zero exit.
*/
private async runCli(
args: string[],
signal?: AbortSignal,
): Promise<string> {
return new Promise<string>((resolve, reject) => {
const child = spawn(this.bin, args, {
stdio: ['ignore', 'pipe', 'pipe'],
signal,
});
let stdout = '';
let stderr = '';
if (child.stdout) {
child.stdout.on('data', (chunk: Buffer) => {
stdout += chunk.toString();
});
}
if (child.stderr) {
child.stderr.on('data', (chunk: Buffer) => {
stderr += chunk.toString();
});
}
child.on('error', (err: Error) => {
// If signal aborted, treat as cancellation not error
if (signal?.aborted) {
resolve('');
return;
}
reject(err);
});
once(child, 'close').then((raw) => {
const exitCode = (raw[0] as number | null) ?? 0;
if (signal?.aborted) {
resolve('');
return;
}
if (exitCode !== 0) {
const msg = stderr.trim() || `exit code ${exitCode}`;
reject(
new PaseoClientError(
`paseo ${args[0] ?? '?'} failed: ${msg}`,
args[0] ?? '?',
exitCode,
stderr,
),
);
return;
}
resolve(stdout);
});
});
}
/**
* Run a CLI command and parse stdout as JSON.
* Throws PaseoClientError on non-zero exit or parse failure.
*/
private async runJson(args: string[]): Promise<unknown> {
const stdout = await this.runCli(args);
try {
return JSON.parse(stdout);
} catch (err) {
throw new PaseoClientError(
`paseo ${args[0] ?? '?'} returned invalid JSON: ${(stdout || '<empty>').slice(0, 200)}`,
args[0] ?? '?',
0,
stdout,
);
}
}
}