Go daemon (cmd/llama-sidecar): per-agent llama-server process pool with LRU eviction, OpenAI-compatible proxy, flag validation (Unsloth port), deterministic hash-keyed sidecar reuse. Windows service support via schtasks/NSSM with DETACHED_PROCESS, stdout pipe drain, and request-ctx decoupled child lifetime. Bug fixes (3b.1–3b5): -c flag drop from StripShadowingFlags, UTF-8 BOM in JSON config, -fa → --flash-attn on default, child process exit after one request (stdin devnull, stdout pipe, CREATE_NO_WINDOW → DETACHED, context.Background for child lifetime, background reaper goroutine). bench/: MTP on/off throughput sweep across 8 GGUFs via SSH+schtasks automation to sam-desktop. Per-GGUF production flags from llama-swap config with --ctx-size 32768 override. eval/: accuracy benchmarks (MMLU 100q, GSM8K 50q, HumanEval 164) + A/B model comparison (14 agent-typed prompts × 8 models). All scripts resumable at individual question level. 94 Go tests, race detector clean. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
249 lines
8.3 KiB
Python
249 lines
8.3 KiB
Python
#!/usr/bin/env python3
"""MTP n_max sweep across MTP-capable models via llama-sidecar.

Usage:
    python3 run_sweep.py              # full sweep
    python3 run_sweep.py --dry-run    # print matrix, no API calls
    python3 run_sweep.py --limit 1    # run first combo only (smoke)
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from urllib.request import Request, urlopen
|
||
from urllib.error import URLError, HTTPError
|
||
|
||
# Base URL of the sidecar daemon; override with the SIDECAR_URL environment
# variable. Request paths are appended verbatim (and start with "/"), so a
# trailing slash is stripped here to avoid building "http://host//path" URLs.
SIDECAR_URL = os.environ.get("SIDECAR_URL", "http://100.101.41.16:8402").rstrip("/")

# Result rows are accumulated in results.json next to this script.
RESULTS_PATH = Path(__file__).parent / "results.json"
|
||
|
||
# Sweep matrix: one (model id, n_max values) pair per model. main() and
# dry_run() expand each pair into one combo per n_max value. An n_max of 0
# yields no speculative-decoding flags (see build_flags), i.e. the baseline.
MATRIX: list[tuple[str, list[int]]] = [
    ("qwen3.6-35b-a3b-mxfp4", [0, 1, 2, 3]),
    ("qwen3.6-27b-mtp", [0, 1, 2, 3, 4]),
    ("qwopus3.6-27b-v2-mtp", [0, 2]),
    ("qwopus3.5-9b-coder-mtp", [0, 2]),
]
|
||
|
||
# Benchmark prompts keyed by name. For each entry, "content" is sent as the
# single user message and "max_tokens" as the request's max_tokens
# (see run_combo / send_completion).
PROMPTS: dict[str, dict] = {
    "short": {
        "content": "Reply with exactly five words: a haiku-like greeting.",
        "max_tokens": 100,
    },
    "medium": {
        "content": (
            "Explain how multi-token prediction speculative decoding works in transformer "
            "inference. Cover: 1) the draft model role, 2) the verification mechanism, "
            "3) acceptance rate dynamics, 4) why MoE models gain less than dense models. "
            "Aim for 400-500 words."
        ),
        "max_tokens": 700,
    },
    "long": {
        "content": (
            "Write a complete Python implementation of a simple HTTP server that "
            "accepts POST requests on /v1/chat/completions, validates JSON bodies "
            "against a basic OpenAI schema, logs each request to stdout in JSON "
            "format, and returns a hardcoded streaming response. Include error "
            "handling for malformed JSON, missing required fields, and unsupported "
            "methods. Add docstrings and type hints throughout. Show full file."
        ),
        "max_tokens": 2500,
    },
}
|
||
|
||
|
||
def build_flags(n_max: int) -> str:
    """Return the flag string for one sweep point.

    Args:
        n_max: Draft length to request. Values of 0 or below produce only
            the repeat-penalty flag, with no speculative-decoding flags.

    Returns:
        A space-separated flag string.
    """
    common = "--repeat-penalty 1.0"
    if n_max <= 0:
        return common
    return f"--spec-type draft-mtp --spec-draft-n-max {n_max} {common}"
|
||
|
||
|
||
def sidecar_request(method: str, path: str, body: dict | None = None,
                    headers: dict | None = None,
                    timeout: int = 180) -> dict | list | None:
    """Send one JSON request to the sidecar and return the decoded reply.

    Failures are reported as a dict with an "error" key instead of being
    raised, so a single bad request cannot abort a long-running sweep.

    Args:
        method: HTTP method, e.g. "GET", "POST" or "DELETE".
        path: Request path appended verbatim to SIDECAR_URL.
        body: Optional JSON-serialisable payload; sent whenever it is not None.
        headers: Extra headers merged over the default Content-Type.
        timeout: Socket timeout in seconds, passed to urlopen.

    Returns:
        The decoded JSON value of the response (a dict, a list, or None for
        a JSON null), or an {"error": ...} dict when the request fails, the
        server answers with an HTTP error status, or the body is not JSON.
    """
    # Imported locally so this function does not depend on a new file-level
    # import; urllib.request already loads http.client.
    from http.client import HTTPException

    url = f"{SIDECAR_URL}{path}"
    # "is not None" so an explicit empty object ({}) is still sent as a body.
    data = json.dumps(body).encode() if body is not None else None
    hdrs = {"Content-Type": "application/json"}
    if headers:
        hdrs.update(headers)
    req = Request(url, data=data, headers=hdrs, method=method)

    try:
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except HTTPError as e:
        status = f"HTTP {e.code}"
        try:
            body_text = e.read().decode(errors="replace")
        except (OSError, HTTPException):
            body_text = ""
        try:
            parsed = json.loads(body_text)
        except json.JSONDecodeError:
            return {"error": status, "body": body_text[:500]}
        if isinstance(parsed, dict):
            # Make sure callers that check "error" notice the failed status
            # even when the error payload does not carry that key itself.
            parsed.setdefault("error", status)
            return parsed
        return {"error": status, "body": body_text[:500]}
    except (OSError, HTTPException) as e:
        # OSError covers URLError plus the errors urlopen does not wrap:
        # read timeouts (TimeoutError) and dropped connections. HTTPException
        # covers truncated or malformed responses from a dying server.
        return {"error": str(e) or type(e).__name__}

    try:
        return json.loads(raw)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError.
        return {
            "error": "invalid JSON in response",
            "body": raw[:500].decode(errors="replace"),
        }
|
||
|
||
|
||
def send_completion(model: str, flags: str, prompt: str, max_tokens: int) -> dict:
    """POST one non-streaming chat completion and time it.

    Args:
        model: Model id, sent both in the body and as the X-Model-Id header.
        flags: Flag string sent as the X-Agent-Flags header.
        prompt: Text of the single user message.
        max_tokens: Value of the request's max_tokens field.

    Returns:
        The response from sidecar_request with a "wall_clock_ms" entry added,
        or an error dict carrying "wall_clock_ms" when the response is None.
    """
    payload = dict(
        model=model,
        messages=[dict(role="user", content=prompt)],
        max_tokens=max_tokens,
        stream=False,
    )
    extra_headers = {"X-Agent-Flags": flags, "X-Model-Id": model}

    started = time.perf_counter()
    reply = sidecar_request(
        "POST", "/v1/chat/completions", body=payload, headers=extra_headers
    )
    elapsed_ms = 1000 * (time.perf_counter() - started)

    if reply is not None:
        reply["wall_clock_ms"] = elapsed_ms
        return reply
    return {"error": "no response", "wall_clock_ms": elapsed_ms}
|
||
|
||
|
||
def extract_metrics(resp: dict, model: str, n_max: int, prompt_name: str) -> dict:
    """Flatten one completion response into a results row.

    Also queries GET /sidecars and records the hash and port of the first
    listed entry whose "model_id" equals ``model`` ("" and 0 if none match
    or the listing is not a list).

    Args:
        resp: Response dict as returned by send_completion.
        model: Model id of the combo.
        n_max: Draft length of the combo.
        prompt_name: Name of the prompt that was sent.

    Returns:
        A flat dict with identification fields, a UTC timestamp, token
        counts from "usage", timing figures from "timings", the wall-clock
        time, the sidecar hash/port and the response's "error" value.
        Fields missing from the response are None.
    """
    timing_info = resp.get("timings", {})
    token_usage = resp.get("usage", {})

    listing = sidecar_request("GET", "/sidecars") or []
    matched = None
    if isinstance(listing, list):
        matched = next(
            (entry for entry in listing if entry.get("model_id") == model),
            None,
        )
    if matched is None:
        sidecar_hash, sidecar_port = "", 0
    else:
        sidecar_hash = matched.get("hash", "")
        sidecar_port = matched.get("port", 0)

    # Row key -> key inside the response's "timings" object.
    timing_fields = (
        ("eval_tok_s", "predicted_per_second"),
        ("prompt_tok_s", "prompt_per_second"),
        ("eval_ms", "predicted_ms"),
        ("prompt_ms", "prompt_ms"),
        ("draft_n", "draft_n"),
        ("draft_n_accepted", "draft_n_accepted"),
    )

    row = {
        "model": model,
        "n_max": n_max,
        "prompt": prompt_name,
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
    }
    row["completion_tokens"] = token_usage.get("completion_tokens")
    row["prompt_tokens"] = token_usage.get("prompt_tokens")
    for row_key, timing_key in timing_fields:
        row[row_key] = timing_info.get(timing_key)
    row["wall_clock_ms"] = resp.get("wall_clock_ms")
    row["sidecar_hash"] = sidecar_hash
    row["sidecar_port"] = sidecar_port
    row["error"] = resp.get("error")
    return row
|
||
|
||
|
||
def append_result(row: dict) -> None:
    """Append one row to the JSON list stored at RESULTS_PATH.

    The file is re-read on every call. If it exists but cannot be read,
    is not valid JSON, or does not hold a list, it is moved aside to a
    timestamped ``.corrupt-*`` sibling (best effort) and a warning is printed
    to stderr, so earlier results are not silently overwritten. The new
    content is written to a temporary sibling file and then renamed over
    RESULTS_PATH, so an interrupted write cannot leave a truncated file.

    Args:
        row: JSON-serialisable result row.
    """
    results: list = []
    if RESULTS_PATH.exists():
        problem = None
        try:
            loaded = json.loads(RESULTS_PATH.read_text(encoding="utf-8"))
        except (ValueError, OSError) as exc:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            problem = str(exc)
        else:
            if isinstance(loaded, list):
                results = loaded
            else:
                problem = (
                    f"top-level JSON value is {type(loaded).__name__}, expected list"
                )
        if problem is not None:
            stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
            backup = RESULTS_PATH.with_name(f"{RESULTS_PATH.name}.corrupt-{stamp}")
            print(
                f"warning: could not load {RESULTS_PATH} ({problem}); "
                f"moving it to {backup} and starting a new results list",
                file=sys.stderr,
            )
            try:
                RESULTS_PATH.replace(backup)
            except OSError:
                # Best effort only: recording the new row matters more than
                # preserving a file that could not be read anyway.
                pass

    results.append(row)

    tmp_path = RESULTS_PATH.with_name(f"{RESULTS_PATH.name}.tmp")
    tmp_path.write_text(json.dumps(results, indent=2) + "\n", encoding="utf-8")
    tmp_path.replace(RESULTS_PATH)
|
||
|
||
|
||
def evict_all_sidecars() -> None:
    """Issue DELETE /sidecars/<hash> for every sidecar listed by GET /sidecars.

    Does nothing when the listing is not a list (for example an error dict).
    Entries with a missing or empty "hash" are skipped.
    """
    listing = sidecar_request("GET", "/sidecars")
    if isinstance(listing, list):
        for entry in listing:
            digest = entry.get("hash", "")
            if not digest:
                continue
            sidecar_request("DELETE", f"/sidecars/{digest}")
|
||
|
||
|
||
def run_combo(model: str, n_max: int, combo_idx: int, total_combos: int,
              prompt_names: list[str]) -> None:
    """Benchmark one (model, n_max) combination over the given prompts.

    For each prompt a warmup completion is sent and discarded, then after a
    2 s pause a second completion is sent, converted to a row with
    extract_metrics and stored with append_result. Once the prompts are done
    all sidecars are evicted and the function sleeps 5 s.

    Eviction runs in a ``finally`` clause so that an exception or Ctrl-C in
    the middle of a combo does not leave the model loaded on the server.

    Args:
        model: Model id to benchmark.
        n_max: Draft length passed to build_flags.
        combo_idx: 1-based position of this combo, used in the banner only.
        total_combos: Total number of combos, used in the banner only.
        prompt_names: Keys of PROMPTS to run, in order.
    """
    flags = build_flags(n_max)
    label = f"[{combo_idx}/{total_combos}] {model} n_max={n_max}"
    rule = "=" * 60
    print(f"\n{rule}")
    print(label)
    print(f" flags: {flags}")
    print(rule)

    try:
        for pname in prompt_names:
            p = PROMPTS[pname]

            # Warmup: the response is intentionally discarded.
            print(f" {pname}: warmup...", end="", flush=True)
            send_completion(model, flags, p["content"], p["max_tokens"])
            print(" done.", flush=True)
            time.sleep(2)

            # Recorded run.
            print(f" {pname}: recording...", end="", flush=True)
            resp = send_completion(model, flags, p["content"], p["max_tokens"])
            row = extract_metrics(resp, model, n_max, pname)
            append_result(row)

            tok_s = row.get("eval_tok_s")
            draft = row.get("draft_n")
            err = row.get("error")
            if err:
                print(f" ERROR: {err}")
            elif tok_s:
                draft_str = f" draft_n={draft}" if draft else ""
                print(f" {tok_s:.1f} tok/s{draft_str}")
            else:
                print(" (no timings in response)")
    finally:
        # Evict this combo's sidecar to free VRAM, even when the loop above
        # was interrupted.
        evict_all_sidecars()

    print(" evicted sidecars, sleeping 5s for VRAM release...")
    time.sleep(5)
|
||
|
||
|
||
def dry_run() -> None:
    """Print the sweep plan without contacting the sidecar.

    Lists every (model, n_max) combo from MATRIX with its flag string and the
    prompts that would be sent. The completion-call total in the summary line
    is derived from the current sizes of MATRIX and PROMPTS.
    """
    combos = [(model, n) for model, ns in MATRIX for n in ns]
    n_prompts = len(PROMPTS)
    # run_combo sends two completions per prompt: one warmup, one recorded.
    calls_per_prompt = 2
    total_calls = len(combos) * n_prompts * calls_per_prompt
    print(
        f"Dry run: {len(combos)} combos × {n_prompts} prompts × "
        f"{calls_per_prompt} calls = {total_calls} API calls"
    )
    print("Estimated runtime: 60-90 minutes\n")
    for i, (model, n_max) in enumerate(combos, 1):
        flags = build_flags(n_max)
        print(f" [{i}/{len(combos)}] {model} n_max={n_max}")
        print(f" flags: {flags}")
        for pname, p in PROMPTS.items():
            print(f" {pname}: max_tokens={p['max_tokens']}")
    print(f"\nResults would be written to: {RESULTS_PATH}")
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point for the sweep.

    With --dry-run the plan is printed and nothing else happens. Otherwise
    the sidecar's /health endpoint must report status "ok" (the process exits
    with code 1 if it does not), existing sidecars are evicted, and every
    combo from MATRIX is run in order. --limit N with N > 0 keeps only the
    first N combos.
    """
    cli = argparse.ArgumentParser(description="MTP n_max sweep benchmark")
    cli.add_argument("--dry-run", action="store_true", help="Print matrix without running")
    cli.add_argument("--limit", type=int, default=0, help="Run only first N combos")
    opts = cli.parse_args()

    if opts.dry_run:
        dry_run()
        return

    # Refuse to start against a sidecar that does not report itself healthy.
    status = sidecar_request("GET", "/health")
    is_healthy = bool(status) and status.get("status") == "ok"
    if not is_healthy:
        print(f"Sidecar unhealthy: {status}", file=sys.stderr)
        sys.exit(1)
    print(f"Sidecar healthy: {status}")

    # Start from a clean slate.
    evict_all_sidecars()

    plan = []
    for model_id, draft_lengths in MATRIX:
        for draft_len in draft_lengths:
            plan.append((model_id, draft_len))
    if opts.limit > 0:
        plan = plan[:opts.limit]
    prompt_names = list(PROMPTS.keys())
    total = len(plan)

    began = time.perf_counter()
    for position, (model_id, draft_len) in enumerate(plan, 1):
        run_combo(model_id, draft_len, position, total, prompt_names)
    elapsed = time.perf_counter() - began

    print(f"\nSweep complete. {total} combos in {elapsed/60:.1f} minutes.")
    print(f"Results: {RESULTS_PATH}")
|
||
|
||
|
||
# Run the sweep only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|