|
| 1 | +"""Rich error formatting for AI summarization test playground (staff debugging).""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from collections import deque |
| 6 | +from typing import Any |
| 7 | + |
| 8 | + |
| 9 | +def format_playground_exception(exc: BaseException, *, max_related: int = 16) -> str: |
| 10 | + """ |
| 11 | + Build a multi-line, human-readable error for the summarization test UIs. |
| 12 | +
|
| 13 | + Works for any exception type. Adds optional Pydantic validation details when present. |
| 14 | + """ |
| 15 | + lines: list[str] = [] |
| 16 | + |
| 17 | + def append_block(title: str, body: str) -> None: |
| 18 | + if lines: |
| 19 | + lines.append("") |
| 20 | + lines.append(f"=== {title} ===") |
| 21 | + lines.append(body.strip() if body else "(keine Meldung)") |
| 22 | + |
| 23 | + append_block("Hauptfehler", f"{type(exc).__name__}: {exc}") |
| 24 | + |
| 25 | + related = _collect_related_exceptions(exc, max_total=max_related) |
| 26 | + if related: |
| 27 | + parts = [] |
| 28 | + for i, link in enumerate(related, start=1): |
| 29 | + parts.append(f"[{i}] {type(link).__name__}: {link}") |
| 30 | + append_block("Verknüpfte Fehler (Ursache / Kontext / Gruppe)", "\n".join(parts)) |
| 31 | + |
| 32 | + pydantic_bits = _collect_pydantic_error_details(exc) |
| 33 | + if pydantic_bits: |
| 34 | + append_block("Validierungsdetails", "\n".join(pydantic_bits)) |
| 35 | + |
| 36 | + return "\n".join(lines) |
| 37 | + |
| 38 | + |
| 39 | +def _collect_related_exceptions(root: BaseException, *, max_total: int) -> list[BaseException]: |
| 40 | + """ |
| 41 | + All exceptions reachable via __cause__, __context__, and ExceptionGroup.subexceptions. |
| 42 | +
|
| 43 | + Excludes `root` (already shown as Hauptfehler). Order: BFS, no duplicates. |
| 44 | + """ |
| 45 | + out: list[BaseException] = [] |
| 46 | + seen: set[int] = set() |
| 47 | + q: deque[BaseException] = deque() |
| 48 | + seen.add(id(root)) |
| 49 | + |
| 50 | + def enqueue(e: BaseException | None) -> None: |
| 51 | + if e is None or id(e) in seen: |
| 52 | + return |
| 53 | + seen.add(id(e)) |
| 54 | + q.append(e) |
| 55 | + |
| 56 | + enqueue(getattr(root, "__cause__", None)) |
| 57 | + enqueue(getattr(root, "__context__", None)) |
| 58 | + _enqueue_exception_group_children(root, enqueue) |
| 59 | + |
| 60 | + while q and len(out) < max_total: |
| 61 | + cur = q.popleft() |
| 62 | + out.append(cur) |
| 63 | + enqueue(getattr(cur, "__cause__", None)) |
| 64 | + enqueue(getattr(cur, "__context__", None)) |
| 65 | + _enqueue_exception_group_children(cur, enqueue) |
| 66 | + |
| 67 | + return out |
| 68 | + |
| 69 | + |
| 70 | +def _enqueue_exception_group_children(exc: BaseException, enqueue: Any) -> None: |
| 71 | + subs = getattr(exc, "exceptions", None) |
| 72 | + if not subs: |
| 73 | + return |
| 74 | + if type(exc).__name__ not in ("ExceptionGroup", "BaseExceptionGroup"): |
| 75 | + return |
| 76 | + for sub in subs: |
| 77 | + if isinstance(sub, BaseException): |
| 78 | + enqueue(sub) |
| 79 | + |
| 80 | + |
| 81 | +def _collect_pydantic_error_details(exc: BaseException, *, max_errors: int = 12) -> list[str]: |
| 82 | + """Extract pydantic v2 ValidationError.errors() entries from an exception chain.""" |
| 83 | + out: list[str] = [] |
| 84 | + seen: set[int] = set() |
| 85 | + stack: list[BaseException] = [exc] |
| 86 | + |
| 87 | + while stack: |
| 88 | + cur = stack.pop() |
| 89 | + cid = id(cur) |
| 90 | + if cid in seen: |
| 91 | + continue |
| 92 | + seen.add(cid) |
| 93 | + |
| 94 | + err_fn = getattr(cur, "errors", None) |
| 95 | + if callable(err_fn): |
| 96 | + try: |
| 97 | + raw = err_fn() |
| 98 | + except Exception: |
| 99 | + raw = None |
| 100 | + if isinstance(raw, list) and raw: |
| 101 | + for item in raw[:max_errors]: |
| 102 | + out.append(_format_one_pydantic_error(item)) |
| 103 | + if len(raw) > max_errors: |
| 104 | + out.append(f"... und {len(raw) - max_errors} weitere Fehler") |
| 105 | + return out |
| 106 | + |
| 107 | + for nxt in (getattr(cur, "__cause__", None), getattr(cur, "__context__", None)): |
| 108 | + if isinstance(nxt, BaseException): |
| 109 | + stack.append(nxt) |
| 110 | + |
| 111 | + return out |
| 112 | + |
| 113 | + |
| 114 | +def _format_one_pydantic_error(item: Any) -> str: |
| 115 | + if not isinstance(item, dict): |
| 116 | + return str(item) |
| 117 | + loc = item.get("loc") |
| 118 | + loc_s = ".".join(str(x) for x in loc) if isinstance(loc, tuple) else str(loc) |
| 119 | + msg = item.get("msg", "") |
| 120 | + typ = item.get("type", "") |
| 121 | + parts = [f"• {loc_s or '(root)'}: {msg}"] |
| 122 | + if typ: |
| 123 | + parts.append(f" (Typ: {typ})") |
| 124 | + inp = item.get("input") |
| 125 | + if inp is not None: |
| 126 | + snippet = _shorten_for_display(inp, limit=400) |
| 127 | + parts.append(f" Eingabe-Ausschnitt: {snippet!r}") |
| 128 | + return "\n".join(parts) |
| 129 | + |
| 130 | + |
| 131 | +def _shorten_for_display(s: Any, *, limit: int) -> str: |
| 132 | + text = s if isinstance(s, str) else repr(s) |
| 133 | + text = text.replace("\r\n", "\n").replace("\r", "\n") |
| 134 | + if len(text) <= limit: |
| 135 | + return text |
| 136 | + return text[: limit - 3] + "..." |
0 commit comments