This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0685e8bbebee [HUDI-9757] Support TLS authentication for Datahub sync 
(#13861)
0685e8bbebee is described below

commit 0685e8bbebeee0fdecce6bf41da2b6136dc37c2e
Author: tiennguyen-onehouse <[email protected]>
AuthorDate: Tue Sep 23 06:49:50 2025 -0700

    [HUDI-9757] Support TLS authentication for Datahub sync (#13861)
---
 .../sync/datahub/config/DataHubSyncConfig.java     |  72 +++-
 .../config/TlsEnabledDataHubEmitterSupplier.java   | 176 ++++++++
 .../sync/datahub/config/TestDataHubSyncConfig.java |  88 ++++
 .../TestTlsEnabledDataHubEmitterSupplier.java      | 460 +++++++++++++++++++++
 .../src/test/resources/multi-ca-cert.pem           |  60 +++
 .../src/test/resources/test-ca-cert.pem            |  21 +
 .../src/test/resources/test-keystore.p12           | Bin 0 -> 2706 bytes
 .../src/test/resources/test-truststore.p12         | Bin 0 -> 2706 bytes
 8 files changed, 876 insertions(+), 1 deletion(-)

diff --git 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index d4fc7bdf4388..b235bb5681eb 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++ 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -136,6 +136,47 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .markAdvanced()
       .withDocumentation("The name of the destination table that we should 
sync the hudi table to.");
 
+  // TLS Configuration Properties
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_TLS_CA_CERT_PATH = ConfigProperty
+      .key("hoodie.meta.sync.datahub.tls.ca.cert.path")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Path to the CA certificate file for TLS 
verification. "
+          + "Used when connecting to DataHub over HTTPS with custom CA 
certificates.");
+
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_TLS_KEYSTORE_PATH = ConfigProperty
+      .key("hoodie.meta.sync.datahub.tls.keystore.path")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Path to the keystore file for TLS client 
authentication. "
+          + "Used when connecting to DataHub over HTTPS with mutual TLS 
authentication.");
+
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD = ConfigProperty
+      .key("hoodie.meta.sync.datahub.tls.keystore.password")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Password for the keystore file. Optional but 
recommended for security. "
+          + "If not provided, an empty password will be used.");
+
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH = ConfigProperty
+      .key("hoodie.meta.sync.datahub.tls.truststore.path")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Path to the truststore file for TLS server 
verification. "
+          + "Alternative to CA certificate file for trust management.");
+
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD = ConfigProperty
+      .key("hoodie.meta.sync.datahub.tls.truststore.password")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Password for the truststore file. Optional but 
recommended for security. "
+          + "If not provided, an empty password will be used.");
+
   public DataHubSyncConfig(Properties props) {
     super(props);
     // Log warning if the domain identifier is provided but is not in urn form
@@ -156,7 +197,16 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
 
   public RestEmitter getRestEmitter() {
     if (contains(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS)) {
-      return ((DataHubEmitterSupplier) 
ReflectionUtils.loadClass(getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS))).get();
+      String supplierClass = 
getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS);
+      
+      // Check if the supplier has a constructor that takes TypedProperties
+      if (ReflectionUtils.hasConstructor(supplierClass, new Class<?>[] 
{TypedProperties.class})) {
+        return ((DataHubEmitterSupplier) 
ReflectionUtils.loadClass(supplierClass, 
+            new Class<?>[] {TypedProperties.class}, props)).get();
+      } else {
+        // Fall back to no-arg constructor for backward compatibility
+        return ((DataHubEmitterSupplier) 
ReflectionUtils.loadClass(supplierClass)).get();
+      }
     } else if (contains(META_SYNC_DATAHUB_EMITTER_SERVER)) {
       return RestEmitter.create(b -> 
b.server(getString(META_SYNC_DATAHUB_EMITTER_SERVER)).token(getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN,
 null)));
     } else {
@@ -197,6 +247,21 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
     @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable 
class to supply a DataHub REST emitter to connect to the DataHub instance. This 
overwrites other emitter configs.")
     public String emitterSupplierClass;
 
+    @Parameter(names = {"--tls-ca-cert-path"}, description = "Path to the CA 
certificate file for TLS verification when connecting to DataHub over HTTPS 
with custom CA certificates.")
+    public String tlsCaCertPath;
+
+    @Parameter(names = {"--tls-keystore-path"}, description = "Path to the 
keystore file for TLS client authentication.")
+    public String tlsKeystorePath;
+
+    @Parameter(names = {"--tls-keystore-password"}, description = "Password 
for the keystore file. Optional but recommended for security.")
+    public String tlsKeystorePassword;
+
+    @Parameter(names = {"--tls-truststore-path"}, description = "Path to the 
truststore file for TLS server verification.")
+    public String tlsTruststorePath;
+
+    @Parameter(names = {"--tls-truststore-password"}, description = "Password 
for the truststore file. Optional but recommended for security.")
+    public String tlsTruststorePassword;
+
     @Parameter(names = {"--data-platform-name"}, description = "String used to 
represent Hudi when creating its "
         + "corresponding DataPlatform entity within Datahub")
     public String dataPlatformName;
@@ -232,6 +297,11 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
emitterServer);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), 
emitterToken);
       
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
emitterSupplierClass);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
tlsCaCertPath);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
tlsKeystorePath);
+      
props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
tlsKeystorePassword);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
tlsTruststorePath);
+      
props.setPropertyIfNonNull(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
tlsTruststorePassword);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), 
dataPlatformName);
       
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), 
dataPlatformInstanceName);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), 
datasetEnv);
diff --git 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
new file mode 100644
index 000000000000..042182fd68c7
--- /dev/null
+++ 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import datahub.client.rest.RestEmitter;
+import datahub.shaded.org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import 
datahub.shaded.org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.Collection;
+
+/**
+ * Custom DataHub emitter supplier that supports TLS configuration with CA 
certificates.
+ * This class reads TLS configuration from Hudi properties and creates a 
RestEmitter
+ * with proper SSL/TLS context for secure communication with DataHub servers.
+ */
+public class TlsEnabledDataHubEmitterSupplier implements 
DataHubEmitterSupplier {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TlsEnabledDataHubEmitterSupplier.class);
+
+  private final TypedProperties config;
+
+  public TlsEnabledDataHubEmitterSupplier(TypedProperties config) {
+    this.config = config;
+  }
+
+  @Override
+  public RestEmitter get() {
+    try {
+      String serverUrl = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER, true);
+      if (serverUrl == null || serverUrl.isEmpty()) {
+        throw new IllegalArgumentException(
+            "DataHub server URL must be specified with " + 
DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key());
+      }
+
+      String token = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN, true);
+      String caCertPath = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH, true);
+      String keystorePath = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH, true);
+      String keystorePassword = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD, true);
+      String truststorePath = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH, true);
+      String truststorePassword = ConfigUtils.getStringWithAltKeys(config, 
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD, true);
+
+      LOG.info("Creating DataHub RestEmitter with TLS configuration for 
server: {}", serverUrl);
+
+      return RestEmitter.create(builder -> {
+        builder.server(serverUrl);
+        
+        if (token != null && !token.isEmpty()) {
+          builder.token(token);
+        }
+        
+        // Configure TLS/SSL context if any TLS configuration is provided
+        if (hasTlsConfiguration(caCertPath, keystorePath, truststorePath)) {
+          LOG.info("Configuring TLS for DataHub connection");
+          SSLContext sslContext = createSSLContext(caCertPath, keystorePath, 
keystorePassword, truststorePath, truststorePassword);
+          
+          builder.customizeHttpAsyncClient(httpClientBuilder -> {
+            ClientTlsStrategyBuilder tlsStrategyBuilder = 
ClientTlsStrategyBuilder.create();
+            tlsStrategyBuilder.setSslContext(sslContext);
+
+            PoolingAsyncClientConnectionManagerBuilder 
connectionManagerBuilder =
+                PoolingAsyncClientConnectionManagerBuilder.create();
+            
connectionManagerBuilder.setTlsStrategy(tlsStrategyBuilder.build());
+
+            
httpClientBuilder.setConnectionManager(connectionManagerBuilder.build());
+          });
+          LOG.info("Successfully configured TLS for DataHub connection");
+        }
+      });
+    } catch (Exception e) {
+      throw new HoodieDataHubSyncException("Failed to create TLS-enabled 
DataHub emitter", e);
+    }
+  }
+
+  private boolean hasTlsConfiguration(String caCertPath, String keystorePath, 
String truststorePath) {
+    return (caCertPath != null && !caCertPath.isEmpty())
+        || (keystorePath != null && !keystorePath.isEmpty())
+        || (truststorePath != null && !truststorePath.isEmpty());
+  }
+
+  private SSLContext createSSLContext(String caCertPath, String keystorePath, 
String keystorePassword, 
+                                      String truststorePath, String 
truststorePassword) throws HoodieDataHubSyncException {
+    try {
+      SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+
+      // Configure client keystore for mutual TLS authentication
+      if (keystorePath != null && !keystorePath.isEmpty()) {
+        if (!Files.exists(Paths.get(keystorePath))) {
+          throw new HoodieDataHubSyncException("Keystore file not found: " + 
keystorePath);
+        }
+        if (keystorePassword == null || keystorePassword.isEmpty()) {
+          LOG.warn("No password provided for keystore {}. Using empty password 
- consider using password-protected keystores for better security.", 
keystorePath);
+        }
+        LOG.info("Loading keystore from: {}", keystorePath);
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        char[] keystorePasswordChars = (keystorePassword != null && 
!keystorePassword.isEmpty()) 
+            ? keystorePassword.toCharArray() : new char[0];
+        try (FileInputStream keystoreInputStream = new 
FileInputStream(keystorePath)) {
+          keyStore.load(keystoreInputStream, keystorePasswordChars);
+        }
+        sslContextBuilder.loadKeyMaterial(keyStore, keystorePasswordChars);
+      }
+
+      // Configure truststore or CA certificate for server certificate 
verification
+      if (truststorePath != null && !truststorePath.isEmpty()) {
+        if (!Files.exists(Paths.get(truststorePath))) {
+          throw new HoodieDataHubSyncException("Truststore file not found: " + 
truststorePath);
+        }
+        if (truststorePassword == null || truststorePassword.isEmpty()) {
+          LOG.warn("No password provided for truststore {}. Using empty 
password - consider using password-protected truststores for better security.", 
truststorePath);
+        }
+        LOG.info("Loading truststore from: {}", truststorePath);
+        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+        char[] truststorePasswordChars = (truststorePassword != null && 
!truststorePassword.isEmpty()) 
+            ? truststorePassword.toCharArray() : new char[0];
+        try (FileInputStream trustStoreInputStream = new 
FileInputStream(truststorePath)) {
+          trustStore.load(trustStoreInputStream, truststorePasswordChars);
+        }
+        sslContextBuilder.loadTrustMaterial(trustStore, null);
+      } else if (caCertPath != null && !caCertPath.isEmpty()) {
+        if (!Files.exists(Paths.get(caCertPath))) {
+          throw new HoodieDataHubSyncException("CA certificate file not found: 
" + caCertPath);
+        }
+        LOG.info("Loading CA certificate from: {}", caCertPath);
+        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+        trustStore.load(null, null);
+
+        CertificateFactory certificateFactory = 
CertificateFactory.getInstance("X.509");
+        try (FileInputStream certInputStream = new 
FileInputStream(caCertPath)) {
+          Collection<? extends Certificate> caCerts = 
certificateFactory.generateCertificates(certInputStream);
+          int certIndex = 0;
+          for (Certificate caCert : caCerts) {
+            trustStore.setCertificateEntry("ca-cert-" + certIndex, caCert);
+            certIndex++;
+          }
+          LOG.info("Loaded {} CA certificate(s) from: {}", caCerts.size(), 
caCertPath);
+        }
+        sslContextBuilder.loadTrustMaterial(trustStore, null);
+      }
+      
+      return sslContextBuilder.build();
+    } catch (Exception e) {
+      throw new HoodieDataHubSyncException("Failed to create SSL context with 
TLS configuration", e);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
index 7c33d3f0dd0d..50d179b6f986 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
+++ 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
@@ -22,9 +22,16 @@ package org.apache.hudi.sync.datahub.config;
 import com.linkedin.common.FabricType;
 import com.linkedin.common.urn.DatasetUrn;
 import datahub.client.rest.RestEmitter;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Objects;
 import java.util.Properties;
 
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
@@ -33,8 +40,11 @@ import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DA
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TABLE_NAME;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class TestDataHubSyncConfig {
 
@@ -91,6 +101,84 @@ class TestDataHubSyncConfig {
     assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
   }
 
+  @Test
+  void testGetEmitterWithTlsEnabledSupplier(@TempDir Path tempDir) throws 
IOException {
+    String testServerUrl = "https://datahub.example.com";;
+    String testToken = "test-token-123";
+    Path caCertPath = tempDir.resolve("ca-cert.pem");
+    
+    try (InputStream certStream = 
getClass().getClassLoader().getResourceAsStream("test-ca-cert.pem")) {
+      Objects.requireNonNull(certStream, "test-ca-cert.pem not found in 
resources");
+      Files.copy(certStream, caCertPath);
+    }
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
TlsEnabledDataHubEmitterSupplier.class.getName());
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
testServerUrl);
+    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN.key(), 
testToken);
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
caCertPath.toString());
+    
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    RestEmitter emitter = syncConfig.getRestEmitter();
+    assertNotNull(emitter);
+  }
+
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierWithoutCert() {
+    String testServerUrl = "https://datahub.example.com";;
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
TlsEnabledDataHubEmitterSupplier.class.getName());
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
testServerUrl);
+    
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    RestEmitter emitter = syncConfig.getRestEmitter();
+    assertNotNull(emitter);
+  }
+
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierMissingServer() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
TlsEnabledDataHubEmitterSupplier.class.getName());
+    
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    assertThrows(HoodieDataHubSyncException.class, syncConfig::getRestEmitter);
+  }
+
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierInvalidCertPath() {
+    String testServerUrl = "https://datahub.example.com";;
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
TlsEnabledDataHubEmitterSupplier.class.getName());
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
testServerUrl);
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
"/nonexistent/path/cert.pem");
+    
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    assertThrows(HoodieDataHubSyncException.class, syncConfig::getRestEmitter);
+  }
+
+  @Test
+  void testGetEmitterWithTlsEnabledSupplierKeystore(@TempDir Path tempDir) 
throws IOException {
+    String testServerUrl = "https://datahub.example.com";;
+    Path keystorePath = tempDir.resolve("test-keystore.p12");
+    
+    try (InputStream keystoreStream = 
getClass().getClassLoader().getResourceAsStream("test-keystore.p12")) {
+      Objects.requireNonNull(keystoreStream, "test-keystore.p12 not found in 
resources");
+      Files.copy(keystoreStream, keystorePath);
+    }
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
TlsEnabledDataHubEmitterSupplier.class.getName());
+    
props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
testServerUrl);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"testpass");
+    
+    DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
+    RestEmitter emitter = syncConfig.getRestEmitter();
+    assertNotNull(emitter);
+  }
+
   public static class DummySupplier implements DataHubEmitterSupplier {
 
     @Override
diff --git 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
new file mode 100644
index 000000000000..af8e325106aa
--- /dev/null
+++ 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestTlsEnabledDataHubEmitterSupplier.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import datahub.client.rest.RestEmitter;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.util.Objects;
+import java.util.Properties;
+
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_TOKEN;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_CA_CERT_PATH;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PATH;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestTlsEnabledDataHubEmitterSupplier {
+
+  @TempDir
+  Path tempDir;
+
+  @Test
+  void testEmitterCreationWithBasicConfig() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), "test-token");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created successfully");
+  }
+
+  @Test
+  void testEmitterCreationWithCACertificate() throws Exception {
+    // Create a dummy CA certificate file for testing
+    Path caCertPath = createDummyCertificateFile();
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
caCertPath.toString());
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with CA certificate");
+  }
+
+  @Test
+  void testEmitterCreationWithMultipleCACertificates() throws Exception {
+    // Load a PEM file with multiple certificates from resources
+    Path multiCertPath = copyMultipleCertificateFromResources();
+
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
multiCertPath.toString());
+
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with multiple CA 
certificates");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithoutServerUrl() {
+    Properties props = new Properties();
+    // No server URL provided
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when server URL is not provided");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithInvalidCACertPath() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
"/non/existent/path/cert.pem");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    // Should throw exception when CA cert path is invalid
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw HoodieDataHubSyncException when CA certificate file 
doesn't exist");
+  }
+
+  @Test
+  void testEmitterCreationWithKeystore() throws Exception {
+    Path keystorePath = copyKeystoreFromResources("test-keystore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with keystore");
+  }
+
+  @Test
+  void testEmitterCreationWithTruststore() throws Exception {
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with truststore");
+  }
+
+  @Test
+  void testEmitterCreationWithKeystoreAndTruststore() throws Exception {
+    Path keystorePath = copyKeystoreFromResources("test-keystore.p12");
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"testpass");
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with both keystore and 
truststore");
+  }
+
+  @Test
+  void testEmitterCreationWithKeystoreWithoutPassword() throws Exception {
+    // Create a password-less keystore for testing
+    Path keystorePath = createPasswordlessKeystore();
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
keystorePath.toString());
+    // No password provided - should work with warning
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with password-less 
keystore");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithInvalidKeystorePath() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
"/non/existent/keystore.p12");
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when keystore file doesn't exist");
+  }
+
+  private Path copyKeystoreFromResources(String resourceName) throws Exception 
{
+    Path keystorePath = tempDir.resolve(resourceName);
+    
+    try (InputStream keystoreStream = 
getClass().getClassLoader().getResourceAsStream(resourceName)) {
+      Objects.requireNonNull(keystoreStream, resourceName + " not found in 
resources");
+      Files.copy(keystoreStream, keystorePath);
+    }
+    
+    return keystorePath;
+  }
+
+  @Test
+  void testEmitterCreationFailsWithKeystoreWrongPassword() throws Exception {
+    Path keystorePath = copyKeystoreFromResources("test-keystore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
keystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"wrongpassword");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when keystore password is incorrect");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithTruststoreWrongPassword() throws Exception {
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"wrongpassword");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when truststore password is incorrect");
+  }
+
+  @Test
+  void testEmitterCreationWithTruststoreWithoutPassword() throws Exception {
+    // Create a password-less truststore for testing
+    Path truststorePath = createPasswordlessTruststore();
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
truststorePath.toString());
+    // No password provided - should work with warning
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with password-less 
truststore");
+  }
+
+  @Test
+  void testEmitterCreationWithCACertAndTruststore() throws Exception {
+    // When both CA cert and truststore are provided, truststore should take 
precedence
+    Path caCertPath = createDummyCertificateFile();
+    Path truststorePath = copyKeystoreFromResources("test-truststore.p12");
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), 
caCertPath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
truststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created with truststore taking 
precedence over CA cert");
+  }
+
+  @Test
+  void testEmitterCreationWithEmptyTlsPaths() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_CA_CERT_PATH.key(), ""); // Empty 
string
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), ""); // Empty 
string
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    // Should work without TLS configuration when paths are empty
+    RestEmitter emitter = supplier.get();
+    assertNotNull(emitter, "Emitter should be created without TLS when paths 
are empty");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithInvalidTruststorePath() {
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
"/non/existent/truststore.p12");
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when truststore file doesn't exist");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithInvalidKeystoreFormat() throws Exception {
+    // Create an invalid keystore file (just text content)
+    Path invalidKeystorePath = tempDir.resolve("invalid-keystore.p12");
+    Files.write(invalidKeystorePath, "This is not a valid keystore 
file".getBytes());
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PATH.key(), 
invalidKeystorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_KEYSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when keystore file format is invalid");
+  }
+
+  @Test
+  void testEmitterCreationFailsWithInvalidTruststoreFormat() throws Exception {
+    // Create an invalid truststore file (just text content)
+    Path invalidTruststorePath = tempDir.resolve("invalid-truststore.p12");
+    Files.write(invalidTruststorePath, "This is not a valid truststore 
file".getBytes());
+    
+    Properties props = new Properties();
+    props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), 
"https://datahub.example.com:8080";);
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH.key(), 
invalidTruststorePath.toString());
+    props.setProperty(META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD.key(), 
"testpass");
+    
+    TypedProperties typedProps = new TypedProperties();
+    TypedProperties.putAll(typedProps, props);
+    TlsEnabledDataHubEmitterSupplier supplier = new 
TlsEnabledDataHubEmitterSupplier(typedProps);
+    
+    assertThrows(HoodieDataHubSyncException.class, supplier::get,
+        "Should throw exception when truststore file format is invalid");
+  }
+
+  private Path createPasswordlessKeystore() throws Exception {
+    Path keystorePath = tempDir.resolve("passwordless-keystore.p12");
+    
+    // Generate a keystore without password using keytool command
+    ProcessBuilder pb = new ProcessBuilder(
+        "keytool", "-genkeypair", "-keyalg", "RSA", "-keysize", "2048", 
+        "-storetype", "PKCS12", "-keystore", keystorePath.toString(),
+        "-validity", "365", "-storepass", "", "-keypass", "",
+        "-dname", "CN=test-passwordless,O=Apache Hudi,L=Test,ST=Test,C=US",
+        "-alias", "test-passwordless"
+    );
+    
+    Process process = pb.start();
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      // Fallback: create a simple keystore programmatically
+      return createSimplePasswordlessKeystore();
+    }
+    
+    return keystorePath;
+  }
+
+  private Path createPasswordlessTruststore() throws Exception {
+    Path truststorePath = tempDir.resolve("passwordless-truststore.p12");
+    
+    // Generate a truststore without password using keytool command
+    ProcessBuilder pb = new ProcessBuilder(
+        "keytool", "-genkeypair", "-keyalg", "RSA", "-keysize", "2048", 
+        "-storetype", "PKCS12", "-keystore", truststorePath.toString(),
+        "-validity", "365", "-storepass", "", "-keypass", "",
+        "-dname", "CN=test-truststore-passwordless,O=Apache 
Hudi,L=Test,ST=Test,C=US",
+        "-alias", "test-truststore-passwordless"
+    );
+    
+    Process process = pb.start();
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      // Fallback: create a simple truststore programmatically
+      return createSimplePasswordlessTruststore();
+    }
+    
+    return truststorePath;
+  }
+
+  private Path createSimplePasswordlessKeystore() throws Exception {
+    Path keystorePath = tempDir.resolve("simple-passwordless-keystore.p12");
+    
+    // Create a simple keystore without password programmatically
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+    keyStore.load(null, null); // Initialize empty keystore
+    
+    try (FileOutputStream fos = new FileOutputStream(keystorePath.toFile())) {
+      keyStore.store(fos, new char[0]); // Store with empty password
+    }
+    
+    return keystorePath;
+  }
+
+  private Path createSimplePasswordlessTruststore() throws Exception {
+    Path truststorePath = 
tempDir.resolve("simple-passwordless-truststore.p12");
+    
+    // Create a simple truststore without password programmatically
+    KeyStore trustStore = KeyStore.getInstance("PKCS12");
+    trustStore.load(null, null); // Initialize empty truststore
+    
+    try (FileOutputStream fos = new FileOutputStream(truststorePath.toFile())) 
{
+      trustStore.store(fos, new char[0]); // Store with empty password
+    }
+    
+    return truststorePath;
+  }
+
+  private Path createDummyCertificateFile() throws Exception {
+    Path certPath = tempDir.resolve("ca-cert.pem");
+
+    try (InputStream certStream = 
getClass().getClassLoader().getResourceAsStream("test-ca-cert.pem")) {
+      Objects.requireNonNull(certStream, "test-ca-cert.pem not found in 
resources");
+      Files.copy(certStream, certPath);
+    }
+
+    return certPath;
+  }
+
+  private Path copyMultipleCertificateFromResources() throws Exception {
+    Path multiCertPath = tempDir.resolve("multi-ca-cert.pem");
+
+    try (InputStream certStream = 
getClass().getClassLoader().getResourceAsStream("multi-ca-cert.pem")) {
+      Objects.requireNonNull(certStream, "multi-ca-cert.pem not found in 
resources");
+      Files.copy(certStream, multiCertPath);
+    }
+
+    return multiCertPath;
+  }
+
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem 
b/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem
new file mode 100644
index 000000000000..08d0849467d1
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/resources/multi-ca-cert.pem
@@ -0,0 +1,60 @@
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUJMP4cxtT65p8L9Wt1HXEo8sgA7UwDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAxMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTE2WhcNMjYwOTA4MTc0OTE2WjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDExFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANZL4s56/5fs4wzq
++iZ6iUUVxb3ZcviDCNqpcfRYjNGjA332EffIvsPbemwUqm7At85tZFFd3zFaafzX
+PwEI1yC92N58isTw903eFRHD4FzXHzo6v+Rb5adEWTTabH1AoARs/UaQh6OzM11w
+T04hkJbd8QIgLxnS+YiCw7mYK3Lz9PcicbeajUAf/CbS2nw+i5GtieDp5DFM2mNI
+XWuvCr+8VKXp4ZcycBkWCjEuF+mpEVwkMq9mHqKVUq62APC7Zuu3MWhJJ3fBqUuQ
+cIElv2X8BU5CvzEjvRImLTc4iPEvci4g6wYRbSFf8bnJ0LJGK4fBf70FTfmMaws
+MysTszxUCAwEAAaNTMFEwHQYDVR0OBBYEFDR1Nl4BFM52C2aRmwhhVuzv+UW9MB8G
+A1UdIwQYMBaAFDR1Nl4BFM52C2aRmwhhVuzv+UW9MA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAERhaw8v5YSyMJeCuXi7G15sxa54zQ12pqA38nru
+EHz+cjVwcB/bE4mcHIjH6JyEOpsQVsbocxojpEPPLEiiozCimdwHOHglyncACDlu
+TZsPkE9CMeeaUgMbCC67imVxiR4nGvxSVHBlPfw4gXXdExaMFwlHXNWUDCQXIjpi
++U832P7IxnW4mUGSPTrIspGtOV01R83niErec52yqGYz20JrUw0VI3tN3r+mB7pt
+ApyjxhsGld6hD8P9T1obDLiJVJMhzn9sYHY4Z54rQ22hW1ZU3nLJunMo9wfajsfu
+ZJqrDOGxh7UyAP6RmJExcyyCk5/3B21HqLdpQ8BNodDTtx0=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUIC9NlvOG6qYhOgk8Jpu+45cmRL4wDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAyMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTIwWhcNMjYwOTA4MTc0OTIwWjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDIxFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKPPinO3IjngBkXy
+vnn8ERktJ9cV5pRKU3edtvTt1TpFVflh6hve+Q7wom5hGF7omKRiyGnDn0t5jhAN
+aIqxuoWYqLmIdw9Oydr2YHaebHna9TX4YgZQXkF4rzW1CluDfp6SorayAGpeYSzz
+CDak3f/mS+NnUwRP5BVbyidwZFISNdqXh4loz4sRNnbZldnVE1edIonMlsjVfFwp
+zULHwizAJkkvj/uAMJvZzbm+0xQsi6vjPCDpjTVbIBAhE1LESy61Tqr24n6EoHQD
+W8pXJeHYMXs8NxzFeLZPzwB3e/9VjpE+Unz4QmpJhc6AHWv4hvMC8h1qqn06z7MO
+Ng43tPsCAwEAAaNTMFEwHQYDVR0OBBYEFDgOAcZDctliZQDkDZ6VtThD8hP7MB8G
+A1UdIwQYMBaAFDgOAcZDctliZQDkDZ6VtThD8hP7MA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAEyPsgQ9mgaqhc2NRu34VZ7jxN+R7xG+lqEW1eE1
+/iFDKIFI00uiKdXcfESTwxFfq3l4+0C+cr7RgVc6ENwlxZ3WveZQQSfO1gQ7xs7y
+KuibIv59bD91dmsWAG2v7wA04NTRqcsU8V31iJ/SAHoo/3ZJFF+MafF0kgPYKgI+
+rHC68PWs8LcvRHFmtjXwz0/ttZCo4I9Uh6c5kq7TN+od7vbR4z/FEMTqBfTLjJ9y
+wC4M2fJKzZFSIXzK/UTLVNP1JBOTGTuFqgrJzyuf50844BHWCv4YzKZRD+jE1M2F
+rbjlt36sZZXoQh7DiItqiFMgNri9xRQd2kNd/BCupbAzcBY=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDTzCCAjegAwIBAgIUGc1b28jv8DlS+LR0xmaso4t8kXwwDQYJKoZIhvcNAQEL
+BQAwNzESMBAGA1UEAwwJVGVzdCBDQSAzMRQwEgYDVQQKDAtBcGFjaGUgSHVkaTEL
+MAkGA1UEBhMCVVMwHhcNMjUwOTA4MTc0OTI1WhcNMjYwOTA4MTc0OTI1WjA3MRIw
+EAYDVQQDDAlUZXN0IENBIDMxFDASBgNVBAoMC0FwYWNoZSBIdWRpMQswCQYDVQQG
+EwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMMvjM9TQ+JgDqoN
+2YtTo9IsjCUbec0CjadxTbKsNaDZlF/6Jqtq5fk3YeOZZxTcbM1spNRDlbAx7ldi
+K6x5a2Wu+T+Ml1L2ODu1K1yxVKS5+U3GBiBkqwsqqeMrZOKXcwOWr62CEe2lNV4D
+XeD1JoN90e6NX9JDu2yKRXKj8wS9UmCdspbJZHTbEkuJV8I/WA8cp5niFEOTojQ/
+BBHOLwEym/2AXxuRam6JxDLlZAALqdP9AW8mHuGi5PKJ+CKxdO9nddpdPvCNG1QH
+X1rRJ365LfZRA4B8v/D9YYQvcdbbBSXgW6eBfNhKXmZqFyoGu6qbcRZHneyEXIRl
+IfhYDe8CAwEAAaNTMFEwHQYDVR0OBBYEFIK2pasMLnLzFLVJQ2elI4Ew+QYFMB8G
+A1UdIwQYMBaAFIK2pasMLnLzFLVJQ2elI4Ew+QYFMA8GA1UdEwEB/wQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAAtw6JBdFlC+m16S3RlWs1OaHtKY6Ju9Mc48aWBX
+kZEEbnp/H9oJ791Yxut4t6II5Eeo5Qx53ujjKTLIP2pENaBoWUdDpvDRmwbSzU42
+MZjqerK7ABsRYa9Q2HNj20UuazRLd7/f0Lc3wPv7iMo71+xjt36KIKZI9Cj5R2GM
+OEF0t1A2chmf+NKEnnVyx8uM3eaGnBZpCOcoc/kNJRP56gXOa/Ebx48zvyXLV0Ef
+fU5bFtngc7GjMW9u5AWvE5vKNxddUqGWV4XWrlLsQsf30yqVvFn0lRIxMNDMCukn
+2ZGM83W3RozxqZNpbZzw7UIwhA9EvOxhicqYFJ+nOIsaVEY=
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem 
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem
new file mode 100644
index 000000000000..4a8a3cc68e3d
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/resources/test-ca-cert.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDazCCAlOgAwIBAgIUFtGMq5vPqvNXGDCH2rJ5n0OhLuQwDQYJKoZIhvcNAQEL
+BQAwRTELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNDAxMDEwMDAwMDBaFw0yNTAx
+MDEwMDAwMDBaMEUxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
+AQUAA4IBDwAwggEKAoIBAQC7bthgxE4i9lRaKov+kFVWy8Bj2w7V1EhGNxUe7j5O
+HOLx3hE2B1BcSa2no3W7XgJ9cYQ7jVaDj3bPP7vVZKQ4YCqFH9z2l1kZQKjZQ7gU
+FpqRRZHJpYxGKwP7YsBsVz3FJKmUyGP5cFq8mLBIcLa2Y9riYZQz7Wm7VgKzR3eZ
+YxDF9N5PbQnQt4bCFed0TlcX5gYFUqiRW5AUe5mECjH7hcWGaAXQBQKNbLiKVYEq
+jO4Y3F8xZ7CiMUVLkQMjKwYGNP3iG3msFS9bTiLcHwIx2RbFjxGg+gfvZ8EiYpFE
+ZQPvLsbmjqHc5ufhQJ3lfzYF2FjLcVx1VrsVhTQmB9EfAgMBAAGjUzBRMB0GA1Ud
+DgQWBBTWV4Dr2cKLsHhP0Hxc+CmZx2DZBzAfBgNVHSMEGDAWgBTWV4Dr2cKLsHhP
+0Hxc+CmZx2DZBzAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCR
+jf+kzYIFU+3M0hBpVz7cPYlhSI7gWdSIvY5kYhX4vvRBFGJ8OFo9flUd4vvO+GyV
+g+UNmGpgslqYdXuFmC2IftPRgaLpohZWvmR1CYUS3AbcuWUqFKchTHPaJLZTAcNu
+xYrqULYzsQxLb1FcXHoqntQe1YvLlkRRPkS9E3Lf0KO6dOlAwJI6qCvhzvtguinh
+FcTnSsJl6YQF8gMRdGxGPH5OKnpVr6xVkQ0gzc7P5AdVRDkVAjI8S5tgdMjHJB0L
+yN3nwCWMJgJqN5ZkJVz5cYtXdlfJBaDjXRbV+jpOOJRDr6Fy8PQ1EEvhQEoVHQz0
+FKevncF0IEv9ZFHQPTLM
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12 
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12
new file mode 100644
index 000000000000..382210075367
Binary files /dev/null and 
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-keystore.p12 differ
diff --git a/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12 
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12
new file mode 100644
index 000000000000..160e2ed588a7
Binary files /dev/null and 
b/hudi-sync/hudi-datahub-sync/src/test/resources/test-truststore.p12 differ

Reply via email to