Hashline editing: content-hash anchors for edit_file stale-patch detection.
Pure-JS xxHash32, line hash computation, validation with HashlineMismatchError,
256-entry hash dictionary. 6 files in apps/coder/src/services/hashline/.
Audit hooks: emitHook('tool.execute.after') wired in frame-emitter.ts for
completed/failed tool results. emitHook('turn.end') wired at terminal points
in dispatcher.ts (all 5 run functions: native, external, opencode, warm ACP,
claude SDK). Fire-and-forget, non-blocking.
1760 lines · 72 KiB · TypeScript
import type { Sql } from '../db.js';
|
||
import type { FastifyBaseLogger } from 'fastify';
|
||
import type { Broker } from '@boocode/server/broker';
|
||
import type { WsFrame } from '@boocode/contracts/ws-frames';
|
||
import type { Config } from '../config.js';
|
||
import { createWorktree, diffWorktree, cleanupWorktree, ensureSessionWorktree } from './worktrees.js';
|
||
import { asPermissionMode } from './tools/types.js';
|
||
import { createCheckpoint } from './checkpoints.js';
|
||
import { makeDcpStreamStripper } from './dcp-strip.js';
|
||
import { dispatchViaAcp } from './acp-dispatch.js';
|
||
import { getResolvedRegistry } from './provider-config-registry.js';
|
||
import { dispatchViaPty } from './pty-dispatch.js';
|
||
import { clearTaskCommands, setTaskCommands } from './agent-commands-cache.js';
|
||
import { getManifestCommands } from './provider-commands.js';
|
||
import { persistExternalAgentTurn } from './agent-turn-persist.js';
|
||
import { snapshotToWireToolCall, type AcpToolSnapshot } from './acp-tool-snapshot.js';
|
||
import { agentPool, OPENCODE_POOL_KEY } from './agent-pool.js';
|
||
import { OpenCodeServerBackend } from './backends/opencode-server.js';
|
||
import { WarmAcpBackend } from './backends/warm-acp.js';
|
||
import { ClaudeSdkBackend } from './backends/claude-sdk.js';
|
||
import { shouldUseWarmBackend } from './backends/warm-acp-routing.js';
|
||
import { shouldUseClaudeSdk } from './backends/claude-sdk-routing.js';
|
||
import type { AgentBackend, AgentEvent } from './agent-backend.js';
|
||
import { publishAgentStatus } from './agent-status-publish.js';
|
||
import type { AgentStatus } from './normalize-agent-status.js';
|
||
import { createCancelRegistry } from './cancel-registry.js';
|
||
import {
|
||
finalizeStreamingMessage,
|
||
classifyTerminalStatus,
|
||
type TerminalMessageStatus,
|
||
} from './finalize-message.js';
|
||
import { shouldFailOnMissingAgent } from './flow-runner-decisions.js';
|
||
import { emitHook } from '../plugins/host.js';
|
||
|
||
/**
 * The part of the native (path A) inference runner that the dispatcher talks to.
 */
interface InferenceRunner {
  /**
   * Queue a turn that streams into an assistant message row the caller has
   * already inserted (`assistantId`). The dispatcher passes `'default'` for
   * `user` and an optional native permission gate.
   */
  enqueue: (
    sessionId: string,
    chatId: string,
    assistantId: string,
    user: string,
    permissionMode?: 'plan' | 'ask' | 'bypass',
  ) => void;
  /** Cancel the turn running in (sessionId, chatId); presumably resolves whether one was cancelled. */
  cancel: (sessionId: string, chatId: string) => Promise<boolean>;
  /** Report whether `chatId` has a turn in progress. */
  hasActive: (chatId: string) => boolean;
}
|
||
|
||
/** Collaborators injected into createDispatcher. */
interface Deps {
  /** Postgres client used for all task / session / chat / message reads and writes. */
  sql: Sql;
  /** Native (path A) inference runner. */
  inference: InferenceRunner;
  /** Broker used to publish WS frames on session channels. */
  broker: Broker;
  /** Structured logger. */
  log: FastifyBaseLogger;
  /** Server configuration (e.g. DEFAULT_MODEL for newly created sessions). */
  config: Config;
  /**
   * Orchestrator hook (D-2). Called once per task run after it settles in a
   * terminal state ('completed' | 'failed' | 'cancelled'), and only after the
   * run-function has persisted that state.
   *
   * It reads the settled `tasks` row instead of instrumenting each
   * run-function's terminal branch, so native and every external path are
   * covered alike. The flow-runner uses it to advance its `flow_runs`.
   *
   * Optional: when omitted, the dispatcher behaves as it did before the hook
   * existed. Best-effort: a thrown error is logged and swallowed so it can
   * never wedge the poll loop.
   */
  onTaskTerminal?: (taskId: string, state: string) => void;
}
|
||
|
||
// States for which the orchestrator hook fires once a run settles. 'blocked'
// is deliberately absent: a blocked task is waiting on a human decision, so
// its runTask promise has not settled, and the hook only runs after it does.
const TERMINAL_TASK_STATES = new Set<string>(['completed', 'failed', 'cancelled']);
|
||
|
||
// Safety-net poll cadence. New tasks normally arrive through LISTEN/NOTIFY on
// 'tasks_new' and are picked up immediately. This interval only catches
// notifications lost while the listen connection was down (porsager/postgres
// reconnects on its own), so it does not need to be aggressive.
const POLL_INTERVAL_MS = 2000;

// Delay between status checks while waiting on a native inference turn.
// NOTE(review): presumably consumed by waitForCompletion, which is outside
// this view. Confirm.
const COMPLETION_POLL_MS = 2000;
|
||
|
||
export function createDispatcher(deps: Deps): {
|
||
cancelExternalTask(taskId: string): boolean;
|
||
start(): void;
|
||
stop(): Promise<void>;
|
||
} {
|
||
const { sql, inference, broker, log, config, onTaskTerminal } = deps;
|
||
let timer: ReturnType<typeof setInterval> | null = null;
|
||
let listener: { unlisten: () => Promise<void> } | null = null;
|
||
let polling = false;
|
||
let stopping = false;
|
||
// v2.6 (1.9): per-session in-flight registry replaces the global `running`
|
||
// boolean. Key = session_id (or `task:<id>` for sessionless tasks). Sessions
|
||
// without an in-flight turn run concurrently; within a session, strictly one
|
||
// turn at a time.
|
||
const inflight = new Map<string, Promise<void>>();
|
||
|
||
// F1: per-task abort registry. Each external run-function registers its per-turn
|
||
// AbortController here (keyed by task id); the cancel route reaches it through the
|
||
// exported `cancelExternalTask`; the run's `.finally` deletes the entry. Native
|
||
// boocode tasks are never registered, so a Stop on one returns false and falls
|
||
// through to the unchanged inference.cancel path.
|
||
const taskControllers = createCancelRegistry();
|
||
|
||
// Single funnel for both wake-up sources: the interval timer and the NOTIFY
// listener. Overlapping calls are harmless, because poll() bails out early on
// its `polling`/`stopping` flags, so a task is never dispatched twice. A failed
// poll is logged together with the trigger `reason` rather than escaping.
function triggerPoll(reason: string): void {
  const onPollError = (err: unknown): void => {
    log.error({ err, reason }, 'dispatcher: poll error');
  };
  void poll().then(undefined, onPollError);
}
|
||
|
||
// Serialization key for `inflight`. Tasks sharing a session run one at a
// time. A sessionless task gets a private key derived from its own id, so it
// never waits on any other task.
function concurrencyKey(task: { id: string; session_id: string | null }): string {
  const { id, session_id: sessionId } = task;
  return sessionId != null ? sessionId : `task:${id}`;
}
|
||
|
||
// agent-status-normalize (#10): every external-agent path (warm-acp, opencode,
// claude-sdk, one-shot pty) reports its per-(chat, agent) status through this
// one helper, so the paths cannot drift apart:
//   - `working` when a turn starts;
//   - `idle` on a clean finish or a cancellation;
//   - `error` on failure.
// The frame is published on the session channel via broker.publishFrame.
// Best-effort: per its contract, publishAgentStatus never throws.
function emitAgentStatus(
  sessionId: string,
  chatId: string,
  agent: string,
  status: AgentStatus,
  reason: string,
): void {
  const publish = broker.publishFrame;
  publishAgentStatus(publish, sessionId, chatId, agent, status, reason);
}
|
||
|
||
// turn.end audit hook, fire-and-forget. Hook failures must never leak into the
// dispatch flow, and both kinds are logged at warn level instead of being dropped:
//   - A synchronous throw from emitHook is caught here. Otherwise it would land
//     in the calling run-function's catch block and could rewrite an already
//     settled task as failed.
//   - A rejected hook promise gets a handler, so it cannot surface as an
//     unhandled rejection (Node >= 15 terminates the process on those unless a
//     process-level handler is installed).
//
// `model` is normalized from null to undefined in the payload. `agent` is
// passed through as-is (the native path passes null).
function emitTurnEnd(
  sessionId: string,
  taskId: string,
  state: string,
  agent?: string | null,
  model?: string | null,
  outputSummary?: string,
): void {
  const onHookError = (err: unknown): void => {
    log.warn(
      { err: err instanceof Error ? err.message : String(err), sessionId, taskId, state },
      'dispatcher: turn.end hook failed',
    );
  };
  try {
    // Promise.resolve() adopts a returned promise (or wraps a plain return
    // value) so the rejection handler is always attached.
    void Promise.resolve(
      emitHook('turn.end', {
        sessionId,
        turnSummary: { taskId, state, agent, model: model ?? undefined, outputSummary },
      }),
    ).catch(onHookError);
  } catch (err) {
    onHookError(err);
  }
}
|
||
|
||
// F1 (OCE-001/OCE-002): move a streaming assistant message into a terminal
// state and publish the matching message_complete frame. Both are done by
// finalizeStreamingMessage, whose `WHERE status='streaming'` guard makes
// repeat calls harmless.
//
// Error/abort paths call this even when the assistant row was never created:
// runExternalAgent hoists `assistantId` as '' until its INSERT returns. An
// empty id therefore resolves `false` immediately instead of issuing a query
// against '' and logging a spurious failure.
//
// Best-effort: any failure (a synchronous throw or a rejection) is logged and
// resolves `false`, so finalization can never mask the abort/error that
// triggered it. Otherwise it resolves whatever the helper resolved.
async function finalizeMessage(
  sessionId: string,
  chatId: string,
  assistantId: string,
  status: TerminalMessageStatus,
  model: string | null,
  content?: string,
): Promise<boolean> {
  if (!assistantId) return false;
  try {
    return await finalizeStreamingMessage(sql, broker.publishFrame, {
      sessionId,
      chatId,
      assistantId,
      status,
      model,
      content,
    });
  } catch (err) {
    log.error(
      { err: err instanceof Error ? err.message : String(err), assistantId },
      'dispatcher: finalizeStreamingMessage failed',
    );
    return false;
  }
}
|
||
|
||
// F1: how the cancel route reaches an in-flight external run. It aborts the
// run's per-turn AbortController through the cancel registry.
//
// Returns false when nothing is registered for `taskId`: a finished task, or
// a native boocode task (those never register, so the caller falls back to
// inference.cancel). A repeated Stop simply re-aborts an already-aborted
// controller, which is a no-op.
//
// Aborting only triggers the backend's per-turn cancel (session.abort /
// session/cancel / interrupt / child.kill). Warm pool processes, persistent
// worktrees and pooled backends are left untouched.
function cancelExternalTask(taskId: string): boolean {
  const hadController = taskControllers.cancel(taskId);
  return hadController;
}
|
||
|
||
// D-2: tell the orchestrator that a task run has settled. It re-reads the
// state the run-function persisted, which makes it path-agnostic: native and
// every external path are covered without touching each terminal branch.
// Non-terminal states (e.g. 'blocked', or a task still 'pending' because its
// state write failed) are ignored.
//
// Best-effort: a failed read is logged as a read failure, and a throwing
// callback is logged separately. Neither propagates.
function fireTaskTerminal(taskId: string): void {
  if (!onTaskTerminal) return;
  const notify = onTaskTerminal;

  const dispatchIfTerminal = (rows: readonly { state: string }[]): void => {
    const settled = rows[0]?.state;
    if (!settled || !TERMINAL_TASK_STATES.has(settled)) return;
    try {
      notify(taskId, settled);
    } catch (err) {
      log.error({ err, taskId }, 'dispatcher: onTaskTerminal callback threw');
    }
  };

  sql<{ state: string }[]>`SELECT state FROM tasks WHERE id = ${taskId}`
    .then(dispatchIfTerminal)
    .catch((err) => {
      log.error({ err, taskId }, 'dispatcher: onTaskTerminal state read failed');
    });
}
|
||
|
||
// One dispatch pass. It selects up to 50 pending tasks (oldest first) and
// starts each one whose concurrency key has no in-flight run. The key is the
// session, or `task:<id>` for sessionless tasks.
async function poll(): Promise<void> {
  // `polling` serializes poll() execution itself (timer + NOTIFY can fire
  // concurrently) so we never double-select a task. It does NOT serialize task
  // execution — that's what `inflight` (keyed per session) governs.
  if (polling || stopping) return;
  polling = true;
  try {
    // Oldest-first; start every pending task whose session isn't already busy.
    const rows = await sql<{
      id: string;
      project_id: string;
      input: string;
      agent: string | null;
      model: string | null;
      mode_id: string | null;
      thinking_option_id: string | null;
      session_id: string | null;
      chat_id: string | null;
    }[]>`
      SELECT id, project_id, input, agent, model, mode_id, thinking_option_id, session_id, chat_id
      FROM tasks
      WHERE state = 'pending'
      ORDER BY created_at
      LIMIT 50
    `;
    for (const task of rows) {
      if (stopping) break;
      const key = concurrencyKey(task);
      if (inflight.has(key)) continue; // this session already has an in-flight turn
      // Register synchronously (before any await) so a later row in this pass
      // with the same key is skipped and a concurrent poll can't re-pick it.
      const p = runTask(task).finally(() => {
        inflight.delete(key);
        // F1: drop the abort controller once the run settles. After this, a Stop
        // on the (now-finished) task returns false — cancel-after-exit is safe.
        taskControllers.delete(task.id);
      });
      inflight.set(key, p);
      // runTask can reject. For example, runTask's available_agents lookup, and
      // runExternalAgent's project lookup and no-path failure write, all run
      // outside any try/catch.
      // - Without a handler, that rejection is unhandled whenever onTaskTerminal
      //   is absent (Node >= 15 terminates the process on it unless a
      //   process-level handler exists).
      // - With the hook present, it would be silently dropped.
      // Log it here. This handler is attached to a derived promise, so `p`
      // itself (and therefore the inflight lifecycle and stop()'s drain) is
      // unchanged.
      void p.catch((err: unknown) => {
        log.error({ err, taskId: task.id }, 'dispatcher: task run rejected');
      });
      // D-2: fire the orchestrator hook once the run settles (terminal state
      // written), on both fulfilment and rejection. Detached from `p` so it
      // never affects the inflight lifecycle or stop()'s drain.
      if (onTaskTerminal) {
        const notifyTerminal = (): void => fireTaskTerminal(task.id);
        void p.then(notifyTerminal, notifyTerminal);
      }
    }
  } finally {
    polling = false;
  }
}
|
||
|
||
// Route one pending task to its execution path and run it to completion.
// - Path B: the task names an agent that exists in available_agents. Per-agent
//   routing (qwen+plan PTY, opencode, claude-sdk, warm ACP, one-shot
//   ACP/PTY) applies.
// - Path A: no agent, or a named agent that is missing. Native inference.
// - Orchestrator (qwen+plan) tasks whose agent is missing are failed outright
//   instead of falling back to native.
// The returned promise rejects only if something outside a run-function's own
// error handling throws (e.g. the available_agents lookup below).
async function runTask(task: {
  id: string;
  project_id: string;
  input: string;
  agent: string | null;
  model: string | null;
  mode_id: string | null;
  thinking_option_id: string | null;
  session_id: string | null;
  chat_id: string | null;
}): Promise<void> {
  const taskId = task.id;

  // Determine execution path: if agent is specified AND exists in available_agents → Path B
  if (task.agent) {
    const [agentRow] = await sql<{ name: string; supports_acp: boolean; install_path: string | null }[]>`
      SELECT name, supports_acp, install_path FROM available_agents WHERE name = ${task.agent}
    `;
    if (agentRow) {
      // ORCHESTRATOR READ-ONLY INVARIANT (D-4).
      //
      // A qwen task dispatched in plan mode MUST bind the hard tool-level
      // gate, and only the PTY path applies it (`qwen --approval-mode plan`,
      // pty-dispatch.ts:75 — reads allowed, writes blocked inside the agent
      // binary).
      //
      // The ACP paths set the mode via a session RPC (`setSessionMode`) that
      // is fail-OPEN: a failed/ignored call leaves the agent write-capable. So
      // they are never safe for the read-only guarantee.
      //
      // Force the one-shot PTY path for qwen+plan tasks regardless of
      // available_agents.supports_acp. That flag probes true for qwen, since
      // `qwen --help` lists `--acp`. This is correct on its own merits too:
      // qwen's ACP bridge is an HTTP daemon, not the stdio ACP that
      // dispatchViaAcp drives (root CLAUDE.md), so PTY is the working qwen
      // path.
      //
      // Scoped to qwen (the orchestrator's only agent) to avoid changing the
      // routing of any other agent. The ACP fail-closed guard (acp-dispatch.ts)
      // backstops a plan-mode task that reaches ACP by any other route.
      if (task.agent === 'qwen' && task.mode_id === 'plan') {
        await runExternalAgent(task, /* supportsAcp */ false, agentRow.install_path);
        return;
      }
      // v2.6 (1.7): opencode routes to its warm HTTP-server backend.
      // v2.6 Phase 2 (2.4): goose/qwen route to the warm ACP backend WHEN the
      // task came from a real chat tab (session_id + chat_id) —
      // shouldUseWarmBackend. Session-less creators (arena, MCP, new_task,
      // generic /api/tasks) keep the existing one-shot worktree-per-task
      // ACP/PTY path untouched.
      if (task.agent === 'opencode') {
        await runOpenCodeServerTask(task, agentRow.install_path);
      } else if (shouldUseClaudeSdk(task)) {
        // claude-sdk-sessionstore #9 (Part 2): env-flagged (CLAUDE_SDK_BACKEND,
        // default OFF) warm Claude-SDK backend for chat-tab claude tasks. With
        // the flag off (production default) this predicate returns false and
        // claude falls through to the UNCHANGED one-shot PTY runExternalAgent
        // path below.
        await runClaudeSdkTask(task, agentRow.install_path);
      } else if (shouldUseWarmBackend(task)) {
        await runWarmAcpTask(task, agentRow.install_path);
      } else {
        await runExternalAgent(task, agentRow.supports_acp, agentRow.install_path);
      }
      return;
    }

    // Orchestrator (qwen+plan) tasks must NEVER fall through to write-capable
    // native inference — the PTY plan-mode path is the only safe route. Fail
    // hard so the flow-runner propagates a clear error to the run. (H1 fix)
    if (shouldFailOnMissingAgent(task.agent, task.mode_id)) {
      const errMsg = 'orchestrator task cannot run: qwen agent is not available (probe failed or binary missing)';
      log.error({ taskId, agent: task.agent }, `dispatcher: ${errMsg}`);
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg}
        WHERE id = ${taskId}
      `.catch((err: unknown) => {
        // Best-effort, but not silent. If this write fails, the row stays
        // 'pending' and the next poll picks it up again, so record why.
        log.error(
          { taskId, err: err instanceof Error ? err.message : String(err) },
          'dispatcher: failed to mark orchestrator task as failed',
        );
      });
      return;
    }

    // Agent specified but not available — fall through to Path A with a warning
    log.warn({ taskId, agent: task.agent }, 'dispatcher: specified agent not available, falling back to native');
  }

  // Path A — native inference (existing behavior)
  await runNativeInference(task);
}
|
||
|
||
// ─── Path A: Native Inference ───────────────────────────────────────────────
|
||
|
||
// Path A: run a task through the in-process inference runner. In order:
//   1. Mark the task running.
//   2. Reuse the task's session, or create one and link it to the task.
//   3. Open a fresh 'Task execution' chat.
//   4. Seed the user message and a streaming assistant row.
//   5. Enqueue inference and wait for the assistant row to settle.
//   6. Record the outcome (state, summary, session token cost, chat_id) on the
//      task row and emit turn.end.
//
// Failure writes carry the same `state NOT IN ('cancelled', 'completed')` guard
// runExternalAgent uses. This has two effects:
//   - A 'cancelled' written by the cancel route is never overwritten with
//     'failed' (user-Stop wins).
//   - A throw that happens after the 'completed' write cannot flip the task to
//     'failed' from the catch block.
async function runNativeInference(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null; mode_id: string | null; session_id: string | null }): Promise<void> {
  const taskId = task.id;
  log.info({ taskId }, 'dispatcher: starting task (path A — native)');

  // Declared before try so the catch block can write them back on the task row.
  let chatId: string | null = null;
  let sessionId: string | undefined;

  try {
    // Mark running
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
      WHERE id = ${taskId}
    `;

    // Session setup: reuse a pre-created session (e.g. Q&A arena contestants
    // whose persona is stamped on the session via agent_id) or create a fresh one.
    const model = task.model ?? config.DEFAULT_MODEL;
    if (task.session_id) {
      sessionId = task.session_id;
    } else {
      const sessionName = 'Task: ' + task.input.slice(0, 40);
      const [session] = await sql<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model, status)
        VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
        RETURNING id
      `;
      sessionId = session!.id;
      await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
    }

    const [chat] = await sql<{ id: string }[]>`
      INSERT INTO chats (session_id, name, status)
      VALUES (${sessionId}, 'Task execution', 'open')
      RETURNING id
    `;
    chatId = chat!.id;

    // Create user message + streaming assistant
    await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
      VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
      RETURNING id
    `;
    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
      RETURNING id
    `;
    const assistantId = assistantMsg!.id;

    // Enqueue inference — pass the native permission gate (plan/ask/bypass)
    // through to the write-tool context. Non-unified mode ids → undefined.
    inference.enqueue(sessionId, chatId, assistantId, 'default', asPermissionMode(task.mode_id));

    // Wait for inference to complete (poll message status)
    const finalStatus = await waitForCompletion(assistantId);

    if (stopping) {
      // Dispatcher shutting down mid-turn: record the cancellation, including
      // the chat it ran in, like every other terminal write in this function.
      await sql`
        UPDATE tasks
        SET state = 'cancelled', ended_at = clock_timestamp(), chat_id = ${chatId}
        WHERE id = ${taskId}
      `;
      emitTurnEnd(sessionId, taskId, 'cancelled', null, task.model);
      return;
    }

    // Aggregate token cost for the task's session
    const [costRow] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    const costTokens = costRow?.total ?? null;

    // Both outcomes summarize the assistant's final content.
    const [msg] = await sql<{ content: string | null }[]>`
      SELECT content FROM messages WHERE id = ${assistantId}
    `;

    if (finalStatus === 'complete') {
      const summary = (msg?.content ?? '').slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}, chat_id = ${chatId}
        WHERE id = ${taskId}
      `;
      log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
      emitTurnEnd(sessionId, taskId, 'completed', null, task.model, summary);
    } else {
      const summary = (msg?.content ?? 'Inference failed').slice(0, 500);
      await sql`
        UPDATE tasks
        SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}, cost_tokens = ${costTokens}, chat_id = ${chatId}
        WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
      `;
      log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
      emitTurnEnd(sessionId, taskId, 'failed', null, task.model, summary);
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    log.error({ taskId, err: errMsg }, 'dispatcher: task error (native)');
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}, chat_id = ${chatId}
      WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
    `.catch((writeErr: unknown) => {
      // Best-effort: never let this write mask the original error, but leave a trace.
      log.error(
        { taskId, err: writeErr instanceof Error ? writeErr.message : String(writeErr) },
        'dispatcher: failed to record native task failure',
      );
    });
    if (sessionId) emitTurnEnd(sessionId, taskId, 'failed', null, task.model, errMsg);
  }
}
|
||
|
||
// ─── Path B: External Agent Dispatch ────────────────────────────────────────
|
||
|
||
async function runExternalAgent(
|
||
task: {
|
||
id: string;
|
||
project_id: string;
|
||
input: string;
|
||
agent: string | null;
|
||
model: string | null;
|
||
mode_id: string | null;
|
||
thinking_option_id: string | null;
|
||
session_id: string | null;
|
||
},
|
||
supportsAcp: boolean,
|
||
installPath: string | null,
|
||
): Promise<void> {
|
||
const taskId = task.id;
|
||
const agent = task.agent!;
|
||
const executionPath = supportsAcp ? 'acp' : 'pty';
|
||
|
||
log.info({ taskId, agent, executionPath }, 'dispatcher: starting task (path B — external)');
|
||
|
||
// Resolve the project's root path
|
||
const [project] = await sql<{ path: string | null }[]>`
|
||
SELECT path FROM projects WHERE id = ${task.project_id}
|
||
`;
|
||
const projectPath = project?.path;
|
||
if (!projectPath) {
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
|
||
WHERE id = ${taskId}
|
||
`;
|
||
return;
|
||
}
|
||
|
||
// F1: register the per-task abort controller so a Stop reaches this run.
|
||
const ac = taskControllers.register(taskId);
|
||
|
||
// #10: hoisted above the try so the catch block can report `error` status with
|
||
// the (chat, agent) key. Empty until resolved below; guarded before use.
|
||
let sessionId = '';
|
||
let chatId = '';
|
||
// F1: hoisted so the catch / abort short-circuit can finalize the streaming
|
||
// assistant row. Empty until the row is created; finalize no-ops on ''.
|
||
let assistantId = '';
|
||
|
||
try {
|
||
// Mark running
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'running', started_at = clock_timestamp(), execution_path = ${executionPath}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
|
||
if (task.session_id) {
|
||
sessionId = task.session_id;
|
||
const chats = await sql<{ id: string }[]>`
|
||
SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1
|
||
`;
|
||
if (chats.length === 0) {
|
||
const [chat] = await sql<{ id: string }[]>`
|
||
INSERT INTO chats (session_id, name, status)
|
||
VALUES (${sessionId}, 'External agent execution', 'open')
|
||
RETURNING id
|
||
`;
|
||
chatId = chat!.id;
|
||
} else {
|
||
chatId = chats[0]!.id;
|
||
}
|
||
} else {
|
||
const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
|
||
const [session] = await sql<{ id: string }[]>`
|
||
INSERT INTO sessions (project_id, name, model, status)
|
||
VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
|
||
RETURNING id
|
||
`;
|
||
sessionId = session!.id;
|
||
|
||
const [chat] = await sql<{ id: string }[]>`
|
||
INSERT INTO chats (session_id, name, status)
|
||
VALUES (${sessionId}, 'External agent execution', 'open')
|
||
RETURNING id
|
||
`;
|
||
chatId = chat!.id;
|
||
|
||
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
|
||
}
|
||
|
||
if (!task.session_id) {
|
||
await sql`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
|
||
`;
|
||
}
|
||
|
||
// Step 1: Create worktree
|
||
log.info({ taskId, projectPath }, 'dispatcher: creating worktree');
|
||
const worktreePath = await createWorktree(projectPath, taskId, { signal: ac.signal });
|
||
log.info({ taskId, worktreePath }, 'dispatcher: worktree created');
|
||
|
||
// Step 2: Dispatch to agent
|
||
let outputSummary: string;
|
||
let assistantContent = '';
|
||
let acpReasoning = '';
|
||
|
||
const [assistantMsg] = await sql<{ id: string }[]>`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
|
||
RETURNING id
|
||
`;
|
||
assistantId = assistantMsg!.id;
|
||
|
||
// write-edit-robustness #4: pre-turn worktree checkpoint (best-effort; a
|
||
// failure logs and never breaks dispatch). This path uses a per-task worktree
|
||
// (createWorktree, not the session worktree), so there's no worktrees-table id
|
||
// — pass null for worktreeId, the path is enough for restore's reset.
|
||
await createCheckpoint(
|
||
sql,
|
||
{ chatId, sessionId, worktreeId: null, worktreePath, messageId: assistantId },
|
||
{ signal: ac.signal, log },
|
||
).catch(() => null);
|
||
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_started',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
role: 'assistant',
|
||
} as WsFrame);
|
||
|
||
// #10: external-agent turn begins.
|
||
emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');
|
||
|
||
const manifestCommands = getManifestCommands(agent);
|
||
if (manifestCommands.length > 0) {
|
||
setTaskCommands(taskId, manifestCommands);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'agent_commands',
|
||
task_id: taskId,
|
||
session_id: sessionId,
|
||
commands: manifestCommands,
|
||
} as WsFrame);
|
||
}
|
||
|
||
if (supportsAcp) {
|
||
const result = await dispatchViaAcp({
|
||
agent,
|
||
resolved: getResolvedRegistry().get(agent),
|
||
task: task.input,
|
||
worktreePath,
|
||
installPath: installPath ?? undefined,
|
||
model: task.model ?? undefined,
|
||
modeId: task.mode_id ?? undefined,
|
||
thinkingOptionId: task.thinking_option_id ?? undefined,
|
||
taskId,
|
||
sessionId,
|
||
chatId,
|
||
messageId: assistantId,
|
||
broker,
|
||
signal: ac.signal,
|
||
log,
|
||
});
|
||
assistantContent = result.output.slice(0, 50_000);
|
||
acpReasoning = result.reasoningText.slice(0, 200_000);
|
||
outputSummary = result.output.slice(0, 500);
|
||
await persistExternalAgentTurn(sql, assistantId, result.toolSnapshots, acpReasoning);
|
||
} else {
|
||
// v#7 (stream-json): claude + qwen run with --output-format stream-json.
|
||
// Parse the NDJSON live in pty-dispatch and forward AgentEvents here so we
|
||
// publish the SAME live frames the warm-ACP / opencode paths emit (text,
|
||
// reasoning, tool) and persist structured parts. Accumulate for the final
|
||
// message content + persistence; fall back to the opaque stdout slice when
|
||
// nothing parsed (agent ran without the flag, or crashed before emitting).
|
||
const ptyTextChunks: string[] = [];
|
||
const ptyReasoningChunks: string[] = [];
|
||
const ptyToolSnaps = new Map<string, AcpToolSnapshot>();
|
||
|
||
const onPtyEvent = (e: AgentEvent): void => {
|
||
switch (e.type) {
|
||
case 'text':
|
||
ptyTextChunks.push(e.text);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: e.text,
|
||
} as WsFrame);
|
||
break;
|
||
case 'reasoning':
|
||
ptyReasoningChunks.push(e.text);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'reasoning_delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: e.text,
|
||
} as WsFrame);
|
||
break;
|
||
case 'tool_call':
|
||
case 'tool_update':
|
||
ptyToolSnaps.set(e.toolCall.toolCallId, e.toolCall);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'tool_call',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
tool_call: snapshotToWireToolCall(e.toolCall),
|
||
} as WsFrame);
|
||
break;
|
||
case 'commands':
|
||
// stream-json carries no commands today; ignore if it ever does.
|
||
break;
|
||
}
|
||
};
|
||
|
||
const result = await dispatchViaPty({
|
||
agent,
|
||
task: task.input,
|
||
worktreePath,
|
||
installPath: installPath ?? undefined,
|
||
model: task.model ?? undefined,
|
||
modeId: task.mode_id ?? undefined,
|
||
thinkingOptionId: task.thinking_option_id ?? undefined,
|
||
signal: ac.signal,
|
||
log,
|
||
onEvent: onPtyEvent,
|
||
});
|
||
|
||
if (result.streamed) {
|
||
assistantContent = ptyTextChunks.join('').slice(0, 50_000);
|
||
// stream-json text can be empty for a tool-only turn — surface stderr or a
|
||
// placeholder so the message row isn't blank.
|
||
if (!assistantContent) {
|
||
assistantContent = (result.stderr || '(no text output)').slice(0, 50_000);
|
||
}
|
||
outputSummary = (ptyTextChunks.join('') || result.stderr).slice(0, 500);
|
||
acpReasoning = ptyReasoningChunks.join('').slice(0, 200_000);
|
||
await persistExternalAgentTurn(sql, assistantId, [...ptyToolSnaps.values()], acpReasoning);
|
||
} else {
|
||
// Fallback: agent produced no parseable NDJSON (ran without the flag, or
|
||
// crashed). Preserve today's opaque stdout-slice + single delta behavior.
|
||
assistantContent = (result.stdout || result.stderr || '(no output)').slice(0, 50_000);
|
||
outputSummary = (result.stdout || result.stderr).slice(0, 500);
|
||
|
||
if (assistantContent) {
|
||
broker.publishFrame(sessionId, {
|
||
type: 'delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: assistantContent,
|
||
} as WsFrame);
|
||
}
|
||
}
|
||
}
|
||
|
||
// F1: abort short-circuit BEFORE the unconditional 'complete' write. A Stop
|
||
// (cancelExternalTask → ac.abort) or shutdown finalizes the streaming row as
|
||
// 'cancelled' (keeping whatever streamed) instead of recording 'complete',
|
||
// and skips the diff. This one-shot path owns a per-task worktree, so we DO
|
||
// tear it down here (unlike the warm paths, which keep their persistent one).
|
||
if (ac.signal.aborted || stopping) {
|
||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||
await cleanupWorktree(projectPath, taskId);
|
||
clearTaskCommands(taskId);
|
||
return;
|
||
}
|
||
|
||
await sql`
|
||
UPDATE messages
|
||
SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
|
||
WHERE id = ${assistantId}
|
||
`;
|
||
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_complete',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
model: task.model,
|
||
} as WsFrame);
|
||
|
||
// Step 3: Diff the worktree and queue pending changes
|
||
log.info({ taskId }, 'dispatcher: diffing worktree');
|
||
const diff = await diffWorktree(worktreePath, projectPath, { signal: ac.signal });
|
||
|
||
if (diff) {
|
||
// Queue a single pending_change entry with the full unified diff, stamped
|
||
// with the dispatched agent for DiffPanel attribution (v2.6 Phase 1-UX).
|
||
await sql`
|
||
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
|
||
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
|
||
`;
|
||
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff queued as pending change');
|
||
} else {
|
||
log.info({ taskId }, 'dispatcher: no changes detected in worktree');
|
||
}
|
||
|
||
// Step 4: Cleanup worktree
|
||
await cleanupWorktree(projectPath, taskId);
|
||
|
||
// Step 5: Aggregate token cost
|
||
const [extCostRow] = await sql<{ total: number | null }[]>`
|
||
SELECT SUM(tokens_used)::int AS total
|
||
FROM messages
|
||
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
|
||
`;
|
||
const extCostTokens = extCostRow?.total ?? null;
|
||
|
||
// Step 6: Mark task completed
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
|
||
// #10: external-agent turn completed cleanly.
|
||
emitAgentStatus(sessionId, chatId, agent, 'idle', 'turn_complete');
|
||
emitTurnEnd(sessionId, taskId, 'completed', agent, task.model, outputSummary);
|
||
clearTaskCommands(taskId);
|
||
|
||
} catch (err) {
|
||
const errMsg = err instanceof Error ? err.message : String(err);
|
||
const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
|
||
log.error({ taskId, agent, err: errMsg }, 'dispatcher: external agent error');
|
||
|
||
// Guard `NOT IN ('cancelled','completed')` so a genuine error in the catch
|
||
// never overwrites a state the cancel route already wrote (user-Stop wins).
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||
WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
|
||
`.catch(() => {});
|
||
|
||
// F1 (OCE-001): finalize the streaming assistant message — the catch
|
||
// previously updated only `tasks` and left the message 'streaming' forever
|
||
// (the BooChat 5-min sweep runs in a different process and can't reach it).
|
||
await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
|
||
|
||
// #10: external-agent turn failed/crashed. chatId may be unbound if the throw
|
||
// preceded its assignment — guard so the status publish never masks the real
|
||
// error.
|
||
if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'failed');
|
||
if (sessionId) emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||
|
||
// Best-effort cleanup
|
||
await cleanupWorktree(projectPath, taskId);
|
||
clearTaskCommands(taskId);
|
||
}
|
||
}
|
||
|
||
// ─── Path B (opencode): warm OpenCode server backend (v2.6 1.7 + 1.10) ───────

// A single OpenCode server serves every session in this BooCoder process (the
// backend multiplexes sessions itself). It is therefore pooled under the fixed
// OPENCODE_POOL_KEY, which the lifecycle close-hook also uses, instead of per
// session. Warm ACP backends (Phase 2), by contrast, are pooled per (chat, agent).
//
// installPath only matters when no server is pooled yet: it picks the binary
// for the freshly created backend (falling back to the bare name 'opencode').
function getOpenCodeBackend(installPath: string | null): AgentBackend {
  const existing = agentPool.get(OPENCODE_POOL_KEY, 'opencode');
  if (existing) return existing;

  const fresh = new OpenCodeServerBackend({ sql, log, opencodeBinary: installPath ?? 'opencode' });
  agentPool.register(OPENCODE_POOL_KEY, 'opencode', fresh);
  return fresh;
}
|
||
|
||
// Run one task turn against the pooled, warm OpenCode server (Path B).
//
// Flow: resolve (or create) the session + chat, ensure the persistent
// session-keyed worktree, insert a 'streaming' assistant row, then map the
// backend's AgentEvents onto the same WS frames the ACP path emits. After the
// prompt resolves it persists the turn, supersedes the session's pending diff
// and records the terminal task state.
//
// Terminal outcomes:
//  - Stop / shutdown (ac aborted or `stopping`): message + task 'cancelled';
//    the worktree is kept and the pooled server stays warm.
//  - backend reports `result.ok === false`: task 'failed', agent status 'error'.
//  - thrown error: task state from classifyTerminalStatus (never overwriting a
//    'cancelled'/'completed' row), and the streaming row finalized with whatever
//    text had already been streamed to the client.
//
// installPath picks the opencode binary only if no server is pooled yet.
async function runOpenCodeServerTask(
  task: {
    id: string;
    project_id: string;
    input: string;
    agent: string | null;
    model: string | null;
    mode_id: string | null;
    thinking_option_id: string | null;
    session_id: string | null;
    chat_id: string | null;
  },
  installPath: string | null,
): Promise<void> {
  const taskId = task.id;
  const agent = 'opencode';
  log.info({ taskId, agent }, 'dispatcher: starting task (path B — opencode server)');

  const [project] = await sql<{ path: string | null }[]>`
    SELECT path FROM projects WHERE id = ${task.project_id}
  `;
  const projectPath = project?.path;
  if (!projectPath) {
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
      WHERE id = ${taskId}
    `;
    return;
  }

  // F1: register the per-task abort controller so a Stop reaches this run.
  const ac = taskControllers.register(taskId);

  // #10: hoisted so the catch can report `error` with the (chat, agent) key.
  let sessionId = '';
  let chatId = '';
  // F1: hoisted so the catch / abort short-circuit can finalize the streaming row.
  let assistantId = '';
  // Hoisted so the catch can persist the text already streamed live as `delta`
  // frames, instead of finalizing the row with no content.
  const textChunks: string[] = [];

  try {
    // execution_path = 'acp' — the schema CHECK has no 'opencode_server' value
    // (schema is frozen at Phase 0); the warm-vs-one-shot distinction lives in
    // agent_sessions.backend. Reuse the closest existing value.
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
      WHERE id = ${taskId}
    `;

    // Resolve session + chat. P1.5-b: the chat (tab) is the context key, so the
    // chat_id MUST be non-null and stable before ensureSession. The coder message
    // route + skills route stamp task.chat_id with the frontend tab's chat — use
    // it directly. Session-less creators (arena, MCP, new_task, generic
    // /api/tasks) leave it null; fall back to resolving/creating a real chat so
    // ensureSession never receives a degenerate (null, agent) key.
    if (task.chat_id && task.session_id) {
      sessionId = task.session_id;
      chatId = task.chat_id;
    } else if (task.session_id) {
      sessionId = task.session_id;
      const chats = await sql<{ id: string }[]>`
        SELECT id FROM chats WHERE session_id = ${sessionId} AND status = 'open' ORDER BY created_at DESC LIMIT 1
      `;
      if (chats.length === 0) {
        const [chat] = await sql<{ id: string }[]>`
          INSERT INTO chats (session_id, name, status)
          VALUES (${sessionId}, 'External agent execution', 'open')
          RETURNING id
        `;
        chatId = chat!.id;
      } else {
        chatId = chats[0]!.id;
      }
    } else {
      const sessionName = `Task [${agent}]: ${task.input.slice(0, 30)}`;
      const [session] = await sql<{ id: string }[]>`
        INSERT INTO sessions (project_id, name, model, status)
        VALUES (${task.project_id}, ${sessionName}, ${task.model ?? config.DEFAULT_MODEL}, 'open')
        RETURNING id
      `;
      sessionId = session!.id;
      const [chat] = await sql<{ id: string }[]>`
        INSERT INTO chats (session_id, name, status)
        VALUES (${sessionId}, 'External agent execution', 'open')
        RETURNING id
      `;
      chatId = chat!.id;
      await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
    }

    // Session-less creators never wrote the user turn; record it here.
    if (!task.session_id) {
      await sql`
        INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
        VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
      `;
    }

    // Persistent, session-keyed worktree (shared across turns; NOT torn down
    // per turn — Phase 3 reaps it). Captures base_commit for a stable diff.
    const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
      signal: ac.signal,
    });
    log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready');

    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
      RETURNING id
    `;
    assistantId = assistantMsg!.id;

    // write-edit-robustness #4: pre-turn checkpoint of the persistent session
    // worktree (best-effort; never breaks dispatch). worktreeId comes from the
    // worktrees table (ensureSessionWorktree above).
    await createCheckpoint(
      sql,
      { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId },
      { signal: ac.signal, log },
    ).catch(() => null);

    broker.publishFrame(sessionId, {
      type: 'message_started',
      message_id: assistantId,
      chat_id: chatId,
      role: 'assistant',
    } as WsFrame);

    // #10: opencode-server turn begins.
    emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');

    const manifestCommands = getManifestCommands(agent);
    if (manifestCommands.length > 0) {
      setTaskCommands(taskId, manifestCommands);
      broker.publishFrame(sessionId, {
        type: 'agent_commands',
        task_id: taskId,
        session_id: sessionId,
        commands: manifestCommands,
      } as WsFrame);
    }

    // Accumulate the rest of the turn's stream for persistence (text is hoisted above).
    const reasoningChunks: string[] = [];
    const toolSnaps = new Map<string, AcpToolSnapshot>();
    // opencode's dcp plugin appends <dcp-message-id>…</dcp-message-id> to the
    // text, streamed split across deltas — a per-chunk regex misses it (see
    // dcp-strip.ts). Buffer text through a cross-chunk stripper so neither the
    // live `delta` frames nor the persisted content ever carry the tag.
    const dcp = makeDcpStreamStripper();

    // Map transport-agnostic AgentEvents → the SAME WS frames the ACP path emits.
    // This boundary is where message_id/chat_id get attached (the backend never
    // owns them).
    const onEvent = (e: AgentEvent): void => {
      switch (e.type) {
        case 'text': {
          const safe = dcp.push(e.text);
          if (safe) {
            textChunks.push(safe);
            broker.publishFrame(sessionId, {
              type: 'delta',
              message_id: assistantId,
              chat_id: chatId,
              content: safe,
            } as WsFrame);
          }
          break;
        }
        case 'reasoning':
          reasoningChunks.push(e.text);
          broker.publishFrame(sessionId, {
            type: 'reasoning_delta',
            message_id: assistantId,
            chat_id: chatId,
            content: e.text,
          } as WsFrame);
          break;
        case 'tool_call':
        case 'tool_update':
          toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
          broker.publishFrame(sessionId, {
            type: 'tool_call',
            message_id: assistantId,
            chat_id: chatId,
            tool_call: snapshotToWireToolCall(e.toolCall),
          } as WsFrame);
          break;
        case 'commands':
          // opencode-server doesn't emit these today; ignore if it ever does.
          break;
      }
    };

    // opencode expects provider-prefixed model ids (e.g. 'llama-swap/qwen3.6-35b…').
    // DEFAULT_MODEL is bare (no prefix) because native inference uses it directly
    // against llama-swap. Coalesce empty string (frontend sends '' when no models
    // listed) and prefix bare ids so parseModel always succeeds.
    const rawModel = (task.model && task.model.trim()) || config.DEFAULT_MODEL;
    const model = rawModel.includes('/') ? rawModel : `llama-swap/${rawModel}`;
    const backend = getOpenCodeBackend(installPath);
    const handle = await backend.ensureSession(sessionId, {
      agent,
      model,
      chatId,
      worktreePath,
      worktreeId,
      projectId: task.project_id,
    });
    const result = await backend.prompt(handle, task.input, {
      worktreePath,
      model,
      signal: ac.signal,
      onEvent,
    });
    // Phase 3: keep the pooled backend's slot warm across this (possibly long)
    // turn so the idle sweep measures from turn END, not start.
    agentPool.touch(OPENCODE_POOL_KEY, agent);

    // Flush any text held back mid-tag at stream end (complete tags stripped).
    const dcpTail = dcp.flush();
    if (dcpTail) {
      textChunks.push(dcpTail);
      broker.publishFrame(sessionId, {
        type: 'delta',
        message_id: assistantId,
        chat_id: chatId,
        content: dcpTail,
      } as WsFrame);
    }

    const assistantContent = textChunks.join('').slice(0, 50_000);
    const reasoningText = reasoningChunks.join('').slice(0, 200_000);
    // assistantContent is a 50k prefix of the joined text, so its first 500
    // chars equal the full text's — no need to join the chunks a second time.
    const outputSummary = (result.ok ? assistantContent : result.error ?? 'opencode turn failed').slice(0, 500);

    await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);

    // F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes
    // the warm success-path recording 'complete' on a Stop'd turn. The abort fired
    // session.abort on the prompt only: the persistent session worktree is kept
    // (no cleanup) and the pooled opencode server stays warm for the next turn.
    if (ac.signal.aborted || stopping) {
      await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
      await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
      emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
      emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
      clearTaskCommands(taskId);
      return; // worktree persists (no cleanup); backend stays warm
    }

    await sql`
      UPDATE messages
      SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
      WHERE id = ${assistantId}
    `;
    broker.publishFrame(sessionId, {
      type: 'message_complete',
      message_id: assistantId,
      chat_id: chatId,
      model: task.model,
    } as WsFrame);

    // 1.10: diff the persistent worktree against its captured baseline and
    // SUPERSEDE the session's prior pending row (latest-wins, one accumulating
    // diff) instead of stacking. Stamp agent for DiffPanel attribution.
    const diff = await diffWorktree(worktreePath, projectPath, {
      signal: ac.signal,
      baseRef: baseCommit ?? 'HEAD',
    });
    if (diff) {
      await sql`
        DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending'
      `;
      await sql`
        INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
        VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
      `;
      log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change');
    } else {
      log.info({ taskId }, 'dispatcher: no changes detected in session worktree');
    }

    // NO worktree cleanup — it's persistent (Phase 3 reaps it). Backend stays warm.

    const [extCostRow] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    const extCostTokens = extCostRow?.total ?? null;

    const finalState = result.ok ? 'completed' : 'failed';
    await sql`
      UPDATE tasks
      SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
      WHERE id = ${taskId}
    `;
    log.info({ taskId, agent, finalState, costTokens: extCostTokens }, 'dispatcher: task finished (opencode server)');
    // #10: clean completion → idle; backend-reported failure → error.
    emitAgentStatus(
      sessionId,
      chatId,
      agent,
      result.ok ? 'idle' : 'error',
      result.ok ? 'turn_complete' : 'failed',
    );
    emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
    clearTaskCommands(taskId);
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
    log.error({ taskId, agent, err: errMsg }, 'dispatcher: opencode server error');
    // Guard `NOT IN ('cancelled','completed')` so a genuine error in the catch
    // never overwrites a state the cancel route already wrote (user-Stop wins).
    await sql`
      UPDATE tasks
      SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
      WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
    `.catch(() => {});
    // F1 (OCE-001): finalize the streaming message (was left 'streaming').
    // Persist the text already streamed live so a reload shows what the user saw
    // (as the abort short-circuit does); `undefined` when nothing was streamed
    // keeps the original content-less finalize.
    const partialContent = textChunks.join('').slice(0, 50_000) || undefined;
    await finalizeMessage(sessionId, chatId, assistantId, status, task.model, partialContent);
    // #10: turn crashed. chatId/sessionId may still be '' if the throw preceded
    // their resolution.
    if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
    if (sessionId) emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
    clearTaskCommands(taskId);
    // No worktree cleanup (persistent); backend stays warm for the next turn.
  }
}
|
||
|
||
// ─── Path B (warm ACP): goose / qwen warm backend (v2.6 Phase 2) ─────────────

// Warm ACP backends are pooled per (chat, agent). Each one owns a single stdio
// process, plus its ACP connection and session. The pool's primary key is the
// chatId and its secondary key is the agent, which mirrors the
// (chat_id, agent) primary key of agent_sessions.
//
// Returns the pooled backend when one exists; otherwise it builds one with the
// agent's resolved provider-registry entry and registers it before returning.
function getWarmAcpBackend(chatId: string, agent: string, installPath: string | null): WarmAcpBackend {
  const pooled = agentPool.get(chatId, agent);
  if (pooled) {
    return pooled as WarmAcpBackend;
  }

  const created = new WarmAcpBackend({
    sql,
    log,
    chatId,
    agent,
    installPath,
    resolved: getResolvedRegistry().get(agent),
  });
  agentPool.register(chatId, agent, created);
  return created;
}
|
||
|
||
// Run one task turn against the pooled per-(chat, agent) warm ACP backend
// (Path B: goose / qwen).
//
// Flow: ensure the persistent session-keyed worktree (the same one the
// opencode path uses), insert a 'streaming' assistant row, and map the
// backend's AgentEvents onto the one-shot ACP path's WS frames. After the
// prompt resolves it persists the turn, supersedes the session's pending diff
// and records the terminal task state.
//
// Terminal outcomes:
//  - Stop / shutdown (ac aborted or `stopping`): message + task 'cancelled';
//    the worktree is kept and the pooled (chat, agent) backend stays warm.
//  - backend reports `result.ok === false`: task 'failed', agent status 'error'.
//  - thrown error: task state from classifyTerminalStatus (never overwriting a
//    'cancelled'/'completed' row), and the streaming row finalized with whatever
//    text had already been streamed to the client.
//
// The caller (shouldUseWarmBackend) guarantees task.agent, task.session_id and
// task.chat_id are non-null.
async function runWarmAcpTask(
  task: {
    id: string;
    project_id: string;
    input: string;
    agent: string | null;
    model: string | null;
    mode_id: string | null;
    thinking_option_id: string | null;
    session_id: string | null;
    chat_id: string | null;
  },
  installPath: string | null,
): Promise<void> {
  const taskId = task.id;
  const agent = task.agent!;
  // shouldUseWarmBackend guarantees both non-null before we get here.
  const sessionId = task.session_id!;
  const chatId = task.chat_id!;
  log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — warm ACP)');

  const [project] = await sql<{ path: string | null }[]>`
    SELECT path FROM projects WHERE id = ${task.project_id}
  `;
  const projectPath = project?.path;
  if (!projectPath) {
    await sql`
      UPDATE tasks
      SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
      WHERE id = ${taskId}
    `;
    return;
  }

  // F1: register the per-task abort controller so a Stop reaches this run.
  const ac = taskControllers.register(taskId);
  // F1: hoisted so the catch / abort short-circuit can finalize the streaming row.
  let assistantId = '';
  // Hoisted so the catch can persist the text already streamed live as `delta`
  // frames, instead of finalizing the row with no content.
  const textChunks: string[] = [];

  try {
    await sql`
      UPDATE tasks
      SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
      WHERE id = ${taskId}
    `;

    // Persistent, session-keyed worktree (shared across turns + agents; NOT torn
    // down per turn — Phase 3 reaps it). Same as the opencode-server path so a
    // chat that switches opencode↔goose↔qwen shares one worktree.
    const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
      signal: ac.signal,
    });
    log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (warm ACP)');

    const [assistantMsg] = await sql<{ id: string }[]>`
      INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
      VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
      RETURNING id
    `;
    assistantId = assistantMsg!.id;

    // write-edit-robustness #4: pre-turn checkpoint of the persistent session
    // worktree (best-effort; never breaks dispatch). Same worktree the opencode
    // path uses — a chat that switches opencode↔goose↔qwen shares one worktree.
    await createCheckpoint(
      sql,
      { chatId, sessionId, worktreeId, worktreePath, messageId: assistantId },
      { signal: ac.signal, log },
    ).catch(() => null);

    broker.publishFrame(sessionId, {
      type: 'message_started',
      message_id: assistantId,
      chat_id: chatId,
      role: 'assistant',
    } as WsFrame);

    // #10: warm-ACP turn begins.
    emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');

    const manifestCommands = getManifestCommands(agent);
    if (manifestCommands.length > 0) {
      setTaskCommands(taskId, manifestCommands);
      broker.publishFrame(sessionId, {
        type: 'agent_commands',
        task_id: taskId,
        session_id: sessionId,
        commands: manifestCommands,
      } as WsFrame);
    }

    // Accumulate the rest of the turn's stream for persistence (text is hoisted above).
    const reasoningChunks: string[] = [];
    const toolSnaps = new Map<string, AcpToolSnapshot>();

    // Map transport-agnostic AgentEvents → the SAME WS frames the one-shot ACP
    // path emits (identical to runOpenCodeServerTask's onEvent). No dcp stripping:
    // that's an opencode-plugin artifact; goose/qwen don't emit dcp tags.
    const onEvent = (e: AgentEvent): void => {
      switch (e.type) {
        case 'text':
          textChunks.push(e.text);
          broker.publishFrame(sessionId, {
            type: 'delta',
            message_id: assistantId,
            chat_id: chatId,
            content: e.text,
          } as WsFrame);
          break;
        case 'reasoning':
          reasoningChunks.push(e.text);
          broker.publishFrame(sessionId, {
            type: 'reasoning_delta',
            message_id: assistantId,
            chat_id: chatId,
            content: e.text,
          } as WsFrame);
          break;
        case 'tool_call':
        case 'tool_update':
          toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
          broker.publishFrame(sessionId, {
            type: 'tool_call',
            message_id: assistantId,
            chat_id: chatId,
            tool_call: snapshotToWireToolCall(e.toolCall),
          } as WsFrame);
          break;
        case 'commands':
          if (e.commands.length > 0) {
            setTaskCommands(taskId, e.commands);
            broker.publishFrame(sessionId, {
              type: 'agent_commands',
              task_id: taskId,
              session_id: sessionId,
              commands: e.commands,
            } as WsFrame);
          }
          break;
      }
    };

    // A null model is sent to the backend as '' (unchanged from the previous
    // `task.model ?? undefined` + `model ?? ''` pair, which is equivalent).
    const model = task.model ?? '';
    const backend = getWarmAcpBackend(chatId, agent, installPath);
    const handle = await backend.ensureSession(sessionId, {
      agent,
      model,
      chatId,
      worktreePath,
      worktreeId,
      projectId: task.project_id,
    });
    const result = await backend.prompt(handle, task.input, {
      worktreePath,
      model,
      signal: ac.signal,
      onEvent,
      taskId,
      modeId: task.mode_id ?? undefined,
    });
    // Phase 3: keep the pooled (chat,agent) backend warm across the turn.
    agentPool.touch(chatId, agent);

    const assistantContent = textChunks.join('').slice(0, 50_000);
    const reasoningText = reasoningChunks.join('').slice(0, 200_000);
    // assistantContent is a 50k prefix of the joined text, so its first 500
    // chars equal the full text's — no need to join the chunks a second time.
    const outputSummary = (result.ok ? assistantContent : result.error ?? 'warm ACP turn failed').slice(0, 500);

    await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);

    // F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes
    // the warm success-path recording 'complete' on a Stop'd turn. The abort fired
    // session/cancel on the warm connection only (never killed the child), so the
    // persistent worktree is kept and the pooled (chat,agent) backend stays warm.
    if (ac.signal.aborted || stopping) {
      await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
      await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
      emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
      emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
      clearTaskCommands(taskId);
      return; // worktree persists (no cleanup); backend stays warm
    }

    await sql`
      UPDATE messages
      SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp()
      WHERE id = ${assistantId}
    `;
    broker.publishFrame(sessionId, {
      type: 'message_complete',
      message_id: assistantId,
      chat_id: chatId,
      model: task.model,
    } as WsFrame);

    // Diff the persistent worktree against its captured baseline and SUPERSEDE
    // the session's prior pending row (latest-wins) — identical to opencode.
    const diff = await diffWorktree(worktreePath, projectPath, {
      signal: ac.signal,
      baseRef: baseCommit ?? 'HEAD',
    });
    if (diff) {
      await sql`
        DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending'
      `;
      await sql`
        INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
        VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
      `;
      log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (warm ACP)');
    } else {
      log.info({ taskId }, 'dispatcher: no changes detected in session worktree (warm ACP)');
    }

    // NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm.

    const [extCostRow] = await sql<{ total: number | null }[]>`
      SELECT SUM(tokens_used)::int AS total
      FROM messages
      WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
    `;
    const extCostTokens = extCostRow?.total ?? null;

    const finalState = result.ok ? 'completed' : 'failed';
    await sql`
      UPDATE tasks
      SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
      WHERE id = ${taskId}
    `;
    // costTokens logged for parity with the opencode-server path.
    log.info({ taskId, agent, finalState, costTokens: extCostTokens }, 'dispatcher: task finished (warm ACP)');
    // #10: clean completion → idle; backend-reported failure → error.
    emitAgentStatus(
      sessionId,
      chatId,
      agent,
      result.ok ? 'idle' : 'error',
      result.ok ? 'turn_complete' : 'failed',
    );
    emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
    clearTaskCommands(taskId);
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
    log.error({ taskId, agent, err: errMsg }, 'dispatcher: warm ACP error');
    // Guard `NOT IN ('cancelled','completed')` so a genuine error in the catch
    // never overwrites a state the cancel route already wrote (user-Stop wins).
    await sql`
      UPDATE tasks
      SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
      WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
    `.catch(() => {});
    // F1 (OCE-001): finalize the streaming message (was left 'streaming').
    // Persist the text already streamed live so a reload shows what the user saw
    // (as the abort short-circuit does); `undefined` when nothing was streamed
    // keeps the original content-less finalize.
    const partialContent = textChunks.join('').slice(0, 50_000) || undefined;
    await finalizeMessage(sessionId, chatId, assistantId, status, task.model, partialContent);
    // #10: turn crashed.
    emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
    emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
    clearTaskCommands(taskId);
    // No worktree cleanup (persistent); backend stays warm for the next turn.
  }
}
|
||
|
||
// ─── Path B (claude SDK): warm Claude-SDK backend (v2.6 #9 Part 2) ───────────

// Claude-SDK backends are pooled per (chat, agent). Each one owns a single
// persistent query() generator, driven in streaming-input mode. The pool's
// primary key is the chatId and its secondary key is the agent, which mirrors
// the (chat_id, agent) primary key of agent_sessions and the warm-ACP pooling.
//
// Returns the pooled backend if present; otherwise it creates one, registers
// it and returns it.
function getClaudeSdkBackend(chatId: string, agent: string, installPath: string | null): ClaudeSdkBackend {
  const pooled = agentPool.get(chatId, agent);
  if (pooled) {
    return pooled as ClaudeSdkBackend;
  }

  const created = new ClaudeSdkBackend({ sql, log, chatId, agent, installPath });
  agentPool.register(chatId, agent, created);
  return created;
}
|
||
|
||
async function runClaudeSdkTask(
|
||
task: {
|
||
id: string;
|
||
project_id: string;
|
||
input: string;
|
||
agent: string | null;
|
||
model: string | null;
|
||
mode_id: string | null;
|
||
thinking_option_id: string | null;
|
||
session_id: string | null;
|
||
chat_id: string | null;
|
||
},
|
||
installPath: string | null,
|
||
): Promise<void> {
|
||
const taskId = task.id;
|
||
const agent = task.agent!;
|
||
// shouldUseClaudeSdk guarantees both non-null before we get here.
|
||
const sessionId = task.session_id!;
|
||
const chatId = task.chat_id!;
|
||
log.info({ taskId, agent, chatId }, 'dispatcher: starting task (path B — claude SDK)');
|
||
|
||
const [project] = await sql<{ path: string | null }[]>`
|
||
SELECT path FROM projects WHERE id = ${task.project_id}
|
||
`;
|
||
const projectPath = project?.path;
|
||
if (!projectPath) {
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = 'Project has no path — cannot create worktree'
|
||
WHERE id = ${taskId}
|
||
`;
|
||
return;
|
||
}
|
||
|
||
// F1: register the per-task abort controller so a Stop reaches this run.
|
||
const ac = taskControllers.register(taskId);
|
||
// F1: hoisted so the catch / abort short-circuit can finalize the streaming row.
|
||
let assistantId = '';
|
||
|
||
try {
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = 'running', started_at = clock_timestamp(), execution_path = 'acp'
|
||
WHERE id = ${taskId}
|
||
`;
|
||
|
||
// Persistent, session-keyed worktree (shared across turns + agents; NOT torn
|
||
// down per turn — Phase 3 reaps it). Same as the opencode/warm-ACP paths so a
|
||
// chat that switches agents shares one worktree.
|
||
const { worktreeId, worktreePath, baseCommit } = await ensureSessionWorktree(sql, projectPath, sessionId, {
|
||
signal: ac.signal,
|
||
});
|
||
log.info({ taskId, worktreePath }, 'dispatcher: session worktree ready (claude SDK)');
|
||
|
||
const [assistantMsg] = await sql<{ id: string }[]>`
|
||
INSERT INTO messages (session_id, chat_id, role, content, status, model, created_at)
|
||
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', ${task.model}, clock_timestamp())
|
||
RETURNING id
|
||
`;
|
||
assistantId = assistantMsg!.id;
|
||
|
||
// write-edit-robustness #4: pre-turn checkpoint of the persistent session
|
||
// worktree (best-effort; never breaks dispatch).
|
||
await createCheckpoint(
|
||
sql,
|
||
{ chatId, sessionId, worktreeId, worktreePath, messageId: assistantId },
|
||
{ signal: ac.signal, log },
|
||
).catch(() => null);
|
||
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_started',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
role: 'assistant',
|
||
} as WsFrame);
|
||
|
||
// #10: claude-SDK turn begins.
|
||
emitAgentStatus(sessionId, chatId, agent, 'working', 'turn_start');
|
||
|
||
const manifestCommands = getManifestCommands(agent);
|
||
if (manifestCommands.length > 0) {
|
||
setTaskCommands(taskId, manifestCommands);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'agent_commands',
|
||
task_id: taskId,
|
||
session_id: sessionId,
|
||
commands: manifestCommands,
|
||
} as WsFrame);
|
||
}
|
||
|
||
// Accumulate the turn's stream for persistence + the final message content.
|
||
const textChunks: string[] = [];
|
||
const reasoningChunks: string[] = [];
|
||
const toolSnaps = new Map<string, AcpToolSnapshot>();
|
||
|
||
// Map transport-agnostic AgentEvents → the SAME WS frames the warm-ACP /
|
||
// opencode paths emit. This boundary attaches message_id/chat_id.
|
||
const onEvent = (e: AgentEvent): void => {
|
||
switch (e.type) {
|
||
case 'text':
|
||
textChunks.push(e.text);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: e.text,
|
||
} as WsFrame);
|
||
break;
|
||
case 'reasoning':
|
||
reasoningChunks.push(e.text);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'reasoning_delta',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
content: e.text,
|
||
} as WsFrame);
|
||
break;
|
||
case 'tool_call':
|
||
case 'tool_update':
|
||
toolSnaps.set(e.toolCall.toolCallId, e.toolCall);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'tool_call',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
tool_call: snapshotToWireToolCall(e.toolCall),
|
||
} as WsFrame);
|
||
break;
|
||
case 'commands':
|
||
if (e.commands.length > 0) {
|
||
setTaskCommands(taskId, e.commands);
|
||
broker.publishFrame(sessionId, {
|
||
type: 'agent_commands',
|
||
task_id: taskId,
|
||
session_id: sessionId,
|
||
commands: e.commands,
|
||
} as WsFrame);
|
||
}
|
||
break;
|
||
}
|
||
};
|
||
|
||
const model = task.model ?? undefined;
|
||
const backend = getClaudeSdkBackend(chatId, agent, installPath);
|
||
const handle = await backend.ensureSession(sessionId, {
|
||
agent,
|
||
model: model ?? '',
|
||
chatId,
|
||
worktreePath,
|
||
worktreeId,
|
||
projectId: task.project_id,
|
||
});
|
||
const result = await backend.prompt(handle, task.input, {
|
||
worktreePath,
|
||
model: model ?? '',
|
||
signal: ac.signal,
|
||
onEvent,
|
||
taskId,
|
||
modeId: task.mode_id ?? undefined,
|
||
});
|
||
// Phase 3: keep the pooled (chat,agent) backend warm across the turn.
|
||
agentPool.touch(chatId, agent);
|
||
|
||
const assistantContent = textChunks.join('').slice(0, 50_000);
|
||
const reasoningText = reasoningChunks.join('').slice(0, 200_000);
|
||
const outputSummary = (result.ok ? textChunks.join('') : result.error ?? 'claude SDK turn failed').slice(0, 500);
|
||
|
||
await persistExternalAgentTurn(sql, assistantId, [...toolSnaps.values()], reasoningText);
|
||
|
||
// F1: abort short-circuit BEFORE the unconditional 'complete' write — fixes
|
||
// the warm success-path recording 'complete' on a Stop'd turn. The abort fired
|
||
// the SDK interrupt on the same query generator only (never killed the warm
|
||
// process), so the persistent worktree is kept and the backend stays warm.
|
||
if (ac.signal.aborted || stopping) {
|
||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||
clearTaskCommands(taskId);
|
||
return; // worktree persists (no cleanup); backend stays warm
|
||
}
|
||
|
||
// ctx_used/ctx_max from the SDK result (1M-aware) → the assistant message, so
|
||
// the ContextBar renders a real context-window fill for claude.
|
||
await sql`
|
||
UPDATE messages
|
||
SET content = ${assistantContent}, status = 'complete', finished_at = clock_timestamp(),
|
||
ctx_used = ${result.ctxUsed ?? null}, ctx_max = ${result.ctxMax ?? null}
|
||
WHERE id = ${assistantId}
|
||
`;
|
||
broker.publishFrame(sessionId, {
|
||
type: 'message_complete',
|
||
message_id: assistantId,
|
||
chat_id: chatId,
|
||
model: task.model,
|
||
} as WsFrame);
|
||
|
||
// Diff the persistent worktree against its captured baseline and SUPERSEDE
|
||
// the session's prior pending row (latest-wins) — identical to opencode/ACP.
|
||
const diff = await diffWorktree(worktreePath, projectPath, {
|
||
signal: ac.signal,
|
||
baseRef: baseCommit ?? 'HEAD',
|
||
});
|
||
if (diff) {
|
||
await sql`
|
||
DELETE FROM pending_changes WHERE session_id = ${sessionId} AND status = 'pending'
|
||
`;
|
||
await sql`
|
||
INSERT INTO pending_changes (session_id, task_id, file_path, operation, diff, agent)
|
||
VALUES (${sessionId}, ${taskId}, ${projectPath}, 'edit', ${diff}, ${agent})
|
||
`;
|
||
log.info({ taskId, diffLength: diff.length }, 'dispatcher: diff superseded prior pending change (claude SDK)');
|
||
} else {
|
||
log.info({ taskId }, 'dispatcher: no changes detected in session worktree (claude SDK)');
|
||
}
|
||
|
||
// NO worktree cleanup — persistent (Phase 3 reaps it). Backend stays warm.
|
||
|
||
const [extCostRow] = await sql<{ total: number | null }[]>`
|
||
SELECT SUM(tokens_used)::int AS total
|
||
FROM messages
|
||
WHERE session_id = ${sessionId} AND tokens_used IS NOT NULL
|
||
`;
|
||
const extCostTokens = extCostRow?.total ?? null;
|
||
|
||
const finalState = result.ok ? 'completed' : 'failed';
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = ${finalState}, ended_at = clock_timestamp(), output_summary = ${outputSummary}, cost_tokens = ${extCostTokens}
|
||
WHERE id = ${taskId}
|
||
`;
|
||
log.info({ taskId, agent, finalState }, 'dispatcher: task finished (claude SDK)');
|
||
// #10: clean completion → idle; backend-reported failure → error.
|
||
emitAgentStatus(
|
||
sessionId,
|
||
chatId,
|
||
agent,
|
||
result.ok ? 'idle' : 'error',
|
||
result.ok ? 'turn_complete' : 'failed',
|
||
);
|
||
emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
|
||
clearTaskCommands(taskId);
|
||
} catch (err) {
|
||
const errMsg = err instanceof Error ? err.message : String(err);
|
||
const status = classifyTerminalStatus({ aborted: ac.signal.aborted, error: err });
|
||
log.error({ taskId, agent, err: errMsg }, 'dispatcher: claude SDK error');
|
||
await sql`
|
||
UPDATE tasks
|
||
SET state = ${status}, ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
|
||
WHERE id = ${taskId} AND state NOT IN ('cancelled', 'completed')
|
||
`.catch(() => {});
|
||
// F1 (OCE-001): finalize the streaming message (was left 'streaming').
|
||
await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
|
||
// #10: turn crashed.
|
||
emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
|
||
emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||
clearTaskCommands(taskId);
|
||
// No worktree cleanup (persistent); backend stays warm for the next turn.
|
||
}
|
||
}
|
||
|
||
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||
|
||
/**
 * Poll the `messages` row for `assistantId` until its status is no longer
 * `streaming`, re-querying every COMPLETION_POLL_MS milliseconds.
 *
 * There is no overall deadline: the loop keeps polling for as long as the row
 * stays `streaming` and the dispatcher is not stopping.
 *
 * @param assistantId - id of the assistant message to watch.
 * @returns the first non-`streaming` status observed; `'failed'` when no row
 *   matches the id; `'cancelled'` once `stopping` is set (checked before each
 *   query, including the first).
 */
async function waitForCompletion(assistantId: string): Promise<string> {
  while (!stopping) {
    const rows = await sql<{ status: string }[]>`
      SELECT status FROM messages WHERE id = ${assistantId}
    `;
    // No matching row is reported as a failed message rather than polled forever.
    const current = rows[0]?.status ?? 'failed';
    if (current !== 'streaming') {
      return current;
    }
    await sleep(COMPLETION_POLL_MS);
  }
  return 'cancelled';
}
|
||
|
||
/**
 * Promise-based delay: resolves (with no value) once a `setTimeout` of `ms`
 * milliseconds fires.
 *
 * @param ms - delay in milliseconds, passed straight to setTimeout.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
|
||
|
||
return {
  cancelExternalTask,

  /**
   * Begin dispatching: a fallback interval poll plus a LISTEN on `tasks_new`
   * for the fast path.
   *
   * Calling start() while the poll interval is already armed is a no-op.
   * Without that guard a second call overwrote `timer`, orphaning the first
   * interval (stop() could never clear it) and opening a duplicate LISTEN.
   */
  start() {
    if (timer) {
      log.warn('dispatcher: start() called while already running — ignoring');
      return;
    }
    log.info('dispatcher: starting poll loop + tasks_new listener');

    // Fallback poll — catches notifications missed while the listen connection
    // was down. The fast path is the NOTIFY listener below.
    timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS);

    // Fast path: react immediately to new tasks. porsager reserves a dedicated
    // connection and auto-resubscribes on reconnect; the onlisten callback
    // fires on each (re)subscribe, so we kick a catch-up poll there too to
    // sweep up anything inserted during a disconnect.
    sql
      .listen(
        'tasks_new',
        () => triggerPoll('notify'),
        () => triggerPoll('listen-subscribed'),
      )
      .then((meta) => {
        if (stopping) {
          // stop() already ran before the subscription resolved. It saw
          // `listener === null` and had nothing to release, so release it
          // here instead of leaking a live LISTEN past shutdown. Errors are
          // handled on their own chain so they are not misreported as a
          // failed LISTEN by the catch below.
          void meta.unlisten().catch((err) => {
            log.error({ err }, 'dispatcher: unlisten error');
          });
          return;
        }
        listener = meta;
      })
      .catch((err) => {
        log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback');
      });
  },

  /**
   * Graceful shutdown. It does the following in order:
   * - sets `stopping`;
   * - clears the fallback poll interval;
   * - releases the LISTEN subscription if it has already resolved (a
   *   still-pending one is released by start()'s `.then` when it sees
   *   `stopping`);
   * - waits for the tasks in `inflight` at that moment to settle.
   *
   * unlisten failures are logged, not thrown.
   */
  async stop() {
    // Set first so the pending-listen check in start() observes it.
    stopping = true;
    if (timer) {
      clearInterval(timer);
      timer = null;
    }
    if (listener) {
      await listener.unlisten().catch((err) => {
        log.error({ err }, 'dispatcher: unlisten error');
      });
      listener = null;
    }
    if (inflight.size > 0) {
      log.info({ count: inflight.size }, 'dispatcher: waiting for in-flight tasks');
      // allSettled: a rejected task must not abort the wait for the others.
      await Promise.allSettled([...inflight.values()]);
    }
    log.info('dispatcher: stopped');
  },
};
|
||
}
|