This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 71079958a7b [HUDI-8395] Fix metaClient handling when running upgrade or downgrade (#12224)
71079958a7b is described below
commit 71079958a7bca0d85170bba3a243a52aef1c9770
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Nov 15 19:38:34 2024 +0530
[HUDI-8395] Fix metaClient handling when running upgrade or downgrade (#12224)
* [HUDI-8395] Fix metaClient handling when running upgrade or downgrade
---------
Co-authored-by: danny0405 <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../hudi/table/upgrade/UpgradeDowngrade.java | 4 ----
.../org/apache/hudi/util/CommonClientUtils.java | 4 ++--
.../hudi/common/model/HoodieCommitMetadata.java | 12 ++++++++----
.../hudi/common/table/HoodieTableMetaClient.java | 8 ++++++++
.../hudi/common/testutils/HoodieTestUtils.java | 2 ++
.../hudi/TestMultipleTableVersionWriting.scala | 22 ++++++++++++++++------
7 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 56f08381daa..3e258d892b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1437,7 +1437,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
new UpgradeDowngrade(metaClient, config, context,
upgradeDowngradeHelper);
if
(upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
- metaClient = HoodieTableMetaClient.reload(metaClient);
// Ensure no inflight commits by setting EAGER policy and explicitly
cleaning all failed commits
List<String> instantsToRollback =
tableServiceClient.getInstantsToRollback(metaClient,
HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
@@ -1450,6 +1449,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
.run(HoodieTableVersion.current(), instantTime.orElse(null));
+ metaClient.reloadTableConfig();
metaClient.reloadActiveTimeline();
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 20b351ff6be..9d7dc2df55c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -48,8 +48,6 @@ public class UpgradeDowngrade {
private HoodieTableMetaClient metaClient;
protected HoodieWriteConfig config;
protected HoodieEngineContext context;
- private StoragePath updatedPropsFilePath;
- private StoragePath propsFilePath;
public UpgradeDowngrade(
HoodieTableMetaClient metaClient, HoodieWriteConfig config,
HoodieEngineContext context,
@@ -57,8 +55,6 @@ public class UpgradeDowngrade {
this.metaClient = metaClient;
this.config = config;
this.context = context;
- this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(),
HOODIE_UPDATED_PROPERTY_FILE);
- this.propsFilePath = new StoragePath(metaClient.getMetaPath(),
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
index 320839af55b..a7d22d2e453 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
@@ -28,8 +28,8 @@ public class CommonClientUtils {
public static void validateTableVersion(HoodieTableConfig tableConfig,
HoodieWriteConfig writeConfig) {
// mismatch of table versions.
if (!tableConfig.getTableVersion().equals(writeConfig.getWriteVersion())) {
- throw new HoodieNotSupportedException(String.format("Table version (%s)
and Writer version (%s) do not match.",
- tableConfig.getTableVersion(), writeConfig.getWriteVersion()));
+ throw new HoodieNotSupportedException(String.format("Table version (%s)
and Writer version (%s) do not match for table at: %s.",
+ tableConfig.getTableVersion(), writeConfig.getWriteVersion(),
writeConfig.getBasePath()));
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 46874bb10f8..22bf37d9c3d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -500,10 +501,13 @@ public class HoodieCommitMetadata implements Serializable {
if (bytes.length == 0) {
return clazz.newInstance();
}
- return fromJsonString(
- fromUTF8Bytes(
-
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes),
org.apache.hudi.avro.model.HoodieCommitMetadata.class)),
- clazz);
+ try {
+ return
fromJsonString(fromUTF8Bytes(convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes),
org.apache.hudi.avro.model.HoodieCommitMetadata.class)), clazz);
+ } catch (Exception e) {
+ // fall back to the alternative method (0.x)
+ LOG.warn("Primary method failed; trying alternative deserialization
method.", e);
+ return fromJsonString(new String(bytes, StandardCharsets.UTF_8),
clazz);
+ }
} catch (Exception e) {
throw new IOException("unable to read commit metadata for bytes length:
" + bytes.length, e);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index d56689863c9..30b4ff73d2f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -437,6 +437,14 @@ public class HoodieTableMetaClient implements Serializable {
return activeTimeline;
}
+ /**
+ * Reload the table config properties.
+ */
+ public synchronized void reloadTableConfig() {
+ this.tableConfig = new HoodieTableConfig(this.storage, metaPath,
+ this.tableConfig.getRecordMergeMode(),
this.tableConfig.getKeyGeneratorClassName(),
this.tableConfig.getRecordMergeStrategyId());
+ }
+
/**
* Returns next instant time in the correct format. Lock is enabled by
default.
*/
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index fa5430d78ec..ec8a6c8cb31 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
@@ -137,6 +138,7 @@ public class HoodieTestUtils {
public static HoodieTableMetaClient init(String basePath, HoodieTableType
tableType, HoodieTableVersion version) throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.VERSION.key(),
String.valueOf(version.versionCode()));
+ properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition");
return init(getDefaultStorageConf(), basePath, tableType, properties);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
index 15988d214d7..fffd75f6237 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableVersion
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.exception.{HoodieException,
HoodieUpgradeDowngradeException}
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -34,7 +34,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {
@Test
def testTableVersionAndWriteVersionMatching(): Unit = {
- val basePath = s"${tempBasePath}/tbl_1";
+ val basePath = s"$tempBasePath/tbl_1"
val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as
partition")
// write table with current version
@@ -43,7 +43,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- val metaClient = HoodieTestUtils.createMetaClient(basePath);
+ val metaClient = HoodieTestUtils.createMetaClient(basePath)
assertEquals(HoodieTableVersion.current().versionCode(),
metaClient.getTableConfig.getTableVersion.versionCode())
@@ -59,14 +59,24 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {
@Test
def testThrowsExceptionForIncompatibleTableVersion(): Unit = {
- val basePath = s"${tempBasePath}/tbl_2";
- HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE,
HoodieTableVersion.SIX);
+ val basePath = s"$tempBasePath/tbl_2"
+ HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE,
HoodieTableVersion.SIX)
val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as
partition")
- assertThrows[HoodieNotSupportedException] {
+ // should error out when starting with table version 6 and writing with
auto upgrade disabled
+ assertThrows[HoodieUpgradeDowngradeException] {
df.write.format("hudi")
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false")
.mode(SaveMode.Append)
.save(basePath)
}
+ // should succeed when writing with auto upgrade enabled (default)
+ df.write.format("hudi")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val metaClient = HoodieTestUtils.createMetaClient(basePath)
+ assertEquals(HoodieTableVersion.current().versionCode(),
+ metaClient.getTableConfig.getTableVersion.versionCode())
}
}