@@ -131,13 +131,27 @@ def create_stream(self):
131131 def nudge (self ):
132132 """Nudge the zmq connections with kernel_info_requests
133133 Returns a Future that will resolve when we have received
134- a control reply and at least one iopub message,
134+ a shell or control reply and at least one iopub message,
135135 ensuring that zmq subscriptions are established,
136136 sockets are fully connected, and kernel is responsive.
137137 Keeps retrying kernel_info_request until these are both received.
138138 """
139139 kernel = self .kernel_manager .get_kernel (self .kernel_id )
140140
141+ # Do not nudge busy kernels as kernel info requests sent to shell are
142+ # queued behind execution requests.
143+ # nudging in this case would cause a potentially very long wait
144+ # before connections are opened,
145+ # plus it is *very* unlikely that a busy kernel will not finish
146+ # establishing its zmq subscriptions before processing the next request.
147+ if getattr (kernel , "execution_state" ) == "busy" :
148+ self .log .debug ("Nudge: not nudging busy kernel %s" , self .kernel_id )
149+ f = Future ()
150+ f .set_result (None )
151+ return f
152+ # Use a transient shell channel to prevent leaking
153+ # shell responses to the front-end.
154+ shell_channel = kernel .connect_shell ()
141155 # Use a transient control channel to prevent leaking
142156 # control responses to the front-end.
143157 control_channel = kernel .connect_control ()
@@ -160,18 +174,26 @@ def cleanup(_=None):
160174 """Common cleanup"""
161175 loop .remove_timeout (nudge_handle )
162176 iopub_channel .stop_on_recv ()
177+ if not shell_channel .closed ():
178+ shell_channel .close ()
163179 if not control_channel .closed ():
164180 control_channel .close ()
165181
166182 # trigger cleanup when both message futures are resolved
167183 both_done .add_done_callback (cleanup )
168184
169- def on_control_reply (msg ):
185+ def on_shell_reply (msg ):
170186 self .log .debug ("Nudge: shell info reply received: %s" , self .kernel_id )
171187 if not info_future .done ():
172188 self .log .debug ("Nudge: resolving shell future: %s" , self .kernel_id )
173189 info_future .set_result (None )
174190
191+ def on_control_reply (msg ):
192+ self .log .debug ("Nudge: control info reply received: %s" , self .kernel_id )
193+ if not info_future .done ():
194+ self .log .debug ("Nudge: resolving control future: %s" , self .kernel_id )
195+ info_future .set_result (None )
196+
175197 def on_iopub (msg ):
176198 self .log .debug ("Nudge: IOPub received: %s" , self .kernel_id )
177199 if not iopub_future .done ():
@@ -180,6 +202,7 @@ def on_iopub(msg):
180202 iopub_future .set_result (None )
181203
182204 iopub_channel .on_recv (on_iopub )
205+ shell_channel .on_recv (on_shell_reply )
183206 control_channel .on_recv (on_control_reply )
184207 loop = IOLoop .current ()
185208
@@ -200,6 +223,12 @@ def nudge(count):
200223 finish ()
201224 return
202225
226+ # check for closed zmq socket
227+ if shell_channel .closed ():
228+ self .log .debug ("Nudge: cancelling on closed zmq socket: %s" , self .kernel_id )
229+ finish ()
230+ return
231+
203232 # check for closed zmq socket
204233 if control_channel .closed ():
205234 self .log .debug ("Nudge: cancelling on closed zmq socket: %s" , self .kernel_id )
@@ -209,6 +238,7 @@ def nudge(count):
209238 if not both_done .done ():
210239 log = self .log .warning if count % 10 == 0 else self .log .debug
211240 log ("Nudge: attempt %s on kernel %s" % (count , self .kernel_id ))
241+ self .session .send (shell_channel , "kernel_info_request" )
212242 self .session .send (control_channel , "kernel_info_request" )
213243 nonlocal nudge_handle
214244 nudge_handle = loop .call_later (0.5 , nudge , count )
0 commit comments