This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dbb273  [HUDI-3721] Delete MDT if necessary when triggering rollback to savepoint (#5173)
2dbb273 is described below

commit 2dbb273d26ea38ecf28af704876861554250449c
Author: YueZhang <69956021+zhangyue19921...@users.noreply.github.com>
AuthorDate: Thu Mar 31 11:26:37 2022 +0800

    [HUDI-3721] Delete MDT if necessary when triggering rollback to savepoint (#5173)
    
    Co-authored-by: yuezhang <yuezh...@freewheel.tv>
---
 .../hudi/cli/integ/ITTestSavepointsCommand.java    | 53 ++++++++++++++++++++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 45 ++++++++++++++----
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 12 +++--
 .../hudi/client/TestTableSchemaEvolution.java      |  4 +-
 .../functional/TestHoodieBackedMetadata.java       |  2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  2 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  8 ++--
 9 files changed, 106 insertions(+), 24 deletions(-)

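Before diving into the per-file changes, here is a caller-side sketch of the API this patch touches: a minimal, hypothetical Spark program that restores a table to a savepoint with the metadata table (MDT) enabled. The table path, savepoint instant, and Spark context setup are illustrative placeholders, not part of the patch.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    import org.apache.spark.api.java.JavaSparkContext;

    public class RestoreToSavepointExample {
      public static void main(String[] args) {
        // Placeholder Spark context; a real application would reuse its own.
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "restore-example");

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table") // placeholder base path
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();

        try (SparkRDDWriteClient client =
                 new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)) {
          // With this patch: if the savepoint instant has already been archived
          // from the MDT timeline, restoreToSavepoint deletes the MDT up front
          // (to be re-bootstrapped by a later write) instead of failing while
          // syncing the rollback to the MDT.
          client.restoreToSavepoint("102"); // placeholder savepoint instant
        }
      }
    }
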
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 5f8021a..7de1c2d 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.TableCommand;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -29,6 +31,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
@@ -119,6 +124,54 @@ public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
   }
 
   /**
+   * Test case of command 'savepoint rollback' with metadata table bootstrap.
+   */
+  @Test
+  public void testRollbackToSavepointWithMetadataTableEnable() throws IOException {
+    // generate commits for savepoints
+    for (int i = 101; i < 105; i++) {
+      String instantTime = String.valueOf(i);
+      HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
+    }
+
+    // generate one savepoint at 102
+    String savepoint = "102";
+    HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());
+
+    // re-bootstrap metadata table
+    // delete first
+    String basePath = metaClient.getBasePath();
+    Path metadataTableBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath));
+    metaClient.getFs().delete(metadataTableBasePath, true);
+
+    // then bootstrap metadata table at instant 104
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+    SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+
+    assertTrue(HoodieCLI.fs.exists(metadataTableBasePath));
+
+    // roll back to savepoint
+    CommandResult cr = getShell().executeCommand(
+        String.format("savepoint rollback --savepoint %s --sparkMaster %s", savepoint, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals(
+            String.format("Savepoint \"%s\" rolled back", savepoint), cr.getResult().toString()));
+
+    // there is 1 restore instant
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    assertEquals(1, timeline.getRestoreTimeline().countInstants());
+
+    // instants 103 and 104 have been rolled back
+    assertFalse(timeline.getCommitTimeline().containsInstant(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
+    assertFalse(timeline.getCommitTimeline().containsInstant(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
+  }
+
+  /**
    * Test case of command 'savepoint delete'.
    */
   @Test
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 028fdac..0e372cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.async.AsyncArchiveService;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -66,6 +67,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -643,9 +645,30 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return true if the savepoint was restored to successfully
    */
   public void restoreToSavepoint(String savepointTime) {
-    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
+    boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
+    if (initialMetadataTableIfNecessary) {
+      try {
+        // Delete the metadata table directly when the user triggers a savepoint rollback,
+        // if the MDT exists but the savepoint instant predates its timeline
+        String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
+        HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
+        // Same as HoodieTableMetadataUtil#processRollbackMetadata
+        HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+        // The instant required to sync the rollback to the MDT has been archived, so the MDT sync would fail.
+        // Therefore we need to delete the whole MDT here.
+        if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
+          mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
+          // The rollbackToSavepoint action will try to bootstrap the MDT first, but syncing to the MDT would fail in this scenario,
+          // so disable metadata table initialization here.
+          initialMetadataTableIfNecessary = false;
+        }
+      } catch (Exception e) {
+        // Metadata directory does not exist
+      }
+    }
+
+    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
-    restoreToInstant(savepointTime);
+    restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
@@ -659,7 +682,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   /**
    * @Deprecated
    * Rollback the inflight record changes with the given commit time. This
-   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
+   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}
    *
    * @param commitInstantTime Instant time of the commit
   * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
@@ -717,12 +740,12 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    *
    * @param instantTime Instant time to which restoration is requested
    */
-  public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
+  public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
     LOG.info("Begin restore to instant " + instantTime);
     final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Timer.Context timerContext = metrics.getRollbackCtx();
     try {
-      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
+      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
       Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
       if (restorePlanOption.isPresent()) {
         HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1288,14 +1311,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param instantTime current inflight instant time
    * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
+  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
 
   /**
   * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
    * operations such as:
    *
   * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
-   * {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
+   * {@link #doInitTable(HoodieTableMetaClient, Option, boolean)} instead
    *
    * <ul>
    *   <li>Checking whether upgrade/downgrade is required</li>
@@ -1303,7 +1326,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    *   <li>Initializing metrics contexts</li>
    * </ul>
    */
-  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     // Setup write schemas for deletes
     if (operationType == WriteOperationType.DELETE) {
@@ -1315,7 +1338,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     this.txnManager.beginTransaction();
     try {
       tryUpgrade(metaClient, instantTime);
-      table = doInitTable(metaClient, instantTime);
+      table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
     } finally {
       this.txnManager.endTransaction();
     }
@@ -1348,6 +1371,10 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return table;
   }
 
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+    return initTable(operationType, instantTime, config.isMetadataTableEnabled());
+  }
+
   /**
     * Sets write schema from last instant since deletes may not have schema set in the config.
      */
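
A note on the design above: the original single-argument initTable is retained as a thin overload that defaults the new flag to config.isMetadataTableEnabled(), so only the restore path ever opts out of MDT bootstrapping. The crux of the change is the archival check in restoreToSavepoint; below is a simplified, hypothetical standalone helper that mirrors that check in isolation (the class and method names are illustrative, not part of the patch).

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.HoodieTableMetadata;

    public class MdtSavepointCheck {
      /**
       * Deletes the metadata table if the savepoint instant has already been
       * archived from its timeline; returns true when a delete happened.
       */
      public static boolean deleteMdtIfSavepointArchived(
          Configuration hadoopConf, String tableBasePath, String savepointTime) throws IOException {
        String mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath);
        HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf).setBasePath(mdtBasePath).build();
        // The savepoint maps to a delta commit on the MDT timeline; if it predates
        // the start of the active timeline, the rollback can no longer be synced.
        if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(savepointTime)) {
          // Drop the MDT entirely; the next write with metadata enabled re-bootstraps it.
          mdtClient.getFs().delete(new Path(mdtBasePath), true);
          return true;
        }
        return false;
      }
    }
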
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 4523705..626727a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -398,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     // Create a Hoodie table which encapsulated the commits and files visible
     return getHoodieTable();
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index faf46e0..a506131 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -233,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
 
     // Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index f6d0632..b3e3c25 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -425,11 +425,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
-    // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
-    // if it didn't exist before
-    // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-    initializeMetadataTable(instantTime);
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
+    if (initialMetadataTableIfNecessary) {
+      // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
+      // if it didn't exist before
+      // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
+      initializeMetadataTable(instantTime);
+    }
 
     // Create a Hoodie table which encapsulated the commits and files visible
     return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
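
Note the engine asymmetry in the three doInitTable overrides: only the Spark client above actually consults initialMetadataTableIfNecessary, while the Flink and Java clients accept the new parameter but ignore it, since neither bootstraps the metadata table inside doInitTable.
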
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 3fb4549..1cb7bcb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -291,7 +291,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     }
 
     // Rollback to the original schema
-    client.restoreToInstant("004");
+    client.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
     checkLatestDeltaCommit("004");
 
     // Updates with original schema are now allowed
@@ -432,7 +432,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
     // Revert to the older commit and ensure that the original schema can now
     // be used for inserts and updates.
-    client.restoreToInstant("003");
+    client.restoreToInstant("003", hoodieWriteConfig.isMetadataTableEnabled());
     curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
     assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
     checkReadRecords("000", numRecords);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d7d0d26..3497a68 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1208,7 +1208,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       validateMetadata(client);
 
       // Restore
-      client.restoreToInstant("0000006");
+      client.restoreToInstant("0000006", writeConfig.isMetadataTableEnabled());
       validateMetadata(client);
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ce0cc37..3b78954 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -585,7 +585,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     client.savepoint("004", "user1","comment1");
 
-    client.restoreToInstant("004");
+    client.restoreToInstant("004", config.isMetadataTableEnabled());
 
     assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index bce2ec8..339e9e1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -150,7 +150,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     // NOTE: First writer will have Metadata table DISABLED
     HoodieWriteConfig.Builder cfgBuilder =
         getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
-    
+
     addConfigsForPopulateMetaFields(cfgBuilder, true);
     HoodieWriteConfig cfg = cfgBuilder.build();
 
@@ -480,7 +480,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
       copyOfRecords.clear();
 
       // Rollback latest commit first
-      client.restoreToInstant("000");
+      client.restoreToInstant("000", cfg.isMetadataTableEnabled());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -530,7 +530,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
       if (!restoreAfterCompaction) {
         // restore to 002 and validate records.
-        client.restoreToInstant("002");
+        client.restoreToInstant("002", cfg.isMetadataTableEnabled());
         validateRecords(cfg, metaClient, updates1);
       } else {
         // trigger compaction and then trigger a couple of upserts followed by restore.
@@ -546,7 +546,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
         validateRecords(cfg, metaClient, updates5);
 
         // restore to 003 and validate records.
-        client.restoreToInstant("003");
+        client.restoreToInstant("003", cfg.isMetadataTableEnabled());
         validateRecords(cfg, metaClient, updates2);
       }
     }
