diff --git a/pyproject.toml b/pyproject.toml
index 26400216d..395539a26 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "uipath"
-version = "2.5.3"
+version = "2.5.4"
 description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
 readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.11"
diff --git a/src/uipath/_cli/_evals/_runtime.py b/src/uipath/_cli/_evals/_runtime.py
index adc1199e2..9be5ad499 100644
--- a/src/uipath/_cli/_evals/_runtime.py
+++ b/src/uipath/_cli/_evals/_runtime.py
@@ -295,6 +295,8 @@ class UiPathEvalContext:
     report_coverage: bool = False
     input_overrides: dict[str, Any] | None = None
     model_settings_id: str = "default"
+    resume: bool = False
+    job_id: str | None = None
 
 
 class UiPathEvalRuntime:
@@ -327,7 +329,8 @@ def __init__(
         self.trace_manager.tracer_provider.add_span_processor(live_tracking_processor)
 
         self.logs_exporter: ExecutionLogsExporter = ExecutionLogsExporter()
-        self.execution_id = str(uuid.uuid4())
+        # Use job_id if available (for single runtime runs), otherwise generate UUID
+        self.execution_id = context.job_id or str(uuid.uuid4())
         self.coverage = coverage.Coverage(branch=True)
 
     async def __aenter__(self) -> "UiPathEvalRuntime":
@@ -405,6 +408,17 @@ async def initiate_evaluation(
         )
 
     async def execute(self) -> UiPathRuntimeResult:
+        logger.info("=" * 80)
+        logger.info("EVAL RUNTIME: Starting evaluation execution")
+        logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
+        logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
+        logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
+        if self.context.resume:
+            logger.info(
+                "🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
+            )
+        logger.info("=" * 80)
+
         # Configure model settings override before creating runtime
         await self._configure_model_settings_override()
 
@@ -490,9 +504,63 @@
                     wait_for_completion=False,
                 )
 
+                # Collect triggers from all evaluation runs (pass-through from inner runtime)
+                logger.info("=" * 80)
+                logger.info(
+                    "EVAL RUNTIME: Collecting triggers from all evaluation runs"
+                )
+                all_triggers = []
+                for eval_run_result in results.evaluation_set_results:
+                    if (
+                        eval_run_result.agent_execution_output
+                        and eval_run_result.agent_execution_output.result
+                    ):
+                        runtime_result = (
+                            eval_run_result.agent_execution_output.result
+                        )
+                        if runtime_result.trigger:
+                            all_triggers.append(runtime_result.trigger)
+                        if runtime_result.triggers:
+                            all_triggers.extend(runtime_result.triggers)
+
+                if all_triggers:
+                    logger.info(
+                        f"EVAL RUNTIME: ✅ Passing through {len(all_triggers)} trigger(s) to top-level result"
+                    )
+                    for i, trigger in enumerate(all_triggers, 1):
+                        logger.info(
+                            f"EVAL RUNTIME: Pass-through trigger {i}: {trigger.model_dump(by_alias=True)}"
+                        )
+                else:
+                    logger.info("EVAL RUNTIME: No triggers to pass through")
+                logger.info("=" * 80)
+
+                # Determine overall status - propagate status from inner runtime
+                # This is critical for serverless executor to know to save state and suspend job
+                # Priority: SUSPENDED > FAULTED > SUCCESSFUL
+                overall_status = UiPathRuntimeStatus.SUCCESSFUL
+                for eval_run_result in results.evaluation_set_results:
+                    if (
+                        eval_run_result.agent_execution_output
+                        and eval_run_result.agent_execution_output.result
+                    ):
+                        inner_status = (
+                            eval_run_result.agent_execution_output.result.status
+                        )
+                        if inner_status == UiPathRuntimeStatus.SUSPENDED:
+                            overall_status = UiPathRuntimeStatus.SUSPENDED
+                            logger.info(
+                                "EVAL RUNTIME: Propagating SUSPENDED status from inner runtime"
+                            )
+                            break  # SUSPENDED takes highest priority, stop checking
+                        elif inner_status == UiPathRuntimeStatus.FAULTED:
+                            overall_status = UiPathRuntimeStatus.FAULTED
+                            # Continue checking in case a later eval is SUSPENDED
+
                 result = UiPathRuntimeResult(
                     output={**results.model_dump(by_alias=True)},
-                    status=UiPathRuntimeStatus.SUCCESSFUL,
+                    status=overall_status,
+                    triggers=all_triggers if all_triggers else None,
                 )
                 return result
             except Exception as e:
@@ -561,6 +629,14 @@ async def _execute_eval(
                 runtime,
                 input_overrides=self.context.input_overrides,
             )
+
+            logger.info(
+                f"DEBUG: Agent execution result status: {agent_execution_output.result.status}"
+            )
+            logger.info(
+                f"DEBUG: Agent execution result trigger: {agent_execution_output.result.trigger}"
+            )
+
         except Exception as e:
             if self.context.verbose:
                 if isinstance(e, EvaluationRuntimeException):
@@ -596,6 +672,69 @@
             )
             raise
 
+        # Check if execution was suspended (e.g., waiting for RPA job completion)
+        if (
+            agent_execution_output.result.status
+            == UiPathRuntimeStatus.SUSPENDED
+        ):
+            # For suspended executions, we don't run evaluators yet
+            # The serverless executor should save the triggers and resume later
+            logger.info("=" * 80)
+            logger.info(
+                f"🔴 EVAL RUNTIME: DETECTED SUSPENSION for eval '{eval_item.name}' (id: {eval_item.id})"
+            )
+            logger.info("EVAL RUNTIME: Agent returned SUSPENDED status")
+
+            # Extract triggers from result
+            triggers = []
+            if agent_execution_output.result.trigger:
+                triggers.append(agent_execution_output.result.trigger)
+            if agent_execution_output.result.triggers:
+                triggers.extend(agent_execution_output.result.triggers)
+
+            logger.info(
+                f"EVAL RUNTIME: Extracted {len(triggers)} trigger(s) from suspended execution"
+            )
+            for i, trigger in enumerate(triggers, 1):
+                logger.info(
+                    f"EVAL RUNTIME: Trigger {i}: {trigger.model_dump(by_alias=True)}"
+                )
+            logger.info("=" * 80)
+
+            # IMPORTANT: Always include execution output with triggers when suspended
+            # This ensures triggers are visible in the output JSON for serverless executor
+            evaluation_run_results.agent_execution_output = (
+                convert_eval_execution_output_to_serializable(
+                    agent_execution_output
+                )
+            )
+
+            # Publish suspended status event
+            await self.event_bus.publish(
+                EvaluationEvents.UPDATE_EVAL_RUN,
+                EvalRunUpdatedEvent(
+                    execution_id=execution_id,
+                    eval_item=eval_item,
+                    eval_results=[],
+                    success=True,  # Not failed, just suspended
+                    agent_output={
+                        "status": "suspended",
+                        "triggers": [
+                            t.model_dump(by_alias=True) for t in triggers
+                        ],
+                    },
+                    agent_execution_time=agent_execution_output.execution_time,
+                    spans=agent_execution_output.spans,
+                    logs=agent_execution_output.logs,
+                    exception_details=None,
+                ),
+                wait_for_completion=False,
+            )
+
+            # Return partial results with trigger information
+            # The evaluation will be completed when resumed
+            return evaluation_run_results
+
         if self.context.verbose:
             evaluation_run_results.agent_execution_output = (
                 convert_eval_execution_output_to_serializable(
@@ -802,14 +941,18 @@ async def execute_runtime(
             "span_type": "eval",
         }
 
-        # Create a new runtime with unique runtime_id for this eval execution.
-        # This ensures each eval has its own LangGraph thread_id (clean state),
-        # preventing message accumulation across eval runs.
+        # Create a new runtime with runtime_id for this eval execution.
+        # For suspend/resume scenarios, we use eval_item.id as runtime_id (thread_id)
+        # so checkpoints can be found across suspend and resume invocations.
+        # For non-suspend scenarios, this still ensures each eval has its own thread_id.
         eval_runtime = None
         try:
+            runtime_id = eval_item.id
+            if self.context.resume:
+                logger.info(f"🟢 EVAL RUNTIME: Using eval_item.id '{runtime_id}' to load checkpoint from suspend")
             eval_runtime = await self.factory.new_runtime(
                 entrypoint=self.context.entrypoint or "",
-                runtime_id=execution_id,
+                runtime_id=runtime_id,
             )
             execution_runtime = UiPathExecutionRuntime(
                 delegate=eval_runtime,
@@ -827,9 +970,27 @@
                 input_overrides or {},
                 eval_id=eval_item.id,
             )
-            result = await execution_runtime.execute(
-                input=inputs_with_overrides,
-            )
+
+            # Handle resume mode: provide resume data to continue from interrupt()
+            if self.context.resume:
+                try:
+                    from langgraph.types import Command
+                    # Provide mock resume data for evaluation testing
+                    # In production, orchestrator would provide actual result data
+                    resume_data = {"status": "completed", "result": "mock_completion_data"}
+                    logger.info(f"🟢 EVAL RUNTIME: Resuming with mock data: {resume_data}")
+                    result = await execution_runtime.execute(
+                        input=Command(resume=resume_data),
+                    )
+                except ImportError:
+                    logger.warning("langgraph.types.Command not available, falling back to normal execution")
+                    result = await execution_runtime.execute(
+                        input=inputs_with_overrides,
+                    )
+            else:
+                result = await execution_runtime.execute(
+                    input=inputs_with_overrides,
+                )
         except Exception as e:
             end_time = time()
             spans, logs = self._get_and_clear_execution_data(execution_id)
diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py
index 4f2600d47..ae9441dc2 100644
--- a/src/uipath/_cli/cli_eval.py
+++ b/src/uipath/_cli/cli_eval.py
@@ -120,6 +120,12 @@ def setup_reporting_prereq(no_report: bool) -> bool:
     default="{}",
     help='Input field overrides per evaluation ID: \'{"eval-1": {"operator": "*"}, "eval-2": {"a": 100}}\'. Supports deep merge for nested objects.',
 )
+@click.option(
+    "--resume",
+    is_flag=True,
+    default=False,
+    help="Resume execution from a previous suspended state",
+)
 def eval(
     entrypoint: str | None,
     eval_set: str | None,
@@ -134,6 +140,7 @@ def eval(
     trace_file: str | None,
     max_llm_concurrency: int,
     input_overrides: dict[str, Any],
+    resume: bool,
 ) -> None:
     """Run an evaluation set against the agent.
 
@@ -150,6 +157,7 @@
         trace_file: File path where traces will be written in JSONL format
         max_llm_concurrency: Maximum concurrent LLM requests
         input_overrides: Input field overrides mapping (direct field override with deep merge)
+        resume: Resume execution from a previous suspended state
     """
     set_llm_concurrency(max_llm_concurrency)
 
@@ -188,6 +196,7 @@
     eval_context.report_coverage = report_coverage
     eval_context.model_settings_id = model_settings_id
     eval_context.input_overrides = input_overrides
+    eval_context.resume = resume
 
     try:
 
@@ -211,6 +220,9 @@ async def execute_eval():
                 trace_manager=trace_manager,
                 command="eval",
             ) as ctx:
+                # Set job_id in eval context for single runtime runs
+                eval_context.job_id = ctx.job_id
+
                 if ctx.job_id:
                     trace_manager.add_span_exporter(LlmOpsHttpExporter())
 
diff --git a/uv.lock b/uv.lock
index 2f2dacf24..c63e780a6 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2486,7 +2486,7 @@ wheels = [
 
 [[package]]
 name = "uipath"
-version = "2.5.3"
+version = "2.5.4"
 source = { editable = "." }
 dependencies = [
     { name = "applicationinsights" },
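
Note on the resume path (not part of the patch itself): the change reuses eval_item.id as the LangGraph thread_id and feeds langgraph.types.Command(resume=...) back into the runtime, which is the standard LangGraph interrupt/resume flow, exercised from the CLI via the new "uipath eval --resume" flag. The sketch below only illustrates that flow under assumed names: it presumes a recent langgraph release that exposes interrupt and Command, uses an in-memory checkpointer, and the graph, node, and state names are invented for the example.

from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    job_result: str


def wait_for_rpa_job(state: State) -> State:
    # interrupt() suspends the run and persists a checkpoint; the value passed to
    # Command(resume=...) on a later invoke with the same thread_id is returned here.
    resume_payload = interrupt({"reason": "waiting for RPA job"})
    return {"job_result": resume_payload["status"]}


builder = StateGraph(State)
builder.add_node("wait_for_rpa_job", wait_for_rpa_job)
builder.add_edge(START, "wait_for_rpa_job")
builder.add_edge("wait_for_rpa_job", END)
graph = builder.compile(checkpointer=MemorySaver())

# The thread_id must be identical on the suspend and resume invocations, which is
# why the patch switches runtime_id from execution_id to eval_item.id.
config = {"configurable": {"thread_id": "eval-item-123"}}

graph.invoke({"job_result": ""}, config)                       # suspends at interrupt()
graph.invoke(Command(resume={"status": "completed"}), config)  # resumes from the checkpoint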