package pool import ( "bytes" "context" "fmt" "io" "log/slog" "net/http" "os" "os/exec" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/indifferentketchup/llama-sidecar/internal/config" "github.com/indifferentketchup/llama-sidecar/internal/validator" ) type Sidecar struct { Hash string ModelID string ModelPath string Flags []string Port int Pid int StartedAt time.Time LastUsed atomic.Int64 healthy atomic.Bool cmd *exec.Cmd cancel context.CancelFunc done chan error stderr *ringBuffer stopMon context.CancelFunc stdinFile *os.File stdoutR *os.File stdoutFile *os.File } func (s *Sidecar) Healthy() bool { return s.healthy.Load() } func (s *Sidecar) TouchLastUsed() { s.LastUsed.Store(time.Now().UnixNano()) } func (s *Sidecar) LastStderr() string { return s.stderr.String() } // Spawner abstracts sidecar creation for testing. type Spawner interface { Spawn(ctx context.Context, cfg *config.Config, modelID, modelPath string, flags []string, port int, hash string) (*Sidecar, error) Kill(s *Sidecar) error } type RealSpawner struct{} func (rs *RealSpawner) Spawn(ctx context.Context, cfg *config.Config, modelID, modelPath string, flags []string, port int, hash string) (*Sidecar, error) { args := buildArgs(cfg.BaseArgs, modelPath, port, flags) _ = ctx childCtx, cancel := context.WithCancel(context.Background()) cmd := exec.CommandContext(childCtx, cfg.LlamaServerBin, args...) setPlatformAttrs(cmd) devNull, err := os.Open(os.DevNull) if err != nil { cancel() return nil, fmt.Errorf("open devnull: %w", err) } cmd.Stdin = devNull stderr := newRingBuffer(64) prefix := fmt.Sprintf("[sidecar:%s:%d] ", hash[:8], port) cmd.Stderr = io.MultiWriter(stderr, &prefixWriter{prefix: prefix}) stdoutR, stdoutW, err := os.Pipe() if err != nil { cancel() devNull.Close() return nil, fmt.Errorf("stdout pipe: %w", err) } go io.Copy(io.Discard, stdoutR) cmd.Stdout = stdoutW slog.Info("spawning sidecar", "hash", hash, "model", modelID, "port", port, "args", strings.Join(args, " ")) if err := cmd.Start(); err != nil { cancel() return nil, fmt.Errorf("spawn failed: %w", err) } s := &Sidecar{ Hash: hash, ModelID: modelID, ModelPath: modelPath, Flags: flags, Port: port, Pid: cmd.Process.Pid, StartedAt: time.Now(), cmd: cmd, cancel: cancel, done: make(chan error, 1), stderr: stderr, stdinFile: devNull, stdoutR: stdoutR, stdoutFile: stdoutW, } s.LastUsed.Store(time.Now().UnixNano()) go func() { err := cmd.Wait() s.healthy.Store(false) exitCode := -1 if cmd.ProcessState != nil { exitCode = cmd.ProcessState.ExitCode() } slog.Error("sidecar child exited", "hash", hash, "port", port, "pid", s.Pid, "exit_code", exitCode, "wait_err", fmt.Sprintf("%v", err), "uptime", time.Since(s.StartedAt).Round(time.Millisecond), "stderr_tail", stderr.String(), ) s.done <- err close(s.done) }() // Wait for health healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", port) deadline := time.Now().Add(time.Duration(cfg.HealthTimeoutSeconds) * time.Second) for time.Now().Before(deadline) { resp, err := http.Get(healthURL) if err == nil { resp.Body.Close() if resp.StatusCode == 200 { s.healthy.Store(true) slog.Info("sidecar healthy", "hash", hash, "port", port, "elapsed", time.Since(s.StartedAt).Round(time.Millisecond)) monCtx, monCancel := context.WithCancel(ctx) s.stopMon = monCancel go s.healthMonitor(monCtx, cfg.HealthIntervalSeconds) return s, nil } } select { case <-childCtx.Done(): return nil, fmt.Errorf("sidecar process exited during health check") case <-time.After(500 * time.Millisecond): } } _ = rs.Kill(s) return nil, fmt.Errorf("health check timed out after %ds, last stderr: %s", cfg.HealthTimeoutSeconds, s.stderr.LastLine()) } func (rs *RealSpawner) Kill(s *Sidecar) error { if s.stopMon != nil { s.stopMon() } s.cancel() select { case <-s.done: case <-time.After(5 * time.Second): if s.cmd.Process != nil { _ = s.cmd.Process.Kill() } <-s.done } if s.stdinFile != nil { s.stdinFile.Close() } if s.stdoutFile != nil { s.stdoutFile.Close() } if s.stdoutR != nil { s.stdoutR.Close() } slog.Info("sidecar killed", "hash", s.Hash, "port", s.Port) return nil } func (s *Sidecar) healthMonitor(ctx context.Context, intervalSec int) { ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) defer ticker.Stop() failures := 0 url := fmt.Sprintf("http://127.0.0.1:%d/health", s.Port) client := &http.Client{Timeout: 5 * time.Second} for { select { case <-ctx.Done(): return case <-ticker.C: resp, err := client.Get(url) if err != nil || resp.StatusCode != 200 { if resp != nil { resp.Body.Close() } failures++ if failures >= 3 { slog.Warn("sidecar unhealthy, marking for eviction", "hash", s.Hash, "port", s.Port) s.healthy.Store(false) return } } else { resp.Body.Close() failures = 0 } } } } func buildArgs(baseArgs []string, modelPath string, port int, userFlags []string) []string { deduped := dedupFlags(baseArgs, userFlags) args := make([]string, 0, len(deduped)+len(userFlags)+4) args = append(args, deduped...) args = append(args, "--model", modelPath) args = append(args, "--port", strconv.Itoa(port)) args = append(args, userFlags...) return args } // dedupFlags removes from autoArgs any flag that the user also supplied, // so the user's value wins via llama.cpp's last-wins CLI parsing. func dedupFlags(autoArgs, userArgs []string) []string { userNames := make(map[string]bool) for _, tok := range userArgs { if name := validator.FlagName(tok); name != "" { userNames[name] = true } } out := make([]string, 0, len(autoArgs)) i := 0 for i < len(autoArgs) { tok := autoArgs[i] name := validator.FlagName(tok) if name == "" || !userNames[name] { out = append(out, tok) i++ continue } if strings.Contains(tok, "=") { i++ } else if i+1 < len(autoArgs) && validator.FlagName(autoArgs[i+1]) == "" { i += 2 } else { i++ } } return out } // Ring buffer for last N lines of stderr type ringBuffer struct { mu sync.Mutex lines []string max int } func newRingBuffer(max int) *ringBuffer { return &ringBuffer{lines: make([]string, 0, max), max: max} } func (rb *ringBuffer) Write(p []byte) (int, error) { rb.mu.Lock() defer rb.mu.Unlock() for _, line := range strings.Split(string(p), "\n") { line = strings.TrimRight(line, "\r\n") if line == "" { continue } if len(rb.lines) >= rb.max { rb.lines = rb.lines[1:] } rb.lines = append(rb.lines, line) } return len(p), nil } func (rb *ringBuffer) String() string { rb.mu.Lock() defer rb.mu.Unlock() return strings.Join(rb.lines, "\n") } func (rb *ringBuffer) LastLine() string { rb.mu.Lock() defer rb.mu.Unlock() if len(rb.lines) == 0 { return "" } return rb.lines[len(rb.lines)-1] } type prefixWriter struct { prefix string buf bytes.Buffer } func (pw *prefixWriter) Write(p []byte) (int, error) { pw.buf.Write(p) for { line, err := pw.buf.ReadString('\n') if err != nil { pw.buf.WriteString(line) break } fmt.Fprint(os.Stderr, pw.prefix+line) } return len(p), nil }