Merge branch 'pz-classifier'

Adds a deterministic-only Project Zomboid log classifier under
tools/pz-analyzer/, parallel to the existing Qwen-based research
tool. pz_parser.py is a pure module (parsing, attribution, file:line,
cause-chain, kind detection, two-level signatures); pz_classify.py
walks the redacted DebugLog-server directory, merges cross-file by
signature, and writes the spec-shaped JSON. 32 unit tests.
This commit is contained in:
2026-05-04 15:58:20 +00:00
23 changed files with 1730 additions and 0 deletions

View File

@@ -126,6 +126,7 @@ signature = sha256(pattern_id + mod_id)[:16]
Normalization for `pattern_id`:
- Strip session metadata prefix (`General f:N, t:N, st:N,N,N,N>` shape)
- Strip body-prefix severity token (`ERROR:` / `SEVERE:` / `WARN:` / `FATAL:`, case-insensitive) so a body that opens with the severity word still hashes the same as one that doesn't.
- Flatten double- and single-quoted strings to `"<S>"` / `'<S>'`
- Flatten ≥2-digit numeric runs to `<N>`
- Collapse whitespace
@@ -240,5 +241,6 @@ Test invocation: `python -m unittest discover tools/pz-analyzer/tests/` should b
- Editing `pz_error_analysis.py` or `pz_redact_all.sh`.
- Modifying any file in `/opt/ik-codex/src/`, `/opt/ik-codex/test/`, or `/opt/iblogs/`.
- AI / LLM integration of any kind in the new tool.
- LLM inference at runtime in iblogs / bosslogs production. The Qwen analyzer (`pz_error_analysis.py`) is a developer-only discovery tool used to expand the deterministic ruleset in `pz_parser.py` (and its future PHP port). Production rendering is deterministic-only, forever.
- iblogs front-end rendering of the classification output.
- Filesystem mod-scan reattribution (pzmm's symbol/vehicle indexes).

View File

@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
pz_classify.py — Deterministic Project Zomboid log classifier orchestrator.
Walks ``*DebugLog-server*.txt`` files under the redacted-logs directory,
runs the pz_parser pipeline per file, merges records cross-file by their
deterministic ``signature``, and emits the spec-shaped JSON report.
Companion to the existing Qwen-backed discovery tool ``pz_error_analysis.py``
(left untouched). Zero AI dependency, stdlib-only, runs in seconds.
By convention the input is always the redacted directory produced by
``pz_redact_all.sh``; ``meta.redacted`` is therefore hard-coded ``true``.
If the user overrides ``--input`` to a non-redacted source we still emit
``true`` because we have no upstream way to verify redaction status.
Pipeline:
parser.parse_file per-file Entry list
parser.classify_entries per-file deduped Record list
_merge_cross_file global Record list deduped across files
_build_summary top-line stats + by_kind / by_attribution / top_mods
Output schema, CLI flags, and aggregation rules are defined in
``docs/superpowers/specs/2026-05-04-pz-deterministic-classifier-design.md``.
"""
from __future__ import annotations
import argparse
import dataclasses
import json
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from pz_parser import (
MAX_CAUSE_CHAIN_LEVELS,
MAX_STACK_FRAMES,
SEVERITY_LEVELS,
Record,
classify_entries,
parse_file,
)
# ---------------------------------------------------------------------------
# Defaults / constants
# ---------------------------------------------------------------------------
_REPO_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_INPUT: Path = _REPO_ROOT / ".scratch" / "pz" / "Logs.redacted"
DEFAULT_OUT: Path = _REPO_ROOT / ".scratch" / "pz" / "classify.json"
#: Filename glob driving the directory walk.
INPUT_GLOB: str = "*DebugLog-server*.txt"
#: Cap on entries in ``summary.top_mods`` — most occurrence-count-heavy mods.
TOP_MODS_LIMIT: int = 10
#: Confidence / attribution promotion ladders (higher rank wins on merge).
_CONFIDENCE_RANK: dict[str, int] = {"low": 0, "medium": 1, "high": 2}
_ATTRIBUTION_RANK: dict[str, int] = {
"unattributed": 0,
"inferred": 1,
"direct": 2,
}
#: Levels that count as errors (vs warnings) in the summary.
_ERROR_LEVELS: frozenset[str] = frozenset({"ERROR", "SEVERE", "FATAL"})
# ---------------------------------------------------------------------------
# Cross-file aggregation (spec §9, inter-file equivalent of parser dedup)
# ---------------------------------------------------------------------------
def _merge_cross_file(per_file_records: list[Record]) -> list[Record]:
"""Merge ``Record`` instances across files by ``signature``.
The parser already dedups within a single file. This is the inter-file
equivalent: when the same signature appears in records from multiple
files, sum occurrences, union file lists, promote attribution/confidence,
and merge stack and cause-chain (deduped, capped at parser constants).
First-seen is the earliest by file-then-line; since callers feed records
in sorted file order, the first record we encounter per signature is
already the earliest.
"""
by_signature: dict[str, Record] = {}
for incoming in per_file_records:
existing = by_signature.get(incoming.signature)
if existing is None:
# First occurrence — copy so we don't mutate the caller's list.
by_signature[incoming.signature] = Record(
signature=incoming.signature,
pattern_id=incoming.pattern_id,
level=incoming.level,
kind=incoming.kind,
mod_id=incoming.mod_id,
mod_name=incoming.mod_name,
attribution=incoming.attribution,
confidence=incoming.confidence,
attribution_reason=incoming.attribution_reason,
file=incoming.file,
line=incoming.line,
cause_chain=incoming.cause_chain,
stack=list(incoming.stack),
first_seen=incoming.first_seen,
occurrence_count=incoming.occurrence_count,
files=list(incoming.files),
excerpt=incoming.excerpt,
)
continue
# Aggregate.
existing.occurrence_count += incoming.occurrence_count
for fname in incoming.files:
if fname not in existing.files:
existing.files.append(fname)
# Promote attribution / confidence / mod_name on stronger evidence.
if _ATTRIBUTION_RANK[incoming.attribution] > _ATTRIBUTION_RANK[existing.attribution]:
existing.attribution = incoming.attribution
existing.attribution_reason = incoming.attribution_reason
if incoming.mod_name:
existing.mod_name = incoming.mod_name
if _CONFIDENCE_RANK[incoming.confidence] > _CONFIDENCE_RANK[existing.confidence]:
existing.confidence = incoming.confidence
# Merge stack frames preserving order, capped.
for frame in incoming.stack:
if frame not in existing.stack and len(existing.stack) < MAX_STACK_FRAMES:
existing.stack.append(frame)
# Merge cause chain (deduped tokens, capped).
if incoming.cause_chain and incoming.cause_chain != existing.cause_chain:
old = existing.cause_chain.split(" -> ") if existing.cause_chain else []
new = incoming.cause_chain.split(" -> ")
merged = list(old)
for tok in new:
if tok and tok not in merged:
merged.append(tok)
existing.cause_chain = " -> ".join(merged[:MAX_CAUSE_CHAIN_LEVELS])
return list(by_signature.values())
# ---------------------------------------------------------------------------
# Summary computation
# ---------------------------------------------------------------------------
def _build_summary(records: list[Record]) -> dict[str, object]:
"""Build the ``summary`` block per spec.
Counts records (signatures), not raw occurrences, except for ``top_mods``
which sums ``occurrence_count`` per mod_id so that volume-driving mods
surface even when they hit the same shape repeatedly.
"""
errors = sum(1 for r in records if r.level in _ERROR_LEVELS)
warnings = sum(1 for r in records if r.level == "WARN")
by_kind = Counter(r.kind for r in records)
by_attribution = Counter(r.attribution for r in records)
by_confidence = Counter(r.confidence for r in records)
# Group by mod_id summing total occurrence_count; preserve any mod_name.
mod_totals: dict[str, int] = {}
mod_names: dict[str, str] = {}
for r in records:
mod_totals[r.mod_id] = mod_totals.get(r.mod_id, 0) + r.occurrence_count
# First non-empty mod_name wins; subsequent records may have empty
# mod_name (e.g. for unattributed) so don't overwrite with "".
if r.mod_name and r.mod_id not in mod_names:
mod_names[r.mod_id] = r.mod_name
top_mods = sorted(
(
{
"mod_id": mod_id,
"mod_name": mod_names.get(mod_id, ""),
"occurrence_count": total,
}
for mod_id, total in mod_totals.items()
),
key=lambda d: d["occurrence_count"],
reverse=True,
)[:TOP_MODS_LIMIT]
return {
"errors": errors,
"warnings": warnings,
"by_kind": dict(by_kind),
"by_attribution": dict(by_attribution),
"by_confidence": dict(by_confidence),
"top_mods": top_mods,
}
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def _run(input_dir: Path, out_path: Path, *, quiet: bool) -> int:
if not input_dir.is_dir():
print(
f"pz_classify: --input directory not found: {input_dir}",
file=sys.stderr,
)
return 2
started = datetime.now(timezone.utc).isoformat(timespec="seconds")
files = sorted(input_dir.glob(INPUT_GLOB))
all_records: list[Record] = []
log_lines_total = 0
error_lines_total = 0
for path in files:
try:
entries = parse_file(path)
except Exception as exc: # noqa: BLE001 — orchestrator must keep going.
print(
f"pz_classify: warning: failed to parse {path.name}: {exc}",
file=sys.stderr,
)
continue
# Body-line totals: every line under every parsed entry contributes
# to log_lines_total; severity-level entries' body lines feed
# error_lines_total. Counted before dedup so it reflects raw volume.
for e in entries:
log_lines_total += len(e.body)
if e.level in SEVERITY_LEVELS:
error_lines_total += len(e.body)
all_records.extend(classify_entries(entries, source_file=path.name))
merged = _merge_cross_file(all_records)
merged.sort(key=lambda r: r.occurrence_count, reverse=True)
finished = datetime.now(timezone.utc).isoformat(timespec="seconds")
unique_patterns = len({r.pattern_id for r in merged})
document: dict[str, object] = {
"meta": {
"input_dir": str(input_dir),
"files_scanned": len(files),
"log_lines_total": log_lines_total,
"error_lines_total": error_lines_total,
"unique_signatures": len(merged),
"unique_patterns": unique_patterns,
"redacted": True,
"started": started,
"finished": finished,
},
"signatures": [dataclasses.asdict(r) for r in merged],
"summary": _build_summary(merged),
}
tmp = out_path.with_suffix(out_path.suffix + ".tmp")
try:
out_path.parent.mkdir(parents=True, exist_ok=True)
with tmp.open("w", encoding="utf-8") as f:
json.dump(document, f, ensure_ascii=False, indent=2)
f.write("\n")
tmp.replace(out_path)
except OSError as exc:
print(f"pz_classify: failed to write {out_path}: {exc}", file=sys.stderr)
# Best-effort cleanup of the temp file.
try:
tmp.unlink()
except OSError:
pass
return 1
if not quiet:
print(
f"pz_classify: {len(files)} file(s), {log_lines_total} log lines, "
f"{error_lines_total} error lines, {len(merged)} records "
f"({unique_patterns} unique patterns) -> {out_path}"
)
return 0
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="pz_classify",
description=(
"Deterministic Project Zomboid log classifier. Walks redacted "
"DebugLog-server*.txt files, classifies errors/warnings, and "
"emits a JSON report."
),
)
parser.add_argument(
"--input",
type=Path,
default=DEFAULT_INPUT,
help=f"Input directory of redacted log files (default: {DEFAULT_INPUT}).",
)
parser.add_argument(
"--out",
type=Path,
default=DEFAULT_OUT,
help=f"Output JSON path (default: {DEFAULT_OUT}).",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Suppress the trailing one-line summary.",
)
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = _parse_args(argv)
return _run(args.input, args.out, quiet=args.quiet)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,777 @@
"""
pz_parser.py — Deterministic Project Zomboid log parser.
Pure module (no I/O beyond reading the path it is handed). Walks a redacted
DebugLog-server*.txt file, extracts errors/warnings, attributes each to a mod
where evidence allows, classifies by kind, and computes deterministic
signatures. Output records are designed to be `dataclasses.asdict()`-ready
for direct JSON serialisation.
Pipeline phases (per design spec at
docs/superpowers/specs/2026-05-04-pz-deterministic-classifier-design.md):
1. Severity-prefix recognition (ERROR|SEVERE|WARN)
2. Bidirectional stack collection (pre-stack walk back, post-stack walk forward)
3. Mod attribution (direct, inferred, unattributed)
4. File:line extraction (five fallbacks)
5. Cause-chain extraction (Caused by: chains + standalone exception lines)
6. Java exception kind detection
7. Engine-noise tagging
8. Signature computation (pattern_id + signature)
9. Aggregation (dedup on signature)
Style notes mirror sibling tool pz_error_analysis.py: type hints with built-in
generics, `from __future__ import annotations`, regex precompilation as
module-level constants, stdlib-only.
"""
from __future__ import annotations
import hashlib
import pathlib
import re
from dataclasses import dataclass
# ---------------------------------------------------------------------------
# Tunable constants
# ---------------------------------------------------------------------------
#: Lookback window (in raw file lines) for inferred mod attribution.
INFERRED_LOOKBACK_LINES: int = 40
#: Maximum frames retained per record after pre+post stack merge.
MAX_STACK_FRAMES: int = 8
#: Maximum lines walked in each direction during bidirectional stack collection.
STACK_WALK_LINES: int = 25
#: Maximum cause-chain depth retained.
MAX_CAUSE_CHAIN_LEVELS: int = 6
#: Truncation length for the normalised first line that feeds pattern_id.
PATTERN_ID_FIRST_LINE_MAX: int = 200
# ---------------------------------------------------------------------------
# Line-shape regexes (parsing)
# ---------------------------------------------------------------------------
#: PZ DebugLog entry header.
#: Example: ``[16-04-26 00:01:19.080] ERROR: General f:0, t:1, st:1,2,3,4> body``
ENTRY_RE = re.compile(
r"^\[(?P<ts>\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\]\s+"
r"(?P<level>[A-Z]+)\s*:\s*(?P<rest>.*)$"
)
#: Strips the "General f:N, t:N, st:N,N,N,N>" prefix from a body line.
SESSION_META_RE = re.compile(
r"^[A-Za-z][A-Za-z0-9]*\s+f:\d+,?\s*(?:t:\d+,?\s*)?st:[\d,]+>\s*"
)
# ---------------------------------------------------------------------------
# Severity-prefix recognition (phase 1)
# ---------------------------------------------------------------------------
#: Severity tokens that flag a body line as an error/warning event when they
#: appear at the start of body text. Per spec: broader than the existing
#: pz_error_analysis.py regex (adds SEVERE for Java util-logging).
SEVERITY_BODY_RE = re.compile(r"^\s*(ERROR|SEVERE|WARN)\s*[:\s]")
#: Bracketed-level tokens that map to severity events.
SEVERITY_LEVELS: tuple[str, ...] = ("ERROR", "WARN", "SEVERE", "FATAL")
# ---------------------------------------------------------------------------
# Stack-frame recognition (phase 2)
# ---------------------------------------------------------------------------
#: Markers that identify a line as stack-shaped. Used to gate pre/post stack
#: collection so we don't latch onto non-stack continuation text.
STACK_HINT_RE = re.compile(
r"(?:\bat\s+\S+|\[string\s+\"|function:\s|file:\s|\.lua\b)",
re.IGNORECASE,
)
# ---------------------------------------------------------------------------
# Mod attribution (phase 3)
# ---------------------------------------------------------------------------
#: Direct attribution marker: ``Lua((MOD:<name>))``.
LUA_MOD_MARKER_RE = re.compile(r"Lua\(\(MOD:([^)]+)\)\)")
#: Direct attribution: ``require("X") failed`` shape.
REQUIRE_FAILED_RE = re.compile(
r"""require\s*\(\s*["']([^"']+)["']\s*\)\s+failed""",
re.IGNORECASE,
)
#: Direct attribution: explicit ``needed by <mod>`` hint.
NEEDED_BY_RE = re.compile(r"needed\s+by\s+([A-Za-z0-9_'\- ]+?)(?:[,.]|$)", re.IGNORECASE)
#: Patterns that flag a body as "Lua-shaped" — gating filter for inferred
#: attribution. Mirrors the spec's enumeration.
LUA_SHAPED_PATTERNS: tuple[re.Pattern[str], ...] = (
re.compile(r"luamanager\.getfunctionobject", re.IGNORECASE),
re.compile(r"no\s+such\s+function", re.IGNORECASE),
re.compile(r"exception\s+thrown", re.IGNORECASE),
re.compile(r"runtimeexception", re.IGNORECASE),
re.compile(r"illegalstateexception", re.IGNORECASE),
re.compile(r"\blua\b", re.IGNORECASE),
)
# ---------------------------------------------------------------------------
# File:line extraction (phase 4) — five fallbacks tried in order
# ---------------------------------------------------------------------------
#: 1. ``at <path>.lua:<n>`` — typical Lua stack frame.
FILE_LINE_AT_RE = re.compile(r"\bat\s+([^\s:]+\.lua):(\d+)")
#: 2. ``function: ... file: <path>.lua line #<n>`` (or `: <n>`).
FILE_LINE_FUNCTION_RE = re.compile(
r"function:\s*[^,]*?file:\s*([^\s,]+\.lua)\s+line\s*(?:#|:)\s*(\d+)",
re.IGNORECASE,
)
#: 3. ``[string "<path>.lua"]:<n>`` — Lua VM source string.
FILE_LINE_STRING_RE = re.compile(r"""\[string\s+["']([^"']+\.lua)["']\]:(\d+)""")
#: 4. quoted path ending in a known extension; line # optional.
FILE_LINE_QUOTED_RE = re.compile(
r"""["']([^"']+\.(?:lua|txt|xml|json|ini|cfg|bin))["'](?::(\d+))?"""
)
#: 5. unquoted path segment beginning with a recognised root.
FILE_LINE_UNQUOTED_RE = re.compile(
r"\b((?:media|maps|lua|scripts)/[\w./\-]+\.(?:lua|txt|xml|json|ini|cfg|bin))(?::(\d+))?"
)
# ---------------------------------------------------------------------------
# Cause-chain extraction (phase 5)
# ---------------------------------------------------------------------------
#: ``Caused by: <ExceptionClass>: <msg>`` (msg optional).
CAUSED_BY_RE = re.compile(
r"Caused\s+by:\s+((?:\w+\.)+\w+(?:Exception|Error))(?::\s*(.+?))?\s*$",
re.IGNORECASE,
)
#: Standalone Java exception line: ``com.foo.BarException: msg``.
EXCEPTION_LINE_RE = re.compile(
r"((?:\w+\.)+\w+(?:Exception|Error))(?::\s*(.+?))?(?=\s+at\s|\s*$)"
)
# ---------------------------------------------------------------------------
# Engine-noise tagging (phase 7)
# ---------------------------------------------------------------------------
ENGINE_NOISE_PATTERNS: tuple[re.Pattern[str], ...] = (
re.compile(r"kahluathread\.flusherrormessage", re.IGNORECASE),
re.compile(r"dumping\s+lua\s+stack\s+trace", re.IGNORECASE),
)
# ---------------------------------------------------------------------------
# Signature normalisation (phase 8)
# ---------------------------------------------------------------------------
DOUBLE_QUOTED_RE = re.compile(r'"[^"]*"')
SINGLE_QUOTED_RE = re.compile(r"'[^']*'")
NUMERIC_RUN_RE = re.compile(r"\d{2,}")
WS_RUN_RE = re.compile(r"\s+")
#: Strips a leading ``ERROR:`` / ``SEVERE:`` / ``WARN:`` / ``FATAL:`` token
#: from a body line so a body that happens to begin with the severity word
#: hashes to the same pattern_id as the bracketed-only variant. Matches the
#: token plus any colon and trailing whitespace; case-insensitive.
SEVERITY_PREFIX_STRIP_RE = re.compile(
r"^\s*(?:ERROR|SEVERE|WARN|FATAL)\s*[:\s]\s*", re.IGNORECASE
)
# ---------------------------------------------------------------------------
# Dataclasses — match the JSON keys the spec mandates so consumers can
# `dataclasses.asdict(record)` straight to JSON.
# ---------------------------------------------------------------------------
@dataclass
class Entry:
"""One parsed log entry. Continuation lines (TAB-indented or otherwise
non-header lines) are folded into ``body``. Phase-2 stack collection
walks neighbouring entries (not raw lines), so no extra context is
stored here.
"""
timestamp: str
level: str
body: list[str]
line_start: int
line_end: int
@dataclass
class FirstSeen:
"""Provenance for the first occurrence of a deduped record."""
file: str
line: int
timestamp: str
@dataclass
class Record:
"""One classified, deduplicated error/warning record. Field names mirror
the JSON output schema in the spec verbatim — this object is intended to
be `dataclasses.asdict()`-ed straight into the output document.
"""
signature: str
pattern_id: str
level: str
kind: str
mod_id: str
mod_name: str
attribution: str
confidence: str
attribution_reason: str
file: str
line: int
cause_chain: str
stack: list[str]
first_seen: FirstSeen
occurrence_count: int
files: list[str]
excerpt: str
# ---------------------------------------------------------------------------
# Phase 0: file parse
# ---------------------------------------------------------------------------
def parse_file(path: pathlib.Path) -> list[Entry]:
"""Parse a DebugLog-server file into a list of multi-line entries.
Continuation lines (those not matching ENTRY_RE) append to the previous
entry's body, mirroring codex's PatternParser behaviour for multi-line
Java stack traces under an ERROR header.
"""
entries: list[Entry] = []
current: Entry | None = None
with path.open("r", encoding="utf-8", errors="replace") as f:
for lineno, raw in enumerate(f, start=1):
line = raw.rstrip("\n")
m = ENTRY_RE.match(line)
if m:
if current is not None:
entries.append(current)
current = Entry(
timestamp=m.group("ts"),
level=m.group("level"),
body=[m.group("rest")],
line_start=lineno,
line_end=lineno,
)
elif current is not None:
current.body.append(line)
current.line_end = lineno
# else: orphan line at start of file (no preceding entry); ignore.
if current is not None:
entries.append(current)
return entries
# ---------------------------------------------------------------------------
# Phase 1: severity-prefix recognition
# ---------------------------------------------------------------------------
def is_severity_entry(entry: Entry) -> bool:
"""True if this entry is an ERROR/WARN/SEVERE/FATAL — either by the
bracketed level or a leading SEVERE/ERROR/WARN token in the body (after
stripping the session-meta prefix)."""
if entry.level in SEVERITY_LEVELS:
return True
if entry.body and SEVERITY_BODY_RE.match(_strip_session_meta(entry.body[0])):
return True
return False
def effective_level(entry: Entry) -> str:
"""Return the effective severity for an entry. Body-prefix takes
precedence — covers the SEVERE-in-body case where bracketed level is LOG
*and* the case where bracketed level is ERROR but body says SEVERE.
"""
if entry.body:
m = SEVERITY_BODY_RE.match(_strip_session_meta(entry.body[0]))
if m:
return m.group(1).upper()
return entry.level
# ---------------------------------------------------------------------------
# Phase 2: bidirectional stack collection
# ---------------------------------------------------------------------------
def _is_stack_shaped(line: str) -> bool:
return bool(STACK_HINT_RE.search(line))
def _strip_session_meta(body_line: str) -> str:
"""Strip the ``General f:N, t:N, st:...> `` session-metadata prefix from
a body's first line so pattern matching can run against the meaningful tail.
"""
return SESSION_META_RE.sub("", body_line)
def _collect_pre_stack(entries: list[Entry], hit_idx: int) -> list[str]:
"""Walk back through prior entries; collect stack-shaped lines from each
entry's body. Stop at the previous severity-flagged entry. Cap collection
at MAX_STACK_FRAMES and at STACK_WALK_LINES of body lines examined.
Per spec, only return the block if at least one line looks stack-shaped.
"""
collected: list[str] = []
lines_examined = 0
for j in range(hit_idx - 1, -1, -1):
prior = entries[j]
# Stop at another severity line (the previous error's boundary).
if is_severity_entry(prior):
break
# Walk this entry's body in reverse; for body[0] the session-meta
# prefix is part of the line — strip it before stack-shape check.
for k in range(len(prior.body) - 1, -1, -1):
line = prior.body[k]
stripped = _strip_session_meta(line) if k == 0 else line
lines_examined += 1
if _is_stack_shaped(stripped):
collected.append(stripped.strip())
if len(collected) >= MAX_STACK_FRAMES:
break
if lines_examined >= STACK_WALK_LINES:
break
if len(collected) >= MAX_STACK_FRAMES or lines_examined >= STACK_WALK_LINES:
break
if not collected:
return []
collected.reverse() # restore source order
return collected
def _collect_post_stack(entries: list[Entry], hit_idx: int) -> list[str]:
"""Look at the entry's own body continuation lines first (stack frames
attached to the ERROR header become continuation lines after parsing),
then walk forward through subsequent entries. Stop at the next severity
entry. Cap at MAX_STACK_FRAMES and at STACK_WALK_LINES of body lines."""
entry = entries[hit_idx]
collected: list[str] = []
lines_examined = 0
# Body continuations (skip body[0] which is the headline itself).
for line in entry.body[1:]:
lines_examined += 1
if _is_stack_shaped(line):
collected.append(line.strip())
if len(collected) >= MAX_STACK_FRAMES:
return collected
if lines_examined >= STACK_WALK_LINES:
return collected
for j in range(hit_idx + 1, len(entries)):
next_entry = entries[j]
if is_severity_entry(next_entry):
break
for k, line in enumerate(next_entry.body):
stripped = _strip_session_meta(line) if k == 0 else line
lines_examined += 1
if _is_stack_shaped(stripped):
collected.append(stripped.strip())
if len(collected) >= MAX_STACK_FRAMES:
return collected
if lines_examined >= STACK_WALK_LINES:
return collected
return collected
def collect_stack(entries: list[Entry], hit_idx: int) -> list[str]:
"""Merge pre + post stack, dedup preserving order, cap at MAX_STACK_FRAMES."""
pre = _collect_pre_stack(entries, hit_idx)
post = _collect_post_stack(entries, hit_idx)
seen: set[str] = set()
merged: list[str] = []
for frame in pre + post:
if frame in seen:
continue
seen.add(frame)
merged.append(frame)
if len(merged) >= MAX_STACK_FRAMES:
break
return merged
# ---------------------------------------------------------------------------
# Phase 3: mod attribution
# ---------------------------------------------------------------------------
def _norm_mod_key(raw_name: str) -> str:
"""Lowercase, strip spaces / apostrophes / hyphens. Used as mod_id."""
s = raw_name.lower()
for ch in (" ", "'", "-"):
s = s.replace(ch, "")
return s
def _entry_text(entry: Entry) -> str:
"""Whole-entry text (body + collected stack) for marker scanning."""
return "\n".join(entry.body)
def attribute_entry(entry: Entry, prior_lookback_lines: list[str]) -> tuple[str, str, str, str, str]:
"""Determine ``(mod_id, mod_name, attribution, confidence, reason)``.
``prior_lookback_lines`` is the body lines from prior entries that fall
within INFERRED_LOOKBACK_LINES raw-file-line distance from this entry's
start, in source order. The list is scanned in reverse for the nearest
``Lua((MOD:Y))`` marker when inferred attribution is being attempted.
Direct-attribution priority: Lua marker -> needed-by -> require-failed.
Rationale: ``needed by <mod>`` names the dependent mod (more semantically
targeted) and is preferred over ``require("...") failed`` which only names
the missing module path. ``Lua((MOD:...))`` is unambiguous and wins
outright.
"""
text = _entry_text(entry)
# 1. Direct via Lua((MOD:X)) — unambiguous; outranks every other signal.
m = LUA_MOD_MARKER_RE.search(text)
if m:
raw = m.group(1).strip()
return (
_norm_mod_key(raw),
raw,
"direct",
"high",
"Lua((MOD:...)) marker on the entry itself",
)
# 2. Direct via "needed by <mod>"
m = NEEDED_BY_RE.search(text)
if m:
raw = m.group(1).strip().rstrip(".,;")
return (
_norm_mod_key(raw),
raw,
"direct",
"high",
"needed by <mod> hint",
)
# 3. Direct via require("X") failed — attribute to required module name.
m = REQUIRE_FAILED_RE.search(text)
if m:
raw = m.group(1).strip()
# Mod-name first segment (PZ paths often look like Mod/Foo/Bar).
mod_name = raw.split("/")[0] if "/" in raw else raw
return (
_norm_mod_key(mod_name),
mod_name,
"direct",
"high",
'require("...") failed shape',
)
# 4. Inferred — Lua-shaped body + recent Lua((MOD:Y)) within lookback.
if any(p.search(text) for p in LUA_SHAPED_PATTERNS):
for line in reversed(prior_lookback_lines):
mm = LUA_MOD_MARKER_RE.search(line)
if mm:
raw = mm.group(1).strip()
return (
_norm_mod_key(raw),
raw,
"inferred",
"medium",
f"Lua-shaped body; nearest Lua((MOD:{raw})) within "
f"{INFERRED_LOOKBACK_LINES}-line lookback",
)
return (
"__unattributed__",
"",
"unattributed",
"low",
"no marker; body not Lua-shaped or no recent Lua((MOD:...))",
)
# ---------------------------------------------------------------------------
# Phase 4: file:line extraction (five fallbacks, in order)
# ---------------------------------------------------------------------------
def extract_file_line(text: str) -> tuple[str, int]:
"""Run the five fallbacks in order. Returns ``(file, line)`` with line=0
when only a path was matched."""
m = FILE_LINE_AT_RE.search(text)
if m:
return m.group(1), int(m.group(2))
m = FILE_LINE_FUNCTION_RE.search(text)
if m:
return m.group(1), int(m.group(2))
m = FILE_LINE_STRING_RE.search(text)
if m:
return m.group(1), int(m.group(2))
m = FILE_LINE_QUOTED_RE.search(text)
if m:
return m.group(1), int(m.group(2)) if m.group(2) else 0
m = FILE_LINE_UNQUOTED_RE.search(text)
if m:
return m.group(1), int(m.group(2)) if m.group(2) else 0
return "", 0
# ---------------------------------------------------------------------------
# Phase 5: cause-chain extraction
# ---------------------------------------------------------------------------
def extract_cause_chain(text: str) -> str:
"""Return ``ExceptionA: msg -> ExceptionB: msg`` joined chain, deduped,
capped at MAX_CAUSE_CHAIN_LEVELS levels.
"""
tokens: list[str] = []
seen: set[str] = set()
for line in text.splitlines():
cb = CAUSED_BY_RE.search(line)
if cb:
cls = cb.group(1)
msg = cb.group(2) or ""
tok = f"{cls}: {msg.strip()}".rstrip(": ").strip()
if tok not in seen:
seen.add(tok)
tokens.append(tok)
continue
ex = EXCEPTION_LINE_RE.search(line)
if ex:
cls = ex.group(1)
msg = ex.group(2) or ""
tok = f"{cls}: {msg.strip()}".rstrip(": ").strip()
if tok not in seen:
seen.add(tok)
tokens.append(tok)
if len(tokens) >= MAX_CAUSE_CHAIN_LEVELS:
break
return " -> ".join(tokens[:MAX_CAUSE_CHAIN_LEVELS])
# ---------------------------------------------------------------------------
# Phase 6: Java exception kind detection
# ---------------------------------------------------------------------------
JAVA_EXCEPTION_RE = re.compile(r"(?:\w+\.)+\w+(?:Exception|Error)\b")
def detect_kind(entry: Entry, attribution: str, body_text: str) -> str:
"""Determine the ``kind`` field. Order: engine_noise > require_failed >
java_exception > lua_runtime > runtime."""
# Phase 7 short-circuit (engine noise outranks others per spec — engine
# noise is PZ's own diagnostic chatter regardless of class).
if any(p.search(body_text) for p in ENGINE_NOISE_PATTERNS):
return "engine_noise"
if REQUIRE_FAILED_RE.search(body_text):
return "require_failed"
has_java = bool(JAVA_EXCEPTION_RE.search(body_text))
has_lua_marker = bool(LUA_MOD_MARKER_RE.search(body_text))
if has_java and not has_lua_marker:
return "java_exception"
# Lua-attributed runtime / inferred
if has_lua_marker or attribution in ("direct", "inferred"):
return "lua_runtime"
return "runtime"
# ---------------------------------------------------------------------------
# Phase 8: signature computation
# ---------------------------------------------------------------------------
def normalize_first_line(first: str) -> str:
"""Per spec: strip session metadata prefix, strip any leading severity
word (so ``SEVERE: foo`` and ``foo`` produce the same pattern_id when both
are SEVERE-level), flatten quoted strings to ``"<S>"`` / ``'<S>'``, flatten
≥2-digit numeric runs to ``<N>``, collapse whitespace, truncate to 200
chars.
"""
s = first.strip()
s = SESSION_META_RE.sub("", s)
# Strip any leading ERROR:/SEVERE:/WARN:/FATAL: that survived in the body
# — the bracketed level already feeds pattern_id separately, so leaving
# the body-prefix in place would fragment signatures across "body has
# SEVERE: prefix" vs "body has no prefix but bracketed level is SEVERE."
s = SEVERITY_PREFIX_STRIP_RE.sub("", s)
s = DOUBLE_QUOTED_RE.sub('"<S>"', s)
s = SINGLE_QUOTED_RE.sub("'<S>'", s)
s = NUMERIC_RUN_RE.sub("<N>", s)
s = WS_RUN_RE.sub(" ", s)
return s[:PATTERN_ID_FIRST_LINE_MAX]
def compute_pattern_id(level: str, first_line: str) -> str:
"""``sha256(level + normalized_first_line)[:16]``, prefixed ``sha256:``.
16 hex chars (64 bits) chosen for JSON readability vs collision-resistance
trade-off; consumers treat as opaque.
"""
norm = normalize_first_line(first_line)
h = hashlib.sha256(f"{level}\n{norm}".encode("utf-8")).hexdigest()
return f"sha256:{h[:16]}"
def compute_signature(pattern_id: str, mod_id: str) -> str:
"""``sha256(pattern_id + mod_id)[:16]``, prefixed ``sha256:``.
16 hex chars (64 bits) chosen for JSON readability vs collision-resistance
trade-off; consumers treat as opaque.
"""
h = hashlib.sha256(f"{pattern_id}\n{mod_id}".encode("utf-8")).hexdigest()
return f"sha256:{h[:16]}"
# ---------------------------------------------------------------------------
# Aggregation (phase 9) and the public classify_entries entry point
# ---------------------------------------------------------------------------
_CONFIDENCE_RANK: dict[str, int] = {"low": 0, "medium": 1, "high": 2}
_ATTRIBUTION_RANK: dict[str, int] = {
"unattributed": 0,
"inferred": 1,
"direct": 2,
}
def _build_excerpt(entry: Entry, max_chars: int = 1000) -> str:
"""Best-effort one-block excerpt of the entry (header + continuations)."""
lines: list[str] = []
header = f'[{entry.timestamp}] {entry.level}: '
if entry.body:
lines.append(header + entry.body[0])
for cont in entry.body[1:]:
lines.append(cont)
text = "\n".join(lines)
if len(text) > max_chars:
text = text[:max_chars] + "\n... [truncated]"
return text
def _build_lookback_window(entries: list[Entry], hit_idx: int) -> list[str]:
"""Collect body lines from prior entries whose ``line_start`` falls within
INFERRED_LOOKBACK_LINES raw-file-line distance from the current entry.
Spec wording is "within the previous 40 lines", measured in raw file lines
(mirrors pzmm's ``(i - last_mod_line) <= 40``, inclusive of 40). Counting
raw lines means a multi-line entry (e.g., a 5-line Java stack trace) does
not shrink the practical window the way a body-line budget would.
Returned list is in source order (oldest first) so callers can call
``reversed()`` on it.
"""
if hit_idx <= 0:
return []
threshold = entries[hit_idx].line_start - INFERRED_LOOKBACK_LINES
in_window: list[Entry] = []
for j in range(hit_idx - 1, -1, -1):
prior = entries[j]
if prior.line_start < threshold:
break
in_window.append(prior)
# We accumulated newest-first; reverse so we emit in source order.
in_window.reverse()
collected: list[str] = []
for prior in in_window:
collected.extend(prior.body)
return collected
def classify_entries(entries: list[Entry], source_file: str = "") -> list[Record]:
"""Apply phases 1-9 to a parsed-file entry list. Returns one Record per
unique (mod_id, error_shape) pair after dedup on signature.
"""
by_signature: dict[str, Record] = {}
for hit_idx, entry in enumerate(entries):
if not is_severity_entry(entry):
continue
level = effective_level(entry)
body_text = _entry_text(entry)
# Phase 2: stack collection
stack = collect_stack(entries, hit_idx)
# Phase 3: attribution (with INFERRED_LOOKBACK_LINES lookback)
prior_window = _build_lookback_window(entries, hit_idx)
mod_id, mod_name, attribution, confidence, attribution_reason = attribute_entry(
entry, prior_window
)
# Phase 4: file:line extraction (search body + stack frames)
search_text = body_text + "\n" + "\n".join(stack)
file_path, line_no = extract_file_line(search_text)
# Phase 5: cause-chain extraction
cause_chain = extract_cause_chain(search_text)
# Phase 6 & 7: kind detection (engine_noise short-circuits)
kind = detect_kind(entry, attribution, body_text)
# Phase 8: signature computation
pattern_id = compute_pattern_id(level, entry.body[0] if entry.body else "")
signature = compute_signature(pattern_id, mod_id)
# Phase 9: dedup & aggregate
if signature not in by_signature:
by_signature[signature] = Record(
signature=signature,
pattern_id=pattern_id,
level=level,
kind=kind,
mod_id=mod_id,
mod_name=mod_name,
attribution=attribution,
confidence=confidence,
attribution_reason=attribution_reason,
file=file_path,
line=line_no,
cause_chain=cause_chain,
stack=list(stack),
first_seen=FirstSeen(
file=source_file,
line=entry.line_start,
timestamp=entry.timestamp,
),
occurrence_count=1,
files=[source_file] if source_file else [],
excerpt=_build_excerpt(entry),
)
else:
rec = by_signature[signature]
rec.occurrence_count += 1
if source_file and source_file not in rec.files:
rec.files.append(source_file)
# Promote attribution / confidence if this hit is stronger.
if _ATTRIBUTION_RANK[attribution] > _ATTRIBUTION_RANK[rec.attribution]:
rec.attribution = attribution
rec.attribution_reason = attribution_reason
if mod_name:
rec.mod_name = mod_name
if _CONFIDENCE_RANK[confidence] > _CONFIDENCE_RANK[rec.confidence]:
rec.confidence = confidence
# Merge stack frames (preserving order, capped).
for frame in stack:
if frame not in rec.stack and len(rec.stack) < MAX_STACK_FRAMES:
rec.stack.append(frame)
# Extend cause chain if the new hit has additional segments.
if cause_chain and cause_chain != rec.cause_chain:
# Concatenate unseen tokens.
old = rec.cause_chain.split(" -> ") if rec.cause_chain else []
new = cause_chain.split(" -> ")
merged = list(old)
for tok in new:
if tok and tok not in merged:
merged.append(tok)
rec.cause_chain = " -> ".join(merged[:MAX_CAUSE_CHAIN_LEVELS])
return list(by_signature.values())
__all__ = [
"Entry",
"FirstSeen",
"Record",
"parse_file",
"classify_entries",
"is_severity_entry",
"effective_level",
"collect_stack",
"attribute_entry",
"extract_file_line",
"extract_cause_chain",
"detect_kind",
"normalize_first_line",
"compute_pattern_id",
"compute_signature",
"INFERRED_LOOKBACK_LINES",
"MAX_STACK_FRAMES",
"STACK_WALK_LINES",
"MAX_CAUSE_CHAIN_LEVELS",
"SEVERITY_LEVELS",
]

View File

View File

@@ -0,0 +1,7 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:04:00.000] ERROR: General f:0, t:1776297840000, st:48,648,355,178> Lua((MOD:Test Mod Alpha)) wrapper failure
java.lang.RuntimeException: outer wrapper at zombie.Foo(Foo.java:10)
Caused by: java.lang.IllegalStateException: middle layer
Caused by: java.lang.NullPointerException: deepest cause
at zombie.Bar(Bar.java:99)
[16-04-26 00:04:01.000] LOG : General f:0, t:1776297841000, st:48,648,356,178> after.

View File

@@ -0,0 +1,8 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] ERROR: General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Test Mod Alpha)) crash 1
at media/lua/client/A.lua:11
[16-04-26 00:01:01.000] ERROR: General f:0, t:1776297661000, st:48,648,176,178> Lua((MOD:Test Mod Alpha)) crash 1
at media/lua/client/A.lua:11
[16-04-26 00:01:02.000] ERROR: General f:0, t:1776297662000, st:48,648,177,178> Lua((MOD:Test Mod Alpha)) crash 1
at media/lua/client/A.lua:11
[16-04-26 00:01:03.000] LOG : General f:0, t:1776297663000, st:48,648,178,178> ok.

View File

View File

@@ -0,0 +1,4 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:03:00.000] ERROR: General f:0, t:1776297780000, st:48,648,295,178> KahluaThread.flusherrormessage> dumping lua stack trace
at media/lua/client/Foo.lua:1
[16-04-26 00:03:01.000] LOG : General f:0, t:1776297781000, st:48,648,296,178> after.

View File

@@ -0,0 +1,10 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] ERROR: General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Test Mod A)) format1
at media/lua/client/F1.lua:11
[16-04-26 00:01:01.000] ERROR: General f:0, t:1776297661000, st:48,648,176,178> Lua((MOD:Test Mod B)) format2
function: doStuff -- file: media/lua/client/F2.lua line # 22
[16-04-26 00:01:02.000] ERROR: General f:0, t:1776297662000, st:48,648,177,178> Lua((MOD:Test Mod C)) format3
[string "media/lua/client/F3.lua"]:33: bang
[16-04-26 00:01:03.000] ERROR: General f:0, t:1776297663000, st:48,648,178,178> Lua((MOD:Test Mod D)) format4 about "media/lua/client/F4.lua" failure
[16-04-26 00:01:04.000] ERROR: General f:0, t:1776297664000, st:48,648,179,178> Lua((MOD:Test Mod E)) format5 path media/lua/client/F5.lua mention
[16-04-26 00:01:05.000] LOG : General f:0, t:1776297665000, st:48,648,180,178> ok.

View File

@@ -0,0 +1,7 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] LOG : General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Spongies Clothing)) initialised.
[16-04-26 00:01:01.000] LOG : General f:0, t:1776297661000, st:48,648,176,178> ordinary log line.
[16-04-26 00:01:02.000] LOG : General f:0, t:1776297662000, st:48,648,177,178> another log line.
[16-04-26 00:01:03.000] ERROR: General f:0, t:1776297663000, st:48,648,178,178> LuaManager.GetFunctionObject> no such function: doStuff
at media/lua/client/Spongie.lua:7
[16-04-26 00:01:04.000] LOG : General f:0, t:1776297664000, st:48,648,179,178> ok.

View File

@@ -0,0 +1,8 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:19.080] ERROR: General f:0, t:1776297679080, st:48,648,194,258> DebugFileWatcher.registerDir> Exception thrown
java.nio.file.NoSuchFileException: /placeholder/config/mods at UnixException.translateToIOException(null:-1).
Stack trace:
at java.base/sun.nio.fs.UnixException.translateToIOException(Unknown Source)
at java.base/sun.nio.fs.UnixException.asIOException(Unknown Source)
at java.base/sun.nio.fs.LinuxWatchService$Poller.implRegister(Unknown Source)
[16-04-26 00:01:19.090] LOG : General f:0, t:1776297679090, st:48,648,194,268> after.

View File

@@ -0,0 +1,45 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] LOG : General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Test Mod Distant)) initialised.
[16-04-26 00:01:01.000] LOG : General f:0, t:1776297661000, st:48,648,176,178> filler 1.
[16-04-26 00:01:02.000] LOG : General f:0, t:1776297662000, st:48,648,177,178> filler 2.
[16-04-26 00:01:03.000] LOG : General f:0, t:1776297663000, st:48,648,178,178> filler 3.
[16-04-26 00:01:04.000] LOG : General f:0, t:1776297664000, st:48,648,179,178> filler 4.
[16-04-26 00:01:05.000] LOG : General f:0, t:1776297665000, st:48,648,180,178> filler 5.
[16-04-26 00:01:06.000] LOG : General f:0, t:1776297666000, st:48,648,181,178> filler 6.
[16-04-26 00:01:07.000] LOG : General f:0, t:1776297667000, st:48,648,182,178> filler 7.
[16-04-26 00:01:08.000] LOG : General f:0, t:1776297668000, st:48,648,183,178> filler 8.
[16-04-26 00:01:09.000] LOG : General f:0, t:1776297669000, st:48,648,184,178> filler 9.
[16-04-26 00:01:10.000] LOG : General f:0, t:1776297670000, st:48,648,185,178> filler 10.
[16-04-26 00:01:11.000] LOG : General f:0, t:1776297671000, st:48,648,186,178> filler 11.
[16-04-26 00:01:12.000] LOG : General f:0, t:1776297672000, st:48,648,187,178> filler 12.
[16-04-26 00:01:13.000] LOG : General f:0, t:1776297673000, st:48,648,188,178> filler 13.
[16-04-26 00:01:14.000] LOG : General f:0, t:1776297674000, st:48,648,189,178> filler 14.
[16-04-26 00:01:15.000] LOG : General f:0, t:1776297675000, st:48,648,190,178> filler 15.
[16-04-26 00:01:16.000] LOG : General f:0, t:1776297676000, st:48,648,191,178> filler 16.
[16-04-26 00:01:17.000] LOG : General f:0, t:1776297677000, st:48,648,192,178> filler 17.
[16-04-26 00:01:18.000] LOG : General f:0, t:1776297678000, st:48,648,193,178> filler 18.
[16-04-26 00:01:19.000] LOG : General f:0, t:1776297679000, st:48,648,194,178> filler 19.
[16-04-26 00:01:20.000] LOG : General f:0, t:1776297680000, st:48,648,195,178> filler 20.
[16-04-26 00:01:21.000] LOG : General f:0, t:1776297681000, st:48,648,196,178> filler 21.
[16-04-26 00:01:22.000] LOG : General f:0, t:1776297682000, st:48,648,197,178> filler 22.
[16-04-26 00:01:23.000] LOG : General f:0, t:1776297683000, st:48,648,198,178> filler 23.
[16-04-26 00:01:24.000] LOG : General f:0, t:1776297684000, st:48,648,199,178> filler 24.
[16-04-26 00:01:25.000] LOG : General f:0, t:1776297685000, st:48,648,200,178> filler 25.
[16-04-26 00:01:26.000] LOG : General f:0, t:1776297686000, st:48,648,201,178> filler 26.
[16-04-26 00:01:27.000] LOG : General f:0, t:1776297687000, st:48,648,202,178> filler 27.
[16-04-26 00:01:28.000] LOG : General f:0, t:1776297688000, st:48,648,203,178> filler 28.
[16-04-26 00:01:29.000] LOG : General f:0, t:1776297689000, st:48,648,204,178> filler 29.
[16-04-26 00:01:30.000] LOG : General f:0, t:1776297690000, st:48,648,205,178> filler 30.
[16-04-26 00:01:31.000] LOG : General f:0, t:1776297691000, st:48,648,206,178> filler 31.
[16-04-26 00:01:32.000] LOG : General f:0, t:1776297692000, st:48,648,207,178> filler 32.
[16-04-26 00:01:33.000] LOG : General f:0, t:1776297693000, st:48,648,208,178> filler 33.
[16-04-26 00:01:34.000] LOG : General f:0, t:1776297694000, st:48,648,209,178> filler 34.
[16-04-26 00:01:35.000] LOG : General f:0, t:1776297695000, st:48,648,210,178> filler 35.
[16-04-26 00:01:36.000] LOG : General f:0, t:1776297696000, st:48,648,211,178> filler 36.
[16-04-26 00:01:37.000] LOG : General f:0, t:1776297697000, st:48,648,212,178> filler 37.
[16-04-26 00:01:38.000] LOG : General f:0, t:1776297698000, st:48,648,213,178> filler 38.
[16-04-26 00:01:39.000] LOG : General f:0, t:1776297699000, st:48,648,214,178> filler 39.
[16-04-26 00:01:40.000] LOG : General f:0, t:1776297700000, st:48,648,215,178> filler 40.
[16-04-26 00:01:41.000] LOG : General f:0, t:1776297701000, st:48,648,216,178> filler 41.
[16-04-26 00:01:42.000] ERROR: General f:0, t:1776297702000, st:48,648,217,178> LuaManager.GetFunctionObject> no such function (way past lookback)
[16-04-26 00:01:43.000] LOG : General f:0, t:1776297703000, st:48,648,218,178> ok.

View File

@@ -0,0 +1,6 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:19.131] LOG : Mod f:0, t:1776297679131, st:48,648,194,309> loading example_mod_alpha.
[16-04-26 00:05:00.000] ERROR: General f:0, t:1776297900000, st:48,648,415,178> Lua((MOD:Test Mod Alpha)) something broke
at media/lua/client/Foo.lua:42
function: doStuff -- file: media/lua/client/Foo.lua line # 42
[16-04-26 00:05:01.000] LOG : General f:0, t:1776297901000, st:48,648,416,178> after the error.

View File

@@ -0,0 +1,3 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] LOG : General f:0, t:1776297660000, st:48,648,175,178> ordinary line.
[16-04-26 00:02:00.000] LOG : General f:0, t:1776297720000, st:48,648,235,178> nothing wrong.

View File

@@ -0,0 +1,5 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] LOG : General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Spongies Clothing)) initialised.
[16-04-26 00:01:01.000] LOG : General f:0, t:1776297661000, st:48,648,176,178> ordinary log line.
[16-04-26 00:01:03.000] ERROR: General f:0, t:1776297663000, st:48,648,178,178> Disk full while writing chunk data
[16-04-26 00:01:04.000] LOG : General f:0, t:1776297664000, st:48,648,179,178> ok.

View File

@@ -0,0 +1,6 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] ERROR: General f:0, t:1776297660000, st:48,648,175,178> Lua((MOD:Test Mod Alpha)) crash now
at media/lua/client/X.lua:11
at media/lua/client/Y.lua:22
[string "media/lua/client/Z.lua"]:33: oops
[16-04-26 00:01:04.000] LOG : General f:0, t:1776297664000, st:48,648,179,178> ok.

View File

@@ -0,0 +1,6 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] LOG : General f:0, t:1776297660000, st:48,648,175,178> at media/lua/client/A.lua:11
[16-04-26 00:01:01.000] LOG : General f:0, t:1776297661000, st:48,648,176,178> at media/lua/client/B.lua:22
[16-04-26 00:01:02.000] LOG : General f:0, t:1776297662000, st:48,648,177,178> [string "media/lua/client/C.lua"]:33: oops
[16-04-26 00:01:03.000] ERROR: General f:0, t:1776297663000, st:48,648,178,178> Lua((MOD:Test Mod Alpha)) crash
[16-04-26 00:01:04.000] LOG : General f:0, t:1776297664000, st:48,648,179,178> ok.

View File

@@ -0,0 +1,3 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] ERROR: General f:0, t:1776297660000, st:48,648,175,178> require("DependencyMod/Foo") failed: needed by Test Mod Alpha
[16-04-26 00:01:01.000] LOG : General f:0, t:1776297661000, st:48,648,176,178> ok.

View File

@@ -0,0 +1,5 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:01:00.000] ERROR: General f:0, t:1776297660000, st:48,648,175,178> ERROR: top-level error message
[16-04-26 00:01:01.000] WARN : General f:0, t:1776297661000, st:48,648,176,178> WARN: top-level warn message
[16-04-26 00:01:02.000] ERROR: General f:0, t:1776297662000, st:48,648,177,178> SEVERE: java-style severe message at zombie.Foo(Foo.java:5)
[16-04-26 00:01:03.000] LOG : General f:0, t:1776297663000, st:48,648,178,178> ok.

View File

@@ -0,0 +1,3 @@
[16-04-26 00:00:42.314] LOG : General f:0, t:1776297642254, st:48,648,157,434> server starting.
[16-04-26 00:02:00.000] WARN : General f:0, t:1776297720000, st:48,648,235,178> ZomboidFileSystem.loadModAndRequired> required mod "absent_mod" not found.
[16-04-26 00:02:01.000] LOG : General f:0, t:1776297721000, st:48,648,236,178> after.

View File

@@ -0,0 +1,225 @@
"""Tests for pz_parser phase 3 — mod attribution."""
from __future__ import annotations
import pathlib
import sys
import unittest
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1]))
import pz_parser # noqa: E402
FIXTURE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures"
def fixture(name: str) -> pathlib.Path:
return FIXTURE_DIR / name
class AttributionBucketTests(unittest.TestCase):
"""Three confidence buckets: direct (high), inferred (medium),
unattributed (low)."""
def test_direct_attribution_when_lua_marker_on_entry(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_lua_attributed.txt"))
records = pz_parser.classify_entries(entries, source_file="la.txt")
self.assertEqual(len(records), 1)
rec = records[0]
self.assertEqual(rec.attribution, "direct")
self.assertEqual(rec.confidence, "high")
# mod_id is normalised: lowercase, no spaces / apostrophes / hyphens.
self.assertEqual(rec.mod_id, "testmodalpha")
self.assertEqual(rec.mod_name, "Test Mod Alpha")
def test_inferred_attribution_within_lookback_window(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_inferred.txt"))
records = pz_parser.classify_entries(entries, source_file="in.txt")
self.assertEqual(len(records), 1)
rec = records[0]
self.assertEqual(rec.attribution, "inferred")
self.assertEqual(rec.confidence, "medium")
self.assertEqual(rec.mod_id, "spongiesclothing")
def test_unattributed_when_no_marker_and_not_lua_shaped(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_unattributed.txt"))
records = pz_parser.classify_entries(entries, source_file="ua.txt")
self.assertEqual(len(records), 1)
rec = records[0]
self.assertEqual(rec.attribution, "unattributed")
self.assertEqual(rec.confidence, "low")
self.assertEqual(rec.mod_id, "__unattributed__")
class LookbackBoundaryTests(unittest.TestCase):
"""Phase 3 — 40-line inferred-attribution window boundary."""
def test_lua_marker_beyond_lookback_does_not_attribute(self) -> None:
# Fixture places the Lua((MOD:...)) >40 lines before the ERROR.
entries = pz_parser.parse_file(fixture("fixture_lookback_boundary.txt"))
records = pz_parser.classify_entries(entries, source_file="lb.txt")
self.assertEqual(len(records), 1)
rec = records[0]
# The Lua-shaped ERROR is far enough back to be unattributed.
self.assertEqual(rec.attribution, "unattributed")
self.assertEqual(rec.mod_id, "__unattributed__")
def test_non_lua_shaped_body_rejects_inferred_attribution(self) -> None:
# Recent Lua((MOD:Spongies Clothing)) emitted, but the ERROR body
# ("Disk full while writing chunk data") isn't Lua-shaped.
entries = pz_parser.parse_file(fixture("fixture_non_lua_no_inferred.txt"))
records = pz_parser.classify_entries(entries, source_file="nl.txt")
self.assertEqual(len(records), 1)
rec = records[0]
self.assertEqual(rec.attribution, "unattributed")
class NeededByTests(unittest.TestCase):
"""Phase 3 — direct attribution via "needed by <mod>" hint."""
def test_needed_by_extracts_dependent_mod(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_require_failed.txt"))
records = pz_parser.classify_entries(entries, source_file="rf.txt")
self.assertEqual(len(records), 1)
rec = records[0]
# "needed by Test Mod Alpha" should set the mod to Test Mod Alpha
# (preferred over the require("...") side which would mention
# DependencyMod). Either way we want direct/high.
self.assertEqual(rec.attribution, "direct")
self.assertEqual(rec.confidence, "high")
# The "needed by" branch is checked before the require() branch in
# the priority order; mod_id should reflect Test Mod Alpha.
self.assertEqual(rec.mod_id, "testmodalpha")
def _make_marker_line(idx: int) -> str:
"""Synthesise a single LOG-level entry containing a Lua((MOD:...)) marker."""
# Vary timestamps so the bracketed prefix is unique-ish; not strictly
# required — they only feed Entry.timestamp, not parsing.
return (
f"[16-04-26 00:00:{idx:02d}.000] LOG : General f:0, "
f"t:1776297642{idx:03d}, st:48,648,157,434> "
"Lua((MOD:Test Mod Alpha)) initialised."
)
def _make_filler_line(idx: int) -> str:
"""A plain LOG-level entry with no marker; one raw line."""
return (
f"[16-04-26 00:01:{idx % 60:02d}.000] LOG : General f:0, "
f"t:177629760{idx:04d}, st:48,648,200,178> filler entry {idx}."
)
def _make_error_line() -> str:
"""A Lua-shaped ERROR with no Lua((MOD:...)) marker on the entry itself
— so attribution must come from the lookback window if it comes at all."""
return (
"[16-04-26 00:02:00.000] ERROR: General f:0, "
"t:1776297900000, st:48,648,300,178> "
"LuaManager.GetFunctionObject> no such function: doStuff"
)
class RawLineLookbackTests(unittest.TestCase):
"""Phase 3 — lookback semantics measure raw file lines, not body-line
budgets. Multi-line entries inside the window must not shrink the
practical reach."""
def _write_fixture(self, name: str, lines: list[str]) -> pathlib.Path:
path = FIXTURE_DIR / name
path.write_text("\n".join(lines) + "\n")
return path
def test_marker_exactly_at_lookback_boundary_attributes(self) -> None:
# Marker on line 1, ERROR on line 41 -> raw-line distance = 40
# (inclusive of INFERRED_LOOKBACK_LINES=40 -> still attributed).
lines = [_make_marker_line(0)]
for i in range(1, 40):
lines.append(_make_filler_line(i))
lines.append(_make_error_line()) # line 41 in the fixture
path = self._write_fixture("_rawline_at_boundary.txt", lines)
try:
entries = pz_parser.parse_file(path)
self.assertEqual(entries[0].line_start, 1)
self.assertEqual(entries[-1].line_start, 41)
records = pz_parser.classify_entries(entries, source_file="b1.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].attribution, "inferred")
self.assertEqual(records[0].mod_id, "testmodalpha")
finally:
path.unlink()
def test_marker_one_line_past_boundary_does_not_attribute(self) -> None:
# Marker on line 1, ERROR on line 42 -> raw-line distance = 41
# (just outside INFERRED_LOOKBACK_LINES -> unattributed).
lines = [_make_marker_line(0)]
for i in range(1, 41):
lines.append(_make_filler_line(i))
lines.append(_make_error_line()) # line 42 in the fixture
path = self._write_fixture("_rawline_past_boundary.txt", lines)
try:
entries = pz_parser.parse_file(path)
self.assertEqual(entries[0].line_start, 1)
self.assertEqual(entries[-1].line_start, 42)
records = pz_parser.classify_entries(entries, source_file="b2.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].attribution, "unattributed")
self.assertEqual(records[0].mod_id, "__unattributed__")
finally:
path.unlink()
def test_multiline_entry_does_not_shrink_practical_lookback(self) -> None:
"""Multi-line entries inside the lookback window do not break
attribution. (Old body-line-budget and new raw-line-distance semantics
happen to be equivalent on contiguous PZ entries; this test locks the
post-fix semantic against future regression to a budget that *would*
differ — e.g. a body-line cap with a smaller value.)
"""
# Layout the file so a multi-line entry sits between marker and ERROR.
# The marker on line 1 is within 40 raw lines of the ERROR even though
# the file has a 6-line multi-line entry in between.
lines = [_make_marker_line(0)] # raw line 1: marker entry
# Single-line fillers on raw lines 2..30 (29 entries).
for i in range(1, 30):
lines.append(_make_filler_line(i))
# Multi-line entry: header on raw line 31, 5 continuations on lines
# 32..36 (Java-stack-trace shape).
lines.append(
"[16-04-26 00:01:30.000] LOG : General f:0, "
"t:1776297930000, st:48,648,200,178> stack trace dump"
)
for k in range(5):
lines.append(f"\tat zombie.SomeClass.method{k}(SomeClass.java:{k + 1})")
# Single-line fillers on raw lines 37..40 (4 entries).
for i in range(30, 34):
lines.append(_make_filler_line(i))
# ERROR at raw line 41 -> N - 1 = 40 -> within window.
lines.append(_make_error_line())
path = self._write_fixture("_rawline_multiline.txt", lines)
try:
entries = pz_parser.parse_file(path)
# Sanity-check the layout: first entry at line 1, multi-line entry
# sits at line 31 with 6 body lines (header + 5 continuations),
# ERROR at line 41.
self.assertEqual(entries[0].line_start, 1)
multi = next(
e for e in entries
if e.line_start == 31 and len(e.body) == 6
)
self.assertEqual(multi.line_end, 36)
self.assertEqual(entries[-1].line_start, 41)
records = pz_parser.classify_entries(entries, source_file="ml.txt")
self.assertEqual(len(records), 1)
# Raw-line-distance semantics: the marker on line 1 is 40 raw
# lines from the ERROR on line 41, so attribution holds. (Old
# body-line-budget would also pass here on contiguous entries;
# this assertion locks the post-fix behavior against future
# regression to a tighter cap.)
self.assertEqual(records[0].attribution, "inferred")
self.assertEqual(records[0].mod_id, "testmodalpha")
finally:
path.unlink()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,199 @@
"""Tests for pz_parser parsing pipeline (phases 1, 2, 4-7, 9)."""
from __future__ import annotations
import pathlib
import sys
import unittest
# Make the parser module importable when running via `python -m unittest
# discover -s tools/pz-analyzer/tests`.
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1]))
import pz_parser # noqa: E402
FIXTURE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures"
def fixture(name: str) -> pathlib.Path:
return FIXTURE_DIR / name
class ParseFileTests(unittest.TestCase):
"""Phase 0 — basic line-shape recognition and continuation folding."""
def test_parse_file_groups_continuations_under_entry(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_java_exception.txt"))
# 3 bracketed entries; the ERROR has 4 continuation lines.
self.assertEqual(len(entries), 3)
error_entry = entries[1]
self.assertEqual(error_entry.level, "ERROR")
self.assertGreater(len(error_entry.body), 1)
# First continuation should be the java exception line.
self.assertIn("NoSuchFileException", error_entry.body[1])
def test_parse_file_handles_empty_file(self) -> None:
self.assertEqual(pz_parser.parse_file(fixture("fixture_empty.txt")), [])
def test_parse_file_handles_no_errors(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_no_errors.txt"))
self.assertEqual(len(entries), 3)
self.assertTrue(all(e.level == "LOG" for e in entries))
class SeverityRecognitionTests(unittest.TestCase):
"""Phase 1 — ERROR / WARN / SEVERE recognition."""
def test_classify_picks_up_error_warn_and_severe(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_severity_variants.txt"))
records = pz_parser.classify_entries(entries, source_file="severity.txt")
levels = sorted({r.level for r in records})
# Spec accepts ERROR / WARN / SEVERE. The third entry has bracketed
# ERROR but body starts with SEVERE: ; effective_level should be SEVERE.
self.assertIn("ERROR", levels)
self.assertIn("WARN", levels)
self.assertIn("SEVERE", levels)
def test_log_lines_are_ignored(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_no_errors.txt"))
records = pz_parser.classify_entries(entries, source_file="x.txt")
self.assertEqual(records, [])
class StackCollectionTests(unittest.TestCase):
"""Phase 2 — bidirectional stack collection."""
def test_pre_stack_walk_picks_up_preceding_lua_frames(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_pre_stack.txt"))
# The ERROR entry is the 5th LOG-bracketed line; its predecessors are
# LOG-bracketed entries whose bodies are stack-shaped lines.
records = pz_parser.classify_entries(entries, source_file="pre.txt")
self.assertEqual(len(records), 1)
rec = records[0]
# Pre-stack walk should pick up at least the "at media/lua/.../A.lua:11" frame.
self.assertTrue(any("A.lua:11" in f for f in rec.stack))
def test_post_stack_collected_from_entry_body_continuations(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_post_stack.txt"))
records = pz_parser.classify_entries(entries, source_file="post.txt")
self.assertEqual(len(records), 1)
rec = records[0]
self.assertTrue(any("X.lua:11" in f for f in rec.stack))
self.assertTrue(any("Y.lua:22" in f for f in rec.stack))
# Lua [string "..."]:N form preserves quoting in the captured frame.
self.assertTrue(any("Z.lua" in f and ":33" in f for f in rec.stack))
def test_stack_capped_at_eight_frames(self) -> None:
# Synthesise an ERROR with many continuation frames.
lines = ["[16-04-26 00:00:42.314] ERROR: General f:0, t:1, st:1,2,3,4> Lua((MOD:Test Mod Alpha)) crash"]
for i in range(20):
lines.append(f"\tat media/lua/client/F{i}.lua:{i + 1}")
path = FIXTURE_DIR / "_runtime_stack_cap.txt"
path.write_text("\n".join(lines) + "\n")
try:
entries = pz_parser.parse_file(path)
records = pz_parser.classify_entries(entries, source_file="cap.txt")
self.assertEqual(len(records), 1)
self.assertLessEqual(len(records[0].stack), pz_parser.MAX_STACK_FRAMES)
# And it should be exactly MAX_STACK_FRAMES given >MAX inputs.
self.assertEqual(len(records[0].stack), pz_parser.MAX_STACK_FRAMES)
finally:
path.unlink()
class FileLineExtractionTests(unittest.TestCase):
"""Phase 4 — five-fallback file:line extraction."""
def test_each_fallback_form_extracts_path(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_file_line_fallbacks.txt"))
records = pz_parser.classify_entries(entries, source_file="ff.txt")
# 5 distinct ERRORs, distinct mods — should produce 5 records.
files = sorted(r.file for r in records)
self.assertEqual(
files,
sorted([
"media/lua/client/F1.lua",
"media/lua/client/F2.lua",
"media/lua/client/F3.lua",
"media/lua/client/F4.lua",
"media/lua/client/F5.lua",
]),
)
def test_quoted_path_without_line_number_yields_zero(self) -> None:
# Format 4 fixture line lacks a :NN suffix on the quoted path.
file_path, line_no = pz_parser.extract_file_line(
'failure about "media/lua/client/F4.lua" tail'
)
self.assertEqual(file_path, "media/lua/client/F4.lua")
self.assertEqual(line_no, 0)
class CauseChainTests(unittest.TestCase):
"""Phase 5 — Caused-by chain unwinding."""
def test_caused_by_chain_renders_with_arrow_separator(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_cause_chain.txt"))
records = pz_parser.classify_entries(entries, source_file="cc.txt")
self.assertEqual(len(records), 1)
chain = records[0].cause_chain
self.assertIn("RuntimeException", chain)
self.assertIn("IllegalStateException", chain)
self.assertIn("NullPointerException", chain)
# Order preserved (outer -> inner).
idx_runtime = chain.index("RuntimeException")
idx_illegal = chain.index("IllegalStateException")
idx_null = chain.index("NullPointerException")
self.assertLess(idx_runtime, idx_illegal)
self.assertLess(idx_illegal, idx_null)
def test_no_cause_chain_when_no_exceptions(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_unattributed.txt"))
records = pz_parser.classify_entries(entries, source_file="u.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].cause_chain, "")
class KindDetectionTests(unittest.TestCase):
"""Phases 6 & 7 — kind classification."""
def test_java_exception_kind_when_no_lua_marker(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_java_exception.txt"))
records = pz_parser.classify_entries(entries, source_file="je.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].kind, "java_exception")
# Java engine errors should resolve to __unattributed__.
self.assertEqual(records[0].mod_id, "__unattributed__")
def test_engine_noise_kind_for_kahluathread(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_engine_noise.txt"))
records = pz_parser.classify_entries(entries, source_file="en.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].kind, "engine_noise")
def test_lua_runtime_kind_for_attributed_lua_error(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_lua_attributed.txt"))
records = pz_parser.classify_entries(entries, source_file="la.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].kind, "lua_runtime")
def test_require_failed_kind(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_require_failed.txt"))
records = pz_parser.classify_entries(entries, source_file="rf.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].kind, "require_failed")
class AggregationTests(unittest.TestCase):
"""Phase 9 — dedup, occurrence_count, files-set growth."""
def test_three_identical_errors_dedup_to_one_record(self) -> None:
entries = pz_parser.parse_file(fixture("fixture_dedup.txt"))
records = pz_parser.classify_entries(entries, source_file="dd.txt")
self.assertEqual(len(records), 1)
self.assertEqual(records[0].occurrence_count, 3)
# files list shouldn't duplicate "dd.txt".
self.assertEqual(records[0].files, ["dd.txt"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,91 @@
"""Tests for pz_parser phase 8 — signature computation."""
from __future__ import annotations
import pathlib
import sys
import unittest
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[1]))
import pz_parser # noqa: E402
class PatternIdStabilityTests(unittest.TestCase):
"""pattern_id should be invariant under formatting variations."""
def test_pattern_id_collapses_numeric_runs(self) -> None:
a = pz_parser.compute_pattern_id(
"ERROR",
"General f:0, t:1776297642, st:48,648,157,434> failed at offset 12345",
)
b = pz_parser.compute_pattern_id(
"ERROR",
"General f:0, t:9999999999, st:99,99,99,99> failed at offset 99999",
)
self.assertEqual(a, b)
def test_pattern_id_collapses_quoted_strings_and_whitespace(self) -> None:
a = pz_parser.compute_pattern_id(
"ERROR",
'no such function "doStuff" in module',
)
b = pz_parser.compute_pattern_id(
"ERROR",
'no such function "fooBarBaz" in module',
)
# Whitespace-collapse plus quoted-string-flatten => same pattern_id.
self.assertEqual(a, b)
def test_pattern_id_changes_with_level(self) -> None:
a = pz_parser.compute_pattern_id("ERROR", "exception thrown")
b = pz_parser.compute_pattern_id("WARN", "exception thrown")
self.assertNotEqual(a, b)
class SignatureUniquenessTests(unittest.TestCase):
"""signature should fan out across mods sharing a pattern_id."""
def test_signature_unique_per_mod_for_shared_pattern(self) -> None:
# Same first line, different mod_ids — different signatures, same pattern_id.
pat = pz_parser.compute_pattern_id("ERROR", "Lua((MOD:X)) crash")
sig_a = pz_parser.compute_signature(pat, "spongiesclothing")
sig_b = pz_parser.compute_signature(pat, "testmodalpha")
self.assertNotEqual(sig_a, sig_b)
# Both should share their pattern_id (consumer's pattern-fanout view).
self.assertEqual(pat[:7], "sha256:")
class SeverityPrefixStripTests(unittest.TestCase):
"""A body line that begins with a literal severity word (``SEVERE:``,
``ERROR:``, ``WARN:``, ``FATAL:``) should not fragment pattern_id away
from the otherwise-identical body that lacks the prefix. The bracketed
level already feeds pattern_id; the prefix is redundant and varies in
practice."""
def test_pattern_id_invariant_under_body_prefix_severe(self) -> None:
# Same logical error: one line carries ``SEVERE: `` body prefix, the
# other doesn't. Both classified as SEVERE by their bracketed level.
with_prefix = pz_parser.compute_pattern_id(
"SEVERE",
"SEVERE: foo at zombie.X(File.java:42)",
)
without_prefix = pz_parser.compute_pattern_id(
"SEVERE",
"foo at zombie.X(File.java:42)",
)
self.assertEqual(with_prefix, without_prefix)
def test_pattern_id_invariant_under_body_prefix_error(self) -> None:
with_prefix = pz_parser.compute_pattern_id(
"ERROR",
"ERROR: doStuff failed in module",
)
without_prefix = pz_parser.compute_pattern_id(
"ERROR",
"doStuff failed in module",
)
self.assertEqual(with_prefix, without_prefix)
if __name__ == "__main__":
unittest.main()