nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2233034476
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,55 +19,198 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieIndexVersion;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.PAYLOAD_CLASSES_TO_HANDLE;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.checkAndHandleMetadataTable;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
+/**
+ * Version 8 represents Hudi versions from 1.0.0 to 1.0.2.
+ * Version 9 represents Hudi versions >= 1.1.0.
+ * Major upgrade logic:
+ * Deprecate a given set of payload classes to prefer merge mode. That is,
+ * for table with payload class defined in RFC-97,
+ * remove hoodie.compaction.payload.class from table_configs
+ * add hoodie.legacy.payload.class=payload to table_configs
+ * set hoodie.table.partial.update.mode based on RFC-97
+ * set hoodie.table.merge.properties based on RFC-97
+ * set hoodie.record.merge.mode based on RFC-97
+ * set hoodie.record.merge.strategy.id based on RFC-97
+ * for table with event_time/commit_time merge mode,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ * for table with custom merger or payload,
+ * set hoodie.table.partial.update.mode to default value
+ * set hoodie.table.merge.properties to default value
+ */
public class EightToNineUpgradeHandler implements UpgradeHandler {
+ private static final Set<String> PAYLOADS_UPGRADE_TO_EVENT_TIME_MERGE_MODE =
new HashSet<>(Arrays.asList(
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName()));
+ private static final Set<String> PAYLOADS_UPGRADE_TO_COMMIT_TIME_MERGE_MODE
= new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName()));
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
- HoodieEngineContext context,
- String instantTime,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
upgrade(HoodieWriteConfig config,
+
HoodieEngineContext context,
+
String instantTime,
+
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
- // Rollback and run compaction in one step
- rollbackFailedWritesAndCompact(
- table, context, config, upgradeDowngradeHelper,
-
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
- HoodieTableVersion.NINE);
-
- // If auto upgrade is disabled, set writer version to 8 and return
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ // If auto upgrade is disabled, set writer version to 8 and return.
if (!config.autoUpgrade()) {
- config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
- return Collections.emptyMap();
+ config.setValue(
+ HoodieWriteConfig.WRITE_TABLE_VERSION,
+ String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ return Pair.of(tablePropsToAdd, Collections.emptyList());
}
HoodieTableMetaClient metaClient = table.getMetaClient();
-
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Populate missing index versions for indexes
Option<HoodieIndexMetadata> indexMetadataOpt =
metaClient.getIndexMetadata();
if (indexMetadataOpt.isPresent()) {
populateIndexVersionIfMissing(indexMetadataOpt);
-
// Write the updated index metadata back to storage
HoodieTableMetaClient.writeIndexMetadataToStorage(
metaClient.getStorage(),
metaClient.getIndexDefinitionPath(),
indexMetadataOpt.get(),
metaClient.getTableConfig().getTableVersion());
}
- return Collections.emptyMap();
+ // If metadata is enabled for the data table, and
+ // existing metadata table is behind the data table, then delete it.
+ checkAndHandleMetadataTable(context, table, config, metaClient);
+ // Rollback and run compaction in one step.
+ rollbackFailedWritesAndCompact(
+ table, context, config, upgradeDowngradeHelper,
+
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+ HoodieTableVersion.EIGHT);
+ // Handle merge mode config.
+ upgradeMergeModeConfig(tablePropsToAdd, tableConfig);
Review Comment:
agreed. will take care of it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]