diff --git a/tools/pz-analyzer/pz_classify.py b/tools/pz-analyzer/pz_classify.py new file mode 100644 index 0000000..7bdb560 --- /dev/null +++ b/tools/pz-analyzer/pz_classify.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +""" +pz_classify.py — Deterministic Project Zomboid log classifier orchestrator. + +Walks ``*DebugLog-server*.txt`` files under the redacted-logs directory, +runs the pz_parser pipeline per file, merges records cross-file by their +deterministic ``signature``, and emits the spec-shaped JSON report. + +Companion to the existing Qwen-backed discovery tool ``pz_error_analysis.py`` +(left untouched). Zero AI dependency, stdlib-only, runs in seconds. + +By convention the input is always the redacted directory produced by +``pz_redact_all.sh``; ``meta.redacted`` is therefore hard-coded ``true``. +If the user overrides ``--input`` to a non-redacted source we still emit +``true`` because we have no upstream way to verify redaction status. + +Pipeline: + parser.parse_file per-file Entry list + parser.classify_entries per-file deduped Record list + _merge_cross_file global Record list deduped across files + _build_summary top-line stats + by_kind / by_attribution / top_mods + +Output schema, CLI flags, and aggregation rules are defined in +``docs/superpowers/specs/2026-05-04-pz-deterministic-classifier-design.md``. +""" +from __future__ import annotations + +import argparse +import dataclasses +import json +import sys +from collections import Counter +from datetime import datetime, timezone +from pathlib import Path + +from pz_parser import ( + MAX_CAUSE_CHAIN_LEVELS, + MAX_STACK_FRAMES, + SEVERITY_LEVELS, + Record, + classify_entries, + parse_file, +) + +# --------------------------------------------------------------------------- +# Defaults / constants +# --------------------------------------------------------------------------- + +_REPO_ROOT = Path(__file__).resolve().parents[2] +DEFAULT_INPUT: Path = _REPO_ROOT / ".scratch" / "pz" / "Logs.redacted" +DEFAULT_OUT: Path = _REPO_ROOT / ".scratch" / "pz" / "classify.json" + +#: Filename glob driving the directory walk. +INPUT_GLOB: str = "*DebugLog-server*.txt" +#: Cap on entries in ``summary.top_mods`` — most occurrence-count-heavy mods. +TOP_MODS_LIMIT: int = 10 + +#: Confidence / attribution promotion ladders (higher rank wins on merge). +_CONFIDENCE_RANK: dict[str, int] = {"low": 0, "medium": 1, "high": 2} +_ATTRIBUTION_RANK: dict[str, int] = { + "unattributed": 0, + "inferred": 1, + "direct": 2, +} +#: Levels that count as errors (vs warnings) in the summary. +_ERROR_LEVELS: frozenset[str] = frozenset({"ERROR", "SEVERE", "FATAL"}) + + +# --------------------------------------------------------------------------- +# Cross-file aggregation (spec §9, inter-file equivalent of parser dedup) +# --------------------------------------------------------------------------- + + +def _merge_cross_file(per_file_records: list[Record]) -> list[Record]: + """Merge ``Record`` instances across files by ``signature``. + + The parser already dedups within a single file. This is the inter-file + equivalent: when the same signature appears in records from multiple + files, sum occurrences, union file lists, promote attribution/confidence, + and merge stack and cause-chain (deduped, capped at parser constants). + First-seen is the earliest by file-then-line; since callers feed records + in sorted file order, the first record we encounter per signature is + already the earliest. + """ + by_signature: dict[str, Record] = {} + for incoming in per_file_records: + existing = by_signature.get(incoming.signature) + if existing is None: + # First occurrence — copy so we don't mutate the caller's list. + by_signature[incoming.signature] = Record( + signature=incoming.signature, + pattern_id=incoming.pattern_id, + level=incoming.level, + kind=incoming.kind, + mod_id=incoming.mod_id, + mod_name=incoming.mod_name, + attribution=incoming.attribution, + confidence=incoming.confidence, + attribution_reason=incoming.attribution_reason, + file=incoming.file, + line=incoming.line, + cause_chain=incoming.cause_chain, + stack=list(incoming.stack), + first_seen=incoming.first_seen, + occurrence_count=incoming.occurrence_count, + files=list(incoming.files), + excerpt=incoming.excerpt, + ) + continue + # Aggregate. + existing.occurrence_count += incoming.occurrence_count + for fname in incoming.files: + if fname not in existing.files: + existing.files.append(fname) + # Promote attribution / confidence / mod_name on stronger evidence. + if _ATTRIBUTION_RANK[incoming.attribution] > _ATTRIBUTION_RANK[existing.attribution]: + existing.attribution = incoming.attribution + existing.attribution_reason = incoming.attribution_reason + if incoming.mod_name: + existing.mod_name = incoming.mod_name + if _CONFIDENCE_RANK[incoming.confidence] > _CONFIDENCE_RANK[existing.confidence]: + existing.confidence = incoming.confidence + # Merge stack frames preserving order, capped. + for frame in incoming.stack: + if frame not in existing.stack and len(existing.stack) < MAX_STACK_FRAMES: + existing.stack.append(frame) + # Merge cause chain (deduped tokens, capped). + if incoming.cause_chain and incoming.cause_chain != existing.cause_chain: + old = existing.cause_chain.split(" -> ") if existing.cause_chain else [] + new = incoming.cause_chain.split(" -> ") + merged = list(old) + for tok in new: + if tok and tok not in merged: + merged.append(tok) + existing.cause_chain = " -> ".join(merged[:MAX_CAUSE_CHAIN_LEVELS]) + return list(by_signature.values()) + + +# --------------------------------------------------------------------------- +# Summary computation +# --------------------------------------------------------------------------- + + +def _build_summary(records: list[Record]) -> dict[str, object]: + """Build the ``summary`` block per spec. + + Counts records (signatures), not raw occurrences, except for ``top_mods`` + which sums ``occurrence_count`` per mod_id so that volume-driving mods + surface even when they hit the same shape repeatedly. + """ + errors = sum(1 for r in records if r.level in _ERROR_LEVELS) + warnings = sum(1 for r in records if r.level == "WARN") + by_kind = Counter(r.kind for r in records) + by_attribution = Counter(r.attribution for r in records) + by_confidence = Counter(r.confidence for r in records) + + # Group by mod_id summing total occurrence_count; preserve any mod_name. + mod_totals: dict[str, int] = {} + mod_names: dict[str, str] = {} + for r in records: + mod_totals[r.mod_id] = mod_totals.get(r.mod_id, 0) + r.occurrence_count + # First non-empty mod_name wins; subsequent records may have empty + # mod_name (e.g. for unattributed) so don't overwrite with "". + if r.mod_name and r.mod_id not in mod_names: + mod_names[r.mod_id] = r.mod_name + top_mods = sorted( + ( + { + "mod_id": mod_id, + "mod_name": mod_names.get(mod_id, ""), + "occurrence_count": total, + } + for mod_id, total in mod_totals.items() + ), + key=lambda d: d["occurrence_count"], + reverse=True, + )[:TOP_MODS_LIMIT] + + return { + "errors": errors, + "warnings": warnings, + "by_kind": dict(by_kind), + "by_attribution": dict(by_attribution), + "by_confidence": dict(by_confidence), + "top_mods": top_mods, + } + + +# --------------------------------------------------------------------------- +# Driver +# --------------------------------------------------------------------------- + + +def _run(input_dir: Path, out_path: Path, *, quiet: bool) -> int: + if not input_dir.is_dir(): + print( + f"pz_classify: --input directory not found: {input_dir}", + file=sys.stderr, + ) + return 2 + + started = datetime.now(timezone.utc).isoformat(timespec="seconds") + files = sorted(input_dir.glob(INPUT_GLOB)) + + all_records: list[Record] = [] + log_lines_total = 0 + error_lines_total = 0 + + for path in files: + try: + entries = parse_file(path) + except Exception as exc: # noqa: BLE001 — orchestrator must keep going. + print( + f"pz_classify: warning: failed to parse {path.name}: {exc}", + file=sys.stderr, + ) + continue + # Body-line totals: every line under every parsed entry contributes + # to log_lines_total; severity-level entries' body lines feed + # error_lines_total. Counted before dedup so it reflects raw volume. + for e in entries: + log_lines_total += len(e.body) + if e.level in SEVERITY_LEVELS: + error_lines_total += len(e.body) + all_records.extend(classify_entries(entries, source_file=path.name)) + + merged = _merge_cross_file(all_records) + merged.sort(key=lambda r: r.occurrence_count, reverse=True) + + finished = datetime.now(timezone.utc).isoformat(timespec="seconds") + + unique_patterns = len({r.pattern_id for r in merged}) + + document: dict[str, object] = { + "meta": { + "input_dir": str(input_dir), + "files_scanned": len(files), + "log_lines_total": log_lines_total, + "error_lines_total": error_lines_total, + "unique_signatures": len(merged), + "unique_patterns": unique_patterns, + "redacted": True, + "started": started, + "finished": finished, + }, + "signatures": [dataclasses.asdict(r) for r in merged], + "summary": _build_summary(merged), + } + + out_path.parent.mkdir(parents=True, exist_ok=True) + tmp = out_path.with_suffix(out_path.suffix + ".tmp") + try: + with tmp.open("w", encoding="utf-8") as f: + json.dump(document, f, ensure_ascii=False, indent=2) + f.write("\n") + tmp.replace(out_path) + except OSError as exc: + print(f"pz_classify: failed to write {out_path}: {exc}", file=sys.stderr) + # Best-effort cleanup of the temp file. + try: + tmp.unlink() + except OSError: + pass + return 1 + + if not quiet: + print( + f"pz_classify: {len(files)} file(s), {log_lines_total} log lines, " + f"{error_lines_total} error lines, {len(merged)} records " + f"({unique_patterns} unique patterns) -> {out_path}" + ) + return 0 + + +def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="pz_classify", + description=( + "Deterministic Project Zomboid log classifier. Walks redacted " + "DebugLog-server*.txt files, classifies errors/warnings, and " + "emits a JSON report." + ), + ) + parser.add_argument( + "--input", + type=Path, + default=DEFAULT_INPUT, + help=f"Input directory of redacted log files (default: {DEFAULT_INPUT}).", + ) + parser.add_argument( + "--out", + type=Path, + default=DEFAULT_OUT, + help=f"Output JSON path (default: {DEFAULT_OUT}).", + ) + parser.add_argument( + "--quiet", + action="store_true", + help="Suppress the trailing one-line summary.", + ) + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = _parse_args(argv) + return _run(args.input, args.out, quiet=args.quiet) + + +if __name__ == "__main__": + sys.exit(main())