nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2217370206


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,76 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * The major change introduced in version 9 is two table configurations for 
payload deprecation.
+ * During the downgrade, we remove these two table configurations.
+ */
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.NINE);
+    // Remove partial update mode and merge properties configs.
+    List<ConfigProperty> propertiesToRemove = new ArrayList<>();
+    propertiesToRemove.add(MERGE_PROPERTIES);
+    propertiesToRemove.add(PARTIAL_UPDATE_MODE);
+    // For specified payload classes, add strategy id and custom merge mode.
+    Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getLegacyPayloadClass();
+    if (!StringUtils.isNullOrEmpty(payloadClass) && 
(PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass))) {
+      propertiesToRemove.add(LEGACY_PAYLOAD_CLASS_NAME);
+      propertiesToAdd.put(PAYLOAD_CLASS_NAME, payloadClass);
+      if (!payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())
+          && !payloadClass.equals(DefaultHoodieRecordPayload.class.getName())) 
{
+        propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);

Review Comment:
   Based on my previous feedback, we can skip making any changes to the merge 
strategy ID here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,154 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())

Review Comment:
   What about `MySqlDebeziumAvroPayload`? Should it be handled here as well?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,154 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    // If auto upgrade is disabled, set writer version to 8 and return.
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
+    }
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                         List<ConfigProperty> 
tablePropsToRemove,
+                                         HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+      tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+      tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+    }
+  }
+
+  private void upgradeMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                      HoodieTableConfig tableConfig) {
+    String payloadClass = tableConfig.getPayloadClass();
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
+      return;
+    }
+    if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+        || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())
+        || payloadClass.equals(EventTimeAvroPayload.class.getName())) {
+      tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.EVENT_TIME_ORDERING.name());
+      tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);

Review Comment:
   Actually, can we leave the merge strategy ID as is?
   Because during downgrade we need to restore the old value. Otherwise, we 
would need to add a legacy property, similar to the one for the payload class, 
and then set it back during downgrade.
   
   So, to keep it simple, let's leave the merge strategy ID as is, without 
changing anything.
   The only consequence is that in v9, for the commit-time and event-time based 
merge modes, these merge strategy IDs will never be used, which is fine.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to