This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2954027  [HUDI-52] Enabling savepoint and restore for MOR table (#4507)
2954027 is described below

commit 2954027b92ada82c41d0a72cc0b837564a730a89
Author: Sivabalan Narayanan <sivab...@uber.com>
AuthorDate: Thu Jan 6 10:56:08 2022 -0500

    [HUDI-52] Enabling savepoint and restore for MOR table (#4507)
    
    * Enabling restore for MOR table
    
    * Fixing savepoint for compaction commits in MOR
---
 .../hudi/cli/commands/SavepointsCommand.java       | 12 ++-
 .../action/savepoint/SavepointActionExecutor.java  |  9 +--
 .../TestHoodieSparkMergeOnReadTableRollback.java   | 94 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 14 deletions(-)
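
A note on usage: savepoint pins the data files referenced by a completed instant so that cleaning and archival leave them alone, and restore rolls the table back to a previously savepointed instant; with this commit both operations are meant to work for MERGE_ON_READ tables as well. Below is a rough client-side sketch, not part of the patch: restoreToInstant(String) matches the call used in the new test, while the savepoint(instantTime, user, comment) overload and the constructor wiring are assumptions about the write-client API.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class MorSavepointRestoreSketch {
      // engineContext/writeConfig are assumed to point at an existing MERGE_ON_READ table.
      public static void savepointThenRestore(HoodieSparkEngineContext engineContext,
                                              HoodieWriteConfig writeConfig,
                                              String instantTime) {
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
          // Pin the files referenced by this completed commit/deltacommit.
          client.savepoint(instantTime, "admin", "savepoint before risky backfill");
          // ... further upserts / compactions may run here ...
          // Roll the table back to the savepointed instant.
          client.restoreToInstant(instantTime);
        }
      }
    }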

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 0ea2fff..d3f8584 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -78,11 +78,9 @@ public class SavepointsCommand implements CommandMarker {
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
 
-    if (!timeline.containsInstant(commitInstant)) {
-      return "Commit " + commitTime + " not found in Commits " + timeline;
+    if (!activeTimeline.getCommitsTimeline().filterCompletedInstants().containsInstant(commitTime)) {
+      return "Commit " + commitTime + " not found in Commits " + activeTimeline;
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
@@ -112,10 +110,10 @@ public class SavepointsCommand implements CommandMarker {
       throw new HoodieException("There are no completed instants to run rollback");
     }
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instants = timeline.getInstants().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
 
-    if (!timeline.containsInstant(commitInstant)) {
+    if (instants.isEmpty()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
 
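For context on the CLI change above: getCommitTimeline() only exposes instants of the "commit" action, while getCommitsTimeline() also includes deltacommits, which is how writes land on a MERGE_ON_READ timeline, so the old check rejected valid MOR instants. A minimal sketch of the difference (based on general HoodieDefaultTimeline behavior; the exact action sets are not spelled out in this patch):

    // metaClient and commitTime as in the savepoint command above.
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    // Old check: only "commit" instants are visible, so a completed MOR deltacommit fails it.
    boolean oldCheck = activeTimeline.getCommitTimeline()
        .filterCompletedInstants().containsInstant(commitTime);
    // New check: "commit" and "deltacommit" instants are both visible, so MOR instants pass.
    boolean newCheck = activeTimeline.getCommitsTimeline()
        .filterCompletedInstants().containsInstant(commitTime);
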
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index de1d973..134b238 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,13 +64,9 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
 
   @Override
   public HoodieSavepointMetadata execute() {
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
     Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
+    if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
+      throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
     }
 
     try {
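
With the MergeOnRead guard removed, the executor only requires that the requested instant exists in the table's completed commits timeline (commits and deltacommits alike). A small sketch of picking a valid savepoint target, using calls that already appear elsewhere in this patch; the surrounding wiring is assumed:

    // Choose the most recent completed commit/deltacommit as the instant to savepoint.
    Option<HoodieInstant> latest = table.getCompletedCommitsTimeline().lastInstant();
    if (latest.isPresent()) {
      String instantTime = latest.get().getTimestamp();
      // This timestamp satisfies the containsInstant(instantTime) check above,
      // for COPY_ON_WRITE and MERGE_ON_READ tables alike.
    }
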
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 5eee262..38becc9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -464,6 +464,100 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
+        // Timeline-server-based markers are not used for multi-rollback tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      List<HoodieRecord> records = insertAndGetRecords("001", client, dataGen, 200);
+      List<HoodieRecord> updates1 = updateAndGetRecords("002", client, dataGen, records);
+      List<HoodieRecord> updates2 = updateAndGetRecords("003", client, dataGen, records);
+      List<HoodieRecord> updates3 = updateAndGetRecords("004", client, dataGen, records);
+      validateRecords(cfg, metaClient, updates3);
+
+      if (!restoreAfterCompaction) {
+        // restore to 002 and validate records.
+        client.restoreToInstant("002");
+        validateRecords(cfg, metaClient, updates1);
+      } else {
+        // trigger compaction and then trigger couple of upserts followed by restore.
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        String compactionInstantTime = "005";
+        client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+        JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
+        client.commitCompaction(compactionInstantTime, ws, Option.empty());
+
+        validateRecords(cfg, metaClient, updates3);
+        List<HoodieRecord> updates4 = updateAndGetRecords("006", client, dataGen, records);
+        List<HoodieRecord> updates5 = updateAndGetRecords("007", client, dataGen, records);
+        validateRecords(cfg, metaClient, updates5);
+
+        // restore to 003 and validate records.
+        client.restoreToInstant("003");
+        validateRecords(cfg, metaClient, updates2);
+      }
+    }
+  }
+
+  private List<HoodieRecord> insertAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, int count) {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, count);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, List<HoodieRecord> records) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> updates = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(jsc().parallelize(updates, 1), newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    return updates;
+  }
+
+  private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaClient, List<HoodieRecord> expectedRecords) throws IOException {
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+    HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        basePath());
+    assertRecords(expectedRecords, recordsRead);
+  }
+
+  private void assertRecords(List<HoodieRecord> inputRecords, List<GenericRecord> recordsRead) {
+    assertEquals(recordsRead.size(), inputRecords.size());
+    Map<String, GenericRecord> expectedRecords = new HashMap<>();
+    inputRecords.forEach(entry -> {
+      try {
+        expectedRecords.put(entry.getRecordKey(), ((GenericRecord) entry.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    Map<String, GenericRecord> actualRecords = new HashMap<>();
+    recordsRead.forEach(entry -> actualRecords.put(String.valueOf(entry.get("_row_key")), entry));
+    for (Map.Entry<String, GenericRecord> entry : expectedRecords.entrySet()) {
+      assertEquals(String.valueOf(entry.getValue().get("driver")), String.valueOf(actualRecords.get(entry.getKey()).get("driver")));
+    }
+  }
+
   private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
     return getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).build();
   }
