This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2dbb273  [HUDI-3721] Delete MDT if necessary when trigger rollback to savepoint (#5173)
2dbb273 is described below

commit 2dbb273d26ea38ecf28af704876861554250449c
Author: YueZhang <69956021+zhangyue19921...@users.noreply.github.com>
AuthorDate: Thu Mar 31 11:26:37 2022 +0800

    [HUDI-3721] Delete MDT if necessary when trigger rollback to savepoint (#5173)

    Co-authored-by: yuezhang <yuezh...@freewheel.tv>
---
 .../hudi/cli/integ/ITTestSavepointsCommand.java    | 53 ++++++++++++++++++++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 45 ++++++++++++++----
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 12 +++--
 .../hudi/client/TestTableSchemaEvolution.java      |  4 +-
 .../functional/TestHoodieBackedMetadata.java       |  2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  2 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  8 ++--
 9 files changed, 106 insertions(+), 24 deletions(-)
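The heart of the change: when a user rolls back to a savepoint whose instant has already been archived off the metadata table (MDT) timeline, the rollback can no longer be synced to the MDT, so the MDT is deleted outright and re-bootstrapped later. A minimal sketch of that guard, not part of the commit itself; it reuses only APIs the patch touches, and the helper class and method names are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    // Illustrative helper (not in the patch): mirrors the check added to
    // BaseHoodieWriteClient#restoreToSavepoint below.
    final class MdtSavepointGuard {
      // Returns true when the MDT was deleted because the savepoint instant is
      // already archived off the MDT's active commits timeline.
      static boolean deleteMdtIfRollbackCannotBeSynced(HoodieTableMetaClient mdtClient,
                                                       String mdtBasePath,
                                                       String savepointTime) throws IOException {
        if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(savepointTime)) {
          // Syncing the rollback would need a delta commit at savepointTime,
          // which no longer exists on the MDT timeline; drop the table instead.
          return mdtClient.getFs().delete(new Path(mdtBasePath), true);
        }
        return false;
      }
    }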
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 5f8021a..7de1c2d 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.TableCommand;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -29,6 +31,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
@@ -119,6 +124,54 @@ public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
   }
 
   /**
+   * Test case of command 'savepoint rollback' with metadata table bootstrap.
+   */
+  @Test
+  public void testRollbackToSavepointWithMetadataTableEnable() throws IOException {
+    // generate commits for savepoints
+    for (int i = 101; i < 105; i++) {
+      String instantTime = String.valueOf(i);
+      HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
+    }
+
+    // generate one savepoint at 102
+    String savepoint = "102";
+    HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());
+
+    // re-bootstrap metadata table
+    // delete first
+    String basePath = metaClient.getBasePath();
+    Path metadataTableBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath));
+    metaClient.getFs().delete(metadataTableBasePath, true);
+
+    // then bootstrap metadata table at instant 104
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+    SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+
+    assertTrue(HoodieCLI.fs.exists(metadataTableBasePath));
+
+    // roll back to savepoint
+    CommandResult cr = getShell().executeCommand(
+        String.format("savepoint rollback --savepoint %s --sparkMaster %s", savepoint, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals(
+            String.format("Savepoint \"%s\" rolled back", savepoint), cr.getResult().toString()));
+
+    // there is 1 restore instant
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    assertEquals(1, timeline.getRestoreTimeline().countInstants());
+
+    // instants 103 and 104 have been rolled back
+    assertFalse(timeline.getCommitTimeline().containsInstant(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
+    assertFalse(timeline.getCommitTimeline().containsInstant(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
+  }
+
+  /**
    * Test case of command 'savepoint delete'.
    */
   @Test
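The test drives the flow through the hudi-cli shell; programmatic callers reach the same logic through the write client. A usage sketch under assumed setup (a JavaSparkContext `jsc` and a table at `basePath` with a savepoint at instant "102", as in the test above):

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.api.java.JavaSparkContext;

    // Usage sketch: equivalent to `savepoint rollback --savepoint 102` in hudi-cli.
    class SavepointRollbackExample {
      static void rollbackToSavepoint(JavaSparkContext jsc, String basePath) {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();
        SparkRDDWriteClient client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
        try {
          // With the MDT enabled, this now decides internally whether the MDT
          // can be kept in sync or must be deleted and re-bootstrapped.
          client.restoreToSavepoint("102");
        } finally {
          client.close();
        }
      }
    }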
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 028fdac..0e372cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.async.AsyncArchiveService;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -66,6 +67,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -643,9 +645,30 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return true if the savepoint was restored to successfully
    */
  public void restoreToSavepoint(String savepointTime) {
-    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
+    boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
+    if (initialMetadataTableIfNecessary) {
+      try {
+        // Delete the metadata table directly when a user triggers a savepoint rollback, if the MDT exists and the savepoint is before its timeline starts
+        String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
+        HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
+        // Same as HoodieTableMetadataUtil#processRollbackMetadata
+        HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+        // The instant required to sync the rollback to the MDT has been archived, so syncing to the MDT would fail.
+        // Therefore we need to delete the whole MDT here.
+        if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
+          mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
+          // The rollbackToSavepoint action will try to bootstrap the MDT first, but syncing to the MDT would fail in this scenario,
+          // so we need to disable metadata table initialization here.
+          initialMetadataTableIfNecessary = false;
+        }
+      } catch (Exception e) {
+        // Metadata directory does not exist
+      }
+    }
+
+    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
-    restoreToInstant(savepointTime);
+    restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
  }
 
@@ -659,7 +682,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
  /**
   * @Deprecated
   * Rollback the inflight record changes with the given commit time. This
-   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
+   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}
   *
   * @param commitInstantTime Instant time of the commit
   * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
@@ -717,12 +740,12 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   *
   * @param instantTime Instant time to which restoration is requested
   */
-  public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
+  public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
    LOG.info("Begin restore to instant " + instantTime);
    final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
    Timer.Context timerContext = metrics.getRollbackCtx();
    try {
-      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
+      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
      Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
      if (restorePlanOption.isPresent()) {
        HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1288,14 +1311,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   * @param instantTime current inflight instant time
   * @return instantiated {@link HoodieTable}
   */
-  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
+  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
 
  /**
   * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
   * operations such as:
   *
   * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
-   * {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
+   * {@link #doInitTable(HoodieTableMetaClient, Option, boolean)} instead
   *
   * <ul>
   *   <li>Checking whether upgrade/downgrade is required</li>
@@ -1303,7 +1326,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   *   <li>Initializing metrics contexts</li>
   * </ul>
   */
-  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
    HoodieTableMetaClient metaClient = createMetaClient(true);
    // Setup write schemas for deletes
    if (operationType == WriteOperationType.DELETE) {
@@ -1315,7 +1338,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    this.txnManager.beginTransaction();
    try {
      tryUpgrade(metaClient, instantTime);
-      table = doInitTable(metaClient, instantTime);
+      table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
    } finally {
      this.txnManager.endTransaction();
    }
@@ -1348,6 +1371,10 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    return table;
  }
 
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+    return initTable(operationType, instantTime, config.isMetadataTableEnabled());
+  }
+
  /**
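To see why isBeforeTimelineStarts is the right guard, consider made-up instant times: if the MDT's active timeline now starts at "105" because earlier delta commits were archived, a restore to savepoint "102" has nothing left to sync against. A self-contained illustration (the instants are invented; real Hudi instant timestamps compare the same way, lexicographically):

    public class ArchivedSavepointCheck {
      public static void main(String[] args) {
        String firstActiveMdtInstant = "105"; // everything before this was archived
        String savepointTime = "102";
        // Roughly the comparison HoodieTimeline#isBeforeTimelineStarts performs:
        boolean archived = savepointTime.compareTo(firstActiveMdtInstant) < 0;
        System.out.println(archived
            ? "delete MDT now, re-bootstrap on a later write"
            : "sync the rollback to the MDT");
      }
    }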
   * Sets write schema from last instant since deletes may not have schema set in the config.
   */
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 4523705..626727a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -398,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
  }
 
  @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
    // Create a Hoodie table which encapsulated the commits and files visible
    return getHoodieTable();
  }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index faf46e0..a506131 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -233,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
  }
 
  @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
    // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
    // Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index f6d0632..b3e3c25 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -425,11 +425,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
  }
 
  @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
-    // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
-    // if it didn't exist before
-    // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-    initializeMetadataTable(instantTime);
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
+    if (initialMetadataTableIfNecessary) {
+      // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
+      // if it didn't exist before
+      // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
+      initializeMetadataTable(instantTime);
+    }
 
    // Create a Hoodie table which encapsulated the commits and files visible
    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
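Across the three engines, only the Spark client bootstraps the MDT inside doInitTable, so the new flag changes behavior only there; the Flink and Java overrides accept it and ignore it. A compressed sketch of the resulting dispatch, with simplified stand-in types rather than the real class hierarchy:

    // Simplified stand-ins for the three overrides above (illustration only).
    interface EngineTableInit {
      void doInitTable(boolean initialMetadataTableIfNecessary);
    }

    class SparkStyleInit implements EngineTableInit {
      @Override
      public void doInitTable(boolean initialMetadataTableIfNecessary) {
        if (initialMetadataTableIfNecessary) {
          bootstrapMetadataTable(); // only the Spark client does this (see HUDI-3343)
        }
        createHoodieTable();
      }
      private void bootstrapMetadataTable() { /* ... */ }
      private void createHoodieTable() { /* ... */ }
    }

    class FlinkOrJavaStyleInit implements EngineTableInit {
      @Override
      public void doInitTable(boolean initialMetadataTableIfNecessary) {
        createHoodieTable(); // flag intentionally unused on these engines
      }
      private void createHoodieTable() { /* ... */ }
    }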
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 3fb4549..1cb7bcb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -291,7 +291,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
    }
 
    // Rollback to the original schema
-    client.restoreToInstant("004");
+    client.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
    checkLatestDeltaCommit("004");
 
    // Updates with original schema are now allowed
@@ -432,7 +432,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
    // Revert to the older commit and ensure that the original schema can now
    // be used for inserts and inserts.
-    client.restoreToInstant("003");
+    client.restoreToInstant("003", hoodieWriteConfig.isMetadataTableEnabled());
    curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
    assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
    checkReadRecords("000", numRecords);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d7d0d26..3497a68 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1208,7 +1208,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
      validateMetadata(client);
 
      // Restore
-      client.restoreToInstant("0000006");
+      client.restoreToInstant("0000006", writeConfig.isMetadataTableEnabled());
      validateMetadata(client);
    }
  }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ce0cc37..3b78954 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -585,7 +585,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
    client.savepoint("004", "user1","comment1");
 
-    client.restoreToInstant("004");
+    client.restoreToInstant("004", config.isMetadataTableEnabled());
 
    assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index bce2ec8..339e9e1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -150,7 +150,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
    // NOTE: First writer will have Metadata table DISABLED
    HoodieWriteConfig.Builder cfgBuilder =
        getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
-
+
    addConfigsForPopulateMetaFields(cfgBuilder, true);
    HoodieWriteConfig cfg = cfgBuilder.build();
@@ -480,7 +480,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
      copyOfRecords.clear();
 
      // Rollback latest commit first
-      client.restoreToInstant("000");
+      client.restoreToInstant("000", cfg.isMetadataTableEnabled());
 
      metaClient = HoodieTableMetaClient.reload(metaClient);
      allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -530,7 +530,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
      if (!restoreAfterCompaction) {
        // restore to 002 and validate records.
-        client.restoreToInstant("002");
+        client.restoreToInstant("002", cfg.isMetadataTableEnabled());
        validateRecords(cfg, metaClient, updates1);
      } else {
        // trigger compaction and then trigger couple of upserts followed by restore.
@@ -546,7 +546,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
        validateRecords(cfg, metaClient, updates5);
 
        // restore to 003 and validate records.
-        client.restoreToInstant("003");
+        client.restoreToInstant("003", cfg.isMetadataTableEnabled());
        validateRecords(cfg, metaClient, updates2);
      }
    }
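For downstream code, the visible API change is the extra boolean on restoreToInstant; the test updates above show the mechanical migration. A hedged before/after sketch, where the client and write config are assumed to exist in the caller:

    // Migration sketch (client and cfg assumed to exist in caller code).
    static void restoreExample(org.apache.hudi.client.SparkRDDWriteClient client,
                               org.apache.hudi.config.HoodieWriteConfig cfg) {
      // Before this commit: client.restoreToInstant("003");
      // After it, callers state whether the MDT may be (re)initialized,
      // typically derived from the write config:
      client.restoreToInstant("003", cfg.isMetadataTableEnabled());
    }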