This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4499b0b2bd43 feat(spark): ZooKeeper node should hold spark app id (for
helping debug when lock is held for long time) (#18123)
4499b0b2bd43 is described below
commit 4499b0b2bd43bd90f4dd22e9631d401dae4c19da
Author: Krishen <[email protected]>
AuthorDate: Thu Mar 5 14:19:02 2026 -0800
feat(spark): ZooKeeper node should hold spark app id (for helping debug
when lock is held for long time) (#18123)
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieClient.java | 3 +
.../lock/BaseZookeeperBasedLockProvider.java | 4 +-
.../transaction/lock/HoodieInterProcessMutex.java | 65 ++++++++++++++++
.../hudi/client/transaction/lock/LockManager.java | 5 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 15 ++++
.../lock/TestHoodieInterProcessMutex.java | 91 ++++++++++++++++++++++
.../client/common/HoodieSparkEngineContext.java | 5 ++
.../callback/TestHoodieClientInitCallback.java | 1 +
.../hudi/common/config/LockConfiguration.java | 2 +
.../hudi/common/engine/HoodieEngineContext.java | 8 ++
10 files changed, 196 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 71ac0637577e..6fe7771d7252 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -37,6 +37,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieWriteConfig;
+
+import static org.apache.hudi.config.HoodieWriteConfig.APPLICATION_ID;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -111,6 +113,7 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
this.context = context;
this.basePath = clientConfig.getBasePath();
this.config = clientConfig;
+ this.config.setValue(APPLICATION_ID, context.getApplicationId());
this.timelineServer = timelineServer;
shouldStopTimelineServer = !timelineServer.isPresent();
this.heartbeatClient = new HoodieHeartbeatClient(storage, this.basePath,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index af82fe2d7ae0..d5b04c15c005 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -170,8 +170,8 @@ public abstract class BaseZookeeperBasedLockProvider
implements LockProvider<Int
private void acquireLock(long time, TimeUnit unit) throws Exception {
ValidationUtils.checkArgument(this.lock == null,
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
- InterProcessMutex newLock = new InterProcessMutex(
- this.curatorFrameworkClient, getLockPath());
+ InterProcessMutex newLock = new HoodieInterProcessMutex(
+ this.curatorFrameworkClient, getLockPath(), this.lockConfiguration);
boolean acquired = newLock.acquire(time, unit);
if (!acquired) {
throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
new file mode 100644
index 000000000000..80f482eb2769
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.hudi.common.config.LockConfiguration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Hudi-specific subclass of {@link InterProcessMutex} that attaches information taken from
+ * {@link LockConfiguration} to the lock node.
+ *
+ * <p>Currently the only piece of information is the application id, read from
+ * {@link LockConfiguration#LOCK_HOLDER_APP_ID_KEY} and defaulting to {@code "Unknown"}. It is
+ * encoded as {@code application_id=<id>}, so that while the (distributed) lock is held other
+ * users can see which application holds it by inspecting the lock node (e.g. with zkcli).
+ */
+public class HoodieInterProcessMutex extends InterProcessMutex {
+
+  // Data that will be added to the lock node upon the lock being acquired:
+  // UTF-8 encoded, comma-separated key=value pairs describing the lock holder.
+  private final byte[] lockNodeBytes;
+
+  /**
+   * Creates a mutex for {@code path} whose lock node data carries the application id
+   * found in {@code config}.
+   *
+   * @param client Curator client passed through to {@link InterProcessMutex}
+   * @param path   lock path passed through to {@link InterProcessMutex}
+   * @param config lock configuration; the application id is looked up under
+   *               {@link LockConfiguration#LOCK_HOLDER_APP_ID_KEY} and falls back to "Unknown"
+   */
+  public HoodieInterProcessMutex(final CuratorFramework client, final String path,
+                                 final LockConfiguration config) {
+    super(client, path);
+    String applicationId = config
+        .getConfig()
+        .getString(LockConfiguration.LOCK_HOLDER_APP_ID_KEY, "Unknown");
+    // Only one entry today, so HashMap iteration order cannot affect the payload. Use an
+    // insertion-ordered map before adding more keys to keep the encoded order stable.
+    Map<String, String> lockNodeData = new HashMap<>();
+    lockNodeData.put("application_id", applicationId);
+    String encoded = lockNodeData
+        .entrySet()
+        .stream()
+        .map(entry -> entry.getKey() + "=" + entry.getValue())
+        .collect(Collectors.joining(","));
+    this.lockNodeBytes = encoded.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns the lock node data built at construction time.
+   *
+   * @return a fresh copy of the payload, so callers cannot mutate this mutex's internal state
+   */
+  @Override
+  protected byte[] getLockNodeBytes() {
+    return this.lockNodeBytes.clone();
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index a660cb3c4e09..21eb5da61575 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -61,7 +61,10 @@ public class LockManager implements Serializable,
AutoCloseable {
public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage,
TypedProperties lockProps) {
this.writeConfig = writeConfig;
this.storageConf = storage.getConf().newInstance();
- this.lockConfiguration = new LockConfiguration(lockProps);
+ TypedProperties lockPropsWithAppId = new TypedProperties();
+ lockPropsWithAppId.putAll(lockProps);
+ lockPropsWithAppId.put(LockConfiguration.LOCK_HOLDER_APP_ID_KEY,
writeConfig.getApplicationId());
+ this.lockConfiguration = new LockConfiguration(lockPropsWithAppId);
maxRetries =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs =
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f961fca2c314..734339d5fdef 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -939,6 +939,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Flag to indicate whether to ignore any non exception
error (e.g. write status error)."
+ "By default true for backward compatibility.");
+ // Application id recorded in lock metadata. BaseHoodieClient overwrites this value with
+ // HoodieEngineContext#getApplicationId() when a client is constructed, and LockManager copies
+ // it into the lock properties under LockConfiguration.LOCK_HOLDER_APP_ID_KEY.
+ public static final ConfigProperty<String> APPLICATION_ID = ConfigProperty
+ .key("hoodie.write.application.id")
+ .defaultValue("Unknown")
+ .markAdvanced()
+ .withDocumentation("Application identifier (e.g. Spark application id)
used to populate lock metadata so lock holders can be identified.");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2730,6 +2736,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieLockConfig.HIVE_TABLE_NAME);
}
+ /**
+ * Returns the application id stored under {@link #APPLICATION_ID}, resolved through
+ * {@code getStringOrDefault} (the property's declared default is "Unknown").
+ */
+ public String getApplicationId() {
+ return getStringOrDefault(APPLICATION_ID);
+ }
+
public ConflictResolutionStrategy getWriteConflictResolutionStrategy() {
return
ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME));
}
@@ -3470,6 +3480,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ /**
+ * Sets {@link HoodieWriteConfig#APPLICATION_ID} on the write config being built.
+ *
+ * @param appId application identifier to store
+ * @return this builder, for chaining
+ */
+ public Builder withApplicationId(String appId) {
+ writeConfig.setValue(APPLICATION_ID, appId);
+ return this;
+ }
+
public Builder withProperties(Properties properties) {
this.writeConfig.getProps().putAll(properties);
return this;
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
new file mode 100644
index 000000000000..d9e9fcf70f7a
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_HOLDER_APP_ID_KEY;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link HoodieInterProcessMutex#getLockNodeBytes()}.
+ */
+public class TestHoodieInterProcessMutex {
+
+  private static final String LOCK_PATH = "/hudi/test/lock";
+
+  /** An application id present in the lock configuration must appear in the lock node data. */
+  @Test
+  public void testGetLockNodeBytesContainsApplicationIdFromConfig() throws Exception {
+    final String appId = "my-spark-app-12345";
+
+    final String lockNodeData = decode(lockNodeBytesFor(configWithAppId(appId)));
+
+    assertTrue(lockNodeData.contains("application_id=" + appId),
+        "Lock node data should contain application_id from config: " + lockNodeData);
+  }
+
+  /** Without an application id in the lock configuration, the data falls back to "Unknown". */
+  @Test
+  public void testGetLockNodeBytesDefaultsToUnknownWhenAppIdNotSet() throws Exception {
+    final LockConfiguration emptyConfig = new LockConfiguration(new Properties());
+
+    final String lockNodeData = decode(lockNodeBytesFor(emptyConfig));
+
+    assertTrue(lockNodeData.contains("application_id=Unknown"),
+        "Lock node data should default application_id to Unknown: " + lockNodeData);
+  }
+
+  /** The lock node data is exactly the UTF-8 bytes of {@code application_id=<id>}. */
+  @Test
+  public void testGetLockNodeBytesFormat() throws Exception {
+    final String appId = "test-app";
+    final byte[] expectedBytes = ("application_id=" + appId).getBytes(StandardCharsets.UTF_8);
+
+    final byte[] actualBytes = lockNodeBytesFor(configWithAppId(appId));
+
+    assertArrayEquals(expectedBytes, actualBytes,
+        "Lock node bytes should be in key=value format");
+  }
+
+  // Builds a lock configuration whose only property is the lock holder application id.
+  private static LockConfiguration configWithAppId(String appId) {
+    Properties props = new Properties();
+    props.setProperty(LOCK_HOLDER_APP_ID_KEY, appId);
+    return new LockConfiguration(props);
+  }
+
+  // Decodes lock node data the same way the assertions above expect it to be encoded.
+  private static String decode(byte[] lockNodeBytes) {
+    return new String(lockNodeBytes, StandardCharsets.UTF_8);
+  }
+
+  // Creates a mutex over a mocked Curator client and reads its lock node data reflectively.
+  private static byte[] lockNodeBytesFor(LockConfiguration config) throws Exception {
+    HoodieInterProcessMutex mutex =
+        new HoodieInterProcessMutex(mock(CuratorFramework.class), LOCK_PATH, config);
+    Method accessor = HoodieInterProcessMutex.class.getDeclaredMethod("getLockNodeBytes");
+    accessor.setAccessible(true);
+    return (byte[]) accessor.invoke(mutex);
+  }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index ea435d4a39a0..7e0d9db2feaf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -249,6 +249,11 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
javaSparkContext.cancelAllJobs();
}
+ /**
+ * Returns the application id reported by the SparkContext behind {@code javaSparkContext}.
+ */
+ @Override
+ public String getApplicationId() {
+ return javaSparkContext.sc().applicationId();
+ }
+
@Override
public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
Function2<O, I, O> seqOpFunc = seqOp::apply;
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
index 41edbd04b972..a5458674763a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -85,6 +85,7 @@ public class TestHoodieClientInitCallback {
StorageConfiguration storageConfToReturn = getDefaultStorageConf();
when(engineContext.getStorageConf()).thenReturn(storageConfToReturn);
+ when(engineContext.getApplicationId()).thenReturn("test-app-id");
}
@Test
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 9170b941a8c6..809e33954425 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -81,6 +81,8 @@ public class LockConfiguration implements Serializable {
public static final String ZK_LOCK_KEY_PROP_KEY =
ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "lock_key";
+ // Lock property holding the application id of the lock holder. LockManager fills it from
+ // HoodieWriteConfig#getApplicationId(), and HoodieInterProcessMutex reads it (defaulting to
+ // "Unknown") to build the lock node data.
+ public static final String LOCK_HOLDER_APP_ID_KEY = LOCK_PREFIX + "app_id";
+
/** @deprecated Use {@link #LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY}
*/
@Deprecated
public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP =
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 327dda0a736f..2fce591b60e9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -123,6 +123,14 @@ public abstract class HoodieEngineContext {
public abstract void cancelAllJobs();
+ /**
+ * Returns the application id of the engine (e.g. Spark application id).
+ * Used to populate lock metadata so lock holders can be identified.
+ *
+ * <p>This base implementation returns the placeholder "Unknown"; engine-specific contexts
+ * (e.g. HoodieSparkEngineContext) override it to return the real id.
+ *
+ * @return the engine's application id, or "Unknown" when the engine does not supply one
+ */
+ public String getApplicationId() {
+ return "Unknown";
+ }
+
/**
* Aggregate the elements of each partition, and then the results for all
the partitions, using given combine functions and a neutral "zero value".
*