@@ -40,7 +40,7 @@ def __init__(
4040 self ._config_etag : Optional [str ] = None
4141 self ._config_lastmodified : Optional [str ] = None
4242
43- # Exponential backoff configuration
43+ # Exponential backoff configuration
4444 self ._sse_reconnect_attempts = 0
4545 self ._min_reconnect_interval = 5.0 # Start at 5 seconds
4646 self ._max_reconnect_interval = 300.0 # Cap at 5 minutes
@@ -60,13 +60,15 @@ def _recreate_sse_connection(self):
6060 """Recreate the SSE connection with the current config."""
6161 with self ._sse_reconnect_lock :
6262 if self ._config is None or self ._options .disable_realtime_updates :
63- logger .debug ("Skipping SSE recreation - no config or updates disabled" )
63+ logger .debug ("Devcycle: Skipping SSE recreation - no config or updates disabled" )
6464 return
6565
6666 try :
6767 # Close existing connection if present
68- if self ._sse_manager is not None and self ._sse_manager .client is not None :
69- logger .debug ("Closing existing SSE connection before recreating" )
68+ if (
69+ self ._sse_manager is not None
70+ and self ._sse_manager .client is not None
71+ ):
7072 self ._sse_manager .client .close ()
7173 if self ._sse_manager .read_thread .is_alive ():
7274 self ._sse_manager .read_thread .join (timeout = 1.0 )
@@ -78,23 +80,21 @@ def _recreate_sse_connection(self):
7880 self .sse_message ,
7981 )
8082 self ._sse_manager .update (self ._config )
81- logger .info ("SSE connection recreated successfully" )
83+ logger .info ("Devcyle: SSE connection created successfully" )
8284 except Exception as e :
83- logger .error (f"Devcycle: Failed to recreate SSE connection: { e } " )
85+ logger .debug (f"Devcycle: Failed to recreate SSE connection: { e } " )
8486
8587 def _delayed_sse_reconnect (self , delay_seconds : float ):
8688 """Delayed SSE reconnection with configurable backoff."""
8789 try :
88- logger .debug (f"Waiting { delay_seconds } s before reconnecting SSE..." )
89- time .sleep (delay_seconds )
90- logger .debug ("Delay complete, attempting to recreate SSE connection" )
90+ logger .debug (f"Devcycle: Waiting { delay_seconds } s before reconnecting SSE..." )
91+ time .sleep (delay_seconds )
9192 self ._recreate_sse_connection ()
9293 except Exception as e :
93- logger .error (f"Error during delayed SSE reconnection: { e } " )
94+ logger .error (f"Devcycle: Error during delayed SSE reconnection: { e } " )
9495 finally :
9596 with self ._sse_reconnect_lock :
9697 self ._sse_reconnecting = False
97- logger .debug ("Reconnection attempt completed" )
9898
9999 def _get_config (self , last_modified : Optional [float ] = None ):
100100 try :
@@ -134,7 +134,9 @@ def _get_config(self, last_modified: Optional[float] = None):
134134 or self ._sse_manager .client is None
135135 or not self ._sse_manager .read_thread .is_alive ()
136136 ):
137- logger .info ("DevCycle: SSE connection not active, creating new connection" )
137+ logger .info (
138+ "DevCycle: SSE connection not active, creating new connection"
139+ )
138140 self ._recreate_sse_connection ()
139141
140142 if (
@@ -178,8 +180,7 @@ def sse_message(self, message: ld_eventsource.actions.Event):
178180 self .sse_state (None )
179181 logger .info (f"DevCycle: Received message: { message .data } " )
180182 sse_message = json .loads (message .data )
181-
182-
183+
183184 dvc_data = json .loads (sse_message .get ("data" ))
184185 if (
185186 dvc_data .get ("type" ) == "refetchConfig"
@@ -188,77 +189,67 @@ def sse_message(self, message: ld_eventsource.actions.Event):
188189 ):
189190 logger .info ("DevCycle: Received refetchConfig message - updating config" )
190191 self ._get_config (dvc_data ["lastModified" ] / 1000.0 )
191- # Succesfully maintained connection and received ping, reset our connect attempts.
192- if ( dvc_data .get ("type" ) == ' ping' ) :
192+ # SSE connection healthy, reconnect attempts reset .
193+ if dvc_data .get ("type" ) == " ping" or dvc_data . get ( "type" ) == "refetchConfig" :
193194 self ._sse_reconnect_attempts = 0
194195
195196 def sse_error (self , error : ld_eventsource .actions .Fault ):
196- """
197- Handle SSE connection errors with exponential backoff reconnection.
198-
199- Switches to polling mode (10s intervals) and attempts reconnection with backoff:
200- 5s → 10s → 20s → 40s → 80s → 160s → 300s (capped at 5 min).
201- Backoff resets on successful reconnection.
202-
203- Thread-safe with _sse_reconnect_lock to prevent concurrent reconnection attempts.
204- """
205- """Handle SSE connection errors with exponential backoff."""
206197 self ._sse_connected = False
207- logger .debug (f"SSE connection error: { error .error } " )
208- current_time = time .time ()
209-
198+ logger .debug (f"Devcyle: SSE connection error: { error .error } " )
199+ current_time = time .time ()
200+
210201 # Calculate exponential backoff interval (capped at max)
211202 backoff_interval = min (
212- self ._min_reconnect_interval * (2 ** self ._sse_reconnect_attempts ),
213- self ._max_reconnect_interval
203+ self ._min_reconnect_interval * (2 ** self ._sse_reconnect_attempts ),
204+ self ._max_reconnect_interval ,
214205 )
215206
216207 with self ._sse_reconnect_lock :
217- # Check if we're already reconnecting
218208 if self ._sse_reconnecting :
219- logger .debug ("Reconnection already in progress, skipping" )
209+ logger .debug ("Devcyle: Reconnection already in progress, skipping" )
220210 return
221-
211+
222212 # Check if we need to wait for backoff
223- if (self ._last_reconnect_attempt_time is not None and
224- current_time - self ._last_reconnect_attempt_time < backoff_interval ):
225- time_remaining = backoff_interval - (current_time - self ._last_reconnect_attempt_time )
213+ if (
214+ self ._last_reconnect_attempt_time is not None
215+ and current_time - self ._last_reconnect_attempt_time < backoff_interval
216+ ):
217+ time_remaining = backoff_interval - (
218+ current_time - self ._last_reconnect_attempt_time
219+ )
226220 logger .debug (
227- f"Skipping reconnection attempt, waiting for backoff period "
221+ f"Devcyle: Skipping reconnection attempt, waiting for backoff period "
228222 f"({ time_remaining :.1f} s remaining of { backoff_interval :.1f} s)"
229223 )
230224 return
231-
232- # Mark that we're now reconnecting
225+
233226 self ._sse_reconnecting = True
234227 self ._last_reconnect_attempt_time = current_time
235228 self ._sse_reconnect_attempts += 1
236-
237- logger .info (
238- f"Attempting SSE reconnection (attempt #{ self ._sse_reconnect_attempts } , "
229+
230+ logger .debug (
231+ f"Devcyle: Attempting SSE reconnection (attempt #{ self ._sse_reconnect_attempts } , "
239232 f"next backoff: { backoff_interval :.1f} s)"
240233 )
241-
234+
242235 reconnect_thread = threading .Thread (
243- target = self ._delayed_sse_reconnect ,
244- args = (backoff_interval ,),
245- daemon = True
236+ target = self ._delayed_sse_reconnect , args = (backoff_interval ,), daemon = True
246237 )
247238 reconnect_thread .start ()
248239
249240 def sse_state (self , state : Optional [ld_eventsource .actions .Start ]):
250241 if not self ._sse_connected :
251242 self ._sse_connected = True
252243 logger .info ("DevCycle: Connected to SSE stream" )
253-
254- # Clear reconnection state on successful connection
244+
245+ # Clear reconnection state
255246 with self ._sse_reconnect_lock :
256247 self ._sse_reconnecting = False
257248 self ._last_reconnect_attempt_time = None
258249 else :
259- logger .debug ("SSE keepalive received" )
250+ logger .debug ("Devcyle: SSE keepalive received" )
260251
261252 def close (self ):
262253 self ._polling_enabled = False
263254 if self ._sse_manager is not None and self ._sse_manager .client is not None :
264- self ._sse_manager .client .close ()
255+ self ._sse_manager .client .close ()
0 commit comments