@@ -25,6 +25,31 @@ type workItemsStream interface {
2525 Recv () (* protos.WorkItem , error )
2626}
2727
28+ const keepaliveInterval = 30 * time .Second
29+ const keepaliveTimeout = 5 * time .Second
30+
31+ func (c * TaskHubGrpcClient ) startKeepaliveLoop (ctx context.Context ) context.CancelFunc {
32+ ctx , cancel := context .WithCancel (ctx )
33+ go func () {
34+ ticker := time .NewTicker (keepaliveInterval )
35+ defer ticker .Stop ()
36+ for {
37+ select {
38+ case <- ctx .Done ():
39+ return
40+ case <- ticker .C :
41+ callCtx , callCancel := context .WithTimeout (ctx , keepaliveTimeout )
42+ _ , err := c .client .Hello (callCtx , & emptypb.Empty {})
43+ callCancel ()
44+ if err != nil && ctx .Err () == nil {
45+ c .logger .Debugf ("keepalive failed: %v" , err )
46+ }
47+ }
48+ }
49+ }()
50+ return cancel
51+ }
52+
2853func (c * TaskHubGrpcClient ) StartWorkItemListener (ctx context.Context , r * task.TaskRegistry ) error {
2954 executor := task .NewTaskExecutor (r )
3055
@@ -52,7 +77,9 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T
5277
5378 go func () {
5479 c .logger .Info ("starting background processor" )
80+ cancelKeepalive := c .startKeepaliveLoop (ctx )
5581 defer func () {
82+ cancelKeepalive ()
5683 c .logger .Info ("stopping background processor" )
5784 // We must use a background context here as the stream's context is likely canceled
5885 shutdownErr := executor .Shutdown (context .Background ())
@@ -114,6 +141,8 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T
114141 return
115142 }
116143 c .logger .Infof ("successfully reconnected work item listener stream..." )
144+ cancelKeepalive ()
145+ cancelKeepalive = c .startKeepaliveLoop (ctx )
117146 // continue iterating
118147 continue
119148 }
0 commit comments