This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 0d2877631a2 [HUDI-8409] Fixing merge mode config during upgrade and downgrade from version 7 to 8 and back (#13046) 0d2877631a2 is described below commit 0d2877631a20133d4f9801e8d1453a65dec62767 Author: Lokesh Jain <lj...@apache.org> AuthorDate: Sat Mar 29 12:03:35 2025 +0530 [HUDI-8409] Fixing merge mode config during upgrade and downgrade from version 7 to 8 and back (#13046) Update the upgrade/downgrade logic for merge mode related configs. --------- Co-authored-by: Lin Liu <linliu.c...@gmail.com> Co-authored-by: sivabalan <n.siv...@gmail.com> --- .../hudi/table/upgrade/DowngradeHandler.java | 7 ++- .../upgrade/EightToSevenDowngradeHandler.java | 43 +++++++++----- .../table/upgrade/FiveToFourDowngradeHandler.java | 6 +- .../table/upgrade/FourToThreeDowngradeHandler.java | 6 +- .../table/upgrade/OneToZeroDowngradeHandler.java | 5 +- .../table/upgrade/SevenToEightUpgradeHandler.java | 68 +++++++++++++++++++--- .../table/upgrade/SevenToSixDowngradeHandler.java | 6 +- .../table/upgrade/SixToFiveDowngradeHandler.java | 7 ++- .../table/upgrade/ThreeToTwoDowngradeHandler.java | 6 +- .../table/upgrade/TwoToOneDowngradeHandler.java | 5 +- .../hudi/table/upgrade/UpgradeDowngrade.java | 23 ++++++-- .../upgrade/TestEightToSevenDowngradeHandler.java | 45 +++++++++++++- .../upgrade/TestSevenToEightUpgradeHandler.java | 60 +++++++++++++++---- .../scala/org/apache/hudi/HoodieWriterUtils.scala | 41 +++++++++++-- .../hudi/functional/TestSevenToEightUpgrade.scala | 15 +++-- 15 files changed, 277 insertions(+), 66 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java index 51981e441de..69e0d82aa3f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java @@ -20,8 +20,10 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import java.util.List; import java.util.Map; /** @@ -36,9 +38,10 @@ public interface DowngradeHandler { * @param context instance of {@link HoodieEngineContext} to be used. * @param instantTime current instant time that should not be touched. * @param upgradeDowngradeHelper instance of {@link SupportsUpgradeDowngrade} to be used. - * @return Map of config properties and its values to be added to table properties. + * @return Map of config properties and its values to be added to table properties along with the list + * of config properties to be removed from the table config. */ - Map<ConfigProperty, String> downgrade( + Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade( HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java index 0df93dc51ec..5bf9ce22eff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java @@ -25,7 +25,9 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BootstrapIndexType; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -42,10 +44,11 @@ import org.apache.hudi.common.table.timeline.versioning.v2.ArchivedTimelineLoade import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.Triple; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieUpgradeDowngradeException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -61,6 +64,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -95,7 +99,7 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler { private static final Set<String> SUPPORTED_METADATA_PARTITION_PATHS = getSupportedMetadataPartitionPaths(); @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { final HoodieTable table = upgradeDowngradeHelper.getTable(config, context); Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>(); // Rollback and run compaction in one step @@ -133,7 +137,9 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler { // downgrade table properties downgradePartitionFields(config, metaClient.getTableConfig(), tablePropsToAdd); unsetInitialVersion(metaClient.getTableConfig(), tablePropsToAdd); - unsetRecordMergeMode(metaClient.getTableConfig(), tablePropsToAdd); + List<ConfigProperty> tablePropsToRemove = new ArrayList<>(); + tablePropsToRemove.addAll(unsetRecordMergeMode(config, metaClient.getTableConfig(), tablePropsToAdd)); + tablePropsToRemove.add(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID); downgradeKeyGeneratorType(metaClient.getTableConfig(), tablePropsToAdd); downgradeBootstrapIndexType(metaClient.getTableConfig(), tablePropsToAdd); @@ -143,7 +149,7 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler { downgradeMetadataPartitions(context, metaClient.getStorage(), metaClient, tablePropsToAdd); UpgradeDowngradeUtils.updateMetadataTableVersion(context, HoodieTableVersion.SEVEN, metaClient); } - return tablePropsToAdd; + return Pair.of(tablePropsToAdd, tablePropsToRemove); } static void downgradePartitionFields(HoodieWriteConfig config, @@ -161,19 +167,24 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler { tableConfig.getProps().remove(HoodieTableConfig.INITIAL_VERSION.key()); } - static void unsetRecordMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) { - Triple<RecordMergeMode, String, String> mergingConfigs = - HoodieTableConfig.inferCorrectMergingBehavior( - tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(), - tableConfig.getRecordMergeStrategyId(), tableConfig.getPreCombineField(), - tableConfig.getTableVersion()); - if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) { - tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, mergingConfigs.getMiddle()); - } - if (StringUtils.nonEmpty(mergingConfigs.getRight())) { - tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, mergingConfigs.getRight()); + static List<ConfigProperty> unsetRecordMergeMode(HoodieWriteConfig config, HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) { + String payloadClass = tableConfig.getPayloadClass(); + if (StringUtils.isNullOrEmpty(payloadClass)) { + RecordMergeMode mergeMode = tableConfig.getRecordMergeMode(); + switch (mergeMode) { + case EVENT_TIME_ORDERING: + tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName()); + break; + case COMMIT_TIME_ORDERING: + tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, OverwriteWithLatestAvroPayload.class.getName()); + break; + case CUSTOM: + throw new HoodieUpgradeDowngradeException("Custom payload class must be available for downgrading custom merge mode"); + default: + throw new HoodieUpgradeDowngradeException("Downgrade is not handled for " + mergeMode); + } } - tableConfig.getProps().remove(HoodieTableConfig.RECORD_MERGE_MODE.key()); + return Collections.singletonList(HoodieTableConfig.RECORD_MERGE_MODE); } static void downgradeBootstrapIndexType(HoodieTableConfig tableConfig, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java index e51f5496c2d..57ed25b8051 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java @@ -21,15 +21,17 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import java.util.Collections; +import java.util.List; import java.util.Map; public class FiveToFourDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { - return Collections.emptyMap(); + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java index 86a594af17c..c4454c0164c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java @@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -33,12 +35,12 @@ import java.util.Map; public class FourToThreeDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { if (config.isMetadataTableEnabled()) { // Metadata Table in version 4 has a schema that is not forward compatible. // Hence, it is safe to delete the metadata table, which will be re-initialized in subsequent commit. HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context); } - return Collections.emptyMap(); + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java index 9e4a1fbd638..aaabd1ca2b8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkers; @@ -38,7 +39,7 @@ import java.util.stream.Collectors; public class OneToZeroDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade( + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade( HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { HoodieTable table = upgradeDowngradeHelper.getTable(config, context); @@ -50,6 +51,6 @@ public class OneToZeroDowngradeHandler implements DowngradeHandler { WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, inflightInstant.requestedTime()); writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); } - return Collections.emptyMap(); + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java index b7d74ff7b74..4e0d7ec004c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -178,23 +179,76 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler { } static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) { - if (tableConfig.getPayloadClass() != null - && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) { - if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) { - tablePropsToAdd.put( - HoodieTableConfig.RECORD_MERGE_MODE, - RecordMergeMode.COMMIT_TIME_ORDERING.name()); - } else { + String payloadClass = tableConfig.getPayloadClass(); + String preCombineField = tableConfig.getPreCombineField(); + if (isCustomPayloadClass(payloadClass)) { + // This contains a special case: HoodieMetadataPayload. + tablePropsToAdd.put( + HoodieTableConfig.PAYLOAD_CLASS_NAME, + payloadClass); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_MODE, + RecordMergeMode.CUSTOM.name()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, + HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID); + } else if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) { + setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd); + } else { // MOR table + if (StringUtils.nonEmpty(preCombineField)) { + // This contains a special case: OverwriteWithLatestPayload with preCombine field. tablePropsToAdd.put( HoodieTableConfig.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName()); tablePropsToAdd.put( HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, + HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID); + } else { + setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd); } } } + private static void setEventTimeOrCommitTimeBasedOnPayload(String payloadClass, Map<ConfigProperty, String> tablePropsToAdd) { + // DefaultRecordPayload without preCombine Field. + // This is unlikely to happen. + if (useDefaultHoodieRecordPayload(payloadClass)) { + tablePropsToAdd.put( + HoodieTableConfig.PAYLOAD_CLASS_NAME, + DefaultHoodieRecordPayload.class.getName()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_MODE, + RecordMergeMode.EVENT_TIME_ORDERING.name()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, + HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID); + } else { + tablePropsToAdd.put( + HoodieTableConfig.PAYLOAD_CLASS_NAME, + OverwriteWithLatestAvroPayload.class.getName()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_MODE, + RecordMergeMode.COMMIT_TIME_ORDERING.name()); + tablePropsToAdd.put( + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID, + HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID); + } + } + + static boolean useDefaultHoodieRecordPayload(String payloadClass) { + return !StringUtils.isNullOrEmpty(payloadClass) + && payloadClass.equals(DefaultHoodieRecordPayload.class.getName()); + } + + static boolean isCustomPayloadClass(String payloadClass) { + return !StringUtils.isNullOrEmpty(payloadClass) + && !payloadClass.equals(DefaultHoodieRecordPayload.class.getName()) + && !payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()); + } + static void setInitialVersion(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) { if (tableConfig.contains(HoodieTableConfig.VERSION)) { tablePropsToAdd.put(HoodieTableConfig.INITIAL_VERSION, String.valueOf(tableConfig.getTableVersion().versionCode())); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java index 7a5280a57a3..3aae6f68504 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java @@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -36,9 +38,9 @@ import java.util.Map; public class SevenToSixDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { final HoodieTable table = upgradeDowngradeHelper.getTable(config, context); UpgradeDowngradeUtils.updateMetadataTableVersion(context, HoodieTableVersion.SIX, table.getMetaClient()); - return Collections.emptyMap(); + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java index bfeaf00447b..1cbb5936a2b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java @@ -25,11 +25,14 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.table.HoodieTable; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS; @@ -47,7 +50,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART public class SixToFiveDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { final HoodieTable table = upgradeDowngradeHelper.getTable(config, context); // Since version 6 includes a new schema field for metadata table(MDT), the MDT needs to be deleted during downgrade to avoid column drop error. @@ -64,7 +67,7 @@ public class SixToFiveDowngradeHandler implements DowngradeHandler { .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v)); Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT)) .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS_INFLIGHT, v)); - return updatedTableProps; + return Pair.of(updatedTableProps, Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java index 4f209f05ffc..bc5baaa3302 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java @@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -33,13 +35,13 @@ import java.util.Map; public class ThreeToTwoDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { if (config.isMetadataTableEnabled()) { // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the // table has been updated and is not forward compatible. Hence, we need to delete the table. HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context); } - return Collections.emptyMap(); + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java index 59193bfc9ce..05e0d1c31c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.storage.HoodieStorage; @@ -52,7 +53,7 @@ import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; public class TwoToOneDowngradeHandler implements DowngradeHandler { @Override - public Map<ConfigProperty, String> downgrade( + public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade( HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { HoodieTable table = upgradeDowngradeHelper.getTable(config, context); @@ -70,7 +71,7 @@ public class TwoToOneDowngradeHandler implements DowngradeHandler { throw new HoodieException("Converting marker files to DIRECT style failed during downgrade", e); } } - return Collections.emptyMap(); + return Pair.of(Collections.emptyMap(), Collections.emptyList()); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index 2ffabc14128..124cdad0a4c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -40,7 +41,9 @@ import org.apache.hudi.table.HoodieTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Hashtable; +import java.util.List; import java.util.Map; /** @@ -151,13 +154,14 @@ public class UpgradeDowngrade { // Perform the actual upgrade/downgrade; this has to be idempotent, for now. LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); - Map<ConfigProperty, String> tableProps = new Hashtable<>(); + Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>(); + List<ConfigProperty> tablePropsToRemove = new ArrayList<>(); boolean isDowngrade = false; if (fromVersion.versionCode() < toVersion.versionCode()) { // upgrade while (fromVersion.versionCode() < toVersion.versionCode()) { HoodieTableVersion nextVersion = HoodieTableVersion.fromVersionCode(fromVersion.versionCode() + 1); - tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime)); + tablePropsToAdd.putAll(upgrade(fromVersion, nextVersion, instantTime)); fromVersion = nextVersion; } } else { @@ -165,7 +169,9 @@ public class UpgradeDowngrade { isDowngrade = true; while (fromVersion.versionCode() > toVersion.versionCode()) { HoodieTableVersion prevVersion = HoodieTableVersion.fromVersionCode(fromVersion.versionCode() - 1); - tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime)); + Pair<Map<ConfigProperty, String>, List<ConfigProperty>> tablePropsToAddAndRemove = downgrade(fromVersion, prevVersion, instantTime); + tablePropsToAdd.putAll(tablePropsToAddAndRemove.getLeft()); + tablePropsToRemove.addAll(tablePropsToAddAndRemove.getRight()); fromVersion = prevVersion; } } @@ -174,8 +180,15 @@ public class UpgradeDowngrade { metaClient = HoodieTableMetaClient.reload(metaClient); } // Write out the current version in hoodie.properties.updated file - for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) { + for (Map.Entry<ConfigProperty, String> entry : tablePropsToAdd.entrySet()) { + // add alternate keys. metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue()); + entry.getKey().getAlternatives().forEach(alternateKey -> { + metaClient.getTableConfig().setValue((String)alternateKey, entry.getValue()); + }); + } + for (ConfigProperty configProperty : tablePropsToRemove) { + metaClient.getTableConfig().clearValue(configProperty); } // user could have disabled auto upgrade (probably to deploy the new binary only), // in which case, we should not update the table version @@ -234,7 +247,7 @@ public class UpgradeDowngrade { } } - protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java index 300b00a4880..62f0e54f9ed 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java @@ -28,7 +28,9 @@ import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpgradeDowngradeException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -41,8 +43,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.File; @@ -67,6 +72,7 @@ import static org.apache.hudi.metadata.MetadataPartitionType.FILES; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -154,6 +160,7 @@ class TestEightToSevenDowngradeHandler { existingTableProps.put(RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name()); existingTableProps.put(BOOTSTRAP_INDEX_TYPE.key(), BootstrapIndexType.HFILE.name()); existingTableProps.put(KEY_GENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name()); + when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING); when(tableConfig.getProps()).thenReturn(new TypedProperties(existingTableProps)); when(config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())).thenReturn("partition_field"); when(tableConfig.getPartitionFieldProp()).thenReturn("partition_field"); @@ -163,8 +170,8 @@ class TestEightToSevenDowngradeHandler { assertEquals("partition_field", tablePropsToAdd.get(PARTITION_FIELDS)); EightToSevenDowngradeHandler.unsetInitialVersion(tableConfig, tablePropsToAdd); assertFalse(tableConfig.getProps().containsKey(INITIAL_VERSION.key())); - EightToSevenDowngradeHandler.unsetRecordMergeMode(tableConfig, tablePropsToAdd); - assertFalse(tableConfig.getProps().containsKey(RECORD_MERGE_MODE.key())); + List<ConfigProperty> propsToRemove = EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, tablePropsToAdd); + assertTrue(propsToRemove.contains(RECORD_MERGE_MODE)); assertTrue(tablePropsToAdd.containsKey(PAYLOAD_CLASS_NAME)); EightToSevenDowngradeHandler.downgradeBootstrapIndexType(tableConfig, tablePropsToAdd); assertFalse(tablePropsToAdd.containsKey(BOOTSTRAP_INDEX_TYPE)); @@ -173,4 +180,38 @@ class TestEightToSevenDowngradeHandler { assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE)); assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME)); } + + @ParameterizedTest + @CsvSource({ + "com.example.CustomPayload, CUSTOM, com.example.CustomPayload", + ", CUSTOM, ", + "org.apache.hudi.metadata.HoodieMetadataPayload, CUSTOM, org.apache.hudi.metadata.HoodieMetadataPayload", + "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, COMMIT_TIME_ORDERING, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", + "org.apache.hudi.common.model.DefaultHoodieRecordPayload, EVENT_TIME_ORDERING, org.apache.hudi.common.model.DefaultHoodieRecordPayload", + ", EVENT_TIME_ORDERING, org.apache.hudi.common.model.DefaultHoodieRecordPayload", + ", COMMIT_TIME_ORDERING, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload" + }) + void testUnsetRecordMergeMode(String payloadClass, String recordMergeMode, String expectedPayloadClass) { + HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class); + Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>(); + + when(tableConfig.getPayloadClass()).thenReturn(payloadClass); + if (StringUtils.isNullOrEmpty(payloadClass)) { + when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.valueOf(recordMergeMode)); + } + + if (!StringUtils.isNullOrEmpty(recordMergeMode) && recordMergeMode.equals("CUSTOM") && StringUtils.isNullOrEmpty(payloadClass)) { + assertThrows(HoodieUpgradeDowngradeException.class, () -> EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, tablePropsToAdd)); + } else { + List<ConfigProperty> propsToRemove = EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig, tablePropsToAdd); + assertTrue(propsToRemove.stream().anyMatch(cfg -> cfg.key().equals(RECORD_MERGE_MODE.key()))); + assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)); + + if (!StringUtils.isNullOrEmpty(payloadClass)) { + assertFalse(tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + } else { + assertEquals(expectedPayloadClass, tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + } + } + } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java index d6d3cb4998c..bcc9d897620 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java @@ -22,18 +22,18 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.RecordMergeMode; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; -import org.apache.hudi.table.HoodieTable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.util.HashMap; @@ -56,14 +56,7 @@ import static org.mockito.Mockito.isA; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class TestSevenToEightUpgradeHandler { - - @Mock - private HoodieTable table; - @Mock - private HoodieTableMetaClient metaClient; - @Mock - private HoodieEngineContext context; +class TestSevenToEightUpgradeHandler { @Mock private HoodieWriteConfig config; @Mock @@ -122,4 +115,49 @@ public class TestSevenToEightUpgradeHandler { assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME)); assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE)); } + + @ParameterizedTest + @CsvSource({ + // hard coding all merge strategy Ids since older version of hudi has different variable name to represent the merge strategy id. + "com.example.CustomPayload, , CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload", + "com.example.CustomPayload, preCombineFieldValue, CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload", + "org.apache.hudi.metadata.HoodieMetadataPayload, , CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ", org.apache.hudi.metadata.HoodieMetadataPayload", + "org.apache.hudi.metadata.HoodieMetadataPayload, preCombineFieldValue, CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ", org.apache.hudi.metadata.HoodieMetadataPayload", + "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, , COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d" + + ", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", + "org.apache.hudi.common.model.DefaultHoodieRecordPayload, , EVENT_TIME_ORDERING, " + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload", + ", preCombineFieldValue, EVENT_TIME_ORDERING, " + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload", + "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, preCombineFieldValue, EVENT_TIME_ORDERING," + + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload", + ", preCombineFieldValue, EVENT_TIME_ORDERING, " + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload", + "org.apache.hudi.common.model.DefaultHoodieRecordPayload, preCombineFieldValue, EVENT_TIME_ORDERING," + + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload", + ", , COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d" + ", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload" + }) + void testUpgradeMergeMode(String payloadClass, String preCombineField, String expectedMergeMode, String expectedStrategy, String expectedPayloadClass) { + HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class); + Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>(); + + when(tableConfig.getPayloadClass()).thenReturn(payloadClass); + when(tableConfig.getPreCombineField()).thenReturn(preCombineField); + + SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd); + + assertEquals(expectedMergeMode, tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_MODE)); + assertEquals(expectedStrategy, tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)); + if (expectedPayloadClass != null) { + assertEquals(expectedPayloadClass, tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + } else { + assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + } + } + + private static Map<ConfigProperty, String> createMap(Object... keyValues) { + Map<ConfigProperty, String> map = new HashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + map.put((ConfigProperty) keyValues[i], (String) keyValues[i + 1]); + } + return map; + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 8f3da8cf31e..e815de43968 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE -import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType} -import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload, WriteOperationType} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion} import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY} import org.apache.hudi.exception.HoodieException @@ -172,14 +172,45 @@ object HoodieWriterUtils { || key.equals(RECORD_MERGE_MODE.key()) || key.equals(RECORD_MERGE_STRATEGY_ID.key()))) - //don't validate the payload only in the case that insert into is using fallback to some legacy configs - ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)) + ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, params, tableConfig)) // If hoodie.database.name is empty, ignore validation. ignoreConfig = ignoreConfig || (key.equals(HoodieTableConfig.DATABASE_NAME.key()) && isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key))) - ignoreConfig } + def shouldIgnorePayloadValidation(value: String, params: Map[String, String], tableConfig: HoodieConfig): Boolean = { + //don't validate the payload only in the case that insert into is using fallback to some legacy configs + val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME) + if (ignoreConfig) { + ignoreConfig + } else { + if (tableConfig == null) { + true + } else { + // In table version 8, if table Config payload refers to DefaultHoodieRecordPayload and if initial table version is 6, payload class config + // writer props are allowed to be OverwriteWithLatest + val tableVersion = if (tableConfig.contains(HoodieTableConfig.VERSION.key())) { + HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.VERSION)) + } else { + HoodieTableVersion.current() + } + val initTableVersion = if (tableConfig.contains(HoodieTableConfig.INITIAL_VERSION.key())) { + HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.INITIAL_VERSION)) + } else { + HoodieTableVersion.current() + } + + if (tableVersion == HoodieTableVersion.EIGHT && initTableVersion.lesserThan(HoodieTableVersion.EIGHT) + && value.equals(classOf[OverwriteWithLatestAvroPayload].getName) + && tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName)) { + true + } else { + ignoreConfig + } + } + } + } + def validateTableConfig(spark: SparkSession, params: Map[String, String], tableConfig: HoodieConfig): Unit = { validateTableConfig(spark, params, tableConfig, false) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala index 3595850841e..32f08888ae0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala @@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode, Typ import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload, TableServiceType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} import org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps, GREATER_THAN_OR_EQUALS} -import org.apache.hudi.common.util.Option +import org.apache.hudi.common.util.{Option, StringUtils} import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig} import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorType @@ -101,13 +101,20 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase { assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get()) // After upgrade, based on the payload and table type, the merge mode is updated accordingly. - if (HoodieTableType.COPY_ON_WRITE == tableType) { + if (metaClient.getTableConfig.getTableType == HoodieTableType.COPY_ON_WRITE) { assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass) assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name) assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId) } else { - assertEquals(classOf[DefaultHoodieRecordPayload].getName, metaClient.getTableConfig.getPayloadClass) - assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name) + if (StringUtils.isNullOrEmpty(metaClient.getTableConfig.getPreCombineField)) { + assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass) + assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name) + assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId) + } else { + assertEquals(classOf[DefaultHoodieRecordPayload].getName, metaClient.getTableConfig.getPayloadClass) + assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name) + assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId) + } } }