This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 39cb726bebe9 feat(common): Add Policy for cleanup/rollback before each
write (#18197)
39cb726bebe9 is described below
commit 39cb726bebe9c26e494ce33a4341a0d1b56bce27
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 10:04:26 2026 -0700
feat(common): Add Policy for cleanup/rollback before each write (#18197)
Summary and Changelog
Summary: New config hoodie.prewrite.cleaner.policy controls whether
to run a CLEAN and/or rollback of failed writes before starting a new ingestion
commit. Supports "clean" (run CLEAN, which also rolls back failed writes) and
"rollback_failed_writes" (only roll back failed writes). Default is NONE (no
pre-write clean/rollback). Intended for temporary use to “force” a clean before
the next write when needed; not recommended as a default because a slow or
failing clean can furt [...]
Changelog:
HoodieCleanConfig: Added PREWRITE_CLEANER_POLICY and
withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy).
HoodieWriteConfig: Added getPreWriteCleanerPolicy().
BaseHoodieWriteClient: Invoke pre-write cleaner policy in startCommit
TestCleaner: Added parameterized test testPreWriteCleanPolicy for both
policy values, plus testPreWriteCleanPolicyDisabledWhenTableServicesDisabled
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 23 +++
.../org/apache/hudi/config/HoodieCleanConfig.java | 13 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 5 +
.../java/org/apache/hudi/table/TestCleaner.java | 170 +++++++++++++++++++++
.../common/model/HoodiePreWriteCleanerPolicy.java | 74 +++++++++
5 files changed, 285 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 3ec602f6a8e8..2ec7476f6501 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -1046,6 +1047,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
}
+ runPreWriteCleanerPolicy(metaClient);
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
@@ -1716,4 +1718,25 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
}
});
}
+
+ private void runPreWriteCleanerPolicy(HoodieTableMetaClient metaClient) {
+ if (!config.areTableServicesEnabled()) {
+ log.info("Skipping pre-write cleaner policy since table services are
disabled");
+ return;
+ }
+ HoodiePreWriteCleanerPolicy policy = config.getPreWriteCleanerPolicy();
+ if (policy.isNone()) {
+ return;
+ }
+ switch (policy) {
+ case CLEAN:
+ clean();
+ break;
+ case ROLLBACK_FAILED_WRITES:
+ rollbackFailedWrites(metaClient);
+ break;
+ default:
+ break;
+ }
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index e5a638956575..3242a886a3f0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
@@ -212,6 +213,12 @@ public class HoodieCleanConfig extends HoodieConfig {
+ "By using local engine context, file listing is performed on the
driver, allowing targeted memory scaling. "
+ "When enabled, both non-partitioned datasets and metadata tables
use the driver for scheduling cleans.");
+ public static final ConfigProperty<String> PREWRITE_CLEANER_POLICY =
ConfigProperty
+ .key("hoodie.prewrite.cleaner.policy")
+ .defaultValue(HoodiePreWriteCleanerPolicy.NONE.name())
+ .sinceVersion("1.2.0")
+ .withDocumentation(HoodiePreWriteCleanerPolicy.class);
+
private static final String CLEAN_PARTITION_FILTER_REGEX_KEY =
"hoodie.clean.partition.filter.regex";
private static final String CLEAN_PARTITION_FILTER_SELECTED_KEY =
"hoodie.clean.partition.filter.selected";
@@ -402,9 +409,15 @@ public class HoodieCleanConfig extends HoodieConfig {
return this;
}
+ public HoodieCleanConfig.Builder
withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy preWriteCleanerPolicy) {
+ cleanConfig.setValue(PREWRITE_CLEANER_POLICY,
preWriteCleanerPolicy.name());
+ return this;
+ }
+
public HoodieCleanConfig build() {
cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY));
+
HoodiePreWriteCleanerPolicy.fromString(cleanConfig.getString(PREWRITE_CLEANER_POLICY));
return cleanConfig;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 734339d5fdef..d68e3e6a94a7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -1970,6 +1971,10 @@ public class HoodieWriteConfig extends HoodieConfig {
.valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
}
+ public HoodiePreWriteCleanerPolicy getPreWriteCleanerPolicy() {
+ return
HoodiePreWriteCleanerPolicy.fromString(getString(HoodieCleanConfig.PREWRITE_CLEANER_POLICY));
+ }
+
public String getCompactionSpecifyPartitionPathRegex() {
return
getString(HoodieCompactionConfig.COMPACTION_SPECIFY_PARTITION_PATH_REGEX);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 24fcb656ef1d..6b4f4f716019 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -84,8 +85,11 @@ import org.apache.hudi.testutils.HoodieCleanerTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.client.WriteClientTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -105,6 +109,7 @@ import java.util.stream.Stream;
import scala.Tuple3;
+import static
org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
@@ -1525,4 +1530,169 @@ public class TestCleaner extends HoodieCleanerTestBase {
testTable.close();
}
}
+
+ private static Stream<Arguments>
preWriteCleanPolicyTypeAndCommitTimeSpecified() {
+ return Stream.of(
+ Arguments.of(HoodiePreWriteCleanerPolicy.CLEAN, true),
+ Arguments.of(HoodiePreWriteCleanerPolicy.CLEAN, false),
+ Arguments.of(HoodiePreWriteCleanerPolicy.ROLLBACK_FAILED_WRITES, true),
+ Arguments.of(HoodiePreWriteCleanerPolicy.ROLLBACK_FAILED_WRITES, false)
+ );
+ }
+
+ /**
+ * Test that both pre write clean policies (CLEAN and
ROLLBACK_FAILED_WRITES) are enforced/executed by APIs
+ * used for starting an (ingestion) write commit (startCommit and
startCommitWithTime).
+ */
+ @ParameterizedTest
+ @MethodSource("preWriteCleanPolicyTypeAndCommitTimeSpecified")
+ public void testPreWriteCleanPolicy(HoodiePreWriteCleanerPolicy policy,
boolean commitTimeSpecified) throws Exception {
+ int maxCommits = 2; // keep up to 2 commits from the past
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ // Disable auto clean to ensure that clean/rollback only happens
during pre-write phase
+ .withAutoClean(false)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanerPolicy(KEEP_LATEST_COMMITS)
+ .withPreWriteCleanerPolicy(policy)
+ .retainCommits(maxCommits).build())
+ .withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ // Complete 1 insert and 3 upserts.
+ String firstCommit = WriteClientTestUtils.createNewInstantTime();
+ insertBatch(cfg, client, firstCommit, "000", 1000,
+ SparkRDDWriteClient::insert, false, true, 1000, 1000, 1,
Option.empty(), INSTANT_GENERATOR);
+
+ String secondCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, secondCommit, firstCommit,
Option.of(Arrays.asList(firstCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 2, true, INSTANT_GENERATOR);
+
+ String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, thirdCommit, secondCommit,
Option.of(Arrays.asList(secondCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 3, true, INSTANT_GENERATOR);
+
+ String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, fourthCommit, thirdCommit,
Option.of(Arrays.asList(thirdCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 4, true, INSTANT_GENERATOR);
+
+ final String fifthCommit;
+ if (commitTimeSpecified) {
+ fifthCommit = WriteClientTestUtils.createNewInstantTime();
+ WriteClientTestUtils.startCommitWithTime(client, fifthCommit);
+ } else {
+ fifthCommit = client.startCommit();
+ }
+ // Stop heartbeat so that HoodieFailedWritesCleaningPolicy.LAZY rollback
policy will consider
+ // this inflight as a failed write
+ client.getHeartbeatClient().stop(fifthCommit);
+
+ // fifthCommit inflight will still be present, but if policy is CLEAN then
a clean should be completed
+ assertEquals(5,
metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+ if (policy.isClean()) {
+ assertEquals(1,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+ } else {
+ assertEquals(0,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+ }
+
+ String sixthCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, sixthCommit, fourthCommit,
Option.of(Arrays.asList(fourthCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 5, true, INSTANT_GENERATOR);
+
+ final String seventhCommit;
+ if (commitTimeSpecified) {
+ seventhCommit = WriteClientTestUtils.createNewInstantTime();
+ WriteClientTestUtils.startCommitWithTime(client, seventhCommit);
+ } else {
+ seventhCommit = client.startCommit();
+ }
+
+ // fifthCommit inflight should be rolled back for both CLEAN and
ROLLBACK_FAILED_WRITES policies.
+ // But only the former should lead to creating another clean
+ assertEquals(6,
metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+ if (policy.isClean()) {
+ assertEquals(2,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+ } else {
+ assertEquals(0,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+ }
+ }
+
+ /**
+ * Test that pre-write clean/rollback does NOT happen when table services
are disabled,
+ * even if the user explicitly sets a prewrite cleaner policy. The guard in
+ * BaseHoodieWriteClient.runPreWriteCleanerPolicy should skip execution.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testPreWriteCleanPolicyDisabledWhenTableServicesDisabled(boolean
commitTimeSpecified) throws Exception {
+ int maxCommits = 2;
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withTableServicesEnabled(false)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanerPolicy(KEEP_LATEST_COMMITS)
+ .withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy.CLEAN)
+ .retainCommits(maxCommits).build())
+ .withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ String firstCommit = WriteClientTestUtils.createNewInstantTime();
+ insertBatch(cfg, client, firstCommit, "000", 1000,
+ SparkRDDWriteClient::insert, false, true, 1000, 1000, 1,
Option.empty(), INSTANT_GENERATOR);
+
+ String secondCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, secondCommit, firstCommit,
Option.of(Arrays.asList(firstCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 2, true, INSTANT_GENERATOR);
+
+ String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, thirdCommit, secondCommit,
Option.of(Arrays.asList(secondCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 3, true, INSTANT_GENERATOR);
+
+ String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, fourthCommit, thirdCommit,
Option.of(Arrays.asList(thirdCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 4, true, INSTANT_GENERATOR);
+
+ final String fifthCommit;
+ if (commitTimeSpecified) {
+ fifthCommit = WriteClientTestUtils.createNewInstantTime();
+ WriteClientTestUtils.startCommitWithTime(client, fifthCommit);
+ } else {
+ fifthCommit = client.startCommit();
+ }
+ client.getHeartbeatClient().stop(fifthCommit);
+
+ // With table services disabled, pre-write clean should NOT have occurred.
+ assertEquals(5,
metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+ assertEquals(0,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+
+ // fifthCommit inflight is still on the timeline (not rolled back), so 6th
commit makes 6 total
+ String sixthCommit = WriteClientTestUtils.createNewInstantTime();
+ updateBatch(cfg, client, sixthCommit, fourthCommit,
Option.of(Arrays.asList(fourthCommit)),
+ "000", 100, SparkRDDWriteClient::upsert, false, true,
+ 100, 1000, 6, true, INSTANT_GENERATOR);
+
+ final String seventhCommit;
+ if (commitTimeSpecified) {
+ seventhCommit = WriteClientTestUtils.createNewInstantTime();
+ WriteClientTestUtils.startCommitWithTime(client, seventhCommit);
+ } else {
+ seventhCommit = client.startCommit();
+ }
+
+ // fifthCommit inflight should NOT have been rolled back since table
services are disabled.
+ assertEquals(7,
metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+ assertEquals(0,
metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java
new file mode 100644
index 000000000000..2b9ab083875b
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+/**
+ * Policy for running clean and/or rollback of failed writes before starting a
new ingestion commit
+ * (when startCommit or startCommitWithTime is invoked).
+ */
+@EnumDescription("If set, force attempting clean and/or failed writes rollback
before starting a new ingestion write commit. "
+ + "This should only be set to ensure that data files do not build up on
DFS if an ingestion "
+ + "writer is perpetually failing before completing a CLEAN.")
+public enum HoodiePreWriteCleanerPolicy {
+
+ @EnumFieldDescription("No pre-write clean or rollback. Default behavior.")
+ NONE,
+
+ @EnumFieldDescription("Force a CLEAN table service call before starting the
write (also performs rollback of failed writes).")
+ CLEAN,
+
+ @EnumFieldDescription("Only perform rollback of failed writes before
starting the write.")
+ ROLLBACK_FAILED_WRITES;
+
+ public boolean isNone() {
+ return this == NONE;
+ }
+
+ public boolean isClean() {
+ return this == CLEAN;
+ }
+
+ public boolean isRollbackFailedWrites() {
+ return this == ROLLBACK_FAILED_WRITES;
+ }
+
+ /**
+ * Parses the config value into an enum. Supports enum names (NONE, CLEAN,
ROLLBACK_FAILED_WRITES)
+ * and legacy string values ("clean", "rollback_failed_writes", empty/null
for NONE).
+ */
+ public static HoodiePreWriteCleanerPolicy fromString(String value) {
+ if (value == null || value.isEmpty()) {
+ return NONE;
+ }
+ String normalized = value.trim();
+ if (normalized.isEmpty()) {
+ return NONE;
+ }
+ if ("rollback_failed_writes".equalsIgnoreCase(normalized)) {
+ return ROLLBACK_FAILED_WRITES;
+ }
+ if ("clean".equalsIgnoreCase(normalized)) {
+ return CLEAN;
+ }
+ return HoodiePreWriteCleanerPolicy.valueOf(normalized.toUpperCase());
+ }
+}