nsivabalan commented on code in PR #18305:
URL: https://github.com/apache/hudi/pull/18305#discussion_r2933991645


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -949,13 +976,13 @@ protected void archive(HoodieTable table) {
   }
 
   /**
-   * Get inflight timeline excluding compaction and clustering.
+   * Get inflight timeline excluding compaction, log compaction, and clustering.
    *
    * @param metaClient
    * @return
    */
  private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
-    HoodieTimeline inflightTimelineExcludingCompaction = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimelineExcludingCompaction = metaClient.getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction();

Review Comment:
   Hmmm, based on the current code, I don't think we can exclude log compaction here.
   
   As per master, log compaction is treated as yet another ingestion commit. So rollback of failed writes, whether by the ingestion writer or by Clean (lazy), should account for log compaction instants.
   
   And this is the API being used within rollbackFailedWrites -> getInstantsToRollback.
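   To illustrate what I mean, here is a toy sketch (plain JDK, not Hudi code; the filter names only mirror the timeline APIs in this diff): once pending log compactions are filtered out, rollback-of-failed-writes can no longer see them as candidates.
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Toy model of the timeline filters touched by this diff. On master a pending
   // "logcompaction" is an ingestion-style commit, so it must stay visible to
   // getInstantsToRollback; the new filter hides it.
   public class RollbackCandidateSketch {
     record Instant(String action, String state) {}

     // Mirrors filterPendingExcludingCompaction(): drop only "compaction".
     static List<Instant> pendingExcludingCompaction(List<Instant> timeline) {
       return timeline.stream()
           .filter(i -> i.state().equals("INFLIGHT"))
           .filter(i -> !i.action().equals("compaction"))
           .collect(Collectors.toList());
     }

     // Mirrors filterPendingExcludingCompactionAndLogCompaction(): also drops "logcompaction".
     static List<Instant> pendingExcludingCompactionAndLogCompaction(List<Instant> timeline) {
       return pendingExcludingCompaction(timeline).stream()
           .filter(i -> !i.action().equals("logcompaction"))
           .collect(Collectors.toList());
     }

     public static void main(String[] args) {
       List<Instant> timeline = List.of(
           new Instant("deltacommit", "INFLIGHT"),
           new Instant("compaction", "INFLIGHT"),
           new Instant("logcompaction", "INFLIGHT"));
       // Old filter: the pending log compaction is still a rollback candidate.
       System.out.println(pendingExcludingCompaction(timeline).size()); // prints 2
       // New filter: a failed log compaction would never be picked up for rollback.
       System.out.println(pendingExcludingCompactionAndLogCompaction(timeline).size()); // prints 1
     }
   }
   ```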



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -224,25 +224,46 @@ protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boo
           + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get()));
     }
 
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
     InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      log.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Execution is aborted since it found an Inflight logcompaction,"
-          + "log compaction plans are mutable plans, so reschedule another logcompaction.");
+    boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter();
+    if (isMultiWriter) {

Review Comment:
   Can we move this to a private method named `enableHeartBeatForMultiWriter`?
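   Pseudocode sketch of the extraction (built only from the lines visible in this diff; the finally-block contents are an assumption about what the full block does, not compiled against Hudi):
   
   ```java
   private void enableHeartBeatForMultiWriter(HoodieInstant inflightInstant, String logCompactionInstantTime) {
     if (!config.getWriteConcurrencyMode().supportsMultiWriter()) {
       return;
     }
     try {
       txnManager.beginStateChange(Option.of(inflightInstant), txnManager.getLastCompletedTransactionOwner());
       validateHeartBeat(logCompactionInstantTime);
       // ... remainder of the multi-writer block from this diff ...
     } finally {
       // end the state change / release locks, as the original block presumably does
     }
   }
   ```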



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -224,25 +224,46 @@ protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boo
           + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get()));
     }
 
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
     InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      log.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Execution is aborted since it found an Inflight logcompaction,"
-          + "log compaction plans are mutable plans, so reschedule another logcompaction.");
+    boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter();
+    if (isMultiWriter) {
+      try {
+        txnManager.beginStateChange(Option.of(inflightInstant), txnManager.getLastCompletedTransactionOwner());
+        validateHeartBeat(logCompactionInstantTime);

Review Comment:
   What's the expected behavior if the log compaction has already completed by the time we reach L 233? Will this thread hit an exception due to the heartbeat?
   
   I guess we might see the heartbeat as expired in both cases:
   a. the heartbeat expired due to a failed write, or
   b. the log compaction completed.
   
   Don't we want to differentiate a and b? Or is it OK to fail the current thread in both cases?
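   For example, one way to tell the two apart, as a pseudocode sketch (hypothetical handling, not compiled against Hudi; the timeline calls exist but whether this is the right place to check is up for discussion):
   
   ```java
   try {
     validateHeartBeat(logCompactionInstantTime);
   } catch (HoodieException e) {
     // Case b: reload and check whether the instant actually completed before
     // treating the expired heartbeat as a failed write (case a).
     if (table.getMetaClient().reloadActiveTimeline().filterCompletedInstants()
         .containsInstant(logCompactionInstantTime)) {
       throw new HoodieException("Log compaction " + logCompactionInstantTime
           + " was already completed by another writer", e);
     }
     throw e;
   }
   ```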
    



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1296,6 +1296,196 @@ public void testCompactionFailsWhenInstantNotInActiveTimeline() throws Exception
     FileIOUtils.deleteDirectory(new File(basePath));
   }
 
+  /**
+   * Test that when two writers attempt to execute the same log compaction plan concurrently,
+   * at least one will succeed and the other will fail due to the heartbeat guard.
+   *
+   * This test uses a MOR table with multi-writer/optimistic concurrency control enabled.
+   */
+  @Test
+  public void testConcurrentLogCompactionExecutionOnSamePlan() throws Exception {
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .withLogCompactionBlocksThreshold(1).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), thirdCommitTime, 100);
+
+    Option<String> logCompactionInstantOpt = client.scheduleTableService(Option.empty(), Option.empty(), TableServiceType.LOG_COMPACT);
+    assertTrue(logCompactionInstantOpt.isPresent(), "Log compaction should be scheduled");
+    String logCompactionInstantTime = logCompactionInstantOpt.get();
+
+    HoodieTimeline pendingLogCompactionTimeline = metaClient.reloadActiveTimeline().filterPendingLogCompactionTimeline();
+    assertTrue(pendingLogCompactionTimeline.containsInstant(logCompactionInstantTime),
+        "Log compaction instant should be in pending state after scheduling");
+
+    final int threadCount = 2;
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+    final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    Future<?> future1 = executors.submit(() -> {
+      try {
+        cyclicBarrier.await();
+        client1.logCompact(logCompactionInstantTime, true);
+        writer1Succeeded.set(true);
+      } catch (Exception e) {
+        LOG.info("Writer 1 failed with exception: " + e.getMessage());
+        writer1Succeeded.set(false);
+      }
+    });
+
+    Future<?> future2 = executors.submit(() -> {
+      try {
+        cyclicBarrier.await();
+        client2.logCompact(logCompactionInstantTime, true);
+        writer2Succeeded.set(true);
+      } catch (Exception e) {
+        LOG.info("Writer 2 failed with exception: " + e.getMessage());
+        writer2Succeeded.set(false);
+      }
+    });
+
+    future1.get();
+    future2.get();
+
+    assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+        "At least one writer should succeed in executing the log compaction");
+
+    assertTrue(writer1Succeeded.get() ^ writer2Succeeded.get(),
+        "Exactly one writer should succeed in executing the log compaction");
+
+    executors.shutdown();

Review Comment:
   Can we move the shutdown and close calls, and any such clean-ups, to a finally block?
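   Something along these lines (JDK-only sketch of the shape I mean; in the real test the finally block would also close `client1`/`client2`):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   // Minimal sketch of the suggested cleanup shape: the submits and result checks
   // run inside try, while executor shutdown moves to finally so it happens even
   // when future.get() or an assertion throws.
   public class CleanupSketch {
     public static int runBothWriters() throws Exception {
       ExecutorService executors = Executors.newFixedThreadPool(2);
       try {
         Future<Integer> f1 = executors.submit(() -> 1);
         Future<Integer> f2 = executors.submit(() -> 2);
         return f1.get() + f2.get();
       } finally {
         // Cleanup runs on success and failure alike.
         executors.shutdown();
         executors.awaitTermination(10, TimeUnit.SECONDS);
       }
     }

     public static void main(String[] args) throws Exception {
       System.out.println(runBothWriters()); // prints 3
     }
   }
   ```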



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
