33import asyncio
44import logging
55import os
6+ import uuid
67from typing import Any
78from urllib .parse import urlparse
89
1213 UiPathConversationEvent ,
1314 UiPathConversationExchangeEndEvent ,
1415 UiPathConversationExchangeEvent ,
16+ UiPathConversationInterruptEvent ,
17+ UiPathConversationInterruptStartEvent ,
18+ UiPathConversationMessageEvent ,
1519)
20+ from uipath .runtime import UiPathRuntimeResult
1621from uipath .runtime .chat import UiPathChatProtocol
1722from uipath .runtime .context import UiPathRuntimeContext
1823
@@ -51,6 +56,7 @@ def __init__(
5156 self .headers = headers
5257 self ._client : AsyncClient | None = None
5358 self ._connected_event = asyncio .Event ()
59+ self ._waiting_for_resume = False
5460
5561 async def connect (self , timeout : float = 10.0 ) -> None :
5662 """Establish WebSocket connection to the server.
@@ -127,23 +133,7 @@ async def disconnect(self) -> None:
127133 logger .warning ("WebSocket client not connected" )
128134 return
129135
130- # Send exchange end event using stored IDs
131- if self ._client and self ._connected_event .is_set ():
132- try :
133- end_event = UiPathConversationEvent (
134- conversation_id = self .conversation_id ,
135- exchange = UiPathConversationExchangeEvent (
136- exchange_id = self .exchange_id ,
137- end = UiPathConversationExchangeEndEvent (),
138- ),
139- )
140- event_data = end_event .model_dump (
141- mode = "json" , exclude_none = True , by_alias = True
142- )
143- await self ._client .emit ("ConversationEvent" , event_data )
144- logger .info ("Exchange end event sent" )
145- except Exception as e :
146- logger .warning (f"Error sending exchange end event: { e } " )
136+ await self .emit_exchange_end_event ()
147137
148138 try :
149139 logger .info ("Disconnecting from WebSocket server" )
@@ -154,7 +144,9 @@ async def disconnect(self) -> None:
154144 finally :
155145 await self ._cleanup_client ()
156146
157- async def emit_message_event (self , message_event : Any ) -> None :
147+ async def emit_message_event (
148+ self , message_event : UiPathConversationMessageEvent
149+ ) -> None :
158150 """Wrap and send a message event to the WebSocket server.
159151
160152 Args:
@@ -169,6 +161,9 @@ async def emit_message_event(self, message_event: Any) -> None:
169161 if not self ._connected_event .is_set ():
170162 raise RuntimeError ("WebSocket client not in connected state" )
171163
164+ # Store the current message ID, used for emitting interrupt events.
165+ self ._current_message_id = message_event .message_id
166+
172167 try :
173168 # Wrap message event with conversation/exchange IDs
174169 wrapped_event = UiPathConversationEvent (
@@ -191,6 +186,83 @@ async def emit_message_event(self, message_event: Any) -> None:
191186 logger .error (f"Error sending conversation event to WebSocket: { e } " )
192187 raise RuntimeError (f"Failed to send conversation event: { e } " ) from e
193188
189+ async def emit_exchange_end_event (self ):
190+ # Send exchange end event using stored IDs
191+ if self ._client and self ._connected_event .is_set ():
192+ try :
193+ end_event = UiPathConversationEvent (
194+ conversation_id = self .conversation_id ,
195+ exchange = UiPathConversationExchangeEndEvent (
196+ exchange_id = self .exchange_id
197+ ),
198+ )
199+ event_data = end_event .model_dump (
200+ mode = "json" , exclude_none = True , by_alias = True
201+ )
202+ await self ._client .emit ("ConversationEvent" , event_data )
203+ logger .info ("Exchange end event sent" )
204+ except Exception as e :
205+ logger .warning (f"Error sending exchange end event: { e } " )
206+
207+ async def emit_interrupt_event (self , runtime_result : UiPathRuntimeResult ):
208+ # Send startInterrupt event using stored ID's
209+ if self ._client and self ._connected_event .is_set ():
210+ try :
211+
212+ self ._interrupt_id = str (uuid .uuid4 ())
213+
214+ interrupt_event = UiPathConversationEvent (
215+ conversation_id = self .conversation_id ,
216+ exchange = UiPathConversationExchangeEvent (
217+ exchange_id = self .exchange_id ,
218+ message = UiPathConversationMessageEvent (
219+ message_id = self ._current_message_id ,
220+ interrupt = UiPathConversationInterruptEvent (
221+ interrupt_id = self ._interrupt_id ,
222+ start = UiPathConversationInterruptStartEvent (
223+ type = "coded-agent-test" , value = runtime_result .output
224+ ),
225+ ),
226+ ),
227+ ),
228+ )
229+ event_data = interrupt_event .model_dump (
230+ mode = "json" , exclude_none = True , by_alias = True
231+ )
232+ await self ._client .emit ("ConversationEvent" , event_data )
233+ logger .info ("Interrupt event sent" )
234+ except Exception as e :
235+ logger .warning (f"Error sending interrupt event: { e } " )
236+
237+ async def wait_for_resume (self ) -> dict [str , Any ]:
238+ """Wait for the interrupt_end event to be received.
239+
240+ Returns:
241+ Resume data from the interrupt end event
242+ """
243+ if self ._client is None :
244+ raise RuntimeError ("WebSocket client not connected" )
245+
246+ # Initialize resume event and data
247+ self ._resume_event = asyncio .Event ()
248+ self ._resume_data = None
249+ self ._waiting_for_resume = True
250+
251+ # Register handler for interrupt events
252+ self ._client .on ("ConversationEvent" , self ._handle_conversation_event )
253+
254+ try :
255+ # Wait for the resume event to be signaled
256+ await self ._resume_event .wait ()
257+
258+ # Return the resume data
259+ resume_data = self ._resume_data or {}
260+
261+ return resume_data
262+ finally :
263+ # Clear the waiting flag
264+ self ._waiting_for_resume = False
265+
194266 @property
195267 def is_connected (self ) -> bool :
196268 """Check if the WebSocket is currently connected.
@@ -214,6 +286,38 @@ async def _handle_connect_error(self, data: Any) -> None:
214286 """Handle connection error event."""
215287 logger .error (f"WebSocket connection error: { data } " )
216288
289+ async def _handle_conversation_event (self , data : Any , * args : Any ) -> None :
290+ """Handle incoming conversation event from the server.
291+
292+ Args:
293+ data: The incoming conversation event data (JSON)
294+ *args: Additional arguments from Socket.IO
295+ """
296+ # Only process events when actively waiting for resume
297+ if not self ._waiting_for_resume :
298+ return
299+
300+ try :
301+ # Parse the incoming event as a UiPathConversationEvent
302+ event = UiPathConversationEvent .model_validate (data )
303+
304+ if isinstance (event .exchange , UiPathConversationExchangeEvent ):
305+ message = event .exchange .message
306+ if message and message .message_id == self ._current_message_id :
307+ if message .interrupt :
308+ if (
309+ message .interrupt .interrupt_id
310+ == self ._interrupt_id
311+ ):
312+ if message .interrupt .end :
313+ # Extract resume data from the end event
314+ # end is already a dict (typed as Any), no need to call model_dump
315+ self ._resume_data = message .interrupt .end
316+ self ._resume_event .set ()
317+ logger .info ("Resume event received" )
318+ except Exception as e :
319+ logger .error (f"Error handling conversation event: { e } " )
320+
217321 async def _cleanup_client (self ) -> None :
218322 """Clean up client resources."""
219323 self ._connected_event .clear ()
0 commit comments