Skip to content

Commit fe14f1c

Browse files
authored
ENH: Adds service polling (#11)
1 parent 57c3ff9 commit fe14f1c

File tree

11 files changed

+422
-14
lines changed

11 files changed

+422
-14
lines changed

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ ENV DF_DOCKER_HOST="unix:///var/run/docker.sock" \
1616
DF_RETRY="50" \
1717
DF_RETRY_INTERVAL="5" \
1818
DF_NOTIFY_LABEL="com.df.notify" \
19-
DF_INCLUDE_NODE_IP_INFO="false"
19+
DF_INCLUDE_NODE_IP_INFO="false" \
20+
DF_SERVICE_POLLING_INTERVAL="-1" \
21+
DF_USE_DOCKER_SERVICE_EVENTS="true"
2022

2123
EXPOSE 8080
2224

args.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
)
77

88
type args struct {
9-
Retry int
10-
RetryInterval int
9+
ServicePollingInterval int
10+
Retry int
11+
RetryInterval int
1112
}
1213

1314
func getArgs() *args {
1415
return &args{
16+
ServicePollingInterval: getValue(-1, "DF_SERVICE_POLLING_INTERVAL"),
1517
Retry: getValue(1, "DF_RETRY"),
1618
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
1719
}

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ The following environment variables can be used when creating the `swarm-listene
1313
|DF_NOTIFY_REMOVE_NODE_URL |Comma separated list of URLs that will be used to send notification requests when a node is remove.<br>**Example**: `url1,url2`|
1414
|DF_RETRY |Number of notification request retries<br>**Default**: `50`<br>**Example**: `100`|
1515
|DF_RETRY_INTERVAL |Time between each notificationo request retry, in seconds.<br>**Default**: `5`<br>**Example**:`10`|
16+
|DF_SERVICE_POLLING_INTERVAL |Time between each service polling request, in seconds. When this value is set less than or equal to zero, service polling is disabled.<br>**Default**: `-1`<br>**Example**:`20`|
17+
|DF_USE_DOCKER_SERVICE_EVENTS|Use docker events api to get service updates.<br>**Default**:`true`|

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func main() {
1212

1313
l.Printf("Starting Docker Flow: Swarm Listener")
1414
args := getArgs()
15-
swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, l)
15+
swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, args.ServicePollingInterval, l)
1616
if err != nil {
1717
l.Printf("Failed to initialize Docker Flow: Swarm Listener")
1818
l.Printf("ERROR: %v", err)

service/mocks.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ func (m *swarmServiceCacherMock) InsertAndCheck(ss SwarmServiceMini) bool {
6767
return args.Bool(0)
6868
}
6969

70+
func (m *swarmServiceCacherMock) IsNewOrUpdated(ss SwarmServiceMini) bool {
71+
args := m.Called(ss)
72+
return args.Bool(0)
73+
}
74+
7075
func (m *swarmServiceCacherMock) Delete(ID string) {
7176
m.Called(ID)
7277
}
@@ -81,6 +86,11 @@ func (m *swarmServiceCacherMock) Len() int {
8186
return args.Int(0)
8287
}
8388

89+
func (m *swarmServiceCacherMock) Keys() map[string]struct{} {
90+
args := m.Called()
91+
return args.Get(0).(map[string]struct{})
92+
}
93+
8494
type nodeListeningMock struct {
8595
mock.Mock
8696
}
@@ -136,3 +146,11 @@ func (m *notifyDistributorMock) HasServiceListeners() bool {
136146
func (m *notifyDistributorMock) HasNodeListeners() bool {
137147
return m.Called().Bool(0)
138148
}
149+
150+
type swarmServicePollingMock struct {
151+
mock.Mock
152+
}
153+
154+
func (m *swarmServicePollingMock) Run(eventChan chan<- Event) {
155+
m.Called(eventChan)
156+
}

service/servicecache.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import "sync"
55
// SwarmServiceCacher caches sevices
66
type SwarmServiceCacher interface {
77
InsertAndCheck(ss SwarmServiceMini) bool
8+
IsNewOrUpdated(ss SwarmServiceMini) bool
89
Delete(ID string)
910
Get(ID string) (SwarmServiceMini, bool)
1011
Len() int
12+
Keys() map[string]struct{}
1113
}
1214

1315
// SwarmServiceCache implements `SwarmServiceCacher`
@@ -36,6 +38,15 @@ func (c *SwarmServiceCache) InsertAndCheck(ss SwarmServiceMini) bool {
3638

3739
}
3840

41+
// IsNewOrUpdated returns true if service is new or updated
42+
func (c *SwarmServiceCache) IsNewOrUpdated(ss SwarmServiceMini) bool {
43+
c.mux.RLock()
44+
defer c.mux.RUnlock()
45+
46+
cachedService, ok := c.cache[ss.ID]
47+
return !ok || !ss.Equal(cachedService)
48+
}
49+
3950
// Delete delets service from cache
4051
func (c *SwarmServiceCache) Delete(ID string) {
4152
c.mux.Lock()
@@ -57,3 +68,14 @@ func (c *SwarmServiceCache) Len() int {
5768
defer c.mux.RUnlock()
5869
return len(c.cache)
5970
}
71+
72+
// Keys returns the keys of the cache
73+
func (c *SwarmServiceCache) Keys() map[string]struct{} {
74+
c.mux.RLock()
75+
defer c.mux.RUnlock()
76+
output := map[string]struct{}{}
77+
for key := range c.cache {
78+
output[key] = struct{}{}
79+
}
80+
return output
81+
}

service/servicecache_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,44 @@ func (s *SwarmServiceCacheTestSuite) Test_GetAndRemove_InCache_ReturnsSwarmServi
132132
s.Cache.Delete(s.SSMini.ID)
133133
s.AssertNotInCache(s.SSMini)
134134
s.Equal(s.SSMini, removedSSMini)
135+
}
136+
137+
func (s *SwarmServiceCacheTestSuite) Test_Keys() {
138+
s.Cache.InsertAndCheck(s.SSMini)
139+
s.AssertInCache(s.SSMini)
140+
141+
keys := s.Cache.Keys()
142+
143+
s.Require().Len(keys, 1)
144+
s.Contains(keys, s.SSMini.ID)
145+
146+
}
147+
148+
func (s *SwarmServiceCacheTestSuite) Test_IsNewOrUpdated_ServiceInCache() {
149+
s.Cache.InsertAndCheck(s.SSMini)
150+
s.AssertInCache(s.SSMini)
135151

152+
newOrUpdated := s.Cache.IsNewOrUpdated(s.SSMini)
153+
s.False(newOrUpdated)
136154
}
137155

156+
func (s *SwarmServiceCacheTestSuite) Test_IsNewOrUpdated_ServiceNotInCache() {
157+
newOrUpdated := s.Cache.IsNewOrUpdated(s.SSMini)
158+
s.True(newOrUpdated)
159+
}
160+
161+
func (s *SwarmServiceCacheTestSuite) Test_IsNewOrUpdated_ServiceIsDifferentCache() {
162+
163+
s.Cache.InsertAndCheck(s.SSMini)
164+
s.AssertInCache(s.SSMini)
165+
166+
anotherSSMini := getNewSwarmServiceMini()
167+
anotherSSMini.Name = "anotherName"
168+
169+
newOrUpdated := s.Cache.IsNewOrUpdated(anotherSSMini)
170+
s.True(newOrUpdated)
171+
172+
}
138173
func (s *SwarmServiceCacheTestSuite) Test_GetAndRemove_NotInCache_ReturnsFalse() {
139174

140175
_, ok := s.Cache.Get(s.SSMini.ID)

service/servicepoller.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
)
8+
9+
// SwarmServicePolling provides an interface for polling service changes
10+
type SwarmServicePolling interface {
11+
Run(eventChan chan<- Event)
12+
}
13+
14+
// SwarmServicePoller implements `SwarmServicePoller`
15+
type SwarmServicePoller struct {
16+
SSClient SwarmServiceInspector
17+
SSCache SwarmServiceCacher
18+
PollingInterval int
19+
MinifyFunc func(SwarmService) SwarmServiceMini
20+
Log *log.Logger
21+
}
22+
23+
// NewSwarmServicePoller creates a new `SwarmServicePoller`
24+
func NewSwarmServicePoller(
25+
ssClient SwarmServiceInspector,
26+
ssCache SwarmServiceCacher,
27+
pollingInterval int,
28+
minifyFunc func(SwarmService) SwarmServiceMini,
29+
log *log.Logger,
30+
) *SwarmServicePoller {
31+
return &SwarmServicePoller{
32+
SSClient: ssClient,
33+
SSCache: ssCache,
34+
PollingInterval: pollingInterval,
35+
MinifyFunc: minifyFunc,
36+
Log: log,
37+
}
38+
}
39+
40+
// Run starts poller and places events onto `eventChan`
41+
func (s SwarmServicePoller) Run(
42+
eventChan chan<- Event) {
43+
44+
if s.PollingInterval <= 0 {
45+
return
46+
}
47+
48+
s.Log.Printf("Polling for Service Changes")
49+
time.Sleep(time.Duration(s.PollingInterval) * time.Second)
50+
for {
51+
services, err := s.SSClient.SwarmServiceList(context.Background())
52+
if err != nil {
53+
s.Log.Printf("ERROR (SwarmServicePolling): %v", err)
54+
} else {
55+
nowTimeNano := time.Now().UTC().UnixNano()
56+
keys := s.SSCache.Keys()
57+
for _, ss := range services {
58+
delete(keys, ss.ID)
59+
ssMini := s.MinifyFunc(ss)
60+
if s.SSCache.IsNewOrUpdated(ssMini) {
61+
eventChan <- Event{
62+
Type: EventTypeCreate,
63+
ID: ss.ID,
64+
TimeNano: nowTimeNano,
65+
UseCache: true,
66+
}
67+
}
68+
}
69+
70+
// Remaining keys are removal events
71+
for k := range keys {
72+
eventChan <- Event{
73+
Type: EventTypeRemove,
74+
ID: k,
75+
TimeNano: nowTimeNano,
76+
UseCache: true,
77+
}
78+
}
79+
}
80+
time.Sleep(time.Duration(s.PollingInterval) * time.Second)
81+
}
82+
}

0 commit comments

Comments
 (0)