This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0c159bf4a [HUDI-7198] Create nested node path if does not exist for 
zookeeper. (#10438)
1e0c159bf4a is described below

commit 1e0c159bf4abc14d871a7e3fcf53a088992d6915
Author: harshal <[email protected]>
AuthorDate: Thu Jan 4 12:59:16 2024 +0530

    [HUDI-7198] Create nested node path if does not exist for zookeeper. 
(#10438)
---
 .../lock/ZookeeperBasedLockProvider.java           | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
index 31b92dcf914..4299a603ece 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
@@ -31,6 +31,7 @@ import 
org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,8 +75,48 @@ public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMute
         
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
         .build();
     this.curatorFrameworkClient.start();
+    createPathIfNotExists();
   }
 
+  private String getLockPath() {
+    return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
+        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
+  }
+
+  private void createPathIfNotExists() {
+    try {
+      String lockPath = getLockPath();
+      LOG.info(String.format("Creating zookeeper path %s if not exists", 
lockPath));
+      String[] parts = lockPath.split("/");
+      StringBuilder currentPath = new StringBuilder();
+      for (String part : parts) {
+        if (!part.isEmpty()) {
+          currentPath.append("/").append(part);
+          createNodeIfNotExists(currentPath.toString());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
+      throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
+    }
+  }
+
+  private void createNodeIfNotExists(String path) throws Exception {
+    if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
+      try {
+        this.curatorFrameworkClient.create().forPath(path);
+        // to avoid failure due to synchronous calls.
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NODEEXISTS) {
+          LOG.debug(String.format("Node already exist for path = %s", path));
+        } else {
+          throw new HoodieLockException("Failed to create zookeeper node", e);
+        }
+      }
+    }
+  }
+
+
   // Only used for testing
   public ZookeeperBasedLockProvider(
       final LockConfiguration lockConfiguration, final CuratorFramework 
curatorFrameworkClient) {
@@ -85,6 +126,7 @@ public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMute
     synchronized (this.curatorFrameworkClient) {
       if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
         this.curatorFrameworkClient.start();
+        createPathIfNotExists();
       }
     }
   }

Reply via email to