Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath"
version = "2.6.8"
version = "2.6.9"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
80 changes: 49 additions & 31 deletions src/uipath/_cli/_evals/_live_tracking_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,64 @@ class LiveTrackingSpanProcessor(SpanProcessor):
- On span start: Upsert with RUNNING status
- On span end: Upsert with final status (OK/ERROR)

All upsert calls run in background threads without blocking evaluation
execution. Uses a thread pool to cap the maximum number of concurrent
threads and avoid resource exhaustion.
All upsert calls run in background threads without blocking execution.
Uses a thread pool to cap the maximum number of concurrent threads
and avoid resource exhaustion.

Filtering:
Applies the span_filter from factory settings (if provided).
- Low-code agents: Filter to uipath.custom_instrumentation=True
- Coded functions: No filtering (settings=None)

Architecture note:
One LiveTrackingSpanProcessor per LlmOpsHttpExporter.
Do not share processors between exporters.
"""

def __init__(
self,
exporter: LlmOpsHttpExporter,
max_workers: int = 10,
settings=None,
):
self.exporter = exporter
self.span_status = SpanStatus
self.settings = settings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Equivalent code:

We are using it on the next line

 if settings and settings.trace_settings:
     self.span_filter = settings.trace_settings.span_filter
 else:
     self.span_filter = None
    ```

self.span_filter = (
settings.trace_settings.span_filter
if settings and settings.trace_settings
else None
)
self.executor = ThreadPoolExecutor(
max_workers=max_workers, thread_name_prefix="span-upsert"
)

@classmethod
def create_and_register(
cls,
exporter: LlmOpsHttpExporter,
trace_manager,
max_workers: int = 10,
settings=None,
) -> "LiveTrackingSpanProcessor":
"""Factory method to create and register a live tracking processor.

Creates one LiveTrackingSpanProcessor per exporter following the
architecture pattern: one processor → one exporter.

Args:
exporter: The LlmOpsHttpExporter to send upserts to
trace_manager: UiPathTraceManager instance to register with
max_workers: Thread pool size for async upserts
settings: UiPathRuntimeFactorySettings with optional span_filter

Returns:
The created and registered processor
"""
processor = cls(exporter, max_workers, settings)
trace_manager.add_span_processor(processor)
return processor

def _upsert_span_async(
self, span: Span | ReadableSpan, status_override: int | None = None
) -> None:
Expand Down Expand Up @@ -59,40 +101,16 @@ def on_start(
self, span: Span, parent_context: context_api.Context | None = None
) -> None:
"""Called when span starts - upsert with RUNNING status (non-blocking)."""
# Only track evaluation-related spans
if span.attributes and self._is_eval_span(span):
# Apply factory span filter if configured
if self.span_filter is None or self.span_filter(span):
self._upsert_span_async(span, status_override=self.span_status.RUNNING)

def on_end(self, span: ReadableSpan) -> None:
"""Called when span ends - upsert with final status (non-blocking)."""
# Only track evaluation-related spans
if span.attributes and self._is_eval_span(span):
# Apply factory span filter if configured
if self.span_filter is None or self.span_filter(span):
self._upsert_span_async(span)

def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
"""Check if span is evaluation-related."""
if not span.attributes:
return False

span_type = span.attributes.get("span_type")
# Track eval-related span types
eval_span_types = {
"eval",
"evaluator",
"evaluation",
"eval_set_run",
"evalOutput",
}

if span_type in eval_span_types:
return True

# Also track spans with execution.id (eval executions)
if "execution.id" in span.attributes:
return True

return False

def shutdown(self) -> None:
"""Shutdown the processor and wait for pending tasks to complete."""
try:
Expand Down
4 changes: 0 additions & 4 deletions src/uipath/_cli/_evals/_progress_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,10 +730,6 @@ async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None:
f"Found eval_run_id={eval_run_id} for execution_id={payload.execution_id} in cache"
)

# Export spans using the eval_set_run_id as trace_id (already set during CREATE_EVAL_SET_RUN)
# Individual eval runs are distinguished by span attributes, not separate trace IDs
self.spans_exporter.export(payload.spans)

for eval_result in payload.eval_results:
evaluator_id = eval_result.evaluator_id
if evaluator_id in self.evaluator_scores:
Expand Down
8 changes: 5 additions & 3 deletions src/uipath/_cli/cli_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from uipath._cli._chat._bridge import get_chat_bridge
from uipath._cli._debug._bridge import get_debug_bridge
from uipath._cli._evals._live_tracking_processor import LiveTrackingSpanProcessor
from uipath._cli._evals._span_collection import ExecutionSpanCollector
from uipath._cli._evals.mocks.mocks import (
clear_execution_context,
Expand Down Expand Up @@ -194,15 +195,16 @@ async def execute_debug_runtime():
trigger_poll_interval: float = 5.0

factory = UiPathRuntimeFactoryRegistry.get(context=ctx)
factory_settings = await factory.get_settings()

runtime = await factory.new_runtime(
entrypoint, ctx.conversation_id or ctx.job_id or "default"
)

if ctx.job_id:
is_low_code = entrypoint == "agent.json"
trace_manager.add_span_exporter(
LlmOpsHttpExporter(is_low_code=is_low_code)
job_exporter = LlmOpsHttpExporter()
LiveTrackingSpanProcessor.create_and_register(
job_exporter, trace_manager, settings=factory_settings
)
trigger_poll_interval = (
0.0 # Polling disabled for production jobs
Expand Down
47 changes: 17 additions & 30 deletions src/uipath/_cli/cli_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,13 @@ def eval(
async def execute_eval():
event_bus = EventBus()

is_low_code = eval_context.entrypoint == "agent.json"

# Only create studio web exporter when reporting to Studio Web
studio_web_tracking_exporter = None
if should_register_progress_reporter:
# Set trace_id during exporter creation, not mutation afterward
studio_web_tracking_exporter = LlmOpsHttpExporter(
is_low_code=is_low_code
trace_id=eval_context.eval_set_run_id
)
if eval_context.eval_set_run_id:
studio_web_tracking_exporter.trace_id = (
eval_context.eval_set_run_id
)

progress_reporter = StudioWebProgressReporter(
studio_web_tracking_exporter
Expand All @@ -241,41 +236,33 @@ async def execute_eval():
# Set job_id in eval context for single runtime runs
eval_context.job_id = ctx.job_id

# Create job exporter for live tracking
job_exporter = None
runtime_factory = UiPathRuntimeFactoryRegistry.get(context=ctx)
factory_settings = await runtime_factory.get_settings()
if ctx.job_id:
job_exporter = LlmOpsHttpExporter(is_low_code=is_low_code)
trace_manager.add_span_exporter(job_exporter)
# Add live tracking processor for real-time span updates
job_tracking_processor = LiveTrackingSpanProcessor(job_exporter)
trace_manager.tracer_span_processors.append(
job_tracking_processor
)
trace_manager.tracer_provider.add_span_processor(
job_tracking_processor
job_exporter = LlmOpsHttpExporter()
LiveTrackingSpanProcessor.create_and_register(
job_exporter, trace_manager, settings=factory_settings
)

# Add studio web tracking processor if reporting to Studio Web
if studio_web_tracking_exporter:
studio_web_tracking_processor = LiveTrackingSpanProcessor(
studio_web_tracking_exporter
)
trace_manager.tracer_span_processors.append(
studio_web_tracking_processor
)
trace_manager.tracer_provider.add_span_processor(
studio_web_tracking_processor
LiveTrackingSpanProcessor.create_and_register(
studio_web_tracking_exporter,
trace_manager,
settings=factory_settings,
)

if trace_file:
trace_settings = (
factory_settings.trace_settings
if factory_settings
else None
)
trace_manager.add_span_exporter(
JsonLinesFileExporter(trace_file)
JsonLinesFileExporter(trace_file), settings=trace_settings
)

project_id = UiPathConfig.project_id

runtime_factory = UiPathRuntimeFactoryRegistry.get(context=ctx)

try:
if project_id:
studio_client = StudioClient(project_id)
Expand Down
10 changes: 7 additions & 3 deletions src/uipath/_cli/cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from uipath._utils._bindings import ResourceOverwritesContext
from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter

from ._evals._live_tracking_processor import LiveTrackingSpanProcessor
from ._utils._console import ConsoleLogger
from .middlewares import Middlewares

Expand Down Expand Up @@ -180,15 +181,18 @@ async def execute() -> None:
factory: UiPathRuntimeFactoryProtocol | None = None
try:
factory = UiPathRuntimeFactoryRegistry.get(context=ctx)
factory_settings = await factory.get_settings()
runtime = await factory.new_runtime(
entrypoint,
ctx.conversation_id or ctx.job_id or "default",
)

if ctx.job_id:
is_low_code = entrypoint == "agent.json"
trace_manager.add_span_exporter(
LlmOpsHttpExporter(is_low_code=is_low_code)
job_exporter = LlmOpsHttpExporter()
LiveTrackingSpanProcessor.create_and_register(
job_exporter,
trace_manager,
settings=factory_settings,
)

if ctx.conversation_id and ctx.exchange_id:
Expand Down
8 changes: 7 additions & 1 deletion src/uipath/functions/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ async def get_storage(self) -> UiPathRuntimeStorageProtocol | None:
return None

async def get_settings(self) -> UiPathRuntimeFactorySettings | None:
"""Get factory settings if any (placeholder for protocol compliance)."""
"""Get factory settings for coded functions.
Coded functions don't need span filtering - all spans are relevant
since developers have full control over instrumentation.
Low-code agents (LangGraph) need filtering due to framework overhead.
"""
return None

async def new_runtime(
Expand Down
50 changes: 3 additions & 47 deletions src/uipath/tracing/_otel_exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,13 @@ class Status:
def __init__(
self,
trace_id: Optional[str] = None,
is_low_code: bool = False,
**kwargs,
):
"""Initialize the exporter with the base URL and authentication token.
Args:
trace_id: Optional trace ID to use for all spans
is_low_code: Whether this is for a low code process (agent.json).
If True, applies custom instrumentation filtering.
If False (coded agent), no filtering is applied.
"""
super().__init__(**kwargs)
super().__init__()
self.base_url = self._get_base_url()
self.auth_token = os.environ.get("UIPATH_ACCESS_TOKEN")
self.headers = {
Expand All @@ -131,23 +126,15 @@ def __init__(

self.http_client = httpx.Client(**client_kwargs, headers=self.headers)
self.trace_id = trace_id
self.is_low_code = is_low_code

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
"""Export spans to UiPath LLM Ops."""
if len(spans) == 0:
logger.warning("No spans to export")
return SpanExportResult.SUCCESS

# Filter out spans marked for dropping
filtered_spans = [s for s in spans if not self._should_drop_span(s)]

if len(filtered_spans) == 0:
logger.debug("No spans to export after filtering dropped spans")
return SpanExportResult.SUCCESS

logger.debug(
f"Exporting {len(filtered_spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans"
f"Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans"
)

# Use optimized path: keep attributes as dict for processing
Expand All @@ -156,7 +143,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
_SpanUtils.otel_span_to_uipath_span(
span, custom_trace_id=self.trace_id, serialize_attributes=False
).to_dict(serialize_attributes=False)
for span in filtered_spans
for span in spans
]

url = self._build_url(span_list)
Expand Down Expand Up @@ -197,9 +184,6 @@ def upsert_span(
Returns:
SpanExportResult indicating success or failure
"""
if self._should_drop_span(span):
return SpanExportResult.SUCCESS

span_data = _SpanUtils.otel_span_to_uipath_span(
span, custom_trace_id=self.trace_id, serialize_attributes=False
).to_dict(serialize_attributes=False)
Expand Down Expand Up @@ -418,34 +402,6 @@ def _get_base_url(self) -> str:

return uipath_url

def _should_drop_span(self, span: ReadableSpan) -> bool:
"""Check if span should be dropped using whitelist filtering.
For low code processes (agent.json):
Only spans with uipath.custom_instrumentation=True are kept.
All other spans (HTTP instrumentation, OpenTelemetry generic spans,
auto-instrumentation, etc.) are dropped.
For coded agents:
No filtering is applied - all spans are kept.
Args:
span: The span to check
Returns:
True if the span should be dropped, False otherwise
"""
# For coded agents, don't drop any spans
if not self.is_low_code:
return False

# For low code processes, apply whitelist filtering
attrs = span.attributes or {}

# Whitelist: only keep spans with custom instrumentation marker
# Drop everything else
return not attrs.get("uipath.custom_instrumentation")


class JsonLinesFileExporter(SpanExporter):
def __init__(self, file_path: str):
Expand Down
Loading