nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2217867136


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.

Review Comment:
   We need to explain what exactly we are looking to do as part of this upgrade



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -71,6 +85,15 @@ public class UpgradeDowngradeUtils {
       Pair.of(REPLACE_COMMIT_ACTION, CLUSTERING_ACTION)
   );
   static final Map<String, String> EIGHT_TO_SIX_TIMELINE_ACTION_MAP = 
CollectionUtils.reverseMap(SIX_TO_EIGHT_TIMELINE_ACTION_MAP);
+  static final Set<String> PAYLOAD_CLASSES_TO_HANDLE = new 
HashSet<>(Arrays.asList(
+      AWSDmsAvroPayload.class.getName(),

Review Comment:
   We can't store this here, right? This should go into the
EightToNineUpgradeHandler class.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.

Review Comment:
   What is meant by "placeholder" here?
   Can we just say "Table version 8 maps to Hudi versions 1.0.0 to 1.0.2"?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());

Review Comment:
   Can we introduce a Set upfront and use it here?
   `payloadsMappedToCommitTimeBasedMergeMode.contains(payloadClass)`
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())
+        || payloadClass.equals(EventTimeAvroPayload.class.getName())
+        || payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.EVENT_TIME_ORDERING.name());

Review Comment:
   same for event time based merge mode as well 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())
+        || payloadClass.equals(EventTimeAvroPayload.class.getName())
+        || payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.EVENT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+    }
+    // else: No op, which means merge strategy id and merge mode are not 
changed.
+  }
+
+  private void upgradePartialUpdateModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                              HoodieTableConfig tableConfig) {
+    // Set partial update mode for all tables.
+    tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
+
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(PartialUpdateAvroPayload.class.getName())) {
+      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, 
PartialUpdateMode.IGNORE_DEFAULTS.name());
+    } else if 
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
+      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, 
PartialUpdateMode.IGNORE_MARKERS.name());
+    } else {
+      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
+    }
+  }
+
+  private void upgradeMergePropertiesConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                            HoodieTableConfig tableConfig) {
+    // Set merge properties for all tables.
+    String mergeProperties = "";

Review Comment:
   Use `StringUtils.EMPTY_STRING` here instead of the `""` literal.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,76 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * The major change introduced in version 9 is two table configurations for 
payload deprecation.
+ * During the downgrade, we remove these two table configurations.
+ */
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.NINE);
+    // Remove partial update mode and merge properties configs.
+    List<ConfigProperty> propertiesToRemove = new ArrayList<>();
+    propertiesToRemove.add(MERGE_PROPERTIES);
+    propertiesToRemove.add(PARTIAL_UPDATE_MODE);
+    // For specified payload classes, add strategy id and custom merge mode.
+    Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getLegacyPayloadClass();
+    if (!StringUtils.isNullOrEmpty(payloadClass) && 
(PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass))) {
+      propertiesToRemove.add(LEGACY_PAYLOAD_CLASS_NAME);
+      propertiesToAdd.put(PAYLOAD_CLASS_NAME, payloadClass);
+      if (!payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())
+          && !payloadClass.equals(DefaultHoodieRecordPayload.class.getName())) 
{
+        propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);

Review Comment:
   Actually, it's simpler, right?
   We can only have 4 possible values for the merge strategy id.
   
   during upgrade, I am ok removing the merge strategy id for commit time based 
and event time based merge modes. 
   
   during downgrade, we will have the payload config set in the legacy property 
in table config. 
   
   So, if the v8 payload class refers to OverwriteWithLatestAvroPayload, we
will set the merge strategyId mapping to commit time based merge mode.
   If the v8 payload class refers to DefaultHoodieRecordPayload, we will set
the merge strategyId mapping to event time based merge mode.
   If the v8 payload class refers to any of the built-in payloads that we know
of, we should set the merge strategyId to custom.
   If not, we leave the merge strategyId as is.
   
    



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,154 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())

Review Comment:
   We can punt on the MySQL payload for the time being, then. But let's file a
JIRA to fix the upgrade handler once multiple ordering field support has landed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to