This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ddfcc92b131d [HUDI-7503] Compaction execution should fail if another
active writer is already executing the same plan (#18012)
ddfcc92b131d is described below
commit ddfcc92b131d8d5867535c5eae47d0e80b16b2ca
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 09:37:18 2026 -0700
[HUDI-7503] Compaction execution should fail if another active writer is
already executing the same plan (#18012)
Updated compact to start a heartbeat (within a transaction) before
attempting to execute a plan.
If multiple writers attempt to execute the same compact/logcompact plan at the
same time, only one of them will proceed and the rest will fail with an exception
(upon seeing a heartbeat has already been started) and will abort.
Summary and Changelog
Without this change, if multiple jobs are launched at the same time that target
executing the same compact plan (due to a non-HUDI related user-side
configuration/orchestration issue) then one job can execute the compact plan
and create an inflight/commit instant file while the other jobs can roll it
back (and delete inflight instant files or data files). This can leave the
dataset in a corrupted state.
With this change, we are updating compact API (that executes an existing
compaction plan) to first take the table lock (by starting a transaction),
start a heartbeat, and release the table lock (end the transaction). Though if
a heartbeat already exists, then an exception will be thrown. The heartbeat
will be closed when the compaction plan execution succeeds. This means that if
compaction execution fails but a spark task/driver is still lingering and
writing/deleting files, the heart [...]
If multiple concurrent jobs attempt to execute a compaction plan, then
all jobs except the first one will fail (until the heartbeat of the first one
expires). Note that the table lock mentioned above is needed to ensure that
multiple writers don't independently check that the heartbeat is inactive and
start it.
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../hudi/client/BaseHoodieTableServiceClient.java | 110 ++++--
.../hudi/client/HoodieFlinkTableServiceClient.java | 56 +--
.../hudi/client/TestHoodieClientMultiWriter.java | 381 +++++++++++++++++++++
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 2 +
4 files changed, 490 insertions(+), 59 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 4e08e9fa6824..970df09cffea 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -58,7 +58,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieHeartbeatException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -309,21 +311,45 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ boolean isMultiWriter =
config.getWriteConcurrencyMode().supportsMultiWriter();
+ if (isMultiWriter) {
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for
compact with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ validateHeartBeat(compactionInstantTime);
+ if
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
{
+ throw new HoodieException("Requested compaction instant " +
compactionInstantTime + " is not present as pending or already completed in the
active timeline.");
+ }
+ this.heartbeatClient.start(compactionInstantTime);
+ } finally {
+ txnManager.endStateChange(Option.of(inflightInstant));
+ }
}
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
- if (shouldComplete) {
- commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ try {
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+ if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ compactionTimer = metrics.getCompactionCtx();
+ HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
+ HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ }
+ return compactionWriteMetadata;
+ } catch (Exception e) {
+ if (isMultiWriter) {
+ this.heartbeatClient.stop(compactionInstantTime);
+ }
+ throw e;
}
- return compactionWriteMetadata;
}
/**
@@ -360,32 +386,38 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
* Commit Compaction and track metrics.
*/
protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable
table, String compactionCommitTime, List<HoodieWriteStat>
partialMetadataWriteStats) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- handleWriteErrors(writeStats, TableServiceType.COMPACT);
- InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
- final HoodieInstant compactionInstant =
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- writeToMetadataTable(table, compactionCommitTime, metadata,
partialMetadataWriteStats);
- log.info("Committing Compaction {}", compactionCommitTime);
- CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- log.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ handleWriteErrors(writeStats, TableServiceType.COMPACT);
+ InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
+ final HoodieInstant compactionInstant =
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
+ try {
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ writeToMetadataTable(table, compactionCommitTime, metadata,
partialMetadataWriteStats);
+ log.info("Committing Compaction {}", compactionCommitTime);
+ CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
+ log.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endStateChange(Option.of(compactionInstant));
+ releaseResources(compactionCommitTime);
+ }
+ WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
->
+ metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, COMPACTION_ACTION)
+ );
+ }
+ log.info("Compacted successfully on commit {}", compactionCommitTime);
} finally {
- this.txnManager.endStateChange(Option.of(compactionInstant));
- releaseResources(compactionCommitTime);
- }
- WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, COMPACTION_ACTION)
- );
+ if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
+ this.heartbeatClient.stop(compactionCommitTime);
+ }
}
- log.info("Compacted successfully on commit {}", compactionCommitTime);
}
protected void writeToMetadataTable(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata, List<HoodieWriteStat> partialMetadataWriteStats)
{
@@ -1299,4 +1331,14 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
protected void releaseResources(String instantTime) {
// do nothing here
}
+
+ private void validateHeartBeat(String instantTime) {
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(instantTime)) {
+ throw new HoodieLockException("Cannot execute instant " + instantTime
+ " due to heartbeat by concurrent writer/job");
+ }
+ } catch (IOException e) {
+ throw new HoodieHeartbeatException("Error accessing heartbeat of instant
to execute " + instantTime, e);
+ }
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 10925c1b7cb3..5c50a18acc5c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -70,35 +70,41 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
@Override
protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable
table, String compactionCommitTime, List<HoodieWriteStat>
partialMetadataWriteStats) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- final HoodieInstant compactionInstant =
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- // Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, compactionCommitTime, metadata);
- log.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
- CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- } finally {
- this.txnManager.endStateChange(Option.of(compactionInstant));
- }
- WriteMarkersFactory
- .get(config.getMarkersType(), table, compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant compactionInstant =
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
try {
-
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ // Do not do any conflict resolution here as we do with regular
writes. We take the lock here to ensure all writes to metadata table happens
within a
+ // single lock (single writer). Because more than one write to
metadata table will result in conflicts since all of them updates the same
partition.
+ writeTableMetadata(table, compactionCommitTime, metadata);
+ log.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
+ CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endStateChange(Option.of(compactionInstant));
+ }
+ WriteMarkersFactory
+ .get(config.getMarkersType(), table, compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ try {
+
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
+ + config.getBasePath() + " at time " + compactionCommitTime, e);
+ }
+ }
+ log.info("Compacted successfully on commit " + compactionCommitTime);
+ } finally {
+ if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
+ this.heartbeatClient.stop(compactionCommitTime);
}
}
- log.info("Compacted successfully on commit " + compactionCommitTime);
}
protected void completeClustering(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 1a0c0c730741..f07414f18af1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -60,6 +60,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException;
import org.apache.hudi.exception.HoodieSchemaNotFoundException;
import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -915,6 +916,386 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
client3.close();
}
+ /**
+ * Test that when two writers attempt to execute the same compaction plan
concurrently,
+ * at least one will succeed and create a completed compaction. Verifies
that the timeline
+ * has compaction requested and completed instant files.
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrent control
enabled.
+ */
+ @Test
+ public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Two clients attempting to execute the same compaction plan concurrently
+ final int threadCount = 2;
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+ final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ Future<?> future1 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client1.compact(compactionInstantTime, true);
+ writer1Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 1 failed with exception: " + e.getMessage());
+ writer1Succeeded.set(false);
+ }
+ });
+
+ Future<?> future2 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client2.compact(compactionInstantTime, true);
+ writer2Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 2 failed with exception: " + e.getMessage());
+ writer2Succeeded.set(false);
+ }
+ });
+
+ // Wait for both futures to complete
+ future1.get();
+ future2.get();
+
+ // Verify at least one writer succeeded
+ assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+ "At least one writer should succeed in executing the compaction");
+
+ // Verify exactly one writer succeeded
+ assertTrue(writer1Succeeded.get() ^ writer2Succeeded.get(),
+ "Exactly one writer should succeed in executing the compaction");
+
+ // Reload timeline and verify compaction is completed
+ HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+
+ // Verify compaction instant is completed
+ HoodieTimeline completedCompactionTimeline =
reloadedTimeline.filterCompletedInstants().getCommitTimeline();
+
+ // Verify there is a completed compaction instant
+ boolean hasCompletedCompaction =
completedCompactionTimeline.getInstantsAsStream()
+ .anyMatch(instant ->
instant.requestedTime().equals(compactionInstantTime));
+ assertTrue(hasCompletedCompaction,
+ "The completed compaction instant should be in the timeline");
+
+ // Verify no pending compaction exists for this instant (it should be
completed)
+ HoodieTimeline finalPendingCompactionTimeline =
reloadedTimeline.filterPendingCompactionTimeline();
+
assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should no longer be pending after execution");
+
+ // Clean up
+ executors.shutdown();
+ client.close();
+ client1.close();
+ client2.close();
+ FileIOUtils.deleteDirectory(new File(basePath));
+ }
+
+ /**
+ * Test that when writer1 starts compaction (with auto commit enabled)
execution and fails mid-way,
+ * and after sometime writer2 starts and is able to rollback and execute the
failed compaction
+ * (after heartbeat expired).
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrent control
enabled.
+ */
+ @Test
+ public void testCompactionRecoveryAfterWriterFailureWithHeartbeatExpiry()
throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ // Use short heartbeat interval so we can wait for expiry in test
+ int heartbeatIntervalMs = 2000;
+ int numTolerableHeartbeatMisses = 1;
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled and short heartbeat
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(heartbeatIntervalMs)
+ .withHeartbeatTolerableMisses(numTolerableHeartbeatMisses)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+ client.close();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Writer1: Start compaction with auto-commit, but simulate failure by
starting heartbeat
+ // and then stopping the writer without completing the compaction
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+
+ // Simulate writer1 starting compaction but failing mid-way:
+ // 1. Start heartbeat for the compaction instant
+ // 2. Transition the compaction to inflight state (simulating that
execution started)
+ // 3. Then "fail" by stopping heartbeat and closing the client without
completing
+
+ // Start heartbeat for this compaction instant (simulating writer1 started
execution)
+ client1.getHeartbeatClient().start(compactionInstantTime);
+
+ // Transition compaction from requested to inflight (simulating execution
started)
+ HoodieInstant requestedInstant = metaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstantsAsStream()
+ .filter(i -> i.requestedTime().equals(compactionInstantTime))
+ .findFirst()
+ .get();
+
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant);
+
+ // Verify compaction is now inflight
+ HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+ HoodieInstant inflightInstant =
INSTANT_GENERATOR.getCompactionInflightInstant(compactionInstantTime);
+
assertTrue(reloadedTimeline.filterPendingCompactionTimeline().containsInstant(inflightInstant),
+ "Compaction instant should be in inflight state");
+
+ // Simulate writer1 failure by stopping heartbeat (deletes heartbeat file
from DFS, so it is immediately
+ // considered expired when writer2 checks)
+ client1.getHeartbeatClient().stop(compactionInstantTime);
+ client1.close();
+
+ // Writer2: Now comes in after heartbeat expired, should be able to
rollback and execute compaction
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ // Writer2 attempts to execute the compaction with auto-commit
+ // This should detect expired heartbeat, rollback the inflight compaction,
and re-execute it
+ assertDoesNotThrow(() -> {
+ client2.compact(compactionInstantTime, true);
+ }, "Writer2 should be able to rollback and execute the failed compaction
after heartbeat expires");
+
+ // Reload timeline and verify compaction is completed
+ HoodieTimeline finalTimeline = metaClient.reloadActiveTimeline();
+
+ // Verify compaction instant is completed
+ HoodieTimeline completedCompactionTimeline =
finalTimeline.filterCompletedInstants().getCommitTimeline();
+ boolean hasCompletedCompaction =
completedCompactionTimeline.getInstantsAsStream()
+ .anyMatch(instant ->
instant.requestedTime().equals(compactionInstantTime));
+ assertTrue(hasCompletedCompaction,
+ "The compaction instant should be completed after writer2 recovery");
+
+ // Verify no pending compaction exists for this instant (it should be
completed)
+ HoodieTimeline finalPendingCompactionTimeline =
finalTimeline.filterPendingCompactionTimeline();
+
assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should no longer be pending after recovery
execution");
+
+ // Verify no inflight rollback instants remain
+ HoodieTimeline inflightRollbacks =
finalTimeline.getRollbackTimeline().filterInflights();
+ assertEquals(0, inflightRollbacks.countInstants(),
+ "There should be no inflight rollback instants after recovery");
+
+ // Verify exactly 1 completed rollback instant (from rolling back
writer1's failed compaction)
+ assertEquals(1,
finalTimeline.getRollbackTimeline().filterCompletedInstants().countInstants(),
+ "There should be exactly 1 completed rollback instant from rolling
back the failed compaction");
+
+ // Clean up
+ client2.close();
+ FileIOUtils.deleteDirectory(new File(basePath));
+ }
+
+ /**
+ * Test that when a writer starts compaction but the compaction instant is
no longer in the
+ * active timeline (either already completed or removed), it should throw an
appropriate error.
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrency control
+ * enabled.
+ */
+ @Test
+ public void testCompactionFailsWhenInstantNotInActiveTimeline() throws
Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+ client.close();
+
+ // Verify compaction is in pending state
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Now, simulate the scenario where the compaction instant is removed from
the active timeline
+ // This could happen if another writer already completed the compaction
and it was archived,
+ // or the instant was manually deleted. We simulate this by deleting the
compaction instant files.
+ HoodieInstant requestedInstant = metaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstantsAsStream()
+ .filter(i -> i.requestedTime().equals(compactionInstantTime))
+ .findFirst()
+ .get();
+
+ // Delete the compaction instant from timeline (simulating it's no longer
in active timeline)
+ metaClient.getActiveTimeline().deletePending(requestedInstant);
+
+ // Verify compaction instant is no longer in the timeline
+ pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertFalse(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should no longer be in pending timeline after
deletion");
+
+ // Writer attempts to execute the compaction that is no longer in the
timeline
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ // Attempting to compact should throw HoodieException since the instant is
not in active timeline
+ HoodieException exception = assertThrows(HoodieException.class, () -> {
+ client2.compact(compactionInstantTime, true);
+ }, "Compaction should throw an error when instant is not in active
timeline");
+
+ // Verify the error message indicates the compaction instant is not in
active timeline
+ assertTrue(exception.getMessage().contains("is not present as pending or
already completed in the active timeline"),
+ "Exception message should indicate compaction instant is not in active
timeline. Actual: " + exception.getMessage());
+
+ // Clean up
+ client2.close();
+ FileIOUtils.deleteDirectory(new File(basePath));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ",
"COPY_ON_WRITE"})
public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType
tableType) throws Exception {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index ca859e29613d..d1f21ddb3c4d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -243,9 +243,11 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
client.compact(compactionInstant.get());
+
client.getTableServiceClient().getHeartbeatClient().stop(compactionInstant.get());
// trigger compaction again.
client.compact(compactionInstant.get());
+
client.getTableServiceClient().getHeartbeatClient().stop(compactionInstant.get());
metaClient.reloadActiveTimeline();
// verify that there is no new rollback instant generated