Commit ecba87e

refactor: use dynamic JSON detection instead of hardcoded field names

Detect any string starting with { or [ and parse as JSON. More maintainable - no need to update field list when new attributes added. Co-Authored-By: Claude Opus 4.5 <[email protected]>

1 parent 65a0db2 commit ecba87e

2 files changed: +257 −15

Lines changed: 253 additions & 0 deletions
# Proposal: Interruptible Process Trace Context Preservation

## Problem Statement

When a Python agent suspends via LangGraph's `interrupt()`, two issues occur:

1. **Checkpoint-based execution**: LangGraph uses checkpoints with `thread_id` as the cursor. On resume, execution restarts from the beginning of the interrupted node, creating a new trace context.

2. **Process boundary**: The Python agent process exits on suspend. On resume, a new process starts with no memory of the original trace.

**Result**: LLMOps shows two separate traces with no link between them.

**Goal**: Match the C# Agents behavior - suspended spans show as RUNNING, and resumed execution links to the original trace.

**Reference**: [LangGraph Interrupts Documentation](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/wait-user-input/)

---
## How Does C# Do It?

### Two Upsert Methods

From `Agents/backend/Execution.Shared/Traces/TraceSpan.cs`:

```csharp
// 1. Workflow context - uses a Temporal Activity as the async boundary
public async Task UpsertWorkflowAsync()
{
    Id = await Workflow.ExecuteActivityAsync(
        (TraceExecutor x) => x.UpsertSpanAsync(payload), options);
}

// 2. Non-workflow context - direct async call
public async Task UpsertAsync(ITraceCreationService service)
{
    Id = await service.UpsertSpanAsync(...);
}
```
### Upsert Decision Matrix

From `ConversationalEngineWorkflow.cs`:

| When | Await? | Why |
|------|--------|-----|
| Before LLM call | Fire-and-forget | Real-time visibility, don't block |
| After success (normal) | Fire-and-forget | Non-critical, performance matters |
| After success (eval) | Await | Evaluators need span data |
| On error | Await | Must record before throwing |
| On suspend | Await | Critical checkpoint, process exiting |
### Status Values

From `llm-observability/StatusEnum.cs`:

| Status | Value | Used For |
|--------|-------|----------|
| Unset | 0 | Default |
| Ok | 1 | Success |
| Error | 2 | Failure |
| **Running** | **3** | **Suspended/waiting** |
| Restricted | 4 | - |
| Cancelled | 5 | - |

**Note**: There is no dedicated INTERRUPTED status. C# uses `Running` for suspended spans.
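A Python-side mirror of these values could be a simple `IntEnum`. This is a sketch: the class name and member names are assumptions based on the table above, not a published API.

```python
from enum import IntEnum


class SpanStatus(IntEnum):
    """Status values mirroring C#'s StatusEnum (names assumed)."""

    UNSET = 0
    OK = 1
    ERROR = 2
    RUNNING = 3      # used for suspended/waiting spans - no INTERRUPTED value
    RESTRICTED = 4
    CANCELLED = 5
```

A suspended span would then be reported with `SpanStatus.RUNNING`, matching what the C# side sends today.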
### C# Escalation Tool Pattern

From `EscalationToolWorkflow.cs` - the tool span CONTINUES after resume (same span, multiple upserts):

```csharp
var span = SpanInit(payload);
await span.UpsertWorkflowAsync();       // 1. Initial (Running)

// ... create escalation task ...
span.Attributes.TaskId = result.TaskId;
await span.UpsertWorkflowAsync();       // 2. With task details

await Workflow.WaitConditionAsync(...); // SUSPEND HERE

// ... after resume ...
span.Attributes.Result = outcome.Result;
span.Status = TraceStatus.Ok;
await span.UpsertWorkflowAsync();       // 3. Final (Ok)
```

**Key insight**: The same span object is upserted three times. Temporal preserves the span in memory across suspend/resume.

---
## Can We Follow the Same Approach as C#?

### Differences

| Aspect | C# Agents | Python Agents |
|--------|-----------|---------------|
| Orchestration | Temporal Workflows | LangGraph |
| Async boundary | Temporal Activities | Python async/await |
| Suspend mechanism | `Workflow.WaitConditionAsync` | `GraphInterrupt` |
| Resume trigger | Temporal signal | CLI `--resume` flag |
| State persistence | Temporal (in-memory across suspend) | SQLite (across process restart) |
| Span object | Preserved in memory | Lost on process exit |
### What We Can Replicate

| C# Feature | Python Equivalent |
|------------|-------------------|
| `await span.UpsertWorkflowAsync()` | `exporter.upsert_span()` (sync, blocking) |
| `Running` status for suspend | Same - use `SpanStatus.RUNNING = 3` |
| Trace context preservation | SQLite storage for span data |

**SQLite storage needs more than IDs**: Since upsert overwrites, we must store the full span data (TraceId, SpanId, ParentId, Name, StartTime, Attributes) to continue the span on resume. See `llm-observability/Span.cs` for the required fields.
### What We Cannot Replicate

- **Temporal's durable execution**: Temporal automatically replays a workflow from its checkpoint on failure. Python has no equivalent - if the process dies mid-execution, state is lost unless explicitly persisted.

- **True fire-and-forget**: Temporal Activity Workers run in a separate thread pool, so `_ = UpsertWorkflowAsync()` doesn't block the workflow. In Python, we'd need an explicit queue+thread (like OTel's [BatchSpanProcessor](https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py)).
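If true fire-and-forget is ever needed, a minimal queue-plus-worker-thread processor in the spirit of BatchSpanProcessor could look like the sketch below. The `upsert_span` callable and the class name are assumptions for illustration, not existing API.

```python
import queue
import threading


class AsyncUpsertProcessor:
    """Sketch: fire-and-forget upserts via a background worker thread."""

    _SHUTDOWN = object()  # sentinel telling the worker to stop

    def __init__(self, upsert_span):
        self._upsert_span = upsert_span  # assumed: callable taking span data
        self._queue: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        while True:
            item = self._queue.get()
            if item is self._SHUTDOWN:
                break
            self._upsert_span(item)

    def submit(self, span_data):
        """Non-blocking: enqueue the upsert and return immediately."""
        self._queue.put(span_data)

    def shutdown(self):
        """Flush remaining upserts, then stop (call before process exit)."""
        self._queue.put(self._SHUTDOWN)
        self._worker.join()
```

On the suspend path we would still call the sync `upsert_span()` directly, since the process is exiting anyway and the final upsert must not be dropped.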
---
## Should We Do That?

**Yes**, with simplifications:

1. **Use sync `upsert_span()`** - The process is exiting anyway on suspend, so blocking 50-200ms is acceptable. The existing sync method works fine - no need for an async version.

2. **SQLite for trace context** - Already used for resume triggers. Store the full span data for continuation.

3. **Skip queue+thread for now** - Not needed for suspend-only. If we need real-time updates from sync callbacks later, we can follow OTel's BatchSpanProcessor pattern.

---
## Example: Escalation Tool

### Before (Current Behavior)

```
Agent Run #1 (trace_id: abc-123)
├── LLM Call (decides to escalate)
├── Tool: escalation_tool
│   └── raises GraphInterrupt
└── [Agent span ends - status unknown]
    [Tool span ends - status unknown]

--- Process exits, LangGraph checkpoint saved ---

Agent Run #2 (trace_id: xyz-789) ← NEW TRACE
├── [Resumes from node start per LangGraph behavior]
├── LLM Call
└── Agent completes
```

**LLMOps shows**: Two separate traces with no link between them.

### After (Expected Behavior)

**Continue the same spans (like C#)**

```
Agent Run (trace_id: abc-123)
├── LLM Call
├── Tool: escalation_tool
│   ├── [Upsert: RUNNING status, saved to SQLite]
│   │
│   │   --- Process exits ---
│   │
│   ├── [Resume: Load from SQLite, upsert continuation]
│   └── [Upsert: OK status with result]
└── Agent completes [OK status]
```

---
## Changes Needed

### uipath-python

| Change | Description |
|--------|-------------|
| None | Existing sync `upsert_span()` works |
| (Future) Add async version | If we need non-blocking upserts |
### uipath-langchain-python

**Storage approach**: [PR #372](https://github.com/UiPath/uipath-langchain-python/pull/372) already adds generic key-value storage:

| Already Available | Description |
|-------------------|-------------|
| `set_value(runtime_id, namespace, key, value)` | Generic key-value persist |
| `get_value(runtime_id, namespace, key)` | Generic key-value retrieve |
| `__uipath_runtime_kv` table | Schema: `(runtime_id, namespace, key, value, timestamp)` |

**Our trace context storage**:

```python
# Usage: persist the suspended span's full context on suspend
storage.set_value(
    runtime_id="agent-123",
    namespace="trace_context",
    key="agent_span",
    value={
        "trace_id": "abc-123-...",
        "span_id": "def-456-...",
        "parent_span_id": None,
        "name": "agent.json",
        "start_time": "2024-01-15T10:30:00Z",
        "attributes": {
            "agentId": "agent-123",
            "systemPrompt": "...",
            "userPrompt": "...",
        },
    },
)
```
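On resume, the counterpart reads the saved context back and re-upserts the same span so the resumed run continues the original trace. This is a sketch: `get_value` is the PR #372 storage API, while the helper name and the `exporter.upsert_span` keyword arguments are assumptions based on the span fields stored above.

```python
def restore_trace_context(storage, exporter, runtime_id: str):
    """Load the suspended span's context and continue it (sketch)."""
    saved = storage.get_value(
        runtime_id=runtime_id,
        namespace="trace_context",
        key="agent_span",
    )
    if saved is None:
        return None  # fresh run - nothing to continue

    # Re-upsert with the SAME trace/span ids: upsert overwrites, which is
    # why the stored record must carry full span data, not just the ids.
    exporter.upsert_span(
        trace_id=saved["trace_id"],
        span_id=saved["span_id"],
        parent_span_id=saved["parent_span_id"],
        name=saved["name"],
        start_time=saved["start_time"],
        attributes=saved["attributes"],
    )
    return saved
```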
| Change Needed | Description |
|---------------|-------------|
| None in uipath-langchain-python | Use existing `set_value`/`get_value` with namespace=`trace_context` |
### uipath-agents-python

**Piggyback opportunity**: The runtime wrapping order is `UiPathResumableRuntime` → `TelemetryRuntimeWrapper` → `AgentsLangGraphRuntime`. The span is still open when the result returns to `TelemetryRuntimeWrapper` (before `finally` cleanup), so we can detect SUSPENDED there.

| Change | Description |
|--------|-------------|
| Modify `TelemetryRuntimeWrapper.execute()` | Check `result.status == SUSPENDED` before the span closes |
| Upsert agent span with RUNNING | Call `upsert_span()` with current span data |
| Save trace context to SQLite | Via storage (passed from factory) |
| Restore trace context on resume | Load from SQLite, restore span as parent |
---
**Error during resume**: Use a try/except pattern like C#:

```python
try:
    result = await delegate.execute(...)
except Exception as ex:
    # Upsert span with ERROR status before re-raising
    upsert_span(agent_span, status=SpanStatus.ERROR, error=str(ex))
    raise
```

From `EscalationToolWorkflow.cs:123-126`: `span.SetError(ex, Workflow.UtcNow)` then `await span.UpsertWorkflowAsync()`.

## Open Questions

1. **Multiple suspensions**: If an agent suspends multiple times, do we overwrite the trace context or keep a history?

2. **Cleanup**: When should we delete the trace context from SQLite? After successful completion?
src/uipath/tracing/_otel_exporters.py

Lines changed: 4 additions & 15 deletions

```diff
@@ -359,21 +359,10 @@ def _process_span_attributes(self, span_data: Dict[str, Any]) -> None:
 
         # Parse JSON-encoded strings that should be objects (avoids double-encoding)
         # OTEL only accepts primitives, so agents serialize dicts to JSON strings.
-        # We parse them back here before final serialization.
-        for key in (
-            "inputSchema",
-            "outputSchema",
-            "settings",
-            "toolCalls",
-            "usage",
-            "error",
-            "output",
-            "input",
-            "arguments",
-            "result",
-        ):
-            if key in attributes:
-                attributes[key] = _safe_parse_json(attributes[key])
+        # Detect and parse any string that looks like JSON object/array.
+        for key, value in attributes.items():
+            if isinstance(value, str) and value and value[0] in "{[":
+                attributes[key] = _safe_parse_json(value)
 
         # If attributes were a string (legacy path), serialize back
         # If dict (optimized path), leave as dict - caller will serialize once at the end
```