66import uuid
77import warnings
88from os import environ as env
9- from typing import Any , Optional
9+ from typing import Any , Dict , Optional
1010
1111import httpx
1212from langchain_core .tracers .base import AsyncBaseTracer
1313from langchain_core .tracers .schemas import Run
1414from pydantic import PydanticDeprecationWarning
15+ from uipath_sdk ._cli ._runtime ._contracts import UiPathTraceContext
1516
17+ from ._events import CustomTraceEvents , FunctionCallEventData
1618from ._utils import _setup_tracer_httpx_logging , _simple_serialize_defaults
1719
1820logger = logging .getLogger (__name__ )
@@ -27,78 +29,98 @@ class Status:
2729
2830
2931class AsyncUiPathTracer (AsyncBaseTracer ):
30- def __init__ (self , client = None , ** kwargs ):
32+ def __init__ (
33+ self ,
34+ context : Optional [UiPathTraceContext ] = None ,
35+ client : Optional [httpx .AsyncClient ] = None ,
36+ ** kwargs ,
37+ ):
3138 super ().__init__ (** kwargs )
3239
3340 self .client = client or httpx .AsyncClient ()
3441 self .retries = 3
3542 self .log_queue : queue .Queue [dict [str , Any ]] = queue .Queue ()
3643
44+ self .context = context or UiPathTraceContext ()
45+
3746 llm_ops_pattern = self ._get_base_url () + "{orgId}/llmops_"
38- self .orgId = env .get (
39- "UIPATH_ORGANIZATION_ID" , "00000000-0000-0000-0000-000000000000"
40- )
41- self .tenantId = env .get (
42- "UIPATH_TENANT_ID" , "00000000-0000-0000-0000-000000000000"
43- )
44- self .url = llm_ops_pattern .format (orgId = self .orgId ).rstrip ("/" )
4547
46- self .auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
48+ self .url = llm_ops_pattern .format (orgId = self .context .org_id ).rstrip ("/" )
49+
50+ auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
4751 "UIPATH_ACCESS_TOKEN"
4852 )
4953
50- self .jobKey = env .get ("UIPATH_JOB_KEY" )
51- self .folderKey = env .get ("UIPATH_FOLDER_KEY" )
52- self .processKey = env .get ("UIPATH_PROCESS_UUID" )
53- self .parent_span_id = env .get ("UIPATH_PARENT_SPAN_ID" )
54-
55- self .referenceId = self .jobKey or str (uuid .uuid4 ())
56-
57- self .headers = {
58- "Authorization" : f"Bearer { self .auth_token } " ,
59- }
54+ self .headers = {"Authorization" : f"Bearer { auth_token } " }
6055
6156 self .running = True
6257 self .worker_task = asyncio .create_task (self ._worker ())
58+ self .function_call_run_map : Dict [str , Run ] = {}
59+
60+ async def on_custom_event (
61+ self ,
62+ name : str ,
63+ data : Any ,
64+ * ,
65+ run_id : uuid .UUID ,
66+ tags = None ,
67+ metadata = None ,
68+ ** kwargs : Any ,
69+ ) -> None :
70+ if name == CustomTraceEvents .UIPATH_TRACE_FUNCTION_CALL :
71+ # only handle the function call event
72+
73+ if not isinstance (data , FunctionCallEventData ):
74+ logger .warning (
75+ f"Received unexpected data type for function call event: { type (data )} "
76+ )
77+ return
6378
64- def _get_base_url ( self ) -> str :
65- uipath_url = (
66- env . get ( "UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
67- )
68- uipath_url = uipath_url . rstrip ( "/" )
79+ if data . event_type == "call" :
80+ run = self . run_map [ str ( run_id )]
81+ child_run = run . create_child (
82+ name = data . function_name , run_type = data . run_type , tags = data . tags
83+ )
6984
70- # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
71- parts = uipath_url . split ( "//" )
85+ if data . metadata is not None :
86+ run . add_metadata ( data . metadata )
7287
73- # after splitting by //, the base URL will be at index 1 along with the rest,
74- # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
75- base_url_parts = parts [1 ].split ("/" )
88+ call_uuid = data .call_uuid
89+ self .function_call_run_map [call_uuid ] = child_run
7690
77- # combine scheme and netloc to get the base URL
78- base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
91+ self ._send_span (run )
7992
80- return base_url
93+ if data .event_type == "completion" :
94+ call_uuid = data .call_uuid
95+ previous_run = self .function_call_run_map .pop (call_uuid , None )
96+
97+ if previous_run :
98+ previous_run .end (
99+ outputs = self ._safe_dict_dump (data .output ), error = data .error
100+ )
101+ self ._send_span (previous_run )
81102
82103 async def init_trace (self , run_name , trace_id = None ) -> None :
83- trace_id_env = env .get ("UIPATH_TRACE_ID" )
104+ if self .context .trace_id :
105+ # trace id already set no need to do anything
106+ return
84107
85- if trace_id_env :
86- self .trace_parent = trace_id_env
87- else :
88- await self .start_trace (run_name , trace_id )
108+ # no trace id, start a new trace
109+ await self .start_trace (run_name , trace_id )
89110
90111 async def start_trace (self , run_name , trace_id = None ) -> None :
91- self .trace_parent = trace_id or str (uuid .uuid4 ())
92- run_name = run_name or f"Job Run: { self .trace_parent } "
112+ self .context .trace_id = str (uuid .uuid4 ())
113+
114+ run_name = run_name or f"Job Run: { self .context .trace_id } "
93115 trace_data = {
94- "id" : self .trace_parent ,
116+ "id" : self .context . trace_id ,
95117 "name" : re .sub (
96118 "[!@#$<>\.]" , "" , run_name
97119 ), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
98- "referenceId" : self .referenceId ,
120+ "referenceId" : self .context . reference_id ,
99121 "attributes" : "{}" ,
100- "organizationId" : self .orgId ,
101- "tenantId" : self .tenantId ,
122+ "organizationId" : self .context . org_id ,
123+ "tenantId" : self .context . tenant_id ,
102124 }
103125
104126 for attempt in range (self .retries ):
@@ -176,9 +198,9 @@ async def _worker(self):
176198
177199 async def _persist_run (self , run : Run ) -> None :
178200 # Determine if this is a start or end trace based on whether end_time is set
179- await self ._send_span (run )
201+ self ._send_span (run )
180202
181- async def _send_span (self , run : Run ) -> None :
203+ def _send_span (self , run : Run ) -> None :
182204 """Send span data for a run to the API"""
183205 run_id = str (run .id )
184206
@@ -193,27 +215,27 @@ async def _send_span(self, run: Run) -> None:
193215 parent_id = (
194216 str (run .parent_run_id )
195217 if run .parent_run_id is not None
196- else self .parent_span_id
218+ else self .context . parent_span_id
197219 )
198- attributes = self ._safe_json_dump (self ._run_to_dict (run ))
220+ attributes = self ._safe_jsons_dump (self ._run_to_dict (run ))
199221 status = self ._determine_status (run .error )
200222
201223 span_data = {
202224 "id" : run_id ,
203225 "parentId" : parent_id ,
204- "traceId" : self .trace_parent ,
226+ "traceId" : self .context . trace_id ,
205227 "name" : run .name ,
206228 "startTime" : start_time ,
207229 "endTime" : end_time ,
208- "referenceId" : self .referenceId ,
230+ "referenceId" : self .context . reference_id ,
209231 "attributes" : attributes ,
210- "organizationId" : self .orgId ,
211- "tenantId" : self .tenantId ,
232+ "organizationId" : self .context . org_id ,
233+ "tenantId" : self .context . tenant_id ,
212234 "spanType" : "LangGraphRun" ,
213235 "status" : status ,
214- "jobKey" : self .jobKey ,
215- "folderKey" : self .folderKey ,
216- "processKey" : self .processKey ,
236+ "jobKey" : self .context . job_id ,
237+ "folderKey" : self .context . folder_key ,
238+ "processKey" : self .context . folder_key ,
217239 }
218240
219241 self .log_queue .put (span_data )
@@ -237,14 +259,23 @@ def _determine_status(self, error: Optional[str]):
237259
238260 return Status .SUCCESS
239261
240- def _safe_json_dump (self , obj ) -> str :
262+ def _safe_jsons_dump (self , obj ) -> str :
241263 try :
242264 json_str = json .dumps (obj , default = _simple_serialize_defaults )
243265 return json_str
244266 except Exception as e :
245- logger .warning (e )
267+ logger .warning (f"Error serializing object to JSON: { e } " )
246268 return "{ }"
247269
270+ def _safe_dict_dump (self , obj ) -> Dict [str , Any ]:
271+ try :
272+ serialized = json .loads (json .dumps (obj , default = _simple_serialize_defaults ))
273+ return serialized
274+ except Exception as e :
275+ # Last resort - string representation
276+ logger .warning (f"Error serializing object to JSON: { e } " )
277+ return {"raw" : str (obj )}
278+
248279 def _run_to_dict (self , run : Run ):
249280 with warnings .catch_warnings ():
250281 warnings .simplefilter ("ignore" , category = PydanticDeprecationWarning )
@@ -254,3 +285,21 @@ def _run_to_dict(self, run: Run):
254285 "inputs" : run .inputs .copy () if run .inputs is not None else None ,
255286 "outputs" : run .outputs .copy () if run .outputs is not None else None ,
256287 }
288+
289+ def _get_base_url (self ) -> str :
290+ uipath_url = (
291+ env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
292+ )
293+ uipath_url = uipath_url .rstrip ("/" )
294+
295+ # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
296+ parts = uipath_url .split ("//" )
297+
298+ # after splitting by //, the base URL will be at index 1 along with the rest,
299+ # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
300+ base_url_parts = parts [1 ].split ("/" )
301+
302+ # combine scheme and netloc to get the base URL
303+ base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
304+
305+ return base_url
0 commit comments