This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5cb51220fe [HUDI-3848] Fixing minor bug in listing based rollback request generation (#6244)
5cb51220fe is described below
commit 5cb51220fecdd82e2651bbd1cc8b41533ec5008b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Aug 2 10:08:38 2022 -0400
[HUDI-3848] Fixing minor bug in listing based rollback request generation (#6244)
---
.../rollback/ListingBasedRollbackStrategy.java | 6 +-
.../TestCopyOnWriteRollbackActionExecutor.java | 67 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index aa9e0b6583..10b02d7fff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -242,11 +242,11 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
try {
return fs.exists(entry);
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Exists check failed for " + entry.toString(), e);
}
- // if IOException is thrown, do not ignore. lets try to add the file of
interest to be deleted. we can't miss any files to be rolled back.
- return false;
+ // if any Exception is thrown, do not ignore. let's try to add the file
of interest to be deleted. we can't miss any files to be rolled back.
+ return true;
}).toArray(Path[]::new), pathFilter);
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e5dd5b087a..237f069178 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -20,24 +20,37 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.testutils.Assertions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -45,10 +58,12 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackTestBase {
@BeforeEach
@@ -133,6 +148,58 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
}
+ @Test
+ public void testListBasedRollbackStrategy() throws Exception {
+ // Intent: rollback requests for "002" must be identical with and without a failing fs.exists(); data spans three partitions.
+ dataGen = new HoodieTestDataGenerator(new String[]
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH,
DEFAULT_THIRD_PARTITION_PATH});
+ HoodieWriteConfig cfg =
getConfigBuilder().withRollbackUsingMarkers(false).build();
+ // 1. prepare data (partition metadata is written for the first two partitions only): commit 001 inserts, commit 002 updates
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[]
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ // commit 001: inserts covering all generator partitions
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records =
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+ Assertions.assertNoWriteErrors(statuses.collect());
+ // commit 002: updates of the 001 records; this is the commit rolled back below
+ newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+ records = dataGen.generateUpdates(newCommitTime, records);
+ statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
+ Assertions.assertNoWriteErrors(statuses.collect());
+ // 2. baseline: rollback requests for 002 (rollback instant 003), computed before any mocking
+ context = new HoodieSparkEngineContext(jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ HoodieInstant needRollBackInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "002");
+ String rollbackInstant = "003";
+ // NOTE(review): the System.out.println calls in this test look like leftover debug output; consider removing them
+ ListingBasedRollbackStrategy rollbackStrategy = new
ListingBasedRollbackStrategy(table, context, table.getConfig(),
rollbackInstant);
+ List<HoodieRollbackRequest> rollBackRequests =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
+ rollBackRequests.forEach(entry -> System.out.println(" " +
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+ + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
+ // first-partition request; its first file is only used in the println and the stubbed exception message below
+ HoodieRollbackRequest rollbackRequest =
rollBackRequests.stream().filter(entry ->
entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get();
+ // NOTE(review): this local mock is never passed to the table or the strategy (and it shadows the `fs` used in step 1), so it probably does not affect getRollbackRequests() — TODO confirm and inject it
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ MockitoAnnotations.initMocks(this);
+ // NOTE(review): initMocks(this) only initializes annotated fields of this test instance; it appears unnecessary for the local mock above
+ // stub exists() on the mock to throw an IOException for any path
+ System.out.println("Fs.exists() call for " +
rollbackRequest.getFilesToBeDeleted().get(0).toString());
+ Mockito.when(fs.exists(any()))
+ .thenThrow(new IOException("Failing exists call for " +
rollbackRequest.getFilesToBeDeleted().get(0)));
+ // 3. recompute with a new strategy instance (built with cfg, whereas the baseline used table.getConfig())
+ rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg,
rollbackInstant);
+ List<HoodieRollbackRequest> rollBackRequestsUpdated =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
+ rollBackRequestsUpdated.forEach(entry -> System.out.println(" " +
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+ + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
+ // a failing exists() must not drop files from the rollback plan, so both request lists are expected to match
+ assertEquals(rollBackRequests, rollBackRequestsUpdated);
+ }
+
+
// Verify that rollback works with replacecommit
@ParameterizedTest
@ValueSource(booleans = {true, false})