Skip to content

Commit 5c1e78a

Browse files
committed
ENH: Removes from cache after success
1 parent 6c61960 commit 5c1e78a

File tree

4 files changed

+72
-50
lines changed

4 files changed

+72
-50
lines changed

service/notifydistributor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type Notification struct {
1616
Parameters string
1717
TimeNano int64
1818
Context context.Context
19-
Done chan struct{}
19+
ErrorChan chan error
2020
}
2121

2222
type internalNotification struct {
@@ -181,8 +181,8 @@ func (d NotifyDistributor) distributeServiceNotification(n Notification) {
181181
}
182182
wg.Wait()
183183

184-
if n.Done != nil {
185-
n.Done <- struct{}{}
184+
if n.ErrorChan != nil {
185+
n.ErrorChan <- nil
186186
}
187187
}
188188

@@ -200,8 +200,8 @@ func (d NotifyDistributor) distributeNodeNotification(n Notification) {
200200
}(endpoint)
201201
}
202202
wg.Wait()
203-
if n.Done != nil {
204-
n.Done <- struct{}{}
203+
if n.ErrorChan != nil {
204+
n.ErrorChan <- nil
205205
}
206206
}
207207

service/notifydistributor_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ func (s *NotifyDistributorTestSuite) Test_NewNotifyDistributorFromEnv_Node() {
277277

278278
func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints_Servies() {
279279

280-
service1Done := make(chan struct{})
281-
service2Done := make(chan struct{})
280+
service1ErrChan := make(chan error)
281+
service2ErrChan := make(chan error)
282282

283283
serviceNotifyMock1 := notificationSenderMock{}
284284
serviceNotifyMock1.On("Create", mock.AnythingOfType("*context.cancelCtx"), "hello=world").
@@ -317,7 +317,7 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
317317
Parameters: "hello=world",
318318
TimeNano: int64(1),
319319
Context: s.ctx,
320-
Done: service1Done,
320+
ErrorChan: service1ErrChan,
321321
}
322322
}()
323323
go func() {
@@ -327,21 +327,21 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
327327
Parameters: "hello=world2",
328328
TimeNano: int64(2),
329329
Context: s.ctx,
330-
Done: service2Done,
330+
ErrorChan: service2ErrChan,
331331
}
332332
}()
333333

334334
timer := time.NewTimer(time.Second * 5).C
335335

336336
for {
337-
if service1Done == nil && service2Done == nil {
337+
if service1ErrChan == nil && service2ErrChan == nil {
338338
break
339339
}
340340
select {
341-
case <-service1Done:
342-
service1Done = nil
343-
case <-service2Done:
344-
service2Done = nil
341+
case <-service1ErrChan:
342+
service1ErrChan = nil
343+
case <-service2ErrChan:
344+
service2ErrChan = nil
345345
case <-timer:
346346
s.Fail("Timeout")
347347
return
@@ -353,8 +353,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
353353
}
354354

355355
func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints_Nodes1() {
356-
node1Done := make(chan struct{})
357-
node2Done := make(chan struct{})
356+
node1ErrChan := make(chan error)
357+
node2ErrChan := make(chan error)
358358

359359
nodesNotifyMock1 := notificationSenderMock{}
360360
nodesNotifyMock1.On("Create", mock.AnythingOfType("*context.cancelCtx"), "hello=world").
@@ -392,7 +392,7 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
392392
Parameters: "hello=world",
393393
TimeNano: int64(1),
394394
Context: s.ctx,
395-
Done: node1Done,
395+
ErrorChan: node1ErrChan,
396396
}
397397
}()
398398
go func() {
@@ -402,21 +402,21 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
402402
Parameters: "hello=world2",
403403
TimeNano: int64(2),
404404
Context: s.ctx,
405-
Done: node2Done,
405+
ErrorChan: node2ErrChan,
406406
}
407407
}()
408408

409409
timer := time.NewTimer(time.Second * 5).C
410410

411411
for {
412-
if node1Done == nil && node2Done == nil {
412+
if node1ErrChan == nil && node2ErrChan == nil {
413413
break
414414
}
415415
select {
416-
case <-node1Done:
417-
node1Done = nil
418-
case <-node2Done:
419-
node2Done = nil
416+
case <-node1ErrChan:
417+
node1ErrChan = nil
418+
case <-node2ErrChan:
419+
node2ErrChan = nil
420420
case <-timer:
421421
s.Fail("Timeout")
422422
return

service/swarmlistener.go

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"os"
78
"strings"
@@ -203,18 +204,17 @@ func (l *SwarmListener) processServiceEventCreate(event Event) {
203204
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
204205
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
205206

206-
doneChan := make(chan struct{})
207+
errChan := make(chan error)
207208

208209
go func() {
209210
service, err := l.SSClient.SwarmServiceInspect(ctx, event.ID, l.IncludeNodeInfo)
210211
if err != nil {
211-
if !strings.Contains(err.Error(), "context canceled") {
212-
l.Log.Printf("ERROR: %v", err)
213-
}
212+
errChan <- err
214213
return
215214
}
216215
// Ignored service (filtered by `com.df.notify`)
217216
if service == nil {
217+
errChan <- nil
218218
return
219219
}
220220
ssm := MinifySwarmService(*service, l.IgnoreKey, l.IncludeKey)
@@ -223,6 +223,7 @@ func (l *SwarmListener) processServiceEventCreate(event Event) {
223223
// Store in cache
224224
isUpdated := l.SSCache.InsertAndCheck(ssm)
225225
if !isUpdated {
226+
errChan <- nil
226227
return
227228
}
228229
metrics.RecordService(l.SSCache.Len())
@@ -231,12 +232,17 @@ func (l *SwarmListener) processServiceEventCreate(event Event) {
231232
params := GetSwarmServiceMiniCreateParameters(ssm)
232233
paramsEncoded := ConvertMapStringStringToURLValues(params).Encode()
233234
l.placeOnNotificationChan(
234-
l.SSNotificationChan, event.Type, event.TimeNano, ssm.ID, paramsEncoded, doneChan)
235+
l.SSNotificationChan, event.Type, event.TimeNano, ssm.ID, paramsEncoded, errChan)
235236
}()
236237

237238
for {
238239
select {
239-
case <-doneChan:
240+
case err := <-errChan:
241+
if err != nil {
242+
if !strings.Contains(err.Error(), "context canceled") {
243+
l.Log.Printf("ERROR: %v", err)
244+
}
245+
}
240246
return
241247
case <-ctx.Done():
242248
return
@@ -248,26 +254,32 @@ func (l *SwarmListener) processServiceEventRemove(event Event) {
248254
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
249255
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
250256

251-
doneChan := make(chan struct{})
257+
errChan := make(chan error)
252258

253259
go func() {
254260

255261
ssm, ok := l.SSCache.Get(event.ID)
256262
if !ok {
263+
errChan <- fmt.Errorf("%s not in cache", event.ID)
257264
return
258265
}
259-
l.SSCache.Delete(ssm.ID)
260-
metrics.RecordService(l.SSCache.Len())
261-
262266
params := GetSwarmServiceMiniRemoveParameters(ssm)
263267
paramsEncoded := ConvertMapStringStringToURLValues(params).Encode()
264268
l.placeOnNotificationChan(
265-
l.SSNotificationChan, event.Type, event.TimeNano, ssm.ID, paramsEncoded, doneChan)
269+
l.SSNotificationChan, event.Type, event.TimeNano, ssm.ID, paramsEncoded, errChan)
266270
}()
267271

268272
for {
269273
select {
270-
case <-doneChan:
274+
case err := <-errChan:
275+
if err != nil {
276+
if !strings.Contains(err.Error(), "not in cache") {
277+
l.Log.Printf("ERROR: %v", err)
278+
}
279+
return
280+
}
281+
l.SSCache.Delete(event.ID)
282+
metrics.RecordService(l.SSCache.Len())
271283
return
272284
case <-ctx.Done():
273285
return
@@ -299,15 +311,13 @@ func (l *SwarmListener) processNodeEventCreate(event Event) {
299311
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
300312
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
301313

302-
doneChan := make(chan struct{})
314+
errChan := make(chan error)
303315

304316
go func() {
305317

306318
node, err := l.NodeClient.NodeInspect(event.ID)
307319
if err != nil {
308-
if !strings.Contains(err.Error(), "context canceled") {
309-
l.Log.Printf("ERROR: %v", err)
310-
}
320+
errChan <- err
311321
return
312322
}
313323
nm := MinifyNode(node)
@@ -316,17 +326,24 @@ func (l *SwarmListener) processNodeEventCreate(event Event) {
316326
// Store in cache
317327
isUpdated := l.NodeCache.InsertAndCheck(nm)
318328
if !isUpdated {
329+
errChan <- nil
319330
return
320331
}
321332
}
322333
params := GetNodeMiniCreateParameters(nm)
323334
paramsEncoded := ConvertMapStringStringToURLValues(params).Encode()
324-
l.placeOnNotificationChan(l.NodeNotificationChan, event.Type, event.TimeNano, nm.ID, paramsEncoded, doneChan)
335+
l.placeOnNotificationChan(l.NodeNotificationChan, event.Type, event.TimeNano, nm.ID, paramsEncoded, errChan)
325336
}()
326337

327338
for {
328339
select {
329-
case <-doneChan:
340+
case err := <-errChan:
341+
if err != nil {
342+
if !strings.Contains(err.Error(), "context canceled") {
343+
l.Log.Printf("ERROR: %v", err)
344+
}
345+
return
346+
}
330347
l.NotifyServices(true)
331348
return
332349
case <-ctx.Done():
@@ -339,22 +356,29 @@ func (l *SwarmListener) processNodeEventRemove(event Event) {
339356
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
340357
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
341358

342-
doneChan := make(chan struct{})
359+
errChan := make(chan error)
343360
go func() {
344361
nm, ok := l.NodeCache.Get(event.ID)
345362
if !ok {
363+
errChan <- fmt.Errorf("%s not in cache", event.ID)
346364
return
347365
}
348-
l.NodeCache.Delete(nm.ID)
349366

350367
params := GetNodeMiniRemoveParameters(nm)
351368
paramsEncoded := ConvertMapStringStringToURLValues(params).Encode()
352-
l.placeOnNotificationChan(l.NodeNotificationChan, event.Type, event.TimeNano, nm.ID, paramsEncoded, doneChan)
369+
l.placeOnNotificationChan(l.NodeNotificationChan, event.Type, event.TimeNano, nm.ID, paramsEncoded, errChan)
353370
}()
354371

355372
for {
356373
select {
357-
case <-doneChan:
374+
case err := <-errChan:
375+
if err != nil {
376+
if !strings.Contains(err.Error(), "not in cache") {
377+
l.Log.Printf("ERROR: %v", err)
378+
}
379+
return
380+
}
381+
l.NodeCache.Delete(event.ID)
358382
l.NotifyServices(true)
359383
return
360384
case <-ctx.Done():
@@ -395,13 +419,13 @@ func (l SwarmListener) NotifyNodes(useCache bool) {
395419
}()
396420
}
397421

398-
func (l SwarmListener) placeOnNotificationChan(notiChan chan<- Notification, eventType EventType, timeNano int64, ID string, parameters string, doneChan chan struct{}) {
422+
func (l SwarmListener) placeOnNotificationChan(notiChan chan<- Notification, eventType EventType, timeNano int64, ID string, parameters string, errorChan chan error) {
399423
notiChan <- Notification{
400424
EventType: eventType,
401425
ID: ID,
402426
Parameters: parameters,
403427
TimeNano: timeNano,
404-
Done: doneChan,
428+
ErrorChan: errorChan,
405429
}
406430
}
407431

service/swarmlistener_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func (s *SwarmListenerTestSuite) Test_Run_ServicesChannel() {
8181
s.SSClientMock.On("SwarmServiceInspect", mock.AnythingOfType("*context.cancelCtx"), "serviceID1", true).Return(&ss1, nil)
8282
s.SSCacheMock.On("InsertAndCheck", ss1m).Return(true).
8383
On("Get", "serviceID2").Return(ss2m, true).
84-
On("Delete", "serviceID2").
8584
On("Len").Return(2)
8685
s.NotifyDistributorMock.
8786
On("HasServiceListeners").Return(true).
@@ -169,8 +168,7 @@ func (s *SwarmListenerTestSuite) Test_Run_NodeChannel() {
169168
s.NodeListeningMock.On("ListenForNodeEvents", mock.AnythingOfType("chan<- service.Event"))
170169
s.NodeClientMock.On("NodeInspect", "nodeID1").Return(n1, nil)
171170
s.NodeCacheMock.On("InsertAndCheck", n1m).Return(true).
172-
On("Get", "nodeID2").Return(n2m, true).
173-
On("Delete", "nodeID2")
171+
On("Get", "nodeID2").Return(n2m, true)
174172
s.NotifyDistributorMock.
175173
On("HasServiceListeners").Return(false).
176174
On("HasNodeListeners").Return(true).

0 commit comments

Comments
 (0)