22# Distributed under the terms of the Modified BSD License.
33"""Code related to managing kernels running in YARN clusters."""
44
5+ import asyncio
56import os
67import signal
7- import time
88import logging
99import errno
1010import socket
1313from remote_kernel_provider .launcher import launch_kernel
1414from remote_kernel_provider .lifecycle_manager import RemoteKernelLifecycleManager
1515from yarn_api_client .resource_manager import ResourceManager
16- from urllib .parse import urlparse
17-
18- # Default logging level of yarn-api and underlying connectionpool produce too much noise - raise to warning only.
19- logging .getLogger ('yarn_api_client.resource_manager' ).setLevel (os .getenv ('EG_YARN_LOG_LEVEL' , logging .WARNING ))
20- logging .getLogger ('urllib3.connectionpool' ).setLevel (os .environ .get ('EG_YARN_LOG_LEVEL' , logging .WARNING ))
2116
2217local_ip = localinterfaces .public_ips ()[0 ]
2318poll_interval = float (os .getenv ('EG_POLL_INTERVAL' , '0.5' ))
2419max_poll_attempts = int (os .getenv ('EG_MAX_POLL_ATTEMPTS' , '10' ))
2520yarn_shutdown_wait_time = float (os .getenv ('EG_YARN_SHUTDOWN_WAIT_TIME' , '15.0' ))
2621
22+ # Default logging level of the underlying modules produce too much noise - handle levels seperate from app
23+ logging .getLogger ('yarn_api_client' ).setLevel (os .getenv ('YARN_API_CLIENT_LOG_LEVEL' , logging .INFO ))
24+ logging .getLogger ('urllib3' ).setLevel (os .environ .get ('URLLIB_LOG_LEVEL' , logging .WARNING ))
25+
26+
27+ def configure_yarn_api_client_logger ():
28+ # This is probably due to how Jupyter (traitlets) are configuring the logger,
29+ # but, for whatever reason, the yarn_api_client format is not that of Jupyter's.
30+ # As a result, we'll configure the logger format to use a similar style.
31+ # TODO: we should look into reaching up to the Application and pulling its logger settings.
32+ logger = logging .getLogger ("yarn_api_client" )
33+ if not logger .handlers :
34+ # Permit formatting customization via env
35+ log_format = os .environ .get ("YARN_API_CLIENT_LOG_FORMAT" ,
36+ "[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s] %(message)s" )
37+ date_format = os .environ .get ("YARN_API_CLIENT_DATE_FORMAT" , "%Y-%m-%d %H:%M:%S" )
38+
39+ _log_handler = logging .StreamHandler ()
40+ _log_handler .setFormatter (logging .Formatter (fmt = log_format , datefmt = date_format ))
41+ logger .addHandler (_log_handler )
42+ logger .propagate = False
43+
44+
45+ configure_yarn_api_client_logger ()
46+
2747
2848class YarnKernelLifecycleManager (RemoteKernelLifecycleManager ):
2949 """Kernel lifecycle management for YARN clusters."""
@@ -34,37 +54,38 @@ def __init__(self, kernel_manager, lifecycle_config):
3454 super (YarnKernelLifecycleManager , self ).__init__ (kernel_manager , lifecycle_config )
3555 self .application_id = None
3656 self .rm_addr = None
37- self .yarn_endpoint \
38- = lifecycle_config .get ('yarn_endpoint' ,
39- kernel_manager .app_config .get ('yarn_endpoint' ))
40- self .alt_yarn_endpoint \
41- = lifecycle_config .get ('alt_yarn_endpoint' ,
42- kernel_manager .app_config .get ('alt_yarn_endpoint' ))
43-
44- self .yarn_endpoint_security_enabled \
45- = lifecycle_config .get ('yarn_endpoint_security_enabled' ,
46- kernel_manager .app_config .get ('yarn_endpoint_security_enabled' , False ))
47-
48- yarn_master = alt_yarn_master = None
49- yarn_port = alt_yarn_port = None
57+
58+ # We'd like to have the kernel.json values override the globally configured values but because
59+ # 'null' is the default value for these (and means to go with the local endpoint), we really
60+ # can't do that elegantly. This means that the global setting will be used only if the kernel.json
61+ # value is 'null' (None). For those configurations that want to use the local endpoint, they should
62+ # just avoid setting these altogether.
63+ self .yarn_endpoint = lifecycle_config .get (
64+ 'yarn_endpoint' , kernel_manager .provider_config .get ('yarn_endpoint' ))
65+
66+ self .alt_yarn_endpoint = lifecycle_config .get (
67+ 'alt_yarn_endpoint' , kernel_manager .provider_config .get ('alt_yarn_endpoint' ))
68+
69+ self .yarn_endpoint_security_enabled = lifecycle_config .get (
70+ 'yarn_endpoint_security_enabled' ,
71+ kernel_manager .provider_config .get ('yarn_endpoint_security_enabled' , False ))
72+
73+ endpoints = None
5074 if self .yarn_endpoint :
51- yarn_url = urlparse (self .yarn_endpoint )
52- yarn_master = yarn_url .hostname
53- yarn_port = yarn_url .port
75+ endpoints = [self .yarn_endpoint ]
76+
5477 # Only check alternate if "primary" is set.
5578 if self .alt_yarn_endpoint :
56- alt_yarn_url = urlparse (self .alt_yarn_endpoint )
57- alt_yarn_master = alt_yarn_url .hostname
58- alt_yarn_port = alt_yarn_url .port
79+ endpoints .append (self .alt_yarn_endpoint )
80+
81+ auth = None
82+ if self .yarn_endpoint_security_enabled :
83+ from requests_kerberos import HTTPKerberosAuth
84+ auth = HTTPKerberosAuth ()
5985
60- self .resource_mgr = ResourceManager (address = yarn_master ,
61- port = yarn_port ,
62- alt_address = alt_yarn_master ,
63- alt_port = alt_yarn_port ,
64- kerberos_enabled = self .yarn_endpoint_security_enabled )
86+ self .resource_mgr = ResourceManager (service_endpoints = endpoints , auth = auth )
6587
66- host , port = self .resource_mgr .get_active_host_port ()
67- self .rm_addr = host + ':' + str (port )
88+ self .rm_addr = self .resource_mgr .get_active_endpoint ()
6889
6990 # TODO - fix wait time - should just add member to k-m.
7091 # YARN applications tend to take longer than the default 5 second wait time. Rather than
@@ -76,9 +97,9 @@ def __init__(self, kernel_manager, lifecycle_config):
7697 self .log .debug ("{class_name} shutdown wait time adjusted to {wait_time} seconds." .
7798 format (class_name = type (self ).__name__ , wait_time = kernel_manager .shutdown_wait_time ))
7899
79- def launch_process (self , kernel_cmd , ** kwargs ):
100+ async def launch_process (self , kernel_cmd , ** kwargs ):
80101 """Launches the specified process within a YARN cluster environment."""
81- super (YarnKernelLifecycleManager , self ).launch_process (kernel_cmd , ** kwargs )
102+ await super (YarnKernelLifecycleManager , self ).launch_process (kernel_cmd , ** kwargs )
82103
83104 # launch the local run.sh - which is configured for yarn-cluster...
84105 self .local_proc = launch_kernel (kernel_cmd , ** kwargs )
@@ -87,7 +108,7 @@ def launch_process(self, kernel_cmd, **kwargs):
87108
88109 self .log .debug ("Yarn cluster kernel launched using YARN RM address: {}, pid: {}, Kernel ID: {}, cmd: '{}'"
89110 .format (self .rm_addr , self .local_proc .pid , self .kernel_id , kernel_cmd ))
90- self .confirm_remote_startup ()
111+ await self .confirm_remote_startup ()
91112
92113 return self
93114
@@ -127,7 +148,7 @@ def send_signal(self, signum):
127148 # signum value because altternate interrupt signals might be in play.
128149 return super (YarnKernelLifecycleManager , self ).send_signal (signum )
129150
130- def kill (self ):
151+ async def kill (self ):
131152 """Kill a kernel.
132153 :return: None if the application existed and is not in RUNNING state, False otherwise.
133154 """
@@ -139,39 +160,39 @@ def kill(self):
139160 i = 1
140161 state = self ._query_app_state_by_id (self .application_id )
141162 while state not in YarnKernelLifecycleManager .final_states and i <= max_poll_attempts :
142- time .sleep (poll_interval )
163+ await asyncio .sleep (poll_interval )
143164 state = self ._query_app_state_by_id (self .application_id )
144165 i = i + 1
145166
146167 if state in YarnKernelLifecycleManager .final_states :
147168 result = None
148169
149170 if result is False : # We couldn't terminate via Yarn, try remote signal
150- result = super (YarnKernelLifecycleManager , self ).kill ()
171+ result = await super (YarnKernelLifecycleManager , self ).kill ()
151172
152173 self .log .debug ("YarnKernelLifecycleManager.kill, application ID: {}, kernel ID: {}, state: {}, result: {}"
153174 .format (self .application_id , self .kernel_id , state , result ))
154175 return result
155176
156- def cleanup (self ):
177+ async def cleanup (self ):
157178 """"""
158179 # we might have a defunct process (if using waitAppCompletion = false) - so poll, kill, wait when we have
159180 # a local_proc.
160181 if self .local_proc :
161182 self .log .debug ("YarnKernelLifecycleManager.cleanup: Clearing possible defunct process, pid={}..." .
162183 format (self .local_proc .pid ))
163184 if super (YarnKernelLifecycleManager , self ).poll ():
164- super (YarnKernelLifecycleManager , self ).kill ()
165- super (YarnKernelLifecycleManager , self ).wait ()
185+ await super (YarnKernelLifecycleManager , self ).kill ()
186+ await super (YarnKernelLifecycleManager , self ).wait ()
166187 self .local_proc = None
167188
168189 # reset application id to force new query - handles kernel restarts/interrupts
169190 self .application_id = None
170191
171192 # for cleanup, we should call the superclass last
172- super (YarnKernelLifecycleManager , self ).cleanup ()
193+ await super (YarnKernelLifecycleManager , self ).cleanup ()
173194
174- def confirm_remote_startup (self ):
195+ async def confirm_remote_startup (self ):
175196 """ Confirms the yarn application is in a started state before returning. Should post-RUNNING states be
176197 unexpectedly encountered (FINISHED, KILLED) then we must throw, otherwise the rest of the server will
177198 believe its talking to a valid kernel.
@@ -181,7 +202,7 @@ def confirm_remote_startup(self):
181202 ready_to_connect = False # we're ready to connect when we have a connection file to use
182203 while not ready_to_connect :
183204 i += 1
184- self .handle_timeout ()
205+ await self .handle_timeout ()
185206
186207 if self ._get_application_id (True ):
187208 # Once we have an application ID, start monitoring state, obtain assigned host and get connection info
@@ -196,7 +217,7 @@ def confirm_remote_startup(self):
196217 format (i , app_state , self .assigned_host , self .kernel_id , self .application_id ))
197218
198219 if self .assigned_host != '' :
199- ready_to_connect = self .receive_connection_info ()
220+ ready_to_connect = await self .receive_connection_info ()
200221 else :
201222 self .detect_launch_failure ()
202223
@@ -215,9 +236,9 @@ def _get_application_state(self):
215236 self .assigned_ip = socket .gethostbyname (self .assigned_host )
216237 return app_state
217238
218- def handle_timeout (self ):
239+ async def handle_timeout (self ):
219240 """Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
220- time .sleep (poll_interval )
241+ await asyncio .sleep (poll_interval )
221242 time_interval = RemoteKernelLifecycleManager .get_time_diff (self .start_time )
222243
223244 if time_interval > self .kernel_launch_timeout :
@@ -234,7 +255,7 @@ def handle_timeout(self):
234255 else :
235256 reason = "App {} is RUNNING, but waited too long ({} secs) to get connection file. " \
236257 "Check YARN logs for more information." .format (self .application_id , self .kernel_launch_timeout )
237- self .kill ()
258+ await self .kill ()
238259 timeout_message = "KernelID: '{}' launch timeout due to: {}" .format (self .kernel_id , reason )
239260 self .log_and_raise (http_status_code = error_http_code , reason = timeout_message )
240261
0 commit comments