TEZ-4007: Introduce AmExtensions and Zookeeper-based FrameworkServices #427
base: master
 * Curator/Zookeeper impl of AMRegistryClient
 */
@InterfaceAudience.Public
public class ZkAMRegistryClient extends AMRegistryClient {
Just a question. What is the duty of ZooKeeper here? When I read this patch a while ago, I presumed ZooKeeper would be used for two reasons. My memory might be wrong.
- Generate a unique application ID
- Service discovery for application masters
@okumin: both:
- ZkAMRegistry.generateNewId takes care of generating the applicationId (backed by ZooKeeper)
- it is also used for service discovery (the AM registers itself, and the client receives updates about that)
I'm happy to see that you're interested in the area!
Please be aware that it's under refactoring; I'll let you know when it's ready.
Thanks. Background: I wonder if it can be independent of ZooKeeper technically. The reliability and fault-tolerance of ZK are beneficial in many environments, but in other cases, high availability is not necessary. Trino does not provide fault-tolerance with its coordinator, and some users appreciate the ease of use.
Absolutely! Be aware that these patches have been working for years in downstream distributions. This PR is about contributing them back, so for now it's ZK. The long-term plan is to improve the abstractions, make the registry more pluggable, and replace ZK with, for example, Kubernetes etcd. The default (YARN-based) behavior doesn't utilize an AM registry at all.
Can you please elaborate on how Trino's lack of fault tolerance comes to this picture? I mean, this ZK-based registry is more about AM service discovery than fault tolerance.
Hi folks, sorry, I'd like to chime in with a question here. :)
From the description of TEZ-3991, this feature seems to allow the AM to manage its own lifecycle, while clients such as HiveServer2 only need to discover available AMs via ZooKeeper (ZK) and submit tasks.
Based on my current limited understanding, the benefits of this feature should include reducing the burden on HS2 and potentially making task submission more efficient.
However, I have a few questions:
- Is this feature primarily beneficial for LLAP mode? LLAP requires long-running Tez AMs, and IMO this feature seems more useful for long-running tasks.
- Could this functionality serve as preliminary groundwork for decoupling the Tez framework from YARN? For example, could it pave the way for Tez to run directly on Kubernetes without relying on YARN for resource management?
Thanks.
> be aware that these patches have been working for years in downstream distributions
Sounds excellent.
> can you please elaborate on how Trino's lack of fault tolerance comes to this picture?
Trino inevitably has a SPOF, but it still attracts users. Hive on Tez (with LLAP) without fault tolerance is easier to set up than Hive on Tez with fault tolerance. I expect that an easy quick start would attract users.
🎊 +1 overall
This message was automatically generated.
ayushtkn
left a comment
Minor comments. Rest LGTM
if (other instanceof AMRecord otherRecord) {
  return appId.equals(otherRecord.appId)
      && host.equals(otherRecord.host)
      && hostName.equals(otherRecord.hostName)
can this lead to NPE, we don't have any default here
this.hostName = serviceRecord.get(HOST_NAME_RECORD_KEY);
this is initialized from DAGAppMaster:
https://github.com/apache/tez/pull/427/files#diff-54ba4a2af15261379079ed5a9c1f9eea52da7bdd3f1109fe7b96d4abf07f6173R677-R678
AMRecord amRecord = amRegistry.createAmRecord(appId, rpcServerAddress.getHostName(),
rpcServerAddress.getAddress().getHostAddress(), rpcServerAddress.getPort(), computeName);
hostName is not supposed to be null, so if it is null, that's already a fatal issue, which the NPE reveals; I don't think we can make this better.
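The fail-fast reasoning in this reply can be made explicit at construction time. The following is only a minimal sketch under assumptions, not the PR's code: `HostRecord` is a hypothetical stand-in for AMRecord with just two fields, and the use of `Objects.requireNonNull` is an illustration rather than something the PR is known to do.

```java
import java.util.Objects;

// Hypothetical stand-in for AMRecord; only two fields are modeled.
final class HostRecord {
  private final String host;
  private final String hostName;

  HostRecord(String host, String hostName) {
    // Fail fast: a null value is rejected when the record is built,
    // so equals() below can call hostName.equals(...) safely.
    this.host = Objects.requireNonNull(host, "host");
    this.hostName = Objects.requireNonNull(hostName, "hostName");
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof HostRecord)) {
      return false;
    }
    HostRecord that = (HostRecord) other;
    return host.equals(that.host) && hostName.equals(that.hostName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, hostName);
  }
}
```

With this shape the NullPointerException surfaces where the bad value is introduced instead of later, inside a comparison.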
@Override
public String toString() {
  return toServiceRecord().attributes().toString();
toString calls toServiceRecord, so it can lead to initializing serviceRecord = new ServiceRecord(). Are we OK with that? This could lead to issues later: for example, someone logs the record before the values are set, and then we have the service record cached as well.
I think there is no "before setting the values" scenario, given that there are only final fields. I'm assuming the logging can only happen on an AMRecord instance whose constructor has definitely finished.
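The argument about final fields can be illustrated with a small sketch. It rests on assumptions: `CachedViewRecord` is a hypothetical stand-in for AMRecord, and a plain `Map` plays the role of the cached ServiceRecord. Because the cached view is derived only from final fields, building it lazily from `toString` cannot capture half-initialized state once the constructor has returned.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for AMRecord; a Map plays the role of the ServiceRecord.
final class CachedViewRecord {
  private final String appId;
  private final int port;
  // Built lazily and derived only from the final fields above.
  private Map<String, String> cachedView;

  CachedViewRecord(String appId, int port) {
    this.appId = appId;
    this.port = port;
  }

  synchronized Map<String, String> toView() {
    if (cachedView == null) {
      Map<String, String> view = new LinkedHashMap<>();
      view.put("appId", appId);
      view.put("port", Integer.toString(port));
      cachedView = view;
    }
    return cachedView;
  }

  @Override
  public String toString() {
    // Triggers the lazy build, which is harmless because the inputs never change.
    return toView().toString();
  }
}
```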
AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
    String computeName);

void close();
AutoCloseable already defines close, do we need to define again here?
ack
void add(AMRecord server) throws Exception;

void remove(AMRecord server) throws Exception;
Should the parameter be named server or record?
record is better, fixing this
 * <p>Listeners may be registered to receive notifications when AM records
 * appear or are removed.</p>
 */
public abstract class AMRegistryClient implements Closeable {
can we do implements AutoCloseable?
We can, but it doesn't make the code better: this class is usually used as a field, not as a local variable in a try-with-resources block, and this applies to production as well as test code, so I think Closeable might be better here.
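For context on this trade-off: `java.io.Closeable` extends `AutoCloseable`, so a Closeable class still works in try-with-resources; the visible difference is that its `close()` declares `IOException` rather than `Exception`. The sketch below uses hypothetical classes (`DemoRegistryClient`, `DemoOwner`) and models only the close contract, in the field-based style described in the reply.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical registry client; only the close() contract is modeled.
class DemoRegistryClient implements Closeable {
  private boolean closed;

  @Override
  public void close() throws IOException {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

// Field-based usage: the owner holds the client and closes it explicitly.
class DemoOwner implements Closeable {
  final DemoRegistryClient client = new DemoRegistryClient();

  @Override
  public void close() throws IOException {
    client.close();
  }
}
```

A caller that does want a local variable can still write `try (DemoRegistryClient c = new DemoRegistryClient()) { ... }`.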
@Override
public void start() {
  try {
    amRegistryClient.start();
we should return if isRunning is true here
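One way to make `start()` idempotent is sketched below. This is an assumption-laden illustration, not the PR's code: `IdempotentClient` is hypothetical, the `AtomicBoolean` guard is a swapped-in technique (the comment only asks for an `isRunning` check), and `startCount` exists solely to make the effect observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical client; startCount only exists to make the guard observable.
class IdempotentClient {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private int startCount;

  void start() {
    // compareAndSet lets exactly one caller flip false -> true;
    // any later call sees "already running" and returns early.
    if (!running.compareAndSet(false, true)) {
      return;
    }
    startCount++; // stands in for amRegistryClient.start()
  }

  boolean isRunning() {
    return running.get();
  }

  int getStartCount() {
    return startCount;
  }
}
```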
public class ZkFrameworkClient extends FrameworkClient {

  private AMRecord amRecord;
should this be volatile
It should. Also, tracking amHost and amPort separately doesn't make sense; let's just rely on a single volatile record field.
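The benefit of a single volatile field over separate host and port fields can be sketched as follows. `AmAddress` and `AddressHolder` are hypothetical names, not classes from the PR: an immutable snapshot is published through one volatile reference, so a reader sees either the old pair or the new pair, never a mix of the two.

```java
// Hypothetical immutable snapshot of the AM's address.
final class AmAddress {
  final String host;
  final int port;

  AmAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }
}

class AddressHolder {
  // One volatile reference replaces separate host and port fields.
  private volatile AmAddress current;

  // Would be called from a registry listener thread in a real client.
  void onUpdate(String host, int port) {
    current = new AmAddress(host, port);
  }

  String describe() {
    AmAddress snapshot = current; // read the volatile field exactly once
    return snapshot == null ? "unknown" : snapshot.host + ":" + snapshot.port;
  }
}
```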
} else if (modeInEnv != null) {
  return getByMode(interfaze, modeInEnv);
} else if (defaultClazz != null) {
  return (T) defaultClazz.newInstance();
I think this is deprecated. Does this work instead?
defaultClazz.getDeclaredConstructor().newInstance();
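`Class.newInstance()` has been deprecated since Java 9, and the constructor-based call suggested here is the documented replacement. A minimal sketch of how it might be wrapped follows; the `Instantiator` helper and its choice of `IllegalStateException` are assumptions made for illustration, not part of the PR.

```java
// Hypothetical helper wrapping the suggested replacement call.
final class Instantiator {
  private Instantiator() {
  }

  static <T> T create(Class<T> clazz) {
    try {
      // Unlike Class.newInstance(), exceptions thrown by the constructor arrive
      // wrapped in InvocationTargetException instead of escaping undeclared.
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Covers NoSuchMethodException, InstantiationException,
      // IllegalAccessException and InvocationTargetException.
      throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```

For example, `Instantiator.create(StringBuilder.class)` returns an empty builder, while a class without a no-argument constructor such as `Integer` ends up in the catch branch.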
String modeInConf = conf != null ? conf.get(TezConfiguration.TEZ_FRAMEWORK_MODE) : null;
String modeInEnv = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE);
try {
  if (modeInConf != null) {
should it be StringUtils.isNotEmpty()?
ack, much cleaner and takes care of empty string too
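The precedence visible in the excerpt (configuration, then environment, then a default) combined with the empty-string check can be sketched as below. This is a simplified illustration under assumptions: `ModeResolver` is hypothetical, it returns a mode string where the PR code falls back to a default class, and the local `isNotEmpty` mirrors the commons-lang `StringUtils.isNotEmpty` check so the block has no external dependency.

```java
// Hypothetical helper; the real code resolves a service class, not a string.
final class ModeResolver {
  private ModeResolver() {
  }

  // Same check as commons-lang StringUtils.isNotEmpty: non-null and length > 0.
  static boolean isNotEmpty(String s) {
    return s != null && !s.isEmpty();
  }

  // Configuration wins over the environment, which wins over the default.
  static String resolve(String modeInConf, String modeInEnv, String defaultMode) {
    if (isNotEmpty(modeInConf)) {
      return modeInConf;
    }
    if (isNotEmpty(modeInEnv)) {
      return modeInEnv;
    }
    return defaultMode;
  }
}
```

With a plain null check, an empty configuration value would be selected and passed on as a mode; with the empty-aware check it falls through to the environment or the default.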
    .thenReturn(YarnApplicationState.RUNNING);

//Client 1 start
client.start();
missing close?
…s - initial patch
…s - checkstyle, spotbugs, javadoc improvements, refactor, test fixes
@ayushtkn: addressed your comments, good catches, thanks!
okumin
left a comment
Thanks for porting the nice feature 👍
if (sessionToken == null) {
throw new RuntimeException("Could not find session token in AM Credentials");
sessionToken = TokenCache.getSessionToken(amCredentials);
When does this double-check help?
- YARN -> When sessionToken is null, TokenCache.getSessionToken(amCredentials) is probably always null
- ZK -> sessionToken is not always null
Right, so the extra attempt to fetch the session token doesn't make sense; let me change this to:
if (sessionToken == null) {
  throw new RuntimeException("Could not find session token in AM Credentials");
}
import org.apache.tez.frameworkplugins.ClientFrameworkService;

public class ZkStandaloneClientFrameworkService implements ClientFrameworkService {
  @Override public FrameworkClient newFrameworkClient() {
Suggested change:
-  @Override public FrameworkClient newFrameworkClient() {
+  @Override
+  public FrameworkClient newFrameworkClient() {
good catch, thanks!
(!) A patch to the testing environment has been detected.
💔 -1 overall
This message was automatically generated.
hm, something happened to spotbugs, even the latest version didn't help:
ayushtkn
left a comment
LGTM
Thanx @abstractdog for the nice work & bearing with my slow reviews :-)
Thanks, @ayushtkn, for the approval!
Thank you! @abstractdog I can't wait to experience the features of Tez on Cloud. Let's move on to the next step!
💔 -1 overall
This message was automatically generated.
This PR contains a Zookeeper-based Framework service and a major refactor in this area.
Pieces of this PR: