This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 14adbad94 ATLAS-4788 : Kafka password is in clear text in
application.properties
14adbad94 is described below
commit 14adbad94a0de95f6d390b2d8c80d9bd914c0438
Author: chaitali <[email protected]>
AuthorDate: Mon Oct 30 17:13:07 2023 +0530
ATLAS-4788 : Kafka password is in clear text in application.properties
Signed-off-by: Pinal Shah <[email protected]>
---
.../java/org/apache/atlas/utils/KafkaUtils.java | 17 ++-
.../org/apache/atlas/utils/KafkaUtilsTest.java | 123 ++++++++++++++++++++-
2 files changed, 138 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 167442259..672caa8e6 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -19,6 +19,7 @@
package org.apache.atlas.utils;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.security.SecurityUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +44,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static
org.apache.atlas.security.SecurityProperties.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
public class KafkaUtils implements AutoCloseable {
@@ -62,6 +64,9 @@ public class KafkaUtils implements AutoCloseable {
public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY =
"sasl.jaas.config";
+ public static final String JAAS_PASSWORD_SUFFIX = "password";
+ private static final String JAAS_MASK_PASSWORD = "********";
+
final protected Properties kafkaConfiguration;
final protected AdminClient adminClient;
final protected boolean importInternalTopics;
@@ -254,6 +259,7 @@ public class KafkaUtils implements AutoCloseable {
String optionPrefix = keyPrefix +
JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
String principalOptionKey = optionPrefix +
JAAS_PRINCIPAL_PROP;
+ String passwordOptionKey = optionPrefix +
JAAS_PASSWORD_SUFFIX;
int optionPrefixLen = optionPrefix.length();
StringBuffer optionStringBuffer = new StringBuffer();
@@ -271,7 +277,16 @@ public class KafkaUtils implements AutoCloseable {
} catch (IOException e) {
LOG.warn("Failed to build serverPrincipal. Using
provided value:[{}]", optionVal);
}
-
+ if (key.equalsIgnoreCase(passwordOptionKey)) {
+ String jaasKafkaClientConfigurationProperty =
"atlas.jaas.KafkaClient.option.password";
+ if
(JAAS_MASK_PASSWORD.equals(configuration.getString(jaasKafkaClientConfigurationProperty)))
{
+ try {
+ optionVal =
SecurityUtil.getPassword(configuration, jaasKafkaClientConfigurationProperty,
HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH);
+ } catch (Exception e) {
+ LOG.error("Error in getting secure
password ", e);
+ }
+ }
+ }
optionVal = surroundWithQuotes(optionVal);
optionStringBuffer.append(String.format(" %s=%s",
key.substring(optionPrefixLen), optionVal));
diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
index 562e28ae1..9b4f093e7 100644
--- a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
@@ -20,13 +20,20 @@ package org.apache.atlas.utils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.JaasContext;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.Test;
+
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -37,6 +44,11 @@ import static org.testng.Assert.fail;
public class KafkaUtilsTest {
+ protected Path jksPath;
+ protected String providerUrl;
+
+ protected static final String JAAS_MASKED_PASSWORD = "keypass";
+
@Test
public void testSetKafkaJAASPropertiesForAllProperValues() {
Properties properties = new Properties();
@@ -262,6 +274,94 @@ public class KafkaUtilsTest {
}
}
+ @Test
+ public void testSetKafkaJAASPropertiesForClearTextPassword() throws
Exception {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+ setupCredentials();
+ final String loginModuleName =
"org.apache.kafka.common.security.scram.ScramLoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "false";
+ final String optionServiceName = "kafka";
+ final String optionTokenAuth = "true";
+ final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
+ final String optionPassword = "admin123";
+
+ configuration.setProperty("atlas.kafka.bootstrap.servers",
"localhost:9100");
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",
optionServiceName);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth",
optionTokenAuth);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.username",
optionUsername);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.password",
optionPassword);
+ configuration.setProperty("hadoop.security.credential.provider.path",
providerUrl);
+
+ try (MockedStatic mockedKafkaUtilsClass =
Mockito.mockStatic(KafkaUtils.class)) {
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.setKafkaJAASProperties(configuration,
properties)).thenCallRealMethod();
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),
"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=\"" +
optionUseKeyTab + "\""), "useKeyTab not present in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey=\"" +
optionStoreKey + "\""), "storeKey not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("serviceName=\"" +
optionServiceName + "\""), "serviceName not present in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("tokenauth=\"" +
optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("username=\"" +
optionUsername + "\""), "username not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("password=\"" +
optionPassword + "\""), "password not present in new property or value doesn't
match");
+ assertJaaSConfigLoadable(newPropertyValue);
+ }
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForPasswordEncryption() throws
Exception {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+ setupCredentials();
+ final String loginModuleName =
"org.apache.kafka.common.security.scram.ScramLoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "false";
+ final String optionServiceName = "kafka";
+ final String optionTokenAuth = "true";
+ final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
+ final String optionPassword = "********";
+
+ configuration.setProperty("atlas.kafka.bootstrap.servers",
"localhost:9100");
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",
optionServiceName);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth",
optionTokenAuth);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.username",
optionUsername);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.password",
optionPassword);
+ configuration.setProperty("hadoop.security.credential.provider.path",
providerUrl);
+
+ try (MockedStatic mockedKafkaUtilsClass =
Mockito.mockStatic(KafkaUtils.class)) {
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.setKafkaJAASProperties(configuration,
properties)).thenCallRealMethod();
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),
"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=\"" +
optionUseKeyTab + "\""), "useKeyTab not present in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey=\"" +
optionStoreKey + "\""), "storeKey not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("serviceName=\"" +
optionServiceName + "\""), "serviceName not present in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("tokenauth=\"" +
optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value
doesn't match");
+ assertTrue(newPropertyValue.contains("username=\"" +
optionUsername + "\""), "username not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("password=\"" +
JAAS_MASKED_PASSWORD + "\""), "password not present in new property or value
doesn't match");
+ assertJaaSConfigLoadable(newPropertyValue);
+ }
+ }
+
private void assertJaaSConfigLoadable(String jaasConfig) {
// Ensure that JaaS config can be loaded
Map<String, Password> jaasConfigs = new HashMap<>();
@@ -273,6 +373,27 @@ public class KafkaUtilsTest {
}
}
+ protected void setupCredentials() throws Exception {
+ jksPath = new
Path(Files.createTempDirectory("tempproviders").toString(), "kafka.jceks");
+ providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" +
jksPath.toUri();
+ org.apache.hadoop.conf.Configuration conf = new
org.apache.hadoop.conf.Configuration(false);
+
+ conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
providerUrl);
+
+ CredentialProvider provider =
CredentialProviderFactory.getProviders(conf).get(0);
+
+ // create new aliases
+ try {
+
provider.createCredentialEntry("atlas.jaas.KafkaClient.option.password",
JAAS_MASKED_PASSWORD.toCharArray());
-}
+ // write out so that it can be found in checks
+ provider.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+}