This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71079958a7b [HUDI-8395] Fix metaClient handling when running upgrade or downgrade (#12224)
71079958a7b is described below

commit 71079958a7bca0d85170bba3a243a52aef1c9770
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Nov 15 19:38:34 2024 +0530

    [HUDI-8395] Fix metaClient handling when running upgrade or downgrade (#12224)
    
    * [HUDI-8395] Fix metaClient handling when running upgrade or downgrade
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../hudi/table/upgrade/UpgradeDowngrade.java       |  4 ----
 .../org/apache/hudi/util/CommonClientUtils.java    |  4 ++--
 .../hudi/common/model/HoodieCommitMetadata.java    | 12 ++++++++----
 .../hudi/common/table/HoodieTableMetaClient.java   |  8 ++++++++
 .../hudi/common/testutils/HoodieTestUtils.java     |  2 ++
 .../hudi/TestMultipleTableVersionWriting.scala     | 22 ++++++++++++++++------
 7 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 56f08381daa..3e258d892b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1437,7 +1437,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
         new UpgradeDowngrade(metaClient, config, context, 
upgradeDowngradeHelper);
 
     if 
(upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
-      metaClient = HoodieTableMetaClient.reload(metaClient);
       // Ensure no inflight commits by setting EAGER policy and explicitly 
cleaning all failed commits
       List<String> instantsToRollback = 
tableServiceClient.getInstantsToRollback(metaClient, 
HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
 
@@ -1450,6 +1449,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
           .run(HoodieTableVersion.current(), instantTime.orElse(null));
 
+      metaClient.reloadTableConfig();
       metaClient.reloadActiveTimeline();
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 20b351ff6be..9d7dc2df55c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -48,8 +48,6 @@ public class UpgradeDowngrade {
   private HoodieTableMetaClient metaClient;
   protected HoodieWriteConfig config;
   protected HoodieEngineContext context;
-  private StoragePath updatedPropsFilePath;
-  private StoragePath propsFilePath;
 
   public UpgradeDowngrade(
       HoodieTableMetaClient metaClient, HoodieWriteConfig config, 
HoodieEngineContext context,
@@ -57,8 +55,6 @@ public class UpgradeDowngrade {
     this.metaClient = metaClient;
     this.config = config;
     this.context = context;
-    this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(), 
HOODIE_UPDATED_PROPERTY_FILE);
-    this.propsFilePath = new StoragePath(metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     this.upgradeDowngradeHelper = upgradeDowngradeHelper;
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
index 320839af55b..a7d22d2e453 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
@@ -28,8 +28,8 @@ public class CommonClientUtils {
   public static void validateTableVersion(HoodieTableConfig tableConfig, 
HoodieWriteConfig writeConfig) {
     // mismatch of table versions.
     if (!tableConfig.getTableVersion().equals(writeConfig.getWriteVersion())) {
-      throw new HoodieNotSupportedException(String.format("Table version (%s) 
and Writer version (%s) do not match.",
-          tableConfig.getTableVersion(), writeConfig.getWriteVersion()));
+      throw new HoodieNotSupportedException(String.format("Table version (%s) 
and Writer version (%s) do not match for table at: %s.",
+          tableConfig.getTableVersion(), writeConfig.getWriteVersion(), 
writeConfig.getBasePath()));
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 46874bb10f8..22bf37d9c3d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -500,10 +501,13 @@ public class HoodieCommitMetadata implements Serializable 
{
       if (bytes.length == 0) {
         return clazz.newInstance();
       }
-      return fromJsonString(
-          fromUTF8Bytes(
-              
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), 
org.apache.hudi.avro.model.HoodieCommitMetadata.class)),
-          clazz);
+      try {
+        return 
fromJsonString(fromUTF8Bytes(convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes),
 org.apache.hudi.avro.model.HoodieCommitMetadata.class)), clazz);
+      } catch (Exception e) {
+        // fall back to the alternative method (0.x)
+        LOG.warn("Primary method failed; trying alternative deserialization 
method.", e);
+        return fromJsonString(new String(bytes, StandardCharsets.UTF_8), 
clazz);
+      }
     } catch (Exception e) {
       throw new IOException("unable to read commit metadata for bytes length: 
" + bytes.length, e);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index d56689863c9..30b4ff73d2f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -437,6 +437,14 @@ public class HoodieTableMetaClient implements Serializable 
{
     return activeTimeline;
   }
 
+  /**
+   * Reload the table config properties.
+   */
+  public synchronized void reloadTableConfig() {
+    this.tableConfig = new HoodieTableConfig(this.storage, metaPath,
+        this.tableConfig.getRecordMergeMode(), 
this.tableConfig.getKeyGeneratorClassName(), 
this.tableConfig.getRecordMergeStrategyId());
+  }
+
   /**
    * Returns next instant time in the correct format. Lock is enabled by 
default.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index fa5430d78ec..ec8a6c8cb31 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -137,6 +138,7 @@ public class HoodieTestUtils {
   public static HoodieTableMetaClient init(String basePath, HoodieTableType 
tableType, HoodieTableVersion version) throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.VERSION.key(), 
String.valueOf(version.versionCode()));
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition");
     return init(getDefaultStorageConf(), basePath, tableType, properties);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
index 15988d214d7..fffd75f6237 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableVersion
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.exception.{HoodieException, 
HoodieUpgradeDowngradeException}
 import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
@@ -34,7 +34,7 @@ class TestMultipleTableVersionWriting extends 
HoodieSparkWriterTestBase {
 
   @Test
   def testTableVersionAndWriteVersionMatching(): Unit = {
-    val basePath = s"${tempBasePath}/tbl_1";
+    val basePath = s"$tempBasePath/tbl_1"
     val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as 
partition")
 
     // write table with current version
@@ -43,7 +43,7 @@ class TestMultipleTableVersionWriting extends 
HoodieSparkWriterTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    val metaClient = HoodieTestUtils.createMetaClient(basePath);
+    val metaClient = HoodieTestUtils.createMetaClient(basePath)
     assertEquals(HoodieTableVersion.current().versionCode(),
       metaClient.getTableConfig.getTableVersion.versionCode())
 
@@ -59,14 +59,24 @@ class TestMultipleTableVersionWriting extends 
HoodieSparkWriterTestBase {
 
   @Test
   def testThrowsExceptionForIncompatibleTableVersion(): Unit = {
-    val basePath = s"${tempBasePath}/tbl_2";
-    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, 
HoodieTableVersion.SIX);
+    val basePath = s"$tempBasePath/tbl_2"
+    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, 
HoodieTableVersion.SIX)
 
     val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as 
partition")
-    assertThrows[HoodieNotSupportedException] {
+    // should error out when starting with table version 6 and writing with 
auto upgrade disabled
+    assertThrows[HoodieUpgradeDowngradeException] {
       df.write.format("hudi")
+        .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false")
         .mode(SaveMode.Append)
         .save(basePath)
     }
+    // should succeed when writing with auto upgrade enabled (default)
+    df.write.format("hudi")
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val metaClient = HoodieTestUtils.createMetaClient(basePath)
+    assertEquals(HoodieTableVersion.current().versionCode(),
+      metaClient.getTableConfig.getTableVersion.versionCode())
   }
 }

Reply via email to