/**
 * claude-sdk-sessionstore #9 (Part 2) — a tiny PURE pushable async-iterable.
 *
 * The Claude Agent SDK's streaming-input mode wants `query({ prompt })` where
 * `prompt` is an `AsyncIterable` of user messages. To keep ONE `query()`
 * generator alive across many turns (the "warm" property), the backend feeds it
 * ONE user message per `prompt()` turn through a queue that stays open between
 * turns and is only closed at `closeSession`/`dispose`. This is that queue.
 *
 * Semantics (the bit worth unit-testing — push/close/iterate ordering):
 *  - `push(v)` enqueues a value. If a consumer is parked in `await next()`, it's
 *    handed the value immediately; otherwise the value buffers in FIFO order.
 *  - The async iterator yields buffered/pushed values in push order, and PARKS
 *    (never busy-loops) when the buffer is empty — so the SDK generator waits for
 *    the next turn's message instead of seeing end-of-input.
 *  - `close()` ends the iterable: any parked consumer resolves `{done:true}` and,
 *    once values buffered before the close have been drained, all future
 *    `next()`s return done. Values pushed after close are dropped.
 *  - It's single-consumer (one `query()` reads it); concurrent consumers are not a
 *    supported shape and not needed here.
 *
 * No SDK import — generic over the pushed value `T` — so the pure push/close/iterate
 * ordering is testable without the `SDKUserMessage` shape or a live binary.
 */
export interface Pushable<T> {
  /** Enqueue a value (or hand it to a parked consumer). No-op after close. */
  push(value: T): void;
  /** End the iterable. Idempotent; a parked consumer resolves done. */
  close(): void;
  /** True once `close()` has been called. */
  readonly closed: boolean;
  /** The async-iterable the consumer (the SDK `query`) drives. */
  readonly iterable: AsyncIterable<T>;
}

/**
 * Create an open, empty {@link Pushable} queue.
 *
 * Every call to `iterable[Symbol.asyncIterator]()` returns the SAME iterator
 * object, so the queue has exactly one read position; it is designed for a
 * single consumer with at most one outstanding `next()` at a time.
 *
 * @typeParam T - The type of value pushed into and yielded from the queue.
 * @returns The producer handles (`push`, `close`, `closed`) together with the
 *   consumer-facing `iterable`.
 */
export function createPushable<T>(): Pushable<T> {
  // Values pushed while no consumer was parked, oldest first.
  const buffer: T[] = [];
  // A waiting consumer's resolver (null when none is parked). Single-consumer:
  // a second concurrent `next()` would overwrite this slot.
  let pendingResolve: ((res: IteratorResult<T>) => void) | null = null;
  let closed = false;

  function push(value: T): void {
    if (closed) return;
    if (pendingResolve) {
      // Hand the value straight to the parked consumer. The slot is cleared
      // BEFORE resolving so the consumer's next `next()` parks afresh.
      const resolve = pendingResolve;
      pendingResolve = null;
      resolve({ value, done: false });
      return;
    }
    buffer.push(value);
  }

  function close(): void {
    if (closed) return;
    closed = true;
    if (pendingResolve) {
      // A consumer can only be parked when the buffer is empty, so there is
      // nothing left to deliver: wake it with end-of-input.
      const resolve = pendingResolve;
      pendingResolve = null;
      resolve({ value: undefined, done: true });
    }
  }

  const iterator: AsyncIterator<T> = {
    next(): Promise<IteratorResult<T>> {
      // Drain the buffer first (FIFO), regardless of close — buffered values
      // pushed before close are still delivered.
      if (buffer.length > 0) {
        // The length check guarantees an element exists; the assertion is needed
        // because `T` itself may legitimately include `undefined`.
        return Promise.resolve({ value: buffer.shift() as T, done: false });
      }
      if (closed) {
        return Promise.resolve({ value: undefined, done: true });
      }
      // Park until the next push/close. Single-consumer: only one waiter at a time.
      return new Promise<IteratorResult<T>>((resolve) => {
        pendingResolve = resolve;
      });
    },
    return(): Promise<IteratorResult<T>> {
      // Consumer abandoned the loop (e.g. `break`) → close so a later push no-ops.
      close();
      return Promise.resolve({ value: undefined, done: true });
    },
  };

  return {
    push,
    close,
    get closed() {
      return closed;
    },
    iterable: {
      [Symbol.asyncIterator]() {
        return iterator;
      },
    },
  };
}