Compare commits

..

2 Commits

Author SHA1 Message Date
752ea74f43 v2.0.0-final: dispatcher + task queue + agent probing
Phase 4 of v2.0. BooCoder can now queue tasks and dispatch them
through the inference loop autonomously.

Dispatcher (services/dispatcher.ts): in-process setInterval(5s) polls
tasks WHERE state='pending', picks one at a time, creates an isolated
session+chat, enqueues inference with the task's input as the user
message, polls for completion, marks state completed/failed with
output_summary. Single-task-at-a-time for v2.0.0; parallel dispatch
is a Phase 5+ concern. Respects onClose hook for graceful shutdown.

Task routes (routes/tasks.ts): POST /api/tasks (create), GET /api/tasks
(list with state/project filters), GET /api/tasks/:id (detail),
POST /api/tasks/:id/cancel (marks cancelled, aborts if running).

Agent probe (services/agent-probe.ts): on startup, probes PATH for
opencode/goose/claude/pi via which + --version. UPSERTs into
available_agents table. Finds nothing inside the container (expected —
Phase 5 addresses host-agent access via ACP/PTY).

Schema: ALTER TABLE tasks ADD COLUMN IF NOT EXISTS session_id (links
task to its auto-created inference session for isolation).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 03:55:18 +00:00
73b53089b0 CLAUDE.md: v2.0.0 architecture docs — BooCoder, DB rename, MCP config, workspace deps
Session learnings applied:
- Database renamed boochat (from boocode), new tables documented
- BooCoder architecture section: workspace dep pattern, write tools,
  coder pane integration, proxy routing
- Environment: MCP_CONFIG_PATH, BooCoder health at :9502
- Workflow: Go binary at /snap/go/current/bin, codecontext fork location
- Conventions: workspace exports with types conditions, Docker build order

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 03:51:24 +00:00
6 changed files with 412 additions and 2 deletions

View File

@@ -69,6 +69,14 @@ Key services:
Route registration: all routes registered in `index.ts` via `register*Routes(app, sql, ...)` functions. Routes are in `routes/*.ts`.
### BooCoder (`apps/coder/src/`)
- Write-capable coding agent. Separate Fastify server at port 9502, same docker network (`boocode_net`).
- **Workspace dependency on `@boocode/server`**: imports `createInferenceRunner`, `createBroker`, `ALL_TOOLS`, `appendMcpTools` from the server's compiled `dist/`. apps/server's `package.json` has an `exports` map with `types` conditions for NodeNext resolution. apps/server must build FIRST (Dockerfile builds server → coder).
- `apps/server/tsconfig.json` has `declaration: true` so `.d.ts` files exist for workspace consumers.
- Write tools (`edit_file`, `create_file`, `delete_file`, `apply_pending`, `rewind`) queue in `pending_changes` table. Nothing hits disk until `apply_pending` is called. `write_guard.ts` validates paths (resolve + prefix-check, no realpath since files may not exist for creates).
- Frontend: NOT a separate SPA. BooCoder is a `'coder'` pane type within BooChat's SPA (`apps/web/`). `CoderPane.tsx` in `apps/web/src/components/panes/`. API requests go through `/api/coder/*` proxy (Vite dev + Fastify production) which rewrites to `http://boocoder:3000/api/*`. WS connects directly to `:9502`.
### Frontend (`apps/web/src/`)
- **React 18** + React Router v6 + **Tailwind v4** + shadcn/radix-ui primitives.
@@ -105,14 +113,16 @@ Sessions hold 15 panes (chat / empty / placeholder terminal+agent). v1.12.1 m
## Database
PostgreSQL 16. Tables: `projects`, `sessions`, `chats`, `messages`, `settings`, `message_parts` (v1.13.0). Views: `messages_with_parts` (v1.13.1-B parts-merge read path), `tool_cost_stats` (v1.13.10 per-tool 100-call rolling window). (`session_panes` was dropped in v1.12.1; workspace pane state lives in `sessions.workspace_panes jsonb`.) Schema applied idempotently on startup via `applySchema()`. Use `clock_timestamp()` (not `NOW()`) inside transactions. CHECK constraints in place: `projects_status_chk` ('open'|'archived'), `sessions_status_chk` (same), `chats_status_chk` (same), `messages_role_chk`, `messages_status_chk` — keep in sync with the `*_STATUSES` const arrays in `apps/server/src/types/api.ts`. The older anonymous `messages_status_check` (without 'cancelled') and `messages_role_check` (without 'system') were dropped in v1.12.1; only the `_chk` variants remain.
PostgreSQL 16. Database name: `boochat` (renamed from `boocode` in v2.0.0-alpha; Docker service name stays `boocode_db`). Tables: `projects`, `sessions`, `chats`, `messages`, `settings`, `message_parts` (v1.13.0), `pending_changes` (v2.0.0), `tasks` (v2.0.0), `available_agents` (v2.0.0). Views: `messages_with_parts` (v1.13.1-B parts-merge read path), `tool_cost_stats` (v1.13.10 per-tool 100-call rolling window), `human_inbox` (v2.0.0 — tasks WHERE state IN blocked/failed). (`session_panes` was dropped in v1.12.1; workspace pane state lives in `sessions.workspace_panes jsonb`.) Schema applied idempotently on startup via `applySchema()`. Use `clock_timestamp()` (not `NOW()`) inside transactions. CHECK constraints in place: `projects_status_chk` ('open'|'archived'), `sessions_status_chk` (same), `chats_status_chk` (same), `messages_role_chk`, `messages_status_chk` — keep in sync with the `*_STATUSES` const arrays in `apps/server/src/types/api.ts`. The older anonymous `messages_status_check` (without 'cancelled') and `messages_role_check` (without 'system') were dropped in v1.12.1; only the `_chk` variants remain.
Schema CHECK migration order when renaming allowed values: (1) `ALTER TABLE ... DROP CONSTRAINT IF EXISTS <system_name>` (inline `CREATE TABLE` checks get `<table>_<column>_check`), (2) `UPDATE` rows to new values, (3) wrap new constraint ADD in `DO $$ ... pg_constraint` guard — that block is the only way to get `ADD CONSTRAINT IF NOT EXISTS`.
## Environment
Required: `DATABASE_URL`, `LLAMA_SWAP_URL`. Optional: `PORT` (3000), `HOST` (0.0.0.0), `PROJECT_ROOT_WHITELIST` (/opt, read-only scope for add-existing path resolution), `BOOTSTRAP_ROOT` (/opt/projects, writable scope for create-new-project bootstrap mkdir target — host must `mkdir -p /opt/projects` before container start), `DEFAULT_MODEL`, `LOG_LEVEL`, `SEARXNG_URL` (default `http://100.114.205.53:8888` — internal Tailscale Fathom; the public `search.indifferentketchup.com` is behind Authelia and unusable from server context), `BOOCODE_TOOLS` (`core` | `standard` | `all`, default `all`; v1.13.15-tools tier filter — ceiling, never expands an agent's whitelist).
Required: `DATABASE_URL`, `LLAMA_SWAP_URL`. Optional: `PORT` (3000), `HOST` (0.0.0.0), `PROJECT_ROOT_WHITELIST` (/opt, read-only scope for add-existing path resolution), `BOOTSTRAP_ROOT` (/opt/projects, writable scope for create-new-project bootstrap mkdir target — host must `mkdir -p /opt/projects` before container start), `DEFAULT_MODEL`, `LOG_LEVEL`, `SEARXNG_URL` (default `http://100.114.205.53:8888` — internal Tailscale Fathom; the public `search.indifferentketchup.com` is behind Authelia and unusable from server context), `BOOCODE_TOOLS` (`core` | `standard` | `all`, default `all`; v1.13.15-tools tier filter — ceiling, never expands an agent's whitelist), `MCP_CONFIG_PATH` (optional; default `/data/mcp.json` — JSON config for MCP servers matching opencode's `mcpServers` shape; file missing = no MCP).
BooCoder at port 9502: `curl http://100.114.205.53:9502/api/health`. Same Tailscale IP binding as BooChat. Health reports tool count: `{"ok":true,"db":true,"tools":30}`.
## Workflow
@@ -134,6 +144,8 @@ Required: `DATABASE_URL`, `LLAMA_SWAP_URL`. Optional: `PORT` (3000), `HOST` (0.0
- `/opt/boolab` hosts a working sibling BooCode terminal at `boocode.indifferentketchup.com`. Useful for visual side-by-side comparison on the same iPhone when debugging booterm rendering. Boolab uses Tailwind v3 (`@tailwind base`); boocode uses v4 — many subtle build differences. Don't assume parity.
- booterm SSHs to the host as `samkintop@100.114.205.53` (the Tailscale IP). The hostname `ubuntu-homelab` (shown in the bash prompt after login) does NOT resolve from inside the container — only the host's `/etc/hosts` knows it. Override via `BOOTERM_SSH_HOST` / `BOOTERM_SSH_USER` env vars in docker-compose if you ever move the shell to a different machine.
- codecontext sidecar lives at `/opt/boocode/codecontext/`. Sidecar HTTP API at `http://codecontext:8080/v1/<tool_name>` over the `boocode_net` bridge (no host port). BooCode wrappers in `apps/server/src/services/tools/codecontext/`. The `.codecontextignore.template` documents recommended ignore patterns; users copy and adapt to project root manually.
- codecontext fork at `/opt/forks/codecontext/` — separate git repo (branch `boocode-ts`), pushed via the same boocode_gitea SSH key to `indifferentketchup/codecontext`. Build: `go build ./...`. Test: `go test ./...`. Docker rebuild: `docker compose build --no-cache codecontext`.
- Go binary: `/snap/go/current/bin/go` (not on PATH by default). Use `export PATH=$PATH:/snap/go/current/bin` or full path for Go commands.
- `os/exec` child supervisors must explicitly call `child.Wait()` in a goroutine and `os.Exit` on child death. `Signal(0)` returns nil on zombies and is NOT a liveness check. Without `Wait()`, docker's `restart: unless-stopped` policy never fires because the parent stays alive. The `codecontext/shim.go` implementation is the reference pattern.
## Conventions
@@ -156,3 +168,5 @@ Required: `DATABASE_URL`, `LLAMA_SWAP_URL`. Optional: `PORT` (3000), `HOST` (0.0
- Tool-name whitelists must derive from `ALL_TOOLS` in `services/tools.ts`, never hardcoded. `services/agents.ts` `ALL_TOOL_NAMES` had this drift class until v1.12 — same pattern applies to any future tool-aware code.
- Agent registry lives at `data/AGENTS.md` (global, bind-mounted at `/data/AGENTS.md`). No per-project `AGENTS.md` in this repo — removed in v1.12 to eliminate the two-files-must-stay-in-sync drift. The `getAgentsForProject` per-project override mechanism remains for *other* projects.
- MCP stdio transport uses newline-delimited JSON (NDJSON), NOT LSP-style `Content-Length` headers. The `codecontext/shim.go` framing implementation is the reference; per the MCP spec (modelcontextprotocol.io/specification/server/transports).
- **Workspace dependency pattern** (`apps/coder``@boocode/server`): the consuming package adds `"@boocode/server": "workspace:*"` in `package.json`. The provider's `package.json` needs `exports` with `types` + `default` conditions per subpath: `"./inference": { "types": "./dist/.../index.d.ts", "default": "./dist/.../index.js" }`. Without the `types` condition, NodeNext resolution can't find `.d.ts` files and tsc fails with "Cannot find module" in the consumer.
- **Docker build order for workspace deps**: the Dockerfile must `COPY` + `RUN pnpm build` the provider app BEFORE the consumer app. `apps/coder/Dockerfile` builds `apps/server` first, then `apps/coder`.

View File

@@ -23,7 +23,11 @@ import { setInferenceContext, clearInferenceContext } from './services/tools/inf
// Routes
import { registerMessageRoutes } from './routes/messages.js';
import { registerPendingRoutes } from './routes/pending.js';
import { registerTaskRoutes } from './routes/tasks.js';
import { registerWebSocket } from './routes/ws.js';
// Phase 4: dispatcher + agent probe
import { createDispatcher } from './services/dispatcher.js';
import { probeAgents } from './services/agent-probe.js';
async function main() {
const config = loadConfig();
@@ -113,9 +117,18 @@ async function main() {
});
});
// Phase 4: probe available agents on startup
await probeAgents(sql, app.log);
// Phase 4: dispatcher — polls tasks table and runs inference
const dispatcher = createDispatcher({ sql, inference: inferenceApi, broker, log: app.log, config });
dispatcher.start();
app.addHook('onClose', () => dispatcher.stop());
// Register routes
registerMessageRoutes(app, sql, broker, inferenceApi);
registerPendingRoutes(app, sql);
registerTaskRoutes(app, sql, inferenceApi);
registerWebSocket(app, sql, broker);
// Serve static frontend (built web app). In production, the dist/ is

View File

@@ -0,0 +1,138 @@
import type { FastifyInstance } from 'fastify';
import { z } from 'zod';
import type { Sql } from '../db.js';
interface InferenceApi {
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
}
const CreateBody = z.object({
project_id: z.string().uuid(),
input: z.string().min(1).max(64_000),
agent: z.string().max(100).optional(),
model: z.string().max(200).optional(),
});
const ListQuery = z.object({
state: z.enum(['pending', 'running', 'completed', 'failed', 'blocked', 'cancelled']).optional(),
project_id: z.string().uuid().optional(),
});
export function registerTaskRoutes(app: FastifyInstance, sql: Sql, inference: InferenceApi): void {
// POST /api/tasks — create a new task
app.post('/api/tasks', async (req, reply) => {
const parsed = CreateBody.safeParse(req.body);
if (!parsed.success) {
reply.code(400);
return { error: 'invalid body', details: parsed.error.flatten() };
}
const { project_id, input, agent, model } = parsed.data;
const [task] = await sql<{ id: string; state: string }[]>`
INSERT INTO tasks (project_id, input, agent, model)
VALUES (${project_id}, ${input}, ${agent ?? null}, ${model ?? null})
RETURNING id, state
`;
reply.code(201);
return { id: task!.id, state: task!.state };
});
// GET /api/tasks — list tasks with optional filters
app.get('/api/tasks', async (req, _reply) => {
const parsed = ListQuery.safeParse(req.query);
if (!parsed.success) {
return { error: 'invalid query', details: parsed.error.flatten() };
}
const { state, project_id } = parsed.data;
// Build query with optional filters
if (state && project_id) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE state = ${state} AND project_id = ${project_id}
ORDER BY created_at DESC
LIMIT 100
`;
} else if (state) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE state = ${state}
ORDER BY created_at DESC
LIMIT 100
`;
} else if (project_id) {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
WHERE project_id = ${project_id}
ORDER BY created_at DESC
LIMIT 100
`;
} else {
return sql`
SELECT id, project_id, state, input, output_summary, agent, model, execution_path, session_id, started_at, ended_at, created_at
FROM tasks
ORDER BY created_at DESC
LIMIT 100
`;
}
});
// GET /api/tasks/:id — single task detail
app.get<{ Params: { id: string } }>('/api/tasks/:id', async (req, reply) => {
const rows = await sql`
SELECT id, project_id, parent_task_id, state, input, output_summary, agent, model, execution_path, worktree_path, session_id, cost_tokens, started_at, ended_at, created_at
FROM tasks
WHERE id = ${req.params.id}
`;
if (rows.length === 0) {
reply.code(404);
return { error: 'task not found' };
}
return rows[0];
});
// POST /api/tasks/:id/cancel — cancel a pending or running task
app.post<{ Params: { id: string } }>('/api/tasks/:id/cancel', async (req, reply) => {
const taskId = req.params.id;
// Get current task state + session info
const rows = await sql<{ id: string; state: string; session_id: string | null }[]>`
SELECT id, state, session_id FROM tasks WHERE id = ${taskId}
`;
if (rows.length === 0) {
reply.code(404);
return { error: 'task not found' };
}
const task = rows[0]!;
if (task.state !== 'pending' && task.state !== 'running') {
reply.code(409);
return { error: `cannot cancel task in state '${task.state}'` };
}
// If running, try to cancel inference
if (task.state === 'running' && task.session_id) {
// Find active chat in the task's session
const chats = await sql<{ id: string }[]>`
SELECT id FROM chats WHERE session_id = ${task.session_id} AND status = 'open'
`;
for (const chat of chats) {
await inference.cancel(task.session_id, chat.id);
}
}
await sql`
UPDATE tasks
SET state = 'cancelled', ended_at = clock_timestamp()
WHERE id = ${taskId} AND state IN ('pending', 'running')
`;
return { cancelled: true };
});
}

View File

@@ -43,6 +43,9 @@ CREATE TABLE IF NOT EXISTS available_agents (
last_probed_at TIMESTAMPTZ
);
-- v2.0.0 Phase 4: link tasks to their inference sessions.
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS session_id UUID REFERENCES sessions(id);
-- Human inbox: tasks needing attention
CREATE OR REPLACE VIEW human_inbox AS
SELECT * FROM tasks WHERE state IN ('blocked', 'failed');

View File

@@ -0,0 +1,51 @@
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
const execFileAsync = promisify(execFile);
const KNOWN_AGENTS: Array<{ name: string; supportsAcp: boolean }> = [
{ name: 'opencode', supportsAcp: true },
{ name: 'goose', supportsAcp: true },
{ name: 'claude', supportsAcp: false },
{ name: 'pi', supportsAcp: false },
];
export async function probeAgents(sql: Sql, log: FastifyBaseLogger): Promise<void> {
log.info('agent-probe: scanning PATH for known agents');
for (const agent of KNOWN_AGENTS) {
try {
// Check if the agent binary is on PATH
const { stdout: whichOut } = await execFileAsync('which', [agent.name], { timeout: 5_000 });
const installPath = whichOut.trim();
if (!installPath) continue;
// Get version
let version: string | null = null;
try {
const { stdout: verOut } = await execFileAsync(agent.name, ['--version'], { timeout: 10_000 });
version = verOut.trim().slice(0, 100);
} catch {
// Some agents may not support --version — that's fine
}
// UPSERT into available_agents
await sql`
INSERT INTO available_agents (name, install_path, version, supports_acp, last_probed_at)
VALUES (${agent.name}, ${installPath}, ${version}, ${agent.supportsAcp}, clock_timestamp())
ON CONFLICT (name) DO UPDATE SET
install_path = EXCLUDED.install_path,
version = EXCLUDED.version,
supports_acp = EXCLUDED.supports_acp,
last_probed_at = EXCLUDED.last_probed_at
`;
log.info({ agent: agent.name, version, installPath }, 'agent-probe: found');
} catch {
// Agent not found on PATH — skip silently
}
}
log.info('agent-probe: scan complete');
}

View File

@@ -0,0 +1,191 @@
import type { Sql } from '../db.js';
import type { FastifyBaseLogger } from 'fastify';
import type { Broker } from '@boocode/server/broker';
import type { Config } from '../config.js';
interface InferenceRunner {
enqueue: (sessionId: string, chatId: string, assistantId: string, user: string) => void;
cancel: (sessionId: string, chatId: string) => Promise<boolean>;
hasActive: (chatId: string) => boolean;
}
interface Deps {
sql: Sql;
inference: InferenceRunner;
broker: Broker;
log: FastifyBaseLogger;
config: Config;
}
const POLL_INTERVAL_MS = 5_000;
const COMPLETION_POLL_MS = 2_000;
export function createDispatcher(deps: Deps): { start(): void; stop(): Promise<void> } {
const { sql, inference, log, config } = deps;
let timer: ReturnType<typeof setInterval> | null = null;
let running = false;
let stopping = false;
let inflightPromise: Promise<void> | null = null;
async function poll(): Promise<void> {
if (running || stopping) return;
// Grab one pending task
const rows = await sql<{ id: string; project_id: string; input: string; agent: string | null; model: string | null }[]>`
SELECT id, project_id, input, agent, model
FROM tasks
WHERE state = 'pending'
ORDER BY created_at
LIMIT 1
`;
if (rows.length === 0) return;
const task = rows[0]!;
running = true;
inflightPromise = runTask(task).finally(() => {
running = false;
inflightPromise = null;
});
}
async function runTask(task: { id: string; project_id: string; input: string; agent: string | null; model: string | null }): Promise<void> {
const taskId = task.id;
log.info({ taskId }, 'dispatcher: starting task');
try {
// Mark running
await sql`
UPDATE tasks
SET state = 'running', started_at = clock_timestamp(), execution_path = 'native'
WHERE id = ${taskId}
`;
// Create session + chat for this task
const model = task.model ?? config.DEFAULT_MODEL;
const sessionName = 'Task: ' + task.input.slice(0, 40);
const [session] = await sql<{ id: string }[]>`
INSERT INTO sessions (project_id, name, model, status)
VALUES (${task.project_id}, ${sessionName}, ${model}, 'open')
RETURNING id
`;
const sessionId = session!.id;
const [chat] = await sql<{ id: string }[]>`
INSERT INTO chats (session_id, name, status)
VALUES (${sessionId}, 'Task execution', 'open')
RETURNING id
`;
const chatId = chat!.id;
// Link task to session
await sql`UPDATE tasks SET session_id = ${sessionId} WHERE id = ${taskId}`;
// Create user message + streaming assistant
await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'user', ${task.input}, 'complete', clock_timestamp())
RETURNING id
`;
const [assistantMsg] = await sql<{ id: string }[]>`
INSERT INTO messages (session_id, chat_id, role, content, status, created_at)
VALUES (${sessionId}, ${chatId}, 'assistant', '', 'streaming', clock_timestamp())
RETURNING id
`;
const assistantId = assistantMsg!.id;
// Enqueue inference
inference.enqueue(sessionId, chatId, assistantId, 'default');
// Wait for inference to complete (poll message status)
const finalStatus = await waitForCompletion(assistantId);
if (stopping) {
// Graceful shutdown — mark cancelled
await sql`
UPDATE tasks
SET state = 'cancelled', ended_at = clock_timestamp()
WHERE id = ${taskId}
`;
return;
}
if (finalStatus === 'complete') {
// Grab assistant content for output_summary
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
const summary = (msg?.content ?? '').slice(0, 500);
await sql`
UPDATE tasks
SET state = 'completed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.info({ taskId }, 'dispatcher: task completed');
} else {
// failed or cancelled
const [msg] = await sql<{ content: string | null }[]>`
SELECT content FROM messages WHERE id = ${assistantId}
`;
const summary = (msg?.content ?? 'Inference failed').slice(0, 500);
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${summary}
WHERE id = ${taskId}
`;
log.warn({ taskId, finalStatus }, 'dispatcher: task failed');
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.error({ taskId, err: errMsg }, 'dispatcher: task error');
await sql`
UPDATE tasks
SET state = 'failed', ended_at = clock_timestamp(), output_summary = ${errMsg.slice(0, 500)}
WHERE id = ${taskId}
`.catch(() => {}); // best-effort
}
}
async function waitForCompletion(assistantId: string): Promise<string> {
// Poll until the assistant message is no longer streaming
for (;;) {
if (stopping) return 'cancelled';
const [row] = await sql<{ status: string }[]>`
SELECT status FROM messages WHERE id = ${assistantId}
`;
const status = row?.status ?? 'failed';
if (status !== 'streaming') return status;
await sleep(COMPLETION_POLL_MS);
}
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
return {
start() {
log.info('dispatcher: starting poll loop');
timer = setInterval(() => {
poll().catch((err) => {
log.error({ err }, 'dispatcher: poll error');
});
}, POLL_INTERVAL_MS);
},
async stop() {
stopping = true;
if (timer) {
clearInterval(timer);
timer = null;
}
if (inflightPromise) {
log.info('dispatcher: waiting for in-flight task');
await inflightPromise;
}
log.info('dispatcher: stopped');
},
};
}