Skip to content

Commit 7e0c57d

Browse files
committed
ENH: Increases memory efficiency
1 parent 5c1e78a commit 7e0c57d

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

service/swarmlistener.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,26 @@ func NewSwarmListenerFromEnv(retries, interval int, logger *log.Logger) (*SwarmL
135135
if err != nil {
136136
return nil, err
137137
}
138-
ssListener := NewSwarmServiceListener(dockerClient, logger)
139-
ssClient := NewSwarmServiceClient(dockerClient, ignoreKey, "com.df.scrapeNetwork", logger)
140-
ssCache := NewSwarmServiceCache()
138+
notifyDistributor := NewNotifyDistributorFromEnv(retries, interval, logger)
141139

142-
nodeListener := NewNodeListener(dockerClient, logger)
143-
nodeClient := NewNodeClient(dockerClient)
144-
nodeCache := NewNodeCache()
140+
var ssListener *SwarmServiceListener
141+
var ssClient *SwarmServiceClient
142+
var ssCache *SwarmServiceCache
143+
var nodeListener *NodeListener
144+
var nodeClient *NodeClient
145+
var nodeCache *NodeCache
146+
147+
if notifyDistributor.HasServiceListeners() {
148+
ssListener = NewSwarmServiceListener(dockerClient, logger)
149+
ssClient = NewSwarmServiceClient(dockerClient, ignoreKey, "com.df.scrapeNetwork", logger)
150+
ssCache = NewSwarmServiceCache()
151+
}
145152

146-
notifyDistributor := NewNotifyDistributorFromEnv(retries, interval, logger)
153+
if notifyDistributor.HasNodeListeners() {
154+
nodeListener = NewNodeListener(dockerClient, logger)
155+
nodeClient = NewNodeClient(dockerClient)
156+
nodeCache = NewNodeCache()
157+
}
147158

148159
return newSwarmListener(
149160
ssListener,
@@ -167,13 +178,13 @@ func NewSwarmListenerFromEnv(retries, interval int, logger *log.Logger) (*SwarmL
167178

168179
// Run starts swarm listener
169180
func (l *SwarmListener) Run() {
170-
l.connectServiceChannels()
171-
l.connectNodeChannels()
172181

173-
if l.SSEventChan != nil {
182+
if l.NotifyDistributor.HasServiceListeners() {
183+
l.connectServiceChannels()
174184
l.SSListener.ListenForServiceEvents(l.SSEventChan)
175185
}
176-
if l.NodeEventChan != nil {
186+
if l.NotifyDistributor.HasNodeListeners() {
187+
l.connectNodeChannels()
177188
l.NodeListener.ListenForNodeEvents(l.NodeEventChan)
178189
}
179190

@@ -182,13 +193,6 @@ func (l *SwarmListener) Run() {
182193

183194
func (l *SwarmListener) connectServiceChannels() {
184195

185-
// Remove service channels if there are no service listeners
186-
if !l.NotifyDistributor.HasServiceListeners() {
187-
l.SSEventChan = nil
188-
l.SSNotificationChan = nil
189-
return
190-
}
191-
192196
go func() {
193197
for event := range l.SSEventChan {
194198
if event.Type == EventTypeCreate {
@@ -289,13 +293,6 @@ func (l *SwarmListener) processServiceEventRemove(event Event) {
289293

290294
func (l *SwarmListener) connectNodeChannels() {
291295

292-
// Remove node channels if there are no service listeners
293-
if !l.NotifyDistributor.HasNodeListeners() {
294-
l.NodeEventChan = nil
295-
l.NodeNotificationChan = nil
296-
return
297-
}
298-
299296
go func() {
300297
for event := range l.NodeEventChan {
301298
if event.Type == EventTypeCreate {
@@ -389,6 +386,11 @@ func (l *SwarmListener) processNodeEventRemove(event Event) {
389386

390387
// NotifyServices places all services on queue to notify services on service events
391388
func (l SwarmListener) NotifyServices(useCache bool) {
389+
390+
if !l.NotifyDistributor.HasServiceListeners() {
391+
return
392+
}
393+
392394
services, err := l.SSClient.SwarmServiceList(context.Background())
393395
if err != nil {
394396
l.Log.Printf("ERROR: NotifyService, %v", err)
@@ -405,6 +407,11 @@ func (l SwarmListener) NotifyServices(useCache bool) {
405407

406408
// NotifyNodes places all services on queue to notify serivces on node events
407409
func (l SwarmListener) NotifyNodes(useCache bool) {
410+
411+
if !l.NotifyDistributor.HasNodeListeners() {
412+
return
413+
}
414+
408415
nodes, err := l.NodeClient.NodeList(context.Background())
409416
if err != nil {
410417
l.Log.Printf("ERROR: NotifyNodes, %v", err)

service/swarmlistener_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func (s *SwarmListenerTestSuite) Test_NotifyServices_WithCache() {
236236
},
237237
}
238238
s.SSClientMock.On("SwarmServiceList", mock.AnythingOfType("*context.emptyCtx")).Return(expServices, nil)
239+
s.NotifyDistributorMock.On("HasServiceListeners").Return(true)
239240

240241
s.SwarmListener.NotifyServices(true)
241242

@@ -274,6 +275,7 @@ func (s *SwarmListenerTestSuite) Test_NotifyServices_WithoutCache() {
274275
},
275276
}
276277
s.SSClientMock.On("SwarmServiceList", mock.AnythingOfType("*context.emptyCtx")).Return(expServices, nil)
278+
s.NotifyDistributorMock.On("HasServiceListeners").Return(true)
277279

278280
s.SwarmListener.NotifyServices(false)
279281

@@ -309,6 +311,7 @@ func (s *SwarmListenerTestSuite) Test_NotifyNodes_WithoutCache() {
309311
},
310312
}
311313
s.NodeClientMock.On("NodeList", mock.AnythingOfType("*context.emptyCtx")).Return(expNodes, nil)
314+
s.NotifyDistributorMock.On("HasNodeListeners").Return(true)
312315

313316
s.SwarmListener.NotifyNodes(false)
314317

@@ -344,6 +347,7 @@ func (s *SwarmListenerTestSuite) Test_NotifyNodes_WithCache() {
344347
},
345348
}
346349
s.NodeClientMock.On("NodeList", mock.AnythingOfType("*context.emptyCtx")).Return(expNodes, nil)
350+
s.NotifyDistributorMock.On("HasNodeListeners").Return(true)
347351

348352
s.SwarmListener.NotifyNodes(true)
349353

0 commit comments

Comments
 (0)