This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 39cb726bebe9 feat(common): Add Policy for cleanup/rollback before each write (#18197)
39cb726bebe9 is described below

commit 39cb726bebe9c26e494ce33a4341a0d1b56bce27
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 10:04:26 2026 -0700

    feat(common): Add Policy for cleanup/rollback before each write (#18197)
    
    Summary and Changelog
    Summary: New config hoodie.prewrite.cleaner.policy controls whether to run a
    CLEAN and/or a rollback of failed writes before starting a new ingestion
    commit. Supports "clean" (run CLEAN, which also rolls back failed writes) and
    "rollback_failed_writes" (only roll back failed writes). The default is empty
    (no pre-write clean/rollback). Intended for temporary use to "force" a clean
    before the next write when needed; not recommended as a default, because a
    slow or failing clean can furt [...]
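
    For example (the key and the accepted values below are the ones defined by
    this commit's HoodieCleanConfig; leaving the key unset or empty means no
    pre-write clean/rollback), the policy can be enabled like any other write
    config:

        hoodie.prewrite.cleaner.policy=clean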
    
    Changelog:
    
    HoodieCleanConfig: Added PREWRITE_CLEANER_POLICY and withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy).
    HoodieWriteConfig: Added getPreWriteCleanerPolicy().
    BaseHoodieWriteClient: Invoke the pre-write cleaner policy in startCommit.
    TestCleaner: Added parameterized test testPreWriteCleanPolicy for both policy values.
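
    As a minimal usage sketch (the builder calls are the ones added in this
    commit; the base path is a placeholder):

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table") // placeholder table base path
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy.CLEAN)
                .build())
            .build();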
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  23 +++
 .../org/apache/hudi/config/HoodieCleanConfig.java  |  13 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   5 +
 .../java/org/apache/hudi/table/TestCleaner.java    | 170 +++++++++++++++++++++
 .../common/model/HoodiePreWriteCleanerPolicy.java  |  74 +++++++++
 5 files changed, 285 insertions(+)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 3ec602f6a8e8..2ec7476f6501 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -1046,6 +1047,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       // unclear what instant to use, since upgrade does have a given instant.
       executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, Option.empty()));
     }
+    runPreWriteCleanerPolicy(metaClient);
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites(metaClient));
 
@@ -1716,4 +1718,25 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       }
     });
   }
+
+  private void runPreWriteCleanerPolicy(HoodieTableMetaClient metaClient) {
+    if (!config.areTableServicesEnabled()) {
+      log.info("Skipping pre-write cleaner policy since table services are 
disabled");
+      return;
+    }
+    HoodiePreWriteCleanerPolicy policy = config.getPreWriteCleanerPolicy();
+    if (policy.isNone()) {
+      return;
+    }
+    switch (policy) {
+      case CLEAN:
+        clean();
+        break;
+      case ROLLBACK_FAILED_WRITES:
+        rollbackFailedWrites(metaClient);
+        break;
+      default:
+        break;
+    }
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index e5a638956575..3242a886a3f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
@@ -212,6 +213,12 @@ public class HoodieCleanConfig extends HoodieConfig {
           + "By using local engine context, file listing is performed on the 
driver, allowing targeted memory scaling. "
           + "When enabled, both non-partitioned datasets and metadata tables 
use the driver for scheduling cleans.");
 
+  public static final ConfigProperty<String> PREWRITE_CLEANER_POLICY = ConfigProperty
+      .key("hoodie.prewrite.cleaner.policy")
+      .defaultValue(HoodiePreWriteCleanerPolicy.NONE.name())
+      .sinceVersion("1.2.0")
+      .withDocumentation(HoodiePreWriteCleanerPolicy.class);
+
   private static final String CLEAN_PARTITION_FILTER_REGEX_KEY = "hoodie.clean.partition.filter.regex";
   private static final String CLEAN_PARTITION_FILTER_SELECTED_KEY = "hoodie.clean.partition.filter.selected";
 
@@ -402,9 +409,15 @@ public class HoodieCleanConfig extends HoodieConfig {
       return this;
     }
 
+    public HoodieCleanConfig.Builder withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy preWriteCleanerPolicy) {
+      cleanConfig.setValue(PREWRITE_CLEANER_POLICY, preWriteCleanerPolicy.name());
+      return this;
+    }
+
     public HoodieCleanConfig build() {
       cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
       HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY));
+      HoodiePreWriteCleanerPolicy.fromString(cleanConfig.getString(PREWRITE_CLEANER_POLICY));
       return cleanConfig;
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 734339d5fdef..d68e3e6a94a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -1970,6 +1971,10 @@ public class HoodieWriteConfig extends HoodieConfig {
         .valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
   }
 
+  public HoodiePreWriteCleanerPolicy getPreWriteCleanerPolicy() {
+    return HoodiePreWriteCleanerPolicy.fromString(getString(HoodieCleanConfig.PREWRITE_CLEANER_POLICY));
+  }
+
   public String getCompactionSpecifyPartitionPathRegex() {
     return getString(HoodieCompactionConfig.COMPACTION_SPECIFY_PARTITION_PATH_REGEX);
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 24fcb656ef1d..6b4f4f716019 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -84,8 +85,11 @@ import org.apache.hudi.testutils.HoodieCleanerTestBase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.client.WriteClientTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -105,6 +109,7 @@ import java.util.stream.Stream;
 
 import scala.Tuple3;
 
+import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
@@ -1525,4 +1530,169 @@ public class TestCleaner extends HoodieCleanerTestBase {
       testTable.close();
     }
   }
+
+  private static Stream<Arguments> preWriteCleanPolicyTypeAndCommitTimeSpecified() {
+    return Stream.of(
+        Arguments.of(HoodiePreWriteCleanerPolicy.CLEAN, true),
+        Arguments.of(HoodiePreWriteCleanerPolicy.CLEAN, false),
+        Arguments.of(HoodiePreWriteCleanerPolicy.ROLLBACK_FAILED_WRITES, true),
+        Arguments.of(HoodiePreWriteCleanerPolicy.ROLLBACK_FAILED_WRITES, false)
+    );
+  }
+
+  /**
+   * Test that both pre-write clean policies (CLEAN and ROLLBACK_FAILED_WRITES) are enforced/executed by APIs
+   * used for starting an (ingestion) write commit (startCommit and startCommitWithTime).
+   */
+  @ParameterizedTest
+  @MethodSource("preWriteCleanPolicyTypeAndCommitTimeSpecified")
+  public void testPreWriteCleanPolicy(HoodiePreWriteCleanerPolicy policy, boolean commitTimeSpecified) throws Exception {
+    int maxCommits = 2; // keep up to 2 commits from the past
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            // Disable auto clean to ensure that clean/rollback only happens during pre-write phase
+            .withAutoClean(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withCleanerPolicy(KEEP_LATEST_COMMITS)
+            .withPreWriteCleanerPolicy(policy)
+            .retainCommits(maxCommits).build())
+        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Complete 1 insert and 3 upserts.
+    String firstCommit = WriteClientTestUtils.createNewInstantTime();
+    insertBatch(cfg, client, firstCommit, "000", 1000,
+        SparkRDDWriteClient::insert, false, true, 1000, 1000, 1, Option.empty(), INSTANT_GENERATOR);
+
+    String secondCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, secondCommit, firstCommit, Option.of(Arrays.asList(firstCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 2, true, INSTANT_GENERATOR);
+
+    String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, thirdCommit, secondCommit, Option.of(Arrays.asList(secondCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 3, true, INSTANT_GENERATOR);
+
+    String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, fourthCommit, thirdCommit, Option.of(Arrays.asList(thirdCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 4, true, INSTANT_GENERATOR);
+
+    final String fifthCommit;
+    if (commitTimeSpecified) {
+      fifthCommit = WriteClientTestUtils.createNewInstantTime();
+      WriteClientTestUtils.startCommitWithTime(client, fifthCommit);
+    } else {
+      fifthCommit = client.startCommit();
+    }
+    // Stop heartbeat so that HoodieFailedWritesCleaningPolicy.LAZY rollback policy will consider
+    // this inflight as a failed write
+    client.getHeartbeatClient().stop(fifthCommit);
+
+    // fifthCommit inflight will still be present, but if policy is CLEAN then a clean should be completed
+    assertEquals(5, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+    if (policy.isClean()) {
+      assertEquals(1, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+    } else {
+      assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+    }
+
+    String sixthCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, sixthCommit, fourthCommit, Option.of(Arrays.asList(fourthCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 5, true, INSTANT_GENERATOR);
+
+    final String seventhCommit;
+    if (commitTimeSpecified) {
+      seventhCommit = WriteClientTestUtils.createNewInstantTime();
+      WriteClientTestUtils.startCommitWithTime(client, seventhCommit);
+    } else {
+      seventhCommit = client.startCommit();
+    }
+
+    // fifthCommit inflight should be rolled back for both CLEAN and ROLLBACK_FAILED_WRITES policies.
+    // But only the former should lead to creating another clean
+    assertEquals(6, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+    if (policy.isClean()) {
+      assertEquals(2, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+    } else {
+      assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+    }
+  }
+
+  /**
+   * Test that pre-write clean/rollback does NOT happen when table services are disabled,
+   * even if the user explicitly sets a prewrite cleaner policy. The guard in
+   * BaseHoodieWriteClient.runPreWriteCleanerPolicy should skip execution.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testPreWriteCleanPolicyDisabledWhenTableServicesDisabled(boolean commitTimeSpecified) throws Exception {
+    int maxCommits = 2;
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withTableServicesEnabled(false)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withCleanerPolicy(KEEP_LATEST_COMMITS)
+            .withPreWriteCleanerPolicy(HoodiePreWriteCleanerPolicy.CLEAN)
+            .retainCommits(maxCommits).build())
+        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    String firstCommit = WriteClientTestUtils.createNewInstantTime();
+    insertBatch(cfg, client, firstCommit, "000", 1000,
+        SparkRDDWriteClient::insert, false, true, 1000, 1000, 1, Option.empty(), INSTANT_GENERATOR);
+
+    String secondCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, secondCommit, firstCommit, Option.of(Arrays.asList(firstCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 2, true, INSTANT_GENERATOR);
+
+    String thirdCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, thirdCommit, secondCommit, Option.of(Arrays.asList(secondCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 3, true, INSTANT_GENERATOR);
+
+    String fourthCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, fourthCommit, thirdCommit, Option.of(Arrays.asList(thirdCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 4, true, INSTANT_GENERATOR);
+
+    final String fifthCommit;
+    if (commitTimeSpecified) {
+      fifthCommit = WriteClientTestUtils.createNewInstantTime();
+      WriteClientTestUtils.startCommitWithTime(client, fifthCommit);
+    } else {
+      fifthCommit = client.startCommit();
+    }
+    client.getHeartbeatClient().stop(fifthCommit);
+
+    // With table services disabled, pre-write clean should NOT have occurred.
+    assertEquals(5, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+    assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+
+    // fifthCommit inflight is still on the timeline (not rolled back), so 6th commit makes 6 total
+    String sixthCommit = WriteClientTestUtils.createNewInstantTime();
+    updateBatch(cfg, client, sixthCommit, fourthCommit, Option.of(Arrays.asList(fourthCommit)),
+        "000", 100, SparkRDDWriteClient::upsert, false, true,
+        100, 1000, 6, true, INSTANT_GENERATOR);
+
+    final String seventhCommit;
+    if (commitTimeSpecified) {
+      seventhCommit = WriteClientTestUtils.createNewInstantTime();
+      WriteClientTestUtils.startCommitWithTime(client, seventhCommit);
+    } else {
+      seventhCommit = client.startCommit();
+    }
+
+    // fifthCommit inflight should NOT have been rolled back since table services are disabled.
+    assertEquals(7, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants());
+    assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java
new file mode 100644
index 000000000000..2b9ab083875b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreWriteCleanerPolicy.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+/**
+ * Policy for running clean and/or rollback of failed writes before starting a new ingestion commit
+ * (when startCommit or startCommitWithTime is invoked).
+ */
+@EnumDescription("If set, force attempting clean and/or failed writes rollback before starting a new ingestion write commit. "
+    + "This should only be set to ensure that data files do not build up on DFS if an ingestion "
+    + "writer is perpetually failing before completing a CLEAN.")
+public enum HoodiePreWriteCleanerPolicy {
+
+  @EnumFieldDescription("No pre-write clean or rollback. Default behavior.")
+  NONE,
+
+  @EnumFieldDescription("Force a CLEAN table service call before starting the 
write (also performs rollback of failed writes).")
+  CLEAN,
+
+  @EnumFieldDescription("Only perform rollback of failed writes before 
starting the write.")
+  ROLLBACK_FAILED_WRITES;
+
+  public boolean isNone() {
+    return this == NONE;
+  }
+
+  public boolean isClean() {
+    return this == CLEAN;
+  }
+
+  public boolean isRollbackFailedWrites() {
+    return this == ROLLBACK_FAILED_WRITES;
+  }
+
+  /**
+   * Parses the config value into an enum. Supports enum names (NONE, CLEAN, ROLLBACK_FAILED_WRITES)
+   * and legacy string values ("clean", "rollback_failed_writes", empty/null for NONE).
+   */
+  public static HoodiePreWriteCleanerPolicy fromString(String value) {
+    if (value == null || value.isEmpty()) {
+      return NONE;
+    }
+    String normalized = value.trim();
+    if (normalized.isEmpty()) {
+      return NONE;
+    }
+    if ("rollback_failed_writes".equalsIgnoreCase(normalized)) {
+      return ROLLBACK_FAILED_WRITES;
+    }
+    if ("clean".equalsIgnoreCase(normalized)) {
+      return CLEAN;
+    }
+    return HoodiePreWriteCleanerPolicy.valueOf(normalized.toUpperCase());
+  }
+}
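
As a quick illustration of the fromString parsing rules above (enum names, the
legacy lowercase values, and empty/null falling back to NONE), here is a
standalone sketch; the demo class itself is hypothetical and assumes only the
enum added by this commit:

    import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy;

    public class PreWritePolicyParseDemo { // hypothetical demo, not part of the commit
      public static void main(String[] args) {
        // Empty or null input falls back to NONE, matching the config's empty default.
        System.out.println(HoodiePreWriteCleanerPolicy.fromString(""));    // NONE
        System.out.println(HoodiePreWriteCleanerPolicy.fromString(null));  // NONE
        // Legacy lowercase values are matched case-insensitively.
        System.out.println(HoodiePreWriteCleanerPolicy.fromString("clean"));                  // CLEAN
        System.out.println(HoodiePreWriteCleanerPolicy.fromString("Rollback_Failed_Writes")); // ROLLBACK_FAILED_WRITES
        // Canonical enum names parse via valueOf after upper-casing.
        System.out.println(HoodiePreWriteCleanerPolicy.fromString("NONE")); // NONE
      }
    }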
