This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 41337396a9d6 perf: Skip unnecessary clean planning for MOR metadata
table file-version cleaning (#17943)
41337396a9d6 is described below
commit 41337396a9d665fa9f4f71e97c0fca57c8adce00
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Mar 24 14:01:02 2026 -0700
perf: Skip unnecessary clean planning for MOR metadata table file-version
cleaning (#17943)
* Avoid checking for a clean operation on the metadata table when it is unnecessary.
For some datasets, checking whether a clean commit is needed takes a long time, so on the
metadata table we now skip this check in cases where it is not necessary.
---
.../hudi/table/action/clean/CleanPlanner.java | 27 +++++++
.../table/functional/TestCleanPlanExecutor.java | 82 ++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 772f0a236219..49e6f7ae6ea3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparison;
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -156,12 +157,38 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
case KEEP_LATEST_BY_HOURS:
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
case KEEP_LATEST_FILE_VERSIONS:
+ if (canSkipClean()) {
+ return Collections.emptyList();
+ }
return getPartitionPathsForFullCleaning();
default:
throw new IllegalStateException("Unknown Cleaner Policy");
}
}
+  /**
+   * Decides whether clean planning for the {@code KEEP_LATEST_FILE_VERSIONS} policy can be
+   * short-circuited.
+   *
+   * <p>Short-circuiting is only considered for a MERGE_ON_READ metadata table that already has at
+   * least one instant on its cleaner timeline. For such a table this returns true when no
+   * completed write instant whose action is not a deltacommit has a completion time later than
+   * the requested time of the most recent cleaner-timeline instant. Any other table, or a table
+   * with no clean yet, always gets full planning.
+   *
+   * <p>NOTE(review): the cleaner timeline is not filtered to completed instants here, so a pending
+   * clean may serve as the baseline — confirm this is intended.
+   *
+   * @return true if full-partition clean planning can be skipped, false otherwise
+   */
+  private boolean canSkipClean() {
+    boolean isMergeOnRead =
+        HoodieTableType.MERGE_ON_READ.equals(hoodieTable.getMetaClient().getTableType());
+    if (!isMergeOnRead || !hoodieTable.isMetadataTable()) {
+      return false;
+    }
+
+    HoodieTimeline timeline = hoodieTable.getActiveTimeline();
+    Option<HoodieInstant> latestClean = timeline.getCleanerTimeline().lastInstant();
+    if (!latestClean.isPresent()) {
+      // No previous clean: there is no baseline to compare later writes against.
+      return false;
+    }
+    String cleanRequestedTime = latestClean.get().requestedTime();
+
+    // Only deltacommits are tolerated after the last clean; any other completed write action
+    // (for example a commit produced by compaction) that completed after the clean was requested
+    // forces full planning.
+    return timeline.getWriteTimeline()
+        .filterCompletedInstants()
+        .filter(instant -> !instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)
+            && InstantComparison.compareTimestamps(
+                instant.getCompletionTime(), GREATER_THAN, cleanRequestedTime))
+        .countInstants() == 0;
+  }
+
/**
* Return partition paths for cleaning by commits mode.
* @param instantToRetain Earliest Instant to retain
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index e572fef12260..66385e0952e9 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.functional;
import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.BootstrapFileMapping;
@@ -38,7 +39,9 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.testutils.HoodieCleanerTestBase;
import org.junit.jupiter.api.Test;
@@ -635,6 +638,85 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
}
}
+ /**
+ * Test canSkipClean for MOR metadata table.
+ * Verifies the optimization that skips clean when:
+ * - Table type is MOR
+ * - Table is a metadata table
+ * - Only delta commits exist after the last clean
+ */
+ @Test
+ public void testCanSkipCleanForMetadataTable() throws Exception {
+ // Initialize metadata table with MOR type
+ String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(basePath);
+
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(metadataTableBasePath)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMetadataIndexColumnStats(false)
+ .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+ .retainFileVersions(1)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .build())
+ .build();
+
+ HoodieTableMetaClient metadataMetaClient =
HoodieTestUtils.init(storageConf, metadataTableBasePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestTable testTable = HoodieTestTable.of(metadataMetaClient);
+ try {
+ String p0 = "files";
+
+ // Create first delta commit
+ String firstDelta = WriteClientTestUtils.createNewInstantTime();
+ testTable.addDeltaCommit(firstDelta);
+
+ // Create a clean
+ String cleanInstant = WriteClientTestUtils.createNewInstantTime();
+ testTable.addClean(cleanInstant);
+
+ // Create delta commits after clean (these should allow clean to be
skipped)
+ String secondDelta = WriteClientTestUtils.createNewInstantTime();
+ testTable.addDeltaCommit(secondDelta);
+
+ String thirdDelta = WriteClientTestUtils.createNewInstantTime();
+ testTable.addDeltaCommit(thirdDelta);
+
+ // Reload metaClient to get latest timeline
+ metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+
+ // Create CleanPlanner on the metadata table
+ org.apache.hudi.table.HoodieSparkTable sparkTable =
org.apache.hudi.table.HoodieSparkTable.create(config, context,
metadataMetaClient);
+ org.apache.hudi.table.action.clean.CleanPlanner cleanPlanner =
+ new org.apache.hudi.table.action.clean.CleanPlanner(context,
sparkTable, config);
+
+ // Use reflection to invoke the private canSkipClean method
+ java.lang.reflect.Method canSkipCleanMethod =
+
org.apache.hudi.table.action.clean.CleanPlanner.class.getDeclaredMethod("canSkipClean");
+ canSkipCleanMethod.setAccessible(true);
+ boolean result = (boolean) canSkipCleanMethod.invoke(cleanPlanner);
+
+ // Verify canSkipClean returns true for metadata table with only delta
commits after clean
+ assertTrue(result, "canSkipClean should return true for MOR metadata
table with only delta commits after clean");
+
+ // Now add a non-delta commit (e.g., COMMIT from compaction) and verify
it returns false
+ String compactionCommit = WriteClientTestUtils.createNewInstantTime();
+ testTable.addCommit(compactionCommit);
+
+ metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+ sparkTable = org.apache.hudi.table.HoodieSparkTable.create(config,
context, metadataMetaClient);
+ cleanPlanner = new
org.apache.hudi.table.action.clean.CleanPlanner(context, sparkTable, config);
+
+ result = (boolean) canSkipCleanMethod.invoke(cleanPlanner);
+
+ // Verify canSkipClean returns false when there's a non-delta commit
after clean
+ assertFalse(result, "canSkipClean should return false when there are
non-delta commits after the last clean");
+ } finally {
+ testTable.close();
+ }
+ }
+
/**
* Tests cleaning service based on number of hours retained.
*/