lianetm commented on code in PR #19754: URL: https://github.com/apache/kafka/pull/19754#discussion_r2123890568
########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetriever.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtBearerRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionJwtTemplate; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.DefaultAssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.FileAssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.StaticAssertionJwtTemplate; +import 
org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_ALGORITHM; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionUtils.layeredAssertionJwtTemplate; + +/** + * {@code JwtBearerJwtRetriever} is a {@link JwtRetriever} that performs the steps to request + * a JWT from an OAuth/OIDC identity provider using the <code>urn:ietf:params:oauth:grant-type:jwt-bearer</code> + * grant type. This grant type is used for machine-to-machine "service accounts". + * + * <p/> + * + * This {@code JwtRetriever} is enabled by specifying its class name in the Kafka configuration. 
+ * For client use, specify the class name in the <code>sasl.oauthbearer.jwt.retriever.class</code> + * configuration like so: + * + * <pre> + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever + * </pre> + * + * <p/> + * + * If using this {@code JwtRetriever} on the broker side (for inter-broker communication), the configuration + * should be specified with a listener-based property: + * + * <pre> + * listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever + * </pre> + * + * <p/> + * + * The {@code JwtBearerJwtRetriever} also uses the following configuration: + * + * <ul> + * <li><code>sasl.oauthbearer.assertion.algorithm</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.aud</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.exp.seconds</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.iss</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.jti.include</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.nbf.seconds</code></li> + * <li><code>sasl.oauthbearer.assertion.claim.sub</code></li> + * <li><code>sasl.oauthbearer.assertion.file</code></li> + * <li><code>sasl.oauthbearer.assertion.private.key.file</code></li> + * <li><code>sasl.oauthbearer.assertion.private.key.passphrase</code></li> + * <li><code>sasl.oauthbearer.assertion.template.file</code></li> + * <li><code>sasl.oauthbearer.jwt.retriever.class</code></li> + * <li><code>sasl.oauthbearer.scope</code></li> + * <li><code>sasl.oauthbearer.token.endpoint.url</code></li> + * </ul> + * + * Please refer to the official Apache Kafka documentation for more information on these, and related, configuration. 
+ * + * <p/> + * + * Here's an example of the JAAS configuration for a Kafka client: + * + * <pre> + * sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; + * + * sasl.oauthbearer.assertion.algorithm=RS256 + * sasl.oauthbearer.assertion.claim.aud=my-application-audience + * sasl.oauthbearer.assertion.claim.exp.seconds=600 + * sasl.oauthbearer.assertion.claim.iss=my-oauth-issuer + * sasl.oauthbearer.assertion.claim.jti.include=true + * sasl.oauthbearer.assertion.claim.nbf.seconds=120 + * sasl.oauthbearer.assertion.claim.sub=kafka-app-1234 + * sasl.oauthbearer.assertion.private.key.file=/path/to/private.key + * sasl.oauthbearer.assertion.private.key.passphrase=$3cr3+ + * sasl.oauthbearer.assertion.template.file=/path/to/assertion-template.json + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever + * sasl.oauthbearer.scope=my-application-scope + * sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token + * </pre> + */ +public class JwtBearerJwtRetriever implements JwtRetriever { + + private final Time time; + private HttpJwtRetriever delegate; + private AssertionJwtTemplate assertionJwtTemplate; + private AssertionCreator assertionCreator; + + public JwtBearerJwtRetriever() { + this(Time.SYSTEM); + } + + public JwtBearerJwtRetriever(Time time) { + this.time = time; + } + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { Review Comment: I don't see any test for this `configure` logic in `JwtBearerJwtRetriever`. Just checking: is it coming in a follow-up PR? ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/AssertionUtils.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion; + +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.utils.Time; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import javax.crypto.Cipher; +import javax.crypto.EncryptedPrivateKeyInfo; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_ALGORITHM; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS; 
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_TEMPLATE_FILE; + +/** + * Set of utilities for the OAuth JWT assertion logic. + */ +public class AssertionUtils { + + public static final String TOKEN_SIGNING_ALGORITHM_RS256 = "RS256"; + public static final String TOKEN_SIGNING_ALGORITHM_ES256 = "ES256"; + + /** + * Inspired by {@code org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.PemStore}, which is not + * visible to reuse directly. + */ + public static PrivateKey privateKey(byte[] privateKeyContents, + Optional<String> passphrase) throws GeneralSecurityException, IOException { + PKCS8EncodedKeySpec keySpec; + + if (passphrase.isPresent()) { + EncryptedPrivateKeyInfo keyInfo = new EncryptedPrivateKeyInfo(privateKeyContents); + String algorithm = keyInfo.getAlgName(); + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance(algorithm); + SecretKey pbeKey = secretKeyFactory.generateSecret(new PBEKeySpec(passphrase.get().toCharArray())); + Cipher cipher = Cipher.getInstance(algorithm); + cipher.init(Cipher.DECRYPT_MODE, pbeKey, keyInfo.getAlgParameters()); + keySpec = keyInfo.getKeySpec(cipher); + } else { + byte[] pkcs8EncodedBytes = Base64.getDecoder().decode(privateKeyContents); + keySpec = new PKCS8EncodedKeySpec(pkcs8EncodedBytes); + } + + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + return keyFactory.generatePrivate(keySpec); + } + + public static Signature getSignature(String algorithm) throws GeneralSecurityException { + if (algorithm.equalsIgnoreCase(TOKEN_SIGNING_ALGORITHM_RS256)) { + return Signature.getInstance("SHA256withRSA"); + } else if 
(algorithm.equalsIgnoreCase(TOKEN_SIGNING_ALGORITHM_ES256)) { + return Signature.getInstance("SHA256withECDSA"); + } else { + throw new NoSuchAlgorithmException(String.format("Unsupported signing algorithm: %s", algorithm)); Review Comment: has this been addressed? ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -143,7 +311,7 @@ public class SaslConfigs { public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url"; public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC = "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token" - + " endpoint URL to which requests will be made to login based on the configuration in " + SASL_JAAS_CONFIG + ". If the URL is file-based, it" + + " endpoint URL to which requests will be made to login based on the configured <code>JwtRetriever</code>. If the URL is file-based, it" Review Comment: should we reference the `SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS` config here instead of the class name? Seeing the related config name is probably more helpful (similar to what was done before for the old config) ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." 
+ LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. 
The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." 
+ " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." Review Comment: This is missing. (Do we want to add a note to the `sasl.jaas.config` documentation about the deprecated options and how the new properties will take precedence?) ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; + +/** + * {@code ClientCredentialsJwtRetriever} is a {@link JwtRetriever} that performs 
the steps to request + * a JWT from an OAuth/OIDC identity provider using the <code>client_credentials</code> grant type. This + * grant type is commonly used for non-interactive "service accounts" where there is no user available + * to interactively supply credentials. + * + * <p/> + * + * This {@code JwtRetriever} is enabled by specifying its class name in the Kafka configuration. + * For client use, specify the class name in the <code>sasl.oauthbearer.jwt.retriever.class</code> + * configuration like so: + * + * <pre> + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever + * </pre> + * + * <p/> + * + * If using this {@code JwtRetriever} on the broker side (for inter-broker communication), the configuration + * should be specified with a listener-based property: + * + * <pre> + * listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever + * </pre> + * + * <p/> + * + * The {@code ClientCredentialsJwtRetriever} also uses the following configuration: + * + * <ul> + * <li><code>sasl.oauthbearer.client.credentials.client.id</code></li> + * <li><code>sasl.oauthbearer.client.credentials.client.secret</code></li> + * <li><code>sasl.oauthbearer.scope</code></li> + * <li><code>sasl.oauthbearer.token.endpoint.url</code></li> + * </ul> + * + * Please refer to the official Apache Kafka documentation for more information on these, and related configuration. + * + * <p/> + * + * Previous versions of this implementation used <code>sasl.jaas.config</code> to specify attributes such + * as <code>clientId</code>, <code>clientSecret</code>, and <code>scope</code>. These will still work, but + * if the configuration for each of these is specified, it will be used instead of the JAAS option. 
+ * + * <p/> + * + * Here's an example of the JAAS configuration for a Kafka client: + * + * <pre> + * sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; + * + * sasl.oauthbearer.client.credentials.client.id=jdoe + * sasl.oauthbearer.client.credentials.client.secret=$3cr3+ + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever + * sasl.oauthbearer.scope=my-application-scope + * sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token + * </pre> + */ +public class ClientCredentialsJwtRetriever implements JwtRetriever { + + private static final Logger LOG = LoggerFactory.getLogger(ClientCredentialsJwtRetriever.class); + + private HttpJwtRetriever delegate; + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { + ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism); + JaasOptionsUtils jou = new JaasOptionsUtils(JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries)); + + URL tokenEndpointUrl = cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL); + + ConfigOrJaas configOrJaas = new ConfigOrJaas(cu, jou); + String clientId = configOrJaas.clientId(); + String clientSecret = configOrJaas.clientSecret(); + String scope = configOrJaas.scope(); + + SSLSocketFactory sslSocketFactory = null; + + if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl)) + sslSocketFactory = jou.createSSLSocketFactory(); + + boolean urlencodeHeader = validateUrlencodeHeader(cu); + + HttpRequestFormatter requestFormatter = new ClientCredentialsRequestFormatter( + clientId, + clientSecret, + scope, + urlencodeHeader + ); + + delegate = new HttpJwtRetriever( + requestFormatter, + sslSocketFactory, + tokenEndpointUrl.toString(), + cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS), + cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS), + cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, 
false), + cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false) + ); + } + + @Override + public String retrieve() throws JwtRetrieverException { + if (delegate == null) + throw new IllegalStateException("JWT retriever delegate is null; please call configure() first"); + + return delegate.retrieve(); + } + + /** + * In some cases, the incoming {@link Map} doesn't contain a value for + * {@link SaslConfigs#SASL_OAUTHBEARER_HEADER_URLENCODE}. Returning {@code null} from {@link Map#get(Object)} + * will cause a {@link NullPointerException} when it is later unboxed. + * + * <p/> + * + * This utility method ensures that we have a non-{@code null} value to use in the + * {@link HttpJwtRetriever} constructor. + */ + static boolean validateUrlencodeHeader(ConfigurationUtils configurationUtils) { + Boolean urlencodeHeader = configurationUtils.get(SASL_OAUTHBEARER_HEADER_URLENCODE); + return Objects.requireNonNullElse(urlencodeHeader, DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE); + } + + /** + * Retrieves the values first from configuration, then falls back to JAAS, and, if required, throws an error. 
+ */ + private static class ConfigOrJaas { + + private final ConfigurationUtils cu; + private final JaasOptionsUtils jou; + + private ConfigOrJaas(ConfigurationUtils cu, JaasOptionsUtils jou) { + this.cu = cu; + this.jou = jou; + } + + private String clientId() { + return getValue( + CLIENT_ID_CONFIG, + "clientId", + true, + cu::validateString, + jou::validateString + ); + } + + private String clientSecret() { + return getValue( + CLIENT_SECRET_CONFIG, + "clientSecret", + true, + cu::validatePassword, + jou::validateString + ); + } + + private String scope() { + return getValue( + SCOPE_CONFIG, + "scope", + false, + cu::validateString, + jou::validateString + ); + } + + private String getValue(String configName, + String jaasName, + boolean isRequired, + Function<String, String> configValueGetter, + Function<String, String> jaasValueGetter) { + if (cu.containsKey(configName)) { + return configValueGetter.apply(configName); Review Comment: Did we consider logging a warning when `cu.containsKey(configName) && jou.containsKey(jaasName)`? Otherwise the user gets no indication that two conflicting properties are set and that the JAAS option will be ignored. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org