feat(web,coder): arena pane — compare 2-6 AI competitors on same prompt
Arena is a new pane kind for competitive AI evaluation. A Battle runs the same prompt against 2-6 Contestants across two concurrent lanes: local lane (llama-swap models, serial) and cloud lane (parallel). Added to all three registries: @boocode/contracts WsFrameSchema, server InferenceFrame, and web WsFrame. Backend (apps/coder): - arena-runner: battle scheduler, lane classifier, benchmark, results writer, resume, user winner override - arena-analyzer: two-stage digest→judge analysis on DEFAULT_MODEL - arena-decisions: status transitions and resume logic (unit-tested) - arena-analyzer-helpers: pure helper functions (unit-tested) - arena-model-call: model call utility for analysis - arena routes: create/get/list/stop/analyze/cross-examine/winner/diff - schema: battles, contestants, cross_examinations tables (idempotent) - remove old /api/arena* routes and tasks.arena_id column Frontend (apps/web): - ArenaLauncherDialog: battle type, prompt, contestant selection - ArenaPane: live roster, streaming output, analysis, cross-exam - DiffView: unified diff with line-by-line color for coding contests - Winner override per-row dropdown (Trophy icon) - battle_updated WS handler for live winner/analysis updates - arena pane kind in Workspace, ChatTabBar, useSidebar Cross-app: - ArenaState and ArenaContestantShape/WsFrame types (contracts) - battle_* frames in WsFrameSchema, InferenceFrame, and web WsFrame - manifest.json written per battle results folder - /Arena added to .gitignore Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
895
apps/coder/src/services/arena-runner.ts
Normal file
895
apps/coder/src/services/arena-runner.ts
Normal file
@@ -0,0 +1,895 @@
|
||||
/**
|
||||
* Arena battle-runner — DB-backed execution engine for Arena battles.
|
||||
*
|
||||
* Mirrors flow-runner.ts but implements the Arena's two-lane scheduler instead
|
||||
* of the Orchestrator's wave scheduler. Persists to battles/contestants tables
|
||||
* (not flow_runs/flow_steps). Each contestant is dispatched as a real tasks row
|
||||
* via an injected DispatchContestantFn (Phase 4 wires this to the dispatcher).
|
||||
* Advances on the dispatcher's onTaskTerminal hook.
|
||||
*
|
||||
* Scheduling:
|
||||
* - Cloud lane: all contestants start immediately, in parallel.
|
||||
* - Local lane: contestants run strictly one at a time (serial queue). Only
|
||||
* the first local contestant runs at start; the next is dispatched when the
|
||||
* current one terminates. Both lanes run concurrently with each other.
|
||||
*
|
||||
* Results:
|
||||
* Written to <projectRoot>/Arena/<battleSlug>/<identity>-<model>/
|
||||
* Coding: result.md + diff.patch (from the contestant's worktree).
|
||||
* Q&A: result.md with the text answer.
|
||||
*
|
||||
* Analyzer seam:
|
||||
* onBattleComplete is called when all contestants are terminal. Phase 5 wires
|
||||
* this to the two-stage digest→judge analyzer. A failed contestant does NOT
|
||||
* abort the battle — others continue and the analyzer judges survivors.
|
||||
*/
|
||||
import type { Sql } from '../db.js';
|
||||
import type { Broker } from '@boocode/server/broker';
|
||||
import type { WsFrame } from '@boocode/contracts/ws-frames';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { BattleType, ContestantLane } from '@boocode/contracts/arena';
|
||||
import { mkdir, writeFile } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import { diffWorktree } from './worktrees.js';
|
||||
import {
|
||||
buildBattleSlug,
|
||||
buildContestantDir,
|
||||
classifyLane,
|
||||
computeBenchmark,
|
||||
isBattleComplete,
|
||||
nextLocalContestant,
|
||||
reconcileContestants,
|
||||
type ContestantResumeAction,
|
||||
type ContestantSlot,
|
||||
} from './arena-decisions.js';
|
||||
|
||||
// ─── Public types ─────────────────────────────────────────────────────────────
|
||||
|
||||
export interface ContestantSpec {
|
||||
/** Backend name (coding) or persona name (qa). */
|
||||
identity: string;
|
||||
model: string;
|
||||
}
|
||||
|
||||
export interface BattleStartOpts {
|
||||
projectId: string;
|
||||
battleType: BattleType;
|
||||
prompt: string;
|
||||
/** 2–6 contestants. Duplicate (identity, model) pairs are rejected by the schema UNIQUE constraint. */
|
||||
contestants: ContestantSpec[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Injected dispatch function — Phase 4 wires this to the real task inserter.
|
||||
* Must INSERT a tasks row and return its id. The arena-runner sets the
|
||||
* contestant's task_id and status after this call.
|
||||
* `sessionId` is returned when already known (Q&A pre-creates the session);
|
||||
* null for coding contestants whose session is created lazily by the dispatcher.
|
||||
*/
|
||||
export type DispatchContestantFn = (opts: {
|
||||
projectId: string;
|
||||
contestantId: string;
|
||||
prompt: string;
|
||||
identity: string;
|
||||
model: string;
|
||||
battleType: BattleType;
|
||||
}) => Promise<{ taskId: string; sessionId: string | null }>;
|
||||
|
||||
/**
|
||||
* Called once when every contestant in a battle has reached a terminal state.
|
||||
* Phase 5 wires this to the two-stage digest→judge analyzer.
|
||||
* Must never throw — the caller swallows errors.
|
||||
*/
|
||||
export type OnBattleComplete = (battleId: string) => void;
|
||||
|
||||
/**
|
||||
* Called after a cross_examinations row has been inserted, with its id.
|
||||
* Phase 5 wires this to the analyzer's cross-examination runner.
|
||||
* Must never throw — the caller swallows errors.
|
||||
*/
|
||||
export type OnCrossExamStart = (opts: {
|
||||
battleId: string;
|
||||
crossExamId: string;
|
||||
identity: string;
|
||||
model: string;
|
||||
}) => void;
|
||||
|
||||
export interface BattleRunner {
|
||||
/** Start a battle: persist it + its contestants, classify lanes, dispatch initial wave. */
|
||||
startBattle(opts: BattleStartOpts): Promise<{ battleId: string }>;
|
||||
/**
|
||||
* Wire to createDispatcher({ onTaskTerminal }). Fires when ANY task settles;
|
||||
* the runner ignores tasks it doesn't own. Never throws.
|
||||
*/
|
||||
handleTaskTerminal(taskId: string, state: string): void;
|
||||
/**
|
||||
* Re-advance any battles still marked 'running' after a coder restart.
|
||||
* Mirrors flow-runner's initResume (D-9). Never throws.
|
||||
*/
|
||||
initResume(): Promise<void>;
|
||||
/**
|
||||
* Cancel a running battle. Marks it and all non-terminal contestants cancelled,
|
||||
* publishes frames, and returns the task_ids of in-flight contestants so the
|
||||
* route can abort them via the dispatcher's cancelExternalTask.
|
||||
*/
|
||||
cancelBattle(battleId: string): Promise<{ cancelled: boolean; taskIds: string[] }>;
|
||||
/**
|
||||
* Trigger analysis for a completed (or manually re-analyzed) battle.
|
||||
* Phase 5 wires this to the two-stage digest→judge analyzer. For now, calls
|
||||
* the injected onBattleComplete seam directly.
|
||||
*/
|
||||
triggerAnalysis(battleId: string): Promise<{ triggered: boolean }>;
|
||||
/**
|
||||
* Start a cross-examination on a battle. Inserts a cross_examinations row and
|
||||
* invokes the analyzer seam. Phase 5 fills the actual verdict logic.
|
||||
*/
|
||||
startCrossExam(
|
||||
battleId: string,
|
||||
opts: { identity: string; model: string },
|
||||
): Promise<{ crossExamId: string }>;
|
||||
/**
|
||||
* Manually set (or clear) the winner. Validates the contestant belongs to the
|
||||
* battle, updates battles.winner_contestant_id, and publishes a battle_updated
|
||||
* frame so the pane reflects the override immediately.
|
||||
*/
|
||||
setWinner(battleId: string, winnerId: string | null): Promise<{
|
||||
ok: boolean;
|
||||
notFound?: boolean;
|
||||
invalidContestant?: boolean;
|
||||
}>;
|
||||
}
|
||||
|
||||
// ─── Internal row shapes ──────────────────────────────────────────────────────
|
||||
|
||||
interface ContestantRow {
|
||||
id: string;
|
||||
battle_id: string;
|
||||
identity: string;
|
||||
model: string;
|
||||
lane: ContestantLane;
|
||||
task_id: string | null;
|
||||
worktree_id: string | null;
|
||||
status: string;
|
||||
}
|
||||
|
||||
interface BattleRow {
|
||||
id: string;
|
||||
project_id: string;
|
||||
battle_type: BattleType;
|
||||
prompt: string;
|
||||
status: string;
|
||||
results_path: string | null;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
// ─── Deps / factory ───────────────────────────────────────────────────────────
|
||||
|
||||
interface Deps {
|
||||
sql: Sql;
|
||||
broker: Broker;
|
||||
log: FastifyBaseLogger;
|
||||
dispatch: DispatchContestantFn;
|
||||
onBattleComplete: OnBattleComplete;
|
||||
/**
|
||||
* Called after a cross_examinations row is inserted. Phase 5 wires this to
|
||||
* the analyzer's cross-examination runner. Optional: absent → no cross-exam
|
||||
* logic runs (stub behaviour for tests).
|
||||
*/
|
||||
onCrossExamStart?: OnCrossExamStart;
|
||||
/**
|
||||
* Model IDs served by the local llama-swap server. Used for lane classification:
|
||||
* a contestant whose model is in this set runs in the local lane (serial, GPU-fair).
|
||||
* Q&A contestants are always local regardless of this set.
|
||||
* Defaults to an empty set → all coding contestants go to the cloud lane.
|
||||
*/
|
||||
localModels?: ReadonlySet<string>;
|
||||
}
|
||||
|
||||
const DEFAULT_LOCAL_MODELS: ReadonlySet<string> = new Set();
|
||||
|
||||
export function createBattleRunner(deps: Deps): BattleRunner {
|
||||
const { sql, broker, log, dispatch, onBattleComplete, onCrossExamStart } = deps;
|
||||
const localModels = deps.localModels ?? DEFAULT_LOCAL_MODELS;
|
||||
|
||||
// Serialize local-lane advance per battle so two near-simultaneous terminal
|
||||
// callbacks don't double-dispatch the next local contestant.
|
||||
const advanceChain = new Map<string, Promise<void>>();
|
||||
|
||||
// Delta bridge: per-contestant broker unsubscribe functions.
|
||||
// 'terminated' sentinel prevents a late-arriving setupDeltaBridge from
|
||||
// registering a subscription that would never be cleaned up.
|
||||
const deltaUnsubs = new Map<string, (() => void) | 'terminated'>();
|
||||
|
||||
function publishUser(frame: Record<string, unknown>): void {
|
||||
broker.publishUserFrame('default', frame as unknown as WsFrame);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to the contestant's inference session and forward delta frames
|
||||
* to the user channel as contestant_updated{delta}. Polls for session_id
|
||||
* when not immediately known (coding contestants whose session is created
|
||||
* lazily by the dispatcher). Unsubscribes on termination or max retries.
|
||||
*/
|
||||
async function setupDeltaBridge(
|
||||
battleId: string,
|
||||
contestantId: string,
|
||||
taskId: string,
|
||||
knownSessionId: string | null,
|
||||
): Promise<void> {
|
||||
let sessionId = knownSessionId;
|
||||
if (!sessionId) {
|
||||
// Coding contestant: session_id is written by the dispatcher just before
|
||||
// inference starts. Poll until it appears or the contestant terminates.
|
||||
for (let i = 0; i < 50; i++) {
|
||||
if (deltaUnsubs.get(contestantId) === 'terminated') return;
|
||||
const [row] = await sql<{ session_id: string | null }[]>`
|
||||
SELECT session_id FROM tasks WHERE id = ${taskId}
|
||||
`.catch(() => []);
|
||||
if (row?.session_id) { sessionId = row.session_id; break; }
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
}
|
||||
}
|
||||
if (!sessionId) return;
|
||||
if (deltaUnsubs.get(contestantId) === 'terminated') return;
|
||||
|
||||
const unsub = broker.subscribe(sessionId, (frame) => {
|
||||
if (frame.type === 'delta') {
|
||||
const deltaContent = (frame as unknown as { content?: unknown }).content;
|
||||
if (typeof deltaContent === 'string') {
|
||||
publishUser({
|
||||
type: 'contestant_updated',
|
||||
battle_id: battleId,
|
||||
contestant_id: contestantId,
|
||||
delta: deltaContent,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const existing = deltaUnsubs.get(contestantId);
|
||||
if (existing === 'terminated') {
|
||||
unsub();
|
||||
} else {
|
||||
deltaUnsubs.set(contestantId, unsub);
|
||||
}
|
||||
}
|
||||
|
||||
function teardownDeltaBridge(contestantId: string): void {
|
||||
const entry = deltaUnsubs.get(contestantId);
|
||||
if (typeof entry === 'function') {
|
||||
entry();
|
||||
deltaUnsubs.delete(contestantId);
|
||||
} else {
|
||||
deltaUnsubs.set(contestantId, 'terminated');
|
||||
}
|
||||
}
|
||||
|
||||
// ─── startBattle ────────────────────────────────────────────────────────────
|
||||
|
||||
async function startBattle(opts: BattleStartOpts): Promise<{ battleId: string }> {
|
||||
if (opts.contestants.length < 2 || opts.contestants.length > 6) {
|
||||
throw new Error(`battle requires 2–6 contestants; got ${opts.contestants.length}`);
|
||||
}
|
||||
|
||||
const [proj] = await sql<{ path: string }[]>`SELECT path FROM projects WHERE id = ${opts.projectId}`;
|
||||
if (!proj) throw new Error(`project not found: ${opts.projectId}`);
|
||||
|
||||
// Insert the battle row as 'running'; update results_path once we have the id.
|
||||
const [battle] = await sql<{ id: string; created_at: Date }[]>`
|
||||
INSERT INTO battles (project_id, battle_type, prompt, status)
|
||||
VALUES (${opts.projectId}, ${opts.battleType}, ${opts.prompt}, 'running')
|
||||
RETURNING id, created_at
|
||||
`;
|
||||
const battleId = battle!.id;
|
||||
const battleSlug = buildBattleSlug(battleId, opts.battleType, battle!.created_at);
|
||||
const resultsPath = join(proj.path, 'Arena', battleSlug);
|
||||
|
||||
await sql`
|
||||
UPDATE battles SET results_path = ${resultsPath}, updated_at = clock_timestamp()
|
||||
WHERE id = ${battleId}
|
||||
`;
|
||||
|
||||
// Insert all contestant rows with lane classification.
|
||||
const contestantRows: Array<{ id: string; identity: string; model: string; lane: ContestantLane }> = [];
|
||||
for (const spec of opts.contestants) {
|
||||
const lane = classifyLane(opts.battleType, spec.identity, spec.model, localModels);
|
||||
const [row] = await sql<{ id: string }[]>`
|
||||
INSERT INTO contestants (battle_id, identity, model, lane, status)
|
||||
VALUES (${battleId}, ${spec.identity}, ${spec.model}, ${lane}, 'queued')
|
||||
RETURNING id
|
||||
`;
|
||||
contestantRows.push({ id: row!.id, identity: spec.identity, model: spec.model, lane });
|
||||
}
|
||||
|
||||
// Write initial manifest so the results folder is always populated.
|
||||
await writeManifest(
|
||||
battleId, resultsPath, opts.battleType, opts.prompt, battle!.created_at,
|
||||
contestantRows.map((c) => ({ identity: c.identity, model: c.model, lane: c.lane })),
|
||||
null,
|
||||
).catch((err) => {
|
||||
log.warn({ err: errMsg(err), battleId }, 'arena-runner: initial manifest write failed');
|
||||
});
|
||||
|
||||
publishUser({
|
||||
type: 'battle_started',
|
||||
battle_id: battleId,
|
||||
battle_type: opts.battleType,
|
||||
prompt: opts.prompt,
|
||||
contestants: contestantRows.map((c) => ({
|
||||
id: c.id,
|
||||
identity: c.identity,
|
||||
model: c.model,
|
||||
lane: c.lane,
|
||||
})),
|
||||
});
|
||||
|
||||
// Dispatch: cloud lane starts all contestants in parallel; local lane starts
|
||||
// only the first queued contestant (serial queue).
|
||||
let localStarted = false;
|
||||
for (const c of contestantRows) {
|
||||
if (c.lane === 'cloud') {
|
||||
await dispatchContestant(battleId, opts.projectId, opts.battleType, opts.prompt, c);
|
||||
} else if (!localStarted) {
|
||||
await dispatchContestant(battleId, opts.projectId, opts.battleType, opts.prompt, c);
|
||||
localStarted = true;
|
||||
// remaining local contestants stay 'queued' until this one finishes
|
||||
}
|
||||
}
|
||||
|
||||
return { battleId };
|
||||
}
|
||||
|
||||
async function dispatchContestant(
|
||||
battleId: string,
|
||||
projectId: string,
|
||||
battleType: BattleType,
|
||||
prompt: string,
|
||||
c: { id: string; identity: string; model: string; lane: ContestantLane },
|
||||
): Promise<void> {
|
||||
const { taskId, sessionId } = await dispatch({
|
||||
projectId,
|
||||
contestantId: c.id,
|
||||
prompt,
|
||||
identity: c.identity,
|
||||
model: c.model,
|
||||
battleType,
|
||||
});
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET task_id = ${taskId}, status = 'running', updated_at = clock_timestamp()
|
||||
WHERE id = ${c.id}
|
||||
`;
|
||||
publishContestantFrame(battleId, c.id, { status: 'running' });
|
||||
// Start the delta bridge in the background; unsubscribe when the contestant
|
||||
// terminates (teardownDeltaBridge called in handleTaskTerminal).
|
||||
void setupDeltaBridge(battleId, c.id, taskId, sessionId ?? null);
|
||||
}
|
||||
|
||||
// ─── local-lane advance (serialized per battle) ───────────────────────────
|
||||
|
||||
function advanceLocalLane(battleId: string): Promise<void> {
|
||||
const prev = advanceChain.get(battleId) ?? Promise.resolve();
|
||||
const next = prev
|
||||
.catch(() => {})
|
||||
.then(() =>
|
||||
advanceLocalLaneInner(battleId).catch((err) => {
|
||||
log.error({ err: errMsg(err), battleId }, 'arena-runner: advanceLocalLane failed');
|
||||
}),
|
||||
);
|
||||
advanceChain.set(battleId, next);
|
||||
void next.finally(() => {
|
||||
if (advanceChain.get(battleId) === next) advanceChain.delete(battleId);
|
||||
});
|
||||
return next;
|
||||
}
|
||||
|
||||
async function advanceLocalLaneInner(battleId: string): Promise<void> {
|
||||
const battle = await loadBattle(battleId);
|
||||
if (!battle || battle.status !== 'running') return;
|
||||
|
||||
const contestants = await loadContestants(battleId);
|
||||
const slots: ContestantSlot[] = contestants.map((c) => ({
|
||||
id: c.id,
|
||||
lane: c.lane,
|
||||
status: c.status,
|
||||
}));
|
||||
|
||||
// Nothing to do if the local lane is still busy.
|
||||
const localRunning = slots.some((c) => c.lane === 'local' && c.status === 'running');
|
||||
if (localRunning) return;
|
||||
|
||||
const nextId = nextLocalContestant(slots);
|
||||
if (!nextId) return; // local queue is exhausted
|
||||
|
||||
const next = contestants.find((c) => c.id === nextId)!;
|
||||
await dispatchContestant(battleId, battle.project_id, battle.battle_type, battle.prompt, {
|
||||
id: next.id,
|
||||
identity: next.identity,
|
||||
model: next.model,
|
||||
lane: next.lane,
|
||||
});
|
||||
}
|
||||
|
||||
// ─── handleTaskTerminal ───────────────────────────────────────────────────
|
||||
|
||||
function handleTaskTerminal(taskId: string, state: string): void {
|
||||
void (async () => {
|
||||
// Look up which contestant owns this task (contestants_task_id_idx).
|
||||
const [row] = await sql<ContestantRow[]>`
|
||||
SELECT id, battle_id, identity, model, lane, task_id, worktree_id, status
|
||||
FROM contestants WHERE task_id = ${taskId}
|
||||
`;
|
||||
if (!row) return; // not an arena task — ignore
|
||||
if (row.status !== 'running') return; // already settled (idempotent)
|
||||
|
||||
const battle = await loadBattle(row.battle_id);
|
||||
|
||||
// Pull the task row for benchmark + output.
|
||||
const [task] = await sql<{
|
||||
chat_id: string | null;
|
||||
started_at: Date | null;
|
||||
ended_at: Date | null;
|
||||
cost_tokens: number | null;
|
||||
}[]>`SELECT chat_id, started_at, ended_at, cost_tokens FROM tasks WHERE id = ${taskId}`;
|
||||
|
||||
const endedAt = task?.ended_at ?? new Date();
|
||||
|
||||
if (state === 'completed') {
|
||||
const startedAt = task?.started_at ?? endedAt;
|
||||
const bench = computeBenchmark(startedAt, endedAt, task?.cost_tokens ?? null, row.lane);
|
||||
|
||||
const output = task?.chat_id ? await readChatOutput(task.chat_id) : '';
|
||||
|
||||
const resultPath = battle
|
||||
? await writeContestantResults(battle, row, output, bench).catch((err) => {
|
||||
log.warn({ err: errMsg(err), contestantId: row.id }, 'arena-runner: result write failed');
|
||||
return null;
|
||||
})
|
||||
: null;
|
||||
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'done',
|
||||
duration_ms = ${Math.round(bench.durationMs)},
|
||||
tokens_per_sec = ${bench.tokensPerSec},
|
||||
cost_tokens = ${task?.cost_tokens ?? null},
|
||||
result_path = ${resultPath},
|
||||
updated_at = clock_timestamp()
|
||||
WHERE id = ${row.id} AND status = 'running'
|
||||
`;
|
||||
teardownDeltaBridge(row.id);
|
||||
|
||||
// Check if this was the last contestant.
|
||||
const allContestants = await loadContestants(row.battle_id);
|
||||
const battleDone = isBattleComplete(allContestants);
|
||||
|
||||
publishContestantFrame(row.battle_id, row.id, {
|
||||
status: 'done',
|
||||
duration_ms: Math.round(bench.durationMs),
|
||||
...(bench.tokensPerSec !== null ? { tokens_per_sec: bench.tokensPerSec } : {}),
|
||||
...(battleDone ? { battle_status: 'completed' } : {}),
|
||||
});
|
||||
|
||||
if (battleDone) {
|
||||
await completeBattle(row.battle_id);
|
||||
} else if (row.lane === 'local') {
|
||||
void advanceLocalLane(row.battle_id);
|
||||
}
|
||||
} else {
|
||||
// failed or cancelled — the contest continues; this contestant is error.
|
||||
const errorMsg = state === 'cancelled' ? 'cancelled' : `task ${state}`;
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'error', error = ${errorMsg}, updated_at = clock_timestamp()
|
||||
WHERE id = ${row.id} AND status = 'running'
|
||||
`;
|
||||
teardownDeltaBridge(row.id);
|
||||
|
||||
const allContestants = await loadContestants(row.battle_id);
|
||||
const battleDone = isBattleComplete(allContestants);
|
||||
|
||||
publishContestantFrame(row.battle_id, row.id, {
|
||||
status: 'error',
|
||||
error: errorMsg,
|
||||
...(battleDone ? { battle_status: 'completed' } : {}),
|
||||
});
|
||||
|
||||
if (battleDone) {
|
||||
await completeBattle(row.battle_id);
|
||||
} else if (row.lane === 'local') {
|
||||
void advanceLocalLane(row.battle_id);
|
||||
}
|
||||
}
|
||||
})().catch((err) => {
|
||||
log.error({ err: errMsg(err), taskId }, 'arena-runner: handleTaskTerminal failed');
|
||||
});
|
||||
}
|
||||
|
||||
// ─── battle finalization ──────────────────────────────────────────────────
|
||||
|
||||
async function completeBattle(battleId: string): Promise<void> {
|
||||
const updated = await sql`
|
||||
UPDATE battles SET status = 'completed', updated_at = clock_timestamp()
|
||||
WHERE id = ${battleId} AND status = 'running'
|
||||
`;
|
||||
if (updated.count === 0) return; // already terminal (race guard)
|
||||
log.info({ battleId }, 'arena-runner: battle completed');
|
||||
|
||||
// Update manifest with finished_at timestamp.
|
||||
const completedBattle = await loadBattle(battleId);
|
||||
if (completedBattle?.results_path) {
|
||||
const contestants = await loadContestants(battleId);
|
||||
await writeManifest(
|
||||
battleId,
|
||||
completedBattle.results_path,
|
||||
completedBattle.battle_type,
|
||||
completedBattle.prompt,
|
||||
completedBattle.created_at,
|
||||
contestants.map((c) => ({ identity: c.identity, model: c.model, lane: c.lane })),
|
||||
new Date(),
|
||||
).catch((err) => {
|
||||
log.warn({ err: errMsg(err), battleId }, 'arena-runner: manifest update failed');
|
||||
});
|
||||
}
|
||||
|
||||
onBattleComplete(battleId);
|
||||
}
|
||||
|
||||
// ─── manifest writer ─────────────────────────────────────────────────────
|
||||
|
||||
async function writeManifest(
|
||||
battleId: string,
|
||||
resultsPath: string,
|
||||
battleType: BattleType,
|
||||
prompt: string,
|
||||
createdAt: Date,
|
||||
contestants: Array<{ identity: string; model: string; lane: ContestantLane }>,
|
||||
finishedAt: Date | null,
|
||||
): Promise<void> {
|
||||
await mkdir(resultsPath, { recursive: true });
|
||||
const manifest = {
|
||||
id: battleId,
|
||||
battle_type: battleType,
|
||||
prompt,
|
||||
contestants,
|
||||
created_at: createdAt.toISOString(),
|
||||
finished_at: finishedAt?.toISOString() ?? null,
|
||||
};
|
||||
await writeFile(join(resultsPath, 'manifest.json'), JSON.stringify(manifest, null, 2), 'utf8');
|
||||
}
|
||||
|
||||
// ─── results writer ───────────────────────────────────────────────────────
|
||||
|
||||
async function writeContestantResults(
|
||||
battle: BattleRow,
|
||||
contestant: { identity: string; model: string; lane: ContestantLane; worktree_id: string | null },
|
||||
output: string,
|
||||
bench: { durationMs: number; tokensPerSec: number | null },
|
||||
): Promise<string> {
|
||||
const resultsPath = await getOrBuildResultsPath(battle);
|
||||
if (!resultsPath) throw new Error('cannot resolve results path for battle ' + battle.id);
|
||||
|
||||
const contestantDir = buildContestantDir(contestant.identity, contestant.model);
|
||||
const dir = join(resultsPath, contestantDir);
|
||||
await mkdir(dir, { recursive: true });
|
||||
|
||||
const benchLines = [
|
||||
`duration: ${bench.durationMs}ms`,
|
||||
bench.tokensPerSec != null ? `tokens/sec: ${bench.tokensPerSec.toFixed(1)}` : null,
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
|
||||
const resultMd =
|
||||
`# ${contestant.identity} / ${contestant.model}\n\n` +
|
||||
`## Benchmark\n\n${benchLines}\n\n` +
|
||||
`## Output\n\n${output}\n`;
|
||||
await writeFile(join(dir, 'result.md'), resultMd, 'utf8');
|
||||
|
||||
if (battle.battle_type === 'coding' && contestant.worktree_id) {
|
||||
const [wt] = await sql<{ path: string; base_commit: string | null }[]>`
|
||||
SELECT path, base_commit FROM worktrees WHERE id = ${contestant.worktree_id}
|
||||
`;
|
||||
if (wt) {
|
||||
const [proj] = await sql<{ path: string }[]>`
|
||||
SELECT path FROM projects WHERE id = ${battle.project_id}
|
||||
`;
|
||||
if (proj) {
|
||||
const diff = await diffWorktree(wt.path, proj.path, {
|
||||
baseRef: wt.base_commit ?? undefined,
|
||||
}).catch(() => '');
|
||||
await writeFile(join(dir, 'diff.patch'), diff, 'utf8');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return dir;
|
||||
}
|
||||
|
||||
/** Resolve or rebuild results_path for a battle (handles crash-before-UPDATE). */
|
||||
async function getOrBuildResultsPath(battle: BattleRow): Promise<string | null> {
|
||||
if (battle.results_path) return battle.results_path;
|
||||
const [proj] = await sql<{ path: string }[]>`SELECT path FROM projects WHERE id = ${battle.project_id}`;
|
||||
if (!proj) return null;
|
||||
const slug = buildBattleSlug(battle.id, battle.battle_type, battle.created_at);
|
||||
const resultsPath = join(proj.path, 'Arena', slug);
|
||||
await sql`
|
||||
UPDATE battles SET results_path = ${resultsPath}, updated_at = clock_timestamp()
|
||||
WHERE id = ${battle.id}
|
||||
`;
|
||||
return resultsPath;
|
||||
}
|
||||
|
||||
// ─── helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
async function readChatOutput(chatId: string): Promise<string> {
|
||||
const [m] = await sql<{ content: string | null }[]>`
|
||||
SELECT content FROM messages
|
||||
WHERE chat_id = ${chatId} AND role = 'assistant'
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
`;
|
||||
return m?.content ?? '';
|
||||
}
|
||||
|
||||
async function loadBattle(battleId: string): Promise<BattleRow | null> {
|
||||
const [b] = await sql<BattleRow[]>`
|
||||
SELECT id, project_id, battle_type, prompt, status, results_path, created_at
|
||||
FROM battles WHERE id = ${battleId}
|
||||
`;
|
||||
return b ?? null;
|
||||
}
|
||||
|
||||
async function loadContestants(battleId: string): Promise<ContestantRow[]> {
|
||||
return sql<ContestantRow[]>`
|
||||
SELECT id, battle_id, identity, model, lane, task_id, worktree_id, status
|
||||
FROM contestants WHERE battle_id = ${battleId}
|
||||
ORDER BY created_at ASC
|
||||
`;
|
||||
}
|
||||
|
||||
function publishContestantFrame(
|
||||
battleId: string,
|
||||
contestantId: string,
|
||||
extra: Record<string, unknown>,
|
||||
): void {
|
||||
publishUser({
|
||||
type: 'contestant_updated',
|
||||
battle_id: battleId,
|
||||
contestant_id: contestantId,
|
||||
...extra,
|
||||
});
|
||||
}
|
||||
|
||||
// ─── initResume ───────────────────────────────────────────────────────────
|
||||
|
||||
async function initResume(): Promise<void> {
|
||||
const battles = await sql<BattleRow[]>`
|
||||
SELECT id, project_id, battle_type, prompt, status, results_path, created_at
|
||||
FROM battles WHERE status = 'running'
|
||||
`;
|
||||
if (battles.length === 0) return;
|
||||
log.info({ count: battles.length }, 'arena-runner: resuming in-flight battles on startup');
|
||||
for (const battle of battles) {
|
||||
await resumeBattle(battle).catch((err) => {
|
||||
log.error({ err: errMsg(err), battleId: battle.id }, 'arena-runner: initResume failed for battle');
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function resumeBattle(battle: BattleRow): Promise<void> {
|
||||
const contestants = await loadContestants(battle.id);
|
||||
|
||||
const taskIds = contestants.map((c) => c.task_id).filter((id): id is string => id !== null);
|
||||
const taskStates = new Map<string, string>();
|
||||
if (taskIds.length > 0) {
|
||||
const tasks = await sql<{ id: string; state: string }[]>`
|
||||
SELECT id, state FROM tasks WHERE id = ANY(${taskIds})
|
||||
`;
|
||||
for (const t of tasks) taskStates.set(t.id, t.state);
|
||||
}
|
||||
|
||||
const decisions = reconcileContestants(
|
||||
contestants.map((c) => ({ contestantId: c.id, taskId: c.task_id, status: c.status })),
|
||||
taskStates,
|
||||
);
|
||||
|
||||
for (const decision of decisions) {
|
||||
if (decision.action === 'keep') continue;
|
||||
const contestant = contestants.find((c) => c.id === decision.contestantId)!;
|
||||
await applyResumeDecision(battle, contestant, decision.action);
|
||||
}
|
||||
|
||||
// Re-check completion after applying decisions.
|
||||
const updated = await loadContestants(battle.id);
|
||||
if (isBattleComplete(updated)) {
|
||||
await completeBattle(battle.id);
|
||||
} else {
|
||||
// Advance local lane in case a slot opened up.
|
||||
void advanceLocalLane(battle.id);
|
||||
}
|
||||
|
||||
log.info({ battleId: battle.id }, 'arena-runner: battle resumed');
|
||||
}
|
||||
|
||||
async function applyResumeDecision(
|
||||
battle: BattleRow,
|
||||
contestant: ContestantRow,
|
||||
action: ContestantResumeAction,
|
||||
): Promise<void> {
|
||||
switch (action) {
|
||||
case 'keep': break;
|
||||
|
||||
case 'mark-done': {
|
||||
const taskRow = contestant.task_id
|
||||
? (await sql<{ started_at: Date | null; ended_at: Date | null; cost_tokens: number | null; chat_id: string | null }[]>`
|
||||
SELECT started_at, ended_at, cost_tokens, chat_id FROM tasks WHERE id = ${contestant.task_id}`)[0]
|
||||
: null;
|
||||
const endedAt = taskRow?.ended_at ?? new Date();
|
||||
const startedAt = taskRow?.started_at ?? endedAt;
|
||||
const bench = computeBenchmark(startedAt, endedAt, taskRow?.cost_tokens ?? null, contestant.lane);
|
||||
const output = taskRow?.chat_id ? await readChatOutput(taskRow.chat_id) : '';
|
||||
const resultPath = battle
|
||||
? await writeContestantResults(battle, contestant, output, bench).catch((err) => {
|
||||
log.warn({ err: errMsg(err), contestantId: contestant.id }, 'arena-runner: resume result write failed');
|
||||
return null;
|
||||
})
|
||||
: null;
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'done',
|
||||
duration_ms = ${Math.round(bench.durationMs)},
|
||||
tokens_per_sec = ${bench.tokensPerSec},
|
||||
result_path = ${resultPath},
|
||||
updated_at = clock_timestamp()
|
||||
WHERE id = ${contestant.id}
|
||||
`;
|
||||
break;
|
||||
}
|
||||
|
||||
case 'mark-error':
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'error', error = 'task failed before callback',
|
||||
updated_at = clock_timestamp()
|
||||
WHERE id = ${contestant.id}
|
||||
`;
|
||||
break;
|
||||
|
||||
case 'mark-cancelled':
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'error', error = 'cancelled before callback',
|
||||
updated_at = clock_timestamp()
|
||||
WHERE id = ${contestant.id}
|
||||
`;
|
||||
break;
|
||||
|
||||
case 're-dispatch': {
|
||||
const { taskId } = await dispatch({
|
||||
projectId: battle.project_id,
|
||||
contestantId: contestant.id,
|
||||
prompt: battle.prompt,
|
||||
identity: contestant.identity,
|
||||
model: contestant.model,
|
||||
battleType: battle.battle_type,
|
||||
});
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET task_id = ${taskId}, updated_at = clock_timestamp()
|
||||
WHERE id = ${contestant.id}
|
||||
`;
|
||||
log.info(
|
||||
{ battleId: battle.id, contestantId: contestant.id, taskId },
|
||||
'arena-runner: contestant re-dispatched on resume',
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── cancelBattle ─────────────────────────────────────────────────────────
|
||||
|
||||
async function cancelBattle(battleId: string): Promise<{ cancelled: boolean; taskIds: string[] }> {
|
||||
const updated = await sql`
|
||||
UPDATE battles SET status = 'cancelled', updated_at = clock_timestamp()
|
||||
WHERE id = ${battleId} AND status = 'running'
|
||||
`;
|
||||
if (updated.count === 0) return { cancelled: false, taskIds: [] };
|
||||
|
||||
// Mark all non-terminal contestants cancelled and collect in-flight task_ids.
|
||||
const contestants = await sql<{ id: string; task_id: string | null; status: string }[]>`
|
||||
SELECT id, task_id, status FROM contestants
|
||||
WHERE battle_id = ${battleId} AND status NOT IN ('done', 'error')
|
||||
`;
|
||||
|
||||
if (contestants.length > 0) {
|
||||
await sql`
|
||||
UPDATE contestants
|
||||
SET status = 'error', error = 'battle cancelled', updated_at = clock_timestamp()
|
||||
WHERE battle_id = ${battleId} AND status NOT IN ('done', 'error')
|
||||
`;
|
||||
for (const c of contestants) {
|
||||
publishContestantFrame(battleId, c.id, {
|
||||
status: 'error',
|
||||
error: 'battle cancelled',
|
||||
battle_status: 'cancelled',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const taskIds = contestants
|
||||
.filter(
|
||||
(c): c is typeof c & { task_id: string } =>
|
||||
c.task_id !== null && c.status === 'running',
|
||||
)
|
||||
.map((c) => c.task_id);
|
||||
|
||||
log.info({ battleId }, 'arena-runner: battle cancelled by request');
|
||||
return { cancelled: true, taskIds };
|
||||
}
|
||||
|
||||
// ─── triggerAnalysis (Phase 5 seam) ──────────────────────────────────────
|
||||
|
||||
async function triggerAnalysis(battleId: string): Promise<{ triggered: boolean }> {
|
||||
const battle = await loadBattle(battleId);
|
||||
if (!battle) return { triggered: false };
|
||||
log.info({ battleId }, 'arena-runner: triggerAnalysis requested');
|
||||
// Calls the injected onBattleComplete seam — Phase 5 replaces this with the
|
||||
// real two-stage digest→judge analyzer (see ADR 0002 + plan Phase 5).
|
||||
onBattleComplete(battleId);
|
||||
return { triggered: true };
|
||||
}
|
||||
|
||||
// ─── startCrossExam (Phase 5 seam) ───────────────────────────────────────
|
||||
|
||||
async function startCrossExam(
|
||||
battleId: string,
|
||||
opts: { identity: string; model: string },
|
||||
): Promise<{ crossExamId: string }> {
|
||||
const [row] = await sql<{ id: string }[]>`
|
||||
INSERT INTO cross_examinations (battle_id, identity, model)
|
||||
VALUES (${battleId}, ${opts.identity}, ${opts.model})
|
||||
RETURNING id
|
||||
`;
|
||||
const crossExamId = row!.id;
|
||||
log.info({ battleId, crossExamId, ...opts }, 'arena-runner: cross-exam inserted, triggering analyzer');
|
||||
if (onCrossExamStart) {
|
||||
try {
|
||||
onCrossExamStart({ battleId, crossExamId, identity: opts.identity, model: opts.model });
|
||||
} catch (err) {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err), battleId, crossExamId }, 'arena-runner: onCrossExamStart threw');
|
||||
}
|
||||
}
|
||||
return { crossExamId };
|
||||
}
|
||||
|
||||
// ─── setWinner (user override) ────────────────────────────────────────────
|
||||
|
||||
async function setWinner(
|
||||
battleId: string,
|
||||
winnerId: string | null,
|
||||
): Promise<{ ok: boolean; notFound?: boolean; invalidContestant?: boolean }> {
|
||||
const [row] = await sql<{ id: string }[]>`SELECT id FROM battles WHERE id = ${battleId}`;
|
||||
if (!row) return { ok: false, notFound: true };
|
||||
|
||||
if (winnerId !== null) {
|
||||
const [c] = await sql<{ id: string }[]>`
|
||||
SELECT id FROM contestants WHERE id = ${winnerId} AND battle_id = ${battleId}
|
||||
`;
|
||||
if (!c) return { ok: false, invalidContestant: true };
|
||||
}
|
||||
|
||||
await sql`
|
||||
UPDATE battles SET winner_contestant_id = ${winnerId}, updated_at = clock_timestamp()
|
||||
WHERE id = ${battleId}
|
||||
`;
|
||||
publishUser({ type: 'battle_updated', battle_id: battleId, winner_contestant_id: winnerId });
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
return { startBattle, handleTaskTerminal, initResume, cancelBattle, triggerAnalysis, startCrossExam, setWinner };
|
||||
}
|
||||
|
||||
function errMsg(e: unknown): string {
|
||||
return e instanceof Error ? e.message : String(e);
|
||||
}
|
||||
Reference in New Issue
Block a user