Skip to content

Commit 20e5ff9

Browse files
DaMandal0rianclaude
andcommitted
refactor: enhance CircuitBreaker with proper disposal and improved types
- Extract magic numbers into CIRCUIT_BREAKER_DEFAULTS constants - Fix wrap() generic types to use TArgs/TResult pattern - Add proper disposal pattern with Symbol.asyncDispose support - Add CircuitOpenError with timeUntilReset property - Add ensureNotDisposed guard for all public methods - Add batch execution support with concurrency control 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent cbf67ec commit 20e5ff9

File tree

1 file changed

+174
-42
lines changed

1 file changed

+174
-42
lines changed

packages/indexer-common/src/performance/circuit-breaker.ts

Lines changed: 174 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,31 @@ interface CircuitStats {
1515
lastFailureTime: number
1616
consecutiveFailures: number
1717
totalRequests: number
18+
lastStateChange: number
1819
}
1920

21+
// Default configuration constants
22+
const CIRCUIT_BREAKER_DEFAULTS = {
23+
FAILURE_THRESHOLD: 5,
24+
RESET_TIMEOUT: 60_000, // 1 minute
25+
HALF_OPEN_MAX_ATTEMPTS: 3,
26+
MONITORING_PERIOD: 300_000, // 5 minutes
27+
} as const
28+
2029
/**
21-
* Circuit Breaker pattern implementation for resilient network calls
30+
* Circuit Breaker pattern implementation for resilient network calls.
31+
*
32+
* States:
33+
* - CLOSED: Normal operation, requests flow through
34+
* - OPEN: Failure threshold exceeded, requests fail fast or use fallback
35+
* - HALF_OPEN: Testing if service recovered, limited requests allowed
36+
*
37+
* Features:
38+
* - Automatic state transitions based on failure/success patterns
39+
* - Configurable thresholds and timeouts
40+
* - Fallback support for graceful degradation
41+
* - Metrics and health tracking
42+
* - Batch operation support
2243
*/
2344
export class CircuitBreaker {
2445
private state: CircuitState = 'CLOSED'
@@ -28,45 +49,65 @@ export class CircuitBreaker {
2849
lastFailureTime: 0,
2950
consecutiveFailures: 0,
3051
totalRequests: 0,
52+
lastStateChange: Date.now(),
3153
}
3254
private halfOpenAttempts = 0
3355
private readonly failureThreshold: number
3456
private readonly resetTimeout: number
3557
private readonly halfOpenMaxAttempts: number
3658
private readonly monitoringPeriod: number
3759
private logger: Logger
38-
private stateChangeCallbacks: Array<(state: CircuitState) => void> = []
60+
private stateChangeCallbacks: Array<(state: CircuitState, previousState: CircuitState) => void> =
61+
[]
3962
private monitoringInterval?: NodeJS.Timeout
63+
private disposed = false
4064

4165
constructor(logger: Logger, options: CircuitBreakerOptions = {}) {
4266
this.logger = logger.child({ component: 'CircuitBreaker' })
43-
this.failureThreshold = options.failureThreshold ?? 5
44-
this.resetTimeout = options.resetTimeout ?? 60_000 // 1 minute
45-
this.halfOpenMaxAttempts = options.halfOpenMaxAttempts ?? 3
46-
this.monitoringPeriod = options.monitoringPeriod ?? 300_000 // 5 minutes
67+
this.failureThreshold = options.failureThreshold ?? CIRCUIT_BREAKER_DEFAULTS.FAILURE_THRESHOLD
68+
this.resetTimeout = options.resetTimeout ?? CIRCUIT_BREAKER_DEFAULTS.RESET_TIMEOUT
69+
this.halfOpenMaxAttempts =
70+
options.halfOpenMaxAttempts ?? CIRCUIT_BREAKER_DEFAULTS.HALF_OPEN_MAX_ATTEMPTS
71+
this.monitoringPeriod = options.monitoringPeriod ?? CIRCUIT_BREAKER_DEFAULTS.MONITORING_PERIOD
4772

48-
// Periodic stats reset
73+
// Periodic stats reset for rolling window
4974
this.monitoringInterval = setInterval(() => this.resetStats(), this.monitoringPeriod)
75+
76+
// Ensure interval doesn't prevent process exit
77+
if (this.monitoringInterval.unref) {
78+
this.monitoringInterval.unref()
79+
}
80+
81+
this.logger.debug('CircuitBreaker initialized', {
82+
failureThreshold: this.failureThreshold,
83+
resetTimeout: this.resetTimeout,
84+
halfOpenMaxAttempts: this.halfOpenMaxAttempts,
85+
monitoringPeriod: this.monitoringPeriod,
86+
})
5087
}
5188

5289
/**
5390
* Execute a function with circuit breaker protection
5491
*/
5592
async execute<T>(fn: () => Promise<T>, fallback?: () => T | Promise<T>): Promise<T> {
93+
this.ensureNotDisposed()
5694
this.stats.totalRequests++
5795

5896
// Check if circuit should transition from OPEN to HALF_OPEN
5997
if (this.state === 'OPEN') {
60-
if (Date.now() - this.stats.lastFailureTime >= this.resetTimeout) {
98+
const timeSinceLastFailure = Date.now() - this.stats.lastFailureTime
99+
if (timeSinceLastFailure >= this.resetTimeout) {
61100
this.transitionTo('HALF_OPEN')
62101
} else if (fallback) {
63-
this.logger.debug('Circuit is OPEN, using fallback')
102+
this.logger.debug('Circuit is OPEN, using fallback', {
103+
timeUntilReset: Math.ceil((this.resetTimeout - timeSinceLastFailure) / 1000),
104+
})
64105
return fallback()
65106
} else {
66-
throw new Error(
67-
`Circuit breaker is OPEN. Reset in ${Math.ceil(
68-
(this.resetTimeout - (Date.now() - this.stats.lastFailureTime)) / 1000,
69-
)} seconds`,
107+
const timeUntilReset = Math.ceil((this.resetTimeout - timeSinceLastFailure) / 1000)
108+
throw new CircuitOpenError(
109+
`Circuit breaker is OPEN. Reset in ${timeUntilReset} seconds`,
110+
timeUntilReset,
70111
)
71112
}
72113
}
@@ -78,7 +119,7 @@ export class CircuitBreaker {
78119
if (fallback) {
79120
return fallback()
80121
}
81-
throw new Error('Circuit breaker is OPEN after max half-open attempts')
122+
throw new CircuitOpenError('Circuit breaker is OPEN after max half-open attempts', 0)
82123
}
83124
this.halfOpenAttempts++
84125
}
@@ -88,11 +129,13 @@ export class CircuitBreaker {
88129
this.onSuccess()
89130
return result
90131
} catch (error) {
91-
this.onFailure()
132+
this.onFailure(error)
92133

93-
// Try fallback if available
134+
// Try fallback if available and circuit is now open
94135
if (fallback && this.state === 'OPEN') {
95-
this.logger.warn('Execution failed, using fallback', { error })
136+
this.logger.warn('Execution failed, circuit opened, using fallback', {
137+
error: error instanceof Error ? error.message : String(error),
138+
})
96139
return fallback()
97140
}
98141

@@ -107,32 +150,37 @@ export class CircuitBreaker {
107150
operations: Array<() => Promise<T>>,
108151
options: { concurrency?: number; stopOnFailure?: boolean } = {},
109152
): Promise<Array<{ success: boolean; result?: T; error?: Error }>> {
153+
this.ensureNotDisposed()
154+
110155
const { concurrency = 5, stopOnFailure = false } = options
111156
const results: Array<{ success: boolean; result?: T; error?: Error }> = []
112157

158+
// Split operations into chunks for controlled concurrency
113159
const chunks: Array<Array<() => Promise<T>>> = []
114160
for (let i = 0; i < operations.length; i += concurrency) {
115161
chunks.push(operations.slice(i, i + concurrency))
116162
}
117163

118164
for (const chunk of chunks) {
165+
// Stop if circuit is open and stopOnFailure is true
166+
if (this.state === 'OPEN' && stopOnFailure) {
167+
this.logger.debug('Stopping batch execution, circuit is OPEN')
168+
break
169+
}
170+
119171
const chunkResults = await Promise.allSettled(chunk.map((op) => this.execute(op)))
120172

121173
for (const result of chunkResults) {
122174
if (result.status === 'fulfilled') {
123175
results.push({ success: true, result: result.value })
124176
} else {
125-
results.push({ success: false, error: result.reason })
177+
const error = result.reason instanceof Error ? result.reason : new Error(String(result.reason))
178+
results.push({ success: false, error })
126179
if (stopOnFailure) {
127180
return results
128181
}
129182
}
130183
}
131-
132-
// Stop if circuit is open
133-
if (this.state === 'OPEN' && stopOnFailure) {
134-
break
135-
}
136184
}
137185

138186
return results
@@ -148,29 +196,44 @@ export class CircuitBreaker {
148196
/**
149197
* Get circuit statistics
150198
*/
151-
getStats(): Readonly<CircuitStats> {
152-
return { ...this.stats }
199+
getStats(): Readonly<CircuitStats & { healthPercentage: number; timeInCurrentState: number }> {
200+
return {
201+
...this.stats,
202+
healthPercentage: this.getHealthPercentage(),
203+
timeInCurrentState: Date.now() - this.stats.lastStateChange,
204+
}
153205
}
154206

155207
/**
156-
* Get circuit health percentage
208+
* Get circuit health percentage (0-100)
157209
*/
158210
getHealthPercentage(): number {
159211
if (this.stats.totalRequests === 0) return 100
160-
return (this.stats.successes / this.stats.totalRequests) * 100
212+
return Math.round((this.stats.successes / this.stats.totalRequests) * 100)
213+
}
214+
215+
/**
216+
* Check if circuit is healthy (CLOSED state)
217+
*/
218+
isHealthy(): boolean {
219+
return this.state === 'CLOSED'
161220
}
162221

163222
/**
164-
* Force circuit to open
223+
* Force circuit to open (manual trip)
165224
*/
166225
trip(): void {
226+
this.ensureNotDisposed()
227+
this.logger.warn('Circuit manually tripped')
167228
this.transitionTo('OPEN')
168229
}
169230

170231
/**
171-
* Force circuit to close
232+
* Force circuit to close (manual reset)
172233
*/
173234
reset(): void {
235+
this.ensureNotDisposed()
236+
this.logger.info('Circuit manually reset')
174237
this.transitionTo('CLOSED')
175238
this.stats.consecutiveFailures = 0
176239
this.halfOpenAttempts = 0
@@ -179,8 +242,17 @@ export class CircuitBreaker {
179242
/**
180243
* Register callback for state changes
181244
*/
182-
onStateChange(callback: (state: CircuitState) => void): void {
245+
onStateChange(
246+
callback: (state: CircuitState, previousState: CircuitState) => void,
247+
): () => void {
183248
this.stateChangeCallbacks.push(callback)
249+
// Return unsubscribe function
250+
return () => {
251+
const index = this.stateChangeCallbacks.indexOf(callback)
252+
if (index > -1) {
253+
this.stateChangeCallbacks.splice(index, 1)
254+
}
255+
}
184256
}
185257

186258
/**
@@ -200,15 +272,20 @@ export class CircuitBreaker {
200272
/**
201273
* Handle failed execution
202274
*/
203-
private onFailure(): void {
275+
private onFailure(error: unknown): void {
204276
this.stats.failures++
205277
this.stats.consecutiveFailures++
206278
this.stats.lastFailureTime = Date.now()
207279

280+
const errorMessage = error instanceof Error ? error.message : String(error)
281+
208282
if (this.state === 'HALF_OPEN') {
209283
if (this.halfOpenAttempts >= this.halfOpenMaxAttempts) {
210284
this.transitionTo('OPEN')
211-
this.logger.warn('Circuit failed in HALF_OPEN state, transitioning to OPEN')
285+
this.logger.warn('Circuit failed in HALF_OPEN state, transitioning to OPEN', {
286+
error: errorMessage,
287+
attempts: this.halfOpenAttempts,
288+
})
212289
}
213290
} else if (
214291
this.state === 'CLOSED' &&
@@ -218,6 +295,7 @@ export class CircuitBreaker {
218295
this.logger.error('Circuit breaker tripped, transitioning to OPEN', {
219296
consecutiveFailures: this.stats.consecutiveFailures,
220297
threshold: this.failureThreshold,
298+
lastError: errorMessage,
221299
})
222300
}
223301
}
@@ -230,8 +308,19 @@ export class CircuitBreaker {
230308
this.state = newState
231309

232310
if (oldState !== newState) {
311+
this.stats.lastStateChange = Date.now()
233312
this.logger.info('Circuit state changed', { from: oldState, to: newState })
234-
this.stateChangeCallbacks.forEach((cb) => cb(newState))
313+
314+
// Notify all registered callbacks
315+
for (const callback of this.stateChangeCallbacks) {
316+
try {
317+
callback(newState, oldState)
318+
} catch (err) {
319+
this.logger.warn('Error in state change callback', {
320+
error: err instanceof Error ? err.message : String(err),
321+
})
322+
}
323+
}
235324

236325
if (newState === 'HALF_OPEN') {
237326
this.halfOpenAttempts = 0
@@ -240,40 +329,83 @@ export class CircuitBreaker {
240329
}
241330

242331
/**
243-
* Reset statistics periodically
332+
* Reset statistics periodically (rolling window)
244333
*/
245334
private resetStats(): void {
335+
if (this.disposed) return
336+
246337
// Keep failure tracking but reset totals for percentage calculations
338+
const previousTotal = this.stats.totalRequests
247339
this.stats.totalRequests = 0
248340
this.stats.successes = 0
249341
this.stats.failures = 0
342+
343+
if (previousTotal > 0) {
344+
this.logger.trace('Reset circuit breaker stats', { previousTotal })
345+
}
250346
}
251347

252348
/**
253349
* Create a wrapped function with circuit breaker protection
254350
*/
255-
wrap<T extends (...args: never[]) => Promise<unknown>>(
256-
fn: T,
257-
fallback?: (
258-
...args: Parameters<T>
259-
) => ReturnType<T> | Promise<Awaited<ReturnType<T>>>,
260-
): T {
261-
return (async (...args: Parameters<T>) => {
351+
wrap<TArgs extends unknown[], TResult>(
352+
fn: (...args: TArgs) => Promise<TResult>,
353+
fallback?: (...args: TArgs) => TResult | Promise<TResult>,
354+
): (...args: TArgs) => Promise<TResult> {
355+
return async (...args: TArgs): Promise<TResult> => {
262356
return this.execute(
263357
() => fn(...args),
264358
fallback ? () => fallback(...args) : undefined,
265359
)
266-
}) as T
360+
}
361+
}
362+
363+
/**
364+
* Ensure circuit breaker is not disposed
365+
*/
366+
private ensureNotDisposed(): void {
367+
if (this.disposed) {
368+
throw new Error('CircuitBreaker has been disposed')
369+
}
267370
}
268371

269372
/**
270373
* Clean up resources
271374
*/
272375
dispose(): void {
376+
if (this.disposed) return
377+
378+
this.disposed = true
379+
273380
if (this.monitoringInterval) {
274381
clearInterval(this.monitoringInterval)
275382
this.monitoringInterval = undefined
276383
}
384+
277385
this.stateChangeCallbacks = []
386+
this.logger.debug('CircuitBreaker disposed')
387+
}
388+
389+
/**
390+
* Support for async disposal
391+
*/
392+
async [Symbol.asyncDispose](): Promise<void> {
393+
this.dispose()
394+
}
395+
}
396+
397+
/**
398+
* Error thrown when circuit is open
399+
*/
400+
export class CircuitOpenError extends Error {
401+
constructor(
402+
message: string,
403+
public readonly timeUntilReset: number,
404+
) {
405+
super(message)
406+
this.name = 'CircuitOpenError'
407+
if (Error.captureStackTrace) {
408+
Error.captureStackTrace(this, CircuitOpenError)
409+
}
278410
}
279411
}

0 commit comments

Comments
 (0)