This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb51220fe [HUDI-3848] Fixing minor bug in listing based rollback request generation (#6244)
5cb51220fe is described below

commit 5cb51220fecdd82e2651bbd1cc8b41533ec5008b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Aug 2 10:08:38 2022 -0400

    [HUDI-3848] Fixing minor bug in listing based rollback request generation (#6244)
---
 .../rollback/ListingBasedRollbackStrategy.java     |  6 +-
 .../TestCopyOnWriteRollbackActionExecutor.java     | 67 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index aa9e0b6583..10b02d7fff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -242,11 +242,11 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
       try {
         return fs.exists(entry);
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.error("Exists check failed for " + entry.toString(), e);
       }
-      // if IOException is thrown, do not ignore. lets try to add the file of 
interest to be deleted. we can't miss any files to be rolled back.
-      return false;
+      // if any Exception is thrown, do not ignore. let's try to add the file 
of interest to be deleted. we can't miss any files to be rolled back.
+      return true;
     }).toArray(Path[]::new), pathFilter);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e5dd5b087a..237f069178 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -20,24 +20,37 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.testutils.Assertions;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -45,10 +58,12 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
 
 public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackTestBase {
   @BeforeEach
@@ -133,6 +148,58 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     assertFalse(testTable.baseFileExists(p2, "002", "id22"));
   }
 
+  /**
+   * Builds two commits ("001" insert, "002" update), computes listing-based rollback requests
+   * for commit "002", then recomputes them after stubbing {@code fs.exists()} to throw, and
+   * asserts both request lists are equal.
+   *
+   * NOTE(review): the mocked FileSystem below is a local variable created after {@code table}
+   * is built and is never passed to {@code table} or to either ListingBasedRollbackStrategy, so
+   * the exception branch of the exists() check is presumably not exercised by this test.
+   * TODO confirm and inject the mock into the file system the strategy actually uses.
+   */
+  @Test
+  public void testListBasedRollbackStrategy() throws Exception {
+    // generate data across three partitions (first, second and third)
+    dataGen = new HoodieTestDataGenerator(new String[] 
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH});
+    // listing-based (not marker-based) rollback is the code path under test
+    HoodieWriteConfig cfg = 
getConfigBuilder().withRollbackUsingMarkers(false).build();
+    // 1. prepare data
+    // NOTE(review): partition metadata is written for only two of the three partitions
+    // passed to dataGen above; confirm this is intentional.
+    HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] 
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // commit 001: insert records covering all partitions
+    String newCommitTime = "001";
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = 
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+
+    // commit 002: update the same records; this is the commit rolled back below
+    newCommitTime = "002";
+    client.startCommitWithTime(newCommitTime);
+    records = dataGen.generateUpdates(newCommitTime, records);
+    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+
+    context = new HoodieSparkEngineContext(jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    HoodieInstant needRollBackInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "002");
+    String rollbackInstant = "003";
+
+    // baseline: rollback requests computed without any stubbing
+    ListingBasedRollbackStrategy rollbackStrategy = new 
ListingBasedRollbackStrategy(table, context, table.getConfig(), 
rollbackInstant);
+    List<HoodieRollbackRequest> rollBackRequests = 
rollbackStrategy.getRollbackRequests(needRollBackInstant);
+    rollBackRequests.forEach(entry -> System.out.println(" " + 
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+        + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
+
+    // pick the request for the first partition; get() throws if no such request exists
+    HoodieRollbackRequest rollbackRequest = 
rollBackRequests.stream().filter(entry -> 
entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get();
+
+    // NOTE(review): this local shadows the `fs` used earlier in this method and is not
+    // referenced by `table` or by the strategy constructed below.
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    MockitoAnnotations.initMocks(this);
+
+    // mock to throw exception when fs.exists() is invoked
+    System.out.println("Fs.exists() call for " + 
rollbackRequest.getFilesToBeDeleted().get(0).toString());
+    Mockito.when(fs.exists(any()))
+        .thenThrow(new IOException("Failing exists call for " + 
rollbackRequest.getFilesToBeDeleted().get(0)));
+
+    // recompute the rollback requests for the same instant with a fresh strategy
+    rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, 
rollbackInstant);
+    List<HoodieRollbackRequest> rollBackRequestsUpdated = 
rollbackStrategy.getRollbackRequests(needRollBackInstant);
+    rollBackRequestsUpdated.forEach(entry -> System.out.println(" " + 
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+        + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
+
+    // expect the same requests as the baseline: no file of interest may be dropped
+    assertEquals(rollBackRequests, rollBackRequestsUpdated);
+  }
+
+
   // Verify that rollback works with replacecommit
   @ParameterizedTest
   @ValueSource(booleans = {true, false})

Reply via email to