linliu-code commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2223205279
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,162 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 is the placeholder version from 1.0.0 to 1.0.2.
+ * Version 9 is the placeholder version >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode.
+ */
+public class EightToNineUpgradeHandler implements UpgradeHandler {
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
HoodieEngineContext context,
- String instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-
- return Collections.emptyMap();
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
upgrade(HoodieWriteConfig config,
+
HoodieEngineContext context,
+
String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ // If auto upgrade is disabled, set writer version to 8 and return.
+ if (!config.autoUpgrade()) {
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return Pair.of(tablePropsToAdd, Collections.emptyList());
+ }
+ // If metadata is enabled for the data table, and
+ // existing metadata table is behind the data table, then delete it.
+ checkAndHandleMetadataTable(context, table, config, metaClient);
+ // Rollback and run compaction in one step.
+ rollbackFailedWritesAndCompact(
+ table, context, config, upgradeDowngradeHelper,
+
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+ HoodieTableVersion.EIGHT);
+ // Handle merge mode config.
+ upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
+ // Handle partial update mode config.
+ upgradePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+ // Handle merge properties config.
+ upgradeMergePropertiesConfig(tablePropsToAdd, tableConfig);
+ // Handle payload class configs.
+ List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+ upgradePayloadClassConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig);
+ return Pair.of(tablePropsToAdd, tablePropsToRemove);
+ }
+
+ private void upgradePayloadClassConfig(Map<ConfigProperty, String>
tablePropsToAdd,
+ List<ConfigProperty>
tablePropsToRemove,
+ HoodieTableConfig tableConfig) {
+ String payloadClass = tableConfig.getPayloadClass();
+ if (StringUtils.isNullOrEmpty(payloadClass)) {
+ return;
+ }
+ if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
+ tablePropsToAdd.put(LEGACY_PAYLOAD_CLASS_NAME, payloadClass);
+ tablePropsToRemove.add(PAYLOAD_CLASS_NAME);
+ }
+ }
+
+ private void upgradeMergeModeConfig(Map<ConfigProperty, String>
tablePropsToAdd,
+ HoodieTableConfig tableConfig) {
+ String payloadClass = tableConfig.getPayloadClass();
+ if (StringUtils.isNullOrEmpty(payloadClass)) {
+ return;
+ }
+ if
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+ || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+ tablePropsToAdd.put(RECORD_MERGE_MODE,
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+ tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID,
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())
+ || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())
+ || payloadClass.equals(EventTimeAvroPayload.class.getName())
+ || payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+ tablePropsToAdd.put(RECORD_MERGE_MODE,
RecordMergeMode.EVENT_TIME_ORDERING.name());
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]