|
15 | 15 | logger = logging.getLogger(__name__) |
16 | 16 | tracer = trace.get_tracer(__name__) |
17 | 17 |
|
| 18 | +MAX_RETRIES = 3 |
| 19 | +RETRY_DELAY = 1 |
18 | 20 |
|
19 | 21 | class SessionServer: |
20 | 22 | """Manages a server process for a specific session.""" |
@@ -58,21 +60,22 @@ async def start(self) -> None: |
58 | 60 |
|
59 | 61 | async def on_message_received(self) -> None: |
60 | 62 | """Get new incoming messages from UiPath MCP Server.""" |
61 | | - response = await self._uipath.api_client.request_async( |
62 | | - "GET", |
63 | | - f"mcp_/mcp/{self._server_config.name}/in/messages?sessionId={self._session_id}", |
64 | | - ) |
65 | | - if response.status_code == 200: |
66 | | - messages = response.json() |
67 | | - for message in messages: |
68 | | - logger.info(f"Received message: {message}") |
69 | | - json_message = types.JSONRPCMessage.model_validate(message) |
70 | | - with self._mcp_tracer.create_span_for_message( |
71 | | - json_message, |
72 | | - session_id=self._session_id, |
73 | | - server_name=self._server_config.name, |
74 | | - ) as _: |
75 | | - await self._message_queue.put(json_message) |
| 63 | + for attempt in range(MAX_RETRIES + 1): |
| 64 | + try: |
| 65 | + await self._get_messages_internal() |
| 66 | + break |
| 67 | + except Exception as e: |
| 68 | + logger.error( |
| 69 | + f"Error receiving messages for session {self._session_id}: {e}", |
| 70 | + exc_info=True, |
| 71 | + ) |
| 72 | + if attempt < MAX_RETRIES: |
| 73 | + await asyncio.sleep(RETRY_DELAY) |
| 74 | + else: |
| 75 | + logger.error( |
| 76 | + f"Max retries reached for receiving messages in session {self._session_id}" |
| 77 | + ) |
| 78 | + raise |
76 | 79 |
|
77 | 80 | async def stop(self) -> None: |
78 | 81 | """Clean up resources and stop the server.""" |
@@ -176,21 +179,56 @@ async def _send_message(self, message: types.JSONRPCMessage) -> None: |
176 | 179 | """Send new message to UiPath MCP Server.""" |
177 | 180 | with self._mcp_tracer.create_span_for_message( |
178 | 181 | message, session_id=self._session_id, server_name=self._server_config.name |
179 | | - ) as span: |
180 | | - try: |
181 | | - response = await self._uipath.api_client.request_async( |
182 | | - "POST", |
183 | | - f"mcp_/mcp/{self._server_config.name}/out/message?sessionId={self._session_id}", |
184 | | - json=message.model_dump(), |
185 | | - ) |
186 | | - if response.status_code == 202: |
187 | | - logger.info( |
188 | | - f"Outgoing message sent to UiPath MCP Server: {message}" |
189 | | - ) |
190 | | - else: |
191 | | - self._mcp_tracer.record_http_error( |
192 | | - span, response.status_code, response.text |
| 182 | + ) as _: |
| 183 | + for attempt in range(MAX_RETRIES + 1): |
| 184 | + try: |
| 185 | + await self._send_message_internal(message) |
| 186 | + break |
| 187 | + except Exception as e: |
| 188 | + logger.error( |
| 189 | + f"Error sending message to UiPath MCP Server for session {self._session_id}: {e}", |
| 190 | + exc_info=True, |
193 | 191 | ) |
194 | | - except Exception as e: |
195 | | - self._mcp_tracer.record_exception(span, e) |
196 | | - raise |
| 192 | + if attempt < MAX_RETRIES: |
| 193 | + await asyncio.sleep(RETRY_DELAY) |
| 194 | + else: |
| 195 | + logger.error( |
| 196 | + f"Max retries reached for sending message in session {self._session_id}" |
| 197 | + ) |
| 198 | + raise |
| 199 | + |
| 200 | + async def _send_message_internal(self, message: types.JSONRPCMessage) -> None: |
| 201 | + response = await self._uipath.api_client.request_async( |
| 202 | + "POST", |
| 203 | + f"mcp_/mcp/{self._server_config.name}/out/message?sessionId={self._session_id}", |
| 204 | + json=message.model_dump(), |
| 205 | + ) |
| 206 | + if response.status_code == 202: |
| 207 | + logger.info( |
| 208 | + f"Outgoing message sent to UiPath MCP Server: {message}" |
| 209 | + ) |
| 210 | + elif 500 <= response.status_code < 600: |
| 211 | + raise Exception( |
| 212 | + f"{response.status_code} - {response.text}" |
| 213 | + ) |
| 214 | + |
| 215 | + async def _get_messages_internal(self) -> None: |
| 216 | + response = await self._uipath.api_client.request_async( |
| 217 | + "GET", |
| 218 | + f"mcp_/mcp/{self._server_config.name}/in/messages?sessionId={self._session_id}", |
| 219 | + ) |
| 220 | + if response.status_code == 200: |
| 221 | + messages = response.json() |
| 222 | + for message in messages: |
| 223 | + logger.info(f"Received message: {message}") |
| 224 | + json_message = types.JSONRPCMessage.model_validate(message) |
| 225 | + with self._mcp_tracer.create_span_for_message( |
| 226 | + json_message, |
| 227 | + session_id=self._session_id, |
| 228 | + server_name=self._server_config.name, |
| 229 | + ) as _: |
| 230 | + await self._message_queue.put(json_message) |
| 231 | + elif 500 <= response.status_code < 600: |
| 232 | + raise Exception( |
| 233 | + f"{response.status_code} - {response.text}" |
| 234 | + ) |
0 commit comments