Files
boocode/apps/control/src/services/sse-pipeline.ts

211 lines
7.4 KiB
TypeScript

import type { FleetState } from './fleet-state.js';
import { ensureHostState, stampLastSeen, incrementSeq } from './fleet-state.js';
import type { LlamaSweepSSEEvent, MetricsEntry } from './fleet-connector.js';
import type { LogRelay } from './log-relay.js';
import type { DeltaEmitter } from './delta-emitter.js';
import type { getSql } from '../db.js';
import type { loadConfig } from '../config.js';
import { trimCapture, parseCaptureJson } from './retention.js';
import { detectGap } from './reconcile.js';
/**
 * Flat, snake_case projection of a `MetricsEntry`, as produced by
 * `mapMetricsEntry`. Its fields are the values bound into the
 * `control_requests` INSERT statements in this file.
 */
export interface MappedMetricsEntry {
/** Copied from `MetricsEntry.id`; bound to the `swap_entry_id` column on insert. */
id: number;
/** Copied from `MetricsEntry.timestamp`. */
ts: string;
/** Copied from `MetricsEntry.model`. */
model: string;
/** Copied from `MetricsEntry.req_path`. */
req_path: string;
/** Copied from `MetricsEntry.resp_status_code`. */
status_code: number;
/** Copied from `MetricsEntry.duration_ms`. */
duration_ms: number;
/** Copied from `MetricsEntry.tokens.cache_tokens`. */
cache_tokens: number;
/** Copied from `MetricsEntry.tokens.input_tokens`. */
input_tokens: number;
/** Copied from `MetricsEntry.tokens.output_tokens`. */
output_tokens: number;
/** Copied from `MetricsEntry.tokens.prompt_per_second`. */
prompt_tps: number;
/** Copied from `MetricsEntry.tokens.tokens_per_second`. */
gen_tps: number;
/** Copied from `MetricsEntry.has_capture`. */
has_capture: boolean;
/** P4: NULL for ring data -- ActivityLogEntry does not carry request headers. */
source: string | null;
}
/**
 * Flattens a `MetricsEntry` into the snake_case row shape used for the
 * `control_requests` inserts.
 *
 * Token counters and throughput figures are lifted out of the nested
 * `tokens` object; `source` is always `null` for entries mapped here.
 *
 * @param entry - The metrics entry to convert.
 * @returns A new `MappedMetricsEntry`; the input is not modified.
 */
export function mapMetricsEntry(entry: MetricsEntry): MappedMetricsEntry {
  const { tokens } = entry;
  const row: MappedMetricsEntry = {
    id: entry.id,
    ts: entry.timestamp,
    model: entry.model,
    req_path: entry.req_path,
    status_code: entry.resp_status_code,
    duration_ms: entry.duration_ms,
    cache_tokens: tokens.cache_tokens,
    input_tokens: tokens.input_tokens,
    output_tokens: tokens.output_tokens,
    prompt_tps: tokens.prompt_per_second,
    gen_tps: tokens.tokens_per_second,
    has_capture: entry.has_capture,
    source: null,
  };
  return row;
}
/**
 * Applies one llama-sweep SSE event for `providerId`: updates the in-memory
 * host state, persists what needs persisting, and publishes the matching delta.
 *
 * Per event type:
 * - `modelStatus`: marks the host connected, diffs the payload against the
 *   known model states, writes one `control_model_events` row per changed
 *   model, and publishes a `control_fleet` delta listing every model known for
 *   the host. Nothing is written or published when no model changed state.
 * - `logData`: forwards the line to `logRelay` (when provided) and publishes a
 *   `control_log` delta.
 * - `metrics`: runs `handleReconcile` (a failure there is logged, not
 *   rethrown), then upserts each entry into `control_requests` together with
 *   its trimmed capture and publishes one `control_activity` delta per entry.
 * - `inflight`: stores the reported total on the host state.
 *
 * @param fleet - Fleet state holding the per-host records.
 * @param sql - Database client used for all writes.
 * @param config - Loaded configuration; `CAPTURE_SIZE_KB` is passed to `trimCapture`.
 * @param providerId - Identifier of the host the event came from.
 * @param emitter - Sink for the published deltas.
 * @param event - The SSE event to process.
 * @param logRelay - Optional relay that receives `logData` lines.
 * @returns Resolves once all writes and publishes for the event are done.
 * @throws Rejects with whatever a direct `sql` call in this function rejects with.
 */
export async function handleLlamaSweepEvent(
  fleet: FleetState,
  sql: ReturnType<typeof getSql>,
  config: ReturnType<typeof loadConfig>,
  providerId: string,
  emitter: DeltaEmitter,
  event: LlamaSweepSSEEvent,
  logRelay: LogRelay | null = null,
): Promise<void> {
  const state = ensureHostState(fleet, providerId);
  stampLastSeen(state);
  switch (event.type) {
    case 'modelStatus': {
      // Real payload: FULL-FLEET array of {id, state, ...} (fork apiModel).
      // Derive transitions by diffing against current state; persist only changes.
      // NOTE(review): models absent from the payload are never removed from
      // `state.models` -- confirm that is intended for a full-fleet payload.
      state.liveness = 'connected';
      const changed: Array<{ model: string; state: string }> = [];
      for (const m of event.data) {
        const prev = state.models.get(m.id);
        if (!prev || prev.state !== m.state) {
          changed.push({ model: m.id, state: m.state });
        }
        // Refresh the record on every event, carrying over the fields this
        // payload does not provide.
        state.models.set(m.id, {
          model: m.id,
          state: m.state,
          ts: new Date(),
          ttlDeadline: prev?.ttlDeadline ?? null,
          inflight: prev?.inflight ?? 0,
        });
      }
      if (changed.length === 0) break;
      const seq = incrementSeq(state);
      for (const c of changed) {
        await sql`
INSERT INTO control_model_events (provider_id, model, state, ts, detail)
VALUES (${providerId}, ${c.model}, ${c.state}, clock_timestamp(), ${sql.json({} as never)})
ON CONFLICT (provider_id, model, state, ts) DO NOTHING
`;
      }
      emitter.publish({
        type: 'control_fleet' as const,
        seq,
        hosts: [{
          providerId: state.providerId,
          liveness: state.liveness,
          lastSeenAt: state.lastSeenAt?.toISOString() ?? null,
          seq: state.seq,
          models: Array.from(state.models.values()).map((m) => ({
            model: m.model,
            state: m.state,
            ts: m.ts.toISOString(),
            ttlDeadline: m.ttlDeadline?.toISOString() ?? null,
            inflight: m.inflight,
          })),
        }],
      });
      break;
    }
    case 'logData': {
      const source = event.data.source as 'proxy' | 'upstream' | 'model';
      const text = event.data.data;
      if (logRelay) {
        logRelay.append(providerId, source, text);
      }
      const seq = incrementSeq(state);
      emitter.publish({
        type: 'control_log' as const,
        seq,
        providerId,
        source,
        line: text,
        ts: new Date().toISOString(),
      });
      break;
    }
    case 'metrics': {
      // handleReconcile treats a nullish payload as empty; do the same here so
      // the loop below cannot throw on it.
      const entries = event.data ?? [];
      try {
        await handleReconcile(fleet, sql, config, providerId, emitter, entries);
      } catch (err: unknown) {
        // Best effort: a failed reconcile must not stop the entries below from
        // being stored. Narrow safely so the logger itself cannot throw on a
        // null/undefined rejection value.
        const msg = err instanceof Error ? err.message : String(err);
        console.warn({ providerId, err: msg }, 'fleet: reconcile failed');
      }
      for (const entry of entries) {
        const captureTrimmed = entry.capture ? trimCapture(entry.capture, config.CAPTURE_SIZE_KB) : null;
        const captureObj = captureTrimmed ? parseCaptureJson(captureTrimmed) : null;
        const mapped = mapMetricsEntry(entry);
        // handleReconcile has normally just inserted this same row WITHOUT its
        // capture, so a plain DO NOTHING here would discard the capture every
        // time. Backfill it on conflict, but only when the stored row has none
        // and this entry actually carries one.
        await sql`
INSERT INTO control_requests (provider_id, swap_entry_id, ts, model, req_path, status_code, duration_ms, cache_tokens, input_tokens, output_tokens, prompt_tps, gen_tps, has_capture, capture, source)
VALUES (${providerId}, ${mapped.id}, ${mapped.ts}, ${mapped.model}, ${mapped.req_path}, ${mapped.status_code}, ${mapped.duration_ms}, ${mapped.cache_tokens}, ${mapped.input_tokens}, ${mapped.output_tokens}, ${mapped.prompt_tps}, ${mapped.gen_tps}, ${mapped.has_capture}, ${captureObj ? sql.json(captureObj as never) : sql`NULL::jsonb`}, ${mapped.source})
ON CONFLICT (provider_id, swap_entry_id, ts) DO UPDATE
SET capture = EXCLUDED.capture
WHERE control_requests.capture IS NULL AND EXCLUDED.capture IS NOT NULL
`;
        // NOTE(review): unlike the other deltas, this one reuses the current
        // seq instead of calling incrementSeq -- confirm consumers expect that.
        emitter.publish({
          type: 'control_activity' as const,
          seq: state.seq,
          providerId,
          entry: {
            id: mapped.id,
            ts: mapped.ts,
            model: mapped.model,
            reqPath: mapped.req_path,
            statusCode: mapped.status_code,
            durationMs: mapped.duration_ms,
          },
        });
      }
      break;
    }
    case 'inflight': {
      // State-only update; no delta is published for it.
      state.inflightTotal = event.data.total;
      break;
    }
  }
}
/**
 * Reconciles a batch of metrics entries for `providerId` with what is already
 * stored in `control_requests`.
 *
 * Marks the host as seen and connected. When the batch is non-empty, the
 * timestamp of its last element is compared (via `detectGap`) with the newest
 * stored row for the provider, and a `gap_suspected` row is written to
 * `control_model_events` if a gap is reported. Finally every entry is inserted
 * into `control_requests` without capture data; rows that already exist are
 * left untouched.
 *
 * @param fleet - Fleet state holding the per-host records.
 * @param sql - Database client used for the lookup and the inserts.
 * @param config - Not read by this function.
 * @param providerId - Identifier of the host the batch belongs to.
 * @param emitter - Not used; this function publishes nothing.
 * @param metrics - Entries to reconcile; a nullish value is treated as empty.
 * @returns Always `true`.
 */
async function handleReconcile(
  fleet: FleetState,
  sql: ReturnType<typeof getSql>,
  config: ReturnType<typeof loadConfig>,
  providerId: string,
  emitter: DeltaEmitter,
  metrics: MetricsEntry[],
): Promise<boolean> {
  void emitter;

  const host = ensureHostState(fleet, providerId);
  stampLastSeen(host);
  host.liveness = 'connected';

  const batch = metrics ?? [];

  // The batch's last element is treated as its oldest entry
  // (presumably the batch is ordered newest-first -- TODO confirm).
  const tailTs = batch.length > 0 ? batch[batch.length - 1]!.timestamp : null;

  if (tailTs) {
    const latestRows = await sql<{ ts: string }[]>`
SELECT ts FROM control_requests
WHERE provider_id = ${providerId}
ORDER BY ts DESC LIMIT 1
`;
    // Nothing stored yet for this provider means there is nothing to compare against.
    if (latestRows.length > 0 && detectGap(tailTs, latestRows[0]!.ts)) {
      const gapDetail = sql.json({
        oldestReconcile: tailTs,
        newestPersisted: latestRows[0]!.ts,
      } as never);
      await sql`
INSERT INTO control_model_events (provider_id, model, state, ts, detail)
VALUES (${providerId}, '*', 'gap_suspected', clock_timestamp(), ${gapDetail})
ON CONFLICT (provider_id, model, state, ts) DO NOTHING
`;
    }
  }

  // Insert one row at a time, in batch order; existing rows are skipped by the conflict clause.
  for (let i = 0; i < batch.length; i += 1) {
    const row = mapMetricsEntry(batch[i]!);
    await sql`
INSERT INTO control_requests (provider_id, swap_entry_id, ts, model, req_path, status_code, duration_ms, cache_tokens, input_tokens, output_tokens, prompt_tps, gen_tps, has_capture, source)
VALUES (${providerId}, ${row.id}, ${row.ts}, ${row.model}, ${row.req_path}, ${row.status_code}, ${row.duration_ms}, ${row.cache_tokens}, ${row.input_tokens}, ${row.output_tokens}, ${row.prompt_tps}, ${row.gen_tps}, ${row.has_capture}, ${row.source})
ON CONFLICT (provider_id, swap_entry_id, ts) DO NOTHING
`;
  }

  return true;
}