This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7be8944723d5cd5c1c54f4f94bdafe78351570ac
Author: 董可伦 <[email protected]>
AuthorDate: Thu Dec 30 11:53:17 2021 +0800

    [HUDI-2675] Fix the exception 'Not an Avro data file' when archiving and cleaning (#4016)
---
 .../hudi/table/HoodieTimelineArchiveLog.java       | 37 ++++++-----
 .../table/action/clean/CleanActionExecutor.java    | 14 +++--
 .../hudi/table/action/clean/CleanPlanner.java      | 14 +++--
 .../hudi/io/TestHoodieTimelineArchiveLog.java      | 45 ++++----------
 .../java/org/apache/hudi/table/TestCleaner.java    | 71 +++++++++++++++++++---
 .../hudi/testutils/HoodieClientTestHarness.java    | 34 ++++++++++-
 .../table/timeline/HoodieActiveTimeline.java       | 12 +---
 .../table/timeline/HoodieDefaultTimeline.java      |  7 ++-
 .../hudi/common/table/timeline/HoodieTimeline.java |  2 +
 .../hudi/common/testutils/FileCreateUtils.java     | 17 +++++-
 .../hudi/common/testutils/HoodieTestTable.java     | 18 ++++--
 11 files changed, 185 insertions(+), 86 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 43a85dd..58e03f5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -156,7 +156,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
 
   private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
-    // with logic above to avoid Stream.concats
+    // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
     Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
@@ -176,7 +176,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
-            // if no savepoint present, then dont filter
+            // if no savepoint present, then don't filter
             return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
           }).filter(s -> {
             // Ensure commits >= oldest pending compaction commit is retained
@@ -233,9 +233,9 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
     boolean success = true;
-    List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
-      return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
-    }).map(Path::toString).collect(Collectors.toList());
+    List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
+        new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
+    ).map(Path::toString).collect(Collectors.toList());
 
     context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
     Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
@@ -265,7 +265,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
         || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
     LOG.info("Latest Committed Instant=" + latestCommitted);
     if (latestCommitted.isPresent()) {
-      success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
+      success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
     }
     return success;
   }
@@ -277,7 +277,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
    * @return success if all eligible file deleted successfully
    * @throws IOException in case of error
    */
-  private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
+  private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
     List<HoodieInstant> instants = null;
     boolean success = true;
     try {
@@ -291,12 +291,12 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
        * On some FSs deletion of all files in the directory can auto remove the directory itself.
        * GCS is one example, as it doesn't have real directories and subdirectories. When client
        * removes all the files from a "folder" on GCS is has to create a special "/" to keep the folder
        * around. If this doesn't happen (timeout, misconfigured client, ...) folder will be deleted and
        * in this case we should not break when aux folder is not found.
        * GCS information: (https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork)
        */
       LOG.warn("Aux path not found. Skipping: " + metaClient.getMetaAuxiliaryPath());
-      return success;
+      return true;
     }
 
     List<HoodieInstant> instantsToBeDeleted =
@@ -308,7 +308,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
       Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
       if (metaClient.getFs().exists(metaFile)) {
         success &= metaClient.getFs().delete(metaFile, false);
-        LOG.info("Deleted instant file in auxiliary metapath : " + metaFile);
+        LOG.info("Deleted instant file in auxiliary meta path : " + metaFile);
       }
     }
     return success;
@@ -321,10 +321,19 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkers(context, hoodieInstant);
-          records.add(convertToAvroRecord(hoodieInstant));
-          if (records.size() >= this.config.getCommitArchivalBatchSize()) {
-            writeToFile(wrapperSchema, records);
+          if (table.getActiveTimeline().isEmpty(hoodieInstant)
+                  && (
+                          hoodieInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)
+                          || (hoodieInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && hoodieInstant.isCompleted())
+                     )
+          ) {
+            table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
+          } else {
+            deleteAnyLeftOverMarkers(context, hoodieInstant);
+            records.add(convertToAvroRecord(hoodieInstant));
+            if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+              writeToFile(wrapperSchema, records);
+            }
           }
         } catch (Exception e) {
           LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
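
In short, the archiving path above now distinguishes two cases: a zero-byte clean instant, or a zero-byte completed rollback instant, is deleted from the active timeline outright, while everything else goes through the usual convert-to-Avro-and-write path. A condensed sketch of that decision (the standalone helper below is hypothetical, not part of the patch; the timeline calls are the ones introduced here):

    import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    // Hypothetical helper mirroring the new branch in archive(): empty clean
    // instants and empty completed rollback instants are dropped instead of
    // being serialized into the archive log (where they would later fail to
    // deserialize with "Not an Avro data file").
    static boolean shouldDropInsteadOfArchive(HoodieActiveTimeline timeline, HoodieInstant instant) {
      return timeline.isEmpty(instant)
          && (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)
              || (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && instant.isCompleted()));
    }
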
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index f1f2e53..9813b2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -230,11 +230,15 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
         .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
     if (pendingCleanInstants.size() > 0) {
       pendingCleanInstants.forEach(hoodieInstant -> {
-        LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
-        try {
-          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
-        } catch (Exception e) {
-          LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
+        if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
+          table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
+        } else {
+          LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
+          try {
+            cleanMetadataList.add(runPendingClean(table, hoodieInstant));
+          } catch (Exception e) {
+            LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
+          }
         }
       });
       table.getMetaClient().reloadActiveTimeline();
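
The cleaner gets the same treatment for pending instants: an empty plan file cannot be deserialized and replayed, so it is removed instead. Roughly, the loop above reduces to this (a sketch; `table` is the HoodieTable held by the executor, and the try/catch around runPendingClean is omitted for brevity):

    // Sketch of the guarded pending-clean loop introduced above.
    for (HoodieInstant pending : pendingCleanInstants) {
      if (table.getCleanTimeline().isEmpty(pending)) {
        // zero-byte requested/inflight clean: discard rather than replay
        table.getActiveTimeline().deleteEmptyInstantIfExists(pending);
      } else {
        LOG.info("Finishing previously unfinished cleaner instant=" + pending);
        cleanMetadataList.add(runPendingClean(table, pending));
      }
    }
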
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 80727ff..27937af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -146,11 +146,15 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     if (config.incrementalCleanerModeEnabled()) {
       Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
-        if ((cleanMetadata.getEarliestCommitToRetain() != null)
-            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
+        if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
+          hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
+        } else {
+          HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+                  .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
+          if ((cleanMetadata.getEarliestCommitToRetain() != null)
+                  && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
+            return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
+          }
         }
       }
     }
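
The planner change follows the same pattern: before deserializing the last completed clean's metadata for incremental planning, it checks that the instant actually has content; an empty one is deleted and the planner falls back to scanning all partitions. Condensed (a sketch using the names from the hunk above):

    // Sketch: only trust the last clean for incremental planning if it has bytes.
    Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
    if (lastClean.isPresent()) {
      if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
        hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get()); // stray zero-byte file
      } else {
        // safe to deserialize HoodieCleanMetadata and clean incrementally
      }
    }
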
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 2112031..faa0621 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -18,15 +18,10 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -35,13 +30,11 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -70,11 +63,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -502,8 +493,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
         HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
   }
 
-  @Test
-  public void testArchiveCompletedRollbackAndClean() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchiveCompletedRollbackAndClean(boolean isEmpty) throws Exception {
     init();
     int minInstantsToKeep = 2;
     int maxInstantsToKeep = 10;
@@ -519,11 +511,11 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     int startInstant = 1;
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
-      createCleanMetadata(startInstant + "", false);
+      createCleanMetadata(startInstant + "", false, isEmpty || i % 2 == 0);
     }
 
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
-      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false);
+      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0);
     }
 
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -701,31 +693,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     return allInstants;
   }
 
-  private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
-    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
-        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
-    if (inflightOnly) {
-      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
-    } else {
-      HoodieCleanStat cleanStats = new HoodieCleanStat(
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
-          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
-          Collections.emptyList(),
-          Collections.emptyList(),
-          Collections.emptyList(),
-          instantTime);
-      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
-      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
-    }
-    return new HoodieInstant(inflightOnly, "clean", instantTime);
+  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTime, boolean isRollbackInflight) throws IOException {
+    createCommitAndRollbackFile(commitToRollback, rollbackTime, isRollbackInflight, false);
   }
 
-  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTIme, boolean isRollbackInflight) throws IOException {
+  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTime, boolean isRollbackInflight, boolean isEmpty) throws IOException {
     HoodieTestDataGenerator.createCommitFile(basePath, commitToRollback, wrapperFs.getConf());
-    createRollbackMetadata(rollbackTIme, commitToRollback, isRollbackInflight);
+    createRollbackMetadata(rollbackTime, commitToRollback, isRollbackInflight, isEmpty);
   }
 
-  private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight) throws IOException {
+  private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight, boolean isEmpty) throws IOException {
     if (inflight) {
       HoodieTestTable.of(metaClient).addInflightRollback(rollbackTime);
     } else {
@@ -738,7 +715,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
           .setPartitionMetadata(Collections.emptyMap())
           .setInstantsRollback(Collections.emptyList())
           .build();
-      HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata);
+      HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata, isEmpty);
     }
     return new HoodieInstant(inflight, "rollback", rollbackTime);
   }
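
For reference, the failure mode these parameterized tests simulate is straightforward to reproduce with plain Avro: a zero-byte payload has no Avro magic header, so opening it fails immediately. A minimal, self-contained sketch (the exact message varies by Avro version; recent versions throw 'Not an Avro data file.'):

    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.SeekableByteArrayInput;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class EmptyAvroFileRepro {
      public static void main(String[] args) {
        try {
          // Zero bytes means no Avro magic header, so the reader rejects the
          // input, which is what archiving/cleaning hit on empty instant files.
          new DataFileReader<>(new SeekableByteArrayInput(new byte[0]),
              new GenericDatumReader<GenericRecord>());
        } catch (IOException e) {
          System.out.println(e.getMessage()); // e.g. "Not an Avro data file."
        }
      }
    }
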
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 491bba2..8a1f4ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -259,7 +259,7 @@ public class TestCleaner extends HoodieClientTestBase {
    * @param insertFn Insert API to be tested
    * @param upsertFn Upsert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *        record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanByVersions(
   private void testInsertAndCleanByVersions(
@@ -274,7 +274,7 @@ public class TestCleaner extends HoodieClientTestBase {
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
       final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
@@ -429,7 +429,7 @@ public class TestCleaner extends HoodieClientTestBase {
    * @param insertFn Insert API to be tested
    * @param upsertFn Upsert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *        record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanByCommits(
@@ -550,10 +550,10 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
     assertTrue(timeline.getTimelineOfActions(
         CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
-    Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
+    Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
         CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
     HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
-        timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+        timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
     // Rollback of one of the failed writes should have deleted 3 files
     assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
   }
@@ -750,6 +750,59 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
   }
 
+  @Test
+  public void testCleanEmptyInstants() throws Exception {
+    HoodieWriteConfig config =
+            HoodieWriteConfig.newBuilder()
+                    .withPath(basePath)
+                    .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+                    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    int commitCount = 20;
+    int cleanCount = 20;
+
+    int startInstant = 1;
+    for (int i = 0; i < commitCount; i++, startInstant++) {
+      String commitTime = makeNewCommitTime(startInstant);
+      HoodieTestTable.of(metaClient).addCommit(commitTime);
+    }
+
+    for (int i = 0; i < cleanCount; i++, startInstant++) {
+      String commitTime = makeNewCommitTime(startInstant);
+      createCleanMetadata(commitTime, false, true);
+    }
+
+    List<HoodieCleanStat> cleanStats = runCleaner(config);
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+
+    assertEquals(0, cleanStats.size(), "Must not clean any files");
+    assertEquals(1, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants());
+    assertEquals(0, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants());
+    assertEquals(--cleanCount, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
+    assertTrue(timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant)));
+
+    cleanStats = runCleaner(config);
+    timeline = metaClient.reloadActiveTimeline();
+
+    assertEquals(0, cleanStats.size(), "Must not clean any files");
+    assertEquals(1, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants());
+    assertEquals(0, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants());
+    assertEquals(--cleanCount, timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
+    assertTrue(timeline.getTimelineOfActions(
+            CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant)));
+  }
+
+
   /**
   * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files.
    */
@@ -1425,7 +1478,7 @@ public class TestCleaner extends HoodieClientTestBase {
    *
    * @param insertFn Insert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *        record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanFailedWritesByVersions(
   private void testInsertAndCleanFailedWritesByVersions(
@@ -1441,7 +1494,7 @@ public class TestCleaner extends HoodieClientTestBase {
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
       final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
@@ -1474,10 +1527,10 @@ public class TestCleaner extends HoodieClientTestBase {
       HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
       assertTrue(timeline.getTimelineOfActions(
           CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
-      Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
+      Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
           CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
       HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
-          timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+          timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
       // Rollback of one of the failed writes should have deleted 3 files
       assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 1e07485..d312b64 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,13 +17,18 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,7 +36,9 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -85,12 +92,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -671,7 +680,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
 
     // Metadata table should automatically compact and clean
-    // versions are +1 as autoclean / compaction happens end of commits
+    // versions are +1 as autoClean / compaction happens at the end of commits
     int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
     metadataTablePartitions.forEach(partition -> {
@@ -682,4 +691,27 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
           + numFileVersions + " but was " + latestSlices.size());
     });
   }
+
+  public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
+    return createCleanMetadata(instantTime, inflightOnly, false);
+  }
+
+  public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmpty) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+            CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    if (inflightOnly) {
+      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
+    } else {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(
+              HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+              HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              instantTime);
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmpty);
+    }
+    return new HoodieInstant(inflightOnly, "clean", instantTime);
+  }
 }
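
Moving createCleanMetadata into the shared harness lets both the archive and cleaner tests fabricate the problematic state. Hypothetical usage from a subclass test (timestamps are illustrative):

    // Fabricate one healthy completed clean and one backed by a zero-byte file,
    // as the new tests do via the isEmpty flag.
    createCleanMetadata("001", false);        // real HoodieCleanMetadata payload
    createCleanMetadata("002", false, true);  // empty .clean file on disk
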
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index b6f1a32..ca131f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -79,13 +79,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   /**
-   * Format the java.time.Instant to a String representing the timestamp of a Hoodie Instant.
-   */
-  public static String formatInstantTime(Instant timestamp) {
-    return HoodieInstantTimeGenerator.formatInstantTime(timestamp);
-  }
-
-  /**
    * Format the Date to a String representing the timestamp of a Hoodie Instant.
    */
   public static String formatDate(Date timestamp) {
@@ -198,9 +191,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     }
   }
 
-  public void deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) {
-    HoodieInstant instant = new HoodieInstant(state, action, instantStr);
-    ValidationUtils.checkArgument(!instant.isCompleted());
+  public void deleteEmptyInstantIfExists(HoodieInstant instant) {
+    ValidationUtils.checkArgument(isEmpty(instant));
     deleteInstantFileIfExists(instant);
   }
 
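Note the API shift here: deletePendingIfExists guarded on the instant being incomplete, while deleteEmptyInstantIfExists guards on the file being empty, which is what allows completed-but-empty instants to be purged. A hypothetical call site:

    // Hypothetical: purge a completed clean instant whose file has zero bytes.
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    HoodieInstant clean = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, "003");
    if (timeline.isEmpty(clean)) {
      timeline.deleteEmptyInstantIfExists(clean); // checkArgument(isEmpty(...)) rejects misuse
    }
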
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index aa48db7..15691f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -123,7 +123,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   @Override
   public HoodieTimeline getCompletedReplaceTimeline() {
     return new HoodieDefaultTimeline(
-        instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(s -> s.isCompleted()), details);
+        instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details);
   }
 
   @Override
@@ -375,6 +375,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   }
 
   @Override
+  public boolean isEmpty(HoodieInstant instant) {
+    return getInstantDetails(instant).get().length == 0;
+  }
+
+  @Override
   public String toString() {
     return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));
   }
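
Since isEmpty is implemented on HoodieDefaultTimeline purely in terms of getInstantDetails, every timeline subclass inherits the same definition: payload present but zero-length. One thing to be aware of is the unconditional Option.get(); a more defensive variant would look like this (an alternative sketch, not what the patch does):

    // Defensive variant (hypothetical): treat an absent payload as empty too,
    // instead of calling .get() on the Option unconditionally.
    @Override
    public boolean isEmpty(HoodieInstant instant) {
      Option<byte[]> details = getInstantDetails(instant);
      return !details.isPresent() || details.get().length == 0;
    }
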
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index bb6bed8..3b2779c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -289,6 +289,8 @@ public interface HoodieTimeline extends Serializable {
    */
   Option<byte[]> getInstantDetails(HoodieInstant instant);
 
+  boolean isEmpty(HoodieInstant instant);
+
   /**
    * Check WriteOperationType is DeletePartition.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 3754f37..1968ef4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -219,14 +220,26 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
   }
 
+  public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanMetadata(metadata).get());
+  }
+
   public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
   }
 
+  public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanerPlan(cleanerPlan).get());
+  }
+
   public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
   }
 
+  public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanerPlan(cleanerPlan).get());
+  }
+
   public static void createRequestedRollbackFile(String basePath, String instantTime, HoodieRollbackPlan plan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
   }
@@ -235,8 +248,8 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
 
-  public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata hoodieRollbackMetadata) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, serializeRollbackMetadata(hoodieRollbackMetadata).get());
+  public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata hoodieRollbackMetadata, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeRollbackMetadata(hoodieRollbackMetadata).get());
   }
 
   public static void createRestoreFile(String basePath, String instantTime, HoodieRestoreMetadata hoodieRestoreMetadata) throws IOException {
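
A small aside on the empty payload: it comes from org.apache.directory.api.util.Strings.EMPTY_BYTES, an incidental dependency that happens to be on the test classpath; a local constant would serve equally well. For example (hypothetical equivalent, avoiding the extra import):

    // Hypothetical equivalent without the org.apache.directory import.
    private static final byte[] EMPTY_BYTES = new byte[0];

    public static void createCleanFile(String basePath, String instantTime,
        HoodieCleanMetadata metadata, boolean isEmpty) throws IOException {
      createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION,
          isEmpty ? EMPTY_BYTES : serializeCleanMetadata(metadata).get());
    }
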
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 8bd1ea7..7b8148a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -279,9 +279,13 @@ public class HoodieTestTable {
   }
 
   public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException {
-    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
-    createInflightCleanFile(basePath, instantTime, cleanerPlan);
-    createCleanFile(basePath, instantTime, metadata);
+    return addClean(instantTime, cleanerPlan, metadata, false);
+  }
+
+  public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata, boolean isEmpty) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan, isEmpty);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan, isEmpty);
+    createCleanFile(basePath, instantTime, metadata, isEmpty);
     currentInstantTime = instantTime;
     return this;
   }
@@ -324,8 +328,12 @@ public class HoodieTestTable {
   }
 
   public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata) throws IOException {
+    return addRollback(instantTime, rollbackMetadata, false);
+  }
+
+  public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
     createInflightRollbackFile(basePath, instantTime);
-    createRollbackFile(basePath, instantTime, rollbackMetadata);
+    createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty);
     currentInstantTime = instantTime;
     return this;
   }
@@ -1015,7 +1023,7 @@ public class HoodieTestTable {
    * @param tableType                     - Hudi table type
    * @param commitTime                    - Write commit time
    * @param partitionToFilesNameLengthMap - Map of partition names to its list of files and their lengths
-   * @return Test tabke state for the requested partitions and files
+   * @return Test table state for the requested partitions and files
    */
   private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType,
                                                                              HoodieTableType tableType,
