diff --git a/src/uipath/platform/common/interrupt_models.py b/src/uipath/platform/common/interrupt_models.py index c27ec0578..c68a5156a 100644 --- a/src/uipath/platform/common/interrupt_models.py +++ b/src/uipath/platform/common/interrupt_models.py @@ -4,7 +4,11 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator -from ..action_center.tasks import Task +from uipath.platform.context_grounding.context_grounding_index import ( + ContextGroundingIndex, +) + +from ..action_center import Task from ..context_grounding import ( BatchTransformCreationResponse, BatchTransformOutputColumn, @@ -84,6 +88,19 @@ class WaitDeepRag(BaseModel): index_folder_key: str | None = None +class CreateEphemeralIndex(BaseModel): + """Model representing a Ephemeral Index task creation.""" + + usage: str + attachments: list[str] + + +class WaitEphemeralIndex(BaseModel): + """Model representing a wait Ephemeral Index task.""" + + index: ContextGroundingIndex + + class CreateBatchTransform(BaseModel): """Model representing a Batch Transform task creation.""" diff --git a/src/uipath/platform/context_grounding/__init__.py b/src/uipath/platform/context_grounding/__init__.py index 1682bc5b8..784ac1098 100644 --- a/src/uipath/platform/context_grounding/__init__.py +++ b/src/uipath/platform/context_grounding/__init__.py @@ -12,6 +12,7 @@ DeepRagCreationResponse, DeepRagResponse, DeepRagStatus, + IndexStatus, ) from .context_grounding_index import ContextGroundingIndex from .context_grounding_payloads import ( @@ -52,6 +53,7 @@ "DeepRagCreationResponse", "DeepRagResponse", "DeepRagStatus", + "IndexStatus", "DropboxDataSource", "DropboxSourceConfig", "GoogleDriveDataSource", diff --git a/src/uipath/platform/context_grounding/_context_grounding_service.py b/src/uipath/platform/context_grounding/_context_grounding_service.py index b7152da61..226f19a03 100644 --- a/src/uipath/platform/context_grounding/_context_grounding_service.py +++ b/src/uipath/platform/context_grounding/_context_grounding_service.py @@ -1,9 +1,9 @@ +import uuid from pathlib import Path from typing import Annotated, Any, Dict, List, Optional, Tuple, Union import httpx from pydantic import Field, TypeAdapter -from typing_extensions import deprecated from ..._utils import Endpoint, RequestSpec, header_folder, resource_override from ..._utils._ssl_context import get_httpx_client_kwargs @@ -33,10 +33,12 @@ ) from .context_grounding_index import ContextGroundingIndex from .context_grounding_payloads import ( + AttachmentsDataSource, BucketDataSource, BucketSourceConfig, ConfluenceDataSource, ConfluenceSourceConfig, + CreateEphemeralIndexPayload, CreateIndexPayload, DropboxDataSource, DropboxSourceConfig, @@ -276,13 +278,7 @@ async def retrieve_async( raise Exception("ContextGroundingIndex not found") from e @traced(name="contextgrounding_retrieve_by_id", run_type="uipath") - @deprecated("Use retrieve instead") - def retrieve_by_id( - self, - id: str, - folder_key: Optional[str] = None, - folder_path: Optional[str] = None, - ) -> Any: + def retrieve_by_id(self, id: str) -> Any: """Retrieve context grounding index information by its ID. This method provides direct access to a context index using its unique @@ -290,17 +286,11 @@ def retrieve_by_id( Args: id (str): The unique identifier of the context index. - folder_key (Optional[str]): The key of the folder where the index resides. - folder_path (Optional[str]): The path of the folder where the index resides. Returns: Any: The index information, including its configuration and metadata. """ - spec = self._retrieve_by_id_spec( - id, - folder_key=folder_key, - folder_path=folder_path, - ) + spec = self._retrieve_by_id_spec(id) return self.request( spec.method, @@ -309,13 +299,7 @@ def retrieve_by_id( ).json() @traced(name="contextgrounding_retrieve_by_id", run_type="uipath") - @deprecated("Use retrieve_async instead") - async def retrieve_by_id_async( - self, - id: str, - folder_key: Optional[str] = None, - folder_path: Optional[str] = None, - ) -> Any: + async def retrieve_by_id_async(self, id: str) -> Any: """Retrieve asynchronously context grounding index information by its ID. This method provides direct access to a context index using its unique @@ -323,17 +307,11 @@ async def retrieve_by_id_async( Args: id (str): The unique identifier of the context index. - folder_key (Optional[str]): The key of the folder where the index resides. - folder_path (Optional[str]): The path of the folder where the index resides. Returns: Any: The index information, including its configuration and metadata. """ - spec = self._retrieve_by_id_spec( - id, - folder_key=folder_key, - folder_path=folder_path, - ) + spec = self._retrieve_by_id_spec(id) response = await self.request_async( spec.method, @@ -398,6 +376,28 @@ def create_index( return ContextGroundingIndex.model_validate(response.json()) + @resource_override(resource_type="index") + @traced(name="contextgrounding_create_ephemeral_index", run_type="uipath") + def create_ephemeral_index( + self, + usage: str, + attachments: list[uuid.UUID], + ) -> ContextGroundingIndex: + """Create a new context ephemeral grounding index.""" + spec = self._create_ephemeral_spec( + usage, + attachments, + ) + + response = self.request( + spec.method, + spec.endpoint, + json=spec.json, + headers=spec.headers, + ) + + return ContextGroundingIndex.model_validate(response.json()) + @resource_override(resource_type="index") @traced(name="contextgrounding_create_index", run_type="uipath") async def create_index_async( @@ -1197,6 +1197,37 @@ def _create_spec( }, ) + def _create_ephemeral_spec( + self, + usage: str, + attachments: list[uuid.UUID] = None, + ) -> RequestSpec: + """Create request spec for index creation.""" + data_source_dict = self._build_ephemeral_data_source(attachments) + + payload = CreateEphemeralIndexPayload( + usage=usage, + data_source=data_source_dict, + ) + + return RequestSpec( + method="POST", + endpoint=Endpoint("/ecs_/v2/indexes/createephemeral"), + json=payload.model_dump(by_alias=True, exclude_none=True), + headers={}, + ) + + def _build_ephemeral_data_source( + self, attachments: list[uuid.UUID] + ) -> Dict[str, Any]: + data_source: AttachmentsDataSource + data_source = AttachmentsDataSource(attachments=attachments) + return data_source.model_dump( + by_alias=True, + exclude_none=True, + mode="json", + ) + def _build_data_source(self, source: SourceConfig) -> Dict[str, Any]: """Build data source configuration from typed source config. @@ -1265,20 +1296,11 @@ def _build_data_source(self, source: SourceConfig) -> Dict[str, Any]: return data_source.model_dump(by_alias=True, exclude_none=True) - def _retrieve_by_id_spec( - self, - id: str, - folder_key: Optional[str] = None, - folder_path: Optional[str] = None, - ) -> RequestSpec: - folder_key = self._resolve_folder_key(folder_key, folder_path) - + def _retrieve_by_id_spec(self, id: str) -> RequestSpec: return RequestSpec( method="GET", endpoint=Endpoint(f"/ecs_/v2/indexes/{id}"), - headers={ - **header_folder(folder_key, None), - }, + headers={}, ) def _delete_by_id_spec( diff --git a/src/uipath/platform/context_grounding/context_grounding.py b/src/uipath/platform/context_grounding/context_grounding.py index 885b02e56..c8b66b7d2 100644 --- a/src/uipath/platform/context_grounding/context_grounding.py +++ b/src/uipath/platform/context_grounding/context_grounding.py @@ -41,6 +41,15 @@ class DeepRagStatus(str, Enum): FAILED = "Failed" +class IndexStatus(str, Enum): + """Enum representing possible index tasks status.""" + + QUEUED = "Queued" + IN_PROGRESS = "InProgress" + SUCCESSFUL = "Successful" + FAILED = "Failed" + + class Citation(BaseModel): """Model representing a deep RAG citation.""" diff --git a/src/uipath/platform/context_grounding/context_grounding_index.py b/src/uipath/platform/context_grounding/context_grounding_index.py index 4cbea632a..698fa2b8a 100644 --- a/src/uipath/platform/context_grounding/context_grounding_index.py +++ b/src/uipath/platform/context_grounding/context_grounding_index.py @@ -44,9 +44,9 @@ class ContextGroundingIndex(BaseModel): extra="allow", ) - @field_serializer("last_ingested", "last_queried", when_used="json") + @field_serializer("last_ingested", "last_queried") def serialize_datetime(self, value): - """Serialize datetime fields to ISO 8601 format for JSON output.""" + """Serialize datetime fields to ISO 8601 format.""" if isinstance(value, datetime): return value.isoformat() if value else None return value diff --git a/src/uipath/platform/context_grounding/context_grounding_payloads.py b/src/uipath/platform/context_grounding/context_grounding_payloads.py index 4b64778d7..05fd3b1f2 100644 --- a/src/uipath/platform/context_grounding/context_grounding_payloads.py +++ b/src/uipath/platform/context_grounding/context_grounding_payloads.py @@ -1,6 +1,7 @@ """Payload models for context grounding index creation and configuration.""" import re +import uuid from typing import Any, Dict, Literal, Optional, Union from pydantic import BaseModel, ConfigDict, Field, model_validator @@ -82,6 +83,10 @@ class ConfluenceDataSource(DataSourceBase): space_id: str = Field(alias="spaceId", description="Space ID") +class AttachmentsDataSource(BaseModel): + attachments: list[uuid.UUID] = Field(description="List of attachment ids") + + class Indexer(BaseModel): """Configuration for periodic indexing of data sources.""" @@ -136,6 +141,17 @@ class CreateIndexPayload(BaseModel): model_config = ConfigDict(populate_by_name=True) +class CreateEphemeralIndexPayload(BaseModel): + """ """ + + usage: str = Field(description="Index usage") + data_source: Dict[str, Any] = Field( + alias="dataSource", description="Data source configuration" + ) + + model_config = ConfigDict(populate_by_name=True) + + # user-facing source configuration models class BaseSourceConfig(BaseModel): """Base configuration for all source types.""" diff --git a/src/uipath/platform/resume_triggers/_protocol.py b/src/uipath/platform/resume_triggers/_protocol.py index a23a88b9b..b91099be6 100644 --- a/src/uipath/platform/resume_triggers/_protocol.py +++ b/src/uipath/platform/resume_triggers/_protocol.py @@ -35,7 +35,14 @@ WaitJob, WaitTask, ) -from uipath.platform.context_grounding import DeepRagStatus +from uipath.platform.common.interrupt_models import ( + CreateEphemeralIndex, + WaitEphemeralIndex, +) +from uipath.platform.context_grounding import DeepRagStatus, IndexStatus +from uipath.platform.context_grounding.context_grounding_index import ( + ContextGroundingIndex, +) from uipath.platform.errors import ( BatchTransformNotCompleteException, ExtractionNotCompleteException, @@ -226,6 +233,33 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: return trigger_response + case UiPathResumeTriggerType.EPHEMERAL_INDEX: + if trigger.item_key: + index = uipath.context_grounding.retrieve_by_id(trigger.item_key) + + ephemeral_index = ContextGroundingIndex(**index) + + ephemeral_index_status = ephemeral_index.last_ingestion_status + + if ephemeral_index_status in ( + IndexStatus.QUEUED, + IndexStatus.IN_PROGRESS, + ): + raise UiPathPendingTriggerError( + ErrorCategory.SYSTEM, + f"Index ingestion is not finished yet. Current status: {ephemeral_index_status}", + ) + + if ephemeral_index_status != IndexStatus.SUCCESSFUL: + raise UiPathFaultedTriggerError( + ErrorCategory.USER, + f"Index ingestion '{ephemeral_index.name}' did not finish successfully.", + ) + + trigger_response = ephemeral_index + + return trigger_response + case UiPathResumeTriggerType.BATCH_RAG: if trigger.item_key: destination_path = self._extract_field( @@ -354,6 +388,10 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: await self._handle_deep_rag_job_trigger( suspend_value, resume_trigger, uipath ) + case UiPathResumeTriggerType.EPHEMERAL_INDEX: + await self._handle_ephemeral_index_job_trigger( + suspend_value, resume_trigger, uipath + ) case UiPathResumeTriggerType.BATCH_RAG: await self._handle_batch_rag_job_trigger( suspend_value, resume_trigger, uipath @@ -391,6 +429,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType: return UiPathResumeTriggerType.JOB if isinstance(value, (CreateDeepRag, WaitDeepRag)): return UiPathResumeTriggerType.DEEP_RAG + if isinstance(value, (CreateEphemeralIndex, WaitEphemeralIndex)): + return UiPathResumeTriggerType.EPHEMERAL_INDEX if isinstance(value, (CreateBatchTransform, WaitBatchTransform)): return UiPathResumeTriggerType.BATCH_RAG if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)): @@ -415,6 +455,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName: return UiPathResumeTriggerName.JOB if isinstance(value, (CreateDeepRag, WaitDeepRag)): return UiPathResumeTriggerName.DEEP_RAG + if isinstance(value, (CreateEphemeralIndex, WaitEphemeralIndex)): + return UiPathResumeTriggerName.EPHEMERAL_INDEX if isinstance(value, (CreateBatchTransform, WaitBatchTransform)): return UiPathResumeTriggerName.BATCH_RAG if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)): @@ -479,6 +521,21 @@ async def _handle_deep_rag_job_trigger( raise Exception("Failed to start deep rag") resume_trigger.item_key = deep_rag.id + async def _handle_ephemeral_index_job_trigger( + self, value: Any, resume_trigger: UiPathResumeTrigger, uipath: UiPath + ) -> None: + """Handle ephemeral index""" + if isinstance(value, WaitEphemeralIndex): + resume_trigger.item_key = value.index.id + elif isinstance(value, CreateEphemeralIndex): + ephemeral_index = uipath.context_grounding.create_ephemeral_index( + usage=value.usage, + attachments=value.attachments, + ) + if not ephemeral_index: + raise Exception("Failed to create ephemeral index") + resume_trigger.item_key = ephemeral_index.id + async def _handle_batch_rag_job_trigger( self, value: Any, resume_trigger: UiPathResumeTrigger, uipath: UiPath ) -> None: diff --git a/tests/cli/test_hitl.py b/tests/cli/test_hitl.py index bdeb43c22..5ef8690a3 100644 --- a/tests/cli/test_hitl.py +++ b/tests/cli/test_hitl.py @@ -25,6 +25,10 @@ WaitJob, WaitTask, ) +from uipath.platform.common.interrupt_models import ( + CreateEphemeralIndex, + WaitEphemeralIndex, +) from uipath.platform.context_grounding import ( BatchTransformCreationResponse, BatchTransformOutputColumn, @@ -32,6 +36,10 @@ CitationMode, DeepRagCreationResponse, DeepRagStatus, + IndexStatus, +) +from uipath.platform.context_grounding.context_grounding_index import ( + ContextGroundingIndex, ) from uipath.platform.orchestrator import ( Job, @@ -517,6 +525,95 @@ async def test_read_batch_rag_trigger_pending( reader = UiPathResumeTriggerReader() await reader.read_trigger(resume_trigger) + @pytest.mark.anyio + async def test_read_ephemeral_index_trigger_successful( + self, + setup_test_env: None, + ) -> None: + """Test reading a successful ephemeral index trigger.""" + index_id = "test-ephemeral-index-id" + index_data = { + "id": index_id, + "name": "test-index", + "lastIngestionStatus": IndexStatus.SUCCESSFUL.value, + } + + mock_retrieve_by_id = AsyncMock(return_value=index_data) + + with patch( + "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id", + new=mock_retrieve_by_id, + ): + resume_trigger = UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.EPHEMERAL_INDEX, + item_key=index_id, + ) + reader = UiPathResumeTriggerReader() + result = await reader.read_trigger(resume_trigger) + + assert isinstance(result, ContextGroundingIndex) + assert result.id == index_id + mock_retrieve_by_id.assert_called_once_with(index_id) + + @pytest.mark.anyio + async def test_read_ephemeral_index_trigger_pending( + self, + setup_test_env: None, + ) -> None: + """Test reading a pending ephemeral index trigger raises pending error.""" + from uipath.core.errors import UiPathPendingTriggerError + + index_id = "test-ephemeral-index-id" + index_data = { + "id": index_id, + "name": "test-index", + "lastIngestionStatus": IndexStatus.IN_PROGRESS.value, + } + + mock_retrieve_by_id = AsyncMock(return_value=index_data) + + with patch( + "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id", + new=mock_retrieve_by_id, + ): + resume_trigger = UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.EPHEMERAL_INDEX, + item_key=index_id, + ) + + with pytest.raises(UiPathPendingTriggerError): + reader = UiPathResumeTriggerReader() + await reader.read_trigger(resume_trigger) + + @pytest.mark.anyio + async def test_read_ephemeral_index_trigger_failed( + self, + setup_test_env: None, + ) -> None: + """Test reading a failed ephemeral index trigger raises faulted error.""" + index_id = "test-ephemeral-index-id" + index_data = { + "id": index_id, + "name": "test-index", + "lastIngestionStatus": IndexStatus.FAILED.value, + } + + mock_retrieve_by_id = AsyncMock(return_value=index_data) + + with patch( + "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.retrieve_by_id", + new=mock_retrieve_by_id, + ): + resume_trigger = UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.EPHEMERAL_INDEX, + item_key=index_id, + ) + + with pytest.raises(UiPathFaultedTriggerError) as exc_info: + reader = UiPathResumeTriggerReader() + await reader.read_trigger(resume_trigger) + assert exc_info.value.args[0] == ErrorCategory.USER + class TestHitlProcessor: """Tests for the HitlProcessor class.""" @@ -778,6 +875,62 @@ async def test_create_resume_trigger_wait_batch_transform( assert resume_trigger.trigger_type == UiPathResumeTriggerType.BATCH_RAG assert resume_trigger.item_key == batch_transform_id + @pytest.mark.anyio + async def test_create_resume_trigger_create_ephemeral_index( + self, + setup_test_env: None, + ) -> None: + """Test creating a resume trigger for CreateEphemeralIndex.""" + index_id = "test-ephemeral-index-id" + attachments = ["attachment-uuid-1", "attachment-uuid-2"] + create_ephemeral_index = CreateEphemeralIndex( + usage="DeepRAG", + attachments=attachments, + ) + + mock_index = ContextGroundingIndex( + id=index_id, + name="ephemeral-index", + last_ingestion_status=IndexStatus.QUEUED.value, + ) + mock_create_ephemeral_index = AsyncMock(return_value=mock_index) + + with patch( + "uipath.platform.context_grounding._context_grounding_service.ContextGroundingService.create_ephemeral_index", + new=mock_create_ephemeral_index, + ): + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(create_ephemeral_index) + + assert resume_trigger is not None + assert resume_trigger.trigger_type == UiPathResumeTriggerType.EPHEMERAL_INDEX + assert resume_trigger.item_key == index_id + mock_create_ephemeral_index.assert_called_once_with( + usage=create_ephemeral_index.usage, + attachments=create_ephemeral_index.attachments, + ) + + @pytest.mark.anyio + async def test_create_resume_trigger_wait_ephemeral_index( + self, + setup_test_env: None, + ) -> None: + """Test creating a resume trigger for WaitEphemeralIndex.""" + index_id = "test-ephemeral-index-id" + ephemeral_index = ContextGroundingIndex( + id=index_id, + name="ephemeral-index", + last_ingestion_status=IndexStatus.IN_PROGRESS.value, + ) + wait_ephemeral_index = WaitEphemeralIndex(index=ephemeral_index) + + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(wait_ephemeral_index) + + assert resume_trigger is not None + assert resume_trigger.trigger_type == UiPathResumeTriggerType.EPHEMERAL_INDEX + assert resume_trigger.item_key == index_id + class TestDocumentExtractionModels: """Tests for document extraction models.""" diff --git a/tests/sdk/services/test_context_grounding_service.py b/tests/sdk/services/test_context_grounding_service.py index 72efef02c..614a91fb7 100644 --- a/tests/sdk/services/test_context_grounding_service.py +++ b/tests/sdk/services/test_context_grounding_service.py @@ -1760,6 +1760,61 @@ def test_download_batch_transform_result_creates_nested_directories( assert destination.read_bytes() == b"col1,col2\nval1,val2" assert destination.parent.exists() + def test_create_ephemeral_index( + self, + httpx_mock: HTTPXMock, + service: ContextGroundingService, + base_url: str, + org: str, + tenant: str, + version: str, + ) -> None: + import uuid + + httpx_mock.add_response( + url=f"{base_url}{org}{tenant}/ecs_/v2/indexes/createephemeral", + status_code=200, + json={ + "id": "ephemeral-index-id", + "name": "ephemeral-index", + "lastIngestionStatus": "Queued", + }, + ) + + attachment_ids = [uuid.uuid4(), uuid.uuid4()] + index = service.create_ephemeral_index( + usage="DeepRAG", + attachments=attachment_ids, + ) + + assert isinstance(index, ContextGroundingIndex) + assert index.id == "ephemeral-index-id" + assert index.name == "ephemeral-index" + assert index.last_ingestion_status == "Queued" + + sent_requests = httpx_mock.get_requests() + if sent_requests is None: + raise Exception("No request was sent") + + assert sent_requests[0].method == "POST" + assert ( + sent_requests[0].url + == f"{base_url}{org}{tenant}/ecs_/v2/indexes/createephemeral" + ) + + request_data = json.loads(sent_requests[0].content) + assert request_data["usage"] == "DeepRAG" + assert "dataSource" in request_data + assert request_data["dataSource"]["attachments"] == [ + str(att) for att in attachment_ids + ] + + assert HEADER_USER_AGENT in sent_requests[0].headers + assert ( + sent_requests[0].headers[HEADER_USER_AGENT] + == f"UiPath.Python.Sdk/UiPath.Python.Sdk.Activities.ContextGroundingService.create_ephemeral_index/{version}" + ) + @pytest.mark.anyio async def test_download_batch_transform_result_async_creates_nested_directories( self,