Skip to content

Commit 46d3442

Browse files
turcsanyippvillard31
authored andcommitted
NIFI-14782 Extended Kafka3ConnectionService OAuth authentication with SASL Extensions support
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #10567.
1 parent 5a87d32 commit 46d3442

File tree

5 files changed

+99
-8
lines changed

5 files changed

+99
-8
lines changed

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,16 @@
8484
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
8585
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
8686
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;
87+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
88+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
89+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
8790

8891
@Tags({"Apache", "Kafka", "Message", "Publish", "Consume"})
89-
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
90-
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
92+
@DynamicProperty(name = "The name of a Kafka configuration property or a SASL extension property.", value = "The value of the given property.",
93+
description = "Kafka configuration properties will be added on the Kafka configuration after loading any provided configuration properties."
9194
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
92-
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
95+
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration."
96+
+ " SASL extension properties can be specified in " + SASL_EXTENSION_PROPERTY_PREFIX + "propertyName format (e.g. " + SASL_EXTENSION_PROPERTY_PREFIX + "logicalCluster).",
9397
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
9498
@CapabilityDescription("Provides and manages connections to Kafka Brokers for producer or consumer operations.")
9599
public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService, KafkaClientComponent {
@@ -214,12 +218,26 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
214218

215219
@Override
216220
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
221+
final String propertyName;
222+
final String propertyType;
223+
final ExpressionLanguageScope expressionLanguageScope;
224+
225+
if (isSaslExtensionProperty(propertyDescriptorName)) {
226+
propertyName = removeSaslExtensionPropertyPrefix(propertyDescriptorName);
227+
propertyType = "SASL Extension";
228+
expressionLanguageScope = ExpressionLanguageScope.NONE;
229+
} else {
230+
propertyName = propertyDescriptorName;
231+
propertyType = "Kafka Configuration";
232+
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT;
233+
}
234+
217235
return new PropertyDescriptor.Builder()
218-
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
236+
.description("Specifies the value for '%s' %s property.".formatted(propertyName, propertyType))
219237
.name(propertyDescriptorName)
220238
.addValidator(new DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class))
221239
.dynamic(true)
222-
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
240+
.expressionLanguageSupported(expressionLanguageScope)
223241
.build();
224242
}
225243

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.apache.nifi.kafka.service.security;
1818

1919
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
20+
import org.apache.kafka.common.security.auth.SaslExtensions;
21+
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
2022
import org.apache.kafka.common.security.oauthbearer.ClientJwtValidator;
2123
import org.apache.kafka.common.security.oauthbearer.JwtValidatorException;
2224
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
@@ -29,10 +31,16 @@
2931
import org.slf4j.LoggerFactory;
3032

3133
import javax.security.auth.callback.Callback;
34+
import javax.security.auth.callback.UnsupportedCallbackException;
3235
import javax.security.auth.login.AppConfigurationEntry;
3336

37+
import java.util.Collections;
3438
import java.util.List;
3539
import java.util.Map;
40+
import java.util.stream.Collectors;
41+
42+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
43+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
3644

3745
/**
3846
* {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler} implementation to support OAuth 2 in NiFi Kafka components.
@@ -49,6 +57,8 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand
4957
private OAuth2AccessTokenProvider accessTokenProvider;
5058
private ClientJwtValidator accessTokenValidator;
5159

60+
private Map<String, String> saslExtensions;
61+
5262
@Override
5363
public void configure(final Map<String, ?> configs, final String saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
5464
final Map<String, Object> options = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
@@ -72,20 +82,32 @@ public void configure(final Map<String, ?> configs, final String saslMechanism,
7282
this.accessTokenProvider = accessTokenProvider;
7383
this.accessTokenValidator = new ClientJwtValidator();
7484
this.accessTokenValidator.configure(configs, saslMechanism, List.of());
85+
86+
this.saslExtensions = options.entrySet().stream()
87+
.filter(entry -> isSaslExtensionProperty(entry.getKey()))
88+
.collect(Collectors.collectingAndThen(
89+
Collectors.toMap(entry -> removeSaslExtensionPropertyPrefix(entry.getKey()), entry -> entry.getValue().toString()),
90+
Collections::unmodifiableMap));
7591
}
7692

7793
@Override
78-
public void handle(final Callback[] callbacks) {
94+
public void handle(final Callback[] callbacks) throws UnsupportedCallbackException {
7995
for (final Callback callback : callbacks) {
8096
if (callback instanceof OAuthBearerTokenCallback) {
8197
handleTokenCallback((OAuthBearerTokenCallback) callback);
98+
} else if (callback instanceof SaslExtensionsCallback) {
99+
handleExtensionsCallback((SaslExtensionsCallback) callback);
100+
} else {
101+
throw new UnsupportedCallbackException(callback);
82102
}
83103
}
84104
}
85105

86106
private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
87107
final String accessToken;
88108
try {
109+
// Kafka's ExpiringCredentialRefreshingLogin calls this method when the current token is about to expire and expects a refreshed token, so forcefully update it
110+
accessTokenProvider.refreshAccessDetails();
89111
accessToken = accessTokenProvider.getAccessDetails().getAccessToken();
90112
} catch (Exception e) {
91113
LOGGER.error("Could not retrieve access token", e);
@@ -102,6 +124,11 @@ private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
102124
}
103125
}
104126

127+
private void handleExtensionsCallback(final SaslExtensionsCallback callback) {
128+
// a unique SaslExtensions object must be returned otherwise it will be lost upon relogin
129+
callback.extensions(new SaslExtensions(saslExtensions));
130+
}
131+
105132
@Override
106133
public void close() {
107134
}

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
2121

2222
import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
23+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
2324

2425
/**
2526
* SASL OAuthBearer Login Module implementation of configuration provider
@@ -33,7 +34,8 @@ public class OAuthBearerLoginConfigProvider implements LoginConfigProvider {
3334
* Get JAAS configuration for activating OAuthBearer Login Module.
3435
* The login module uses callback handlers to acquire Access Tokens. NiFi's callback handler relies on {@link org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the token.
3536
* The controller service will be passed to the callback handler via Kafka config map (as an object, instead of the string-based JAAS config).
36-
* The JAAS config contains the service id in order to make the callback handler unique to the given service (Kafka creates separate callback handlers based on JAAS config).
37+
* The JAAS config contains the service id and the SASL extension properties in order to make the callback handler unique to the given configuration
38+
* (the Kafka framework creates separate callback handlers based on JAAS config).
3739
*
3840
* @param context Property Context
3941
* @return JAAS configuration with OAuthBearer Login Module
@@ -45,6 +47,10 @@ public String getConfiguration(final PropertyContext context) {
4547
final String serviceId = context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue();
4648
builder.append(SERVICE_ID_KEY, serviceId);
4749

50+
context.getAllProperties().entrySet().stream()
51+
.filter(entry -> isSaslExtensionProperty(entry.getKey()))
52+
.forEach(entry -> builder.append(entry.getKey(), entry.getValue()));
53+
4854
return builder.build();
4955
}
5056
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.kafka.shared.util;
18+
19+
/**
20+
* Utility class to handle SASL Extension properties
21+
*/
22+
public class SaslExtensionUtil {
23+
24+
public static final String SASL_EXTENSION_PROPERTY_PREFIX = "sasl_extension_";
25+
26+
private SaslExtensionUtil() {
27+
}
28+
29+
public static boolean isSaslExtensionProperty(final String propertyName) {
30+
return propertyName.startsWith(SASL_EXTENSION_PROPERTY_PREFIX);
31+
}
32+
33+
public static String removeSaslExtensionPropertyPrefix(final String propertyName) {
34+
return propertyName.substring(SASL_EXTENSION_PROPERTY_PREFIX.length());
35+
}
36+
}

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.HashSet;
2626
import java.util.Set;
2727

28+
import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
29+
2830
/**
2931
* Validator for dynamic Kafka properties
3032
*/
@@ -48,10 +50,12 @@ public ValidationResult validate(final String subject, final String input, final
4850

4951
if (subject.startsWith(PARTITIONS_PROPERTY_PREFIX)) {
5052
builder.valid(true);
53+
} else if (subject.startsWith(SASL_EXTENSION_PROPERTY_PREFIX)) {
54+
builder.valid(true);
5155
} else {
5256
final boolean valid = clientPropertyNames.contains(subject);
5357
builder.valid(valid);
54-
builder.explanation("must be a known Kafka client configuration property");
58+
builder.explanation("must be a known Kafka client configuration property or a SASL extension property");
5559
}
5660

5761
return builder.build();

0 commit comments

Comments
 (0)