This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 075a9a4a02b [HUDI-9155] Add scaffolding for lock provider using 
conditional writes (#12954)
075a9a4a02b is described below

commit 075a9a4a02b29be29b2e99ccfeade80dc6e0079d
Author: Alex R <[email protected]>
AuthorDate: Mon Apr 7 17:30:22 2025 -0700

    [HUDI-9155] Add scaffolding for lock provider using conditional writes 
(#12954)
---
 .../hudi/client/transaction/lock/StorageLock.java  |  63 ++++++++
 .../transaction/lock/models/LockGetResult.java     |  38 +++++
 .../transaction/lock/models/LockUpdateResult.java  |  38 +++++
 .../transaction/lock/models/StorageLockData.java   |  71 +++++++++
 .../transaction/lock/models/StorageLockFile.java   | 133 +++++++++++++++++
 .../lock/models/StorageLockFileTest.java           | 165 +++++++++++++++++++++
 6 files changed, 508 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java
new file mode 100644
index 00000000000..6ed51803906
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.function.Supplier;
+
+/**
+ * Defines a contract for a service which must be able to perform 
conditional writes to object storage.
+ * Implementations are expected to interact with a single lock file per context (table), 
and will be competing with other instances
+ * to perform writes, so they should handle these cases accordingly (using 
conditional writes).
+ */
+public interface StorageLock extends AutoCloseable {
+  /**
+   * Tries once to create or update a lock file.
+   * @param newLockData The new data to update the lock file with.
+   * @param previousLockFile The previous lock file; use this to conditionally 
update the lock file.
+   * @return A pair containing the result state and the new lock file (if 
successful).
+   */
+  Pair<LockUpdateResult, StorageLockFile> tryCreateOrUpdateLockFile(
+      StorageLockData newLockData,
+      StorageLockFile previousLockFile);
+
+  /**
+   * Tries to create or update a lock file, retrying up to {@code retryCount} times.
+   * All errors other than precondition failures should be retried.
+   * @param newLockDataSupplier The supplier of the new lock data.
+   * @param previousLockFile The previous lock file.
+   * @param retryCount Number of retries to attempt.
+   * @return A pair containing the result state and the new lock file (if 
successful).
+   */
+  Pair<LockUpdateResult, StorageLockFile> tryCreateOrUpdateLockFileWithRetry(
+      Supplier<StorageLockData> newLockDataSupplier,
+      StorageLockFile previousLockFile,
+      long retryCount);
+
+  /**
+   * Reads the current lock file.
+   * @return The lock retrieval result and the current lock file if 
successfully retrieved.
+   */
+  Pair<LockGetResult, StorageLockFile> readCurrentLockFile();
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
new file mode 100644
index 00000000000..68809d244c3
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+public enum LockGetResult {
+  // The lock file does not exist; code 0
+  NOT_EXISTS(0),
+  // The lock file was retrieved successfully; code 1
+  SUCCESS(1),
+  // The lock state could not be determined due to transient errors; code 2
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockGetResult(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java
new file mode 100644
index 00000000000..b9ee0ad4260
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+public enum LockUpdateResult {
+  // The lock was successfully created or updated; code 0
+  SUCCESS(0),
+  // Another process has modified the lock file (precondition failure); 
code 1
+  ACQUIRED_BY_OTHERS(1),
+  // The lock state could not be determined due to transient errors; code 2
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockUpdateResult(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
new file mode 100644
index 00000000000..01c7d141ebd
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Immutable POJO describing a lock for the conditional-writes-based lock provider.
+ */
+public class StorageLockData {
+  private final boolean expired;
+  private final long validUntil;
+  private final String owner;
+
+  /**
+   * Initializes an object describing a conditionally written lock.
+   * @param expired Whether the lock is expired.
+   * @param validUntil The epoch time in ms at which the lock expires.
+   * @param owner The uuid of the owner of this lock.
+   */
+  @JsonCreator
+  public StorageLockData(
+      @JsonProperty(value = "expired", required = true) boolean expired,
+      @JsonProperty(value = "validUntil", required = true) long validUntil,
+      @JsonProperty(value = "owner", required = true) String owner) {
+    this.expired = expired;
+    this.validUntil = validUntil;
+    this.owner = owner;
+  }
+
+  /**
+   * Gets the expiration time.
+   * @return The expiration as epoch time in ms.
+   */
+  public long getValidUntil() {
+    return this.validUntil;
+  }
+
+  /**
+   * Whether the lock is expired.
+   * @return True if the lock is marked as expired, false otherwise.
+   */
+  public boolean isExpired() {
+    return this.expired;
+  }
+
+  /**
+   * Gets the owner.
+   * @return A string representing the uuid of the owner of this lock.
+   */
+  public String getOwner() {
+    return this.owner;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
new file mode 100644
index 00000000000..2a045f31aef
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class StorageLockFile {
+
+  private final StorageLockData data;
+  private final String versionId;
+
+  // Get a custom object mapper. See StorageLockData for required 
properties.
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+      // This allows us to add new properties down the line.
+      .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+      // Should not let validUntil or expired be null.
+      .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
+
+  /**
+   * Initializes a StorageLockFile using the data and unique 
versionId.
+   *
+   * @param data      The data in the lock file. Must not be null.
+   * @param versionId The version of this lock file. Used to ensure 
consistency through conditional writes. Must not be null or empty.
+   */
+  public StorageLockFile(StorageLockData data, String versionId) {
+    ValidationUtils.checkArgument(data != null, "Data must not be null.");
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), 
"VersionId must not be null or empty.");
+    this.data = data;
+    this.versionId = versionId;
+  }
+
+  /**
+   * Factory method to load an input stream into a StorageLockFile.
+   *
+   * @param inputStream The input stream of the JSON content.
+   * @param versionId   The unique version identifier for the lock file.
+   * @return A new instance of StorageLockFile.
+   * @throws HoodieIOException If deserialization fails.
+   */
+  public static StorageLockFile createFromStream(InputStream inputStream, 
String versionId) {
+    try {
+      StorageLockData data = OBJECT_MAPPER.readValue(inputStream, 
StorageLockData.class);
+      return new StorageLockFile(data, versionId);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deserialize JSON content into 
ConditionalWriteLockData", e);
+    }
+  }
+
+  /**
+   * Writes the serialized JSON representation of this file's lock data to an output 
stream.
+   *
+   * @param outputStream The output stream to write the JSON to.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public void writeToStream(OutputStream outputStream) {
+    try {
+      OBJECT_MAPPER.writeValue(outputStream, this.data);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error writing object to JSON output 
stream", e);
+    }
+  }
+
+  /**
+   * Serializes the given lock data to a byte array. Since we know the payloads will be 
small this is fine.
+   * @return The serialized lock data as a byte array.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public static byte[] toByteArray(StorageLockData data) {
+    try {
+      return OBJECT_MAPPER.writeValueAsBytes(data);
+    } catch (JsonProcessingException e) {
+      throw new HoodieIOException("Error writing object to byte array", e);
+    }
+  }
+
+  /**
+   * Gets the version id.
+   * @return A string for the version id.
+   */
+  public String getVersionId() {
+    return this.versionId;
+  }
+
+  /**
+   * Gets the expiration time in ms.
+   * @return A long representing the expiration.
+   */
+  public long getValidUntil() {
+    return this.data.getValidUntil();
+  }
+
+  /**
+   * Gets whether the lock is expired.
+   * @return A boolean representing expired.
+   */
+  public boolean isExpired() {
+    return this.data.isExpired();
+  }
+
+  /**
+   * Gets the owner of the lock.
+   * @return A string for the owner of the lock.
+   */
+  public String getOwner() {
+    return this.data.getOwner();
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java
new file mode 100644
index 00000000000..d1dca9dbaa3
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class StorageLockFileTest { // Unit tests for StorageLockFile construction and JSON (de)serialization.
+
+  private static final String JSON_DATA = 
"{\"expired\":false,\"validUntil\":1700000000000,\"owner\":\"testOwner\"}";
+  private static final String JSON_DATA_EXTRA_FIELD = 
"{\"expired\":true,\"validUntil\":1600000000000,\"owner\":\"otherOwner\",\"state\":\"active\"}";
+  private static final String INVALID_JSON = "{\"invalidField\":123}";
+  private static final String VERSION_ID = "testVersionId";
+
+  private InputStream validJsonStream;
+  private InputStream extraFieldValidJsonStream;
+  private InputStream invalidJsonStream;
+
+  @BeforeEach
+  void setup() { // Recreates the input streams before each test.
+    validJsonStream = new ByteArrayInputStream(JSON_DATA.getBytes());
+    extraFieldValidJsonStream = new 
ByteArrayInputStream(JSON_DATA_EXTRA_FIELD.getBytes());
+    invalidJsonStream = new ByteArrayInputStream(INVALID_JSON.getBytes());
+  }
+
+  @Test
+  void testCreateValidInputStream() { // Valid JSON yields the expected validUntil, owner, versionId and expired values.
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, 
VERSION_ID);
+    assertEquals(1700000000000L, file.getValidUntil());
+    assertEquals("testOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertFalse(file.isExpired());
+  }
+
+  @Test
+  void testCreateValidInputStreamExtraField() { // The extra "state" property does not prevent deserialization.
+    StorageLockFile file = 
StorageLockFile.createFromStream(extraFieldValidJsonStream, VERSION_ID);
+    assertEquals(1600000000000L, file.getValidUntil());
+    assertEquals("otherOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromMock() throws IOException { // A mocked, failing stream surfaces as HoodieIOException.
+    InputStream mockInputStream = mock(InputStream.class);
+
+    doThrow(new IOException("Simulated IOException"))
+        .when(mockInputStream)
+        .read(); // NOTE(review): only the no-arg read() is stubbed; confirm the JSON parser calls this overload rather than read(byte[], int, int).
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> 
StorageLockFile.createFromStream(mockInputStream, "versionId"));
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromBadData() { // JSON lacking the required properties is rejected.
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
+        StorageLockFile.createFromStream(invalidJsonStream, VERSION_ID)
+    );
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateNullData() { // Null data is rejected by the constructor.
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(null, VERSION_ID)
+    );
+    assertTrue(exception.getMessage().contains("Data must not be null"));
+  }
+
+  @Test
+  void testCreateNullVersionId() { // Both a null and an empty versionId are rejected by the constructor.
+    StorageLockData data = new StorageLockData(true, 1700000000000L, 
"testOwner");
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(data, null)
+    );
+    assertTrue(exception.getMessage().contains("VersionId must not be null or 
empty."));
+    exception = assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(data, "")
+    );
+    assertTrue(exception.getMessage().contains("VersionId must not be null or 
empty."));
+  }
+
+  @Test
+  void testToJsonStreamValidData() { // writeToStream emits the expired, validUntil and owner properties.
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, 
VERSION_ID);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    file.writeToStream(outputStream);
+    String outputJson = new String(outputStream.toByteArray());
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testToJsonStreamErrorHandling() throws IOException { // A failing output stream surfaces as HoodieIOException.
+    OutputStream mockOutputStream = mock(OutputStream.class);
+
+    doThrow(new IOException("Simulated IOException"))
+        .when(mockOutputStream)
+        .write(any(byte[].class), anyInt(), anyInt());
+    StorageLockFile file = new StorageLockFile(
+        new StorageLockData(true, System.currentTimeMillis() + 1000, 
"testOwner"),
+        VERSION_ID);
+
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> 
file.writeToStream(mockOutputStream));
+    assertTrue(exception.getMessage().contains("Error writing object to 
JSON"));
+  }
+
+  @Test
+  void testToByteArrayValidData() { // toByteArray emits the expired, validUntil and owner properties.
+    StorageLockData data = new StorageLockData(false, 1700000000000L, 
"testOwner");
+    String outputJson = new String(StorageLockFile.toByteArray(data));
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testIsExpired() { // isExpired returns the expired flag passed to StorageLockData.
+    StorageLockData data = new StorageLockData(true, 
System.currentTimeMillis() - 1000, "testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testGetVersionId() { // getVersionId returns the versionId passed to the constructor.
+    StorageLockData data = new StorageLockData(false, 1700000000000L, 
"testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertEquals(VERSION_ID, file.getVersionId());
+  }
+}

Reply via email to