
Conversation

@tpalfy (Contributor) commented Nov 17, 2025

Summary

NIFI-15226

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-utils</artifactId>
Contributor

This dependency appears to be unused; can it be removed?

Contributor Author

org.apache.nifi.mock.MockComponentLogger is referenced in TestConsumerPartitionsUtil

Contributor

Thanks for clarifying. In that case, this dependency should be removed and MockComponentLogger should be replaced with MockComponentLog from nifi-mock to avoid referencing framework modules in extension modules.

Contributor Author

I preserved this class from the older change, but actually we don't need either one. We can just mock the logger with Mockito.
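A minimal sketch of that approach, assuming the test only needs a logger to satisfy a parameter rather than to assert on (the class and method names here are illustrative):

import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;

class LoggerMockSketch {

    @Test
    void createsNoOpLogger() {
        // A Mockito mock satisfies any ComponentLog parameter without
        // referencing framework modules; calls like logger.info(...)
        // are no-ops unless explicitly stubbed or verified.
        final ComponentLog logger = mock(ComponentLog.class);
    }
}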

@exceptionfactory (Contributor) left a comment

The initial build fails on the following tests:

Error:    ConsumeKafkaTest.testVerifyFailed:116 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null
Error:    ConsumeKafkaTest.testVerifySuccessful:99 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null

@exceptionfactory (Contributor) left a comment

Thanks for the work on this @tpalfy. I noted a handful of minor recommendations, and plan to take a closer look at some of the implementation details.

private final AutoOffsetReset autoOffsetReset;

public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
Contributor

Very minor, but I recommend placing partition after topics to align with general hierarchy:

Suggested change
- public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
+ public Subscription(final String groupId, final Collection<String> topics, final Integer partition, final AutoOffsetReset autoOffsetReset) {

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

public class TestConsumerPartitionsUtil {
Contributor

Minor note: the public modifiers at the class and method level are not necessary for JUnit 5. Since this is a new test class, I recommend removing them.
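A sketch of the suggested form (JUnit 5 discovers package-private test classes and methods, so the modifiers can simply be dropped):

import java.net.UnknownHostException;

import org.junit.jupiter.api.Test;

class TestConsumerPartitionsUtil {

    @Test
    void testNoPartitionAssignments() throws UnknownHostException {
        // body unchanged; only the public modifiers are removed
    }
}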

}

@Test
public void testNoPartitionAssignments() throws UnknownHostException {
Contributor

UnknownHostException does not appear to be thrown in this and other methods.

Contributor Author

ConsumerPartitionsUtil.getPartitionsForHost throws it.
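For context, a sketch of where the checked exception likely originates, assuming the utility resolves the current node the same way the NiFi 1.x implementation did; the class and method below are illustrative, not the actual implementation:

import java.net.InetAddress;
import java.net.UnknownHostException;

final class HostResolutionSketch {

    // InetAddress.getLocalHost() declares UnknownHostException, so any
    // partition lookup keyed on the local hostname forces the checked
    // exception into its signature, and into the tests that call it.
    static String currentHostname() throws UnknownHostException {
        return InetAddress.getLocalHost().getHostName();
    }
}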

return !hostnameToPartitionMapping.isEmpty();
}

public static int getPartitionAssignmentCount(final Map<String, String> properties) {
Contributor

Recommend moving all public methods before all private methods in this class.

@turcsanyip (Contributor) left a comment

@tpalfy Tested with different scenarios, including fewer or more concurrent tasks than partitions, and it works as expected.

Added one minor comment inline and one more thing here: There is some legacy code referencing partition properties in DynamicPropertyValidator. Please clean this up, as the partition dynamic properties are not applied to the Kafka controller service.

@tpalfy force-pushed the NIFI-15226-Kafka-static-partition-mapping branch from 6269ef2 to d1da7f6 on December 11, 2025
@turcsanyip (Contributor) left a comment

Thanks for the latest changes @tpalfy.

+1 from my side.

@exceptionfactory (Contributor)

Thanks for the updates @tpalfy, and thanks for the review @turcsanyip. I will follow up and also review the latest changes soon.

@exceptionfactory (Contributor) left a comment

Thanks for making the adjustments @tpalfy, and thanks for the review @turcsanyip.

On a more detailed review, I am concerned about the level of complexity introduced with these changes. The ConsumerPartitionsUtil is very complex, with implementation details based on the current host and dynamic properties. The addition of partition-based dynamic property names, plus host names, makes the configuration non-portable between NiFi installations. This complexity is also evident in the handling of KafkaConsumerService instances.
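For concreteness, the scheme in question binds partitions to hosts through hostname-keyed dynamic properties, along these lines (a sketch; the hostnames are hypothetical, and the partitions.<hostname> naming follows the NiFi 1.x implementation this PR ports):

import java.util.Map;

final class PartitionAssignmentSketch {

    // Hypothetical hostnames: each node consumes only the partitions mapped
    // to its own hostname, so the flow only works on nodes with these names.
    static final Map<String, String> PARTITION_PROPERTIES = Map.of(
            "partitions.nifi-node-1.example.com", "0, 1, 2",
            "partitions.nifi-node-2.example.com", "3, 4, 5");
}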

For these reasons, I am not supportive of the current proposed set of changes.

The historical implementation grew very complex and very difficult to maintain, so considering other options would be helpful.

One option that comes to mind is a separate Processor, named something like ConsumeKafkaPartition. That would clearly communicate the purpose, allowing for more focused logic. I would not attempt to extend the current ConsumeKafka Processor, so copying some of the existing implementation would be reasonable. As much of the handling logic is in other classes, there should be some code reuse opportunities without a subclass.

I'm open to other options that would avoid introducing this level of complexity to the current Processor.

@turcsanyip (Contributor)

Thanks for sharing your thoughts and concerns @exceptionfactory.

I agree that ConsumerPartitionsUtil is complex. However, it is worth noting that it was directly reused from the NiFi 1.x implementation, where it was added in a single commit and worked properly. So I would not consider it legacy code that grew more and more complicated over time. I also reviewed it in this round, and I did not find a way to simplify or change it. This brings us to your second concern about portability. The partitions need to be bound to a NiFi node, and hostnames seem to be the only way to do that. So even if a separate processor were added, it would still have the portability issue.

Considering that the change does not affect existing users of the processor (no config change, and the portability issue does not arise if the feature is not used), I do not think it is worth adding a new processor that requires duplicated code, leading to additional maintenance effort and issues going forward.

As far as I can recall, the original plan of migrating to Kafka 3 processors was to implement the core functionality in the first round and add the extra features later. Based on this, and considering that a separate implementation would also introduce complexity and non-portability issues via ConsumerPartitionsUtil, I believe we could add this feature to the current ConsumeKafka processor.

@exceptionfactory (Contributor)

Thanks for the thoughtful reply @turcsanyip.

Although this capability and general approach were implemented in NiFi 1, part of the purpose of refactoring Kafka support was to provide better decoupling of features. The majority of that separation comes from the Kafka Connection Service, and it is good to see that these proposed changes have minimal impact on that Controller Service interface surface.

With that being said, the proposed changes do introduce significant logic into ConsumeKafka that is specific to this particular capability. Although it does not impact existing configuration, it does increase the maintenance of the ConsumeKafka implementation in general. For that reason, I would much rather see this particular use case decoupled into its own Processor.

A separate Processor also has benefits from a flow design perspective, as it provides a clearer distinction of the intended use case of static partition assignment to individual NiFi nodes. Given that such a use case is less common, keeping it distinct from the regular ConsumeKafka Processor is helpful from both a flow design and a component implementation perspective. From an implementation perspective, it also provides the opportunity to define more of the implementation in the Processor itself, versus in a separate Util class. Of course these are tradeoffs, but I think the benefits outweigh the costs.

@tpalfy (Contributor Author) commented Jan 8, 2026

@exceptionfactory Can you please provide some objective reasoning for considering ConsumerPartitionsUtil too complex? Here is my objective reasoning against it:

  1. 210 lines. Not a big class by any measure.
  2. 4 public methods and 4 private methods. This is not out of the ordinary for any util class.
  3. Only java.net and java.util imports, apart from org.apache.nifi.components.ValidationResult and org.apache.nifi.logging.ComponentLog. The logic in it is exceptionally well contained.
  4. In NiFi 1.x the same class had 1 change since its inception in 2020, and that was about logging. The class is exceptionally stable and isolated.
  5. I found only 2 changes related to static partition mapping in the old code. The logic implemented in this PR intentionally follows that implementation, precisely to minimize risk.

Also, there's an obvious contradiction in adding a separate processor: code duplication would mean that basically all changes made to ConsumeKafka would need to be carried over to the new class. That would be a maintenance nightmare.

According to the original Jira https://issues.apache.org/jira/browse/NIFI-11259, it was agreed that full compatibility would be achieved by adding this feature later. There is a Jira (still open) that proves this: https://issues.apache.org/jira/browse/NIFI-13654
That is also an objective fact.

Here's the implementation as agreed upon, following the original as closely as possible, which the previous change history proved to be working and stable. I can't see how we can do better than that.

Unless more adequate reasons are provided to deny this change, this should move forward.

@joewitt (Contributor) commented Jan 8, 2026

@tpalfy I think we can all agree @exceptionfactory has done amazing work increasing quality and decreasing technical debt. We need more of what he does, not less. He is also far and away the most prolific reviewer, and we need more people doing that as well.

Declaring his or any reviewer's response inadequate doesn't help your case. Stating his feedback isn't objective doesn't help either.

@tpalfy (Contributor Author) commented Jan 8, 2026

@joewitt Thank you for the input. I would also be interested in your opinion about the PR itself.

As I see it, the claim that the original implementation was too complex and hard to maintain (which was the original argument) is not supported by the change history. It did need 2 fairly serious adjustments, but after those, the end result remained stable and correct for many years without the need for change.
Maybe I'm missing something here, but I was trying to be very careful to make sure this change works properly.

Also, how would we prevent the maintenance burden that comes with the suggested approach of code duplication? Kafka is a hot spot in general; having to duplicate most changes across two implementations feels like a huge tradeoff to me.

@exceptionfactory (Contributor) left a comment

Without reiterating previous reasons, the concern about complexity and maintainability is not primarily related to the number of lines or number of methods. If that were the only measure of maintainability, then this would be a different conversation.

The fundamental issue is the alternative consumption strategy that static partition assignment introduces. This is described well in the additional details copied from the initial implementation. The complexity shows itself not in the number of lines and methods, but in the different code paths and configuration requirements. This surfaces in elements such as the addition of the queue and map as member variables, which are only relevant to static partition configuration.

In previous implementations, one Processor supported a large number of different use cases. The question now is whether one Processor should continue to support a significantly different use case, or whether a separate Processor provides an understandable way forward.

@tpalfy (Contributor Author) commented Jan 8, 2026

"The question now is whether one Processor should continue to support a significantly different use case, or whether a separate Processor provides an understandable way forward."

I think that's a fair and good question. I also think it warrants a cost-benefit analysis. I would consider 2 major concerns: UX and DevX.

As for UX, there is value in encompassing the functionality of "consume from Kafka" in a single entity and having the "how exactly" covered by configuration. Otherwise, the line between "similar" and "significantly different" use cases can get blurry.

As for DevX, while I see the value of extension vs. modification, the new processor would basically have the same SDLC as the current one. Whatever change is needed would be needed for both most, if not all, of the time. That screams bad design.

This is not a new feature. It has its risks, but it's not something for which we have nothing but prediction to rely on. We have a history of this in the previous implementation. Again, maybe I missed something, but as far as I can tell, specifically from a maintenance perspective, it was fine because the logic is isolated well enough.

@exceptionfactory (Contributor)

Thanks for the reply @tpalfy.

I agree UX and DevX are the two major areas of concern, but the response does not address the fundamental issues I have raised with each. This is admittedly a grey area, which is the reason for this extended conversation. The question is not about SDLC, but about cohesion of functionality.

Coming at this from one additional angle, this feature was introduced in NiFi 1.13.0, which was prior to the implementation of Stateless execution. Now that the framework supports configuring any Process Group with Stateless execution, it is easier to design a coherent pipeline where the NiFi consumer does not commit offsets until after successful processing. It still requires careful pipeline design, but it means that records for a specific partition can be processed to completion, and if rebalancing occurs, the new consumer receives a coherent set of records.

In a different direction, Kafka 4.0 introduced Queues, which provide another, more coherent alternative to static partition assignment on the consumer side.

Static partition assignment may have been a useful feature at the time, now over five years ago, but these are some of the reasons I do not support bringing it back in the same way. I remain open to considering an approach with a separate Processor, and I am willing to work through a review to build that in an optimal way, although I now question whether it is even needed. With different capabilities available, and a different module design, I do not support bringing back this feature as it was previously implemented.
