Files
boocode/apps/coder/src/services/dispatcher.ts
indifferentketchup aa3797e356 feat(coder): v2.6 Phase 3 — lifecycle hardening (idle evict, crash recovery, worktree reaper)
Idle TTL eviction per (chat,agent) + LRU cap (never a busy backend); pure lifecycle-decisions.ts (TDD). Crash recovery lifts openchamber's health-monitor + busy-aware-restart + stale-grace state machine into opencode-server.ts (+ port reclaim) and warm-acp.ts; opencode crash -> fresh sessions, ACP -> re-session/new. F.1 turn-guard + U.6 usage preserved (their tests pass). Orphan worktree reaper (1h grace, superset-style dirty/unpushed preflight, Paseo soft-delete) + close hooks + diff re-baseline after apply_pending. 35 new tests + DB-opt-in reconnect test; 215 coder tests pass; tsc + build clean. Completes v2.6. Follow-ups out of scope: apps/server close-hook caller, 3.7 DiffPanel staging hint, live smokes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 01:10:09 +00:00

1107 lines
42 KiB
TypeScript
Raw Blame History

import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { WsFrame } from '@boocode/server/ws-frames';
import type { Config } from '../config.js';
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
import { makeDcpStreamStripper } from './dcp-strip.js';
import { dispatchViaAcp } from './acp-dispatch.js';
import { getResolvedRegistry } from './provider-config-registry.js';
import { dispatchViaPty } from './pty-dispatch.js';
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
import { getManifestCommands } from './provider-commands.js';
import { persistExternalAgentTurn } from './agent-turn-persist.js';
import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js';
import { agentPool, OPENCODE_POOL_KEY } from './agent-pool.js';
import { OpenCodeServerBackend } from './backends/opencode-server.js';
import { WarmAcpBackend } from './backends/warm-acp.js';
import { shouldUseWarmBackend } from './backends/warm-acp-routing.js';
import type { AgentBackend, AgentEvent } from './agent-backend.js';
/**
 * The slice of the native inference runner that the dispatcher depends on.
 * The concrete implementation is injected through `Deps.inference`.
 */
interface InferenceRunner {
// Queues an inference turn. The native path calls this with the id of an
// assistant message it has just inserted with status 'streaming', then polls
// that message until it finishes.
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
// Requests cancellation for a chat; resolves to a boolean (presumably whether
// anything was actually cancelled — confirm against the implementation).
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
// Reports whether the given chat currently has active work in the runner
// (exact semantics are defined by the implementation).
hasActive: (chatId: string) => boolean;
}
/** Collaborators injected into `createDispatcher`. */
interface Deps {
// Tagged-template SQL client used for every task/session/message query.
sql: Sql;
// Native inference runner (Path A).
inference: InferenceRunner;
// Publishes WS frames to a session's subscribers via `publishFrame`.
broker: Broker;
// Structured logger.
log: FastifyBaseLogger;
// Runtime configuration; `DEFAULT_MODEL` is the fallback when a task has no model.
config: Config;
}
// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new
// tasks immediately. The poll is only a safety net for notifications missed
// during a listen-connection drop (porsager auto-reconnects), so it can stay slow.
// Interval, in milliseconds, of that safety-net poll.
const POLL_INTERVAL_MS = 2_000;
// Interval, in milliseconds, for completion polling. Not referenced in this part
// of the file — presumably consumed by waitForCompletion; confirm at its definition.
const COMPLETION_POLL_MS = 2_000;
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
const { sql, inference, broker, log, config } = deps;
let timer: ReturnType<typeof setInterval> | null = null;
let listener: { unlisten: () => Promise<void> } | null = null;
let polling = false;
let stopping = false;
// v2.6 (1.9): per-session in-flight registry replaces the global `running`
// boolean. Key = session_id (or `task:<id>` for sessionless tasks). Sessions
// without an in-flight turn run concurrently; within a session, strictly one
// turn at a time.
const inflight = new Map<string, Promise<void>>();
// Common way to kick off a dispatch pass, used by both the poll timer and the
// NOTIFY listener. Overlapping calls are harmless: poll() bails out right away
// while another pass is running (or the dispatcher is stopping), so a notify
// that lands mid-poll never causes a double dispatch. `reason` is only used to
// tag the error log.
function triggerPoll(reason: string): void {
  const reportFailure = (err: unknown): void => {
    log.error({ err, reason }, 'dispatcher: poll error');
  };
  void poll().then(undefined, reportFailure);
}
/**
 * Key under which a task's in-flight turn is registered: the session id when
 * the task belongs to a session, otherwise a per-task `task:<id>` key.
 */
function concurrencyKey(task: { id: string; session_id: string | null }): string {
  const { id, session_id: sessionId } = task;
  if (sessionId != null) {
    return sessionId;
  }
  return `task:${id}`;
}
/**
 * Runs one dispatch pass over pending tasks.
 *
 * Selects up to 50 `pending` tasks (oldest first) and starts every task whose
 * concurrency key has no in-flight turn. Tasks are started but NOT awaited, so
 * this resolves once the pass has been scheduled, not when the tasks finish.
 *
 * Returns immediately when another pass is already running or the dispatcher
 * is stopping.
 */
async function poll(): Promise<void> {
  // `polling` serializes poll() execution itself (timer + NOTIFY can fire
  // concurrently) so we never double-select a task. It does NOT serialize task
  // execution — that's what `inflight` (keyed per session) governs.
  if (polling || stopping) return;
  polling = true;
  try {
    // Oldest-first; start every pending task whose session isn't already busy.
    const rows = await sql<{
      id: string;
      project_id: string;
      input: string;
      agent: string | null;
      model: string | null;
      mode_id: string | null;
      thinking_option_id: string | null;
      session_id: string | null;
      chat_id: string | null;
    }[]>`
      SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id, chat_id
      FROM tasks
      WHERE state = 'pending'
      ORDER BY created_at
      LIMIT 50
    `;
    for (const task of rows) {
      if (stopping) break;
      const key = concurrencyKey(task);
      if (inflight.has(key)) continue; // this session already has an in-flight turn
      // Register synchronously (before any await) so a later row in this pass
      // with the same key is skipped and a concurrent poll can't re-pick it.
      const p = runTask(task)
        // runTask is fire-and-forget from here. Some of its work (agent/project
        // lookups, cleanup inside a catch) can still reject; without a handler
        // that surfaces as an unhandled promise rejection, which terminates the
        // process under Node's default settings. Log it instead.
        .catch((err: unknown) => {
          log.error({ err, taskId: task.id }, 'dispatcher: task run rejected');
        })
        .finally(() => {
          inflight.delete(key);
        });
      inflight.set(key, p);
    }
  } finally {
    polling = false;
  }
}
/**
 * Picks the execution path for a task and runs it to completion.
 *
 * - No agent requested, or the requested agent is not in `available_agents`
 *   (logged as a warning): Path A, native inference.
 * - Agent `opencode`: warm OpenCode HTTP-server backend (v2.6 1.7).
 * - Other agents for which `shouldUseWarmBackend(task)` holds — tasks from a
 *   real chat tab (session_id + chat_id): warm ACP backend (v2.6 Phase 2, 2.4).
 * - Everything else (session-less creators such as arena, MCP, new_task,
 *   generic /api/tasks): the one-shot worktree-per-task ACP/PTY path.
 */
async function runTask(task: {
  id: string;
  project_id: string;
  input: string;
  agent: string | null;
  model: string | null;
  mode_id: string | null;
  thinking_option_id: string | null;
  session_id: string | null;
  chat_id: string | null;
}): Promise<void> {
  const requestedAgent = task.agent;

  // Nothing requested → native inference, no lookup needed.
  if (!requestedAgent) {
    await runNativeInference(task);
    return;
  }

  const matches = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>`
    SELECT name, supports_acp, install_path FROM available_agents WHERE name = ${requestedAgent}
  `;
  const agentRow = matches[0];

  // Requested but unknown → warn and fall back to native inference.
  if (!agentRow) {
    log.warn({ taskId: task.id, agent: requestedAgent }, 'dispatcher: specified agent not available, falling back to native');
    await runNativeInference(task);
    return;
  }

  const { supports_acp: supportsAcp, install_path: installPath } = agentRow;

  if (requestedAgent === 'opencode') {
    await runOpenCodeServerTask(task, installPath);
    return;
  }
  if (shouldUseWarmBackend(task)) {
    await runWarmAcpTask(task, installPath);
    return;
  }
  await runExternalAgent(task, supportsAcp, installPath);
}
// ─── Path A: Native Inference ───────────────────────────────────────────────
/**
 * Path A: runs a task through the built-in inference runner.
 *
 * Marks the task `running`, creates a fresh session + chat for it (and links
 * the task to that session), inserts the user message plus a `streaming`
 * assistant message, enqueues inference and waits for the assistant message to
 * finish. The task then ends as `completed`, `failed`, or — when the
 * dispatcher is stopping — `cancelled`. `output_summary` is the first 500
 * characters of the assistant content; `cost_tokens` is the session's summed
 * `tokens_used`.
 *
 * NOTE: a new session is always created here, even when `task.session_id` is
 * already set; the task row is re-pointed at the new session.
 *
 * Errors are caught: the task is marked `failed` with the error message
 * (best effort) and the function resolves normally.
 */
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; session_id: string | null }): Promise<void> {
  const taskId = task.id;
  log.info({ taskId }, 'dispatcher: starting task (path A — native)');
  try {
    // Mark running
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
      WHERE id = ${taskId}
    `;

    // Create session + chat for this task
    const model = task.model ?? config.DEFAULT_MODEL;
    const sessionName = 'Task: ' + task.input.slice(0, 40);
    const [session] = await sql<{ id: string }[]>`
      INSERT INTO sessions (project_id, name, model, status)
      VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
      RETURNING id
    `;
    const sessionId = session!.id;
    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'Task execution', 'open')
      RETURNING id
    `;
    const chatId = chat!.id;

    // Link task to session
    await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;

    // Create user message + streaming assistant
    await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
      RETURNING id
    `;
    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
      RETURNING id
    `;
    const assistantId = assistantMsg!.id;

    // Enqueue inference
    inference.enqueue(sessionId, chatId, assistantId, 'default');

    // Wait for inference to complete (poll message status)
    const finalStatus = await waitForCompletion(assistantId);

    if (stopping) {
      await sql`
        UPDATE tasks
        SET state = 'cancelled', ended_at = clock_timestamp()
        WHERE id = ${taskId}
      `;
      return;
    }

    // Aggregate token cost for the task's session
    const [costRow] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    const costTokens = costRow?.total ?? null;

    // Read the assistant's final content once; both outcomes summarize it.
    const [msg] = await sql<{ content: string | null }[]>`
      SELECT content FROM messages WHERE id = ${assistantId}
    `;
    const content = msg?.content ?? '';

    if (finalStatus === 'complete') {
      const summary = content.slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
    } else {
      // The assistant row starts out with content '' (not NULL), so the
      // fallback must trigger on empty content as well — `??` alone never would.
      const summary = (content || 'Inference failed').slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}
        WHERE id = ${taskId}
      `;
      log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
    // Best effort: if this update fails too there is nothing more to do here.
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
      WHERE id = ${taskId}
    `.catch(() => {});
  }
}
// ─── Path B: External Agent Dispatch ───────────────────────────────────────
/**
 * Path B (one-shot): runs a task through an external agent inside a
 * throwaway, per-task worktree.
 *
 * Flow: resolve the project path → mark the task `running` → resolve or create
 * the session + chat → create the worktree → insert a `streaming` assistant
 * message and announce it → dispatch via ACP or PTY → persist the assistant
 * content → diff the worktree into a single `pending_changes` row → remove the
 * worktree → record token cost and mark the task `completed`.
 *
 * When the dispatcher is stopping after the agent returns, the task is marked
 * `cancelled` and the worktree is removed without diffing.
 *
 * Errors inside the main flow are caught: the task is marked `failed` with the
 * error message (first 500 chars) and the worktree is cleaned up best-effort.
 * The project-path lookup runs before the try block and can still reject.
 *
 * @param task        Task row fields; `task.agent` must be non-null.
 * @param supportsAcp `true` → dispatch over ACP, `false` → dispatch over PTY.
 * @param installPath Agent install path from `available_agents`, if any.
 */
async function runExternalAgent(
  task: {
    id: string;
    project_id: string;
    input: string;
    agent: string | null;
    model: string | null;
    mode_id: string | null;
    thinking_option_id: string | null;
    session_id: string | null;
  },
  supportsAcp: boolean,
  installPath: string | null,
): Promise<void> {
  const taskId = task.id;
  const agent = task.agent!;
  const executionPath = supportsAcp ? 'acp' : 'pty';
  log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');

  // Resolve the project's root path
  const [project] = await sql<{ path: string | null }[]>`
    SELECT path FROM projects WHERE id = ${task.project_id}
  `;
  const projectPath = project?.path;
  if (!projectPath) {
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
      WHERE id = ${taskId}
    `;
    return;
  }

  // Signal handed to the worktree, diff and dispatch calls for this task.
  const ac = new AbortController();

  try {
    // Mark running
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
      WHERE id = ${taskId}
    `;

    // Resolve session + chat: reuse the task's session (newest open chat, or a
    // new chat if none is open); otherwise create a session + chat and link the
    // task to it.
    let sessionId: string;
    let chatId: string;
    if (task.session_id) {
      sessionId = task.session_id;
      const chats = await sql<{ id: string }[]>`
        SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1
      `;
      if (chats.length === 0) {
        const [chat] = await sql<{ id: string }[]>`
          INSERT INTO chats (session_id, name, status)
          VALUES (${sessionId}, 'External agent execution', 'open')
          RETURNING id
        `;
        chatId = chat!.id;
      } else {
        chatId = chats[0]!.id;
      }
    } else {
      const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
      const [session] = await sql<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model, status)
        VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
        RETURNING id
      `;
      sessionId = session!.id;
      const [chat] = await sql<{ id: string }[]>`
        INSERT INTO chats (session_id, name, status)
        VALUES (${sessionId}, 'External agent execution', 'open')
        RETURNING id
      `;
      chatId = chat!.id;
      await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
    }

    // The user message is only inserted here for tasks that arrived without a
    // session.
    if (!task.session_id) {
      await sql`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
      `;
    }

    // Step 1: Create worktree
    log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
    const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
    log.info({ taskId, worktreePath }, 'dispatcher: worktree created');

    // Step 2: Dispatch to agent
    let outputSummary: string;
    let assistantContent = '';
    let acpReasoning = '';

    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
      RETURNING id
    `;
    const assistantId = assistantMsg!.id;
    broker.publishFrame(sessionId, {
      type: 'message_started',
      message_id: assistantId,
      chat_id: chatId,
      role: 'assistant',
    } as WsFrame);

    // Advertise the agent's manifest commands for this task, if it has any.
    const manifestCommands = getManifestCommands(agent);
    if (manifestCommands.length > 0) {
      setTaskCommands(taskId, manifestCommands);
      broker.publishFrame(sessionId, {
        type: 'agent_commands',
        task_id: taskId,
        session_id: sessionId,
        commands: manifestCommands,
      } as WsFrame);
    }

    if (supportsAcp) {
      const result = await dispatchViaAcp({
        agent,
        resolved: getResolvedRegistry().get(agent),
        task: task.input,
        worktreePath,
        installPath: installPath ?? undefined,
        model: task.model ?? undefined,
        modeId: task.mode_id ?? undefined,
        thinkingOptionId: task.thinking_option_id ?? undefined,
        taskId,
        sessionId,
        chatId,
        messageId: assistantId,
        broker,
        signal: ac.signal,
        log,
      });
      // Stored sizes are capped: 50k chars of content, 200k of reasoning,
      // 500 for the task summary.
      assistantContent = result.output.slice(0, 50_000);
      acpReasoning = result.reasoningText.slice(0, 200_000);
      outputSummary = result.output.slice(0, 500);
      await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning);
    } else {
      const result = await dispatchViaPty({
        agent,
        task: task.input,
        worktreePath,
        installPath: installPath ?? undefined,
        model: task.model ?? undefined,
        modeId: task.mode_id ?? undefined,
        thinkingOptionId: task.thinking_option_id ?? undefined,
        signal: ac.signal,
        log,
      });
      assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, 50_000);
      outputSummary = (result.stdout || result.stderr).slice(0, 500);
      // The PTY path does not stream here, so the whole output goes out as a
      // single delta frame.
      if (assistantContent) {
        broker.publishFrame(sessionId, {
          type: 'delta',
          message_id: assistantId,
          chat_id: chatId,
          content: assistantContent,
        } as WsFrame);
      }
    }

    await sql`
      UPDATE messages
      SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
      WHERE id = ${assistantId}
    `;
    broker.publishFrame(sessionId, {
      type: 'message_complete',
      message_id: assistantId,
      chat_id: chatId,
    } as WsFrame);

    if (stopping) {
      await sql`
        UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}
      `;
      await cleanupWorktree(projectPath, taskId);
      return;
    }

    // Step 3: Diff the worktree and queue pending changes
    log.info({ taskId }, 'dispatcher: diffing worktree');
    const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
    if (diff) {
      // Queue a single pending_change entry with the full unified diff, stamped
      // with the dispatched agent for DiffPanel attribution (v2.6 Phase 1-UX).
      await sql`
        INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
        VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
      `;
      log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
    } else {
      log.info({ taskId }, 'dispatcher: no changes detected in worktree');
    }

    // Step 4: Cleanup worktree
    await cleanupWorktree(projectPath, taskId);

    // Step 5: Aggregate token cost
    const [extCostRow] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    const extCostTokens = extCostRow?.total ?? null;

    // Step 6: Mark task completed
    await sql`
      UPDATE tasks
      SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
      WHERE id = ${taskId}
    `;
    log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
      WHERE id = ${taskId}
    `.catch(() => {});
    // Best-effort cleanup: a failure here must not escape as a rejection
    // (the task is already recorded as failed), so log it and carry on.
    try {
      await cleanupWorktree(projectPath, taskId);
    } catch (cleanupErr) {
      const cleanupMsg = cleanupErr instanceof Error ? cleanupErr.message : String(cleanupErr);
      log.warn({ taskId, agent, err: cleanupMsg }, 'dispatcher: worktree cleanup failed after external agent error');
    }
  } finally {
    // Drop the task's cached commands on every exit path — success, failure
    // and the shutdown ("stopping") early return alike.
    clearTaskCommands(taskId);
  }
}
// ─── Path B (opencode): warm OpenCode server backend (v2.6 1.7 + 1.10) ───────
// OpenCode runs ONE server per BooCoder process, shared across all sessions
// (the backend multiplexes sessions internally), so it's pooled under a fixed
// key (OPENCODE_POOL_KEY, shared with the lifecycle close-hook) rather than
// per-session. Warm ACP backends (Phase 2) are per (chat, agent).
/**
 * Returns the pooled OpenCode backend, creating and registering it on first
 * use. `installPath` only matters for that first creation (it falls back to
 * the bare `opencode` binary name); later calls reuse the pooled instance.
 */
function getOpenCodeBackend(installPath: string | null): AgentBackend {
  const pooled = agentPool.get(OPENCODE_POOL_KEY, 'opencode');
  if (pooled) {
    return pooled;
  }
  const opencodeBinary = installPath ?? 'opencode';
  const created = new OpenCodeServerBackend({ sql, log, opencodeBinary });
  agentPool.register(OPENCODE_POOL_KEY, 'opencode', created);
  return created;
}
/**
 * Path B (opencode): runs one task turn through the pooled OpenCode server
 * backend.
 *
 * Flow: resolve the project path → mark the task `running` → resolve or create
 * the session + chat → ensure the session's persistent worktree → insert a
 * `streaming` assistant message and announce it → prompt the backend, mapping
 * its events to WS frames → persist tool snapshots, reasoning and content →
 * diff the worktree against its baseline and replace the session's pending
 * change → record token cost and set the final task state (`completed` or
 * `failed`, from `result.ok`).
 *
 * When the dispatcher is stopping after the prompt returns, the task is marked
 * `cancelled` and the function returns before diffing.
 *
 * Errors inside the main flow are caught: the task row is marked `failed` with
 * the error message (first 500 chars) and the task's cached commands are
 * cleared. The catch block does not touch the assistant message row. The
 * project-path lookup runs before the try block and can still reject.
 *
 * @param task        Task row fields as selected by the poll query.
 * @param installPath Agent install path from `available_agents`, if any; used
 *                    as the opencode binary when the backend is first created.
 */
async function runOpenCodeServerTask(
task: {
id: string;
project_id: string;
input: string;
agent: string | null;
model: string | null;
mode_id: string | null;
thinking_option_id: string | null;
session_id: string | null;
chat_id: string | null;
},
installPath: string | null,
): Promise<void> {
const taskId = task.id;
// Fixed agent name: runTask only routes here when task.agent === 'opencode'.
const agent = 'opencode';
log.info({ taskId, agent }, 'dispatcher: starting task (path B — opencode server)');
const [project] = await sql<{ path: string | null }[]>`
SELECT path FROM projects WHERE id = ${task.project_id}
`;
const projectPath = project?.path;
if (!projectPath) {
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
WHERE id = ${taskId}
`;
return;
}
// Signal handed to the worktree, prompt and diff calls below. No abort() call
// on this controller is visible in this function.
const ac = new AbortController();
try {
// execution_path = 'acp' — the schema CHECK has no 'opencode_server' value
// (schema is frozen at Phase 0); the warm-vs-one-shot distinction lives in
// agent_sessions.backend. Reuse the closest existing value.
await sql`
UPDATE tasks
SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
WHERE id = ${taskId}
`;
// Resolve session + chat. P1.5-b: the chat (tab) is the context key, so the
// chat_id MUST be non-null and stable before ensureSession. The coder message
// route + skills route stamp task.chat_id with the frontend tab's chat — use
// it directly. Session-less creators (arena, MCP, new_task, generic
// /api/tasks) leave it null; fall back to resolving/creating a real chat so
// ensureSession never receives a degenerate (null, agent) key.
let sessionId: string;
let chatId: string;
if (task.chat_id && task.session_id) {
sessionId = task.session_id;
chatId = task.chat_id;
} else if (task.session_id) {
// Session known but no chat stamped: use the newest open chat, or create one.
sessionId = task.session_id;
const chats = await sql<{ id: string }[]>`
SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1
`;
if (chats.length === 0) {
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${sessionId}, 'External agent execution', 'open')
RETURNING id
`;
chatId = chat!.id;
} else {
chatId = chats[0]!.id;
}
} else {
// No session at all: create a session + chat and link the task to it.
const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
RETURNING id
`;
sessionId = session!.id;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${sessionId}, 'External agent execution', 'open')
RETURNING id
`;
chatId = chat!.id;
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
}
// The user message is only inserted here for tasks that arrived without a
// session.
if (!task.session_id) {
await sql`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
`;
}
// Persistent, session-keyed worktree (shared across turns; NOT torn down
// per turn — Phase 3 reaps it). Captures base_commit for a stable diff.
const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
signal: ac.signal,
});
log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready');
const [assistantMsg] = await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
const assistantId = assistantMsg!.id;
broker.publishFrame(sessionId, {
type: 'message_started',
message_id: assistantId,
chat_id: chatId,
role: 'assistant',
} as WsFrame);
// Advertise the agent's manifest commands for this task, if it has any.
const manifestCommands = getManifestCommands(agent);
if (manifestCommands.length > 0) {
setTaskCommands(taskId, manifestCommands);
broker.publishFrame(sessionId, {
type: 'agent_commands',
task_id: taskId,
session_id: sessionId,
commands: manifestCommands,
} as WsFrame);
}
// Accumulate the turn's stream for persistence + the final message content.
const textChunks: string[] = [];
const reasoningChunks: string[] = [];
// Keyed by toolCallId, so a later update for a tool call replaces its
// earlier snapshot.
const toolSnaps = new Map<string, AcpToolSnapshot>();
// opencode's dcp plugin appends <dcp-message-id>…</dcp-message-id> to the
// text, streamed split across deltas — a per-chunk regex misses it (see
// dcp-strip.ts). Buffer text through a cross-chunk stripper so neither the
// live `delta` frames nor the persisted content ever carry the tag.
const dcp = makeDcpStreamStripper();
// Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits.
// This boundary is where message_id/chat_id get attached (the backend never
// owns them).
const onEvent = (e: AgentEvent): void => {
switch (e.type) {
case 'text': {
// Only forward what the stripper releases; it may hold text back.
const safe = dcp.push(e.text);
if (safe) {
textChunks.push(safe);
broker.publishFrame(sessionId, {
type: 'delta',
message_id: assistantId,
chat_id: chatId,
content: safe,
} as WsFrame);
}
break;
}
case 'reasoning':
reasoningChunks.push(e.text);
broker.publishFrame(sessionId, {
type: 'reasoning_delta',
message_id: assistantId,
chat_id: chatId,
content: e.text,
} as WsFrame);
break;
case 'tool_call':
case 'tool_update':
// Both event kinds record the latest snapshot and go out as a 'tool_call' frame.
toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
broker.publishFrame(sessionId, {
type: 'tool_call',
message_id: assistantId,
chat_id: chatId,
tool_call: snapshotToWireToolCall(e.toolCall),
} as WsFrame);
break;
case 'commands':
// opencode-server doesn't emit these today; ignore if it ever does.
break;
}
};
// opencode expects provider-prefixed model ids (e.g. 'llama-swap/qwen3.6-35b…').
// DEFAULT_MODEL is bare (no prefix) because native inference uses it directly
// against llama-swap. Coalesce empty string (frontend sends '' when no models
// listed) and prefix bare ids so parseModel always succeeds.
const rawModel = (task.model && task.model.trim()) || config.DEFAULT_MODEL;
const model = rawModel.includes('/') ? rawModel : `llama-swap/${rawModel}`;
const backend = getOpenCodeBackend(installPath);
const handle = await backend.ensureSession(sessionId, {
agent,
model,
chatId,
worktreePath,
worktreeId,
projectId: task.project_id,
});
const result = await backend.prompt(handle, task.input, {
worktreePath,
model,
signal: ac.signal,
onEvent,
});
// Phase 3: keep the pooled backend's slot warm across this (possibly long)
// turn so the idle sweep measures from turn END, not start.
agentPool.touch(OPENCODE_POOL_KEY, agent);
// Flush any text held back mid-tag at stream end (complete tags stripped).
const dcpTail = dcp.flush();
if (dcpTail) {
textChunks.push(dcpTail);
broker.publishFrame(sessionId, {
type: 'delta',
message_id: assistantId,
chat_id: chatId,
content: dcpTail,
} as WsFrame);
}
// Stored sizes are capped: 50k chars of content, 200k of reasoning, 500 for
// the task summary. A failed turn summarizes result.error instead of the text.
const assistantContent = textChunks.join('').slice(0, 50_000);
const reasoningText = reasoningChunks.join('').slice(0, 200_000);
const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'opencode turn failed').slice(0, 500);
await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);
// The message is marked 'complete' whether or not the turn succeeded; the
// outcome is recorded on the task row further down.
await sql`
UPDATE messages
SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
WHERE id = ${assistantId}
`;
broker.publishFrame(sessionId, {
type: 'message_complete',
message_id: assistantId,
chat_id: chatId,
} as WsFrame);
if (stopping) {
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
return; // worktree persists (no cleanup); backend stays warm
}
// 1.10: diff the persistent worktree against its captured baseline and
// SUPERSEDE the session's prior pending row (latest-wins, one accumulating
// diff) instead of stacking. Stamp agent for DiffPanel attribution.
const diff = await diffWorktree(worktreePath, projectPath, {
signal: ac.signal,
baseRef: baseCommit ?? 'HEAD',
});
if (diff) {
// NOTE(review): the DELETE and INSERT are two separate statements, not one
// transaction — if the INSERT fails, the prior pending row is already gone.
await sql`
DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending'
`;
await sql`
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
`;
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change');
} else {
log.info({ taskId }, 'dispatcher: no changes detected in session worktree');
}
// NO worktree cleanup — it's persistent (Phase 3 reaps it). Backend stays warm.
const [extCostRow] = await sql<{ total: number | null }[]>`
SELECT SUM(tokens_used)::int AS total
FROM messages
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
`;
const extCostTokens = extCostRow?.total ?? null;
const finalState = result.ok ? 'completed' : 'failed';
await sql`
UPDATE tasks
SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
WHERE id = ${taskId}
`;
log.info({ taskId, agent, finalState, costTokens: extCostTokens }, 'dispatcher: task finished (opencode server)');
clearTaskCommands(taskId);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, agent, err: errMsg }, 'dispatcher: opencode server error');
// Best effort: a failure of this update is deliberately ignored.
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {});
clearTaskCommands(taskId);
// No worktree cleanup (persistent); backend stays warm for the next turn.
}
}
// ─── Path B (warm ACP): goose / qwen warm backend (v2.6 Phase 2) ─────────────
// Warm ACP backends are per (chat, agent): each owns ONE stdio process + ACP
// connection + session. Pool key = chatId; the AgentPool's secondary key is the
// agent. This mirrors agent_sessions' (chat_id, agent) PK.
/**
 * Returns the warm ACP backend pooled under (chatId, agent), creating and
 * registering one on first use. A pooled entry is assumed to be a
 * `WarmAcpBackend` (unchecked cast).
 */
function getWarmAcpBackend(chatId: string, agent: string, installPath: string | null): WarmAcpBackend {
  const pooled = agentPool.get(chatId, agent);
  if (pooled) {
    return pooled as WarmAcpBackend;
  }
  const resolved = getResolvedRegistry().get(agent);
  const created = new WarmAcpBackend({ sql, log, chatId, agent, installPath, resolved });
  agentPool.register(chatId, agent, created);
  return created;
}
async function runWarmAcpTask(
task: {
id: string;
project_id: string;
input: string;
agent: string | null;
model: string | null;
mode_id: string | null;
thinking_option_id: string | null;
session_id: string | null;
chat_id: string | null;
},
installPath: string | null,
): Promise<void> {
const taskId = task.id;
const agent = task.agent!;
// shouldUseWarmBackend guarantees both non-null before we get here.
const sessionId = task.session_id!;
const chatId = task.chat_id!;
log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — warm ACP)');
const [project] = await sql<{ path: string | null }[]>`
SELECT path FROM projects WHERE id = ${task.project_id}
`;
const projectPath = project?.path;
if (!projectPath) {
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
WHERE id = ${taskId}
`;
return;
}
const ac = new AbortController();
try {
await sql`
UPDATE tasks
SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
WHERE id = ${taskId}
`;
// Persistent, session-keyed worktree (shared across turns + agents; NOT torn
// down per turn — Phase 3 reaps it). Same as the opencode-server path so a
// chat that switches opencode↔goose↔qwen shares one worktree.
const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
signal: ac.signal,
});
log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (warm ACP)');
const [assistantMsg] = await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
const assistantId = assistantMsg!.id;
broker.publishFrame(sessionId, {
type: 'message_started',
message_id: assistantId,
chat_id: chatId,
role: 'assistant',
} as WsFrame);
const manifestCommands = getManifestCommands(agent);
if (manifestCommands.length > 0) {
setTaskCommands(taskId, manifestCommands);
broker.publishFrame(sessionId, {
type: 'agent_commands',
task_id: taskId,
session_id: sessionId,
commands: manifestCommands,
} as WsFrame);
}
// Accumulate the turn's stream for persistence + the final message content.
const textChunks: string[] = [];
const reasoningChunks: string[] = [];
const toolSnaps = new Map<string, AcpToolSnapshot>();
// Map transport-agnostic AgentEvents → the SAME WS frames the one-shot ACP
// path emits (identical to runOpenCodeServerTask's onEvent). No dcp stripping:
// that's an opencode-plugin artifact; goose/qwen don't emit dcp tags.
const onEvent = (e: AgentEvent): void => {
switch (e.type) {
case 'text':
textChunks.push(e.text);
broker.publishFrame(sessionId, {
type: 'delta',
message_id: assistantId,
chat_id: chatId,
content: e.text,
} as WsFrame);
break;
case 'reasoning':
reasoningChunks.push(e.text);
broker.publishFrame(sessionId, {
type: 'reasoning_delta',
message_id: assistantId,
chat_id: chatId,
content: e.text,
} as WsFrame);
break;
case 'tool_call':
case 'tool_update':
toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
broker.publishFrame(sessionId, {
type: 'tool_call',
message_id: assistantId,
chat_id: chatId,
tool_call: snapshotToWireToolCall(e.toolCall),
} as WsFrame);
break;
case 'commands':
if (e.commands.length > 0) {
setTaskCommands(taskId, e.commands);
broker.publishFrame(sessionId, {
type: 'agent_commands',
task_id: taskId,
session_id: sessionId,
commands: e.commands,
} as WsFrame);
}
break;
}
};
const model = task.model ?? undefined;
const backend = getWarmAcpBackend(chatId, agent, installPath);
const handle = await backend.ensureSession(sessionId, {
agent,
model: model ?? '',
chatId,
worktreePath,
worktreeId,
projectId: task.project_id,
});
const result = await backend.prompt(handle, task.input, {
worktreePath,
model: model ?? '',
signal: ac.signal,
onEvent,
taskId,
modeId: task.mode_id ?? undefined,
});
// Phase 3: keep the pooled (chat,agent) backend warm across the turn.
agentPool.touch(chatId, agent);
const assistantContent = textChunks.join('').slice(0, 50_000);
const reasoningText = reasoningChunks.join('').slice(0, 200_000);
const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'warm ACP turn failed').slice(0, 500);
await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);
await sql`
UPDATE messages
SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
WHERE id = ${assistantId}
`;
broker.publishFrame(sessionId, {
type: 'message_complete',
message_id: assistantId,
chat_id: chatId,
} as WsFrame);
if (stopping) {
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
return; // worktree persists (no cleanup); backend stays warm
}
// Diff the persistent worktree against its captured baseline and SUPERSEDE
// the session's prior pending row (latest-wins) — identical to opencode.
const diff = await diffWorktree(worktreePath, projectPath, {
signal: ac.signal,
baseRef: baseCommit ?? 'HEAD',
});
if (diff) {
await sql`
DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending'
`;
await sql`
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
`;
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (warm ACP)');
} else {
log.info({ taskId }, 'dispatcher: no changes detected in session worktree (warm ACP)');
}
// NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm.
const [extCostRow] = await sql<{ total: number | null }[]>`
SELECT SUM(tokens_used)::int AS total
FROM messages
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
`;
const extCostTokens = extCostRow?.total ?? null;
const finalState = result.ok ? 'completed' : 'failed';
await sql`
UPDATE tasks
SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
WHERE id = ${taskId}
`;
log.info({ taskId, agent, finalState }, 'dispatcher: task finished (warm ACP)');
clearTaskCommands(taskId);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, agent, err: errMsg }, 'dispatcher: warm ACP error');
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {});
clearTaskCommands(taskId);
// No worktree cleanup (persistent); backend stays warm for the next turn.
}
}
// ─── Helpers ────────────────────────────────────────────────────────────────
/**
 * Polls the `messages` row for `assistantId` until its status is no longer
 * `'streaming'`, then resolves with that status.
 *
 * - Resolves `'cancelled'` as soon as the dispatcher's `stopping` flag is seen
 *   at the top of a poll iteration.
 * - A missing row is reported as `'failed'`.
 *
 * @param assistantId id of the assistant message being streamed
 * @returns the first non-streaming status observed, or `'cancelled'`
 */
async function waitForCompletion(assistantId: string): Promise<string> {
  while (!stopping) {
    // SQL text is kept verbatim (including its line layout).
    const rows = await sql<{ status: string }[]>`
SELECT status FROM messages WHERE id = ${assistantId}
`;
    const current = rows[0]?.status ?? 'failed';
    if (current !== 'streaming') {
      return current;
    }
    // Still streaming: back off before the next look at the row.
    await sleep(COMPLETION_POLL_MS);
  }
  return 'cancelled';
}
/**
 * Promise-based delay.
 *
 * @param ms delay in milliseconds, passed straight to `setTimeout`
 * @returns a promise that fulfils (with no value) once the timer fires
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
return {
/**
 * Begins task intake: arms the fallback interval poll and subscribes to the
 * `tasks_new` NOTIFY channel.
 *
 * The subscription is established asynchronously and is not awaited; a
 * failure to LISTEN is logged and the interval poll remains the only trigger.
 */
start() {
  log.info('dispatcher: starting poll loop + tasks_new listener');
  // Fallback poll — catches notifications missed while the listen connection
  // was down. The fast path is the NOTIFY listener below.
  timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS);
  // Fast path: react immediately to new tasks. porsager reserves a dedicated
  // connection and auto-resubscribes on reconnect; the onlisten callback
  // fires on each (re)subscribe, so we kick a catch-up poll there too to
  // sweep up anything inserted during a disconnect.
  sql
    .listen(
      'tasks_new',
      () => triggerPoll('notify'),
      () => triggerPoll('listen-subscribed'),
    )
    .then(async (meta) => {
      if (!stopping) {
        listener = meta;
        return;
      }
      // stop() ran while the subscription was still being established: it saw
      // `listener === null` and skipped unlisten(). Release the subscription
      // here instead of storing a handle nobody will ever clean up.
      await meta.unlisten().catch((err) => {
        log.error({ err }, 'dispatcher: unlisten error');
      });
    })
    .catch((err) => {
      log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback');
    });
},
/**
 * Shuts the dispatcher down.
 *
 * Order of operations: raise the `stopping` flag, disarm the fallback poll
 * timer, drop the `tasks_new` subscription (errors are logged, not thrown),
 * then wait for every in-flight task promise to settle.
 */
async stop() {
  stopping = true;

  // Disarm the interval poll.
  if (timer) {
    clearInterval(timer);
    timer = null;
  }

  // Drop the NOTIFY subscription; an unlisten failure must not abort shutdown.
  if (listener) {
    await listener.unlisten().catch((err) => {
      log.error({ err }, 'dispatcher: unlisten error');
    });
    listener = null;
  }

  // Let running tasks finish (or fail) before reporting the stop.
  const inflightCount = inflight.size;
  if (inflightCount > 0) {
    log.info({ count: inflightCount }, 'dispatcher: waiting for in-flight tasks');
    await Promise.allSettled(Array.from(inflight.values()));
  }

  log.info('dispatcher: stopped');
},
};
}