Was leaking unhandled OSError tracebacks when the output's parent path could not be created. Exit code stays 1; user-facing message matches the existing write-failure path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
311 lines
12 KiB
Python
311 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
pz_classify.py — Deterministic Project Zomboid log classifier orchestrator.
|
|
|
|
Walks ``*DebugLog-server*.txt`` files under the redacted-logs directory,
|
|
runs the pz_parser pipeline per file, merges records cross-file by their
|
|
deterministic ``signature``, and emits the spec-shaped JSON report.
|
|
|
|
Companion to the existing Qwen-backed discovery tool ``pz_error_analysis.py``
|
|
(left untouched). Zero AI dependency, stdlib-only, runs in seconds.
|
|
|
|
By convention the input is always the redacted directory produced by
|
|
``pz_redact_all.sh``; ``meta.redacted`` is therefore hard-coded ``true``.
|
|
If the user overrides ``--input`` to a non-redacted source we still emit
|
|
``true`` because we have no upstream way to verify redaction status.
|
|
|
|
Pipeline:
|
|
parser.parse_file per-file Entry list
|
|
parser.classify_entries per-file deduped Record list
|
|
_merge_cross_file global Record list deduped across files
|
|
_build_summary top-line stats + by_kind / by_attribution / top_mods
|
|
|
|
Output schema, CLI flags, and aggregation rules are defined in
|
|
``docs/superpowers/specs/2026-05-04-pz-deterministic-classifier-design.md``.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import dataclasses
|
|
import json
|
|
import sys
|
|
from collections import Counter
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from pz_parser import (
|
|
MAX_CAUSE_CHAIN_LEVELS,
|
|
MAX_STACK_FRAMES,
|
|
SEVERITY_LEVELS,
|
|
Record,
|
|
classify_entries,
|
|
parse_file,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Defaults / constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
DEFAULT_INPUT: Path = _REPO_ROOT / ".scratch" / "pz" / "Logs.redacted"
|
|
DEFAULT_OUT: Path = _REPO_ROOT / ".scratch" / "pz" / "classify.json"
|
|
|
|
#: Filename glob driving the directory walk.
|
|
INPUT_GLOB: str = "*DebugLog-server*.txt"
|
|
#: Cap on entries in ``summary.top_mods`` — most occurrence-count-heavy mods.
|
|
TOP_MODS_LIMIT: int = 10
|
|
|
|
#: Confidence / attribution promotion ladders (higher rank wins on merge).
|
|
_CONFIDENCE_RANK: dict[str, int] = {"low": 0, "medium": 1, "high": 2}
|
|
_ATTRIBUTION_RANK: dict[str, int] = {
|
|
"unattributed": 0,
|
|
"inferred": 1,
|
|
"direct": 2,
|
|
}
|
|
#: Levels that count as errors (vs warnings) in the summary.
|
|
_ERROR_LEVELS: frozenset[str] = frozenset({"ERROR", "SEVERE", "FATAL"})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-file aggregation (spec §9, inter-file equivalent of parser dedup)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _merge_cross_file(per_file_records: list[Record]) -> list[Record]:
|
|
"""Merge ``Record`` instances across files by ``signature``.
|
|
|
|
The parser already dedups within a single file. This is the inter-file
|
|
equivalent: when the same signature appears in records from multiple
|
|
files, sum occurrences, union file lists, promote attribution/confidence,
|
|
and merge stack and cause-chain (deduped, capped at parser constants).
|
|
First-seen is the earliest by file-then-line; since callers feed records
|
|
in sorted file order, the first record we encounter per signature is
|
|
already the earliest.
|
|
"""
|
|
by_signature: dict[str, Record] = {}
|
|
for incoming in per_file_records:
|
|
existing = by_signature.get(incoming.signature)
|
|
if existing is None:
|
|
# First occurrence — copy so we don't mutate the caller's list.
|
|
by_signature[incoming.signature] = Record(
|
|
signature=incoming.signature,
|
|
pattern_id=incoming.pattern_id,
|
|
level=incoming.level,
|
|
kind=incoming.kind,
|
|
mod_id=incoming.mod_id,
|
|
mod_name=incoming.mod_name,
|
|
attribution=incoming.attribution,
|
|
confidence=incoming.confidence,
|
|
attribution_reason=incoming.attribution_reason,
|
|
file=incoming.file,
|
|
line=incoming.line,
|
|
cause_chain=incoming.cause_chain,
|
|
stack=list(incoming.stack),
|
|
first_seen=incoming.first_seen,
|
|
occurrence_count=incoming.occurrence_count,
|
|
files=list(incoming.files),
|
|
excerpt=incoming.excerpt,
|
|
)
|
|
continue
|
|
# Aggregate.
|
|
existing.occurrence_count += incoming.occurrence_count
|
|
for fname in incoming.files:
|
|
if fname not in existing.files:
|
|
existing.files.append(fname)
|
|
# Promote attribution / confidence / mod_name on stronger evidence.
|
|
if _ATTRIBUTION_RANK[incoming.attribution] > _ATTRIBUTION_RANK[existing.attribution]:
|
|
existing.attribution = incoming.attribution
|
|
existing.attribution_reason = incoming.attribution_reason
|
|
if incoming.mod_name:
|
|
existing.mod_name = incoming.mod_name
|
|
if _CONFIDENCE_RANK[incoming.confidence] > _CONFIDENCE_RANK[existing.confidence]:
|
|
existing.confidence = incoming.confidence
|
|
# Merge stack frames preserving order, capped.
|
|
for frame in incoming.stack:
|
|
if frame not in existing.stack and len(existing.stack) < MAX_STACK_FRAMES:
|
|
existing.stack.append(frame)
|
|
# Merge cause chain (deduped tokens, capped).
|
|
if incoming.cause_chain and incoming.cause_chain != existing.cause_chain:
|
|
old = existing.cause_chain.split(" -> ") if existing.cause_chain else []
|
|
new = incoming.cause_chain.split(" -> ")
|
|
merged = list(old)
|
|
for tok in new:
|
|
if tok and tok not in merged:
|
|
merged.append(tok)
|
|
existing.cause_chain = " -> ".join(merged[:MAX_CAUSE_CHAIN_LEVELS])
|
|
return list(by_signature.values())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Summary computation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_summary(records: list[Record]) -> dict[str, object]:
|
|
"""Build the ``summary`` block per spec.
|
|
|
|
Counts records (signatures), not raw occurrences, except for ``top_mods``
|
|
which sums ``occurrence_count`` per mod_id so that volume-driving mods
|
|
surface even when they hit the same shape repeatedly.
|
|
"""
|
|
errors = sum(1 for r in records if r.level in _ERROR_LEVELS)
|
|
warnings = sum(1 for r in records if r.level == "WARN")
|
|
by_kind = Counter(r.kind for r in records)
|
|
by_attribution = Counter(r.attribution for r in records)
|
|
by_confidence = Counter(r.confidence for r in records)
|
|
|
|
# Group by mod_id summing total occurrence_count; preserve any mod_name.
|
|
mod_totals: dict[str, int] = {}
|
|
mod_names: dict[str, str] = {}
|
|
for r in records:
|
|
mod_totals[r.mod_id] = mod_totals.get(r.mod_id, 0) + r.occurrence_count
|
|
# First non-empty mod_name wins; subsequent records may have empty
|
|
# mod_name (e.g. for unattributed) so don't overwrite with "".
|
|
if r.mod_name and r.mod_id not in mod_names:
|
|
mod_names[r.mod_id] = r.mod_name
|
|
top_mods = sorted(
|
|
(
|
|
{
|
|
"mod_id": mod_id,
|
|
"mod_name": mod_names.get(mod_id, ""),
|
|
"occurrence_count": total,
|
|
}
|
|
for mod_id, total in mod_totals.items()
|
|
),
|
|
key=lambda d: d["occurrence_count"],
|
|
reverse=True,
|
|
)[:TOP_MODS_LIMIT]
|
|
|
|
return {
|
|
"errors": errors,
|
|
"warnings": warnings,
|
|
"by_kind": dict(by_kind),
|
|
"by_attribution": dict(by_attribution),
|
|
"by_confidence": dict(by_confidence),
|
|
"top_mods": top_mods,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Driver
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _run(input_dir: Path, out_path: Path, *, quiet: bool) -> int:
|
|
if not input_dir.is_dir():
|
|
print(
|
|
f"pz_classify: --input directory not found: {input_dir}",
|
|
file=sys.stderr,
|
|
)
|
|
return 2
|
|
|
|
started = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
files = sorted(input_dir.glob(INPUT_GLOB))
|
|
|
|
all_records: list[Record] = []
|
|
log_lines_total = 0
|
|
error_lines_total = 0
|
|
|
|
for path in files:
|
|
try:
|
|
entries = parse_file(path)
|
|
except Exception as exc: # noqa: BLE001 — orchestrator must keep going.
|
|
print(
|
|
f"pz_classify: warning: failed to parse {path.name}: {exc}",
|
|
file=sys.stderr,
|
|
)
|
|
continue
|
|
# Body-line totals: every line under every parsed entry contributes
|
|
# to log_lines_total; severity-level entries' body lines feed
|
|
# error_lines_total. Counted before dedup so it reflects raw volume.
|
|
for e in entries:
|
|
log_lines_total += len(e.body)
|
|
if e.level in SEVERITY_LEVELS:
|
|
error_lines_total += len(e.body)
|
|
all_records.extend(classify_entries(entries, source_file=path.name))
|
|
|
|
merged = _merge_cross_file(all_records)
|
|
merged.sort(key=lambda r: r.occurrence_count, reverse=True)
|
|
|
|
finished = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
|
|
unique_patterns = len({r.pattern_id for r in merged})
|
|
|
|
document: dict[str, object] = {
|
|
"meta": {
|
|
"input_dir": str(input_dir),
|
|
"files_scanned": len(files),
|
|
"log_lines_total": log_lines_total,
|
|
"error_lines_total": error_lines_total,
|
|
"unique_signatures": len(merged),
|
|
"unique_patterns": unique_patterns,
|
|
"redacted": True,
|
|
"started": started,
|
|
"finished": finished,
|
|
},
|
|
"signatures": [dataclasses.asdict(r) for r in merged],
|
|
"summary": _build_summary(merged),
|
|
}
|
|
|
|
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
|
|
try:
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with tmp.open("w", encoding="utf-8") as f:
|
|
json.dump(document, f, ensure_ascii=False, indent=2)
|
|
f.write("\n")
|
|
tmp.replace(out_path)
|
|
except OSError as exc:
|
|
print(f"pz_classify: failed to write {out_path}: {exc}", file=sys.stderr)
|
|
# Best-effort cleanup of the temp file.
|
|
try:
|
|
tmp.unlink()
|
|
except OSError:
|
|
pass
|
|
return 1
|
|
|
|
if not quiet:
|
|
print(
|
|
f"pz_classify: {len(files)} file(s), {log_lines_total} log lines, "
|
|
f"{error_lines_total} error lines, {len(merged)} records "
|
|
f"({unique_patterns} unique patterns) -> {out_path}"
|
|
)
|
|
return 0
|
|
|
|
|
|
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
prog="pz_classify",
|
|
description=(
|
|
"Deterministic Project Zomboid log classifier. Walks redacted "
|
|
"DebugLog-server*.txt files, classifies errors/warnings, and "
|
|
"emits a JSON report."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--input",
|
|
type=Path,
|
|
default=DEFAULT_INPUT,
|
|
help=f"Input directory of redacted log files (default: {DEFAULT_INPUT}).",
|
|
)
|
|
parser.add_argument(
|
|
"--out",
|
|
type=Path,
|
|
default=DEFAULT_OUT,
|
|
help=f"Output JSON path (default: {DEFAULT_OUT}).",
|
|
)
|
|
parser.add_argument(
|
|
"--quiet",
|
|
action="store_true",
|
|
help="Suppress the trailing one-line summary.",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = _parse_args(argv)
|
|
return _run(args.input, args.out, quiet=args.quiet)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|