This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d2877631a2 [HUDI-8409] Fixing merge mode config during upgrade and 
downgrade from version 7 to 8 and back  (#13046)
0d2877631a2 is described below

commit 0d2877631a20133d4f9801e8d1453a65dec62767
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Sat Mar 29 12:03:35 2025 +0530

    [HUDI-8409] Fixing merge mode config during upgrade and downgrade from 
version 7 to 8 and back  (#13046)
    
    Update the upgrade/downgrade logic for merge mode related configs.
    
    ---------
    
    Co-authored-by: Lin Liu <linliu.c...@gmail.com>
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../hudi/table/upgrade/DowngradeHandler.java       |  7 ++-
 .../upgrade/EightToSevenDowngradeHandler.java      | 43 +++++++++-----
 .../table/upgrade/FiveToFourDowngradeHandler.java  |  6 +-
 .../table/upgrade/FourToThreeDowngradeHandler.java |  6 +-
 .../table/upgrade/OneToZeroDowngradeHandler.java   |  5 +-
 .../table/upgrade/SevenToEightUpgradeHandler.java  | 68 +++++++++++++++++++---
 .../table/upgrade/SevenToSixDowngradeHandler.java  |  6 +-
 .../table/upgrade/SixToFiveDowngradeHandler.java   |  7 ++-
 .../table/upgrade/ThreeToTwoDowngradeHandler.java  |  6 +-
 .../table/upgrade/TwoToOneDowngradeHandler.java    |  5 +-
 .../hudi/table/upgrade/UpgradeDowngrade.java       | 23 ++++++--
 .../upgrade/TestEightToSevenDowngradeHandler.java  | 45 +++++++++++++-
 .../upgrade/TestSevenToEightUpgradeHandler.java    | 60 +++++++++++++++----
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  | 41 +++++++++++--
 .../hudi/functional/TestSevenToEightUpgrade.scala  | 15 +++--
 15 files changed, 277 insertions(+), 66 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
index 51981e441de..69e0d82aa3f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
@@ -20,8 +20,10 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -36,9 +38,10 @@ public interface DowngradeHandler {
    * @param context                instance of {@link HoodieEngineContext} to 
be used.
    * @param instantTime            current instant time that should not be 
touched.
    * @param upgradeDowngradeHelper instance of {@link 
SupportsUpgradeDowngrade} to be used.
-   * @return Map of config properties and its values to be added to table 
properties.
+   * @return Map of config properties and its values to be added to table 
properties along with the list
+   * of config properties to be removed from the table config.
    */
-  Map<ConfigProperty, String> downgrade(
+  Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime,
       SupportsUpgradeDowngrade upgradeDowngradeHelper);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 0df93dc51ec..5bf9ce22eff 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BootstrapIndexType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -42,10 +44,11 @@ import 
org.apache.hudi.common.table.timeline.versioning.v2.ArchivedTimelineLoade
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -61,6 +64,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -95,7 +99,7 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
   private static final Set<String> SUPPORTED_METADATA_PARTITION_PATHS = 
getSupportedMetadataPartitionPaths();
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
     // Rollback and run compaction in one step
@@ -133,7 +137,9 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
     // downgrade table properties
     downgradePartitionFields(config, metaClient.getTableConfig(), 
tablePropsToAdd);
     unsetInitialVersion(metaClient.getTableConfig(), tablePropsToAdd);
-    unsetRecordMergeMode(metaClient.getTableConfig(), tablePropsToAdd);
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    tablePropsToRemove.addAll(unsetRecordMergeMode(config, 
metaClient.getTableConfig(), tablePropsToAdd));
+    tablePropsToRemove.add(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID);
     downgradeKeyGeneratorType(metaClient.getTableConfig(), tablePropsToAdd);
     downgradeBootstrapIndexType(metaClient.getTableConfig(), tablePropsToAdd);
 
@@ -143,7 +149,7 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
       downgradeMetadataPartitions(context, metaClient.getStorage(), 
metaClient, tablePropsToAdd);
       UpgradeDowngradeUtils.updateMetadataTableVersion(context, 
HoodieTableVersion.SEVEN, metaClient);
     }
-    return tablePropsToAdd;
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
   }
 
   static void downgradePartitionFields(HoodieWriteConfig config,
@@ -161,19 +167,24 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
     tableConfig.getProps().remove(HoodieTableConfig.INITIAL_VERSION.key());
   }
 
-  static void unsetRecordMergeMode(HoodieTableConfig tableConfig, 
Map<ConfigProperty, String> tablePropsToAdd) {
-    Triple<RecordMergeMode, String, String> mergingConfigs =
-        HoodieTableConfig.inferCorrectMergingBehavior(
-            tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
-            tableConfig.getRecordMergeStrategyId(), 
tableConfig.getPreCombineField(),
-            tableConfig.getTableVersion());
-    if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
-      tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
mergingConfigs.getMiddle());
-    }
-    if (StringUtils.nonEmpty(mergingConfigs.getRight())) {
-      tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, 
mergingConfigs.getRight());
+  static List<ConfigProperty> unsetRecordMergeMode(HoodieWriteConfig config, 
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+      switch (mergeMode) {
+        case EVENT_TIME_ORDERING:
+          tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
DefaultHoodieRecordPayload.class.getName());
+          break;
+        case COMMIT_TIME_ORDERING:
+          tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
OverwriteWithLatestAvroPayload.class.getName());
+          break;
+        case CUSTOM:
+          throw new HoodieUpgradeDowngradeException("Custom payload class must 
be available for downgrading custom merge mode");
+        default:
+          throw new HoodieUpgradeDowngradeException("Downgrade is not handled 
for " + mergeMode);
+      }
     }
-    tableConfig.getProps().remove(HoodieTableConfig.RECORD_MERGE_MODE.key());
+    return Collections.singletonList(HoodieTableConfig.RECORD_MERGE_MODE);
   }
 
   static void downgradeBootstrapIndexType(HoodieTableConfig tableConfig,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
index e51f5496c2d..57ed25b8051 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
@@ -21,15 +21,17 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class FiveToFourDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
index 86a594af17c..c4454c0164c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,12 +35,12 @@ import java.util.Map;
 public class FourToThreeDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 4 has a schema that is not forward 
compatible.
       // Hence, it is safe to delete the metadata table, which will be 
re-initialized in subsequent commit.
       HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
     }
-    return Collections.emptyMap();
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
index 9e4a1fbd638..aaabd1ca2b8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -38,7 +39,7 @@ import java.util.stream.Collectors;
 public class OneToZeroDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime,
       SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
@@ -50,6 +51,6 @@ public class OneToZeroDowngradeHandler implements 
DowngradeHandler {
       WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), table, 
inflightInstant.requestedTime());
       writeMarkers.quietDeleteMarkerDir(context, 
config.getMarkersDeleteParallelism());
     }
-    return Collections.emptyMap();
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index b7d74ff7b74..4e0d7ec004c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -178,23 +179,76 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
   }
 
   static void upgradeMergeMode(HoodieTableConfig tableConfig, 
Map<ConfigProperty, String> tablePropsToAdd) {
-    if (tableConfig.getPayloadClass() != null
-        && 
tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
 {
-      if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
-        tablePropsToAdd.put(
-            HoodieTableConfig.RECORD_MERGE_MODE,
-            RecordMergeMode.COMMIT_TIME_ORDERING.name());
-      } else {
+    String payloadClass = tableConfig.getPayloadClass();
+    String preCombineField = tableConfig.getPreCombineField();
+    if (isCustomPayloadClass(payloadClass)) {
+      // This contains a special case: HoodieMetadataPayload.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          payloadClass);
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.CUSTOM.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    } else if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+      setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd);
+    } else { // MOR table
+      if (StringUtils.nonEmpty(preCombineField)) {
+        // This contains a special case: OverwriteWithLatestPayload with 
preCombine field.
         tablePropsToAdd.put(
             HoodieTableConfig.PAYLOAD_CLASS_NAME,
             DefaultHoodieRecordPayload.class.getName());
         tablePropsToAdd.put(
             HoodieTableConfig.RECORD_MERGE_MODE,
             RecordMergeMode.EVENT_TIME_ORDERING.name());
+        tablePropsToAdd.put(
+            HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+            HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+      } else {
+        setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd);
       }
     }
   }
 
+  private static void setEventTimeOrCommitTimeBasedOnPayload(String 
payloadClass, Map<ConfigProperty, String> tablePropsToAdd) {
+    // DefaultRecordPayload without preCombine Field.
+    // This is unlikely to happen.
+    if (useDefaultHoodieRecordPayload(payloadClass)) {
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          DefaultHoodieRecordPayload.class.getName());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.EVENT_TIME_ORDERING.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else {
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          OverwriteWithLatestAvroPayload.class.getName());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    }
+  }
+
+  static boolean useDefaultHoodieRecordPayload(String payloadClass) {
+    return !StringUtils.isNullOrEmpty(payloadClass)
+        && payloadClass.equals(DefaultHoodieRecordPayload.class.getName());
+  }
+
+  static boolean isCustomPayloadClass(String payloadClass) {
+    return !StringUtils.isNullOrEmpty(payloadClass)
+        && !payloadClass.equals(DefaultHoodieRecordPayload.class.getName())
+        && 
!payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName());
+  }
+
   static void setInitialVersion(HoodieTableConfig tableConfig, 
Map<ConfigProperty, String> tablePropsToAdd) {
     if (tableConfig.contains(HoodieTableConfig.VERSION)) {
       tablePropsToAdd.put(HoodieTableConfig.INITIAL_VERSION, 
String.valueOf(tableConfig.getTableVersion().versionCode()));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
index 7a5280a57a3..3aae6f68504 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -36,9 +38,9 @@ import java.util.Map;
 public class SevenToSixDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     UpgradeDowngradeUtils.updateMetadataTableVersion(context, 
HoodieTableVersion.SIX, table.getMetaClient());
-    return Collections.emptyMap();
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index bfeaf00447b..1cbb5936a2b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -25,11 +25,14 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -47,7 +50,7 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART
 public class SixToFiveDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
 
     // Since version 6 includes a new schema field for metadata table(MDT), 
the MDT needs to be deleted during downgrade to avoid column drop error.
@@ -64,7 +67,7 @@ public class SixToFiveDowngradeHandler implements 
DowngradeHandler {
         .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v));
     
Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT))
         .ifPresent(v -> 
updatedTableProps.put(TABLE_METADATA_PARTITIONS_INFLIGHT, v));
-    return updatedTableProps;
+    return Pair.of(updatedTableProps, Collections.emptyList());
   }
 
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
index 4f209f05ffc..bc5baaa3302 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,13 +35,13 @@ import java.util.Map;
 public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 3 is synchronous and in version 2 is 
asynchronous. Downgrading to asynchronous
       // removes the checks in code to decide whether to use a LogBlock or 
not. Also, the schema for the
       // table has been updated and is not forward compatible. Hence, we need 
to delete the table.
       HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
     }
-    return Collections.emptyMap();
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
index 59193bfc9ce..05e0d1c31c4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
@@ -52,7 +53,7 @@ import static 
org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
 public class TwoToOneDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime,
       SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
@@ -70,7 +71,7 @@ public class TwoToOneDowngradeHandler implements 
DowngradeHandler {
         throw new HoodieException("Converting marker files to DIRECT style 
failed during downgrade", e);
       }
     }
-    return Collections.emptyMap();
+    return Pair.of(Collections.emptyMap(), Collections.emptyList());
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 2ffabc14128..124cdad0a4c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -40,7 +41,9 @@ import org.apache.hudi.table.HoodieTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -151,13 +154,14 @@ public class UpgradeDowngrade {
 
     // Perform the actual upgrade/downgrade; this has to be idempotent, for 
now.
     LOG.info("Attempting to move table from version " + fromVersion + " to " + 
toVersion);
-    Map<ConfigProperty, String> tableProps = new Hashtable<>();
+    Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
     boolean isDowngrade = false;
     if (fromVersion.versionCode() < toVersion.versionCode()) {
       // upgrade
       while (fromVersion.versionCode() < toVersion.versionCode()) {
         HoodieTableVersion nextVersion = 
HoodieTableVersion.fromVersionCode(fromVersion.versionCode() + 1);
-        tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
+        tablePropsToAdd.putAll(upgrade(fromVersion, nextVersion, instantTime));
         fromVersion = nextVersion;
       }
     } else {
@@ -165,7 +169,9 @@ public class UpgradeDowngrade {
       isDowngrade = true;
       while (fromVersion.versionCode() > toVersion.versionCode()) {
         HoodieTableVersion prevVersion = 
HoodieTableVersion.fromVersionCode(fromVersion.versionCode() - 1);
-        tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
+        Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
tablePropsToAddAndRemove = downgrade(fromVersion, prevVersion, instantTime);
+        tablePropsToAdd.putAll(tablePropsToAddAndRemove.getLeft());
+        tablePropsToRemove.addAll(tablePropsToAddAndRemove.getRight());
         fromVersion = prevVersion;
       }
     }
@@ -174,8 +180,15 @@ public class UpgradeDowngrade {
       metaClient = HoodieTableMetaClient.reload(metaClient);
     }
     // Write out the current version in hoodie.properties.updated file
-    for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
+    for (Map.Entry<ConfigProperty, String> entry : tablePropsToAdd.entrySet()) 
{
+      // add alternate keys.
       metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
+      entry.getKey().getAlternatives().forEach(alternateKey -> {
+        metaClient.getTableConfig().setValue((String)alternateKey, 
entry.getValue());
+      });
+    }
+    for (ConfigProperty configProperty : tablePropsToRemove) {
+      metaClient.getTableConfig().clearValue(configProperty);
     }
     // user could have disabled auto upgrade (probably to deploy the new 
binary only),
     // in which case, we should not update the table version
@@ -234,7 +247,7 @@ public class UpgradeDowngrade {
     }
   }
 
-  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion 
fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String 
instantTime) {
     if (fromVersion == HoodieTableVersion.ONE && toVersion == 
HoodieTableVersion.ZERO) {
       return new OneToZeroDowngradeHandler().downgrade(config, context, 
instantTime, upgradeDowngradeHelper);
     } else if (fromVersion == HoodieTableVersion.TWO && toVersion == 
HoodieTableVersion.ONE) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
index 300b00a4880..62f0e54f9ed 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
@@ -28,7 +28,9 @@ import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -41,8 +43,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.io.File;
@@ -67,6 +72,7 @@ import static 
org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -154,6 +160,7 @@ class TestEightToSevenDowngradeHandler {
     existingTableProps.put(RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name());
     existingTableProps.put(BOOTSTRAP_INDEX_TYPE.key(), 
BootstrapIndexType.HFILE.name());
     existingTableProps.put(KEY_GENERATOR_TYPE.key(), 
KeyGeneratorType.CUSTOM.name());
+    
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
     when(tableConfig.getProps()).thenReturn(new 
TypedProperties(existingTableProps));
     
when(config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())).thenReturn("partition_field");
     when(tableConfig.getPartitionFieldProp()).thenReturn("partition_field");
@@ -163,8 +170,8 @@ class TestEightToSevenDowngradeHandler {
     assertEquals("partition_field", tablePropsToAdd.get(PARTITION_FIELDS));
     EightToSevenDowngradeHandler.unsetInitialVersion(tableConfig, 
tablePropsToAdd);
     assertFalse(tableConfig.getProps().containsKey(INITIAL_VERSION.key()));
-    EightToSevenDowngradeHandler.unsetRecordMergeMode(tableConfig, 
tablePropsToAdd);
-    assertFalse(tableConfig.getProps().containsKey(RECORD_MERGE_MODE.key()));
+    List<ConfigProperty> propsToRemove = 
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, 
tablePropsToAdd);
+    assertTrue(propsToRemove.contains(RECORD_MERGE_MODE));
     assertTrue(tablePropsToAdd.containsKey(PAYLOAD_CLASS_NAME));
     EightToSevenDowngradeHandler.downgradeBootstrapIndexType(tableConfig, 
tablePropsToAdd);
     assertFalse(tablePropsToAdd.containsKey(BOOTSTRAP_INDEX_TYPE));
@@ -173,4 +180,38 @@ class TestEightToSevenDowngradeHandler {
     assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
     assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
   }
+
+  @ParameterizedTest
+  @CsvSource({
+      "com.example.CustomPayload, CUSTOM, com.example.CustomPayload",
+      ", CUSTOM, ",
+      "org.apache.hudi.metadata.HoodieMetadataPayload, CUSTOM, 
org.apache.hudi.metadata.HoodieMetadataPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, 
COMMIT_TIME_ORDERING, 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, 
EVENT_TIME_ORDERING, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", EVENT_TIME_ORDERING, 
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", COMMIT_TIME_ORDERING, 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+  })
+  void testUnsetRecordMergeMode(String payloadClass, String recordMergeMode, 
String expectedPayloadClass) {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+
+    when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.valueOf(recordMergeMode));
+    }
+
+    if (!StringUtils.isNullOrEmpty(recordMergeMode) && 
recordMergeMode.equals("CUSTOM") && StringUtils.isNullOrEmpty(payloadClass)) {
+      assertThrows(HoodieUpgradeDowngradeException.class, () -> 
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, 
tablePropsToAdd));
+    } else {
+      List<ConfigProperty> propsToRemove = 
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, 
tablePropsToAdd);
+      assertTrue(propsToRemove.stream().anyMatch(cfg -> 
cfg.key().equals(RECORD_MERGE_MODE.key())));
+      
assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID));
+
+      if (!StringUtils.isNullOrEmpty(payloadClass)) {
+        
assertFalse(tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+      } else {
+        assertEquals(expectedPayloadClass, 
tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+      }
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
index d6d3cb4998c..bcc9d897620 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
@@ -22,18 +22,18 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.table.HoodieTable;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.HashMap;
@@ -56,14 +56,7 @@ import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-public class TestSevenToEightUpgradeHandler {
-
-  @Mock
-  private HoodieTable table;
-  @Mock
-  private HoodieTableMetaClient metaClient;
-  @Mock
-  private HoodieEngineContext context;
+class TestSevenToEightUpgradeHandler {
   @Mock
   private HoodieWriteConfig config;
   @Mock
@@ -122,4 +115,49 @@ public class TestSevenToEightUpgradeHandler {
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
   }
+
+  @ParameterizedTest
+  @CsvSource({
+      // hard coding all merge strategy Ids since older version of hudi has 
different variable name to represent the merge strategy id.
+      "com.example.CustomPayload, , CUSTOM, " + 
"00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload",
+      "com.example.CustomPayload, preCombineFieldValue, CUSTOM, " + 
"00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload",
+      "org.apache.hudi.metadata.HoodieMetadataPayload, , CUSTOM, " + 
"00000000-0000-0000-0000-000000000000" + ", 
org.apache.hudi.metadata.HoodieMetadataPayload",
+      "org.apache.hudi.metadata.HoodieMetadataPayload, preCombineFieldValue, 
CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ", 
org.apache.hudi.metadata.HoodieMetadataPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, , 
COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d"
+          + ", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, , 
EVENT_TIME_ORDERING, " + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5"
+          + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, " + 
"eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", 
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, 
preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", 
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, " + 
"eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", 
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, 
preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", 
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", , COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d" + 
", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+  })
+  void testUpgradeMergeMode(String payloadClass, String preCombineField, 
String expectedMergeMode, String expectedStrategy, String expectedPayloadClass) 
{
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+
+    when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
+    when(tableConfig.getPreCombineField()).thenReturn(preCombineField);
+
+    SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd);
+
+    assertEquals(expectedMergeMode, 
tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_MODE));
+    assertEquals(expectedStrategy, 
tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID));
+    if (expectedPayloadClass != null) {
+      assertEquals(expectedPayloadClass, 
tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+    } else {
+      
assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+    }
+  }
+
+  private static Map<ConfigProperty, String> createMap(Object... keyValues) {
+    Map<ConfigProperty, String> map = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      map.put((ConfigProperty) keyValues[i], (String) keyValues[i + 1]);
+    }
+    return map;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 8f3da8cf31e..e815de43968 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, 
OverwriteWithLatestAvroPayload, WriteOperationType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE, 
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
 import org.apache.hudi.exception.HoodieException
@@ -172,14 +172,45 @@ object HoodieWriterUtils {
       || key.equals(RECORD_MERGE_MODE.key())
       || key.equals(RECORD_MERGE_STRATEGY_ID.key())))
 
-    //don't validate the payload only in the case that insert into is using 
fallback to some legacy configs
-    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && 
value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME))
+    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && 
shouldIgnorePayloadValidation(value, params, tableConfig))
     // If hoodie.database.name is empty, ignore validation.
     ignoreConfig = ignoreConfig || 
(key.equals(HoodieTableConfig.DATABASE_NAME.key()) && 
isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
-
     ignoreConfig
   }
 
+  def shouldIgnorePayloadValidation(value: String, params: Map[String, 
String], tableConfig: HoodieConfig): Boolean = {
+    //don't validate the payload only in the case that insert into is using 
fallback to some legacy configs
+    val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
+    if (ignoreConfig) {
+       ignoreConfig
+    } else {
+      if (tableConfig == null) {
+        true
+      } else {
+        // In table version 8, if table Config payload refers to 
DefaultHoodieRecordPayload and if initial table version is 6, payload class 
config
+        // writer props are allowed to be OverwriteWithLatest
+        val tableVersion = if 
(tableConfig.contains(HoodieTableConfig.VERSION.key())) {
+          
HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.VERSION))
+        } else {
+          HoodieTableVersion.current()
+        }
+        val initTableVersion = if 
(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION.key())) {
+          
HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.INITIAL_VERSION))
+        } else {
+          HoodieTableVersion.current()
+        }
+
+        if (tableVersion == HoodieTableVersion.EIGHT && 
initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
+          && value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+          && 
tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName))
 {
+          true
+        } else {
+          ignoreConfig
+        }
+      }
+    }
+  }
+
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
                           tableConfig: HoodieConfig): Unit = {
     validateTableConfig(spark, params, tableConfig, false)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 3595850841e..32f08888ae0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, 
RecordMergeMode, Typ
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieRecordMerger, HoodieRecordPayload, HoodieTableType, 
OverwriteWithLatestAvroPayload, TableServiceType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion}
 import 
org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps, 
GREATER_THAN_OR_EQUALS}
-import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.{Option, StringUtils}
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
@@ -101,13 +101,20 @@ class TestSevenToEightUpgrade extends 
RecordLevelIndexTestBase {
     assertEquals(partitionFields, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
 
     // After upgrade, based on the payload and table type, the merge mode is 
updated accordingly.
-    if (HoodieTableType.COPY_ON_WRITE == tableType) {
+    if (metaClient.getTableConfig.getTableType == 
HoodieTableType.COPY_ON_WRITE) {
       assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, 
metaClient.getTableConfig.getPayloadClass)
       assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, 
metaClient.getTableConfig.getRecordMergeMode.name)
       assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
metaClient.getTableConfig.getRecordMergeStrategyId)
     } else {
-      assertEquals(classOf[DefaultHoodieRecordPayload].getName, 
metaClient.getTableConfig.getPayloadClass)
-      assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, 
metaClient.getTableConfig.getRecordMergeMode.name)
+      if 
(StringUtils.isNullOrEmpty(metaClient.getTableConfig.getPreCombineField)) {
+        assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, 
metaClient.getTableConfig.getPayloadClass)
+        assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, 
metaClient.getTableConfig.getRecordMergeMode.name)
+        assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
metaClient.getTableConfig.getRecordMergeStrategyId)
+      } else {
+        assertEquals(classOf[DefaultHoodieRecordPayload].getName, 
metaClient.getTableConfig.getPayloadClass)
+        assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, 
metaClient.getTableConfig.getRecordMergeMode.name)
+        assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
metaClient.getTableConfig.getRecordMergeStrategyId)
+      }
     }
   }
 

Reply via email to