This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ddfcc92b131d [HUDI-7503] Compaction execution should fail if another 
active writer is already executing the same plan (#18012)
ddfcc92b131d is described below

commit ddfcc92b131d8d5867535c5eae47d0e80b16b2ca
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 09:37:18 2026 -0700

    [HUDI-7503] Compaction execution should fail if another active writer is 
already executing the same plan (#18012)
    
    Updated compact to start a heartbeat (within a transaction) before 
attempting to execute a plan.
    
    If multiple writers attempt to execute same compact/logcompact plan at same 
time, only one of them will process and the rest will fail with an exception 
(upon seeing a heartbeat has already been started) and will abort.
    
    Summary and Changelog
    Without this change, if multiple jobs are launched at the same that target 
executing the same compact plan (due to a non-HUDI related user-side 
configuration/orchestration issue) then one job can execute the compact plan 
and create an inflight/commit instant file while the other jobs can roll it 
back (and delete inflight instant files or data files). This can lead to 
dataset being in a corrupted state.
    
    With this change, we are updating compact API (that executes an existing 
compaction plan) to first take the table lock (by starting a transaction), 
start a heartbeat, and release the table lock (end the transaction). Though if 
a heartbeat already exists, then an exception will be thrown. The heartbeat 
will be closed when the compaction plan execution succeeds. This means that if 
compaction execution fails but a spark task/driver is still lingering and 
writing/deleting files, the heart [...]
    
    If multiple concurrent job attempt to try execute a compaction plan, then 
all jobs except the first one will fail (until the heartbeat of first on 
expires). Note that the table lock mentioned above is needed to ensure that 
multiple writers don't independently check that heartbeat is inactive and start 
it.
    
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <“[email protected]”>
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 110 ++++--
 .../hudi/client/HoodieFlinkTableServiceClient.java |  56 +--
 .../hudi/client/TestHoodieClientMultiWriter.java   | 381 +++++++++++++++++++++
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |   2 +
 4 files changed, 490 insertions(+), 59 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 4e08e9fa6824..970df09cffea 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -58,7 +58,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieHeartbeatException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -309,21 +311,45 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, 
String compactionInstantTime, boolean shouldComplete) {
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = 
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
-      table.getMetaClient().reloadActiveTimeline();
+    boolean isMultiWriter = 
config.getWriteConcurrencyMode().supportsMultiWriter();
+    if (isMultiWriter) {
+      try {
+        // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+        // compact job will abort if they attempt to execute compact before 
heartbeat expires
+        // Note that as long as all jobs for this table use this API for 
compact with auto-commit, then this alone should prevent
+        // compact rollbacks from running concurrently to compact commits.
+        txnManager.beginStateChange(Option.of(inflightInstant), 
txnManager.getLastCompletedTransactionOwner());
+        validateHeartBeat(compactionInstantTime);
+        if 
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
 {
+          throw new HoodieException("Requested compaction instant " + 
compactionInstantTime + " is not present as pending or already completed in the 
active timeline.");
+        }
+        this.heartbeatClient.start(compactionInstantTime);
+      } finally {
+        txnManager.endStateChange(Option.of(inflightInstant));
+      }
     }
-    compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata<T> writeMetadata = table.compact(context, 
compactionInstantTime);
-    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime, 
WriteOperationType.COMPACT);
-    HoodieWriteMetadata<O> compactionWriteMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
-    if (shouldComplete) {
-      commitCompaction(compactionInstantTime, compactionWriteMetadata, 
Option.of(table));
+    try {
+      HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
+      if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+        table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
+        table.getMetaClient().reloadActiveTimeline();
+      }
+      compactionTimer = metrics.getCompactionCtx();
+      HoodieWriteMetadata<T> writeMetadata = table.compact(context, 
compactionInstantTime);
+      HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime, 
WriteOperationType.COMPACT);
+      HoodieWriteMetadata<O> compactionWriteMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
+      if (shouldComplete) {
+        commitCompaction(compactionInstantTime, compactionWriteMetadata, 
Option.of(table));
+      }
+      return compactionWriteMetadata;
+    } catch (Exception e) {
+      if (isMultiWriter) {
+        this.heartbeatClient.stop(compactionInstantTime);
+      }
+      throw e;
     }
-    return compactionWriteMetadata;
   }
 
   /**
@@ -360,32 +386,38 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
    * Commit Compaction and track metrics.
    */
   protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime, List<HoodieWriteStat> 
partialMetadataWriteStats) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    handleWriteErrors(writeStats, TableServiceType.COMPACT);
-    InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
-    final HoodieInstant compactionInstant = 
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
     try {
-      this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
-      finalizeWrite(table, compactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      writeToMetadataTable(table, compactionCommitTime, metadata, 
partialMetadataWriteStats);
-      log.info("Committing Compaction {}", compactionCommitTime);
-      CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
-      log.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
+      this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
+      List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+      handleWriteErrors(writeStats, TableServiceType.COMPACT);
+      InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
+      final HoodieInstant compactionInstant = 
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
+      try {
+        this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
+        finalizeWrite(table, compactionCommitTime, writeStats);
+        // commit to data table after committing to metadata table.
+        writeToMetadataTable(table, compactionCommitTime, metadata, 
partialMetadataWriteStats);
+        log.info("Committing Compaction {}", compactionCommitTime);
+        CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
+        log.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
+      } finally {
+        this.txnManager.endStateChange(Option.of(compactionInstant));
+        releaseResources(compactionCommitTime);
+      }
+      WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      if (compactionTimer != null) {
+        long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+        
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
 ->
+            metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, COMPACTION_ACTION)
+        );
+      }
+      log.info("Compacted successfully on commit {}", compactionCommitTime);
     } finally {
-      this.txnManager.endStateChange(Option.of(compactionInstant));
-      releaseResources(compactionCommitTime);
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
 ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, COMPACTION_ACTION)
-      );
+      if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
+        this.heartbeatClient.stop(compactionCommitTime);
+      }
     }
-    log.info("Compacted successfully on commit {}", compactionCommitTime);
   }
 
   protected void writeToMetadataTable(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata, List<HoodieWriteStat> partialMetadataWriteStats) 
{
@@ -1299,4 +1331,14 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
   protected void releaseResources(String instantTime) {
     // do nothing here
   }
+
+  private void validateHeartBeat(String instantTime) {
+    try {
+      if (!this.heartbeatClient.isHeartbeatExpired(instantTime)) {
+        throw new HoodieLockException("Cannot execute instant " + instantTime 
+ " due to heartbeat by concurrent writer/job");
+      }
+    } catch (IOException e) {
+      throw new HoodieHeartbeatException("Error accessing heartbeat of instant 
to execute " + instantTime, e);
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 10925c1b7cb3..5c50a18acc5c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -70,35 +70,41 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
 
   @Override
   protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime, List<HoodieWriteStat> 
partialMetadataWriteStats) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = 
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
     try {
-      this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
-      finalizeWrite(table, compactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
-      // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, compactionCommitTime, metadata);
-      log.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
-      CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endStateChange(Option.of(compactionInstant));
-    }
-    WriteMarkersFactory
-        .get(config.getMarkersType(), table, compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
+      List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+      final HoodieInstant compactionInstant = 
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
       try {
-        
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
-            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-      } catch (ParseException e) {
-        throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction "
-            + config.getBasePath() + " at time " + compactionCommitTime, e);
+        this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
+        finalizeWrite(table, compactionCommitTime, writeStats);
+        // commit to data table after committing to metadata table.
+        // Do not do any conflict resolution here as we do with regular 
writes. We take the lock here to ensure all writes to metadata table happens 
within a
+        // single lock (single writer). Because more than one write to 
metadata table will result in conflicts since all of them updates the same 
partition.
+        writeTableMetadata(table, compactionCommitTime, metadata);
+        log.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
+        CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
+      } finally {
+        this.txnManager.endStateChange(Option.of(compactionInstant));
+      }
+      WriteMarkersFactory
+          .get(config.getMarkersType(), table, compactionCommitTime)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      if (compactionTimer != null) {
+        long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+        try {
+          
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
+              durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+        } catch (ParseException e) {
+          throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction "
+              + config.getBasePath() + " at time " + compactionCommitTime, e);
+        }
+      }
+      log.info("Compacted successfully on commit " + compactionCommitTime);
+    } finally {
+      if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
+        this.heartbeatClient.stop(compactionCommitTime);
       }
     }
-    log.info("Compacted successfully on commit " + compactionCommitTime);
   }
 
   protected void completeClustering(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 1a0c0c730741..f07414f18af1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -60,6 +60,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException;
 import org.apache.hudi.exception.HoodieSchemaNotFoundException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -915,6 +916,386 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     client3.close();
   }
 
+  /**
+   * Test that when two writers attempt to execute the same compaction plan 
concurrently,
+   * at least one will succeed and create a completed compaction. Verifies 
that the timeline
+   * has compaction requested and completed instant files.
+   *
+   * This test uses a MOR table with multiwriter/optimistic concurrent control 
enabled.
+   */
+  @Test
+  public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "1000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, 
"3");
+
+    // Build write config with multiwriter/OCC enabled
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new 
SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files which are needed for 
compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), 
secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), 
thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested 
instant)
+    Option<String> compactionInstantOpt = 
client.scheduleTableService(Option.empty(), Option.empty(), 
TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be 
scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+
+    // Verify compaction is in requested state before execution
+    HoodieTimeline pendingCompactionTimeline = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Two clients attempting to execute the same compaction plan concurrently
+    final int threadCount = 2;
+    final ExecutorService executors = 
Executors.newFixedThreadPool(threadCount);
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+    final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    Future<?> future1 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await();
+
+        // Attempt to execute compaction with auto-complete
+        client1.compact(compactionInstantTime, true);
+        writer1Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 1 failed with exception: " + e.getMessage());
+        writer1Succeeded.set(false);
+      }
+    });
+
+    Future<?> future2 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await();
+
+        // Attempt to execute compaction with auto-complete
+        client2.compact(compactionInstantTime, true);
+        writer2Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 2 failed with exception: " + e.getMessage());
+        writer2Succeeded.set(false);
+      }
+    });
+
+    // Wait for both futures to complete
+    future1.get();
+    future2.get();
+
+    // Verify at least one writer succeeded
+    assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+        "At least one writer should succeed in executing the compaction");
+
+    // Verify exactly one writer succeeded
+    assertTrue(writer1Succeeded.get() ^ writer2Succeeded.get(),
+        "Exactly one writer should succeed in executing the compaction");
+
+    // Reload timeline and verify compaction is completed
+    HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+
+    // Verify compaction instant is completed
+    HoodieTimeline completedCompactionTimeline = 
reloadedTimeline.filterCompletedInstants().getCommitTimeline();
+
+    // Verify there is a completed compaction instant
+    boolean hasCompletedCompaction = 
completedCompactionTimeline.getInstantsAsStream()
+        .anyMatch(instant -> 
instant.requestedTime().equals(compactionInstantTime));
+    assertTrue(hasCompletedCompaction,
+        "The completed compaction instant should be in the timeline");
+
+    // Verify no pending compaction exists for this instant (it should be 
completed)
+    HoodieTimeline finalPendingCompactionTimeline = 
reloadedTimeline.filterPendingCompactionTimeline();
+    
assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should no longer be pending after execution");
+
+    // Clean up
+    executors.shutdown();
+    client.close();
+    client1.close();
+    client2.close();
+    FileIOUtils.deleteDirectory(new File(basePath));
+  }
+
+  /**
+   * Test that when writer1 starts compaction (with auto commit enabled) 
execution and fails mid-way,
+   * and after sometime writer2 starts and is able to rollback and execute the 
failed compaction
+   * (after heartbeat expired).
+   *
+   * This test uses a MOR table with multiwriter/optimistic concurrent control 
enabled.
+   */
+  @Test
+  public void testCompactionRecoveryAfterWriterFailureWithHeartbeatExpiry() 
throws Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    // Use short heartbeat interval so we can wait for expiry in test
+    int heartbeatIntervalMs = 2000;
+    int numTolerableHeartbeatMisses = 1;
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "1000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, 
"3");
+
+    // Build write config with multiwriter/OCC enabled and short heartbeat
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(heartbeatIntervalMs)
+        .withHeartbeatTolerableMisses(numTolerableHeartbeatMisses)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new 
SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files which are needed for 
compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), 
secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), 
thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested 
instant)
+    Option<String> compactionInstantOpt = 
client.scheduleTableService(Option.empty(), Option.empty(), 
TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be 
scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+    client.close();
+
+    // Verify compaction is in requested state before execution
+    HoodieTimeline pendingCompactionTimeline = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Writer1: Start compaction with auto-commit, but simulate failure by 
starting heartbeat
+    // and then stopping the writer without completing the compaction
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+
+    // Simulate writer1 starting compaction but failing mid-way:
+    // 1. Start heartbeat for the compaction instant
+    // 2. Transition the compaction to inflight state (simulating that 
execution started)
+    // 3. Then "fail" by stopping heartbeat and closing the client without 
completing
+
+    // Start heartbeat for this compaction instant (simulating writer1 started 
execution)
+    client1.getHeartbeatClient().start(compactionInstantTime);
+
+    // Transition compaction from requested to inflight (simulating execution 
started)
+    HoodieInstant requestedInstant = metaClient.reloadActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .getInstantsAsStream()
+        .filter(i -> i.requestedTime().equals(compactionInstantTime))
+        .findFirst()
+        .get();
+    
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant);
+
+    // Verify compaction is now inflight
+    HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+    HoodieInstant inflightInstant = 
INSTANT_GENERATOR.getCompactionInflightInstant(compactionInstantTime);
+    
assertTrue(reloadedTimeline.filterPendingCompactionTimeline().containsInstant(inflightInstant),
+        "Compaction instant should be in inflight state");
+
+    // Simulate writer1 failure by stopping heartbeat (deletes heartbeat file 
from DFS, so it is immediately
+    // considered expired when writer2 checks)
+    client1.getHeartbeatClient().stop(compactionInstantTime);
+    client1.close();
+
+    // Writer2: Now comes in after heartbeat expired, should be able to 
rollback and execute compaction
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    // Writer2 attempts to execute the compaction with auto-commit
+    // This should detect expired heartbeat, rollback the inflight compaction, 
and re-execute it
+    assertDoesNotThrow(() -> {
+      client2.compact(compactionInstantTime, true);
+    }, "Writer2 should be able to rollback and execute the failed compaction 
after heartbeat expires");
+
+    // Reload timeline and verify compaction is completed
+    HoodieTimeline finalTimeline = metaClient.reloadActiveTimeline();
+
+    // Verify compaction instant is completed
+    HoodieTimeline completedCompactionTimeline = 
finalTimeline.filterCompletedInstants().getCommitTimeline();
+    boolean hasCompletedCompaction = 
completedCompactionTimeline.getInstantsAsStream()
+        .anyMatch(instant -> 
instant.requestedTime().equals(compactionInstantTime));
+    assertTrue(hasCompletedCompaction,
+        "The compaction instant should be completed after writer2 recovery");
+
+    // Verify no pending compaction exists for this instant (it should be 
completed)
+    HoodieTimeline finalPendingCompactionTimeline = 
finalTimeline.filterPendingCompactionTimeline();
+    
assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should no longer be pending after recovery 
execution");
+
+    // Verify no inflight rollback instants remain
+    HoodieTimeline inflightRollbacks = 
finalTimeline.getRollbackTimeline().filterInflights();
+    assertEquals(0, inflightRollbacks.countInstants(),
+        "There should be no inflight rollback instants after recovery");
+
+    // Verify exactly 1 completed rollback instant (from rolling back 
writer1's failed compaction)
+    assertEquals(1, 
finalTimeline.getRollbackTimeline().filterCompletedInstants().countInstants(),
+        "There should be exactly 1 completed rollback instant from rolling 
back the failed compaction");
+
+    // Clean up
+    client2.close();
+    FileIOUtils.deleteDirectory(new File(basePath));
+  }
+
+  /**
+   * Test that when a writer starts compaction but the compaction instant is 
no longer in the
+   * active timeline (either already completed or removed), it should throw an 
appropriate error.
+   *
+   * This test uses a MOR table with multiwriter/optimistic concurrent control 
enabled.
+   */
+  @Test
+  public void testCompactionFailsWhenInstantNotInActiveTimeline() throws 
Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "1000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, 
"3");
+
+    // Build write config with multiwriter/OCC enabled
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new 
SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files which are needed for 
compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), 
secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), 
thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested 
instant)
+    Option<String> compactionInstantOpt = 
client.scheduleTableService(Option.empty(), Option.empty(), 
TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be 
scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+    client.close();
+
+    // Verify compaction is in pending state
+    HoodieTimeline pendingCompactionTimeline = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Now, simulate the scenario where the compaction instant is removed from 
the active timeline
+    // This could happen if another writer already completed the compaction 
and it was archived,
+    // or the instant was manually deleted. We simulate this by deleting the 
compaction instant files.
+    HoodieInstant requestedInstant = metaClient.reloadActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .getInstantsAsStream()
+        .filter(i -> i.requestedTime().equals(compactionInstantTime))
+        .findFirst()
+        .get();
+
+    // Delete the compaction instant from timeline (simulating it's no longer 
in active timeline)
+    metaClient.getActiveTimeline().deletePending(requestedInstant);
+
+    // Verify compaction instant is no longer in the timeline
+    pendingCompactionTimeline = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    
assertFalse(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should no longer be in pending timeline after 
deletion");
+
+    // Writer attempts to execute the compaction that is no longer in the 
timeline
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    // Attempting to compact should throw HoodieException since the instant is 
not in active timeline
+    HoodieException exception = assertThrows(HoodieException.class, () -> {
+      client2.compact(compactionInstantTime, true);
+    }, "Compaction should throw an error when instant is not in active 
timeline");
+
+    // Verify the error message indicates the compaction instant is not in 
active timeline
+    assertTrue(exception.getMessage().contains("is not present as pending or 
already completed in the active timeline"),
+        "Exception message should indicate compaction instant is not in active 
timeline. Actual: " + exception.getMessage());
+
+    // Clean up
+    client2.close();
+    FileIOUtils.deleteDirectory(new File(basePath));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ", 
"COPY_ON_WRITE"})
   public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType 
tableType) throws Exception {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index ca859e29613d..d1f21ddb3c4d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -243,9 +243,11 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
 
       Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
       client.compact(compactionInstant.get());
+      
client.getTableServiceClient().getHeartbeatClient().stop(compactionInstant.get());
 
       // trigger compaction again.
       client.compact(compactionInstant.get());
+      
client.getTableServiceClient().getHeartbeatClient().stop(compactionInstant.get());
 
       metaClient.reloadActiveTimeline();
       // verify that there is no new rollback instant generated


Reply via email to