diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 7e88fdb..e2518fa 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -20,6 +20,7 @@ import { createInferenceRunner } from './services/inference.js'; import { createBroker } from './services/broker.js'; import { listSkills } from './services/skills.js'; import * as compaction from './services/compaction.js'; +import { configureModelContext } from './services/model-context.js'; async function main() { const config = loadConfig(); @@ -48,6 +49,11 @@ async function main() { await applySchema(sql); app.log.info('database schema applied'); + // v1.11.3: tell the model-context cache where llama-swap lives. Cache + // lookups go to ${LLAMA_SWAP_URL}/upstream/<model>/props to read + // default_generation_settings.n_ctx — the value persisted as messages.ctx_max. + configureModelContext({ llamaSwapUrl: config.LLAMA_SWAP_URL }); + await app.register(fastifyWebsocket); app.get('/api/health', async () => { diff --git a/apps/server/src/services/__tests__/model-context.test.ts b/apps/server/src/services/__tests__/model-context.test.ts new file mode 100644 index 0000000..abff31e --- /dev/null +++ b/apps/server/src/services/__tests__/model-context.test.ts @@ -0,0 +1,205 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + configureModelContext, + getModelContext, + invalidateModelContext, +} from '../model-context.js'; + +// ---- fixtures --------------------------------------------------------------- + +const TEST_URL = 'http://llama-swap.test:8401'; + +function mockOkProps(n_ctx: number, total_slots = 1) { + return new Response( + JSON.stringify({ + default_generation_settings: { n_ctx }, + total_slots, + }), + { status: 200, headers: { 'Content-Type': 'application/json' } }, + ); +} + +beforeEach(() => { + invalidateModelContext(); + configureModelContext({ llamaSwapUrl: TEST_URL }); +}); + +afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); +}); + 
+// ---- positive cache --------------------------------------------------------- + +describe('getModelContext — positive cache', () => { + it('returns the parsed body on a 200 with valid shape', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(mockOkProps(262_144, 1)); + const result = await getModelContext('qwen3.6'); + expect(result).not.toBeNull(); + expect(result!.n_ctx).toBe(262_144); + expect(result!.total_slots).toBe(1); + expect(typeof result!.fetched_at).toBe('number'); + // Verify the URL was constructed correctly — encodes the model name in + // case it contains characters that would break the path. + expect(fetchSpy).toHaveBeenCalledExactlyOnceWith( + `${TEST_URL}/upstream/qwen3.6/props`, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('serves the second call from cache without refetching', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(mockOkProps(262_144)); + const a = await getModelContext('qwen3.6'); + const b = await getModelContext('qwen3.6'); + expect(a).toEqual(b); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it('defaults total_slots to 1 when the server omits it', async () => { + // Mirror the docstring claim — total_slots is informational and we don't + // reject the response just because it's missing. 
+ vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( + new Response(JSON.stringify({ default_generation_settings: { n_ctx: 8192 } }), { + status: 200, + }), + ); + const result = await getModelContext('partial-model'); + expect(result).not.toBeNull(); + expect(result!.n_ctx).toBe(8192); + expect(result!.total_slots).toBe(1); + }); +}); + +// ---- negative cache (single-shot) ------------------------------------------ + +describe('getModelContext — negative cache (single failure modes)', () => { + it('returns null and negative-caches when default_generation_settings is missing', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(new Response(JSON.stringify({ total_slots: 1 }), { status: 200 })); + const result = await getModelContext('broken'); + expect(result).toBeNull(); + // Second call within TTL must not refetch. + const result2 = await getModelContext('broken'); + expect(result2).toBeNull(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it('returns null and negative-caches when n_ctx is missing inside default_generation_settings', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( + new Response(JSON.stringify({ default_generation_settings: {}, total_slots: 1 }), { + status: 200, + }), + ); + await getModelContext('half-broken'); + await getModelContext('half-broken'); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it('returns null and negative-caches on non-200 (404)', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(new Response('not found', { status: 404 })); + const result = await getModelContext('missing-model'); + expect(result).toBeNull(); + const result2 = await getModelContext('missing-model'); + expect(result2).toBeNull(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it('returns null and negative-caches on network error', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + 
.mockRejectedValueOnce(new TypeError('fetch failed: connect ECONNREFUSED')); + const result = await getModelContext('down-upstream'); + expect(result).toBeNull(); + const result2 = await getModelContext('down-upstream'); + expect(result2).toBeNull(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); +}); + +// ---- negative cache TTL ----------------------------------------------------- + +describe('getModelContext — negative cache TTL', () => { + it('does NOT refetch when a second call lands within the 60s TTL', async () => { + vi.useFakeTimers(); + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(new Response('boom', { status: 500 })); + + await getModelContext('flapping'); + vi.advanceTimersByTime(30_000); + await getModelContext('flapping'); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it('refetches when the second call lands after the 60s TTL expires', async () => { + vi.useFakeTimers(); + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(new Response('boom', { status: 500 })) + // Recovered upstream on the retry — we expect a positive cache hit + // after this fires. 
+ .mockResolvedValueOnce(mockOkProps(8192)); + + await getModelContext('flapping'); + vi.advanceTimersByTime(61_000); + const result = await getModelContext('flapping'); + expect(result).not.toBeNull(); + expect(result!.n_ctx).toBe(8192); + expect(fetchSpy).toHaveBeenCalledTimes(2); + }); +}); + +// ---- invalidateModelContext ------------------------------------------------- + +describe('invalidateModelContext', () => { + it('clears a single positive entry by model name', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(mockOkProps(8192)) + .mockResolvedValueOnce(mockOkProps(8192)); + + await getModelContext('cleared'); + invalidateModelContext('cleared'); + await getModelContext('cleared'); + expect(fetchSpy).toHaveBeenCalledTimes(2); + }); + + it('clears ALL entries when called with no arg', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(mockOkProps(8192)) + .mockResolvedValueOnce(mockOkProps(16_384)) + // After the full clear, both models re-fetch. + .mockResolvedValueOnce(mockOkProps(8192)) + .mockResolvedValueOnce(mockOkProps(16_384)); + + await getModelContext('alpha'); + await getModelContext('beta'); + invalidateModelContext(); + await getModelContext('alpha'); + await getModelContext('beta'); + expect(fetchSpy).toHaveBeenCalledTimes(4); + }); + + it('clearing a positive entry also clears the matching negative entry', async () => { + // Mixed state: first call fails (negative-caches), then we invalidate + // explicitly and the next call should fetch again rather than serve + // the stale negative entry. 
+ const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockResolvedValueOnce(new Response('boom', { status: 500 })) + .mockResolvedValueOnce(mockOkProps(4096)); + + await getModelContext('formerly-broken'); + invalidateModelContext('formerly-broken'); + const result = await getModelContext('formerly-broken'); + expect(result).not.toBeNull(); + expect(result!.n_ctx).toBe(4096); + expect(fetchSpy).toHaveBeenCalledTimes(2); + }); +}); diff --git a/apps/server/src/services/compaction.ts b/apps/server/src/services/compaction.ts index e954bb6..ebbaf16 100644 --- a/apps/server/src/services/compaction.ts +++ b/apps/server/src/services/compaction.ts @@ -21,6 +21,7 @@ import type { Sql } from '../db.js'; import type { Config } from '../config.js'; import type { Broker } from './broker.js'; import { SUMMARY_TEMPLATE } from './compaction-prompt.js'; +import * as modelContextLookup from './model-context.js'; const COMPACTION_BUFFER = 20_000; const MIN_PRESERVE_RECENT_TOKENS = 2_000; @@ -271,7 +272,6 @@ interface CompletionResult { content: string; promptTokens: number; completionTokens: number; - nCtx: number | null; } async function callLlamaSwap( @@ -292,14 +292,15 @@ async function callLlamaSwap( const json = (await res.json()) as { choices?: Array<{ message?: { content?: string } }>; usage?: { prompt_tokens?: number; completion_tokens?: number }; - timings?: { n_ctx?: number }; }; + // v1.11.3: removed the dead `json.timings?.n_ctx` read — llama-server's + // completions don't emit n_ctx in timings. ctx_max on the summary row + // comes from model-context.getModelContext below in process(). const content = json.choices?.[0]?.message?.content ?? ''; const promptTokens = json.usage?.prompt_tokens ?? 0; const completionTokens = json.usage?.completion_tokens ?? 0; - const nCtx = typeof json.timings?.n_ctx === 'number' ? 
json.timings.n_ctx : null; - log.debug({ promptTokens, completionTokens, nCtx, chars: content.length }, 'compaction llm complete'); - return { content, promptTokens, completionTokens, nCtx }; + log.debug({ promptTokens, completionTokens, chars: content.length }, 'compaction llm complete'); + return { content, promptTokens, completionTokens }; } // === entry point === @@ -422,6 +423,12 @@ export async function process(input: ProcessInput): Promise { // 7. Single completion (no tools). Throws on llama-swap failure. result = await callLlamaSwap(config, session.model, payload, log); + // 7b. v1.11.3: fetch the model's true context window from llama-swap's + // /upstream/<model>/props (the completion response doesn't carry it). + // Same pattern as inference.ts; the cache makes repeated calls free. + const mctx = await modelContextLookup.getModelContext(session.model); + const nCtx = mctx?.n_ctx ?? null; + // 8. Insert the new anchored summary row. role='assistant' per spec; the // UI distinguishes via summary=true. 
tail_start_id points at the first // preserved tail message so debug surfaces / future tools can reason @@ -436,7 +443,7 @@ export async function process(input: ProcessInput): Promise { VALUES ( ${sessionId}, ${chatId}, 'assistant', ${result.content}, 'message', 'complete', true, ${sel.tail_start_id}, - ${result.completionTokens}, ${result.promptTokens}, ${result.nCtx}, + ${result.completionTokens}, ${result.promptTokens}, ${nCtx}, clock_timestamp(), clock_timestamp() ) RETURNING id diff --git a/apps/server/src/services/inference.ts b/apps/server/src/services/inference.ts index 9767cdd..02b59df 100644 --- a/apps/server/src/services/inference.ts +++ b/apps/server/src/services/inference.ts @@ -22,6 +22,7 @@ import { PathScopeError, resolveProjectRoot } from './path_guard.js'; import { maybeAutoNameChat } from './auto_name.js'; import { getAgentById } from './agents.js'; import * as compaction from './compaction.js'; +import * as modelContext from './model-context.js'; import type { Broker } from './broker.js'; const BASE_SYSTEM_PROMPT = (projectPath: string) => @@ -138,9 +139,6 @@ interface ChatCompletionChunk { completion_tokens?: number; total_tokens?: number; }; - timings?: { - n_ctx?: number; - }; } export interface InferenceContext { @@ -339,7 +337,6 @@ interface StreamResult { toolCalls: ToolCall[]; promptTokens: number | null; completionTokens: number | null; - nCtx: number | null; } interface StreamOptions { @@ -454,7 +451,6 @@ async function streamCompletion( let finishReason: string | null = null; let promptTokens: number | null = null; let completionTokens: number | null = null; - let nCtx: number | null = null; const toolCallsBuffer = new Map(); for await (const line of sseLines(res.body)) { @@ -476,9 +472,11 @@ async function streamCompletion( completionTokens = parsed.usage.completion_tokens; } } - if (parsed.timings && typeof parsed.timings.n_ctx === 'number') { - nCtx = parsed.timings.n_ctx; - } + // v1.11.3: removed dead `parsed.timings.n_ctx` read. 
llama-server's + // streaming completion does NOT emit n_ctx in timings (verified + // empirically); the authoritative source is llama-swap's + // /upstream/<model>/props endpoint, fetched per-turn via + // model-context.getModelContext() at the finalization sites below. const choice = parsed.choices?.[0]; if (!choice) continue; @@ -564,7 +562,7 @@ async function streamCompletion( toolCalls.push({ id: t.id || `call_${toolCalls.length}`, name: t.name, args }); } - return { finishReason, content, toolCalls, promptTokens, completionTokens, nCtx }; + return { finishReason, content, toolCalls, promptTokens, completionTokens }; } async function executeToolCall( @@ -781,7 +779,14 @@ async function executeToolPhase( projectRoot: string ): Promise { const { sessionId, chatId, assistantMessageId, toolsUsed, signal } = args; - const { content, toolCalls, promptTokens, completionTokens, nCtx } = result; + const { content, toolCalls, promptTokens, completionTokens } = result; + + // v1.11.3: ctx_max comes from llama-swap /upstream/<model>/props, not the + // streaming completion (which doesn't emit n_ctx). getModelContext caches + // the positive lookup for the process lifetime, so this is a single Map + // hit after the first invocation per model. + const mctx = await modelContext.getModelContext(session.model); + const nCtx = mctx?.n_ctx ?? null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] @@ -917,7 +922,11 @@ async function finalizeCompletion( session: Session ): Promise { const { sessionId, chatId, assistantMessageId } = args; - const { content, finishReason, promptTokens, completionTokens, nCtx } = result; + const { content, finishReason, promptTokens, completionTokens } = result; + + // v1.11.3: see executeToolPhase for the rationale. + const mctx = await modelContext.getModelContext(session.model); + const nCtx = mctx?.n_ctx ?? 
null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] @@ -1150,6 +1159,9 @@ async function runCapHitSummary( // even on a partial / failed summary the chat history shows where the // budget was hit. if (summaryOk && result) { + // v1.11.3: see executeToolPhase for the rationale. + const mctx = await modelContext.getModelContext(session.model); + const nCtx = mctx?.n_ctx ?? null; const [updated] = await ctx.sql< { tokens_used: number | null; ctx_used: number | null; ctx_max: number | null; finished_at: string | null }[] >` @@ -1158,7 +1170,7 @@ async function runCapHitSummary( status = 'complete', tokens_used = ${result.completionTokens}, ctx_used = ${result.promptTokens}, - ctx_max = ${result.nCtx}, + ctx_max = ${nCtx}, finished_at = clock_timestamp() WHERE id = ${assistantMessageId} RETURNING tokens_used, ctx_used, ctx_max, finished_at diff --git a/apps/server/src/services/model-context.ts b/apps/server/src/services/model-context.ts new file mode 100644 index 0000000..66594b3 --- /dev/null +++ b/apps/server/src/services/model-context.ts @@ -0,0 +1,113 @@ +// v1.11.3: llama-swap model-context cache. Replaces the dead +// `parsed.timings.n_ctx` capture in inference.ts / compaction.ts — +// llama-server's streaming completion never emits n_ctx in timings (verified +// empirically: timings carries prompt_n / predicted_n / *_ms / *_per_second +// only). The authoritative source is llama-swap's +// /upstream/<model>/props endpoint at .default_generation_settings.n_ctx. +// +// Cache design: +// - Positive entries (n_ctx + total_slots) have no TTL. A model's context +// size doesn't change while llama-swap is running; an admin endpoint +// can invalidateModelContext() if it ever does. +// - Negative entries (failed fetch) have a 60s TTL so a misconfigured or +// down model doesn't get hammered every inference turn, but recovers +// within a minute once the upstream comes back. 
+// - 3s AbortController timeout on the fetch — long enough for a healthy +// upstream, short enough that a stuck upstream doesn't block the +// ctx_max UPDATE that follows. + +export interface ModelContext { + n_ctx: number; + total_slots: number; + fetched_at: number; +} + +const NEGATIVE_TTL_MS = 60_000; +const FETCH_TIMEOUT_MS = 3_000; + +const positiveCache = new Map(); +// Value is the unix-ms timestamp of the last failed fetch. Used to gate +// re-fetches within the 60s window. +const negativeCache = new Map(); + +// Set once at startup by index.ts. We don't import loadConfig() directly +// here to keep this module trivially mockable in tests (set the URL in +// beforeEach instead of stubbing process.env + loadConfig's cache). +let llamaSwapUrl: string | null = null; + +export function configureModelContext(opts: { llamaSwapUrl: string }): void { + llamaSwapUrl = opts.llamaSwapUrl; +} + +export async function getModelContext(model: string): Promise { + // 1. Positive cache hit — no TTL check, model n_ctx is invariant. + const pos = positiveCache.get(model); + if (pos) return pos; + + // 2. Negative cache hit within TTL — return null without refetching. + // Stale negative entries (older than the TTL) fall through to a fresh + // attempt below; we don't delete them eagerly because the next successful + // fetch will overwrite via the positive map and the negative entry + // becomes irrelevant. + const negTs = negativeCache.get(model); + if (negTs !== undefined && Date.now() - negTs < NEGATIVE_TTL_MS) { + return null; + } + + // 3. Module not initialized. Defensive — index.ts calls + // configureModelContext at startup; if a test forgets, fail closed so + // the chat still works (ctx_max stays null, UI degrades gracefully). + if (!llamaSwapUrl) { + negativeCache.set(model, Date.now()); + return null; + } + + // 4. Fetch with timeout. 
AbortController fires after FETCH_TIMEOUT_MS; + // both the timeout path and a fetch reject end up in the catch below + // and produce a negative cache entry. + const url = `${llamaSwapUrl}/upstream/${encodeURIComponent(model)}/props`; + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + try { + const res = await fetch(url, { signal: controller.signal }); + clearTimeout(timer); + if (!res.ok) { + negativeCache.set(model, Date.now()); + return null; + } + const body = (await res.json()) as { + default_generation_settings?: { n_ctx?: number }; + total_slots?: number; + }; + const n_ctx = body?.default_generation_settings?.n_ctx; + if (typeof n_ctx !== 'number' || n_ctx <= 0) { + negativeCache.set(model, Date.now()); + return null; + } + // total_slots is informational; default to 1 if missing rather than + // reject the whole response. Most local llama-swap setups run a + // single slot anyway. + const total_slots = + typeof body?.total_slots === 'number' && body.total_slots > 0 ? body.total_slots : 1; + const entry: ModelContext = { n_ctx, total_slots, fetched_at: Date.now() }; + positiveCache.set(model, entry); + // Clear any stale negative entry so a future query sees the positive + // hit cleanly (otherwise the negative TTL never expires from the map). + negativeCache.delete(model); + return entry; + } catch { + clearTimeout(timer); + negativeCache.set(model, Date.now()); + return null; + } +} + +export function invalidateModelContext(model?: string): void { + if (model === undefined) { + positiveCache.clear(); + negativeCache.clear(); + } else { + positiveCache.delete(model); + negativeCache.delete(model); + } +}