vinothchandar commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2226892658


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
 import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 represents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 represents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode. That is,
+ *   for table with payload class defined in RFC-97,
+ *     remove hoodie.compaction.payload.class from table_configs
+ *     add hoodie.legacy.payload.class=payload to table_configs
+ *     set hoodie.table.partial.update.mode based on RFC-97
+ *     set hoodie.table.merge.properties based on RFC-97
+ *     set hoodie.record.merge.mode based on RFC-97
+ *     set hoodie.record.merge.strategy.id based on RFC-97
+ *   for table with event_time/commit_time merge mode,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ *   for table with custom merger or payload,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ */
 public class EightToNineUpgradeHandler implements UpgradeHandler {
+  private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE = 
new HashSet<>(Arrays.asList(
+      EventTimeAvroPayload.class.getName(),
+      MySqlDebeziumAvroPayload.class.getName(),
+      PartialUpdateAvroPayload.class.getName(),
+      PostgresDebeziumAvroPayload.class.getName()));
+  private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE 
= new HashSet<>(Arrays.asList(
+      AWSDmsAvroPayload.class.getName(),
+      OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
 
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
-                                             HoodieEngineContext context,
-                                             String instantTime,
-                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
-    // Rollback and run compaction in one step
-    rollbackFailedWritesAndCompact(
-        table, context, config, upgradeDowngradeHelper,
-        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
-        HoodieTableVersion.NINE);
-
-    // If auto upgrade is disabled, set writer version to 8 and return
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    // If auto upgrade is disabled, set writer version to 8 and return.
     if (!config.autoUpgrade()) {
-      config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-      return Collections.emptyMap();
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
     }
     HoodieTableMetaClient metaClient = table.getMetaClient();
-
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
     // Populate missing index versions indexes
     Option<HoodieIndexMetadata> indexMetadataOpt = 
metaClient.getIndexMetadata();
     if (indexMetadataOpt.isPresent()) {
       populateIndexVersionIfMissing(indexMetadataOpt);
-
       // Write the updated index metadata back to storage
       HoodieTableMetaClient.writeIndexMetadataToStorage(
           metaClient.getStorage(),
           metaClient.getIndexDefinitionPath(),
           indexMetadataOpt.get(),
           metaClient.getTableConfig().getTableVersion());
     }
-    return Collections.emptyMap();
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);

Review Comment:
   Have we thought about idempotency/recovery if the upgrade fails partway 
through? What happens in that case?
   
   We should make a single write to `hoodie.properties`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
 import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 represents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 represents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode. That is,
+ *   for table with payload class defined in RFC-97,
+ *     remove hoodie.compaction.payload.class from table_configs
+ *     add hoodie.legacy.payload.class=payload to table_configs
+ *     set hoodie.table.partial.update.mode based on RFC-97
+ *     set hoodie.table.merge.properties based on RFC-97
+ *     set hoodie.record.merge.mode based on RFC-97
+ *     set hoodie.record.merge.strategy.id based on RFC-97
+ *   for table with event_time/commit_time merge mode,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ *   for table with custom merger or payload,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ */
 public class EightToNineUpgradeHandler implements UpgradeHandler {
+  private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE = 
new HashSet<>(Arrays.asList(
+      EventTimeAvroPayload.class.getName(),
+      MySqlDebeziumAvroPayload.class.getName(),
+      PartialUpdateAvroPayload.class.getName(),
+      PostgresDebeziumAvroPayload.class.getName()));
+  private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE 
= new HashSet<>(Arrays.asList(
+      AWSDmsAvroPayload.class.getName(),
+      OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
 
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
-                                             HoodieEngineContext context,
-                                             String instantTime,
-                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
-    // Rollback and run compaction in one step
-    rollbackFailedWritesAndCompact(
-        table, context, config, upgradeDowngradeHelper,
-        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
-        HoodieTableVersion.NINE);
-
-    // If auto upgrade is disabled, set writer version to 8 and return
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    // If auto upgrade is disabled, set writer version to 8 and return.
     if (!config.autoUpgrade()) {
-      config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-      return Collections.emptyMap();
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
     }
     HoodieTableMetaClient metaClient = table.getMetaClient();
-
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
     // Populate missing index versions indexes
     Option<HoodieIndexMetadata> indexMetadataOpt = 
metaClient.getIndexMetadata();
     if (indexMetadataOpt.isPresent()) {
       populateIndexVersionIfMissing(indexMetadataOpt);
-
       // Write the updated index metadata back to storage
       HoodieTableMetaClient.writeIndexMetadataToStorage(
           metaClient.getStorage(),
           metaClient.getIndexDefinitionPath(),
           indexMetadataOpt.get(),
           metaClient.getTableConfig().getTableVersion());
     }
-    return Collections.emptyMap();
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);
+    // Rollback and run compaction in one step.
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+    // Handle merge mode config.
+    upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+    // Handle partial update mode config.
+    upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    // Handle merge properties config.
+    upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+    // Handle payload class configs.
+    List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+    upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    return Pair.of(tablePropsToAdd, tablePropsToRemove);
+  }
+
+  private void upgradePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,

Review Comment:
   The `upgradeXXX` naming is misleading: it reads as though these methods are 
actually changing storage.
   
   Rename all of these to `changeXXX`, e.g. `changePayloadClassConfigs`.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java:
##########
@@ -35,85 +46,200 @@
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mock;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
+import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static org.apache.hudi.common.table.PartialUpdateMode.IGNORE_MARKERS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
 class TestEightToNineUpgradeHandler {
-
-  @TempDir
-  private Path tempDir;
-
-  @Mock
-  private HoodieWriteConfig config;
-  @Mock
-  private HoodieEngineContext context;
-  @Mock
-  private SupportsUpgradeDowngrade upgradeDowngradeHelper;
-  @Mock
-  private HoodieTable table;
-  @Mock
-  private HoodieTableMetaClient metaClient;
-  @Mock
-  private HoodieTableConfig tableConfig;
-  @Mock
-  private HoodieStorage storage;
-
-  private EightToNineUpgradeHandler upgradeHandler;
+  private final EightToNineUpgradeHandler handler = new 
EightToNineUpgradeHandler();

Review Comment:
   I want to ensure we cover all major payloads in use, with a functional test 
as well, where we verify the table is still writable/readable after the upgrade.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
 import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+/**
+ * Version 8 represents Hudi version from 1.0.0 to 1.0.2.
+ * Version 9 represents Hudi version >= 1.1.0.
+ * Major upgrade logic:
+ *  Deprecate a given set of payload classes to prefer merge mode. That is,
+ *   for table with payload class defined in RFC-97,
+ *     remove hoodie.compaction.payload.class from table_configs
+ *     add hoodie.legacy.payload.class=payload to table_configs
+ *     set hoodie.table.partial.update.mode based on RFC-97
+ *     set hoodie.table.merge.properties based on RFC-97
+ *     set hoodie.record.merge.mode based on RFC-97
+ *     set hoodie.record.merge.strategy.id based on RFC-97
+ *   for table with event_time/commit_time merge mode,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ *   for table with custom merger or payload,
+ *     set hoodie.table.partial.update.mode to default value
+ *     set hoodie.table.merge.properties to default value
+ */
 public class EightToNineUpgradeHandler implements UpgradeHandler {
+  private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE = 
new HashSet<>(Arrays.asList(
+      EventTimeAvroPayload.class.getName(),
+      MySqlDebeziumAvroPayload.class.getName(),
+      PartialUpdateAvroPayload.class.getName(),
+      PostgresDebeziumAvroPayload.class.getName()));
+  private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE 
= new HashSet<>(Arrays.asList(
+      AWSDmsAvroPayload.class.getName(),
+      OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
 
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
-                                             HoodieEngineContext context,
-                                             String instantTime,
-                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieWriteConfig config,
+                                                                         
HoodieEngineContext context,
+                                                                         
String instantTime,
+                                                                         
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
-    // Rollback and run compaction in one step
-    rollbackFailedWritesAndCompact(
-        table, context, config, upgradeDowngradeHelper,
-        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
-        HoodieTableVersion.NINE);
-
-    // If auto upgrade is disabled, set writer version to 8 and return
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    // If auto upgrade is disabled, set writer version to 8 and return.
     if (!config.autoUpgrade()) {
-      config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-      return Collections.emptyMap();
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return Pair.of(tablePropsToAdd, Collections.emptyList());
     }
     HoodieTableMetaClient metaClient = table.getMetaClient();
-
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
     // Populate missing index versions indexes
     Option<HoodieIndexMetadata> indexMetadataOpt = 
metaClient.getIndexMetadata();
     if (indexMetadataOpt.isPresent()) {
       populateIndexVersionIfMissing(indexMetadataOpt);
-
       // Write the updated index metadata back to storage
       HoodieTableMetaClient.writeIndexMetadataToStorage(
           metaClient.getStorage(),
           metaClient.getIndexDefinitionPath(),
           indexMetadataOpt.get(),
           metaClient.getTableConfig().getTableVersion());
     }
-    return Collections.emptyMap();
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it.
+    checkAndHandleMetadataTable(context, table, config, metaClient);

Review Comment:
   What do we mean by:
   
   > existing metadata table is behind the data table
   
   We cannot blow away the metadata table (MT) for this change. I'd like to 
understand why this is absolutely needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to