#!/usr/bin/env python3 """MTP n_max sweep across MTP-capable models via llama-sidecar. Usage: python3 run_sweep.py # full sweep python3 run_sweep.py --dry-run # print matrix, no API calls python3 run_sweep.py --limit 1 # run first combo only (smoke) """ import argparse import json import os import sys import time from datetime import datetime, timezone from pathlib import Path from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError SIDECAR_URL = os.environ.get("SIDECAR_URL", "http://100.101.41.16:8402") RESULTS_PATH = Path(__file__).parent / "results.json" MATRIX = [ ("qwen3.6-35b-a3b-mxfp4", [0, 1, 2, 3]), ("qwen3.6-27b-mtp", [0, 1, 2, 3, 4]), ("qwopus3.6-27b-v2-mtp", [0, 2]), ("qwopus3.5-9b-coder-mtp", [0, 2]), ] PROMPTS = { "short": { "content": "Reply with exactly five words: a haiku-like greeting.", "max_tokens": 100, }, "medium": { "content": ( "Explain how multi-token prediction speculative decoding works in transformer " "inference. Cover: 1) the draft model role, 2) the verification mechanism, " "3) acceptance rate dynamics, 4) why MoE models gain less than dense models. " "Aim for 400-500 words." ), "max_tokens": 700, }, "long": { "content": ( "Write a complete Python implementation of a simple HTTP server that " "accepts POST requests on /v1/chat/completions, validates JSON bodies " "against a basic OpenAI schema, logs each request to stdout in JSON " "format, and returns a hardcoded streaming response. Include error " "handling for malformed JSON, missing required fields, and unsupported " "methods. Add docstrings and type hints throughout. Show full file." ), "max_tokens": 2500, }, } def build_flags(n_max: int) -> str: if n_max > 0: return f"--spec-type draft-mtp --spec-draft-n-max {n_max} --repeat-penalty 1.0" return "--repeat-penalty 1.0" def sidecar_request(method: str, path: str, body: dict | None = None, headers: dict | None = None, timeout: int = 180) -> dict | None: url = f"{SIDECAR_URL}{path}" data = json.dumps(body).encode() if body else None hdrs = {"Content-Type": "application/json"} if headers: hdrs.update(headers) req = Request(url, data=data, headers=hdrs, method=method) try: with urlopen(req, timeout=timeout) as resp: return json.loads(resp.read()) except HTTPError as e: body_text = e.read().decode(errors="replace") try: return json.loads(body_text) except json.JSONDecodeError: return {"error": f"HTTP {e.code}", "body": body_text[:500]} except URLError as e: return {"error": str(e)} def send_completion(model: str, flags: str, prompt: str, max_tokens: int) -> dict: body = { "model": model, "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "stream": False, } headers = { "X-Agent-Flags": flags, "X-Model-Id": model, } t0 = time.perf_counter() resp = sidecar_request("POST", "/v1/chat/completions", body=body, headers=headers) wall_ms = (time.perf_counter() - t0) * 1000 if resp is None: return {"error": "no response", "wall_clock_ms": wall_ms} resp["wall_clock_ms"] = wall_ms return resp def extract_metrics(resp: dict, model: str, n_max: int, prompt_name: str) -> dict: timings = resp.get("timings", {}) usage = resp.get("usage", {}) sidecars = sidecar_request("GET", "/sidecars") or [] sidecar_hash = "" sidecar_port = 0 if isinstance(sidecars, list): for s in sidecars: if s.get("model_id") == model: sidecar_hash = s.get("hash", "") sidecar_port = s.get("port", 0) break return { "model": model, "n_max": n_max, "prompt": prompt_name, "timestamp_utc": datetime.now(timezone.utc).isoformat(), "completion_tokens": usage.get("completion_tokens"), "prompt_tokens": usage.get("prompt_tokens"), "eval_tok_s": timings.get("predicted_per_second"), "prompt_tok_s": timings.get("prompt_per_second"), "eval_ms": timings.get("predicted_ms"), "prompt_ms": timings.get("prompt_ms"), "draft_n": timings.get("draft_n"), "draft_n_accepted": timings.get("draft_n_accepted"), "wall_clock_ms": resp.get("wall_clock_ms"), "sidecar_hash": sidecar_hash, "sidecar_port": sidecar_port, "error": resp.get("error"), } def append_result(row: dict) -> None: results = [] if RESULTS_PATH.exists(): try: results = json.loads(RESULTS_PATH.read_text()) except (json.JSONDecodeError, OSError): pass results.append(row) RESULTS_PATH.write_text(json.dumps(results, indent=2) + "\n") def evict_all_sidecars() -> None: sidecars = sidecar_request("GET", "/sidecars") if not isinstance(sidecars, list): return for s in sidecars: h = s.get("hash", "") if h: sidecar_request("DELETE", f"/sidecars/{h}") def run_combo(model: str, n_max: int, combo_idx: int, total_combos: int, prompt_names: list[str]) -> None: flags = build_flags(n_max) label = f"[{combo_idx}/{total_combos}] {model} n_max={n_max}" print(f"\n{'='*60}") print(f"{label}") print(f" flags: {flags}") print(f"{'='*60}") for pname in prompt_names: p = PROMPTS[pname] # Warmup print(f" {pname}: warmup...", end="", flush=True) send_completion(model, flags, p["content"], p["max_tokens"]) print(" done.", flush=True) time.sleep(2) # Record print(f" {pname}: recording...", end="", flush=True) resp = send_completion(model, flags, p["content"], p["max_tokens"]) row = extract_metrics(resp, model, n_max, pname) append_result(row) tok_s = row.get("eval_tok_s") draft = row.get("draft_n") err = row.get("error") if err: print(f" ERROR: {err}") elif tok_s: draft_str = f" draft_n={draft}" if draft else "" print(f" {tok_s:.1f} tok/s{draft_str}") else: print(" (no timings in response)") # Evict this sidecar to free VRAM evict_all_sidecars() print(f" evicted sidecars, sleeping 5s for VRAM release...") time.sleep(5) def dry_run() -> None: combos = [(model, n) for model, ns in MATRIX for n in ns] print(f"Dry run: {len(combos)} combos × 3 prompts × 2 calls = {len(combos)*6} API calls") print(f"Estimated runtime: 60-90 minutes\n") for i, (model, n_max) in enumerate(combos, 1): flags = build_flags(n_max) print(f" [{i}/{len(combos)}] {model} n_max={n_max}") print(f" flags: {flags}") for pname in PROMPTS: p = PROMPTS[pname] print(f" {pname}: max_tokens={p['max_tokens']}") print(f"\nResults would be written to: {RESULTS_PATH}") def main() -> None: parser = argparse.ArgumentParser(description="MTP n_max sweep benchmark") parser.add_argument("--dry-run", action="store_true", help="Print matrix without running") parser.add_argument("--limit", type=int, default=0, help="Run only first N combos") args = parser.parse_args() if args.dry_run: dry_run() return # Check sidecar health health = sidecar_request("GET", "/health") if not health or health.get("status") != "ok": print(f"Sidecar unhealthy: {health}", file=sys.stderr) sys.exit(1) print(f"Sidecar healthy: {health}") # Clear existing sidecars evict_all_sidecars() combos = [(model, n) for model, ns in MATRIX for n in ns] if args.limit > 0: combos = combos[:args.limit] prompt_names = list(PROMPTS.keys()) t_start = time.perf_counter() for i, (model, n_max) in enumerate(combos, 1): run_combo(model, n_max, i, len(combos), prompt_names) elapsed = time.perf_counter() - t_start print(f"\nSweep complete. {len(combos)} combos in {elapsed/60:.1f} minutes.") print(f"Results: {RESULTS_PATH}") if __name__ == "__main__": main()