Skip to content

Commit cf38abd

Browse files
authored
RFC: Simplifies cancel managing (#16)
1 parent 40da6c3 commit cf38abd

File tree

6 files changed

+53
-137
lines changed

6 files changed

+53
-137
lines changed

service/cancelmanager.go

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
type CancelManaging interface {
1010
Add(rootCtx context.Context, id string, reqID int64) context.Context
1111
Delete(id string, reqID int64) bool
12-
ForceDelete(id string) bool
1312
}
1413

1514
type cancelPair struct {
@@ -19,28 +18,26 @@ type cancelPair struct {
1918

2019
// CancelManager implements the `CancelManaging` interface that is thread safe
2120
type CancelManager struct {
22-
v map[string]cancelPair
23-
mux sync.Mutex
24-
cancelBeforeAdding bool
21+
v map[string]cancelPair
22+
mux sync.Mutex
2523
}
2624

2725
// NewCancelManager creates a new `CancelManager`
28-
func NewCancelManager(cancelBeforeAdding bool) *CancelManager {
26+
func NewCancelManager() *CancelManager {
2927
return &CancelManager{
30-
v: map[string]cancelPair{},
31-
mux: sync.Mutex{},
32-
cancelBeforeAdding: cancelBeforeAdding,
28+
v: map[string]cancelPair{},
29+
mux: sync.Mutex{},
3330
}
3431
}
3532

3633
// Add creates an context for `id` and `reqID` and returns that context.
37-
// If `id` exists in memory and cancelBeforeAdding is true, the task with that `id` will be canceled.
34+
// If `id` exists in memory, the task with that `id` will be canceled.
3835
func (m *CancelManager) Add(rootCtx context.Context, id string, reqID int64) context.Context {
3936
m.mux.Lock()
4037
defer m.mux.Unlock()
4138

4239
pair, ok := m.v[id]
43-
if m.cancelBeforeAdding && ok {
40+
if ok {
4441
pair.Cancel()
4542
delete(m.v, id)
4643
}
@@ -71,19 +68,3 @@ func (m *CancelManager) Delete(id string, reqID int64) bool {
7168
delete(m.v, id)
7269
return true
7370
}
74-
75-
// ForceDelete deletes an id without looking at the `reqID` and count
76-
func (m *CancelManager) ForceDelete(id string) bool {
77-
m.mux.Lock()
78-
defer m.mux.Unlock()
79-
80-
pair, ok := m.v[id]
81-
82-
if !ok {
83-
return false
84-
}
85-
86-
pair.Cancel()
87-
delete(m.v, id)
88-
return true
89-
}

service/cancelmanager_test.go

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (s *CancelManagerTestSuite) SetupSuite() {
2222
}
2323

2424
func (s *CancelManagerTestSuite) Test_Add_IDEqual_CancelsContext_Returns_Context() {
25-
cm := NewCancelManager(true)
25+
cm := NewCancelManager()
2626
ctx := cm.Add(s.ctx, "id1", 1)
2727
cm.Add(s.ctx, "id1", 2)
2828

@@ -42,7 +42,7 @@ L:
4242

4343
func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() {
4444

45-
cm := NewCancelManager(true)
45+
cm := NewCancelManager()
4646
firstCtx := cm.Add(s.ctx, "id1", 1)
4747
s.NotNil(firstCtx)
4848

@@ -51,7 +51,7 @@ func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() {
5151
}
5252

5353
func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing() {
54-
cm := NewCancelManager(true)
54+
cm := NewCancelManager()
5555
cm.Add(s.ctx, "id1", 1)
5656

5757
s.Require().Len(cm.v, 1)
@@ -63,7 +63,7 @@ func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing()
6363
}
6464

6565
func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CallsCancel_RemovesFromMemory() {
66-
cm := NewCancelManager(true)
66+
cm := NewCancelManager()
6767
ctx := cm.Add(s.ctx, "id1", 1)
6868

6969
s.Require().Len(cm.v, 1)
@@ -85,31 +85,11 @@ L:
8585

8686
func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CntNotZero_StaysInMemory() {
8787
// Set startingCnt to 2
88-
cm := NewCancelManager(true)
88+
cm := NewCancelManager()
8989
cm.Add(s.ctx, "id1", 1)
9090
s.Require().Len(cm.v, 1)
9191
s.Require().Contains(cm.v, "id1")
9292

9393
s.True(cm.Delete("id1", 1))
9494
s.Require().Len(cm.v, 0)
9595
}
96-
97-
func (s *CancelManagerTestSuite) Test_ForceDelete() {
98-
cm := NewCancelManager(true)
99-
ctx := cm.Add(s.ctx, "id1", 1)
100-
s.Require().Len(cm.v, 1)
101-
102-
s.False(cm.ForceDelete("DOESNOTEXIST"))
103-
s.True(cm.ForceDelete("id1"))
104-
105-
L:
106-
for {
107-
select {
108-
case <-time.After(time.Second * 5):
109-
s.Fail("Timeout")
110-
return
111-
case <-ctx.Done():
112-
break L
113-
}
114-
}
115-
}

service/notifydistributor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func newNotifyDistributorfromStrings(serviceCreateAddrs, serviceRemoveAddrs, nod
9999

100100
return newNotifyDistributor(
101101
notifyEndpoints,
102-
NewCancelManager(true),
103-
NewCancelManager(true),
102+
NewCancelManager(),
103+
NewCancelManager(),
104104
interval,
105105
logger)
106106
}

service/notifydistributor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
304304
},
305305
}
306306

307-
notifyD := newNotifyDistributor(endpoints, NewCancelManager(true),
308-
NewCancelManager(true), 1, s.log)
307+
notifyD := newNotifyDistributor(endpoints, NewCancelManager(),
308+
NewCancelManager(), 1, s.log)
309309
serviceChan := make(chan Notification)
310310

311311
notifyD.Run(serviceChan, nil)
@@ -379,8 +379,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
379379
},
380380
}
381381

382-
notifyD := newNotifyDistributor(endpoints, NewCancelManager(true),
383-
NewCancelManager(true), 1, s.log)
382+
notifyD := newNotifyDistributor(endpoints, NewCancelManager(),
383+
NewCancelManager(), 1, s.log)
384384
nodeChan := make(chan Notification)
385385

386386
notifyD.Run(nil, nodeChan)

service/swarmlistener.go

Lines changed: 33 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -22,41 +22,6 @@ type SwarmListening interface {
2222
GetNodesParameters(ctx context.Context) ([]map[string]string, error)
2323
}
2424

25-
// CreateRemoveCancelManager combines two cancel managers for creating and
26-
// removing services
27-
type CreateRemoveCancelManager struct {
28-
createCancelManager CancelManaging
29-
removeCancelManager CancelManaging
30-
mux sync.RWMutex
31-
}
32-
33-
// AddEvent controls canceling for creating and removing services
34-
// A create event will cancel delete events with the same ID
35-
// A remove event will cancel create events with the same ID
36-
func (c *CreateRemoveCancelManager) AddEvent(event Event) context.Context {
37-
c.mux.Lock()
38-
defer c.mux.Unlock()
39-
if event.Type == EventTypeCreate {
40-
c.removeCancelManager.ForceDelete(event.ID)
41-
return c.createCancelManager.Add(context.Background(), event.ID, event.TimeNano)
42-
}
43-
// EventTypeRemove
44-
c.createCancelManager.ForceDelete(event.ID)
45-
return c.removeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
46-
}
47-
48-
// RemoveEvent removes and cancels event from its corresponding
49-
// cancel manager
50-
func (c *CreateRemoveCancelManager) RemoveEvent(event Event) bool {
51-
c.mux.Lock()
52-
defer c.mux.Unlock()
53-
if event.Type == EventTypeCreate {
54-
return c.createCancelManager.Delete(event.ID, event.TimeNano)
55-
}
56-
// EventTypeRemove
57-
return c.removeCancelManager.Delete(event.ID, event.TimeNano)
58-
}
59-
6025
// SwarmListener provides public api
6126
type SwarmListener struct {
6227
SSListener SwarmServiceListening
@@ -75,13 +40,13 @@ type SwarmListener struct {
7540

7641
NotifyDistributor NotifyDistributing
7742

78-
ServiceCreateRemoveCancelManager *CreateRemoveCancelManager
79-
NodeCreateRemoveCancelManager *CreateRemoveCancelManager
80-
IncludeNodeInfo bool
81-
UseDockerServiceEvents bool
82-
IgnoreKey string
83-
IncludeKey string
84-
Log *log.Logger
43+
ServiceCancelManager CancelManaging
44+
NodeCancelManager CancelManaging
45+
IncludeNodeInfo bool
46+
UseDockerServiceEvents bool
47+
IgnoreKey string
48+
IncludeKey string
49+
Log *log.Logger
8550
}
8651

8752
func newSwarmListener(
@@ -100,10 +65,8 @@ func newSwarmListener(
10065

10166
notifyDistributor NotifyDistributing,
10267

103-
serviceCreateCancelManager CancelManaging,
104-
serviceRemoveCancelManager CancelManaging,
105-
nodeCreateCancelManager CancelManaging,
106-
nodeRemoveCancelManager CancelManaging,
68+
serviceCancelManager CancelManaging,
69+
nodeCancelManager CancelManaging,
10770
includeNodeInfo bool,
10871
useDockerServiceEvents bool,
10972
ignoreKey string,
@@ -112,24 +75,20 @@ func newSwarmListener(
11275
) *SwarmListener {
11376

11477
return &SwarmListener{
115-
SSListener: ssListener,
116-
SSClient: ssClient,
117-
SSCache: ssCache,
118-
SSPoller: ssPoller,
119-
SSEventChan: ssEventChan,
120-
SSNotificationChan: ssNotificationChan,
121-
NodeListener: nodeListener,
122-
NodeClient: nodeClient,
123-
NodeCache: nodeCache,
124-
NodeEventChan: nodeEventChan,
125-
NodeNotificationChan: nodeNotificationChan,
126-
NotifyDistributor: notifyDistributor,
127-
ServiceCreateRemoveCancelManager: &CreateRemoveCancelManager{
128-
createCancelManager: serviceCreateCancelManager,
129-
removeCancelManager: serviceRemoveCancelManager},
130-
NodeCreateRemoveCancelManager: &CreateRemoveCancelManager{
131-
createCancelManager: nodeCreateCancelManager,
132-
removeCancelManager: nodeRemoveCancelManager},
78+
SSListener: ssListener,
79+
SSClient: ssClient,
80+
SSCache: ssCache,
81+
SSPoller: ssPoller,
82+
SSEventChan: ssEventChan,
83+
SSNotificationChan: ssNotificationChan,
84+
NodeListener: nodeListener,
85+
NodeClient: nodeClient,
86+
NodeCache: nodeCache,
87+
NodeEventChan: nodeEventChan,
88+
NodeNotificationChan: nodeNotificationChan,
89+
NotifyDistributor: notifyDistributor,
90+
ServiceCancelManager: serviceCancelManager,
91+
NodeCancelManager: nodeCancelManager,
13392
IncludeNodeInfo: includeNodeInfo,
13493
UseDockerServiceEvents: useDockerServiceEvents,
13594
IgnoreKey: ignoreKey,
@@ -204,10 +163,8 @@ func NewSwarmListenerFromEnv(
204163
nodeEventChan,
205164
nodeNotificationChan,
206165
notifyDistributor,
207-
NewCancelManager(true),
208-
NewCancelManager(true),
209-
NewCancelManager(true),
210-
NewCancelManager(true),
166+
NewCancelManager(),
167+
NewCancelManager(),
211168
includeNodeInfo,
212169
useDockerServiceEvents,
213170
ignoreKey,
@@ -253,8 +210,8 @@ func (l *SwarmListener) connectServiceChannels() {
253210
}
254211

255212
func (l *SwarmListener) processServiceEventCreate(event Event) {
256-
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
257-
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
213+
ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano)
214+
defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano)
258215

259216
errChan := make(chan error)
260217

@@ -303,8 +260,8 @@ func (l *SwarmListener) processServiceEventCreate(event Event) {
303260
}
304261

305262
func (l *SwarmListener) processServiceEventRemove(event Event) {
306-
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
307-
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
263+
ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano)
264+
defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano)
308265

309266
errChan := make(chan error)
310267

@@ -353,8 +310,8 @@ func (l *SwarmListener) connectNodeChannels() {
353310
}
354311

355312
func (l *SwarmListener) processNodeEventCreate(event Event) {
356-
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
357-
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
313+
ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
314+
defer l.NodeCancelManager.Delete(event.ID, event.TimeNano)
358315

359316
errChan := make(chan error)
360317

@@ -398,8 +355,8 @@ func (l *SwarmListener) processNodeEventCreate(event Event) {
398355
}
399356

400357
func (l *SwarmListener) processNodeEventRemove(event Event) {
401-
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
402-
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
358+
ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
359+
defer l.NodeCancelManager.Delete(event.ID, event.TimeNano)
403360

404361
errChan := make(chan error)
405362
go func() {

service/swarmlistener_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,8 @@ func (s *SwarmListenerTestSuite) SetupTest() {
6363
make(chan Event),
6464
make(chan Notification),
6565
s.NotifyDistributorMock,
66-
NewCancelManager(true),
67-
NewCancelManager(true),
68-
NewCancelManager(true),
69-
NewCancelManager(true),
66+
NewCancelManager(),
67+
NewCancelManager(),
7068
false,
7169
true,
7270
"com.df.notify",

0 commit comments

Comments
 (0)