coder(dispatcher): react to new tasks via LISTEN/NOTIFY, poll as fallback
AFTER INSERT trigger on tasks fires pg_notify('tasks_new'); the dispatcher listens via porsager sql.listen and triggers an immediate poll, with the setInterval poll kept at 2s as a missed-notification safety net. Per-session guard unchanged (no double-dispatch).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -71,3 +71,22 @@ ALTER TABLE available_agents ADD COLUMN IF NOT EXISTS transport TEXT DEFAULT 'pt
|
|||||||
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS mode_id TEXT;
|
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS mode_id TEXT;
|
||||||
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS thinking_option_id TEXT;
|
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS thinking_option_id TEXT;
|
||||||
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS feature_values JSONB;
|
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS feature_values JSONB;
|
||||||
|
|
||||||
|
-- LISTEN/NOTIFY fast path: every tasks INSERT (from any call site — routes,
|
||||||
|
-- new_task tool, arena, MCP server) fires pg_notify('tasks_new') in the same
|
||||||
|
-- transaction, so the dispatcher reacts immediately instead of waiting for the
|
||||||
|
-- fallback poll. Postgres holds the notification until COMMIT, so the listener
|
||||||
|
-- always sees the committed row. A trigger covers all insert paths with no
|
||||||
|
-- app-code drift. Idempotent: re-applied on every startup.
|
||||||
|
CREATE OR REPLACE FUNCTION notify_tasks_new() RETURNS trigger AS $$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_notify('tasks_new', '');
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
DROP TRIGGER IF EXISTS tasks_notify_new ON tasks;
|
||||||
|
CREATE TRIGGER tasks_notify_new
|
||||||
|
AFTER INSERT ON tasks
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE FUNCTION notify_tasks_new();
|
||||||
|
|||||||
@@ -24,16 +24,29 @@ interface Deps {
|
|||||||
config: Config;
|
config: Config;
|
||||||
}
|
}
|
||||||
|
|
||||||
const POLL_INTERVAL_MS = 5_000;
|
// LISTEN/NOTIFY ('tasks_new') is the fast path — the dispatcher reacts to new
|
||||||
|
// tasks immediately. The poll is only a safety net for notifications missed
|
||||||
|
// during a listen-connection drop (porsager auto-reconnects), so it can stay slow.
|
||||||
|
const POLL_INTERVAL_MS = 2_000;
|
||||||
const COMPLETION_POLL_MS = 2_000;
|
const COMPLETION_POLL_MS = 2_000;
|
||||||
|
|
||||||
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
|
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
|
||||||
const { sql, inference, broker, log, config } = deps;
|
const { sql, inference, broker, log, config } = deps;
|
||||||
let timer: ReturnType<typeof setInterval> | null = null;
|
let timer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
let listener: { unlisten: () => Promise<void> } | null = null;
|
||||||
let running = false;
|
let running = false;
|
||||||
let stopping = false;
|
let stopping = false;
|
||||||
let inflightPromise: Promise<void> | null = null;
|
let inflightPromise: Promise<void> | null = null;
|
||||||
|
|
||||||
|
// Shared entry point for both the poll timer and the NOTIFY listener. poll()'s
|
||||||
|
// `running`/`stopping` guard makes this safe to call concurrently — a notify
|
||||||
|
// arriving mid-task returns immediately and never double-dispatches.
|
||||||
|
function triggerPoll(reason: string): void {
|
||||||
|
poll().catch((err) => {
|
||||||
|
log.error({ err, reason }, 'dispatcher: poll error');
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async function poll(): Promise<void> {
|
async function poll(): Promise<void> {
|
||||||
if (running || stopping) return;
|
if (running || stopping) return;
|
||||||
|
|
||||||
@@ -463,12 +476,28 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
start() {
|
start() {
|
||||||
log.info('dispatcher: starting poll loop');
|
log.info('dispatcher: starting poll loop + tasks_new listener');
|
||||||
timer = setInterval(() => {
|
|
||||||
poll().catch((err) => {
|
// Fallback poll — catches notifications missed while the listen connection
|
||||||
log.error({ err }, 'dispatcher: poll error');
|
// was down. The fast path is the NOTIFY listener below.
|
||||||
|
timer = setInterval(() => triggerPoll('interval'), POLL_INTERVAL_MS);
|
||||||
|
|
||||||
|
// Fast path: react immediately to new tasks. porsager reserves a dedicated
|
||||||
|
// connection and auto-resubscribes on reconnect; the onlisten callback
|
||||||
|
// fires on each (re)subscribe, so we kick a catch-up poll there too to
|
||||||
|
// sweep up anything inserted during a disconnect.
|
||||||
|
sql
|
||||||
|
.listen(
|
||||||
|
'tasks_new',
|
||||||
|
() => triggerPoll('notify'),
|
||||||
|
() => triggerPoll('listen-subscribed'),
|
||||||
|
)
|
||||||
|
.then((meta) => {
|
||||||
|
listener = meta;
|
||||||
|
})
|
||||||
|
.catch((err) => {
|
||||||
|
log.error({ err }, 'dispatcher: failed to LISTEN tasks_new — relying on poll fallback');
|
||||||
});
|
});
|
||||||
}, POLL_INTERVAL_MS);
|
|
||||||
},
|
},
|
||||||
|
|
||||||
async stop() {
|
async stop() {
|
||||||
@@ -477,6 +506,12 @@ export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<v
|
|||||||
clearInterval(timer);
|
clearInterval(timer);
|
||||||
timer = null;
|
timer = null;
|
||||||
}
|
}
|
||||||
|
if (listener) {
|
||||||
|
await listener.unlisten().catch((err) => {
|
||||||
|
log.error({ err }, 'dispatcher: unlisten error');
|
||||||
|
});
|
||||||
|
listener = null;
|
||||||
|
}
|
||||||
if (inflightPromise) {
|
if (inflightPromise) {
|
||||||
log.info('dispatcher: waiting for in-flight task');
|
log.info('dispatcher: waiting for in-flight task');
|
||||||
await inflightPromise;
|
await inflightPromise;
|
||||||
|
|||||||
Reference in New Issue
Block a user