This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2954027  [HUDI-52] Enabling savepoint and restore for MOR table (#4507)
2954027 is described below

commit 2954027b92ada82c41d0a72cc0b837564a730a89
Author:     Sivabalan Narayanan <sivab...@uber.com>
AuthorDate: Thu Jan 6 10:56:08 2022 -0500

    [HUDI-52] Enabling savepoint and restore for MOR table (#4507)

    * Enabling restore for MOR table

    * Fixing savepoint for compaction commits in MOR
---
 .../hudi/cli/commands/SavepointsCommand.java       | 12 ++-
 .../action/savepoint/SavepointActionExecutor.java  |  9 +--
 .../TestHoodieSparkMergeOnReadTableRollback.java   | 94 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 14 deletions(-)
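For context, what this change enables from the write client: savepoint and restore now work on MERGE_ON_READ tables just as on COPY_ON_WRITE. A minimal sketch, assuming an already-built HoodieSparkEngineContext (engineContext) and HoodieWriteConfig (writeConfig); the instant time, user, and comment strings are placeholders:

    import org.apache.hudi.client.SparkRDDWriteClient;

    // Savepoint a completed instant on a MOR table, then restore to it.
    // Before this patch, SavepointActionExecutor rejected MOR tables outright.
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
      client.savepoint("20220106105608", "admin", "savepoint before backfill");
      // ... later, bring the table back to the savepointed instant:
      client.restoreToInstant("20220106105608");
    }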
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 0ea2fff..d3f8584 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -78,11 +78,9 @@ public class SavepointsCommand implements CommandMarker {
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
-    if (!timeline.containsInstant(commitInstant)) {
-      return "Commit " + commitTime + " not found in Commits " + timeline;
+    if (!activeTimeline.getCommitsTimeline().filterCompletedInstants().containsInstant(commitTime)) {
+      return "Commit " + commitTime + " not found in Commits " + activeTimeline;
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
@@ -112,10 +110,10 @@ public class SavepointsCommand implements CommandMarker {
       throw new HoodieException("There are no completed instants to run rollback");
     }
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instants = timeline.getInstants().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
 
-    if (!timeline.containsInstant(commitInstant)) {
+    if (instants.isEmpty()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index de1d973..134b238 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,13 +64,9 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
 
   @Override
   public HoodieSavepointMetadata execute() {
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
     Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
+    if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
+      throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
     }
     try {
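The crux of both production changes above is the timeline lookup. getCommitTimeline() keeps only instants whose action is "commit", so building a HoodieInstant with HoodieTimeline.COMMIT_ACTION and calling containsInstant(...) can never match the deltacommits that a MERGE_ON_READ table writes. getCommitsTimeline() also includes deltacommits, and the containsInstant(String) overload used in the new code matches by timestamp alone. A side-by-side sketch of the two lookups, using only calls that appear in the diff (a fragment, not a runnable test; metaClient is assumed to point at a MOR table):

    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();

    // Old lookup: only "commit" instants are considered, so a deltacommit
    // written by a MOR upsert at time "002" is never found.
    boolean foundBefore = activeTimeline.getCommitTimeline()
        .filterCompletedInstants()
        .containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"));

    // New lookup: commits and deltacommits are both considered, matched by
    // timestamp, so MOR instants (including compaction commits) are found.
    boolean foundAfter = activeTimeline.getCommitsTimeline()
        .filterCompletedInstants()
        .containsInstant("002");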
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 5eee262..38becc9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -464,6 +464,100 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
+        // Timeline-server-based markers are not used for multi-rollback tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      List<HoodieRecord> records = insertAndGetRecords("001", client, dataGen, 200);
+      List<HoodieRecord> updates1 = updateAndGetRecords("002", client, dataGen, records);
+      List<HoodieRecord> updates2 = updateAndGetRecords("003", client, dataGen, records);
+      List<HoodieRecord> updates3 = updateAndGetRecords("004", client, dataGen, records);
+      validateRecords(cfg, metaClient, updates3);
+
+      if (!restoreAfterCompaction) {
+        // restore to 002 and validate records.
+        client.restoreToInstant("002");
+        validateRecords(cfg, metaClient, updates1);
+      } else {
+        // trigger compaction and then trigger couple of upserts followed by restore.
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        String compactionInstantTime = "005";
+        client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+        JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
+        client.commitCompaction(compactionInstantTime, ws, Option.empty());
+
+        validateRecords(cfg, metaClient, updates3);
+        List<HoodieRecord> updates4 = updateAndGetRecords("006", client, dataGen, records);
+        List<HoodieRecord> updates5 = updateAndGetRecords("007", client, dataGen, records);
+        validateRecords(cfg, metaClient, updates5);
+
+        // restore to 003 and validate records.
+        client.restoreToInstant("003");
+        validateRecords(cfg, metaClient, updates2);
+      }
+    }
+  }
+
+  private List<HoodieRecord> insertAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, int count) {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, count);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, List<HoodieRecord> records) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> updates = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(jsc().parallelize(updates, 1), newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    return updates;
+  }
+
+  private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaClient, List<HoodieRecord> expectedRecords) throws IOException {
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+    HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        basePath());
+    assertRecords(expectedRecords, recordsRead);
+  }
+
+  private void assertRecords(List<HoodieRecord> inputRecords, List<GenericRecord> recordsRead) {
+    assertEquals(recordsRead.size(), inputRecords.size());
+    Map<String, GenericRecord> expectedRecords = new HashMap<>();
+    inputRecords.forEach(entry -> {
+      try {
+        expectedRecords.put(entry.getRecordKey(), ((GenericRecord) entry.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    Map<String, GenericRecord> actualRecords = new HashMap<>();
+    recordsRead.forEach(entry -> actualRecords.put(String.valueOf(entry.get("_row_key")), entry));
+    for (Map.Entry<String, GenericRecord> entry : expectedRecords.entrySet()) {
+      assertEquals(String.valueOf(entry.getValue().get("driver")), String.valueOf(actualRecords.get(entry.getKey()).get("driver")));
+    }
+  }
+
   private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
     return getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).build();
   }
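Operationally, the SavepointsCommand change means the usual hudi-cli savepoint flow now works against MOR tables as well. Indicatively (command and option names are taken from this era of the CLI and should be treated as an assumption, not guaranteed verbatim):

    savepoint create --commit 20220106105608 --sparkMaster local[2]
    savepoint rollback --savepoint 20220106105608 --sparkMaster local[2]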