- Task state machine: TIMED_OUT state, retriable steps, timeout detection - Paseo hub: paseo-client.ts (HTTP+CLI), PaseoBackend (AgentBackend), 14 tests - Collision detection: collision-detector.ts, conflict-index.ts, ws-frames type - PTY search: ring buffer, search route, capture-pane fallback
255 lines
8.0 KiB
TypeScript
255 lines
8.0 KiB
TypeScript
/**
 * v2.10 — PaseoBackend: Paseo agent integration for the agent-pool.
 *
 * Wraps the Paseo CLI daemon as an AgentBackend. Each Paseo agent maps to one
 * (chat_id, agent) pair and is persisted via `paseo import` (which registers
 * an agent with the Paseo daemon). Prompts are sent via `paseo send`, and
 * the session is cleaned up via `paseo archive`.
 *
 * Paseo is a meta-agent hub — it wraps provider sessions (opencode, claude,
 * acp, etc.). The `provider` option in `EnsureSessionOpts` selects which
 * provider Paseo delegates to.
 *
 * Backend kind: 'paseo' (must be added to agent_sessions_backend_chk).
 *
 * Spec: openspec/changes/v2-10-paseo-integration/design.md.
 */
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { Sql } from '../../db.js';
|
|
import { PaseoClient, type PaseoSendResult } from '../paseo-client.js';
|
|
import type {
|
|
AgentBackend,
|
|
AgentSessionHandle,
|
|
EnsureSessionOpts,
|
|
PromptCtx,
|
|
TurnResult,
|
|
} from '../agent-backend.js';
|
|
|
|
/**
 * Default provider to use when Paseo wraps a generic agent.
 * Applied by the PaseoBackend constructor when `deps.provider` is falsy.
 */
const DEFAULT_PASEO_PROVIDER = 'opencode';
|
|
|
|
/** Constructor dependencies for {@link PaseoBackend}. */
export interface PaseoBackendDeps {
  /** SQL tagged-template client used to read and write `agent_sessions` rows. */
  sql: Sql;
  /** Logger used for import / archive / upsert diagnostics. */
  log: FastifyBaseLogger;
  /** The (chat, agent) this backend serves — its pool identity + DB key. */
  chatId: string;
  /** Agent name (e.g. 'opencode', 'claude', 'paseo'). */
  agent: string;
  /** Resolved PaseoClient instance. */
  client: PaseoClient;
  /** Provider string to pass to `paseo import --provider`. */
  provider: string;
}
|
|
|
|
/**
 * AgentBackend implementation backed by a Paseo agent.
 *
 * Lifecycle as implemented here:
 *  - `ensureSession` resolves (in-memory, then DB) or imports a Paseo agent
 *    and upserts the `agent_sessions` row.
 *  - `prompt` streams one turn through `client.streamSend`.
 *  - `closeSession` / `dispose` archive the Paseo agent and mark the row
 *    `'closed'`, so a later `ensureSession` imports a fresh agent instead of
 *    reusing an archived id.
 */
export class PaseoBackend implements AgentBackend {
  readonly backend = 'paseo' as const;

  private readonly sql: Sql;
  private readonly log: FastifyBaseLogger;
  private readonly chatId: string;
  private readonly agent: string;
  private readonly client: PaseoClient;
  private readonly provider: string;

  /** Map of BooCode sessionId → Paseo agent ID. */
  private readonly agentIds = new Map<string, string>();
  /** True between prompt() start and settle. */
  private busy = false;
  /** Last known daemon liveness; set by ensureSession / tickHealth, cleared by dispose. */
  private up = false;

  constructor(deps: PaseoBackendDeps) {
    this.sql = deps.sql;
    this.log = deps.log;
    this.chatId = deps.chatId;
    this.agent = deps.agent;
    this.client = deps.client;
    // `||` on purpose: an empty provider string also falls back to the default.
    this.provider = deps.provider || DEFAULT_PASEO_PROVIDER;
  }

  /** §2: liveness for the health endpoint + dispatcher fallback decision. */
  health(): 'up' | 'down' {
    return this.up ? 'up' : 'down';
  }

  /** Phase 3: busy iff a turn is in flight (pool never evicts a busy backend). */
  isBusy(): boolean {
    return this.busy;
  }

  // ─── ensureSession: create/import a Paseo agent ─────────────────────────────

  /**
   * Resolve or create the Paseo agent for `sessionId` and persist the mapping.
   *
   * Resolution order: in-memory map → non-closed `agent_sessions` row →
   * `client.importAgent`. The row upsert is best-effort (failure is logged,
   * not thrown).
   *
   * NOTE(review): the provider passed to `importAgent` is the one given to the
   * constructor; `opts` is not consulted for it here.
   *
   * @param sessionId BooCode session id; also passed to `importAgent` as the
   *   provider session id.
   * @param opts Chat / agent / project / worktree identity for labels and the DB row.
   * @returns Handle whose `agentSessionId` is the Paseo agent id.
   * @throws Re-throws whatever `client.importAgent` rejects with.
   */
  async ensureSession(sessionId: string, opts: EnsureSessionOpts): Promise<AgentSessionHandle> {
    // Check if we already have a Paseo agent ID for this session.
    let paseoId = this.agentIds.get(sessionId);

    if (!paseoId) {
      // Resolve existing agent_session_id from DB (e.g. after a restart).
      // Rows marked 'closed' are skipped: closeSession()/dispose() archive the
      // Paseo agent before closing the row, so that id must not be reused.
      const [row] = await this.sql<{ agent_session_id: string | null }[]>`
        SELECT agent_session_id FROM agent_sessions
        WHERE chat_id = ${opts.chatId} AND agent = ${opts.agent} AND backend = 'paseo'
          AND status IS DISTINCT FROM 'closed'
      `;
      if (row?.agent_session_id) {
        paseoId = row.agent_session_id;
        this.agentIds.set(sessionId, paseoId);
      }
    }

    if (!paseoId) {
      // Import a new Paseo agent. Use the session UUID as the provider session id.
      const labels: Record<string, string> = {
        origin: 'boocode',
        project: opts.projectId,
        chat: opts.chatId,
        worktree: opts.worktreeId,
        agent: this.agent,
      };

      try {
        const agent = await this.client.importAgent(sessionId, this.provider, labels);
        paseoId = agent.Id;
        this.agentIds.set(sessionId, paseoId);
        this.log.info(
          { paseoId, agent: this.agent, chatId: this.chatId },
          'paseo: imported agent',
        );
      } catch (err) {
        this.log.error(
          { err: String(err), agent: this.agent, chatId: this.chatId },
          'paseo: importAgent failed',
        );
        throw err;
      }
    }

    // Upsert the agent_sessions row.
    await this.sql`
      INSERT INTO agent_sessions
        (chat_id, session_id, worktree_id, agent, backend, agent_session_id, server_port, status, last_active_at)
      VALUES
        (${opts.chatId}, ${sessionId}, ${opts.worktreeId}, ${opts.agent}, 'paseo', ${paseoId}, NULL, 'active', clock_timestamp())
      ON CONFLICT (chat_id, agent) DO UPDATE SET
        session_id = EXCLUDED.session_id,
        worktree_id = EXCLUDED.worktree_id,
        backend = 'paseo',
        agent_session_id = COALESCE(EXCLUDED.agent_session_id, agent_sessions.agent_session_id),
        server_port = NULL,
        status = 'active',
        last_active_at = clock_timestamp()
    `.catch((err) => {
      this.log.warn(
        { err: String(err), chatId: opts.chatId, agent: opts.agent },
        'paseo: agent_sessions upsert failed (non-fatal)',
      );
    });

    this.up = true;

    return {
      sessionId,
      agent: opts.agent,
      backend: 'paseo',
      chatId: opts.chatId,
      worktreeId: opts.worktreeId,
      agentSessionId: paseoId,
      serverPort: null,
    };
  }

  // ─── prompt: send a message to the Paseo agent ─────────────────────────────

  /**
   * Run one turn against the Paseo agent referenced by `handle`.
   *
   * Never throws: failures are reported as `{ ok: false, error }`. Events from
   * `client.streamSend` are forwarded to `ctx.onEvent`; `ctx.signal` is handed
   * to the client, and an exception raised while the signal is aborted is
   * reported as `'cancelled'`.
   *
   * @param handle Session handle; must carry `agentSessionId`.
   * @param input Prompt text to send.
   * @param ctx Event sink and abort signal for this turn.
   */
  async prompt(handle: AgentSessionHandle, input: string, ctx: PromptCtx): Promise<TurnResult> {
    const paseoId = handle.agentSessionId;
    if (!paseoId) {
      return { ok: false, error: 'paseo: no agent session id in handle' };
    }

    this.busy = true;
    try {
      // Use streamSend for real-time text output via onEvent.
      const result: PaseoSendResult = await this.client.streamSend(
        paseoId,
        input,
        (event) => {
          ctx.onEvent(event);
        },
        ctx.signal,
      );

      // Update last_active_at. Scoped to backend = 'paseo' so a row that has
      // been taken over by another backend is left alone.
      await this.sql`
        UPDATE agent_sessions
        SET last_active_at = clock_timestamp()
        WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent} AND backend = 'paseo'
      `.catch(() => { /* non-fatal */ });

      if (result.error) {
        return { ok: false, error: result.error };
      }

      return { ok: true };
    } catch (err) {
      // An exception while the caller's signal is aborted is a cancellation.
      if (ctx.signal.aborted) {
        return { ok: false, error: 'cancelled' };
      }
      const msg = err instanceof Error ? err.message : String(err);
      return { ok: false, error: `paseo: ${msg}` };
    } finally {
      this.busy = false;
    }
  }

  // ─── closeSession: archive the Paseo agent ─────────────────────────────────

  /**
   * Archive the Paseo agent behind `handle` and mark its row closed.
   *
   * A failed archive is logged and otherwise ignored; the in-memory mapping
   * is dropped and the row is closed regardless. No-op when the handle has no
   * `agentSessionId`.
   */
  async closeSession(handle: AgentSessionHandle): Promise<void> {
    const paseoId = handle.agentSessionId;
    if (!paseoId) return;

    try {
      await this.client.archiveAgent(paseoId);
      this.log.info({ paseoId, agent: handle.agent }, 'paseo: archived agent');
    } catch (err) {
      this.log.warn(
        { err: String(err), paseoId, agent: handle.agent },
        'paseo: archiveAgent failed (non-fatal)',
      );
    }

    this.agentIds.delete(handle.sessionId);

    // Update DB row. Scoped to backend = 'paseo' so closing this session can
    // never flip a row that now belongs to a different backend.
    await this.sql`
      UPDATE agent_sessions
      SET status = 'closed', last_active_at = clock_timestamp()
      WHERE chat_id = ${handle.chatId} AND agent = ${handle.agent} AND backend = 'paseo'
    `.catch(() => { /* non-fatal */ });
  }

  // ─── dispose: archive all tracked agents ───────────────────────────────────

  /**
   * Archive every tracked Paseo agent and mark the backend down.
   *
   * Best-effort: an archive failure is logged and skipped. After a successful
   * archive the matching `agent_sessions` row is marked `'closed'` so the
   * archived id is not picked up again by a later `ensureSession`.
   */
  async dispose(): Promise<void> {
    // Several session ids can map to the same Paseo agent; archive each once.
    const ids = [...new Set(this.agentIds.values())];
    this.agentIds.clear();

    for (const paseoId of ids) {
      try {
        await this.client.archiveAgent(paseoId);
      } catch (err) {
        // Best-effort cleanup during shutdown: leave the row untouched so the
        // still-live agent can be resolved again later.
        this.log.warn(
          { err: String(err), paseoId, agent: this.agent },
          'paseo: archiveAgent failed during dispose (non-fatal)',
        );
        continue;
      }

      await this.sql`
        UPDATE agent_sessions
        SET status = 'closed'
        WHERE backend = 'paseo' AND agent_session_id = ${paseoId}
      `.catch(() => { /* non-fatal */ });
    }

    this.up = false;
  }

  /**
   * Phase 3: periodic health tick — probes the Paseo daemon.
   * The backend is `up` only when the probe resolves with `status === 'ok'`;
   * a rejected probe marks it down.
   */
  async tickHealth(_now?: number): Promise<void> {
    try {
      const h = await this.client.health();
      this.up = h.status === 'ok';
    } catch {
      this.up = false;
    }
  }
}
|