feat: Wave 1 complete — state machine, Paseo hub, collision detection, PTY search
- Task state machine: TIMED_OUT state, retriable steps, timeout detection - Paseo hub: paseo-client.ts (HTTP+CLI), PaseoBackend (AgentBackend), 14 tests - Collision detection: collision-detector.ts, conflict-index.ts, ws-frames type - PTY search: ring buffer, search route, capture-pane fallback
This commit is contained in:
@@ -5,6 +5,7 @@ import { getPool, closeDb } from './db.js';
|
||||
import { registerHealthRoutes } from './routes/health.js';
|
||||
import { registerTerminalRoutes } from './routes/terminals.js';
|
||||
import { registerSessionRoutes } from './routes/sessions.js';
|
||||
import { registerSearchRoutes } from './routes/search.js';
|
||||
import { registerWsAttachRoute } from './ws/attach.js';
|
||||
|
||||
async function main(): Promise<void> {
|
||||
@@ -35,6 +36,7 @@ async function main(): Promise<void> {
|
||||
registerHealthRoutes(app);
|
||||
registerTerminalRoutes(app, config.TMUX_CONF_PATH);
|
||||
registerSessionRoutes(app);
|
||||
registerSearchRoutes(app, config.TMUX_CONF_PATH);
|
||||
registerWsAttachRoute(app, config.TMUX_CONF_PATH);
|
||||
|
||||
const shutdown = async (signal: string) => {
|
||||
|
||||
@@ -33,6 +33,7 @@ export function register(
|
||||
|
||||
export function unregister(paneId: string): void {
|
||||
sessions.delete(paneId);
|
||||
ringBuffers.delete(paneId);
|
||||
}
|
||||
|
||||
export function list(): SessionMeta[] {
|
||||
@@ -42,3 +43,120 @@ export function list(): SessionMeta[] {
|
||||
export function get(paneId: string): SessionMeta | undefined {
|
||||
return sessions.get(paneId);
|
||||
}
|
||||
|
||||
// ── Ring buffer for PTY output search ──────────────────────────────────────
|
||||
|
||||
export interface SearchMatch {
|
||||
line: number;
|
||||
content: string;
|
||||
contextBefore: string[];
|
||||
contextAfter: string[];
|
||||
}
|
||||
|
||||
const ringBuffers = new Map<string, string[]>();
|
||||
|
||||
/**
|
||||
* Append raw PTY data to the ring buffer for a given pane.
|
||||
* Splits incoming data on newlines and pushes each line into the buffer,
|
||||
* trimming to `maxLines` (default 5000) from the tail.
|
||||
*/
|
||||
export function appendOutput(
|
||||
paneId: string,
|
||||
data: string,
|
||||
maxLines: number = 5000,
|
||||
): void {
|
||||
let buf = ringBuffers.get(paneId);
|
||||
if (!buf) {
|
||||
buf = [];
|
||||
ringBuffers.set(paneId, buf);
|
||||
}
|
||||
|
||||
// Split on newlines — each chunk may contain multiple complete lines and
|
||||
// potentially a trailing partial line (which we store as-is; the next chunk
|
||||
// will either complete it or be another partial).
|
||||
const lines = data.split('\n');
|
||||
|
||||
// The first element of `lines` may be a continuation of the last partial
|
||||
// line from the previous append. If the buffer is non-empty and the last
|
||||
// stored entry is a partial (no trailing newline previously), glue them.
|
||||
// We detect "partial" by checking whether `data` ended with '\n' — if it
|
||||
// did, the last element after split is '' (empty) which we drop.
|
||||
const endedWithNewline = data.endsWith('\n');
|
||||
if (endedWithNewline) {
|
||||
// The final empty-string element is discarded.
|
||||
lines.pop();
|
||||
}
|
||||
|
||||
if (buf.length > 0 && lines.length > 0) {
|
||||
// Concatenate the last partial line in the buffer with the first split
|
||||
// segment. This avoids splitting ANSI sequences or text across chunks.
|
||||
buf[buf.length - 1] = (buf[buf.length - 1] ?? '') + (lines[0] ?? '');
|
||||
lines.shift();
|
||||
}
|
||||
|
||||
for (const line of lines) {
|
||||
buf.push(line);
|
||||
}
|
||||
|
||||
// Trim from head if over maxLines
|
||||
if (buf.length > maxLines) {
|
||||
buf = buf.slice(buf.length - maxLines);
|
||||
ringBuffers.set(paneId, buf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Search the ring buffer for a pane using a regex pattern.
|
||||
* Returns matches with optional context lines before and after each match.
|
||||
*/
|
||||
export function searchRingBuffer(
|
||||
paneId: string,
|
||||
pattern: string,
|
||||
opts?: { limit?: number; context?: number },
|
||||
): SearchMatch[] {
|
||||
const buf = ringBuffers.get(paneId);
|
||||
if (!buf || buf.length === 0) return [];
|
||||
|
||||
const limit = opts?.limit ?? 50;
|
||||
const context = opts?.context ?? 0;
|
||||
|
||||
let re: RegExp;
|
||||
try {
|
||||
re = new RegExp(pattern, 'u');
|
||||
} catch {
|
||||
return []; // invalid regex — caller should validate, but be defensive
|
||||
}
|
||||
|
||||
const results: SearchMatch[] = [];
|
||||
|
||||
for (let i = 0; i < buf.length; i++) {
|
||||
if (results.length >= limit) break;
|
||||
if (re.test(buf[i]!)) {
|
||||
const contextBefore: string[] = [];
|
||||
const contextAfter: string[] = [];
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i - c;
|
||||
if (ci >= 0) contextBefore.unshift(buf[ci]!);
|
||||
}
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i + c;
|
||||
if (ci < buf.length) contextAfter.push(buf[ci]!);
|
||||
}
|
||||
results.push({
|
||||
line: i + 1, // 1-based line number for display
|
||||
content: buf[i]!,
|
||||
contextBefore,
|
||||
contextAfter,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the ring buffer for a pane. Called on session kill / pane close.
|
||||
*/
|
||||
export function clearBuffer(paneId: string): void {
|
||||
ringBuffers.delete(paneId);
|
||||
}
|
||||
|
||||
167
apps/booterm/src/routes/search.ts
Normal file
167
apps/booterm/src/routes/search.ts
Normal file
@@ -0,0 +1,167 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import { sanitizeId, tmuxSessionName, capturePane } from '../pty/manager.js';
|
||||
import { searchRingBuffer, clearBuffer } from '../pty/registry.js';
|
||||
|
||||
const ParamsSchema = z.object({
|
||||
sid: z.string(),
|
||||
pid: z.string(),
|
||||
});
|
||||
|
||||
const MAX_PATTERN_LENGTH = 200;
|
||||
|
||||
// Zod-refined string: reject empty and overly-long patterns to prevent ReDoS
|
||||
const PatternQuerySchema = z
|
||||
.string()
|
||||
.min(1, 'pattern is required')
|
||||
.max(MAX_PATTERN_LENGTH, `pattern must not exceed ${MAX_PATTERN_LENGTH} characters`);
|
||||
|
||||
const QuerySchema = z.object({
|
||||
pattern: PatternQuerySchema,
|
||||
limit: z.coerce.number().int().min(1).max(500).default(50),
|
||||
context: z.coerce.number().int().min(0).max(50).default(0),
|
||||
});
|
||||
|
||||
interface SearchMatch {
|
||||
line: number;
|
||||
content: string;
|
||||
contextBefore: string[];
|
||||
contextAfter: string[];
|
||||
}
|
||||
|
||||
interface SearchResponse {
|
||||
matches: SearchMatch[];
|
||||
total: number;
|
||||
truncated: boolean;
|
||||
source: 'ring' | 'capture';
|
||||
}
|
||||
|
||||
/**
|
||||
* Search a captured pane buffer using a regex. This is the fallback path
|
||||
* when the ring buffer doesn't have enough matches.
|
||||
*/
|
||||
function grepBuffer(
|
||||
text: string,
|
||||
pattern: string,
|
||||
limit: number,
|
||||
context: number,
|
||||
): SearchMatch[] {
|
||||
let re: RegExp;
|
||||
try {
|
||||
re = new RegExp(pattern, 'u');
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
|
||||
const lines = text.split('\n');
|
||||
const results: SearchMatch[] = [];
|
||||
|
||||
for (let i = 0; i < lines.length; i++) {
|
||||
if (results.length >= limit) break;
|
||||
if (re.test(lines[i]!)) {
|
||||
const contextBefore: string[] = [];
|
||||
const contextAfter: string[] = [];
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i - c;
|
||||
if (ci >= 0) contextBefore.unshift(lines[ci]!);
|
||||
}
|
||||
for (let c = 1; c <= context; c++) {
|
||||
const ci = i + c;
|
||||
if (ci < lines.length) contextAfter.push(lines[ci]!);
|
||||
}
|
||||
results.push({
|
||||
line: i + 1,
|
||||
content: lines[i]!,
|
||||
contextBefore,
|
||||
contextAfter,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
export function registerSearchRoutes(app: FastifyInstance, tmuxConfPath: string): void {
|
||||
app.get<{
|
||||
Params: { sid: string; pid: string };
|
||||
Querystring: { pattern?: string; limit?: string; context?: string };
|
||||
}>(
|
||||
'/api/term/sessions/:sid/panes/:pid/search',
|
||||
async (req, reply) => {
|
||||
const p = ParamsSchema.safeParse(req.params);
|
||||
if (!p.success) return reply.code(400).send({ error: 'bad_params' });
|
||||
|
||||
const sid = sanitizeId(p.data.sid);
|
||||
const pid = sanitizeId(p.data.pid);
|
||||
if (!sid || !pid) return reply.code(400).send({ error: 'bad_id_format' });
|
||||
|
||||
const q = QuerySchema.safeParse(req.query);
|
||||
if (!q.success) {
|
||||
return reply.code(400).send({
|
||||
error: 'bad_query',
|
||||
details: q.error.flatten().fieldErrors,
|
||||
});
|
||||
}
|
||||
|
||||
const { pattern, limit, context } = q.data;
|
||||
|
||||
// ── Path 1: ring buffer search (fast, no tmux interaction) ──
|
||||
const ringMatches = searchRingBuffer(pid, pattern, { limit, context });
|
||||
if (ringMatches.length >= limit) {
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: ringMatches.length >= limit,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
// ── Path 2: capture-pane + grep fallback (10s timeout) ──
|
||||
const sessionName = tmuxSessionName(pid);
|
||||
|
||||
let capture: string;
|
||||
try {
|
||||
capture = await withTimeout(
|
||||
capturePane(tmuxConfPath, sessionName, 5000),
|
||||
10_000,
|
||||
);
|
||||
} catch (err) {
|
||||
req.log.warn({ err, pid }, 'capture-pane timed out or failed');
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: false,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
if (!capture) {
|
||||
// tmux pane may no longer exist — return whatever ring had
|
||||
return reply.code(200).send({
|
||||
matches: ringMatches,
|
||||
total: ringMatches.length,
|
||||
truncated: false,
|
||||
source: 'ring' as const,
|
||||
});
|
||||
}
|
||||
|
||||
const captureMatches = grepBuffer(capture, pattern, limit, context);
|
||||
|
||||
return reply.code(200).send({
|
||||
matches: captureMatches,
|
||||
total: captureMatches.length,
|
||||
truncated: captureMatches.length >= limit,
|
||||
source: 'capture' as const,
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
|
||||
return Promise.race([
|
||||
promise,
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('timeout')), ms),
|
||||
),
|
||||
]);
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
} from '../pty/manager.js';
|
||||
import { attachPty } from '../pty/pty.js';
|
||||
import { getUser } from '../auth.js';
|
||||
import { register, unregister } from '../pty/registry.js';
|
||||
import { register, unregister, appendOutput } from '../pty/registry.js';
|
||||
|
||||
export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string): void {
|
||||
app.get<{
|
||||
@@ -106,6 +106,8 @@ export function registerWsAttachRoute(app: FastifyInstance, tmuxConfPath: string
|
||||
} catch (err) {
|
||||
req.log.warn({ err }, 'ws send failed');
|
||||
}
|
||||
// Feed the ring buffer for pattern-based search
|
||||
appendOutput(pid, data);
|
||||
};
|
||||
handle.onData(onData);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user