From c860b6c4b76d49350e83aeca3647478c3d0bae37 Mon Sep 17 00:00:00 2001 From: indifferentketchup Date: Mon, 8 Jun 2026 02:45:17 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Wave=201=20complete=20=E2=80=94=20state?= =?UTF-8?q?=20machine,=20Paseo=20hub,=20collision=20detection,=20PTY=20sea?= =?UTF-8?q?rch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Task state machine: TIMED_OUT state, retriable steps, timeout detection - Paseo hub: paseo-client.ts (HTTP+CLI), PaseoBackend (AgentBackend), 14 tests - Collision detection: collision-detector.ts, conflict-index.ts, ws-frames type - PTY search: ring buffer, search route, capture-pane fallback --- apps/booterm/src/index.ts | 2 + apps/booterm/src/pty/registry.ts | 118 ++++++ apps/booterm/src/routes/search.ts | 167 +++++++++ apps/booterm/src/ws/attach.ts | 4 +- .../services/__tests__/paseo-client.test.ts | 195 ++++++++++ apps/coder/src/services/agent-backend.ts | 2 +- apps/coder/src/services/backends/paseo.ts | 254 +++++++++++++ apps/coder/src/services/collision-detector.ts | 115 ++++++ apps/coder/src/services/conflict-index.ts | 151 ++++++++ apps/coder/src/services/paseo-client.ts | 341 ++++++++++++++++++ 10 files changed, 1347 insertions(+), 2 deletions(-) create mode 100644 apps/booterm/src/routes/search.ts create mode 100644 apps/coder/src/services/__tests__/paseo-client.test.ts create mode 100644 apps/coder/src/services/backends/paseo.ts create mode 100644 apps/coder/src/services/collision-detector.ts create mode 100644 apps/coder/src/services/conflict-index.ts create mode 100644 apps/coder/src/services/paseo-client.ts diff --git a/apps/booterm/src/index.ts b/apps/booterm/src/index.ts index 317aa61..d86ce38 100644 --- a/apps/booterm/src/index.ts +++ b/apps/booterm/src/index.ts @@ -5,6 +5,7 @@ import { getPool, closeDb } from './db.js'; import { registerHealthRoutes } from './routes/health.js'; import { registerTerminalRoutes } from './routes/terminals.js'; import { registerSessionRoutes } from './routes/sessions.js'; +import { registerSearchRoutes } from './routes/search.js'; import { registerWsAttachRoute } from './ws/attach.js'; async function main(): Promise { @@ -35,6 +36,7 @@ async function main(): Promise { registerHealthRoutes(app); registerTerminalRoutes(app, config.TMUX_CONF_PATH); registerSessionRoutes(app); + registerSearchRoutes(app, config.TMUX_CONF_PATH); registerWsAttachRoute(app, config.TMUX_CONF_PATH); const shutdown = async (signal: string) => { diff --git a/apps/booterm/src/pty/registry.ts b/apps/booterm/src/pty/registry.ts index e989a7e..1fc6df1 100644 --- a/apps/booterm/src/pty/registry.ts +++ b/apps/booterm/src/pty/registry.ts @@ -33,6 +33,7 @@ export function register( export function unregister(paneId: string): void { sessions.delete(paneId); + ringBuffers.delete(paneId); } export function list(): SessionMeta[] { @@ -42,3 +43,120 @@ export function list(): SessionMeta[] { export function get(paneId: string): SessionMeta | undefined { return sessions.get(paneId); } + +// ── Ring buffer for PTY output search ────────────────────────────────────── + +export interface SearchMatch { + line: number; + content: string; + contextBefore: string[]; + contextAfter: string[]; +} + +const ringBuffers = new Map(); + +/** + * Append raw PTY data to the ring buffer for a given pane. + * Splits incoming data on newlines and pushes each line into the buffer, + * trimming to `maxLines` (default 5000) from the tail. + */ +export function appendOutput( + paneId: string, + data: string, + maxLines: number = 5000, +): void { + let buf = ringBuffers.get(paneId); + if (!buf) { + buf = []; + ringBuffers.set(paneId, buf); + } + + // Split on newlines — each chunk may contain multiple complete lines and + // potentially a trailing partial line (which we store as-is; the next chunk + // will either complete it or be another partial). + const lines = data.split('\n'); + + // The first element of `lines` may be a continuation of the last partial + // line from the previous append. If the buffer is non-empty and the last + // stored entry is a partial (no trailing newline previously), glue them. + // We detect "partial" by checking whether `data` ended with '\n' — if it + // did, the last element after split is '' (empty) which we drop. + const endedWithNewline = data.endsWith('\n'); + if (endedWithNewline) { + // The final empty-string element is discarded. + lines.pop(); + } + + if (buf.length > 0 && lines.length > 0) { + // Concatenate the last partial line in the buffer with the first split + // segment. This avoids splitting ANSI sequences or text across chunks. + buf[buf.length - 1] = (buf[buf.length - 1] ?? '') + (lines[0] ?? ''); + lines.shift(); + } + + for (const line of lines) { + buf.push(line); + } + + // Trim from head if over maxLines + if (buf.length > maxLines) { + buf = buf.slice(buf.length - maxLines); + ringBuffers.set(paneId, buf); + } +} + +/** + * Search the ring buffer for a pane using a regex pattern. + * Returns matches with optional context lines before and after each match. + */ +export function searchRingBuffer( + paneId: string, + pattern: string, + opts?: { limit?: number; context?: number }, +): SearchMatch[] { + const buf = ringBuffers.get(paneId); + if (!buf || buf.length === 0) return []; + + const limit = opts?.limit ?? 50; + const context = opts?.context ?? 0; + + let re: RegExp; + try { + re = new RegExp(pattern, 'u'); + } catch { + return []; // invalid regex — caller should validate, but be defensive + } + + const results: SearchMatch[] = []; + + for (let i = 0; i < buf.length; i++) { + if (results.length >= limit) break; + if (re.test(buf[i]!)) { + const contextBefore: string[] = []; + const contextAfter: string[] = []; + for (let c = 1; c <= context; c++) { + const ci = i - c; + if (ci >= 0) contextBefore.unshift(buf[ci]!); + } + for (let c = 1; c <= context; c++) { + const ci = i + c; + if (ci < buf.length) contextAfter.push(buf[ci]!); + } + results.push({ + line: i + 1, // 1-based line number for display + content: buf[i]!, + contextBefore, + contextAfter, + }); + } + } + + return results; +} + +/** + * Remove the ring buffer for a pane. Called on session kill / pane close. + */ +export function clearBuffer(paneId: string): void { + ringBuffers.delete(paneId); +} diff --git a/apps/booterm/src/routes/search.ts b/apps/booterm/src/routes/search.ts new file mode 100644 index 0000000..f1b78e8 --- /dev/null +++ b/apps/booterm/src/routes/search.ts @@ -0,0 +1,167 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import { sanitizeId, tmuxSessionName, capturePane } from '../pty/manager.js'; +import { searchRingBuffer, clearBuffer } from '../pty/registry.js'; + +const ParamsSchema = z.object({ + sid: z.string(), + pid: z.string(), +}); + +const MAX_PATTERN_LENGTH = 200; + +// Zod-refined string: reject empty and overly-long patterns to prevent ReDoS +const PatternQuerySchema = z + .string() + .min(1, 'pattern is required') + .max(MAX_PATTERN_LENGTH, `pattern must not exceed ${MAX_PATTERN_LENGTH} characters`); + +const QuerySchema = z.object({ + pattern: PatternQuerySchema, + limit: z.coerce.number().int().min(1).max(500).default(50), + context: z.coerce.number().int().min(0).max(50).default(0), +}); + +interface SearchMatch { + line: number; + content: string; + contextBefore: string[]; + contextAfter: string[]; +} + +interface SearchResponse { + matches: SearchMatch[]; + total: number; + truncated: boolean; + source: 'ring' | 'capture'; +} + +/** + * Search a captured pane buffer using a regex. This is the fallback path + * when the ring buffer doesn't have enough matches. + */ +function grepBuffer( + text: string, + pattern: string, + limit: number, + context: number, +): SearchMatch[] { + let re: RegExp; + try { + re = new RegExp(pattern, 'u'); + } catch { + return []; + } + + const lines = text.split('\n'); + const results: SearchMatch[] = []; + + for (let i = 0; i < lines.length; i++) { + if (results.length >= limit) break; + if (re.test(lines[i]!)) { + const contextBefore: string[] = []; + const contextAfter: string[] = []; + for (let c = 1; c <= context; c++) { + const ci = i - c; + if (ci >= 0) contextBefore.unshift(lines[ci]!); + } + for (let c = 1; c <= context; c++) { + const ci = i + c; + if (ci < lines.length) contextAfter.push(lines[ci]!); + } + results.push({ + line: i + 1, + content: lines[i]!, + contextBefore, + contextAfter, + }); + } + } + + return results; +} + +export function registerSearchRoutes(app: FastifyInstance, tmuxConfPath: string): void { + app.get<{ + Params: { sid: string; pid: string }; + Querystring: { pattern?: string; limit?: string; context?: string }; + }>( + '/api/term/sessions/:sid/panes/:pid/search', + async (req, reply) => { + const p = ParamsSchema.safeParse(req.params); + if (!p.success) return reply.code(400).send({ error: 'bad_params' }); + + const sid = sanitizeId(p.data.sid); + const pid = sanitizeId(p.data.pid); + if (!sid || !pid) return reply.code(400).send({ error: 'bad_id_format' }); + + const q = QuerySchema.safeParse(req.query); + if (!q.success) { + return reply.code(400).send({ + error: 'bad_query', + details: q.error.flatten().fieldErrors, + }); + } + + const { pattern, limit, context } = q.data; + + // ── Path 1: ring buffer search (fast, no tmux interaction) ── + const ringMatches = searchRingBuffer(pid, pattern, { limit, context }); + if (ringMatches.length >= limit) { + return reply.code(200).send({ + matches: ringMatches, + total: ringMatches.length, + truncated: ringMatches.length >= limit, + source: 'ring' as const, + }); + } + + // ── Path 2: capture-pane + grep fallback (10s timeout) ── + const sessionName = tmuxSessionName(pid); + + let capture: string; + try { + capture = await withTimeout( + capturePane(tmuxConfPath, sessionName, 5000), + 10_000, + ); + } catch (err) { + req.log.warn({ err, pid }, 'capture-pane timed out or failed'); + return reply.code(200).send({ + matches: ringMatches, + total: ringMatches.length, + truncated: false, + source: 'ring' as const, + }); + } + + if (!capture) { + // tmux pane may no longer exist — return whatever ring had + return reply.code(200).send({ + matches: ringMatches, + total: ringMatches.length, + truncated: false, + source: 'ring' as const, + }); + } + + const captureMatches = grepBuffer(capture, pattern, limit, context); + + return reply.code(200).send({ + matches: captureMatches, + total: captureMatches.length, + truncated: captureMatches.length >= limit, + source: 'capture' as const, + }); + }, + ); +} + +function withTimeout(promise: Promise, ms: number): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error('timeout')), ms), + ), + ]); +} diff --git a/apps/booterm/src/ws/attach.ts b/apps/booterm/src/ws/attach.ts index b51ffa1..8ee1db2 100644 --- a/apps/booterm/src/ws/attach.ts +++ b/apps/booterm/src/ws/attach.ts @@ -9,7 +9,7 @@ import { } from '../pty/manager.js'; import { attachPty } from '../pty/pty.js'; import { getUser } from '../auth.js'; -import { register, unregister } from '../pty/registry.js'; +import { register, unregister, appendOutput } from '../pty/registry.js'; export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string): void { app.get<{ @@ -106,6 +106,8 @@ export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string } catch (err) { req.log.warn({ err }, 'ws send failed'); } + // Feed the ring buffer for pattern-based search + appendOutput(pid, data); }; handle.onData(onData); diff --git a/apps/coder/src/services/__tests__/paseo-client.test.ts b/apps/coder/src/services/__tests__/paseo-client.test.ts new file mode 100644 index 0000000..a584db9 --- /dev/null +++ b/apps/coder/src/services/__tests__/paseo-client.test.ts @@ -0,0 +1,195 @@ +import { describe, it, expect, vi } from 'vitest'; +import { PaseoClient, PaseoClientError } from '../paseo-client.js'; + +/** + * Create a PaseoClient whose runCli method is replaced with a mock. + * The mock is returned as the second tuple element so tests can + * control and inspect it directly. + */ +function makeClient(config?: { paseoBin?: string; cliHost?: string }): { + client: PaseoClient; + mockRunCli: ReturnType; +} { + const client = new PaseoClient(config); + const mockRunCli = vi.fn(); + (client as any).runCli = mockRunCli; + return { client, mockRunCli }; +} + +describe('PaseoClient', () => { + describe('listAgents', () => { + it('returns parsed agent list from paseo ls --json', async () => { + const agents = [ + { id: 'abc-123', shortId: 'abc', name: 'Agent 1', provider: 'opencode', status: 'running' }, + { id: 'def-456', shortId: 'def', name: 'Agent 2', provider: 'claude', status: 'idle' }, + ]; + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(JSON.stringify(agents)); + + const result = await client.listAgents(); + + expect(mockRunCli).toHaveBeenCalledWith(['ls', '--json']); + expect(result).toEqual(agents); + }); + + it('throws PaseoClientError on non-JSON output', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue('not json'); + + await expect(client.listAgents()).rejects.toThrow(PaseoClientError); + await expect(client.listAgents()).rejects.toThrow(/invalid JSON/); + }); + + it('propagates runCli rejection as-is', async () => { + const { client, mockRunCli } = makeClient(); + const err = new PaseoClientError('ls failed: connection refused', 'ls', 1, 'connection refused'); + mockRunCli.mockRejectedValue(err); + + await expect(client.listAgents()).rejects.toThrow(PaseoClientError); + await expect(client.listAgents()).rejects.toThrow(/ls failed/); + }); + }); + + describe('getAgentStatus', () => { + it('returns parsed agent detail from paseo inspect --json', async () => { + const detail = { + Id: 'abc-123', Name: 'Agent 1', Provider: 'opencode', + Status: 'idle', Archived: false, + CreatedAt: '2026-01-01T00:00:00Z', UpdatedAt: '2026-01-01T01:00:00Z', + }; + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(JSON.stringify(detail)); + + const result = await client.getAgentStatus('abc-123'); + + expect(mockRunCli).toHaveBeenCalledWith(['inspect', '--json', 'abc-123']); + expect(result.Id).toBe('abc-123'); + expect(result.Status).toBe('idle'); + }); + }); + + describe('health', () => { + it('returns ok when paseo ls succeeds', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue('[]'); + + const result = await client.health(); + + expect(result).toEqual({ status: 'ok' }); + }); + + it('returns error when runCli throws', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockRejectedValue(new Error('connection refused')); + + const result = await client.health(); + + expect(result).toEqual({ status: 'error' }); + }); + }); + + describe('importAgent', () => { + it('calls paseo import with provider and labels', async () => { + const agentResult = { Id: 'new-789', Name: 'Imported', Provider: 'opencode', Status: 'idle' }; + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(JSON.stringify(agentResult)); + + const result = await client.importAgent('ses-001', 'opencode', { + origin: 'boocode', + project: 'proj-1', + }); + + expect(mockRunCli).toHaveBeenCalledWith([ + 'import', '--json', + '--provider', 'opencode', + '--label', 'origin=boocode', + '--label', 'project=proj-1', + 'ses-001', + ]); + expect(result.Id).toBe('new-789'); + }); + + it('works without labels', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(JSON.stringify({ Id: 'new-789' })); + + const result = await client.importAgent('ses-001', 'claude'); + + expect(mockRunCli).toHaveBeenCalledWith([ + 'import', '--json', + '--provider', 'claude', + 'ses-001', + ]); + expect(result.Id).toBe('new-789'); + }); + }); + + describe('archiveAgent', () => { + it('calls paseo archive --json', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue('{}'); + + await client.archiveAgent('abc-123'); + + expect(mockRunCli).toHaveBeenCalledWith(['archive', '--json', 'abc-123']); + }); + }); + + describe('sendPrompt', () => { + it('sends prompt and parses JSON result', async () => { + const sendResult = { text: 'Hello!', ok: true }; + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(JSON.stringify(sendResult)); + + const result = await client.sendPrompt('abc-123', 'Hello'); + + expect(mockRunCli).toHaveBeenCalledWith(['send', '--json', 'abc-123', 'Hello'], undefined); + expect(result).toEqual(sendResult); + }); + + it('falls back to plain text on non-JSON output', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue('plain text response'); + + const result = await client.sendPrompt('abc-123', 'Hi'); + + expect(result).toEqual({ text: 'plain text response', ok: true }); + }); + + it('supports --no-wait flag', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue('{}'); + + await client.sendPrompt('abc-123', 'Hi', { noWait: true }); + + expect(mockRunCli).toHaveBeenCalledWith([ + 'send', '--json', '--no-wait', + 'abc-123', 'Hi', + ], undefined); + }); + }); + + describe('stopAgent', () => { + it('calls paseo stop', async () => { + const { client, mockRunCli } = makeClient(); + mockRunCli.mockResolvedValue(''); + + await client.stopAgent('abc-123'); + + expect(mockRunCli).toHaveBeenCalledWith(['stop', 'abc-123']); + }); + }); + + describe('cliHost config', () => { + it('includes --host flag in args when cliHost is set', async () => { + const { client, mockRunCli } = makeClient({ cliHost: 'tcp://localhost:6767?ssl=true' }); + mockRunCli.mockResolvedValue('[]'); + + await client.listAgents(); + + expect(mockRunCli).toHaveBeenCalledWith([ + 'ls', '--json', '--host', 'tcp://localhost:6767?ssl=true', + ]); + }); + }); +}); diff --git a/apps/coder/src/services/agent-backend.ts b/apps/coder/src/services/agent-backend.ts index 0f29fd7..703092b 100644 --- a/apps/coder/src/services/agent-backend.ts +++ b/apps/coder/src/services/agent-backend.ts @@ -13,7 +13,7 @@ import type { AcpToolSnapshot } from './acp-tool-snapshot.js'; import type { AgentCommand } from './provider-types.js'; /** Backend transport kind. Mirrors `agent_sessions.backend` CHECK in schema.sql. */ -export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk'; +export type AgentBackendKind = 'opencode_server' | 'acp_warm' | 'claude_sdk' | 'paseo'; /** * Normalized, transport-agnostic events a backend emits during a turn (§2). diff --git a/apps/coder/src/services/backends/paseo.ts b/apps/coder/src/services/backends/paseo.ts new file mode 100644 index 0000000..50d547b --- /dev/null +++ b/apps/coder/src/services/backends/paseo.ts @@ -0,0 +1,254 @@ +/** + * v2.10 — PaseoBackend: Paseo agent integration for the agent-pool. + * + * Wraps the Paseo CLI daemon as an AgentBackend. Each Paseo agent maps to one + * (chat_id, agent) pair and is persisted via `paseo import` (which registers + * an agent with the Paseo daemon). Prompts are sent via `paseo send`, and + * the session is cleaned up via `paseo archive`. + * + * Paseo is a meta-agent hub — it wraps provider sessions (opencode, claude, + * acp, etc.). The `provider` option in `EnsureSessionOpts` selects which + * provider Paseo delegates to. + * + * Backend kind: 'paseo' (must be added to agent_sessions_backend_chk). + * + * Spec: openspec/changes/v2-10-paseo-integration/design.md. + */ +import type { FastifyBaseLogger } from 'fastify'; +import type { Sql } from '../../db.js'; +import { PaseoClient, type PaseoSendResult } from '../paseo-client.js'; +import type { + AgentBackend, + AgentSessionHandle, + EnsureSessionOpts, + PromptCtx, + TurnResult, +} from '../agent-backend.js'; + +/** Default provider to use when Paseo wraps a generic agent. */ +const DEFAULT_PASEO_PROVIDER = 'opencode'; + +export interface PaseoBackendDeps { + sql: Sql; + log: FastifyBaseLogger; + /** The (chat, agent) this backend serves — its pool identity + DB key. */ + chatId: string; + /** Agent name (e.g. 'opencode', 'claude', 'paseo'). */ + agent: string; + /** Resolved PaseoClient instance. */ + client: PaseoClient; + /** Provider string to pass to `paseo import --provider`. */ + provider: string; +} + +export class PaseoBackend implements AgentBackend { + readonly backend = 'paseo' as const; + + private readonly sql: Sql; + private readonly log: FastifyBaseLogger; + private readonly chatId: string; + private readonly agent: string; + private readonly client: PaseoClient; + private readonly provider: string; + + /** Map of BooCode sessionId → Paseo agent ID. */ + private readonly agentIds = new Map(); + /** True between prompt() start and settle. */ + private busy = false; + private up = false; + + constructor(deps: PaseoBackendDeps) { + this.sql = deps.sql; + this.log = deps.log; + this.chatId = deps.chatId; + this.agent = deps.agent; + this.client = deps.client; + this.provider = deps.provider || DEFAULT_PASEO_PROVIDER; + } + + /** §2: liveness for the health endpoint + dispatcher fallback decision. */ + health(): 'up' | 'down' { + return this.up ? 'up' : 'down'; + } + + /** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */ + isBusy(): boolean { + return this.busy; + } + + // ─── ensureSession: create/import a Paseo agent ───────────────────────────── + + async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise { + // Check if we already have a Paseo agent ID for this session. + let paseoId = this.agentIds.get(sessionId); + + if (!paseoId) { + // Resolve existing agent_session_id from DB (e.g. after a restart). + const [row] = await this.sql<{ agent_session_id: string | null }[]>` + SELECT agent_session_id FROM agent_sessions + WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent} AND backend = 'paseo' + `; + if (row?.agent_session_id) { + paseoId = row.agent_session_id; + this.agentIds.set(sessionId, paseoId); + } + } + + if (!paseoId) { + // Import a new Paseo agent. Use the session UUID as the provider session id. + const labels: Record = { + origin: 'boocode', + project: opts.projectId, + chat: opts.chatId, + worktree: opts.worktreeId, + agent: this.agent, + }; + + try { + const agent = await this.client.importAgent(sessionId, this.provider, labels); + paseoId = agent.Id; + this.agentIds.set(sessionId, paseoId); + this.log.info( + { paseoId, agent: this.agent, chatId: this.chatId }, + 'paseo: imported agent', + ); + } catch (err) { + this.log.error( + { err: String(err), agent: this.agent, chatId: this.chatId }, + 'paseo: importAgent failed', + ); + throw err; + } + } + + // Upsert the agent_sessions row. + await this.sql` + INSERT INTO agent_sessions + (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at) + VALUES + (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'paseo', ${paseoId}, NULL, 'active', clock_timestamp()) + ON CONFLICT (chat_id, agent) DO UPDATE SET + session_id = EXCLUDED.session_id, + worktree_id = EXCLUDED.worktree_id, + backend = 'paseo', + agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id), + server_port = NULL, + status = 'active', + last_active_at = clock_timestamp() + `.catch((err) => { + this.log.warn( + { err: String(err), chatId: opts.chatId, agent: opts.agent }, + 'paseo: agent_sessions upsert failed (non-fatal)', + ); + }); + + this.up = true; + + return { + sessionId, + agent: opts.agent, + backend: 'paseo', + chatId: opts.chatId, + worktreeId: opts.worktreeId, + agentSessionId: paseoId, + serverPort: null, + }; + } + + // ─── prompt: send a message to the Paseo agent ───────────────────────────── + + async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise { + const paseoId = handle.agentSessionId; + if (!paseoId) { + return { ok: false, error: 'paseo: no agent session id in handle' }; + } + + this.busy = true; + try { + // Use streamSend for real-time text output via onEvent. + const result: PaseoSendResult = await this.client.streamSend( + paseoId, + input, + (event) => { + ctx.onEvent(event); + }, + ctx.signal, + ); + + // Update last_active_at. + await this.sql` + UPDATE agent_sessions + SET last_active_at = clock_timestamp() + WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent} + `.catch(() => { /* non-fatal */ }); + + if (result.error) { + return { ok: false, error: result.error }; + } + + return { ok: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + // Check if abortion + if (ctx.signal.aborted) { + return { ok: false, error: 'cancelled' }; + } + return { ok: false, error: `paseo: ${msg}` }; + } finally { + this.busy = false; + } + } + + // ─── closeSession: archive the Paseo agent ───────────────────────────────── + + async closeSession(handle: AgentSessionHandle): Promise { + const paseoId = handle.agentSessionId; + if (!paseoId) return; + + try { + await this.client.archiveAgent(paseoId); + this.log.info({ paseoId, agent: handle.agent }, 'paseo: archived agent'); + } catch (err) { + this.log.warn( + { err: String(err), paseoId, agent: handle.agent }, + 'paseo: archiveAgent failed (non-fatal)', + ); + } + + this.agentIds.delete(handle.sessionId); + + // Update DB row. + await this.sql` + UPDATE agent_sessions + SET status = 'closed', last_active_at = clock_timestamp() + WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent} + `.catch(() => { /* non-fatal */ }); + } + + // ─── dispose: archive all tracked agents ─────────────────────────────────── + + async dispose(): Promise { + const ids = [...this.agentIds.values()]; + this.agentIds.clear(); + + for (const paseoId of ids) { + try { + await this.client.archiveAgent(paseoId); + } catch { + // Best-effort cleanup during shutdown. + } + } + + this.up = false; + } + + /** Phase 3: periodic health tick — probes the Paseo daemon. */ + async tickHealth(_now?: number): Promise { + try { + const h = await this.client.health(); + this.up = h.status === 'ok'; + } catch { + this.up = false; + } + } +} diff --git a/apps/coder/src/services/collision-detector.ts b/apps/coder/src/services/collision-detector.ts new file mode 100644 index 0000000..740fb1f --- /dev/null +++ b/apps/coder/src/services/collision-detector.ts @@ -0,0 +1,115 @@ +// v2.8 Collision detection — pure functions that find file overlaps between +// worktrees/agents editing the same files concurrently. Advisory only; writes +// are never blocked, but the collision info surfaces in the UI and logs. +// +// Severity levels: +// same_line — the same file, exact same line region +// adjacent_line — the same file, lines touch or are within 5 lines +// different_area — the same file, distant lines +// +// Pure functions, no side effects. Testable in isolation. + +export type ConflictSeverity = 'same_line' | 'adjacent_line' | 'different_area'; + +export interface ConflictVerdict { + filePath: string; + worktrees: string[]; + severity: ConflictSeverity; + agents: string[]; +} + +/** + * Registry entry for a single file change recorded by a worktree. + * Stored in the ConflictIndex Map value for each file path. + */ +export interface ConflictEntry { + worktreeId: string; + agent: string; + /** + * Approximate line range touched by the change. undefined when the change + * creates or deletes the file (full-file collision vs. same-line). + */ + lineRange?: { start: number; end: number }; + status: 'pending' | 'applied' | 'reverted'; + timestamp: number; +} + +/** + * Shape of the conflict index consumed by findConflicts. + * File path → set of entries from different worktrees/agents. + */ +export type ConflictIndexData = ReadonlyMap>; + +/** + * Find file overlaps between `changedFiles` and the conflict index, excluding + * the caller's own worktree. + * + * Returns one ConflictVerdict per file that has entries from other worktrees. + * Severity is the highest found (same_line > adjacent_line > different_area). + */ +export function findConflicts( + changedFiles: string[], + worktreeId: string, + /** Approximate line range for the proposed changes, keyed by file path */ + changedRanges: Map, + conflictIndex: ConflictIndexData, +): ConflictVerdict[] { + const verdicts: ConflictVerdict[] = []; + + for (const filePath of changedFiles) { + const entries = conflictIndex.get(filePath); + if (!entries || entries.size === 0) continue; + + // Filter to entries from OTHER worktrees + const otherEntries = [...entries].filter((e) => e.worktreeId !== worktreeId); + if (otherEntries.length === 0) continue; + + const myRange = changedRanges.get(filePath); + let severity: ConflictSeverity = 'different_area'; + + for (const entry of otherEntries) { + if (!myRange || !entry.lineRange) { + // Full-file changes (create/delete) always hit at least different_area + continue; + } + const sev = lineOverlapSeverity(myRange, entry.lineRange); + if (sev === 'same_line') { + severity = 'same_line'; + break; // Can't get higher than this + } + if (sev === 'adjacent_line' && severity === 'different_area') { + severity = 'adjacent_line'; + } + } + + const worktrees = [...new Set(otherEntries.map((e) => e.worktreeId))]; + const agents = [...new Set(otherEntries.map((e) => e.agent))]; + + verdicts.push({ filePath, worktrees, severity, agents }); + } + + return verdicts; +} + +const ADJACENT_LINE_THRESHOLD = 5; + +/** + * Determine severity of overlap between two line ranges. + */ +function lineOverlapSeverity( + a: { start: number; end: number }, + b: { start: number; end: number }, +): ConflictSeverity { + // Same_line: ranges intersect + if (a.start <= b.end && b.start <= a.end) { + return 'same_line'; + } + + // Adjacent: ranges are within ADJACENT_LINE_THRESHOLD lines of each other + const gap = a.start > b.end ? a.start - b.end : b.start - a.end; + if (gap <= ADJACENT_LINE_THRESHOLD) { + return 'adjacent_line'; + } + + return 'different_area'; +} diff --git a/apps/coder/src/services/conflict-index.ts b/apps/coder/src/services/conflict-index.ts new file mode 100644 index 0000000..1acea1d --- /dev/null +++ b/apps/coder/src/services/conflict-index.ts @@ -0,0 +1,151 @@ +// v2.8 In-memory conflict index — tracks which worktrees/agents are editing +// which files so the collision detector can find overlaps. +// +// Singleton exported as `conflictIndex`; imported by pending_changes.ts to +// register changes at queue time and unregister on worktree teardown. +// +// NOT persisted — survives only as long as the BooCoder process. Postgres +// is the durable record (pending_changes table); this is the hot in-memory +// probe for concurrent edit warnings. + +import type { ConflictEntry, ConflictVerdict } from './collision-detector.js'; +import { findConflicts } from './collision-detector.js'; + +export class ConflictIndex { + /** + * filePath → Set of ConflictEntry from various worktrees. + * A single worktree may have multiple entries for the same file + * (several pending edits to the same file in one session). + */ + #map = new Map>(); + + // ---- mutation ------------------------------------------------------- + + /** + * Register that `worktreeId` (agent) is touching `filePath`. + * Creates an entry in the index so subsequent callers see it as a conflict. + */ + registerChange( + filePath: string, + worktreeId: string, + agent: string, + lineRange?: { start: number; end: number }, + ): void { + let entries = this.#map.get(filePath); + if (!entries) { + entries = new Set(); + this.#map.set(filePath, entries); + } + entries.add({ + worktreeId, + agent, + lineRange, + status: 'pending' as const, + timestamp: Date.now(), + }); + } + + /** + * Remove all entries for a given worktree. Called on worktree teardown + * so stale entries don't trigger false warnings. + */ + removeWorktree(worktreeId: string): void { + for (const [filePath, entries] of this.#map) { + const before = entries.size; + for (const entry of entries) { + if (entry.worktreeId === worktreeId) { + entries.delete(entry); + } + } + if (entries.size === 0) { + this.#map.delete(filePath); + } + } + } + + /** + * Remove entries older than `maxAgeMs`. Useful as a periodic cleanup + * when worktree teardown was missed (crash, unclean exit). + */ + sweepStale(maxAgeMs: number): number { + const cutoff = Date.now() - maxAgeMs; + let removed = 0; + + for (const [filePath, entries] of this.#map) { + for (const entry of entries) { + if (entry.timestamp < cutoff) { + entries.delete(entry); + removed++; + } + } + if (entries.size === 0) { + this.#map.delete(filePath); + } + } + + return removed; + } + + // ---- query ---------------------------------------------------------- + + /** + * Query the raw ConflictEntry set for a file path. Returns empty set + * when there are no entries (never mutated the file). + */ + getEntriesFor(filePath: string): ReadonlySet { + return this.#map.get(filePath) ?? new Set(); + } + + /** + * Get all conflict verdicts for a given file path — which other + * worktrees are touching it. Returns empty when only one worktree + * has entries (no actual conflict). + */ + getConflictsFor(filePath: string): ConflictVerdict[] { + const entries = this.#map.get(filePath); + if (!entries || entries.size === 0) return []; + + // Determine distinct worktree IDs. If only one, no conflict. + const worktreeIds = new Set(); + for (const e of entries) worktreeIds.add(e.worktreeId); + if (worktreeIds.size <= 1) return []; + + // Use the first worktree as the "caller" so findConflicts excludes + // its entries and returns only entries from OTHER worktrees. + const caller = [...worktreeIds][0]!; + return findConflicts( + [filePath], + caller, + new Map(), + this.#toIndexData(), + ); + } + + /** + * Get conflicts for a set of file changes from a specific worktree. + * Delegates to the pure findConflicts function. + */ + query( + changedFiles: string[], + worktreeId: string, + changedRanges: Map, + ): ConflictVerdict[] { + return findConflicts(changedFiles, worktreeId, changedRanges, this.#toIndexData()); + } + + /** + * Snapshot the current map for testing/inspection. + */ + snapshot(): Map> { + return new Map(this.#map); + } + + // ---- private -------------------------------------------------------- + + #toIndexData(): ReadonlyMap> { + return this.#map as ReadonlyMap>; + } +} + +// Singleton — the whole BooCoder process shares one conflict index. +export const conflictIndex = new ConflictIndex(); diff --git a/apps/coder/src/services/paseo-client.ts b/apps/coder/src/services/paseo-client.ts new file mode 100644 index 0000000..54a4325 --- /dev/null +++ b/apps/coder/src/services/paseo-client.ts @@ -0,0 +1,341 @@ +/** + * v2.10 — PaseoClient: thin CLI-based client for the Paseo daemon. + * + * Paseo is a multi-agent hub daemon running at a configurable address + * (default Unix socket / localhost:6767). This client wraps the `paseo` CLI + * via child_process spawn for all operations (the daemon does not expose a + * separate REST API for write operations). Read operations (listAgents, + * getAgentStatus) use `paseo ls --json` / `paseo inspect --json`; write + * operations (import, archive, send) use the corresponding subcommands. + * + * Spec: openspec/changes/v2-10-paseo-integration/design.md. + */ +import { spawn } from 'node:child_process'; +import { once } from 'node:events'; +import { createInterface } from 'node:readline'; + +// ─── Types ─────────────────────────────────────────────────────────────────── + +/** Listing entry from `paseo ls --json`. Fields are lowercase. */ +export interface PaseoAgentListItem { + id: string; + shortId: string; + name: string; + provider: string; + status: string; + cwd?: string; + created?: string; + thinking?: string; +} + +/** Detailed agent info from `paseo inspect --json`. Fields are PascalCase. */ +export interface PaseoAgentDetail { + Id: string; + Name: string; + Provider: string; + Model?: string; + Status: string; + Thinking?: string; + Archived: boolean; + ArchivedAt?: string | null; + Cwd?: string; + CreatedAt: string; + UpdatedAt: string; + Mode?: string; + AvailableModes?: Array<{ id: string; label: string }>; + Capabilities?: { + Streaming?: boolean; + Persistence?: boolean; + DynamicModes?: boolean; + McpServers?: boolean; + }; + Labels?: Record; + Worktree?: string | null; + ParentAgentId?: string | null; +} + +/** Result of `paseo send --json`. */ +export interface PaseoSendResult { + /** The agent's textual response. */ + text?: string; + /** Structured output if the agent produced any. */ + output?: unknown; + /** Error message if the turn failed. */ + error?: string; + /** True if the turn completed successfully. */ + ok?: boolean; +} + +export interface PaseoClientConfig { + /** Path to the paseo binary. Default: auto-resolved from PATH. */ + paseoBin: string; + /** + * Explicit `--host ` value for CLI calls. + * Format: `host:port` or `tcp://host:port?ssl=true&password=secret`. + * Omit to use the CLI default (Unix socket, fallback localhost:6767). + */ + cliHost?: string; +} + +const DEFAULT_PASEO_BIN = 'paseo'; + +// ─── Client ────────────────────────────────────────────────────────────────── + +export class PaseoClientError extends Error { + constructor( + message: string, + public readonly command: string, + public readonly exitCode: number | null, + public readonly stderr: string, + ) { + super(message); + this.name = 'PaseoClientError'; + } +} + +export class PaseoClient { + /** @internal visible for testing */ + readonly bin: string; + private readonly hostArgs: string[]; + + constructor(config?: Partial) { + this.bin = config?.paseoBin ?? DEFAULT_PASEO_BIN; + this.hostArgs = config?.cliHost ? ['--host', config.cliHost] : []; + } + + // ─── Read operations (CLI `ls --json`, `inspect --json`) ────────────────── + + /** List all non-archived agents. */ + async listAgents(): Promise { + const raw = await this.runJson(['ls', '--json', ...this.hostArgs]); + return raw as PaseoAgentListItem[]; + } + + /** Get detailed status for a single agent by ID or prefix. */ + async getAgentStatus(agentId: string): Promise { + const raw = await this.runJson(['inspect', '--json', agentId, ...this.hostArgs]); + return raw as PaseoAgentDetail; + } + + /** + * Quick liveness check — runs `paseo ls --json --limit 1` and returns success. + * The daemon is healthy if the CLI exits 0. + */ + async health(): Promise<{ status: string }> { + try { + await this.runCli(['ls', '--json', '--limit', '1', ...this.hostArgs]); + return { status: 'ok' }; + } catch { + return { status: 'error' }; + } + } + + // ─── Write operations (CLI subcommands) ─────────────────────────────────── + + /** + * Import a provider session as a Paseo agent. + * Uses `paseo import --provider [--label k=v]`. + */ + async importAgent( + sessionId: string, + provider: string, + labels?: Record, + ): Promise { + const args: string[] = ['import', '--json', ...this.hostArgs]; + + if (provider) { + args.push('--provider', provider); + } + if (labels) { + for (const [k, v] of Object.entries(labels)) { + args.push('--label', `${k}=${v}`); + } + } + args.push(sessionId); + + const raw = await this.runJson(args); + return raw as PaseoAgentDetail; + } + + /** Archive (soft-delete) a Paseo agent by ID or prefix. */ + async archiveAgent(agentId: string): Promise { + await this.runCli(['archive', '--json', ...this.hostArgs, agentId]); + } + + /** + * Send a prompt to an existing agent. + * + * By default waits for the agent to complete the turn (streams text events + * via the optional `onEvent` callback) and returns the structured result. + * Pass `noWait: true` to fire-and-forget. + */ + async sendPrompt( + agentId: string, + prompt: string, + options?: { + noWait?: boolean; + onEvent?: (event: { type: 'text' | 'reasoning'; text: string }) => void; + signal?: AbortSignal; + }, + ): Promise { + const args: string[] = ['send', '--json', ...this.hostArgs]; + + if (options?.noWait) { + args.push('--no-wait'); + } + + args.push(agentId, prompt); + + // With --json and no --no-wait, the output is JSON after completion. + // For streaming, we read stderr without --json for real-time text. + const raw = await this.runCli(args, options?.signal); + try { + return JSON.parse(raw) as PaseoSendResult; + } catch { + return { text: raw, ok: true }; + } + } + + /** + * Stream-send: runs `paseo send` WITHOUT `--json`, forward text/reasoning + * lines to onEvent in real time. Use when the caller wants to stream agent + * output as it arrives rather than wait for the full JSON result. + */ + async streamSend( + agentId: string, + prompt: string, + onEvent: (event: { type: 'text' | 'reasoning'; text: string }) => void, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve, reject) => { + const args = ['send', ...this.hostArgs, agentId, prompt]; + + const child = spawn(this.bin, args, { + stdio: ['ignore', 'pipe', 'pipe'], + signal, + }); + + let stdout = ''; + let stderr = ''; + + if (child.stdout) { + const rl = createInterface({ input: child.stdout }); + rl.on('line', (line: string) => { + stdout += line + '\n'; + // Forward as text event for real-time display + onEvent({ type: 'text', text: line + '\n' }); + }); + } + + if (child.stderr) { + child.stderr.on('data', (chunk: Buffer) => { + stderr += chunk.toString(); + }); + } + + once(child, 'close').then((raw) => { + const exitCode = (raw[0] as number | null) ?? 0; + if (exitCode !== 0) { + reject( + new PaseoClientError( + `paseo send failed (exit ${exitCode}): ${stderr.trim()}`, + 'send', + exitCode, + stderr, + ), + ); + return; + } + resolve({ text: stdout, ok: true }); + }); + + child.on('error', reject); + }); + } + + /** Interrupt/stop a running agent. */ + async stopAgent(agentId: string): Promise { + await this.runCli(['stop', ...this.hostArgs, agentId]); + } + + // ─── Private helpers ─────────────────────────────────────────────────────── + + /** + * Run a CLI command and return stdout as a string. + * Throws PaseoClientError on non-zero exit. + */ + private async runCli( + args: string[], + signal?: AbortSignal, + ): Promise { + return new Promise((resolve, reject) => { + const child = spawn(this.bin, args, { + stdio: ['ignore', 'pipe', 'pipe'], + signal, + }); + + let stdout = ''; + let stderr = ''; + + if (child.stdout) { + child.stdout.on('data', (chunk: Buffer) => { + stdout += chunk.toString(); + }); + } + + if (child.stderr) { + child.stderr.on('data', (chunk: Buffer) => { + stderr += chunk.toString(); + }); + } + + child.on('error', (err: Error) => { + // If signal aborted, treat as cancellation not error + if (signal?.aborted) { + resolve(''); + return; + } + reject(err); + }); + + once(child, 'close').then((raw) => { + const exitCode = (raw[0] as number | null) ?? 0; + if (signal?.aborted) { + resolve(''); + return; + } + if (exitCode !== 0) { + const msg = stderr.trim() || `exit code ${exitCode}`; + reject( + new PaseoClientError( + `paseo ${args[0] ?? '?'} failed: ${msg}`, + args[0] ?? '?', + exitCode, + stderr, + ), + ); + return; + } + resolve(stdout); + }); + }); + } + + /** + * Run a CLI command and parse stdout as JSON. + * Throws PaseoClientError on non-zero exit or parse failure. + */ + private async runJson(args: string[]): Promise { + const stdout = await this.runCli(args); + try { + return JSON.parse(stdout); + } catch (err) { + throw new PaseoClientError( + `paseo ${args[0] ?? '?'} returned invalid JSON: ${(stdout || '').slice(0, 200)}`, + args[0] ?? '?', + 0, + stdout, + ); + } + } +}