feat(coder): add hashline editing core + wire audit hooks into dispatch pipeline
Hashline editing: content-hash anchors for edit_file stale-patch detection.
Pure-JS xxHash32, line hash computation, validation with HashlineMismatchError,
256-entry hash dictionary. 6 files in apps/coder/src/services/hashline/.
Audit hooks: emitHook('tool.execute.after') wired in frame-emitter.ts for
completed/failed tool results. emitHook('turn.end') wired at terminal points
in dispatcher.ts (all 5 run functions: native, external, opencode, warm ACP,
claude SDK). Fire-and-forget, non-blocking.
This commit is contained in:
@@ -30,6 +30,7 @@ import {
|
||||
type TerminalMessageStatus,
|
||||
} from './finalize-message.js';
|
||||
import { shouldFailOnMissingAgent } from './flow-runner-decisions.js';
|
||||
import { emitHook } from '../plugins/host.js';
|
||||
|
||||
interface InferenceRunner {
|
||||
enqueue: (
|
||||
@@ -123,6 +124,22 @@ export function createDispatcher(deps: Deps): {
|
||||
publishAgentStatus(broker.publishFrame, sessionId, chatId, agent, status, reason);
|
||||
}
|
||||
|
||||
// EmitHook: fire-and-forget turn.end notification. Best-effort — a hook throwing
|
||||
// is silently swallowed so it never blocks the dispatch flow.
|
||||
function emitTurnEnd(
|
||||
sessionId: string,
|
||||
taskId: string,
|
||||
state: string,
|
||||
agent?: string | null,
|
||||
model?: string | null,
|
||||
outputSummary?: string,
|
||||
): void {
|
||||
void emitHook('turn.end', {
|
||||
sessionId,
|
||||
turnSummary: { taskId, state, agent, model: model ?? undefined, outputSummary },
|
||||
});
|
||||
}
|
||||
|
||||
// F1 (OCE-001/OCE-002): finalize a streaming assistant message into a terminal
|
||||
// state and publish the matching message_complete frame. Best-effort + idempotent
|
||||
// (the helper's `WHERE status='streaming'` guard) — a failure here must never mask
|
||||
@@ -318,6 +335,7 @@ export function createDispatcher(deps: Deps): {
|
||||
|
||||
// Declared before try so the catch block can write it back on the task row.
|
||||
let chatId: string | null = null;
|
||||
let sessionId: string | undefined;
|
||||
|
||||
try {
|
||||
// Mark running
|
||||
@@ -330,7 +348,6 @@ export function createDispatcher(deps: Deps): {
|
||||
// Session setup: reuse a pre-created session (e.g. Q&A arena contestants
|
||||
// whose persona is stamped on the session via agent_id) or create a fresh one.
|
||||
const model = task.model ?? config.DEFAULT_MODEL;
|
||||
let sessionId: string;
|
||||
if (task.session_id) {
|
||||
sessionId = task.session_id;
|
||||
} else {
|
||||
@@ -377,6 +394,7 @@ export function createDispatcher(deps: Deps): {
|
||||
SET state = 'cancelled', ended_at = clock_timestamp()
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
if (sessionId) emitTurnEnd(sessionId, taskId, 'cancelled', null, task.model);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -399,6 +417,7 @@ export function createDispatcher(deps: Deps): {
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
log.info({ taskId, costTokens }, 'dispatcher: task completed (native)');
|
||||
emitTurnEnd(sessionId, taskId, 'completed', null, task.model, summary);
|
||||
} else {
|
||||
const [msg] = await sql<{ content: string | null }[]>`
|
||||
SELECT content FROM messages WHERE id = ${assistantId}
|
||||
@@ -410,6 +429,7 @@ export function createDispatcher(deps: Deps): {
|
||||
WHERE id = ${taskId}
|
||||
`;
|
||||
log.warn({ taskId, finalStatus }, 'dispatcher: task failed (native)');
|
||||
emitTurnEnd(sessionId, taskId, 'failed', null, task.model, summary);
|
||||
}
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
@@ -419,6 +439,7 @@ export function createDispatcher(deps: Deps): {
|
||||
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}, chat_id = ${chatId}
|
||||
WHERE id = ${taskId}
|
||||
`.catch(() => {});
|
||||
if (sessionId) emitTurnEnd(sessionId, taskId, 'failed', null, task.model, errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -684,6 +705,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||||
await cleanupWorktree(projectPath, taskId);
|
||||
clearTaskCommands(taskId);
|
||||
return;
|
||||
@@ -738,6 +760,7 @@ export function createDispatcher(deps: Deps): {
|
||||
log.info({ taskId, agent, costTokens: extCostTokens }, 'dispatcher: task completed (external)');
|
||||
// #10: external-agent turn completed cleanly.
|
||||
emitAgentStatus(sessionId, chatId, agent, 'idle', 'turn_complete');
|
||||
emitTurnEnd(sessionId, taskId, 'completed', agent, task.model, outputSummary);
|
||||
clearTaskCommands(taskId);
|
||||
|
||||
} catch (err) {
|
||||
@@ -762,6 +785,7 @@ export function createDispatcher(deps: Deps): {
|
||||
// preceded its assignment — guard so the status publish never masks the real
|
||||
// error.
|
||||
if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'failed');
|
||||
if (sessionId) emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||||
|
||||
// Best-effort cleanup
|
||||
await cleanupWorktree(projectPath, taskId);
|
||||
@@ -1030,6 +1054,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||||
clearTaskCommands(taskId);
|
||||
return; // worktree persists (no cleanup); backend stays warm
|
||||
}
|
||||
@@ -1090,6 +1115,7 @@ export function createDispatcher(deps: Deps): {
|
||||
result.ok ? 'idle' : 'error',
|
||||
result.ok ? 'turn_complete' : 'failed',
|
||||
);
|
||||
emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
|
||||
clearTaskCommands(taskId);
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
@@ -1104,6 +1130,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
|
||||
// #10: turn crashed.
|
||||
if (chatId) emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
|
||||
if (sessionId) emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||||
clearTaskCommands(taskId);
|
||||
// No worktree cleanup (persistent); backend stays warm for the next turn.
|
||||
}
|
||||
@@ -1308,6 +1335,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||||
clearTaskCommands(taskId);
|
||||
return; // worktree persists (no cleanup); backend stays warm
|
||||
}
|
||||
@@ -1367,6 +1395,7 @@ export function createDispatcher(deps: Deps): {
|
||||
result.ok ? 'idle' : 'error',
|
||||
result.ok ? 'turn_complete' : 'failed',
|
||||
);
|
||||
emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
|
||||
clearTaskCommands(taskId);
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
@@ -1381,6 +1410,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
|
||||
// #10: turn crashed.
|
||||
emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
|
||||
emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||||
clearTaskCommands(taskId);
|
||||
// No worktree cleanup (persistent); backend stays warm for the next turn.
|
||||
}
|
||||
@@ -1576,6 +1606,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, 'cancelled', task.model, assistantContent);
|
||||
await sql`UPDATE tasks SET state = 'cancelled', ended_at = clock_timestamp() WHERE id = ${taskId}`;
|
||||
emitAgentStatus(sessionId, chatId, agent, 'idle', stopping ? 'shutdown' : 'cancelled');
|
||||
emitTurnEnd(sessionId, taskId, 'cancelled', agent, task.model);
|
||||
clearTaskCommands(taskId);
|
||||
return; // worktree persists (no cleanup); backend stays warm
|
||||
}
|
||||
@@ -1638,6 +1669,7 @@ export function createDispatcher(deps: Deps): {
|
||||
result.ok ? 'idle' : 'error',
|
||||
result.ok ? 'turn_complete' : 'failed',
|
||||
);
|
||||
emitTurnEnd(sessionId, taskId, finalState, agent, task.model, outputSummary);
|
||||
clearTaskCommands(taskId);
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
@@ -1652,6 +1684,7 @@ export function createDispatcher(deps: Deps): {
|
||||
await finalizeMessage(sessionId, chatId, assistantId, status, task.model);
|
||||
// #10: turn crashed.
|
||||
emitAgentStatus(sessionId, chatId, agent, status === 'cancelled' ? 'idle' : 'error', status === 'cancelled' ? 'cancelled' : 'crashed');
|
||||
emitTurnEnd(sessionId, taskId, status, agent, task.model, errMsg);
|
||||
clearTaskCommands(taskId);
|
||||
// No worktree cleanup (persistent); backend stays warm for the next turn.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user