Skip to content

Commit 6d47a88

Browse files
authored
[release]Support for Polling
This release adds the following features: - Adds environment variables `DF_SERVICE_POLLING_INTERVAL` and `DF_USE_DOCKER_SERVICE_EVENTS` to configure service polling. Details can be found at the [Configuration documentation](http://swarmlistener.dockerflow.com/config/). - Adds environment variables `DF_NODE_POLLING_INTERVAL` and `DF_USE_DOCKER_NODE_EVENTS` to configure node polling. Details can be found at the [Configuration documentation](http://swarmlistener.dockerflow.com/config/). - More robust handling of events and sending out notifications.
1 parent cf38abd commit 6d47a88

File tree

11 files changed

+434
-31
lines changed

11 files changed

+434
-31
lines changed

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ ENV DF_DOCKER_HOST="unix:///var/run/docker.sock" \
1818
DF_NOTIFY_LABEL="com.df.notify" \
1919
DF_INCLUDE_NODE_IP_INFO="false" \
2020
DF_SERVICE_POLLING_INTERVAL="-1" \
21-
DF_USE_DOCKER_SERVICE_EVENTS="true"
21+
DF_USE_DOCKER_SERVICE_EVENTS="true" \
22+
DF_NODE_POLLING_INTERVAL="-1" \
23+
DF_USE_DOCKER_NODE_EVENTS="true"
2224

2325
EXPOSE 8080
2426

args.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ import (
77

88
type args struct {
99
ServicePollingInterval int
10+
NodePollingInterval int
1011
Retry int
1112
RetryInterval int
1213
}
1314

1415
func getArgs() *args {
1516
return &args{
1617
ServicePollingInterval: getValue(-1, "DF_SERVICE_POLLING_INTERVAL"),
17-
Retry: getValue(1, "DF_RETRY"),
18-
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
18+
NodePollingInterval: getValue(-1, "DF_NODE_POLLING_INTERVAL"),
19+
Retry: getValue(1, "DF_RETRY"),
20+
RetryInterval: getValue(0, "DF_RETRY_INTERVAL"),
1921
}
2022
}
2123

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@ The following environment variables can be used when creating the `swarm-listene
1515
|DF_RETRY_INTERVAL |Time between each notificationo request retry, in seconds.<br>**Default**: `5`<br>**Example**:`10`|
1616
|DF_SERVICE_POLLING_INTERVAL |Time between each service polling request, in seconds. When this value is set less than or equal to zero, service polling is disabled.<br>**Default**: `-1`<br>**Example**:`20`|
1717
|DF_USE_DOCKER_SERVICE_EVENTS|Use docker events api to get service updates.<br>**Default**:`true`|
18+
|DF_NODE_POLLING_INTERVAL |Time between each node polling request, in seconds. When this value is set less than or equal to zero, node polling is disabled.<br>**Default**: `-1`<br>**Example**:`20`|
19+
|DF_USE_DOCKER_NODE_EVENTS|Use docker events api to get node updates.<br>**Default**:`true`|

main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ func main() {
1212

1313
l.Printf("Starting Docker Flow: Swarm Listener")
1414
args := getArgs()
15-
swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, args.ServicePollingInterval, l)
15+
swarmListener, err := service.NewSwarmListenerFromEnv(
16+
args.Retry, args.RetryInterval,
17+
args.ServicePollingInterval, args.NodePollingInterval, l)
1618
if err != nil {
1719
l.Printf("Failed to initialize Docker Flow: Swarm Listener")
1820
l.Printf("ERROR: %v", err)

service/mocks.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,16 @@ func (m *nodeCacherMock) Get(ID string) (NodeMini, bool) {
131131
return args.Get(0).(NodeMini), args.Bool(1)
132132
}
133133

134+
func (m *nodeCacherMock) IsNewOrUpdated(n NodeMini) bool {
135+
args := m.Called(n)
136+
return args.Bool(0)
137+
}
138+
139+
func (m *nodeCacherMock) Keys() map[string]struct{} {
140+
args := m.Called()
141+
return args.Get(0).(map[string]struct{})
142+
}
143+
134144
type notifyDistributorMock struct {
135145
mock.Mock
136146
}
@@ -154,3 +164,11 @@ type swarmServicePollingMock struct {
154164
func (m *swarmServicePollingMock) Run(eventChan chan<- Event) {
155165
m.Called(eventChan)
156166
}
167+
168+
type nodePollingMock struct {
169+
mock.Mock
170+
}
171+
172+
func (m *nodePollingMock) Run(eventChan chan<- Event) {
173+
m.Called(eventChan)
174+
}

service/nodecache.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package service
22

3+
import "sync"
4+
35
// NodeCacher caches sevices
46
type NodeCacher interface {
57
InsertAndCheck(n NodeMini) bool
8+
IsNewOrUpdated(n NodeMini) bool
69
Delete(ID string)
710
Get(ID string) (NodeMini, bool)
11+
Keys() map[string]struct{}
812
}
913

1014
// NodeCache implements `NodeCacher`
1115
// Not threadsafe!
1216
type NodeCache struct {
1317
cache map[string]NodeMini
18+
mux sync.RWMutex
1419
}
1520

1621
// NewNodeCache creates a new `NewNodeCache`
@@ -23,6 +28,9 @@ func NewNodeCache() *NodeCache {
2328
// InsertAndCheck inserts `NodeMini` into cache
2429
// If the node is new or updated `InsertAndCheck` returns true.
2530
func (c *NodeCache) InsertAndCheck(n NodeMini) bool {
31+
c.mux.Lock()
32+
defer c.mux.Unlock()
33+
2634
cachedNode, ok := c.cache[n.ID]
2735
c.cache[n.ID] = n
2836

@@ -31,11 +39,37 @@ func (c *NodeCache) InsertAndCheck(n NodeMini) bool {
3139

3240
// Delete removes node from cache
3341
func (c *NodeCache) Delete(ID string) {
42+
c.mux.Lock()
43+
defer c.mux.Unlock()
44+
3445
delete(c.cache, ID)
3546
}
3647

3748
// Get gets node from cache
38-
func (c NodeCache) Get(ID string) (NodeMini, bool) {
49+
func (c *NodeCache) Get(ID string) (NodeMini, bool) {
50+
c.mux.RLock()
51+
defer c.mux.RUnlock()
52+
3953
v, ok := c.cache[ID]
4054
return v, ok
4155
}
56+
57+
// IsNewOrUpdated returns true if node is new or updated
58+
func (c *NodeCache) IsNewOrUpdated(n NodeMini) bool {
59+
c.mux.RLock()
60+
defer c.mux.RUnlock()
61+
62+
cachedNode, ok := c.cache[n.ID]
63+
return !ok || !n.Equal(cachedNode)
64+
}
65+
66+
// Keys return the keys of the cache
67+
func (c *NodeCache) Keys() map[string]struct{} {
68+
c.mux.RLock()
69+
defer c.mux.RUnlock()
70+
output := map[string]struct{}{}
71+
for key := range c.cache {
72+
output[key] = struct{}{}
73+
}
74+
return output
75+
}

service/nodecache_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,43 @@ func (s *NodeCacheTestSuite) Test_GetAndRemove_NotInCache_ReturnsFalse() {
149149
s.False(ok)
150150
}
151151

152+
func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeInCache() {
153+
s.Cache.InsertAndCheck(s.NMini)
154+
s.AssertInCache(s.NMini)
155+
156+
newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini)
157+
s.False(newOrUpdated)
158+
}
159+
160+
func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeNotInCache() {
161+
newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini)
162+
s.True(newOrUpdated)
163+
}
164+
165+
func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeIsDifferentCache() {
166+
167+
s.Cache.InsertAndCheck(s.NMini)
168+
s.AssertInCache(s.NMini)
169+
170+
anotherNMini := getNewNodeMini()
171+
anotherNMini.State = swarm.NodeStateDown
172+
173+
newOrUpdated := s.Cache.IsNewOrUpdated(anotherNMini)
174+
s.True(newOrUpdated)
175+
176+
}
177+
178+
func (s *NodeCacheTestSuite) Test_Keys() {
179+
s.Cache.InsertAndCheck(s.NMini)
180+
s.AssertInCache(s.NMini)
181+
182+
keys := s.Cache.Keys()
183+
184+
s.Require().Len(keys, 1)
185+
s.Contains(keys, s.NMini.ID)
186+
187+
}
188+
152189
func (s *NodeCacheTestSuite) AssertInCache(nm NodeMini) {
153190
ss, ok := s.Cache.Get(nm.ID)
154191
s.True(ok)

service/nodepoller.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/docker/docker/api/types/swarm"
9+
)
10+
11+
// NodePolling provides an interface for polling node changes
12+
type NodePolling interface {
13+
Run(eventChan chan<- Event)
14+
}
15+
16+
// NodePoller implements `NodePolling`
17+
type NodePoller struct {
18+
Client NodeInspector
19+
Cache NodeCacher
20+
PollingInterval int
21+
MinifyFunc func(swarm.Node) NodeMini
22+
Log *log.Logger
23+
}
24+
25+
// NewNodePoller creates a new `NodePoller`
26+
func NewNodePoller(
27+
client NodeInspector,
28+
cache NodeCacher,
29+
pollingInterval int,
30+
minifyFunc func(swarm.Node) NodeMini,
31+
log *log.Logger,
32+
) *NodePoller {
33+
return &NodePoller{
34+
Client: client,
35+
Cache: cache,
36+
PollingInterval: pollingInterval,
37+
MinifyFunc: minifyFunc,
38+
Log: log,
39+
}
40+
}
41+
42+
// Run starts poller and places events onto `eventChan`
43+
func (n NodePoller) Run(eventChan chan<- Event) {
44+
45+
if n.PollingInterval <= 0 {
46+
return
47+
}
48+
49+
ctx := context.Background()
50+
51+
n.Log.Printf("Polling for Node Changes")
52+
time.Sleep(time.Duration(n.PollingInterval) * time.Second)
53+
54+
for {
55+
nodes, err := n.Client.NodeList(ctx)
56+
if err != nil {
57+
n.Log.Printf("ERROR (NodePoller): %v", err)
58+
} else {
59+
nowTimeNano := time.Now().UTC().UnixNano()
60+
keys := n.Cache.Keys()
61+
for _, node := range nodes {
62+
delete(keys, node.ID)
63+
64+
nodeMini := n.MinifyFunc(node)
65+
if n.Cache.IsNewOrUpdated(nodeMini) {
66+
eventChan <- Event{
67+
Type: EventTypeCreate,
68+
ID: node.ID,
69+
TimeNano: nowTimeNano,
70+
UseCache: true,
71+
}
72+
}
73+
}
74+
75+
// Remaining key sare removal events
76+
for k := range keys {
77+
eventChan <- Event{
78+
Type: EventTypeRemove,
79+
ID: k,
80+
TimeNano: nowTimeNano,
81+
UseCache: true,
82+
}
83+
}
84+
}
85+
time.Sleep(time.Duration(n.PollingInterval) * time.Second)
86+
}
87+
}

0 commit comments

Comments
 (0)