@@ -46,7 +46,6 @@ def __init__(
4646 self ._max_reconnect_interval = 300.0 # Cap at 5 minutes
4747 self ._last_reconnect_attempt_time : Optional [float ] = None
4848 self ._sse_reconnecting = False
49- self ._sse_reconnect_lock = threading .Lock ()
5049 self ._config_api_client = ConfigAPIClient (self ._sdk_key , self ._options )
5150
5251 self ._polling_enabled = True
@@ -58,47 +57,45 @@ def is_initialized(self) -> bool:
5857
5958 def _recreate_sse_connection (self ):
6059 """Recreate the SSE connection with the current config."""
61- with self ._sse_reconnect_lock :
62- if self ._config is None or self ._options .disable_realtime_updates :
63- logger .debug (
64- "Devcycle: Skipping SSE recreation - no config or updates disabled"
65- )
66- return
60+ if self ._config is None or self ._options .disable_realtime_updates :
61+ logger .debug (
62+ "DevCycle: Skipping SSE recreation - no config or updates disabled"
63+ )
64+ return
6765
68- try :
69- # Close existing connection if present
70- if (
71- self . _sse_manager is not None
72- and self . _sse_manager . client is not None
73- ) :
74- self ._sse_manager .client .close ()
75- if self ._sse_manager .read_thread .is_alive ():
76- self ._sse_manager .read_thread .join (timeout = 1.0 )
77-
78- # Create new SSE manager
79- self ._sse_manager = SSEManager (
80- self .sse_state ,
81- self .sse_error ,
82- self .sse_message ,
83- )
84- self ._sse_manager .update (self ._config )
85- logger . info ( "Devcyle: SSE connection created successfully" )
86- except Exception as e :
87- logger .debug (f"Devcycle : Failed to recreate SSE connection: { e } " )
66+ # Update timestamp right before attempting connection
67+ self . _last_reconnect_attempt_time = time . time ()
68+
69+ try :
70+ # Close existing connection if present
71+ if self . _sse_manager is not None and self . _sse_manager . client is not None :
72+ self ._sse_manager .client .close ()
73+ if self ._sse_manager .read_thread .is_alive ():
74+ self ._sse_manager .read_thread .join (timeout = 1.0 )
75+
76+ # Create new SSE manager
77+ self ._sse_manager = SSEManager (
78+ self .sse_state ,
79+ self .sse_error ,
80+ self .sse_message ,
81+ )
82+ self ._sse_manager .update (self ._config )
83+
84+ except Exception as e :
85+ logger .debug (f"DevCycle : Failed to recreate SSE connection: { e } " )
8886
8987 def _delayed_sse_reconnect (self , delay_seconds : float ):
9088 """Delayed SSE reconnection with configurable backoff."""
9189 try :
9290 logger .debug (
93- f"Devcycle : Waiting { delay_seconds } s before reconnecting SSE..."
91+ f"DevCycle : Waiting { delay_seconds } s before reconnecting SSE..."
9492 )
9593 time .sleep (delay_seconds )
9694 self ._recreate_sse_connection ()
9795 except Exception as e :
98- logger .error (f"Devcycle : Error during delayed SSE reconnection: { e } " )
96+ logger .error (f"DevCycle : Error during delayed SSE reconnection: { e } " )
9997 finally :
100- with self ._sse_reconnect_lock :
101- self ._sse_reconnecting = False
98+ self ._sse_reconnecting = False
10299
103100 def _get_config (self , last_modified : Optional [float ] = None ):
104101 try :
@@ -199,45 +196,39 @@ def sse_message(self, message: ld_eventsource.actions.Event):
199196
200197 def sse_error (self , error : ld_eventsource .actions .Fault ):
201198 self ._sse_connected = False
202- logger .debug (f"Devcyle : SSE connection error: { error .error } " )
199+ logger .debug (f"DevCyle : SSE connection error: { error .error } " )
203200 current_time = time .time ()
204201
202+ if self ._sse_reconnecting :
203+ logger .debug ("DevCyle: Reconnection already in progress, skipping" )
204+ return
205+
205206 # Calculate exponential backoff interval (capped at max)
206207 backoff_interval = min (
207208 self ._min_reconnect_interval * (2 ** self ._sse_reconnect_attempts ),
208209 self ._max_reconnect_interval ,
209210 )
210211
211- with self ._sse_reconnect_lock :
212- if self ._sse_reconnecting :
213- logger .debug ("Devcyle: Reconnection already in progress, skipping" )
214- return
215-
216- # Check if we need to wait for backoff
217- if (
218- self ._last_reconnect_attempt_time is not None
219- and current_time - self ._last_reconnect_attempt_time < backoff_interval
220- ):
221- time_remaining = backoff_interval - (
222- current_time - self ._last_reconnect_attempt_time
223- )
212+ # Check if we need to wait for remaining backoff time
213+ delay_seconds = backoff_interval
214+ if self ._last_reconnect_attempt_time is not None :
215+ time_since_last_attempt = current_time - self ._last_reconnect_attempt_time
216+ if time_since_last_attempt < backoff_interval :
217+ delay_seconds = backoff_interval - time_since_last_attempt
224218 logger .debug (
225- f"Devcyle: Skipping reconnection attempt, waiting for backoff period "
226- f"({ time_remaining :.1f} s remaining of { backoff_interval :.1f} s)"
219+ f"DevCyle: Within backoff period, scheduling reconnection in { delay_seconds :.1f} s"
227220 )
228- return
229221
230- self ._sse_reconnecting = True
231- self ._last_reconnect_attempt_time = current_time
232- self ._sse_reconnect_attempts += 1
222+ self ._sse_reconnecting = True
223+ self ._sse_reconnect_attempts += 1
233224
234225 logger .debug (
235- f"Devcyle : Attempting SSE reconnection (attempt #{ self ._sse_reconnect_attempts } , "
236- f"next backoff: { backoff_interval :.1f} s)"
226+ f"DevCyle : Attempting SSE reconnection (attempt #{ self ._sse_reconnect_attempts } , "
227+ f"backoff: { delay_seconds :.1f} s)"
237228 )
238229
239230 reconnect_thread = threading .Thread (
240- target = self ._delayed_sse_reconnect , args = (backoff_interval ,), daemon = True
231+ target = self ._delayed_sse_reconnect , args = (delay_seconds ,), daemon = True
241232 )
242233 reconnect_thread .start ()
243234
@@ -247,11 +238,10 @@ def sse_state(self, state: Optional[ld_eventsource.actions.Start]):
247238 logger .info ("DevCycle: Connected to SSE stream" )
248239
249240 # Clear reconnection state
250- with self ._sse_reconnect_lock :
251- self ._sse_reconnecting = False
252- self ._last_reconnect_attempt_time = None
241+ self ._sse_reconnecting = False
242+ self ._last_reconnect_attempt_time = None
253243 else :
254- logger .debug ("Devcyle : SSE keepalive received" )
244+ logger .debug ("DevCyle : SSE keepalive received" )
255245
256246 def close (self ):
257247 self ._polling_enabled = False
0 commit comments