feat: deterministic PZ log classifier orchestrator
Walks DebugLog-server*.txt under the redacted directory, runs the parser per file, merges cross-file by signature, and emits the spec-shaped JSON report. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
310
tools/pz-analyzer/pz_classify.py
Normal file
310
tools/pz-analyzer/pz_classify.py
Normal file
@@ -0,0 +1,310 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
pz_classify.py — Deterministic Project Zomboid log classifier orchestrator.
|
||||
|
||||
Walks ``*DebugLog-server*.txt`` files under the redacted-logs directory,
|
||||
runs the pz_parser pipeline per file, merges records cross-file by their
|
||||
deterministic ``signature``, and emits the spec-shaped JSON report.
|
||||
|
||||
Companion to the existing Qwen-backed discovery tool ``pz_error_analysis.py``
|
||||
(left untouched). Zero AI dependency, stdlib-only, runs in seconds.
|
||||
|
||||
By convention the input is always the redacted directory produced by
|
||||
``pz_redact_all.sh``; ``meta.redacted`` is therefore hard-coded ``true``.
|
||||
If the user overrides ``--input`` to a non-redacted source we still emit
|
||||
``true`` because we have no upstream way to verify redaction status.
|
||||
|
||||
Pipeline:
|
||||
parser.parse_file per-file Entry list
|
||||
parser.classify_entries per-file deduped Record list
|
||||
_merge_cross_file global Record list deduped across files
|
||||
_build_summary top-line stats + by_kind / by_attribution / top_mods
|
||||
|
||||
Output schema, CLI flags, and aggregation rules are defined in
|
||||
``docs/superpowers/specs/2026-05-04-pz-deterministic-classifier-design.md``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import dataclasses
|
||||
import json
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from pz_parser import (
|
||||
MAX_CAUSE_CHAIN_LEVELS,
|
||||
MAX_STACK_FRAMES,
|
||||
SEVERITY_LEVELS,
|
||||
Record,
|
||||
classify_entries,
|
||||
parse_file,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Defaults / constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_REPO_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_INPUT: Path = _REPO_ROOT.joinpath(".scratch", "pz", "Logs.redacted")
DEFAULT_OUT: Path = _REPO_ROOT.joinpath(".scratch", "pz", "classify.json")

#: Glob pattern selecting which log files the directory scan picks up.
INPUT_GLOB: str = "*DebugLog-server*.txt"
#: Maximum number of entries kept in ``summary.top_mods``.
TOP_MODS_LIMIT: int = 10

#: Promotion ladders used while merging: the label with the higher rank wins.
_CONFIDENCE_RANK: dict[str, int] = {
    label: rank for rank, label in enumerate(("low", "medium", "high"))
}
_ATTRIBUTION_RANK: dict[str, int] = {
    label: rank
    for rank, label in enumerate(("unattributed", "inferred", "direct"))
}
#: Log levels the summary tallies as errors (``WARN`` is tallied separately).
_ERROR_LEVELS: frozenset[str] = frozenset(("ERROR", "SEVERE", "FATAL"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cross-file aggregation (spec §9, inter-file equivalent of parser dedup)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _merge_cross_file(per_file_records: list[Record]) -> list[Record]:
    """Collapse records that share a ``signature`` into one record each.

    The first record seen for a signature is copied (with fresh ``stack``
    and ``files`` lists) and becomes the accumulator; its ``file``, ``line``,
    ``first_seen`` and ``excerpt`` are kept as-is. Every later record with
    the same signature is folded into it:

    * ``occurrence_count`` is summed;
    * ``files`` gains any file name not already listed;
    * ``attribution`` (with its reason, and ``mod_name`` when non-empty) and
      ``confidence`` are replaced only by a strictly higher-ranked value;
    * ``stack`` gains unseen frames, in order, up to ``MAX_STACK_FRAMES``;
    * ``cause_chain`` gains unseen `` -> ``-separated tokens, truncated to
      ``MAX_CAUSE_CHAIN_LEVELS``.

    Because the first record wins for the first-seen fields, the function
    relies on the caller passing records in the desired (earliest-first)
    order.

    Returns:
        The merged records, ordered by first appearance of each signature.
    """
    merged: dict[str, Record] = {}

    for rec in per_file_records:
        target = merged.get(rec.signature)

        if target is None:
            # New signature: store a copy so the input records' lists are
            # never mutated by later folding.
            merged[rec.signature] = Record(
                signature=rec.signature,
                pattern_id=rec.pattern_id,
                level=rec.level,
                kind=rec.kind,
                mod_id=rec.mod_id,
                mod_name=rec.mod_name,
                attribution=rec.attribution,
                confidence=rec.confidence,
                attribution_reason=rec.attribution_reason,
                file=rec.file,
                line=rec.line,
                cause_chain=rec.cause_chain,
                stack=list(rec.stack),
                first_seen=rec.first_seen,
                occurrence_count=rec.occurrence_count,
                files=list(rec.files),
                excerpt=rec.excerpt,
            )
        else:
            # Volume and provenance.
            target.occurrence_count += rec.occurrence_count
            for source_name in rec.files:
                if source_name not in target.files:
                    target.files.append(source_name)

            # Stronger attribution evidence replaces weaker evidence.
            new_attr_rank = _ATTRIBUTION_RANK[rec.attribution]
            if new_attr_rank > _ATTRIBUTION_RANK[target.attribution]:
                target.attribution = rec.attribution
                target.attribution_reason = rec.attribution_reason
                if rec.mod_name:
                    target.mod_name = rec.mod_name

            new_conf_rank = _CONFIDENCE_RANK[rec.confidence]
            if new_conf_rank > _CONFIDENCE_RANK[target.confidence]:
                target.confidence = rec.confidence

            # Append unseen stack frames in order; once the cap is reached
            # nothing further can be added, so stop scanning.
            for frame in rec.stack:
                if len(target.stack) >= MAX_STACK_FRAMES:
                    break
                if frame not in target.stack:
                    target.stack.append(frame)

            # Extend the cause chain with unseen, non-empty tokens.
            chain = rec.cause_chain
            if chain and chain != target.cause_chain:
                levels = (
                    target.cause_chain.split(" -> ")
                    if target.cause_chain
                    else []
                )
                for level in chain.split(" -> "):
                    if level and level not in levels:
                        levels.append(level)
                target.cause_chain = " -> ".join(
                    levels[:MAX_CAUSE_CHAIN_LEVELS]
                )

    return list(merged.values())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Summary computation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_summary(records: list[Record]) -> dict[str, object]:
    """Compute the report's ``summary`` block from the merged records.

    ``errors``, ``warnings`` and the ``by_*`` breakdowns count records (one
    per signature). ``top_mods`` instead sums ``occurrence_count`` per
    ``mod_id``, so a mod that repeats one signature many times still ranks
    by its total volume.

    Returns:
        A dict with keys ``errors``, ``warnings``, ``by_kind``,
        ``by_attribution``, ``by_confidence`` and ``top_mods`` (at most
        ``TOP_MODS_LIMIT`` entries, highest occurrence total first).
    """
    error_total = 0
    warning_total = 0
    kind_counts: Counter[str] = Counter()
    attribution_counts: Counter[str] = Counter()
    confidence_counts: Counter[str] = Counter()
    volume_by_mod: Counter[str] = Counter()
    display_names: dict[str, str] = {}

    for rec in records:
        if rec.level in _ERROR_LEVELS:
            error_total += 1
        if rec.level == "WARN":
            warning_total += 1

        kind_counts[rec.kind] += 1
        attribution_counts[rec.attribution] += 1
        confidence_counts[rec.confidence] += 1

        volume_by_mod[rec.mod_id] += rec.occurrence_count
        # Keep the first non-empty display name seen for each mod; records
        # with an empty mod_name never overwrite it.
        if rec.mod_name:
            display_names.setdefault(rec.mod_id, rec.mod_name)

    # Stable sort: mods with equal totals keep first-appearance order.
    ranked = sorted(
        volume_by_mod.items(),
        key=lambda pair: pair[1],
        reverse=True,
    )[:TOP_MODS_LIMIT]
    top_mods = [
        {
            "mod_id": mod_id,
            "mod_name": display_names.get(mod_id, ""),
            "occurrence_count": total,
        }
        for mod_id, total in ranked
    ]

    return {
        "errors": error_total,
        "warnings": warning_total,
        "by_kind": dict(kind_counts),
        "by_attribution": dict(attribution_counts),
        "by_confidence": dict(confidence_counts),
        "top_mods": top_mods,
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Driver
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _run(input_dir: Path, out_path: Path, *, quiet: bool) -> int:
    """Classify every matching log in ``input_dir`` and write the JSON report.

    Files are the direct children of ``input_dir`` matching ``INPUT_GLOB``,
    processed in sorted order. A file whose parsing raises is reported on
    stderr and skipped. The report is written to a sibling ``.tmp`` file and
    then moved over ``out_path`` so a failed write does not leave a partial
    report at the final path.

    Args:
        input_dir: Directory containing the log files to classify.
        out_path: Destination of the JSON report; its parent directory is
            created when missing.
        quiet: When true, suppress the one-line summary on stdout.

    Returns:
        ``0`` on success, ``1`` if the report (or its parent directory)
        could not be written, ``2`` if ``input_dir`` is not a directory.
    """
    if not input_dir.is_dir():
        print(
            f"pz_classify: --input directory not found: {input_dir}",
            file=sys.stderr,
        )
        return 2

    started = datetime.now(timezone.utc).isoformat(timespec="seconds")
    files = sorted(input_dir.glob(INPUT_GLOB))

    all_records: list[Record] = []
    log_lines_total = 0
    error_lines_total = 0

    for path in files:
        try:
            entries = parse_file(path)
        except Exception as exc:  # noqa: BLE001 — orchestrator must keep going.
            print(
                f"pz_classify: warning: failed to parse {path.name}: {exc}",
                file=sys.stderr,
            )
            continue
        # Body-line totals: every line under every parsed entry contributes
        # to log_lines_total; severity-level entries' body lines feed
        # error_lines_total. Counted before dedup so it reflects raw volume.
        for e in entries:
            log_lines_total += len(e.body)
            if e.level in SEVERITY_LEVELS:
                error_lines_total += len(e.body)
        all_records.extend(classify_entries(entries, source_file=path.name))

    merged = _merge_cross_file(all_records)
    # Highest-volume signatures first; the sort is stable, so ties keep the
    # order produced by the merge.
    merged.sort(key=lambda r: r.occurrence_count, reverse=True)

    finished = datetime.now(timezone.utc).isoformat(timespec="seconds")

    unique_patterns = len({r.pattern_id for r in merged})

    document: dict[str, object] = {
        "meta": {
            "input_dir": str(input_dir),
            "files_scanned": len(files),
            "log_lines_total": log_lines_total,
            "error_lines_total": error_lines_total,
            "unique_signatures": len(merged),
            "unique_patterns": unique_patterns,
            "redacted": True,
            "started": started,
            "finished": finished,
        },
        "signatures": [dataclasses.asdict(r) for r in merged],
        "summary": _build_summary(merged),
    }

    tmp = out_path.with_suffix(out_path.suffix + ".tmp")
    try:
        # Directory creation is inside the guard so an unwritable or invalid
        # parent yields the same error message and exit code as a failed
        # write, rather than an unhandled traceback.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(document, f, ensure_ascii=False, indent=2)
            f.write("\n")
        tmp.replace(out_path)
    except OSError as exc:
        print(f"pz_classify: failed to write {out_path}: {exc}", file=sys.stderr)
        # Best-effort cleanup of the temp file (it may never have been
        # created, which also surfaces as OSError).
        try:
            tmp.unlink()
        except OSError:
            pass
        return 1

    if not quiet:
        print(
            f"pz_classify: {len(files)} file(s), {log_lines_total} log lines, "
            f"{error_lines_total} error lines, {len(merged)} records "
            f"({unique_patterns} unique patterns) -> {out_path}"
        )
    return 0
|
||||
|
||||
|
||||
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line for the classifier.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``input`` (Path), ``out`` (Path) and ``quiet`` (bool).
    """
    parser = argparse.ArgumentParser(
        prog="pz_classify",
        description=(
            "Deterministic Project Zomboid log classifier. Walks redacted "
            "DebugLog-server*.txt files, classifies errors/warnings, and "
            "emits a JSON report."
        ),
    )
    # Help text uses argparse's own ``%(default)s`` expansion instead of an
    # f-string: argparse %-formats help strings, so a default path containing
    # a literal ``%`` would otherwise break ``--help``.
    parser.add_argument(
        "--input",
        type=Path,
        default=DEFAULT_INPUT,
        help="Input directory of redacted log files (default: %(default)s).",
    )
    parser.add_argument(
        "--out",
        type=Path,
        default=DEFAULT_OUT,
        help="Output JSON path (default: %(default)s).",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress the trailing one-line summary.",
    )
    return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse ``argv``, run, return the exit code.

    Args:
        argv: Argument list forwarded to ``_parse_args``; ``None`` means
            the process's own command line.

    Returns:
        The integer status produced by ``_run``.
    """
    options = _parse_args(argv)
    exit_code = _run(options.input, options.out, quiet=options.quiet)
    return exit_code
|
||||
|
||||
|
||||
# Script entry: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user