NIFI-15226 Allow ConsumeKafka to use static partition mapping #10538
Conversation
    </dependency>
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-framework-nar-utils</artifactId>
This dependency appears to be unused; can it be removed?
org.apache.nifi.mock.MockComponentLogger is referenced in TestConsumerPartitionsUtil
Thanks for clarifying. In that case, this dependency should be removed and MockComponentLogger should be replaced with MockComponentLog from nifi-mock to avoid referencing framework modules in extension modules.
I preserved this class from the older change. But actually we don't need either. We can just mock the logger with Mockito.
exceptionfactory left a comment
The initial build fails on the following tests:
Error: ConsumeKafkaTest.testVerifyFailed:116 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null
Error: ConsumeKafkaTest.testVerifySuccessful:99 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null
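For context on the failure mode, here is a minimal stand-alone sketch (hypothetical class and method names, not the actual NiFi sources) of why verification throws a NullPointerException when the polling context has not yet been initialized, and how an explicit guard makes the precondition visible:

```java
import java.util.List;

// Hypothetical stand-ins (not the actual NiFi classes) showing why verify()
// fails with a NullPointerException when the context is created lazily.
class PollingContextSketch {
    private final List<String> topics;

    PollingContextSketch(final List<String> topics) {
        this.topics = topics;
    }

    List<String> getTopics() {
        return topics;
    }
}

class ConsumeKafkaVerificationSketch {
    private PollingContextSketch pollingContext; // null until the processor is scheduled

    void onScheduled(final List<String> topics) {
        this.pollingContext = new PollingContextSketch(topics);
    }

    List<String> verifyTopics() {
        // Guarding the nullable field turns the opaque NPE from the failed
        // tests into an actionable error message.
        if (pollingContext == null) {
            throw new IllegalStateException("Polling Context not initialized: verification requires scheduling first");
        }
        return pollingContext.getTopics();
    }
}
```

Either the tests need to initialize the context before calling verify, or the processor needs a guard along these lines.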
exceptionfactory left a comment
Thanks for the work on this @tpalfy. I noted a handful of minor recommendations, and plan to take a closer look at some of the implementation details.
    private final AutoOffsetReset autoOffsetReset;

    - public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
    + public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
Very minor, but I recommend placing partition after topics to align with the general hierarchy:

    - public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
    + public Subscription(final String groupId, final Collection<String> topics, final Integer partition, final AutoOffsetReset autoOffsetReset) {
    import static org.junit.jupiter.api.Assertions.assertTrue;
    import static org.mockito.Mockito.mock;

    public class TestConsumerPartitionsUtil {
Minor note: the public modifiers at the class and method level are not necessary with JUnit 5. Since this is a new test class, I recommend removing them.
    }

    @Test
    public void testNoPartitionAssignments() throws UnknownHostException {
UnknownHostException does not appear to be thrown in this and other methods.
ConsumerPartitionsUtil.getPartitionsForHost throws it.
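For readers following along, the checked exception originates from resolving the local hostname. A minimal stdlib sketch (illustrative helper names, not the actual ConsumerPartitionsUtil code) of the lookup that forces `throws UnknownHostException` into callers and tests:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

// Illustrative helper (not the actual ConsumerPartitionsUtil code): resolving
// the local hostname is the call that can throw the checked exception, which
// then propagates through the signatures of callers and tests.
class PartitionsForHostSketch {

    static String localHostname() throws UnknownHostException {
        return InetAddress.getLocalHost().getHostName();
    }

    static String partitionsForLocalHost(final Map<String, String> hostToPartitions) throws UnknownHostException {
        // Look up this node's entry in the hostname-to-partitions mapping
        return hostToPartitions.get(localHostname());
    }
}
```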
        return !hostnameToPartitionMapping.isEmpty();
    }

    public static int getPartitionAssignmentCount(final Map<String, String> properties) {
Recommend moving all public methods before all private methods in this class.
@tpalfy Tested with different scenarios, including fewer or more concurrent tasks than partitions, and it works as expected.
Added one minor comment inline and one more thing here: There is some legacy code referencing partition properties in DynamicPropertyValidator. Please clean this up, as the partition dynamic properties are not applied to the Kafka controller service.
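For context, the partition assignment convention under discussion follows the NiFi 1.x ConsumeKafka processors, where dynamic properties named `partitions.<hostname>` list the partitions for each node. A rough sketch of parsing such properties (class and method names here are illustrative, not the code in this PR):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of parsing the partition assignment convention: any dynamic property
// named "partitions.<hostname>" holds a comma-separated list of partition
// numbers for that node. Names are illustrative only.
class PartitionMappingSketch {
    private static final String PARTITION_PROPERTY_PREFIX = "partitions.";

    static Map<String, List<Integer>> parse(final Map<String, String> properties) {
        final Map<String, List<Integer>> hostnameToPartitions = new HashMap<>();
        for (final Map.Entry<String, String> entry : properties.entrySet()) {
            if (!entry.getKey().startsWith(PARTITION_PROPERTY_PREFIX)) {
                continue; // unrelated property, e.g. group.id
            }
            final String hostname = entry.getKey().substring(PARTITION_PROPERTY_PREFIX.length());
            final List<Integer> partitions = new ArrayList<>();
            for (final String partition : entry.getValue().split(",")) {
                partitions.add(Integer.parseInt(partition.trim()));
            }
            hostnameToPartitions.put(hostname, partitions);
        }
        return hostnameToPartitions;
    }
}
```

Non-partition dynamic properties pass through untouched, which is why leftover references in DynamicPropertyValidator are dead code rather than active behavior.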
Force-pushed 6269ef2 to d1da7f6
turcsanyip left a comment
Thanks for the latest changes @tpalfy.
+1 from my side.
Thanks for the updates @tpalfy and thanks for the review @turcsanyip. I will follow up and also review the latest changes soon.
exceptionfactory left a comment
Thanks for making the adjustments @tpalfy, and thanks for the review @turcsanyip.
On a more detailed review, I am concerned about the level of complexity introduced with these changes. The ConsumerPartitionsUtil is very complex with implementation details based on current host and dynamic properties. The addition of partition-based dynamic property names, plus host names, makes the configuration non-portable between NiFi installations. This complexity is also evident in the handling of KafkaConsumerService instances.
For these reasons, I am not supportive of the current proposed set of changes.
The historical implementation grew very complex and very difficult to maintain, so considering other options would be helpful.
One option that comes to mind is a separate Processor, named something like ConsumeKafkaPartition. That would clearly communicate the purpose, allowing for more focused logic. I would not attempt to extend the current ConsumeKafka Processor, so copying some of the existing implementation would be reasonable. As much of the handling logic is in other classes, there should be some code reuse opportunities without a subclass.
I'm open to other options that would avoid introducing this level of complexity to the current Processor.
Thanks for sharing your thoughts and concerns @exceptionfactory. Considering that the change does not affect existing users of the processor (there is no configuration change, and the portability issue does not arise if the feature is not used), I do not think it would be worth adding a new processor that requires duplicated code, leading to additional maintenance effort and issues going forward. As far as I can recall, the original plan for migrating to the Kafka 3 processors was to implement the core functionality in the first round and add the extra features later. A separate implementation would also introduce complexity and non-portability issues.
Thanks for the thoughtful reply @turcsanyip. Although this capability and general approach were implemented in NiFi 1, part of the purpose of refactoring Kafka support was to provide better decoupling of features. The majority of that separation comes with the Kafka Connection Service, and it is good to see that these proposed changes have minimal impact on that Controller Service interface surface. With that being said, the proposed changes do introduce significant logic into the existing Processor.

A separate Processor also has benefits from a flow design perspective, as it provides a clearer distinction on the intended use case of static partition assignment to individual NiFi nodes. Given that such a use case is less common, keeping it distinct from the regular ConsumeKafka Processor seems preferable.
@exceptionfactory Can you please provide some objective reasoning for considering the ConsumerPartitionsUtil too complex? Here is my objective reasoning to argue against it:

Also, there is an obvious contradiction in adding a separate processor. Code duplication would mean that basically all the changes done to ConsumeKafka would need to be carried over to the new class. That would be a maintenance nightmare. According to the original Jira https://issues.apache.org/jira/browse/NIFI-11259, it was agreed upon that full compatibility would be achieved by adding this feature later. There is a Jira (still open) that proves this: https://issues.apache.org/jira/browse/NIFI-13654

Here is the implementation as agreed upon, following the original as closely as possible, which was proven to be working and stable by the previous change history. I can't see how we can get better than that. Unless more adequate reasons are provided to deny this change, it should move forward.
@tpalfy I think we can all agree @exceptionfactory has done amazing work increasing quality and decreasing technical debt. We need more of what he does - not less. He is also far and away the most prolific reviewer, and we need more people doing that as well. Declaring his or any reviewer's response as inadequate doesn't help your case. Stating his feedback isn't objective doesn't help either.
@joewitt Thank you for the input. I would be interested in your opinion about the PR itself as well. As I see it, the original argument - that the original implementation is too complex and hard to maintain - is not supported by the change history. It did need two fairly serious adjustments, but after that the end result remained stable and correct for many years without the need for change. Also, how would we prevent the maintenance burden that comes with the suggested approach of code duplication? Kafka is a hot spot in general; having to duplicate most changes across two implementations feels like a huge tradeoff to me.
exceptionfactory left a comment
Without reiterating previous reasons, the concern about complexity and maintainability is not primarily related to the number of lines or number of methods. If that were the only measure of maintainability, then this would be a different conversation.
The fundamental issue is the alternative consumption strategy that static partition assignment introduces. This is described well in the additional details copied from the initial implementation. The complexity shows itself not in the number of lines and methods, but in the different code paths and configuration requirements. This surfaces in elements such as the addition of the queue and map as member variables, which are only relevant to static partition configuration.
In previous implementations, one Processor supported a large number of different use cases. The question now is whether one Processor should continue to support a significantly different use case, or whether a separate Processor provides an understandable way forward.
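The "different code paths" concern can be pictured with a toy selector (illustrative only, not the proposed implementation): the presence of a static hostname-to-partition mapping switches the consumer between group subscription and explicit assignment, so every downstream change has to account for both strategies.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative-only sketch of the branching the review describes: with no
// static mapping the consumer relies on group subscription; with a mapping
// it must resolve this host's partitions and assign them explicitly.
interface ConsumeStrategy {
    String describe();
}

class GroupSubscription implements ConsumeStrategy {
    public String describe() {
        return "subscribe(topics) with group rebalancing";
    }
}

class StaticAssignment implements ConsumeStrategy {
    private final List<Integer> partitions;

    StaticAssignment(final List<Integer> partitions) {
        this.partitions = partitions;
    }

    public String describe() {
        return "assign(partitions " + partitions + ") without rebalancing";
    }
}

class StrategySelector {
    static ConsumeStrategy select(final Map<String, List<Integer>> hostMapping, final String localHost) {
        // Mapping present for this host: explicit assignment; otherwise group subscription
        return Optional.ofNullable(hostMapping.get(localHost))
                .<ConsumeStrategy>map(StaticAssignment::new)
                .orElseGet(GroupSubscription::new);
    }
}
```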
"The question now is whether one Processor should continue to support a significantly different use case, or whether a separate Processor provides an understandable way forward."

I think that's a fair and good question. I also think it warrants a cost-benefit analysis. I would consider two major concerns: UX and DevX.

As for UX, there is value in encompassing the functionality of "consume from Kafka" in a single entity and having the "how exactly" covered by configuration. Otherwise, the line between a "similar" and a "significantly different" use case can get blurry.

As for DevX, while I see the value of extension vs. modification, the new processor would basically have the same SDLC as the current one. Whatever change is needed would be needed for both, most if not all of the time. That screams bad design.

This is not a new feature. It has its risks, but it's not something where we have nothing but prediction to rely on. We have a history of this in the previous implementation. Again, maybe I missed something, but as far as I can tell - specifically from a maintenance perspective - it was fine because the logic is well isolated enough.
Thanks for the reply @tpalfy. I agree UX and DevX are the two major areas of concern, but the response does not address the fundamental issues I have raised with each. This is admittedly a grey area, which is the reason for this extended conversation. The question is not about SDLC, but about cohesion of functionality.

Coming at this from one additional angle: this feature was introduced in NiFi 1.13.0, prior to the implementation of Stateless execution. Now that the framework supports configuring any Process Group with Stateless execution, it is easier to design a coherent pipeline where the NiFi consumer does not commit offsets until after successful processing. It still requires careful pipeline design, but it means that records for a specific partition can be processed to completion, and if rebalancing occurs, the new consumer receives a coherent set of records. In a different direction, Kafka 4.0 introduced Queues, which provide another more coherent alternative to attempted static partition assignments on the consumer side.

Although this static partition assignment may have been a useful feature at the time, now over five years ago, these are some of the reasons I do not support bringing it back in the same way. I remain open to considering an approach with a separate Processor, and I am willing to work through a review to build that in an optimal way, although I now question whether it is even needed. With different capabilities available, and a different module design, I do not support bringing back this feature as it was previously implemented.
Summary
NIFI-15226

Tracking
Please complete the following tracking steps prior to pull request creation.
- Issue Tracking
- Pull Request Tracking (title and commit message start with the Jira issue number, e.g. NIFI-00000)
- Pull Request Formatting (based on the main branch)

Verification
Please indicate the verification steps performed prior to pull request creation.
- Build: ./mvnw clean install -P contrib-check
- Licensing: LICENSE and NOTICE files
- Documentation