@@ -15,10 +15,31 @@ interface CircuitStats {
1515 lastFailureTime : number
1616 consecutiveFailures : number
1717 totalRequests : number
18+ lastStateChange : number
1819}
1920
21+ // Default configuration constants
22+ const CIRCUIT_BREAKER_DEFAULTS = {
23+ FAILURE_THRESHOLD : 5 ,
24+ RESET_TIMEOUT : 60_000 , // 1 minute
25+ HALF_OPEN_MAX_ATTEMPTS : 3 ,
26+ MONITORING_PERIOD : 300_000 , // 5 minutes
27+ } as const
28+
2029/**
21- * Circuit Breaker pattern implementation for resilient network calls
30+ * Circuit Breaker pattern implementation for resilient network calls.
31+ *
32+ * States:
33+ * - CLOSED: Normal operation, requests flow through
34+ * - OPEN: Failure threshold exceeded, requests fail fast or use fallback
35+ * - HALF_OPEN: Testing if service recovered, limited requests allowed
36+ *
37+ * Features:
38+ * - Automatic state transitions based on failure/success patterns
39+ * - Configurable thresholds and timeouts
40+ * - Fallback support for graceful degradation
41+ * - Metrics and health tracking
42+ * - Batch operation support
2243 */
2344export class CircuitBreaker {
2445 private state : CircuitState = 'CLOSED'
@@ -28,45 +49,65 @@ export class CircuitBreaker {
2849 lastFailureTime : 0 ,
2950 consecutiveFailures : 0 ,
3051 totalRequests : 0 ,
52+ lastStateChange : Date . now ( ) ,
3153 }
3254 private halfOpenAttempts = 0
3355 private readonly failureThreshold : number
3456 private readonly resetTimeout : number
3557 private readonly halfOpenMaxAttempts : number
3658 private readonly monitoringPeriod : number
3759 private logger : Logger
38- private stateChangeCallbacks : Array < ( state : CircuitState ) => void > = [ ]
60+ private stateChangeCallbacks : Array < ( state : CircuitState , previousState : CircuitState ) => void > =
61+ [ ]
3962 private monitoringInterval ?: NodeJS . Timeout
63+ private disposed = false
4064
4165 constructor ( logger : Logger , options : CircuitBreakerOptions = { } ) {
4266 this . logger = logger . child ( { component : 'CircuitBreaker' } )
43- this . failureThreshold = options . failureThreshold ?? 5
44- this . resetTimeout = options . resetTimeout ?? 60_000 // 1 minute
45- this . halfOpenMaxAttempts = options . halfOpenMaxAttempts ?? 3
46- this . monitoringPeriod = options . monitoringPeriod ?? 300_000 // 5 minutes
67+ this . failureThreshold = options . failureThreshold ?? CIRCUIT_BREAKER_DEFAULTS . FAILURE_THRESHOLD
68+ this . resetTimeout = options . resetTimeout ?? CIRCUIT_BREAKER_DEFAULTS . RESET_TIMEOUT
69+ this . halfOpenMaxAttempts =
70+ options . halfOpenMaxAttempts ?? CIRCUIT_BREAKER_DEFAULTS . HALF_OPEN_MAX_ATTEMPTS
71+ this . monitoringPeriod = options . monitoringPeriod ?? CIRCUIT_BREAKER_DEFAULTS . MONITORING_PERIOD
4772
48- // Periodic stats reset
73+ // Periodic stats reset for rolling window
4974 this . monitoringInterval = setInterval ( ( ) => this . resetStats ( ) , this . monitoringPeriod )
75+
76+ // Ensure interval doesn't prevent process exit
77+ if ( this . monitoringInterval . unref ) {
78+ this . monitoringInterval . unref ( )
79+ }
80+
81+ this . logger . debug ( 'CircuitBreaker initialized' , {
82+ failureThreshold : this . failureThreshold ,
83+ resetTimeout : this . resetTimeout ,
84+ halfOpenMaxAttempts : this . halfOpenMaxAttempts ,
85+ monitoringPeriod : this . monitoringPeriod ,
86+ } )
5087 }
5188
5289 /**
5390 * Execute a function with circuit breaker protection
5491 */
5592 async execute < T > ( fn : ( ) => Promise < T > , fallback ?: ( ) => T | Promise < T > ) : Promise < T > {
93+ this . ensureNotDisposed ( )
5694 this . stats . totalRequests ++
5795
5896 // Check if circuit should transition from OPEN to HALF_OPEN
5997 if ( this . state === 'OPEN' ) {
60- if ( Date . now ( ) - this . stats . lastFailureTime >= this . resetTimeout ) {
98+ const timeSinceLastFailure = Date . now ( ) - this . stats . lastFailureTime
99+ if ( timeSinceLastFailure >= this . resetTimeout ) {
61100 this . transitionTo ( 'HALF_OPEN' )
62101 } else if ( fallback ) {
63- this . logger . debug ( 'Circuit is OPEN, using fallback' )
102+ this . logger . debug ( 'Circuit is OPEN, using fallback' , {
103+ timeUntilReset : Math . ceil ( ( this . resetTimeout - timeSinceLastFailure ) / 1000 ) ,
104+ } )
64105 return fallback ( )
65106 } else {
66- throw new Error (
67- `Circuit breaker is OPEN. Reset in ${ Math . ceil (
68- ( this . resetTimeout - ( Date . now ( ) - this . stats . lastFailureTime ) ) / 1000 ,
69- ) } seconds` ,
107+ const timeUntilReset = Math . ceil ( ( this . resetTimeout - timeSinceLastFailure ) / 1000 )
108+ throw new CircuitOpenError (
109+ `Circuit breaker is OPEN. Reset in ${ timeUntilReset } seconds` ,
110+ timeUntilReset ,
70111 )
71112 }
72113 }
@@ -78,7 +119,7 @@ export class CircuitBreaker {
78119 if ( fallback ) {
79120 return fallback ( )
80121 }
81- throw new Error ( 'Circuit breaker is OPEN after max half-open attempts' )
122+ throw new CircuitOpenError ( 'Circuit breaker is OPEN after max half-open attempts' , 0 )
82123 }
83124 this . halfOpenAttempts ++
84125 }
@@ -88,11 +129,13 @@ export class CircuitBreaker {
88129 this . onSuccess ( )
89130 return result
90131 } catch ( error ) {
91- this . onFailure ( )
132+ this . onFailure ( error )
92133
93- // Try fallback if available
134+ // Try fallback if available and circuit is now open
94135 if ( fallback && this . state === 'OPEN' ) {
95- this . logger . warn ( 'Execution failed, using fallback' , { error } )
136+ this . logger . warn ( 'Execution failed, circuit opened, using fallback' , {
137+ error : error instanceof Error ? error . message : String ( error ) ,
138+ } )
96139 return fallback ( )
97140 }
98141
@@ -107,32 +150,37 @@ export class CircuitBreaker {
107150 operations : Array < ( ) => Promise < T > > ,
108151 options : { concurrency ?: number ; stopOnFailure ?: boolean } = { } ,
109152 ) : Promise < Array < { success : boolean ; result ?: T ; error ?: Error } > > {
153+ this . ensureNotDisposed ( )
154+
110155 const { concurrency = 5 , stopOnFailure = false } = options
111156 const results : Array < { success : boolean ; result ?: T ; error ?: Error } > = [ ]
112157
158+ // Split operations into chunks for controlled concurrency
113159 const chunks : Array < Array < ( ) => Promise < T > > > = [ ]
114160 for ( let i = 0 ; i < operations . length ; i += concurrency ) {
115161 chunks . push ( operations . slice ( i , i + concurrency ) )
116162 }
117163
118164 for ( const chunk of chunks ) {
165+ // Stop if circuit is open and stopOnFailure is true
166+ if ( this . state === 'OPEN' && stopOnFailure ) {
167+ this . logger . debug ( 'Stopping batch execution, circuit is OPEN' )
168+ break
169+ }
170+
119171 const chunkResults = await Promise . allSettled ( chunk . map ( ( op ) => this . execute ( op ) ) )
120172
121173 for ( const result of chunkResults ) {
122174 if ( result . status === 'fulfilled' ) {
123175 results . push ( { success : true , result : result . value } )
124176 } else {
125- results . push ( { success : false , error : result . reason } )
177+ const error = result . reason instanceof Error ? result . reason : new Error ( String ( result . reason ) )
178+ results . push ( { success : false , error } )
126179 if ( stopOnFailure ) {
127180 return results
128181 }
129182 }
130183 }
131-
132- // Stop if circuit is open
133- if ( this . state === 'OPEN' && stopOnFailure ) {
134- break
135- }
136184 }
137185
138186 return results
@@ -148,29 +196,44 @@ export class CircuitBreaker {
148196 /**
149197 * Get circuit statistics
150198 */
151- getStats ( ) : Readonly < CircuitStats > {
152- return { ...this . stats }
199+ getStats ( ) : Readonly < CircuitStats & { healthPercentage : number ; timeInCurrentState : number } > {
200+ return {
201+ ...this . stats ,
202+ healthPercentage : this . getHealthPercentage ( ) ,
203+ timeInCurrentState : Date . now ( ) - this . stats . lastStateChange ,
204+ }
153205 }
154206
155207 /**
156- * Get circuit health percentage
208+ * Get circuit health percentage (0-100)
157209 */
158210 getHealthPercentage ( ) : number {
159211 if ( this . stats . totalRequests === 0 ) return 100
160- return ( this . stats . successes / this . stats . totalRequests ) * 100
212+ return Math . round ( ( this . stats . successes / this . stats . totalRequests ) * 100 )
213+ }
214+
215+ /**
216+ * Check if circuit is healthy (CLOSED state)
217+ */
218+ isHealthy ( ) : boolean {
219+ return this . state === 'CLOSED'
161220 }
162221
163222 /**
164- * Force circuit to open
223+ * Force circuit to open (manual trip)
165224 */
166225 trip ( ) : void {
226+ this . ensureNotDisposed ( )
227+ this . logger . warn ( 'Circuit manually tripped' )
167228 this . transitionTo ( 'OPEN' )
168229 }
169230
170231 /**
171- * Force circuit to close
232+ * Force circuit to close (manual reset)
172233 */
173234 reset ( ) : void {
235+ this . ensureNotDisposed ( )
236+ this . logger . info ( 'Circuit manually reset' )
174237 this . transitionTo ( 'CLOSED' )
175238 this . stats . consecutiveFailures = 0
176239 this . halfOpenAttempts = 0
@@ -179,8 +242,17 @@ export class CircuitBreaker {
179242 /**
180243 * Register callback for state changes
181244 */
182- onStateChange ( callback : ( state : CircuitState ) => void ) : void {
245+ onStateChange (
246+ callback : ( state : CircuitState , previousState : CircuitState ) => void ,
247+ ) : ( ) => void {
183248 this . stateChangeCallbacks . push ( callback )
249+ // Return unsubscribe function
250+ return ( ) => {
251+ const index = this . stateChangeCallbacks . indexOf ( callback )
252+ if ( index > - 1 ) {
253+ this . stateChangeCallbacks . splice ( index , 1 )
254+ }
255+ }
184256 }
185257
186258 /**
@@ -200,15 +272,20 @@ export class CircuitBreaker {
200272 /**
201273 * Handle failed execution
202274 */
203- private onFailure ( ) : void {
275+ private onFailure ( error : unknown ) : void {
204276 this . stats . failures ++
205277 this . stats . consecutiveFailures ++
206278 this . stats . lastFailureTime = Date . now ( )
207279
280+ const errorMessage = error instanceof Error ? error . message : String ( error )
281+
208282 if ( this . state === 'HALF_OPEN' ) {
209283 if ( this . halfOpenAttempts >= this . halfOpenMaxAttempts ) {
210284 this . transitionTo ( 'OPEN' )
211- this . logger . warn ( 'Circuit failed in HALF_OPEN state, transitioning to OPEN' )
285+ this . logger . warn ( 'Circuit failed in HALF_OPEN state, transitioning to OPEN' , {
286+ error : errorMessage ,
287+ attempts : this . halfOpenAttempts ,
288+ } )
212289 }
213290 } else if (
214291 this . state === 'CLOSED' &&
@@ -218,6 +295,7 @@ export class CircuitBreaker {
218295 this . logger . error ( 'Circuit breaker tripped, transitioning to OPEN' , {
219296 consecutiveFailures : this . stats . consecutiveFailures ,
220297 threshold : this . failureThreshold ,
298+ lastError : errorMessage ,
221299 } )
222300 }
223301 }
@@ -230,8 +308,19 @@ export class CircuitBreaker {
230308 this . state = newState
231309
232310 if ( oldState !== newState ) {
311+ this . stats . lastStateChange = Date . now ( )
233312 this . logger . info ( 'Circuit state changed' , { from : oldState , to : newState } )
234- this . stateChangeCallbacks . forEach ( ( cb ) => cb ( newState ) )
313+
314+ // Notify all registered callbacks
315+ for ( const callback of this . stateChangeCallbacks ) {
316+ try {
317+ callback ( newState , oldState )
318+ } catch ( err ) {
319+ this . logger . warn ( 'Error in state change callback' , {
320+ error : err instanceof Error ? err . message : String ( err ) ,
321+ } )
322+ }
323+ }
235324
236325 if ( newState === 'HALF_OPEN' ) {
237326 this . halfOpenAttempts = 0
@@ -240,40 +329,83 @@ export class CircuitBreaker {
240329 }
241330
242331 /**
243- * Reset statistics periodically
332+ * Reset statistics periodically (rolling window)
244333 */
245334 private resetStats ( ) : void {
335+ if ( this . disposed ) return
336+
246337 // Keep failure tracking but reset totals for percentage calculations
338+ const previousTotal = this . stats . totalRequests
247339 this . stats . totalRequests = 0
248340 this . stats . successes = 0
249341 this . stats . failures = 0
342+
343+ if ( previousTotal > 0 ) {
344+ this . logger . trace ( 'Reset circuit breaker stats' , { previousTotal } )
345+ }
250346 }
251347
252348 /**
253349 * Create a wrapped function with circuit breaker protection
254350 */
255- wrap < T extends ( ...args : never [ ] ) => Promise < unknown > > (
256- fn : T ,
257- fallback ?: (
258- ...args : Parameters < T >
259- ) => ReturnType < T > | Promise < Awaited < ReturnType < T > > > ,
260- ) : T {
261- return ( async ( ...args : Parameters < T > ) => {
351+ wrap < TArgs extends unknown [ ] , TResult > (
352+ fn : ( ...args : TArgs ) => Promise < TResult > ,
353+ fallback ?: ( ...args : TArgs ) => TResult | Promise < TResult > ,
354+ ) : ( ...args : TArgs ) => Promise < TResult > {
355+ return async ( ...args : TArgs ) : Promise < TResult > => {
262356 return this . execute (
263357 ( ) => fn ( ...args ) ,
264358 fallback ? ( ) => fallback ( ...args ) : undefined ,
265359 )
266- } ) as T
360+ }
361+ }
362+
363+ /**
364+ * Ensure circuit breaker is not disposed
365+ */
366+ private ensureNotDisposed ( ) : void {
367+ if ( this . disposed ) {
368+ throw new Error ( 'CircuitBreaker has been disposed' )
369+ }
267370 }
268371
269372 /**
270373 * Clean up resources
271374 */
272375 dispose ( ) : void {
376+ if ( this . disposed ) return
377+
378+ this . disposed = true
379+
273380 if ( this . monitoringInterval ) {
274381 clearInterval ( this . monitoringInterval )
275382 this . monitoringInterval = undefined
276383 }
384+
277385 this . stateChangeCallbacks = [ ]
386+ this . logger . debug ( 'CircuitBreaker disposed' )
387+ }
388+
389+ /**
390+ * Support for async disposal
391+ */
392+ async [ Symbol . asyncDispose ] ( ) : Promise < void > {
393+ this . dispose ( )
394+ }
395+ }
396+
397+ /**
398+ * Error thrown when circuit is open
399+ */
400+ export class CircuitOpenError extends Error {
401+ constructor (
402+ message : string ,
403+ public readonly timeUntilReset : number ,
404+ ) {
405+ super ( message )
406+ this . name = 'CircuitOpenError'
407+ if ( Error . captureStackTrace ) {
408+ Error . captureStackTrace ( this , CircuitOpenError )
409+ }
278410 }
279411}
0 commit comments