This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0685e8bbebee [HUDI-9757] Support TLS authentication for Datahub sync
(#13861)
0685e8bbebee is described below
commit 0685e8bbebeee0fdecce6bf41da2b6136dc37c2e
Author: tiennguyen-onehouse <[email protected]>
AuthorDate: Tue Sep 23 06:49:50 2025 -0700
[HUDI-9757] Support TLS authentication for Datahub sync (#13861)
---
.../sync/datahub/config/DataHubSyncConfig.java | 72 +++-
.../config/TlsEnabledDataHubEmitterSupplier.java | 176 ++++++++
.../sync/datahub/config/TestDataHubSyncConfig.java | 88 ++++
.../TestTlsEnabledDataHubEmitterSupplier.java | 460 +++++++++++++++++++++
.../src/test/resources/multi-ca-cert.pem | 60 +++
.../src/test/resources/test-ca-cert.pem | 21 +
.../src/test/resources/test-keystore.p12 | Bin 0 -> 2706 bytes
.../src/test/resources/test-truststore.p12 | Bin 0 -> 2706 bytes
8 files changed, 876 insertions(+), 1 deletion(-)
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index d4fc7bdf4388..b235bb5681eb 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -136,6 +136,47 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
.markAdvanced()
.withDocumentation("The name of the destination table that we should
sync the hudi table to.");
+ // TLS Configuration Properties
+ // These keys are read by TlsEnabledDataHubEmitterSupplier. The plain
+ // META_SYNC_DATAHUB_EMITTER_SERVER branch of getRestEmitter() does not consult them,
+ // so they only take effect when that supplier (or another supplier that accepts
+ // TypedProperties) is configured as the emitter supplier class.
+ //
+ // CA certificate file: parsed as X.509 and may contain several certificates.
+ // TlsEnabledDataHubEmitterSupplier only uses it when no truststore path is set.
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_TLS_CA_CERT_PATH = ConfigProperty
+ .key("hoodie.meta.sync.datahub.tls.ca.cert.path")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Path to the CA certificate file for TLS
verification. "
+ + "Used when connecting to DataHub over HTTPS with custom CA
certificates.");
+
+ // Client keystore for mutual TLS. TlsEnabledDataHubEmitterSupplier loads it as PKCS12.
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_TLS_KEYSTORE_PATH = ConfigProperty
+ .key("hoodie.meta.sync.datahub.tls.keystore.path")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Path to the keystore file for TLS client
authentication. "
+ + "Used when connecting to DataHub over HTTPS with mutual TLS
authentication.");
+
+ // Keystore password. TlsEnabledDataHubEmitterSupplier uses the same value to open the
+ // store and to unlock the key material inside it.
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD = ConfigProperty
+ .key("hoodie.meta.sync.datahub.tls.keystore.password")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Password for the keystore file. Optional but
recommended for security. "
+ + "If not provided, an empty password will be used.");
+
+ // Truststore for server verification. When set, TlsEnabledDataHubEmitterSupplier uses it
+ // instead of the CA certificate file; it is loaded with the JVM's default keystore type.
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH = ConfigProperty
+ .key("hoodie.meta.sync.datahub.tls.truststore.path")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Path to the truststore file for TLS server
verification. "
+ + "Alternative to CA certificate file for trust management.");
+
+ // Truststore password; only used to open the truststore file.
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD = ConfigProperty
+ .key("hoodie.meta.sync.datahub.tls.truststore.password")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Password for the truststore file. Optional but
recommended for security. "
+ + "If not provided, an empty password will be used.");
+
public DataHubSyncConfig(Properties props) {
super(props);
// Log warning if the domain identifier is provided but is not in urn form
@@ -156,7 +197,16 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
public RestEmitter getRestEmitter() {
if (contains(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS)) {
- return ((DataHubEmitterSupplier)
ReflectionUtils.loadClass(getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS))).get();
+ String supplierClass =
getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS);
+
+ // Check if the supplier has a constructor that takes TypedProperties
+ if (ReflectionUtils.hasConstructor(supplierClass, new Class<?>[]
{TypedProperties.class})) {
+ return ((DataHubEmitterSupplier)
ReflectionUtils.loadClass(supplierClass,
+ new Class<?>[] {TypedProperties.class}, props)).get();
+ } else {
+ // Fall back to no-arg constructor for backward compatibility
+ return ((DataHubEmitterSupplier)
ReflectionUtils.loadClass(supplierClass)).get();
+ }
} else if (contains(META_SYNC_DATAHUB_EMITTER_SERVER)) {
return RestEmitter.create(b ->
b.server(getString(META_SYNC_DATAHUB_EMITTER_SERVER)).token(getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN,
null)));
} else {
@@ -197,6 +247,21 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
@Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable
class to supply a DataHub REST emitter to connect to the DataHub instance. This
overwrites other emitter configs.")
public String emitterSupplierClass;
+ // Command-line counterparts of the hoodie.meta.sync.datahub.tls.* properties.
+ // Each value is copied into the sync properties only when it is non-null.
+ // NOTE(review): passwords passed as command-line arguments are typically visible in
+ // process listings - consider supplying them through a properties file instead.
+ @Parameter(names = {"--tls-ca-cert-path"}, description = "Path to the CA
certificate file for TLS verification when connecting to DataHub over HTTPS
with custom CA certificates.")
+ public String tlsCaCertPath;
+
+ @Parameter(names = {"--tls-keystore-path"}, description = "Path to the
keystore file for TLS client authentication.")
+ public String tlsKeystorePath;
+
+ @Parameter(names = {"--tls-keystore-password"}, description = "Password
for the keystore file. Optional but recommended for security.")
+ public String tlsKeystorePassword;
+
+ @Parameter(names = {"--tls-truststore-path"}, description = "Path to the
truststore file for TLS server verification.")
+ public String tlsTruststorePath;
+
+ @Parameter(names = {"--tls-truststore-password"}, description = "Password
for the truststore file. Optional but recommended for security.")
+ public String tlsTruststorePassword;
+
@Parameter(names = {"--data-platform-name"}, description = "String used to
represent Hudi when creating its "
+ "corresponding DataPlatform entity within Datahub")
public String dataPlatformName;
@@ -232,6 +297,11 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SERVER.key(),
emitterServer);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(),
emitterToken);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(),
emitterSupplierClass);
+ props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(),
tlsCaCertPath);
+ props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(),
tlsKeystorePath);
+
props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(),
tlsKeystorePassword);
+ props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(),
tlsTruststorePath);
+
props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(),
tlsTruststorePassword);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(),
dataPlatformName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(),
dataPlatformInstanceName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(),
datasetEnv);
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
new file mode 100644
index 000000000000..042182fd68c7
--- /dev/null
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import datahub.client.rest.RestEmitter;
+import datahub.shaded.org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import
datahub.shaded.org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.Collection;
+
+/**
+ * {@link DataHubEmitterSupplier} that builds a {@link RestEmitter} whose HTTP client uses a
+ * custom TLS context assembled from Hudi sync properties.
+ *
+ * <p>All TLS inputs are optional; the HTTP client is left untouched when none of the three
+ * paths below is configured:
+ * <ul>
+ *   <li>a client keystore (loaded as PKCS12) for mutual TLS authentication,</li>
+ *   <li>a truststore for server certificate verification, or</li>
+ *   <li>an X.509 CA certificate file, which is used only when no truststore is configured.</li>
+ * </ul>
+ *
+ * <p>Any exception raised while building the emitter is rethrown as a
+ * {@link HoodieDataHubSyncException}.
+ */
+public class TlsEnabledDataHubEmitterSupplier implements DataHubEmitterSupplier {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TlsEnabledDataHubEmitterSupplier.class);
+
+  /** Keystore format used when loading the client keystore. */
+  private static final String CLIENT_KEYSTORE_TYPE = "PKCS12";
+
+  private final TypedProperties config;
+
+  /**
+   * @param config sync properties holding the emitter server, token and TLS settings
+   */
+  public TlsEnabledDataHubEmitterSupplier(TypedProperties config) {
+    this.config = config;
+  }
+
+  /**
+   * Creates the emitter from the configured server URL, optional token and optional TLS material.
+   *
+   * @return a new {@link RestEmitter}
+   * @throws HoodieDataHubSyncException if the server URL is missing or the TLS material cannot be loaded
+   */
+  @Override
+  public RestEmitter get() {
+    try {
+      String serverUrl = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER, true);
+      if (!isSet(serverUrl)) {
+        throw new IllegalArgumentException(
+            "DataHub server URL must be specified with " + DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key());
+      }
+
+      String token = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN, true);
+      String caCertPath = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH, true);
+      String keystorePath = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH, true);
+      String keystorePassword = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD, true);
+      String truststorePath = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH, true);
+      String truststorePassword = ConfigUtils.getStringWithAltKeys(config, DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD, true);
+
+      LOG.info("Creating DataHub RestEmitter with TLS configuration for server: {}", serverUrl);
+
+      return RestEmitter.create(builder -> {
+        builder.server(serverUrl);
+
+        if (isSet(token)) {
+          builder.token(token);
+        }
+
+        // Configure TLS/SSL context if any TLS configuration is provided
+        if (hasTlsConfiguration(caCertPath, keystorePath, truststorePath)) {
+          LOG.info("Configuring TLS for DataHub connection");
+          SSLContext sslContext = createSSLContext(caCertPath, keystorePath, keystorePassword, truststorePath, truststorePassword);
+
+          builder.customizeHttpAsyncClient(httpClientBuilder -> {
+            ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create();
+            tlsStrategyBuilder.setSslContext(sslContext);
+
+            PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder =
+                PoolingAsyncClientConnectionManagerBuilder.create();
+            connectionManagerBuilder.setTlsStrategy(tlsStrategyBuilder.build());
+
+            httpClientBuilder.setConnectionManager(connectionManagerBuilder.build());
+          });
+          LOG.info("Successfully configured TLS for DataHub connection");
+        }
+      });
+    } catch (Exception e) {
+      throw new HoodieDataHubSyncException("Failed to create TLS-enabled DataHub emitter", e);
+    }
+  }
+
+  /** Returns true when at least one of the TLS file paths is configured. */
+  private boolean hasTlsConfiguration(String caCertPath, String keystorePath, String truststorePath) {
+    return isSet(caCertPath) || isSet(keystorePath) || isSet(truststorePath);
+  }
+
+  /**
+   * Builds the SSL context from the optional client keystore and the optional trust material.
+   * A truststore takes precedence over a CA certificate file when both are configured.
+   *
+   * @throws HoodieDataHubSyncException if a configured file is missing or cannot be loaded
+   */
+  private SSLContext createSSLContext(String caCertPath, String keystorePath, String keystorePassword,
+                                      String truststorePath, String truststorePassword) throws HoodieDataHubSyncException {
+    try {
+      SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+
+      // Configure client keystore for mutual TLS authentication
+      if (isSet(keystorePath)) {
+        requireFileExists("Keystore", keystorePath);
+        if (!isSet(keystorePassword)) {
+          LOG.warn("No password provided for keystore {}. Using empty password - consider using password-protected keystores for better security.", keystorePath);
+        }
+        LOG.info("Loading keystore from: {}", keystorePath);
+        char[] keystorePasswordChars = toPasswordChars(keystorePassword);
+        KeyStore keyStore = loadKeyStoreFile(CLIENT_KEYSTORE_TYPE, keystorePath, keystorePasswordChars);
+        // The store password doubles as the key password.
+        sslContextBuilder.loadKeyMaterial(keyStore, keystorePasswordChars);
+      }
+
+      // Configure truststore or CA certificate for server certificate verification
+      if (isSet(truststorePath)) {
+        if (isSet(caCertPath)) {
+          // Previously the CA certificate was dropped without any hint to the operator.
+          LOG.warn("Both truststore {} and CA certificate {} are configured. The truststore takes precedence and the CA certificate is ignored.",
+              truststorePath, caCertPath);
+        }
+        requireFileExists("Truststore", truststorePath);
+        if (!isSet(truststorePassword)) {
+          LOG.warn("No password provided for truststore {}. Using empty password - consider using password-protected truststores for better security.", truststorePath);
+        }
+        LOG.info("Loading truststore from: {}", truststorePath);
+        KeyStore trustStore = loadKeyStoreFile(KeyStore.getDefaultType(), truststorePath, toPasswordChars(truststorePassword));
+        sslContextBuilder.loadTrustMaterial(trustStore, null);
+      } else if (isSet(caCertPath)) {
+        requireFileExists("CA certificate", caCertPath);
+        LOG.info("Loading CA certificate from: {}", caCertPath);
+        sslContextBuilder.loadTrustMaterial(loadCaCertificates(caCertPath), null);
+      }
+
+      return sslContextBuilder.build();
+    } catch (HoodieDataHubSyncException e) {
+      // Already carries an actionable message (e.g. "... file not found"); do not bury it under another wrapper.
+      throw e;
+    } catch (Exception e) {
+      throw new HoodieDataHubSyncException("Failed to create SSL context with TLS configuration", e);
+    }
+  }
+
+  /** Returns true when the value is neither null nor empty. */
+  private static boolean isSet(String value) {
+    return value != null && !value.isEmpty();
+  }
+
+  /** Converts an optional password to a char array, using an empty array when none is given. */
+  private static char[] toPasswordChars(String password) {
+    return isSet(password) ? password.toCharArray() : new char[0];
+  }
+
+  /** Fails with a descriptive {@link HoodieDataHubSyncException} when the file does not exist. */
+  private static void requireFileExists(String label, String path) {
+    if (!Files.exists(Paths.get(path))) {
+      throw new HoodieDataHubSyncException(label + " file not found: " + path);
+    }
+  }
+
+  /** Loads a keystore of the given type from a file, closing the stream afterwards. */
+  private static KeyStore loadKeyStoreFile(String type, String path, char[] password)
+      throws java.io.IOException, java.security.GeneralSecurityException {
+    KeyStore store = KeyStore.getInstance(type);
+    try (FileInputStream input = new FileInputStream(path)) {
+      store.load(input, password);
+    }
+    return store;
+  }
+
+  /** Builds an in-memory trust store holding every X.509 certificate found in the given file. */
+  private static KeyStore loadCaCertificates(String caCertPath)
+      throws java.io.IOException, java.security.GeneralSecurityException {
+    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+    trustStore.load(null, null);
+
+    CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
+    try (FileInputStream certInputStream = new FileInputStream(caCertPath)) {
+      Collection<? extends Certificate> caCerts = certificateFactory.generateCertificates(certInputStream);
+      int certIndex = 0;
+      for (Certificate caCert : caCerts) {
+        trustStore.setCertificateEntry("ca-cert-" + certIndex, caCert);
+        certIndex++;
+      }
+      if (caCerts.isEmpty()) {
+        // An empty trust store makes every later TLS handshake fail with an obscure error.
+        LOG.warn("No certificates found in CA certificate file {}. Server certificate verification will fail.", caCertPath);
+      }
+      LOG.info("Loaded {} CA certificate(s) from: {}", caCerts.size(), caCertPath);
+    }
+    return trustStore;
+  }
+}
\ No newline at end of file
diff --git
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
index 7c33d3f0dd0d..50d179b6f986 100644
---
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
+++
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
@@ -22,9 +22,16 @@ package org.apache.hudi.sync.datahub.config;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import datahub.client.rest.RestEmitter;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Objects;
import java.util.Properties;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
@@ -33,8 +40,11 @@ import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DA
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TABLE_NAME;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
class TestDataHubSyncConfig {
@@ -91,6 +101,84 @@ class TestDataHubSyncConfig {
assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
}
+  /** A CA certificate copied from test resources must yield a usable emitter via the sync config. */
+  @Test
+  void testGetEmitterWithTlsEnabledSupplier(@TempDir Path tempDir) throws IOException {
+    String testServerUrl = "https://datahub.example.com";
+    String testToken = "test-token-123";
+    Path caCertPath = tempDir.resolve("ca-cert.pem");
+
+    try (InputStream certStream = getClass().getClassLoader().getResourceAsStream("test-ca-cert.pem")) {
+      Objects.requireNonNull(certStream, "test-ca-cert.pem not found in resources");
+      Files.copy(certStream, caCertPath);
+    }
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), TlsEnabledDataHubEmitterSupplier.class.getName());
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), testServerUrl);
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN.key(), testToken);
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), caCertPath.toString());
+
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = syncConfig.getRestEmitter()) {
+      assertNotNull(emitter);
+    }
+  }
+
+  /** Without any TLS material the supplier must still produce an emitter. */
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierWithoutCert() throws IOException {
+    String testServerUrl = "https://datahub.example.com";
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), TlsEnabledDataHubEmitterSupplier.class.getName());
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), testServerUrl);
+
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = syncConfig.getRestEmitter()) {
+      assertNotNull(emitter);
+    }
+  }
+
+  /** The TLS supplier must reject a configuration that carries no emitter server URL. */
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierMissingServer() {
+    Properties supplierOnlyProps = new Properties();
+    supplierOnlyProps.setProperty(
+        META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(),
+        TlsEnabledDataHubEmitterSupplier.class.getName());
+
+    DataHubSyncConfig config = new DataHubSyncConfig(supplierOnlyProps);
+
+    assertThrows(HoodieDataHubSyncException.class, () -> config.getRestEmitter());
+  }
+
+  /** A CA certificate path that does not exist must surface as a sync exception. */
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierInvalidCertPath() {
+    Properties tlsProps = new Properties();
+    tlsProps.setProperty(
+        META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(),
+        TlsEnabledDataHubEmitterSupplier.class.getName());
+    tlsProps.setProperty(
+        DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(),
+        "https://datahub.example.com");
+    tlsProps.setProperty(
+        DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(),
+        "/nonexistent/path/cert.pem");
+
+    DataHubSyncConfig config = new DataHubSyncConfig(tlsProps);
+
+    assertThrows(HoodieDataHubSyncException.class, () -> config.getRestEmitter());
+  }
+
+  /** A password-protected client keystore must yield a usable emitter via the sync config. */
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierKeystore(@TempDir Path tempDir) throws IOException {
+    String testServerUrl = "https://datahub.example.com";
+    Path keystorePath = tempDir.resolve("test-keystore.p12");
+
+    try (InputStream keystoreStream = getClass().getClassLoader().getResourceAsStream("test-keystore.p12")) {
+      Objects.requireNonNull(keystoreStream, "test-keystore.p12 not found in resources");
+      Files.copy(keystoreStream, keystorePath);
+    }
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), TlsEnabledDataHubEmitterSupplier.class.getName());
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), testServerUrl);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "testpass");
+
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = syncConfig.getRestEmitter()) {
+      assertNotNull(emitter);
+    }
+  }
+
public static class DummySupplier implements DataHubEmitterSupplier {
@Override
diff --git
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
new file mode 100644
index 000000000000..af8e325106aa
--- /dev/null
+++
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import datahub.client.rest.RestEmitter;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.util.Objects;
+import java.util.Properties;
+
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestTlsEnabledDataHubEmitterSupplier {
+
+ @TempDir
+ Path tempDir;
+
+  /** Server URL and token alone (no TLS material) must be enough to create an emitter. */
+  @Test
+  void testEmitterCreationWithBasicConfig() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), "test-token");
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created successfully");
+    }
+  }
+
+  /** A CA certificate file must be accepted as trust material. */
+  @Test
+  void testEmitterCreationWithCACertificate() throws Exception {
+    // Create a dummy CA certificate file for testing
+    Path caCertPath = createDummyCertificateFile();
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), caCertPath.toString());
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with CA certificate");
+    }
+  }
+
+  /** A PEM bundle holding several CA certificates must be accepted as trust material. */
+  @Test
+  void testEmitterCreationWithMultipleCACertificates() throws Exception {
+    // Load a PEM file with multiple certificates from resources
+    Path multiCertPath = copyMultipleCertificateFromResources();
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), multiCertPath.toString());
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with multiple CA certificates");
+    }
+  }
+
+  /** An empty configuration (no server URL) must be rejected when the emitter is requested. */
+  @Test
+  void testEmitterCreationFailsWithoutServerUrl() {
+    // Deliberately left empty: no server URL provided
+    Properties emptyProps = new Properties();
+
+    TypedProperties supplierConfig = new TypedProperties();
+    TypedProperties.putAll(supplierConfig, emptyProps);
+    TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(supplierConfig);
+
+    assertThrows(
+        HoodieDataHubSyncException.class,
+        () -> emitterSupplier.get(),
+        "Should throw exception when server URL is not provided");
+  }
+
+  /** Pointing the CA certificate setting at a missing file must fail emitter creation. */
+  @Test
+  void testEmitterCreationFailsWithInvalidCACertPath() {
+    Properties rawProps = new Properties();
+    rawProps.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), "/non/existent/path/cert.pem");
+    rawProps.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+
+    TypedProperties supplierConfig = new TypedProperties();
+    TypedProperties.putAll(supplierConfig, rawProps);
+    TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(supplierConfig);
+
+    // The missing certificate file is only detected once the emitter is requested
+    assertThrows(
+        HoodieDataHubSyncException.class,
+        () -> emitterSupplier.get(),
+        "Should throw HoodieDataHubSyncException when CA certificate file doesn't exist");
+  }
+
+  /** A password-protected client keystore must be accepted for mutual TLS. */
+  @Test
+  void testEmitterCreationWithKeystore() throws Exception {
+    Path keystorePath = copyKeystoreFromResources("test-keystore.p12");
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "testpass");
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with keystore");
+    }
+  }
+
+  /** A password-protected truststore must be accepted as trust material. */
+  @Test
+  void testEmitterCreationWithTruststore() throws Exception {
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "testpass");
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with truststore");
+    }
+  }
+
+  /** Keystore and truststore configured together must both be accepted. */
+  @Test
+  void testEmitterCreationWithKeystoreAndTruststore() throws Exception {
+    Path keystorePath = copyKeystoreFromResources("test-keystore.p12");
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "testpass");
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "testpass");
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with both keystore and truststore");
+    }
+  }
+
+  /** A keystore without a password must still load (the supplier only logs a warning). */
+  @Test
+  void testEmitterCreationWithKeystoreWithoutPassword() throws Exception {
+    // Create a password-less keystore for testing
+    Path keystorePath = createPasswordlessKeystore();
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), keystorePath.toString());
+    // No password provided - should work with warning
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    // RestEmitter is Closeable; close it so the test does not leak its HTTP client.
+    try (RestEmitter emitter = supplier.get()) {
+      assertNotNull(emitter, "Emitter should be created with password-less keystore");
+    }
+  }
+
+  /** Pointing the keystore setting at a missing file must fail emitter creation. */
+  @Test
+  void testEmitterCreationFailsWithInvalidKeystorePath() {
+    Properties rawProps = new Properties();
+    rawProps.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "testpass");
+    rawProps.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), "/non/existent/keystore.p12");
+    rawProps.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+
+    TypedProperties supplierConfig = new TypedProperties();
+    TypedProperties.putAll(supplierConfig, rawProps);
+    TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(supplierConfig);
+
+    assertThrows(
+        HoodieDataHubSyncException.class,
+        () -> emitterSupplier.get(),
+        "Should throw exception when keystore file doesn't exist");
+  }
+
+  /**
+   * Copies a classpath resource into the per-test temporary directory under the same file name.
+   *
+   * @param resourceName name of the resource to look up on the test classpath
+   * @return path of the copied file inside {@code tempDir}
+   * @throws NullPointerException if the resource is not on the classpath
+   */
+  private Path copyKeystoreFromResources(String resourceName) throws Exception {
+    Path target = tempDir.resolve(resourceName);
+    ClassLoader loader = getClass().getClassLoader();
+
+    try (InputStream resource = loader.getResourceAsStream(resourceName)) {
+      if (resource == null) {
+        throw new NullPointerException(resourceName + " not found in resources");
+      }
+      Files.copy(resource, target);
+    }
+
+    return target;
+  }
+
+ /**
+  * Expects {@link HoodieDataHubSyncException} from {@code supplier.get()} when the
+  * bundled {@code test-keystore.p12} is configured with the password {@code "wrongpassword"}.
+  */
+ @Test
+ void testEmitterCreationFailsWithKeystoreWrongPassword() throws Exception {
+   String keystoreLocation = copyKeystoreFromResources("test-keystore.p12").toString();
+
+   Properties rawProps = new Properties();
+   rawProps.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), keystoreLocation);
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "wrongpassword");
+
+   TypedProperties syncProps = new TypedProperties();
+   TypedProperties.putAll(syncProps, rawProps);
+   TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(syncProps);
+
+   assertThrows(
+       HoodieDataHubSyncException.class,
+       () -> emitterSupplier.get(),
+       "Should throw exception when keystore password is incorrect");
+ }
+
+ /**
+  * Expects {@link HoodieDataHubSyncException} from {@code supplier.get()} when the
+  * bundled {@code test-truststore.p12} is configured with the password {@code "wrongpassword"}.
+  */
+ @Test
+ void testEmitterCreationFailsWithTruststoreWrongPassword() throws Exception {
+   String truststoreLocation = copyKeystoreFromResources("test-truststore.p12").toString();
+
+   Properties rawProps = new Properties();
+   rawProps.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), truststoreLocation);
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "wrongpassword");
+
+   TypedProperties syncProps = new TypedProperties();
+   TypedProperties.putAll(syncProps, rawProps);
+   TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(syncProps);
+
+   assertThrows(
+       HoodieDataHubSyncException.class,
+       () -> emitterSupplier.get(),
+       "Should throw exception when truststore password is incorrect");
+ }
+
+ /**
+  * Verifies that the supplier returns a non-null emitter when only a truststore
+  * path is configured and no truststore password is supplied.
+  */
+ @Test
+ void testEmitterCreationWithTruststoreWithoutPassword() throws Exception {
+   // Create a password-less truststore for testing
+   Path truststorePath = createPasswordlessTruststore();
+
+   Properties props = new Properties();
+   props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), truststorePath.toString());
+   // The truststore password property is intentionally left unset.
+
+   TypedProperties typedProps = new TypedProperties();
+   TypedProperties.putAll(typedProps, props);
+   TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+   // Close the emitter when the test is done so it does not outlive the test method.
+   try (RestEmitter emitter = supplier.get()) {
+     assertNotNull(emitter, "Emitter should be created with password-less truststore");
+   }
+ }
+
+ /**
+  * Verifies that the supplier returns a non-null emitter when both a CA certificate
+  * path and a truststore (with password) are configured.
+  *
+  * <p>The intent stated by the assertion message is that the truststore takes
+  * precedence over the CA certificate; this test only asserts that creation
+  * succeeds, not which of the two was actually used.
+  */
+ @Test
+ void testEmitterCreationWithCACertAndTruststore() throws Exception {
+   Path caCertPath = createDummyCertificateFile();
+   Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+
+   Properties props = new Properties();
+   props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), caCertPath.toString());
+   props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), truststorePath.toString());
+   props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "testpass");
+
+   TypedProperties typedProps = new TypedProperties();
+   TypedProperties.putAll(typedProps, props);
+   TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+   // Close the emitter when the test is done so it does not outlive the test method.
+   try (RestEmitter emitter = supplier.get()) {
+     assertNotNull(emitter, "Emitter should be created with truststore taking precedence over CA cert");
+   }
+ }
+
+ /**
+  * Verifies that the supplier returns a non-null emitter when the CA certificate
+  * and keystore path properties are present but set to the empty string.
+  */
+ @Test
+ void testEmitterCreationWithEmptyTlsPaths() throws Exception {
+   Properties props = new Properties();
+   props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), ""); // Empty string
+   props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), ""); // Empty string
+
+   TypedProperties typedProps = new TypedProperties();
+   TypedProperties.putAll(typedProps, props);
+   TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+   // Should work without TLS configuration when paths are empty.
+   // Close the emitter when the test is done so it does not outlive the test method.
+   try (RestEmitter emitter = supplier.get()) {
+     assertNotNull(emitter, "Emitter should be created without TLS when paths are empty");
+   }
+ }
+
+ /**
+  * Expects {@link HoodieDataHubSyncException} from {@code supplier.get()} when the
+  * configured truststore path refers to a location that does not exist.
+  */
+ @Test
+ void testEmitterCreationFailsWithInvalidTruststorePath() {
+   final String missingTruststore = "/non/existent/truststore.p12";
+
+   Properties rawProps = new Properties();
+   rawProps.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), missingTruststore);
+   rawProps.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "testpass");
+
+   TypedProperties syncProps = new TypedProperties();
+   TypedProperties.putAll(syncProps, rawProps);
+   TlsEnabledDataHubEmitterSupplier emitterSupplier = new TlsEnabledDataHubEmitterSupplier(syncProps);
+
+   assertThrows(
+       HoodieDataHubSyncException.class,
+       () -> emitterSupplier.get(),
+       "Should throw exception when truststore file doesn't exist");
+ }
+
+ /**
+  * Expects {@link HoodieDataHubSyncException} from {@code supplier.get()} when the
+  * configured keystore file exists but contains plain text rather than keystore data.
+  */
+ @Test
+ void testEmitterCreationFailsWithInvalidKeystoreFormat() throws Exception {
+   // Create an invalid keystore file (just text content).
+   // The charset is pinned so the bytes written do not depend on the platform default.
+   Path invalidKeystorePath = tempDir.resolve("invalid-keystore.p12");
+   Files.write(invalidKeystorePath,
+       "This is not a valid keystore file".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+   Properties props = new Properties();
+   props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), invalidKeystorePath.toString());
+   props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), "testpass");
+
+   TypedProperties typedProps = new TypedProperties();
+   TypedProperties.putAll(typedProps, props);
+   TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+   assertThrows(HoodieDataHubSyncException.class, supplier::get,
+       "Should throw exception when keystore file format is invalid");
+ }
+
+ /**
+  * Expects {@link HoodieDataHubSyncException} from {@code supplier.get()} when the
+  * configured truststore file exists but contains plain text rather than keystore data.
+  */
+ @Test
+ void testEmitterCreationFailsWithInvalidTruststoreFormat() throws Exception {
+   // Create an invalid truststore file (just text content).
+   // The charset is pinned so the bytes written do not depend on the platform default.
+   Path invalidTruststorePath = tempDir.resolve("invalid-truststore.p12");
+   Files.write(invalidTruststorePath,
+       "This is not a valid truststore file".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+   Properties props = new Properties();
+   props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "https://datahub.example.com:8080");
+   props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), invalidTruststorePath.toString());
+   props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), "testpass");
+
+   TypedProperties typedProps = new TypedProperties();
+   TypedProperties.putAll(typedProps, props);
+   TlsEnabledDataHubEmitterSupplier supplier = new TlsEnabledDataHubEmitterSupplier(typedProps);
+
+   assertThrows(HoodieDataHubSyncException.class, supplier::get,
+       "Should throw exception when truststore file format is invalid");
+ }
+
+ /**
+  * Produces a PKCS12 keystore with an empty password inside {@code tempDir}.
+  *
+  * <p>First tries the external {@code keytool} command; if that is unavailable,
+  * fails, or does not finish in time, falls back to
+  * {@link #createSimplePasswordlessKeystore()}.
+  *
+  * @return path of the keystore file to use
+  * @throws Exception if the fallback creation fails or the thread is interrupted
+  */
+ private Path createPasswordlessKeystore() throws Exception {
+   Path keystorePath = tempDir.resolve("passwordless-keystore.p12");
+
+   // Generate a keystore without password using keytool command
+   boolean generated = runKeytoolGenKeyPair(
+       keystorePath,
+       "CN=test-passwordless,O=Apache Hudi,L=Test,ST=Test,C=US",
+       "test-passwordless");
+   if (!generated) {
+     // Fallback: create a simple keystore programmatically
+     return createSimplePasswordlessKeystore();
+   }
+
+   return keystorePath;
+ }
+
+ /**
+  * Produces a PKCS12 truststore with an empty password inside {@code tempDir}.
+  *
+  * <p>First tries the external {@code keytool} command; if that is unavailable,
+  * fails, or does not finish in time, falls back to
+  * {@link #createSimplePasswordlessTruststore()}.
+  *
+  * @return path of the truststore file to use
+  * @throws Exception if the fallback creation fails or the thread is interrupted
+  */
+ private Path createPasswordlessTruststore() throws Exception {
+   Path truststorePath = tempDir.resolve("passwordless-truststore.p12");
+
+   // Generate a truststore without password using keytool command
+   boolean generated = runKeytoolGenKeyPair(
+       truststorePath,
+       "CN=test-truststore-passwordless,O=Apache Hudi,L=Test,ST=Test,C=US",
+       "test-truststore-passwordless");
+   if (!generated) {
+     // Fallback: create a simple truststore programmatically
+     return createSimplePasswordlessTruststore();
+   }
+
+   return truststorePath;
+ }
+
+ /**
+  * Runs {@code keytool -genkeypair} to write a PKCS12 store with empty store and key
+  * passwords to {@code storePath}.
+  *
+  * @param storePath destination file handed to {@code -keystore}
+  * @param dname     distinguished name handed to {@code -dname}
+  * @param alias     entry alias handed to {@code -alias}
+  * @return {@code true} only if keytool started, finished within the timeout and exited with 0
+  * @throws InterruptedException if interrupted while waiting for keytool
+  */
+ private boolean runKeytoolGenKeyPair(Path storePath, String dname, String alias)
+     throws InterruptedException {
+   ProcessBuilder pb = new ProcessBuilder(
+       "keytool", "-genkeypair", "-keyalg", "RSA", "-keysize", "2048",
+       "-storetype", "PKCS12", "-keystore", storePath.toString(),
+       "-validity", "365", "-storepass", "", "-keypass", "",
+       "-dname", dname,
+       "-alias", alias
+   );
+   // Send stdout and stderr to a file so an undrained pipe can never block keytool.
+   pb.redirectErrorStream(true);
+   pb.redirectOutput(tempDir.resolve(alias + "-keytool.log").toFile());
+
+   Process process;
+   try {
+     process = pb.start();
+   } catch (java.io.IOException e) {
+     // keytool could not be launched (for example, it is not on the PATH): use the fallback.
+     return false;
+   }
+
+   try {
+     // Close the child's stdin so any interactive prompt sees end-of-input instead of waiting.
+     process.getOutputStream().close();
+   } catch (java.io.IOException ignored) {
+     // The process may already have exited; there is nothing left to close.
+   }
+
+   // Bound the wait so a stuck keytool cannot hang the whole test run.
+   if (!process.waitFor(30, java.util.concurrent.TimeUnit.SECONDS)) {
+     process.destroyForcibly();
+     return false;
+   }
+   return process.exitValue() == 0;
+ }
+
+ /**
+  * Writes an empty PKCS12 keystore, stored with a zero-length password, to
+  * {@code simple-passwordless-keystore.p12} inside {@code tempDir}.
+  *
+  * @return path of the keystore file that was written
+  * @throws Exception if the keystore cannot be initialised or written
+  */
+ private Path createSimplePasswordlessKeystore() throws Exception {
+   final char[] emptyPassword = new char[0];
+   Path target = tempDir.resolve("simple-passwordless-keystore.p12");
+
+   // Passing a null stream to load() initialises an empty in-memory keystore.
+   KeyStore emptyStore = KeyStore.getInstance("PKCS12");
+   emptyStore.load(null, null);
+
+   try (FileOutputStream out = new FileOutputStream(target.toFile())) {
+     emptyStore.store(out, emptyPassword);
+   }
+   return target;
+ }
+
+ /**
+  * Writes an empty PKCS12 truststore, stored with a zero-length password, to
+  * {@code simple-passwordless-truststore.p12} inside {@code tempDir}.
+  *
+  * @return path of the truststore file that was written
+  * @throws Exception if the truststore cannot be initialised or written
+  */
+ private Path createSimplePasswordlessTruststore() throws Exception {
+   final char[] emptyPassword = new char[0];
+   Path target = tempDir.resolve("simple-passwordless-truststore.p12");
+
+   // Passing a null stream to load() initialises an empty in-memory keystore.
+   KeyStore emptyStore = KeyStore.getInstance("PKCS12");
+   emptyStore.load(null, null);
+
+   try (FileOutputStream out = new FileOutputStream(target.toFile())) {
+     emptyStore.store(out, emptyPassword);
+   }
+   return target;
+ }
+
+ /**
+  * Copies the classpath resource {@code test-ca-cert.pem} to {@code ca-cert.pem}
+  * inside {@code tempDir}.
+  *
+  * @return path of the copied certificate file
+  * @throws NullPointerException if the resource cannot be found on the classpath
+  * @throws Exception if the copy fails
+  */
+ private Path createDummyCertificateFile() throws Exception {
+   Path target = tempDir.resolve("ca-cert.pem");
+   ClassLoader loader = getClass().getClassLoader();
+
+   try (InputStream source = loader.getResourceAsStream("test-ca-cert.pem")) {
+     Objects.requireNonNull(source, "test-ca-cert.pem not found in resources");
+     Files.copy(source, target);
+   }
+   return target;
+ }
+
+ /**
+  * Copies the classpath resource {@code multi-ca-cert.pem} into {@code tempDir}
+  * under the same file name.
+  *
+  * @return path of the copied certificate bundle
+  * @throws NullPointerException if the resource cannot be found on the classpath
+  * @throws Exception if the copy fails
+  */
+ private Path copyMultipleCertificateFromResources() throws Exception {
+   Path target = tempDir.resolve("multi-ca-cert.pem");
+   ClassLoader loader = getClass().getClassLoader();
+
+   try (InputStream source = loader.getResourceAsStream("multi-ca-cert.pem")) {
+     Objects.requireNonNull(source, "multi-ca-cert.pem not found in resources");
+     Files.copy(source, target);
+   }
+   return target;
+ }
+
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem
b/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem
new file mode 100644
index 000000000000..08d0849467d1
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem
@@ -0,0 +1,60 @@
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUJMP4cxtT65p8L9Wt1HXEo8sgA7UwDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAxMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTE2WhcNMjYwOTA4MTc0OTE2WjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDExFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANZL4s56/5fs4wzq
++iZ6iUUVxb3ZcviDCNqpcfRYjNGjA332EffIvsPbemwUqm7At85tZFFd3zFaafzX
+PwEI1yC92N58isTw903eFRHD4FzXHzo6v+Rb5adEWTTabH1AoARs/UaQh6OzM11w
+T04hkJbd8QIgLxnS+YiCw7mYK3Lz9PcicbeajUAf/CbS2nw+i5GtieDp5DFM2mNI
+XWuvCr+8VKXp4ZcycBkWCjEuF+mpEVwkMq9mHqKVUq62APC7Zuu3MWhJJ3fBqUuQ
+cIElv2X8BU5CvzEjvRImLTc4iPEvci4g6wYRbSFf8bnJ0LJGK4fBf70FTfmMaws
+MysTszxUCAwEAAaNTMFEwHQYDVR0OBBYEFDR1Nl4BFM52C2aRmwhhVuzv+UW9MB8G
+A1UdIwQYMBaAFDR1Nl4BFM52C2aRmwhhVuzv+UW9MA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAERhaw8v5YSyMJeCuXi7G15sxa54zQ12pqA38nru
+EHz+cjVwcB/bE4mcHIjH6JyEOpsQVsbocxojpEPPLEiiozCimdwHOHglyncACDlu
+TZsPkE9CMeeaUgMbCC67imVxiR4nGvxSVHBlPfw4gXXdExaMFwlHXNWUDCQXIjpi
++U832P7IxnW4mUGSPTrIspGtOV01R83niErec52yqGYz20JrUw0VI3tN3r+mB7pt
+ApyjxhsGld6hD8P9T1obDLiJVJMhzn9sYHY4Z54rQ22hW1ZU3nLJunMo9wfajsfu
+ZJqrDOGxh7UyAP6RmJExcyyCk5/3B21HqLdpQ8BNodDTtx0=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUIC9NlvOG6qYhOgk8Jpu+45cmRL4wDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAyMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTIwWhcNMjYwOTA4MTc0OTIwWjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDIxFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKPPinO3IjngBkXy
+vnn8ERktJ9cV5pRKU3edtvTt1TpFVflh6hve+Q7wom5hGF7omKRiyGnDn0t5jhAN
+aIqxuoWYqLmIdw9Oydr2YHaebHna9TX4YgZQXkF4rzW1CluDfp6SorayAGpeYSzz
+CDak3f/mS+NnUwRP5BVbyidwZFISNdqXh4loz4sRNnbZldnVE1edIonMlsjVfFwp
+zULHwizAJkkvj/uAMJvZzbm+0xQsi6vjPCDpjTVbIBAhE1LESy61Tqr24n6EoHQD
+W8pXJeHYMXs8NxzFeLZPzwB3e/9VjpE+Unz4QmpJhc6AHWv4hvMC8h1qqn06z7MO
+Ng43tPsCAwEAAaNTMFEwHQYDVR0OBBYEFDgOAcZDctliZQDkDZ6VtThD8hP7MB8G
+A1UdIwQYMBaAFDgOAcZDctliZQDkDZ6VtThD8hP7MA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAEyPsgQ9mgaqhc2NRu34VZ7jxN+R7xG+lqEW1eE1
+/iFDKIFI00uiKdXcfESTwxFfq3l4+0C+cr7RgVc6ENwlxZ3WveZQQSfO1gQ7xs7y
+KuibIv59bD91dmsWAG2v7wA04NTRqcsU8V31iJ/SAHoo/3ZJFF+MafF0kgPYKgI+
+rHC68PWs8LcvRHFmtjXwz0/ttZCo4I9Uh6c5kq7TN+od7vbR4z/FEMTqBfTLjJ9y
+wC4M2fJKzZFSIXzK/UTLVNP1JBOTGTuFqgrJzyuf50844BHWCv4YzKZRD+jE1M2F
+rbjlt36sZZXoQh7DiItqiFMgNri9xRQd2kNd/BCupbAzcBY=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUGc1b28jv8DlS+LR0xmaso4t8kXwwDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAzMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTI1WhcNMjYwOTA4MTc0OTI1WjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDMxFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMMvjM9TQ+JgDqoN
+2YtTo9IsjCUbec0CjadxTbKsNaDZlF/6Jqtq5fk3YeOZZxTcbM1spNRDlbAx7ldi
+K6x5a2Wu+T+Ml1L2ODu1K1yxVKS5+U3GBiBkqwsqqeMrZOKXcwOWr62CEe2lNV4D
+XeD1JoN90e6NX9JDu2yKRXKj8wS9UmCdspbJZHTbEkuJV8I/WA8cp5niFEOTojQ/
+BBHOLwEym/2AXxuRam6JxDLlZAALqdP9AW8mHuGi5PKJ+CKxdO9nddpdPvCNG1QH
+X1rRJ365LfZRA4B8v/D9YYQvcdbbBSXgW6eBfNhKXmZqFyoGu6qbcRZHneyEXIRl
+IfhYDe8CAwEAAaNTMFEwHQYDVR0OBBYEFIK2pasMLnLzFLVJQ2elI4Ew+QYFMB8G
+A1UdIwQYMBaAFIK2pasMLnLzFLVJQ2elI4Ew+QYFMA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAAtw6JBdFlC+m16S3RlWs1OaHtKY6Ju9Mc48aWBX
+kZEEbnp/H9oJ791Yxut4t6II5Eeo5Qx53ujjKTLIP2pENaBoWUdDpvDRmwbSzU42
+MZjqerK7ABsRYa9Q2HNj20UuazRLd7/f0Lc3wPv7iMo71+xjt36KIKZI9Cj5R2GM
+OEF0t1A2chmf+NKEnnVyx8uM3eaGnBZpCOcoc/kNJRP56gXOa/Ebx48zvyXLV0Ef
+fU5bFtngc7GjMW9u5AWvE5vKNxddUqGWV4XWrlLsQsf30yqVvFn0lRIxMNDMCukn
+2ZGM83W3RozxqZNpbZzw7UIwhA9EvOxhicqYFJ+nOIsaVEY=
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem
new file mode 100644
index 000000000000..4a8a3cc68e3d
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDazCCAlOgAwIBAgIUFtGMq5vPqvNXGDCH2rJ5n0OhLuQwDQYJKoZIhvcNAQEL
+BQAwRTELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNDAxMDEwMDAwMDBaFw0yNTAx
+MDEwMDAwMDBaMEUxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
+AQUAA4IBDwAwggEKAoIBAQC7bthgxE4i9lRaKov+kFVWy8Bj2w7V1EhGNxUe7j5O
+HOLx3hE2B1BcSa2no3W7XgJ9cYQ7jVaDj3bPP7vVZKQ4YCqFH9z2l1kZQKjZQ7gU
+FpqRRZHJpYxGKwP7YsBsVz3FJKmUyGP5cFq8mLBIcLa2Y9riYZQz7Wm7VgKzR3eZ
+YxDF9N5PbQnQt4bCFed0TlcX5gYFUqiRW5AUe5mECjH7hcWGaAXQBQKNbLiKVYEq
+jO4Y3F8xZ7CiMUVLkQMjKwYGNP3iG3msFS9bTiLcHwIx2RbFjxGg+gfvZ8EiYpFE
+ZQPvLsbmjqHc5ufhQJ3lfzYF2FjLcVx1VrsVhTQmB9EfAgMBAAGjUzBRMB0GA1Ud
+DgQWBBTWV4Dr2cKLsHhP0Hxc+CmZx2DZBzAfBgNVHSMEGDAWgBTWV4Dr2cKLsHhP
+0Hxc+CmZx2DZBzAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCR
+jf+kzYIFU+3M0hBpVz7cPYlhSI7gWdSIvY5kYhX4vvRBFGJ8OFo9flUd4vvO+GyV
+g+UNmGpgslqYdXuFmC2IftPRgaLpohZWvmR1CYUS3AbcuWUqFKchTHPaJLZTAcNu
+xYrqULYzsQxLb1FcXHoqntQe1YvLlkRRPkS9E3Lf0KO6dOlAwJI6qCvhzvtguinh
+FcTnSsJl6YQF8gMRdGxGPH5OKnpVr6xVkQ0gzc7P5AdVRDkVAjI8S5tgdMjHJB0L
+yN3nwCWMJgJqN5ZkJVz5cYtXdlfJBaDjXRbV+jpOOJRDr6Fy8PQ1EEvhQEoVHQz0
+FKevncF0IEv9ZFHQPTLM
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12
new file mode 100644
index 000000000000..382210075367
Binary files /dev/null and
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12 differ
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12
new file mode 100644
index 000000000000..160e2ed588a7
Binary files /dev/null and
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12 differ