2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "uipath"
version = "2.5.3"
version = "2.5.4"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
179 changes: 170 additions & 9 deletions src/uipath/_cli/_evals/_runtime.py
@@ -295,6 +298,8 @@ class UiPathEvalContext:
report_coverage: bool = False
input_overrides: dict[str, Any] | None = None
model_settings_id: str = "default"
resume: bool = False
job_id: str | None = None


class UiPathEvalRuntime:
@@ -327,7 +329,8 @@ def __init__(
self.trace_manager.tracer_provider.add_span_processor(live_tracking_processor)

self.logs_exporter: ExecutionLogsExporter = ExecutionLogsExporter()
self.execution_id = str(uuid.uuid4())
# Use job_id if available (for single runtime runs), otherwise generate UUID
self.execution_id = context.job_id or str(uuid.uuid4())
self.coverage = coverage.Coverage(branch=True)

async def __aenter__(self) -> "UiPathEvalRuntime":
@@ -405,6 +408,17 @@ async def initiate_evaluation(
)

async def execute(self) -> UiPathRuntimeResult:
logger.info("=" * 80)
logger.info("EVAL RUNTIME: Starting evaluation execution")
logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
if self.context.resume:
logger.info(
"🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
)
logger.info("=" * 80)

# Configure model settings override before creating runtime
await self._configure_model_settings_override()

@@ -490,9 +504,63 @@ async def execute(self) -> UiPathRuntimeResult:
wait_for_completion=False,
)

# Collect triggers from all evaluation runs (pass-through from inner runtime)
logger.info("=" * 80)
logger.info(
"EVAL RUNTIME: Collecting triggers from all evaluation runs"
)
all_triggers = []
for eval_run_result in results.evaluation_set_results:
if (
eval_run_result.agent_execution_output
and eval_run_result.agent_execution_output.result
):
runtime_result = (
eval_run_result.agent_execution_output.result
)
if runtime_result.trigger:
all_triggers.append(runtime_result.trigger)
if runtime_result.triggers:
all_triggers.extend(runtime_result.triggers)

if all_triggers:
logger.info(
f"EVAL RUNTIME: ✅ Passing through {len(all_triggers)} trigger(s) to top-level result"
)
for i, trigger in enumerate(all_triggers, 1):
logger.info(
f"EVAL RUNTIME: Pass-through trigger {i}: {trigger.model_dump(by_alias=True)}"
)
else:
logger.info("EVAL RUNTIME: No triggers to pass through")
logger.info("=" * 80)

# Determine overall status - propagate status from inner runtime
# This is critical for serverless executor to know to save state and suspend job
# Priority: SUSPENDED > FAULTED > SUCCESSFUL
overall_status = UiPathRuntimeStatus.SUCCESSFUL
for eval_run_result in results.evaluation_set_results:
if (
eval_run_result.agent_execution_output
and eval_run_result.agent_execution_output.result
):
inner_status = (
eval_run_result.agent_execution_output.result.status
)
if inner_status == UiPathRuntimeStatus.SUSPENDED:
overall_status = UiPathRuntimeStatus.SUSPENDED
logger.info(
"EVAL RUNTIME: Propagating SUSPENDED status from inner runtime"
)
break # SUSPENDED takes highest priority, stop checking
elif inner_status == UiPathRuntimeStatus.FAULTED:
overall_status = UiPathRuntimeStatus.FAULTED
# Continue checking in case a later eval is SUSPENDED

result = UiPathRuntimeResult(
output={**results.model_dump(by_alias=True)},
status=UiPathRuntimeStatus.SUCCESSFUL,
status=overall_status,
triggers=all_triggers if all_triggers else None,
)
return result
except Exception as e:
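
The two aggregation rules introduced in this hunk — flattening per-eval triggers into one list and propagating the highest-priority inner status — boil down to the following self-contained sketch. It uses simplified stand-in types rather than the SDK's `UiPathRuntimeResult`/`UiPathRuntimeStatus`, so read it as an illustration of the SUSPENDED > FAULTED > SUCCESSFUL rule, not as SDK code:

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):  # stand-in for UiPathRuntimeStatus
    SUCCESSFUL = "successful"
    FAULTED = "faulted"
    SUSPENDED = "suspended"


@dataclass
class InnerResult:  # stand-in for one eval run's runtime result
    status: Status
    trigger: str | None = None
    triggers: list[str] = field(default_factory=list)


def aggregate(results: list[InnerResult]) -> tuple[Status, list[str]]:
    all_triggers: list[str] = []
    overall = Status.SUCCESSFUL
    for r in results:
        # Pass through both the single trigger and the trigger list, if present.
        if r.trigger:
            all_triggers.append(r.trigger)
        all_triggers.extend(r.triggers)
        # Priority: SUSPENDED beats FAULTED beats SUCCESSFUL.
        if r.status is Status.SUSPENDED:
            overall = Status.SUSPENDED  # highest priority, nothing overrides it
        elif r.status is Status.FAULTED and overall is not Status.SUSPENDED:
            overall = Status.FAULTED
    return overall, all_triggers


status, triggers = aggregate(
    [InnerResult(Status.FAULTED), InnerResult(Status.SUSPENDED, trigger="rpa-job-42")]
)
assert status is Status.SUSPENDED and triggers == ["rpa-job-42"]
```
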
@@ -561,6 +629,14 @@ async def _execute_eval(
runtime,
input_overrides=self.context.input_overrides,
)

logger.info(
f"DEBUG: Agent execution result status: {agent_execution_output.result.status}"
)
logger.info(
f"DEBUG: Agent execution result trigger: {agent_execution_output.result.trigger}"
)

except Exception as e:
if self.context.verbose:
if isinstance(e, EvaluationRuntimeException):
@@ -596,6 +672,69 @@ async def _execute_eval(
)
raise

# Check if execution was suspended (e.g., waiting for RPA job completion)
if (
agent_execution_output.result.status
== UiPathRuntimeStatus.SUSPENDED
):
# For suspended executions, we don't run evaluators yet
# The serverless executor should save the triggers and resume later
logger.info("=" * 80)
logger.info(
f"🔴 EVAL RUNTIME: DETECTED SUSPENSION for eval '{eval_item.name}' (id: {eval_item.id})"
)
logger.info("EVAL RUNTIME: Agent returned SUSPENDED status")

# Extract triggers from result
triggers = []
if agent_execution_output.result.trigger:
triggers.append(agent_execution_output.result.trigger)
if agent_execution_output.result.triggers:
triggers.extend(agent_execution_output.result.triggers)

logger.info(
f"EVAL RUNTIME: Extracted {len(triggers)} trigger(s) from suspended execution"
)
for i, trigger in enumerate(triggers, 1):
logger.info(
f"EVAL RUNTIME: Trigger {i}: {trigger.model_dump(by_alias=True)}"
)
logger.info("=" * 80)

# IMPORTANT: Always include execution output with triggers when suspended
# This ensures triggers are visible in the output JSON for serverless executor
evaluation_run_results.agent_execution_output = (
convert_eval_execution_output_to_serializable(
agent_execution_output
)
)

# Publish suspended status event
await self.event_bus.publish(
EvaluationEvents.UPDATE_EVAL_RUN,
EvalRunUpdatedEvent(
execution_id=execution_id,
eval_item=eval_item,
eval_results=[],
success=True, # Not failed, just suspended
agent_output={
"status": "suspended",
"triggers": [
t.model_dump(by_alias=True) for t in triggers
],
},
agent_execution_time=agent_execution_output.execution_time,
spans=agent_execution_output.spans,
logs=agent_execution_output.logs,
exception_details=None,
),
wait_for_completion=False,
)

# Return partial results with trigger information
# The evaluation will be completed when resumed
return evaluation_run_results

if self.context.verbose:
evaluation_run_results.agent_execution_output = (
convert_eval_execution_output_to_serializable(
@@ -802,14 +941,18 @@ async def execute_runtime(
"span_type": "eval",
}

# Create a new runtime with unique runtime_id for this eval execution.
# This ensures each eval has its own LangGraph thread_id (clean state),
# preventing message accumulation across eval runs.
# Create a new runtime with runtime_id for this eval execution.
# For suspend/resume scenarios, we use eval_item.id as runtime_id (thread_id)
# so checkpoints can be found across suspend and resume invocations.
# For non-suspend scenarios, this still ensures each eval has its own thread_id.
eval_runtime = None
try:
runtime_id = eval_item.id
if self.context.resume:
logger.info(f"🟢 EVAL RUNTIME: Using eval_item.id '{runtime_id}' to load checkpoint from suspend")
eval_runtime = await self.factory.new_runtime(
entrypoint=self.context.entrypoint or "",
runtime_id=execution_id,
runtime_id=runtime_id,
)
execution_runtime = UiPathExecutionRuntime(
delegate=eval_runtime,
@@ -827,9 +970,27 @@
input_overrides or {},
eval_id=eval_item.id,
)
result = await execution_runtime.execute(
input=inputs_with_overrides,
)

# Handle resume mode: provide resume data to continue from interrupt()
if self.context.resume:
try:
from langgraph.types import Command
# Provide mock resume data for evaluation testing
# In production, orchestrator would provide actual result data
resume_data = {"status": "completed", "result": "mock_completion_data"}
logger.info(f"🟢 EVAL RUNTIME: Resuming with mock data: {resume_data}")
result = await execution_runtime.execute(
input=Command(resume=resume_data),
)
except ImportError:
logger.warning("langgraph.types.Command not available, falling back to normal execution")
result = await execution_runtime.execute(
input=inputs_with_overrides,
)
else:
result = await execution_runtime.execute(
input=inputs_with_overrides,
)
except Exception as e:
end_time = time()
spans, logs = self._get_and_clear_execution_data(execution_id)
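
The resume branch in `execute_runtime` builds on LangGraph's interrupt/resume mechanics: a node calls `interrupt()`, the run suspends at a checkpoint keyed by `thread_id`, and a later invocation with `Command(resume=...)` on the same thread continues from that checkpoint. A minimal, self-contained illustration — toy graph, node, and thread names, assuming LangGraph's public API rather than the SDK's runtime wiring:

```python
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class State(TypedDict):
    job_result: str


def wait_for_job(state: State) -> State:
    # Suspends the run; the value passed to interrupt() is surfaced to the caller,
    # and the return value of interrupt() is whatever Command(resume=...) supplies.
    result = interrupt({"reason": "waiting for RPA job"})
    return {"job_result": result["status"]}


builder = StateGraph(State)
builder.add_node("wait_for_job", wait_for_job)
builder.add_edge(START, "wait_for_job")
builder.add_edge("wait_for_job", END)
graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "eval-item-1"}}  # fixed id, like eval_item.id above
graph.invoke({"job_result": ""}, config)                  # suspends at interrupt()
resumed = graph.invoke(Command(resume={"status": "completed"}), config)
print(resumed["job_result"])  # -> "completed"
```

Reusing `eval_item.id` as the runtime id plays the role of the fixed `thread_id` here, which is why the checkpoint written at suspend time can be found again on the resume invocation.
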
12 changes: 12 additions & 0 deletions src/uipath/_cli/cli_eval.py
@@ -120,6 +126,12 @@ def setup_reporting_prereq(no_report: bool) -> bool:
default="{}",
help='Input field overrides per evaluation ID: \'{"eval-1": {"operator": "*"}, "eval-2": {"a": 100}}\'. Supports deep merge for nested objects.',
)
@click.option(
"--resume",
is_flag=True,
default=False,
help="Resume execution from a previous suspended state",
)
def eval(
entrypoint: str | None,
eval_set: str | None,
@@ -134,6 +140,7 @@ def eval(
trace_file: str | None,
max_llm_concurrency: int,
input_overrides: dict[str, Any],
resume: bool,
) -> None:
"""Run an evaluation set against the agent.

@@ -150,6 +157,7 @@
trace_file: File path where traces will be written in JSONL format
max_llm_concurrency: Maximum concurrent LLM requests
input_overrides: Input field overrides mapping (direct field override with deep merge)
resume: Resume execution from a previous suspended state
"""
set_llm_concurrency(max_llm_concurrency)

@@ -188,6 +196,7 @@ def eval(
eval_context.report_coverage = report_coverage
eval_context.model_settings_id = model_settings_id
eval_context.input_overrides = input_overrides
eval_context.resume = resume

try:

@@ -211,6 +220,9 @@ async def execute_eval():
trace_manager=trace_manager,
command="eval",
) as ctx:
# Set job_id in eval context for single runtime runs
eval_context.job_id = ctx.job_id

if ctx.job_id:
trace_manager.add_span_exporter(LlmOpsHttpExporter())

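
Because `eval` is a Click command, the new flag can be exercised end-to-end with Click's test runner. A hedged sketch — the import path is inferred from the file location above, and a real invocation would also pass the project's usual entrypoint/eval-set arguments:

```python
from click.testing import CliRunner

# Import path inferred from src/uipath/_cli/cli_eval.py; aliased to avoid shadowing builtins.
from uipath._cli.cli_eval import eval as eval_cmd

runner = CliRunner()
# Pass --resume alongside whatever eval-set arguments the project normally uses.
result = runner.invoke(eval_cmd, ["--resume"])
print(result.exit_code)
print(result.output)
```
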
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default.