yihua commented on code in PR #13868:
URL: https://github.com/apache/hudi/pull/13868#discussion_r2334996143


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Storage-based audit service implementation for lock provider operations.
+ * Writes audit records to a single JSONL file per transaction to track lock 
lifecycle events.
+ */
+public class StorageLockProviderAuditService implements AuditService {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(StorageLockProviderAuditService.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  
+  // Audit configuration constants
+  public static final String AUDIT_FOLDER_NAME = "audit";
+  public static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
+  public static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = 
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
+  
+  /**
+   * Constructs the full path to the audit configuration file for a given 
table.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The full path to the audit_enabled.json configuration file
+   */
+  public static String getAuditConfigPath(String basePath) {
+    String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_CONFIG_FILE_NAME);
+  }
+  
+  /**
+   * Constructs the full path to the audit folder for a given table.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The full path to the audit folder where audit files are stored
+   */
+  public static String getAuditFolderPath(String basePath) {
+    String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_FOLDER_NAME);
+  }
+  
+  private final String ownerId;
+  private final long transactionStartTime;
+  private final String auditFilePath;
+  private final StorageLockClient storageLockClient;
+  private final Function<Long, Long> lockExpirationFunction;
+  private final Supplier<Boolean> lockHeldSupplier;
+  private final StringBuilder auditBuffer;
+  
+  /**
+   * Constructor for StorageLockProviderAuditService.
+   * 
+   * @param basePath The base path where audit files will be written
+   * @param ownerId The full owner ID
+   * @param transactionStartTime The timestamp when the transaction started 
(lock acquired)
+   * @param storageLockClient The storage client for writing audit files
+   * @param lockExpirationFunction Function that takes a timestamp and returns 
the lock expiration time
+   * @param lockHeldSupplier Supplier that provides whether the lock is 
currently held
+   */
+  public StorageLockProviderAuditService(
+      String basePath,
+      String ownerId,
+      long transactionStartTime,
+      StorageLockClient storageLockClient,
+      Function<Long, Long> lockExpirationFunction,
+      Supplier<Boolean> lockHeldSupplier) {
+    this.ownerId = ownerId;
+    this.transactionStartTime = transactionStartTime;
+    this.storageLockClient = storageLockClient;
+    this.lockExpirationFunction = lockExpirationFunction;
+    this.lockHeldSupplier = lockHeldSupplier;
+    this.auditBuffer = new StringBuilder();
+    
+    // Generate audit file path: <txn-start>_<full-owner-id>.jsonl
+    String filename = String.format("%d_%s.jsonl", transactionStartTime, 
ownerId);
+    this.auditFilePath = String.format("%s%s%s",
+        getAuditFolderPath(basePath),
+        StoragePath.SEPARATOR,
+        filename);
+    
+    LOG.debug("Initialized audit service for transaction starting at {} with 
file: {}", 
+        transactionStartTime, auditFilePath);
+  }
+  
+  @Override
+  public synchronized void recordOperation(AuditOperationState state, long 
timestamp) throws Exception {
+    // Create audit record
+    Map<String, Object> auditRecord = new HashMap<>();
+    auditRecord.put("ownerId", ownerId);
+    auditRecord.put("transactionStartTime", transactionStartTime);
+    auditRecord.put("timestamp", timestamp);
+    auditRecord.put("state", state.name());
+    auditRecord.put("lockExpiration", lockExpirationFunction.apply(timestamp));
+    auditRecord.put("lockHeld", lockHeldSupplier.get());
+    
+    // Convert to JSON and append newline for JSONL format
+    String jsonLine = OBJECT_MAPPER.writeValueAsString(auditRecord) + "\n";
+    
+    // Append to buffer
+    auditBuffer.append(jsonLine);
+    
+    // Write the accumulated audit records to file
+    writeAuditFile();
+    
+    LOG.debug("Recorded audit operation: state={}, timestamp={}, file={}", 
+        state, timestamp, auditFilePath);
+  }
+  

Review Comment:
   I'm OK with that



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+  private static final String BASE_PATH = "s3://bucket/table";
+  private static final String OWNER_ID = 
"writer-12345678-9abc-def0-1234-567890abcdef";
+  private static final long TRANSACTION_START_TIME = 1234567890000L;
+  private static final long LOCK_EXPIRATION = 1000000L;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private StorageLockClient storageLockClient;
+  private Supplier<Boolean> lockHeldSupplier;
+  private StorageLockProviderAuditService auditService;
+
+  @BeforeEach
+  void setUp() {
+    storageLockClient = mock(StorageLockClient.class);
+    lockHeldSupplier = (Supplier<Boolean>) mock(Supplier.class);
+
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService = new StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+  }
+
+  @Test
+  void testRecordOperationStartSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(), 
contentCaptor.capture());
+
+    String expectedPath = 
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+        BASE_PATH,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        TRANSACTION_START_TIME,
+        OWNER_ID);
+    assertEquals(expectedPath, pathCaptor.getValue());
+
+    // Parse JSONL content (should be a single line)
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    assertEquals(1, lines.length, "Should have one JSON line");
+
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+    assertEquals(TRANSACTION_START_TIME, ((Number) 
auditRecord.get("transactionStartTime")).longValue());
+    assertEquals(timestamp, ((Number) 
auditRecord.get("timestamp")).longValue());
+    assertEquals("START", auditRecord.get("state"));
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+    assertTrue((Boolean) auditRecord.get("lockHeld"));
+  }
+
+  @Test
+  void testRecordOperationRenewSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    // First record START
+    auditService.recordOperation(AuditOperationState.START, timestamp - 1000);
+    // Then record RENEW
+    auditService.recordOperation(AuditOperationState.RENEW, timestamp);
+
+    // Verify the file is written twice (once for START, once after RENEW)
+    verify(storageLockClient, times(2)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        anyString());
+  }
+
+  @Test
+  void testRecordOperationEndSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(false);
+
+    auditService.recordOperation(AuditOperationState.END, timestamp);
+
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        contentCaptor.capture());
+
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals("END", auditRecord.get("state"));
+    assertFalse((Boolean) auditRecord.get("lockHeld"));
+    // In real usage, lock expiration is still calculated normally even when 
ending
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+  }
+
+  @Test
+  void testRecordOperationWriteFailure() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(false);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testRecordOperationWriteException() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), anyString()))
+        .thenThrow(new RuntimeException("Storage error"));
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testFileNameWithFullUuid() throws Exception {
+    String ownerIdWithFullUuid = "12345678-9abc-def0-1234-567890abcdef";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        ownerIdWithFullUuid,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, 
ownerIdWithFullUuid)),
+        anyString());
+  }
+
+  @Test
+  void testFileNameWithShortOwnerId() throws Exception {
+    String shortOwnerId = "abc123";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        shortOwnerId,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, shortOwnerId)),
+        anyString());
+  }
+
+  @Test
+  void testFileNameWithRegularOwnerId() throws Exception {
+    String regularOwnerId = "regular-owner-id-without-uuid";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        regularOwnerId,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, regularOwnerId)),
+        anyString());
+  }
+
+  @Test
+  void testMultipleOperationsInSequence() throws Exception {
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    long startTime = System.currentTimeMillis();
+    long renewTime = startTime + 1000;
+    long endTime = startTime + 2000;
+
+    when(lockHeldSupplier.get()).thenReturn(true, true, false);
+
+    auditService.recordOperation(AuditOperationState.START, startTime);
+    auditService.recordOperation(AuditOperationState.RENEW, renewTime);
+    auditService.recordOperation(AuditOperationState.END, endTime);
+
+    // All operations write to the same JSONL file
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(3)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        contentCaptor.capture());
+
+    // The last write should contain all three records
+    String lastContent = contentCaptor.getValue();
+    String[] lines = lastContent.trim().split("\n");
+    assertEquals(3, lines.length, "Should have three JSON lines");
+
+    Map<String, Object> startRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    Map<String, Object> renewRecord = OBJECT_MAPPER.readValue(lines[1], 
Map.class);
+    Map<String, Object> endRecord = OBJECT_MAPPER.readValue(lines[2], 
Map.class);
+
+    assertEquals("START", startRecord.get("state"));
+    assertEquals("RENEW", renewRecord.get("state"));
+    assertEquals("END", endRecord.get("state"));
+  }
+
+  @Test
+  void testAuditRecordContentStructure() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.RENEW, timestamp);
+
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient).writeObject(anyString(), 
contentCaptor.capture());
+
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+
+    assertNotNull(auditRecord.get("ownerId"));
+    assertNotNull(auditRecord.get("transactionStartTime"));
+    assertNotNull(auditRecord.get("timestamp"));
+    assertNotNull(auditRecord.get("state"));
+    assertNotNull(auditRecord.get("lockExpiration"));
+    assertNotNull(auditRecord.get("lockHeld"));
+
+    assertEquals(6, auditRecord.size(), "Audit record should contain exactly 6 
fields");
+  }

Review Comment:
   Could the audit record validation be merged into the other tests? That is, 
extract this validation logic into a helper method and call it wherever a test 
validates an audit record.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+  private static final String BASE_PATH = "s3://bucket/table";
+  private static final String OWNER_ID = 
"writer-12345678-9abc-def0-1234-567890abcdef";
+  private static final long TRANSACTION_START_TIME = 1234567890000L;
+  private static final long LOCK_EXPIRATION = 1000000L;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private StorageLockClient storageLockClient;
+  private Supplier<Boolean> lockHeldSupplier;
+  private StorageLockProviderAuditService auditService;
+
+  @BeforeEach
+  void setUp() {
+    storageLockClient = mock(StorageLockClient.class);
+    lockHeldSupplier = (Supplier<Boolean>) mock(Supplier.class);
+
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService = new StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+  }
+
+  @Test
+  void testRecordOperationStartSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(), 
contentCaptor.capture());
+
+    String expectedPath = 
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+        BASE_PATH,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        TRANSACTION_START_TIME,
+        OWNER_ID);
+    assertEquals(expectedPath, pathCaptor.getValue());
+
+    // Parse JSONL content (should be a single line)
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    assertEquals(1, lines.length, "Should have one JSON line");
+
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+    assertEquals(TRANSACTION_START_TIME, ((Number) 
auditRecord.get("transactionStartTime")).longValue());
+    assertEquals(timestamp, ((Number) 
auditRecord.get("timestamp")).longValue());
+    assertEquals("START", auditRecord.get("state"));
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+    assertTrue((Boolean) auditRecord.get("lockHeld"));
+  }

Review Comment:
   Should we have a single test covering the `START`, `RENEW`, and `END` 
operations that validates all entries in the audit log file?  It looks redundant 
to have a separate test for each operation.



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+import org.springframework.shell.standard.ShellOption;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * CLI commands for managing Hudi table lock auditing functionality.
+ */
+@ShellComponent
+public class LockAuditingCommand {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LockAuditingCommand.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @ShellMethod(key = "locks audit enable", value = "Enable storage lock audit 
service for the current table")
+  public String enableLockAudit() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path using utility method
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Create the JSON content
+      ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+      
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
 true);
+      String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+      
+      // Write the config file using HoodieStorage
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      try (OutputStream outputStream = HoodieCLI.storage.create(configPath, 
true)) {
+        outputStream.write(jsonContent.getBytes());
+      }
+      
+      return String.format("Lock audit enabled successfully.\nAudit config 
written to: %s\n"
+          + "Audit files will be stored at: %s", auditConfigPath, 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      
+    } catch (Exception e) {
+      LOG.error("Error enabling lock audit", e);
+      return String.format("Failed to enable lock audit: %s", e.getMessage());
+    }
+  }
+
+  @ShellMethod(key = "locks audit disable", value = "Disable storage lock 
audit service for the current table")
+  public String disableLockAudit(
+      @ShellOption(value = {"--keepAuditFiles"}, defaultValue = "true",
+          help = "Keep existing audit files when disabling") final boolean 
keepAuditFiles) {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Check if config file exists
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      if (!HoodieCLI.storage.exists(configPath)) {
+        return "Lock audit is already disabled (no configuration file found).";
+      }
+      
+      // Create the JSON content with audit disabled
+      ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+      
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
 false);
+      String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+      
+      // Write the config file
+      try (OutputStream outputStream = HoodieCLI.storage.create(configPath, 
true)) {
+        outputStream.write(jsonContent.getBytes());
+      }
+      
+      String message = String.format("Lock audit disabled successfully.\nAudit 
config updated at: %s", auditConfigPath);
+      
+      if (keepAuditFiles) {
+        message += String.format("\nExisting audit files preserved at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      } else {
+        // Todo: write then call the api method to prune the old files
+        message += String.format("\nAudit files cleaned up at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      }
+      
+      return message;
+      
+    } catch (Exception e) {
+      LOG.error("Error disabling lock audit", e);
+      return String.format("Failed to disable lock audit: %s", e.getMessage());
+    }
+  }
+
+  @ShellMethod(key = "locks audit status", value = "Show the current status of 
lock audit service")
+  public String showLockAuditStatus() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Check if config file exists
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      if (!HoodieCLI.storage.exists(configPath)) {
+        return String.format("Lock Audit Status: DISABLED\n"
+            + "Table: %s\n"
+            + "Config file: %s (not found)\n"
+            + "Use 'locks audit enable' to enable audit logging.", 
+            HoodieCLI.basePath, auditConfigPath);
+      }
+      
+      // Read and parse the configuration
+      String configContent;
+      try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+        configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+      }
+      JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+      JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+      boolean isEnabled = enabledNode != null && enabledNode.asBoolean(false);

Review Comment:
   Got it, I'm wondering if the `HoodieStorage` APIs can be used.



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockCommand.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+import org.springframework.shell.standard.ShellOption;
+
+import org.apache.hudi.common.util.FileIOUtils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService.AUDIT_FOLDER_NAME;
+
+/**
+ * CLI commands for managing Hudi table locking and audit functionality.
+ */
+@ShellComponent
+public class LockCommand {

Review Comment:
   Sounds good.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+  private static final String BASE_PATH = "s3://bucket/table";
+  private static final String OWNER_ID = 
"writer-12345678-9abc-def0-1234-567890abcdef";
+  private static final long TRANSACTION_START_TIME = 1234567890000L;
+  private static final long LOCK_EXPIRATION = 1000000L;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private StorageLockClient storageLockClient;
+  private Supplier<Boolean> lockHeldSupplier;
+  private StorageLockProviderAuditService auditService;
+
+  @BeforeEach
+  void setUp() {
+    storageLockClient = mock(StorageLockClient.class);
+    lockHeldSupplier = (Supplier<Boolean>) mock(Supplier.class);
+
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService = new StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+  }
+
+  @Test
+  void testRecordOperationStartSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(), 
contentCaptor.capture());
+
+    String expectedPath = 
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+        BASE_PATH,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        TRANSACTION_START_TIME,
+        OWNER_ID);
+    assertEquals(expectedPath, pathCaptor.getValue());
+
+    // Parse JSONL content (should be a single line)
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    assertEquals(1, lines.length, "Should have one JSON line");
+
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+    assertEquals(TRANSACTION_START_TIME, ((Number) 
auditRecord.get("transactionStartTime")).longValue());
+    assertEquals(timestamp, ((Number) 
auditRecord.get("timestamp")).longValue());
+    assertEquals("START", auditRecord.get("state"));
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+    assertTrue((Boolean) auditRecord.get("lockHeld"));
+  }
+
+  @Test
+  void testRecordOperationRenewSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    // First record START
+    auditService.recordOperation(AuditOperationState.START, timestamp - 1000);
+    // Then record RENEW
+    auditService.recordOperation(AuditOperationState.RENEW, timestamp);
+
+    // Verify the file is written twice (once for START, once after RENEW)
+    verify(storageLockClient, times(2)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        anyString());
+  }
+
+  @Test
+  void testRecordOperationEndSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(false);
+
+    auditService.recordOperation(AuditOperationState.END, timestamp);
+
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        contentCaptor.capture());
+
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals("END", auditRecord.get("state"));
+    assertFalse((Boolean) auditRecord.get("lockHeld"));
+    // In real usage, lock expiration is still calculated normally even when 
ending
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+  }
+
+  @Test
+  void testRecordOperationWriteFailure() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(false);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testRecordOperationWriteException() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), anyString()))
+        .thenThrow(new RuntimeException("Storage error"));
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testFileNameWithFullUuid() throws Exception {
+    String ownerIdWithFullUuid = "12345678-9abc-def0-1234-567890abcdef";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        ownerIdWithFullUuid,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, 
ownerIdWithFullUuid)),
+        anyString());
+  }
+
+  @Test
+  void testFileNameWithShortOwnerId() throws Exception {
+    String shortOwnerId = "abc123";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        shortOwnerId,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, shortOwnerId)),
+        anyString());
+  }
+
+  @Test
+  void testFileNameWithRegularOwnerId() throws Exception {
+    String regularOwnerId = "regular-owner-id-without-uuid";
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        regularOwnerId,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", txnStartTime, regularOwnerId)),
+        anyString());
+  }

Review Comment:
   Could these tests be merged into a single parameterized test?  They look similar.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+  private static final String BASE_PATH = "s3://bucket/table";
+  private static final String OWNER_ID = 
"writer-12345678-9abc-def0-1234-567890abcdef";
+  private static final long TRANSACTION_START_TIME = 1234567890000L;
+  private static final long LOCK_EXPIRATION = 1000000L;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private StorageLockClient storageLockClient;
+  private Supplier<Boolean> lockHeldSupplier;
+  private StorageLockProviderAuditService auditService;
+
+  @BeforeEach
+  void setUp() {
+    storageLockClient = mock(StorageLockClient.class);
+    lockHeldSupplier = (Supplier<Boolean>) mock(Supplier.class);
+
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService = new StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+  }
+
+  @Test
+  void testRecordOperationStartSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(), 
contentCaptor.capture());
+
+    String expectedPath = 
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+        BASE_PATH,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        TRANSACTION_START_TIME,
+        OWNER_ID);
+    assertEquals(expectedPath, pathCaptor.getValue());
+
+    // Parse JSONL content (should be a single line)
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    assertEquals(1, lines.length, "Should have one JSON line");
+
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+    assertEquals(TRANSACTION_START_TIME, ((Number) 
auditRecord.get("transactionStartTime")).longValue());
+    assertEquals(timestamp, ((Number) 
auditRecord.get("timestamp")).longValue());
+    assertEquals("START", auditRecord.get("state"));
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+    assertTrue((Boolean) auditRecord.get("lockHeld"));
+  }
+
+  @Test
+  void testRecordOperationRenewSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    // First record START
+    auditService.recordOperation(AuditOperationState.START, timestamp - 1000);
+    // Then record RENEW
+    auditService.recordOperation(AuditOperationState.RENEW, timestamp);
+
+    // Verify the file is written twice (once for START, once after RENEW)
+    verify(storageLockClient, times(2)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        anyString());
+  }
+
+  @Test
+  void testRecordOperationEndSuccess() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(false);
+
+    auditService.recordOperation(AuditOperationState.END, timestamp);
+
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(
+        contains(String.format("%d_%s.jsonl", TRANSACTION_START_TIME, 
OWNER_ID)),
+        contentCaptor.capture());
+
+    String[] lines = contentCaptor.getValue().trim().split("\n");
+    Map<String, Object> auditRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    assertEquals("END", auditRecord.get("state"));
+    assertFalse((Boolean) auditRecord.get("lockHeld"));
+    // In real usage, lock expiration is still calculated normally even when 
ending
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+  }
+
+  @Test
+  void testRecordOperationWriteFailure() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(false);

Review Comment:
   Revisiting the APIs: `StorageLockClient#writeObject(String filePath, String 
content)` and `StorageLockClient#readObject(String filePath, boolean 
checkExistsFirst)` perform object/file operations only.  Should a 
`HoodieStorage` object be used to read and write files instead of adding these 
APIs to `StorageLockClient`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to