This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fc41c22b9f36 [HUDI-9638] Infer table configs for new table creation in
v9 (#13615)
fc41c22b9f36 is described below
commit fc41c22b9f360d0e08860f20a37a5ebf520c200c
Author: Lin Liu <[email protected]>
AuthorDate: Thu Aug 14 16:22:35 2025 -0700
[HUDI-9638] Infer table configs for new table creation in v9 (#13615)
Co-authored-by: Y Ethan Guo <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../hudi/utils/HoodieWriterClientTestHarness.java | 5 +-
.../hudi/common/engine/HoodieReaderContext.java | 7 +-
.../hudi/common/model/AWSDmsAvroPayload.java | 3 +-
.../hudi/common/model/HoodieRecordPayload.java | 29 ++-
.../hudi/common/table/HoodieTableConfig.java | 153 ++++++++++++-
.../hudi/common/table/HoodieTableMetaClient.java | 25 ++-
.../table/read/FileGroupReaderSchemaHandler.java | 24 ++-
.../common/table/read/PartialUpdateStrategy.java | 7 +-
.../org/apache/hudi/common/util/ConfigUtils.java | 4 +-
.../common/table/read/SchemaHandlerTestBase.java | 3 +-
.../apache/hudi/common/util/TestConfigUtils.java | 94 ++++----
.../java/org/apache/hudi/util/StreamerUtil.java | 2 +-
.../apache/hudi/utils/TestFlinkWriteClients.java | 4 +-
.../hudi/common/table/TestHoodieTableConfig.java | 236 ++++++++++++++++++++-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 37 +++-
.../org/apache/hudi/TestHoodieWriterUtils.java | 42 ++++
.../hudi/command/MergeIntoHoodieTableCommand.scala | 2 +-
.../apache/hudi/TestDecimalTypeDataWorkflow.scala | 1 -
.../hudi/functional/TestBufferedRecordMerger.java | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 22 +-
.../apache/hudi/functional/TestMORDataSource.scala | 6 +-
.../functional/TestPayloadDeprecationFlow.scala | 145 ++++++++++---
.../hudi/functional/TestRecordLevelIndex.scala | 2 +-
.../others/TestMergeModeCommitTimeOrdering.scala | 1 -
.../others/TestMergeModeEventTimeOrdering.scala | 1 -
.../sql/hudi/procedure/TestRepairsProcedure.scala | 1 -
.../hudi/utilities/streamer/HoodieStreamer.java | 2 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 2 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 11 +-
30 files changed, 709 insertions(+), 167 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 9268f811a325..e534e126efae 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -27,7 +27,6 @@ import
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineType;
@@ -554,11 +553,9 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
.getReaderContextFactoryForWrite(metaClient,
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
List<String> orderingFieldNames = getOrderingFieldNames(
readerContext.getMergeMode(), writeClient.getConfig().getProps(),
metaClient);
- RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null,
writeConfig.getPayloadClass(), null,
- String.join(",", orderingFieldNames),
metaClient.getTableConfig().getTableVersion()).getLeft();
BufferedRecordMerger<HoodieRecord> recordMerger =
BufferedRecordMergerFactory.create(
readerContext,
- recordMergeMode,
+ metaClient.getTableConfig().getRecordMergeMode(),
false,
Option.ofNullable(writeClient.getConfig().getRecordMerger()),
orderingFieldNames,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c498b836c396..5c57275496ac 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -55,6 +55,7 @@ import java.util.List;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
/**
* An abstract reader context class for {@code HoodieFileGroupReader} to use,
containing APIs for
@@ -270,12 +271,12 @@ public abstract class HoodieReaderContext<T> {
HoodieTableVersion tableVersion = tableConfig.getTableVersion();
// If the provided payload class differs from the table's payload class,
we need to infer the correct merging behavior.
if (isIngestion && providedPayloadClass.map(className ->
!className.equals(tableConfig.getPayloadClass())).orElse(false)) {
- Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(),
null,
+ Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferMergingConfigsForWrites(null,
providedPayloadClass.get(), null,
tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
- } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
- Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
+ } else if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ Triple<RecordMergeMode, String, String> triple =
inferMergingConfigsForPreV9Table(
recordMergeMode, tableConfig.getPayloadClass(),
mergeStrategyId, tableConfig.getPreCombineFieldsStr().orElse(null),
tableVersion);
recordMergeMode = triple.getLeft();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index e58e2c5ec176..0bd06b20404a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -45,6 +45,7 @@ import java.util.Properties;
public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
public static final String OP_FIELD = "Op";
+ public static final String DELETE_OPERATION_VALUE = "D";
public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
@@ -116,6 +117,6 @@ public class AWSDmsAvroPayload extends
OverwriteWithLatestAvroPayload {
}
private static boolean isDMSDeleteRecord(GenericRecord record) {
- return record.get(OP_FIELD) != null &&
record.get(OP_FIELD).toString().equalsIgnoreCase("D");
+ return record.get(OP_FIELD) != null &&
record.get(OP_FIELD).toString().equalsIgnoreCase(DELETE_OPERATION_VALUE);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 4cc2a493df17..bd11d3a0d3fb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -23,7 +23,7 @@ import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -35,7 +35,10 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
/**
* Every Hoodie table has an implementation of the
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on
record specific logic.
@@ -182,16 +185,34 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
}
static String getPayloadClassName(Properties props) {
- return
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+ Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
+ if (payloadOpt.isPresent()) {
+ return payloadOpt.get();
+ }
+ // Note: starting from version 9, payload class is not necessarily set, but
+ // merge mode must exist. Therefore, we use merge mode to infer
+ // the payload class for certain corner cases, like for MIT command.
+
+ if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
+ && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, "")
+ .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+ return OverwriteWithLatestAvroPayload.class.getName();
+ }
+ return DEFAULT_PAYLOAD_CLASS_NAME;
}
+ // NOTE: PAYLOAD_CLASS_NAME is checked before LEGACY_PAYLOAD_CLASS_NAME to make sure
+ // that a temporary payload class setting is respected.
static Option<String> getPayloadClassNameIfPresent(Properties props) {
String payloadClassName = null;
- if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
- payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
+ if (ConfigUtils.containsConfigProperty(props, PAYLOAD_CLASS_NAME)) {
+ payloadClassName = ConfigUtils.getStringWithAltKeys(props,
PAYLOAD_CLASS_NAME);
+ } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {
+ payloadClassName = ConfigUtils.getStringWithAltKeys(props,
LEGACY_PAYLOAD_CLASS_NAME);
} else if (props.containsKey("hoodie.datasource.write.payload.class")) {
payloadClassName =
props.getProperty("hoodie.datasource.write.payload.class");
}
+
// There could be tables written with payload class from com.uber.hoodie.
// Need to transparently change to org.apache.hudi.
return Option.ofNullable(payloadClassName).map(className ->
className.replace("com.uber.hoodie", "org.apache.hudi"));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 450f7df7eb3d..65a9b591fb39 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.BootstrapIndexType;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
@@ -37,7 +38,11 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -71,6 +76,7 @@ import java.lang.reflect.Modifier;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -94,7 +100,12 @@ import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
+import static
org.apache.hudi.common.model.AWSDmsAvroPayload.DELETE_OPERATION_VALUE;
+import static org.apache.hudi.common.model.AWSDmsAvroPayload.OP_FIELD;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
@@ -121,10 +132,34 @@ public class HoodieTableConfig extends HoodieConfig {
public static final String PARTIAL_UPDATE_CUSTOM_MARKER =
"hoodie.write.partial.update.custom.marker";
public static final String DEBEZIUM_UNAVAILABLE_VALUE =
"__debezium_unavailable_value";
// This prefix is used to set merging related properties.
- // A reader might need to read some merger properties to function as
expected,
- // and Hudi stores properties with this prefix so the reader parses these
properties,
- // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...)
to use.
- public static final String MERGE_PROPERTIES_PREFIX =
"hoodie.table.merge.properties.";
+ // A reader might need to read some writer properties to function as
expected,
+ // and Hudi stores properties with this prefix so the reader parses these
properties to fetch any custom property.
+ public static final String RECORD_MERGE_PROPERTY_PREFIX =
"hoodie.record.merge.property.";
+ public static final Set<String> PAYLOADS_UNDER_DEPRECATION =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ AWSDmsAvroPayload.class.getName(),
+ DefaultHoodieRecordPayload.class.getName(),
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+ OverwriteWithLatestAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName())));
+
+ public static final Set<String> EVENT_TIME_ORDERING_PAYLOADS =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ DefaultHoodieRecordPayload.class.getName(),
+ EventTimeAvroPayload.class.getName(),
+ MySqlDebeziumAvroPayload.class.getName(),
+ PartialUpdateAvroPayload.class.getName(),
+ PostgresDebeziumAvroPayload.class.getName())));
+
+ public static final Set<String> BUILTIN_MERGE_STRATEGIES =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+ CUSTOM_MERGE_STRATEGY_UUID,
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ PAYLOAD_BASED_MERGE_STRATEGY_UUID)));
public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
@@ -230,6 +265,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Payload class to use for performing merges,
compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
+ public static final ConfigProperty<String> LEGACY_PAYLOAD_CLASS_NAME =
ConfigProperty
+ .key("hoodie.table.legacy.payload.class")
+ .noDefaultValue()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Payload class to indicate the payload class that is
used to create the table and is not used anymore.");
+
// This is the default payload class used by Hudi 0.x releases (table
version 6 and below)
public static final String DEFAULT_PAYLOAD_CLASS_NAME =
DefaultHoodieRecordPayload.class.getName();
@@ -801,19 +842,109 @@ public class HoodieTableConfig extends HoodieConfig {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ public String getLegacyPayloadClass() {
+ return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+ }
+
public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * which is based on the logic of table version < 9, and then tuned for
version 9 logic.
+ * This approach fits the same behavior of upgrade from 8 to 9.
+ */
+ static Map<String, String>
inferMergingConfigsForV9TableCreation(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+ String
orderingFieldName,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> reconciledConfigs = new HashMap<>();
+ if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+ throw new HoodieIOException("Unsupported flow for table versions less
than 9");
+ }
+
+ // Step 1: Infer merging configs based on input information.
+ // This step is important since it provides the same configs before we do
table upgrade.
+ // Then additional logic for table version 9 could be verified.
+ Triple<RecordMergeMode, String, String> inferredConfigs =
inferMergingConfigsForPreV9Table(
+ recordMergeMode, payloadClassName, recordMergeStrategyId,
orderingFieldName, tableVersion);
+ recordMergeMode = inferredConfigs.getLeft();
+ recordMergeStrategyId = inferredConfigs.getRight();
+
+ // Step 2: Handle Version 9 specific logic.
+ // CASE 0: For tables with special merger properties, e.g., with
non-builtin mergers.
+ // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+ // NOTE: Payload class should NOT be set for these cases.
+ if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+ || StringUtils.isNullOrEmpty(payloadClassName)) {
+ reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
recordMergeStrategyId);
+ } else {
+ // For tables using payload classes.
+ // CASE 2: Custom payload class. We set these properties explicitly.
+ if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+ reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+ reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ } else { // CASE 3: Payload classes are under deprecation.
+ // Standard merging configs.
+ // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of
PAYLOAD_CLASS_NAME here.
+ if (EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClassName)) {
+ reconciledConfigs.put(RECORD_MERGE_MODE.key(),
EVENT_TIME_ORDERING.name());
+ reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(),
payloadClassName);
+ reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ reconciledConfigs.put(RECORD_MERGE_MODE.key(),
COMMIT_TIME_ORDERING.name());
+ reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(),
payloadClassName);
+ reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ }
+ // Partial update mode config.
+ // Certain payloads are migrated to non payload way from 1.1 Hudi
binary.
+ // Hence we need to set the right value for partial update mode for
some of the cases.
+ if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+ ||
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
{
+ reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_DEFAULTS.name());
+ } else if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_MARKERS.name());
+ }
+ // Additional custom merge properties.
+ // Certain payloads are migrated to non payload way from 1.1 Hudi
binary and the reader might need certain properties for the
+ // merge to function as expected. Handling such special cases here.
+ if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+ } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName()))
{
+ reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY,
OP_FIELD);
+ reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER,
DELETE_OPERATION_VALUE);
+ }
+ }
+ }
+ return reconciledConfigs;
+ }
+
+ /**
+ * To be invoked for table creation flows or writer flows.
+ * @return the merging configs to use.
+ */
+ public static Triple<RecordMergeMode, String, String>
inferMergingConfigsForWrites(RecordMergeMode recordMergeMode,
+
String payloadClassName,
+
String recordMergeStrategyId,
+
String orderingFieldNamesAsString,
+
HoodieTableVersion tableVersion) {
+ return inferMergingConfigsForPreV9Table(recordMergeMode, payloadClassName,
recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
+ }
+
/**
* Infers the merging behavior based on what the user sets (or doesn't set).
- * Validates that the user has not set an illegal combination of configs
+ * Validates that the user has not set an illegal combination of configs.
+ * This function infers basic merging properties used by table version <= 8.
*/
- public static Triple<RecordMergeMode, String, String>
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
-
String payloadClassName,
-
String recordMergeStrategyId,
-
String orderingFieldNamesAsString,
-
HoodieTableVersion tableVersion) {
+ public static Triple<RecordMergeMode, String, String>
inferMergingConfigsForPreV9Table(RecordMergeMode recordMergeMode,
+
String payloadClassName,
+
String recordMergeStrategyId,
+
String orderingFieldNamesAsString,
+
HoodieTableVersion tableVersion) {
RecordMergeMode inferredRecordMergeMode;
String inferredPayloadClassName;
String inferredRecordMergeStrategyId;
@@ -1204,7 +1335,7 @@ public class HoodieTableConfig extends HoodieConfig {
}
public Map<String, String> getTableMergeProperties() {
- return ConfigUtils.extractWithPrefix(this.props, MERGE_PROPERTIES_PREFIX);
+ return ConfigUtils.extractWithPrefix(this.props,
RECORD_MERGE_PROPERTY_PREFIX);
}
public Map<String, String> propsMap() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1b15c401103e..a96f3ad89405 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -90,6 +90,8 @@ import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_PATH;
import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForV9TableCreation;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -1480,13 +1482,22 @@ public class HoodieTableMetaClient implements
Serializable {
tableConfig.setTableVersion(tableVersion);
tableConfig.setInitialVersion(tableVersion);
- Triple<RecordMergeMode, String, String> mergeConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
- recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineFields,
- tableVersion);
- tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
- tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
- tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
+ // For table version <= 8
+ if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+ Triple<RecordMergeMode, String, String> mergeConfigs =
+ inferMergingConfigsForPreV9Table(
+ recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineFields,
+ tableVersion);
+ tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
+ tableConfig.setValue(PAYLOAD_CLASS_NAME.key(),
mergeConfigs.getMiddle());
+ tableConfig.setValue(RECORD_MERGE_STRATEGY_ID,
mergeConfigs.getRight());
+ } else { // For table version >= 9
+ Map<String, String> mergeConfigs =
inferMergingConfigsForV9TableCreation(
+ recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineFields, tableVersion);
+ for (Map.Entry<String, String> config : mergeConfigs.entrySet()) {
+ tableConfig.setValue(config.getKey(), config.getValue());
+ }
+ }
if (null != tableCreateSchema) {
tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA,
tableCreateSchema);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 2404eb91fd0e..0414d9857a98 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
@@ -57,6 +58,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
import static
org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
/**
* This class is responsible for handling the schema for the file group reader.
@@ -206,14 +208,18 @@ public class FileGroupReaderSchemaHandler<T> {
boolean
hasBuiltInDelete,
Option<Pair<String,
String>> customDeleteMarkerKeyAndValue,
boolean
hasInstantRange) {
- Triple<RecordMergeMode, String, String> mergingConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(
- cfg.getRecordMergeMode(),
- cfg.getPayloadClass(),
- cfg.getRecordMergeStrategyId(),
- cfg.getPreCombineFieldsStr().orElse(null),
- cfg.getTableVersion());
-
- if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
+ RecordMergeMode mergeMode = cfg.getRecordMergeMode();
+ if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
+ Triple<RecordMergeMode, String, String> mergingConfigs =
inferMergingConfigsForPreV9Table(
+ cfg.getRecordMergeMode(),
+ cfg.getPayloadClass(),
+ cfg.getRecordMergeStrategyId(),
+ cfg.getPreCombineFieldsStr().orElse(null),
+ cfg.getTableVersion());
+ mergeMode = mergingConfigs.getLeft();
+ }
+
+ if (mergeMode == RecordMergeMode.CUSTOM) {
return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg,
props);
}
@@ -234,7 +240,7 @@ public class FileGroupReaderSchemaHandler<T> {
}
}
// Add precombine field for event time ordering merge mode.
- if (mergingConfigs.getLeft() == RecordMergeMode.EVENT_TIME_ORDERING) {
+ if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
List<String> preCombineFields = cfg.getPreCombineFields();
requiredFields.addAll(preCombineFields);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
index 0c3d5c0a8a51..3a925cba9c58 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
@@ -25,14 +25,15 @@ import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.avro.Schema;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDefaultValue;
import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
-import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
import static org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
/**
@@ -41,7 +42,7 @@ import static
org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
* {@link BufferedRecordMergerFactory.CommitTimePartialRecordMerger} and
* {@link BufferedRecordMergerFactory.EventTimePartialRecordMerger}.
*/
-public class PartialUpdateStrategy<T> {
+public class PartialUpdateStrategy<T> implements Serializable {
private final RecordContext<T> recordContext;
private final PartialUpdateMode partialUpdateMode;
private final Map<String, String> mergeProperties;
@@ -178,6 +179,6 @@ public class PartialUpdateStrategy<T> {
}
static Map<String, String> parseMergeProperties(TypedProperties props) {
- return extractWithPrefix(props, MERGE_PROPERTIES_PREFIX);
+ return extractWithPrefix(props, RECORD_MERGE_PROPERTY_PREFIX);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 937748677b45..70cf9cb376b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -253,11 +253,11 @@ public class ConfigUtils {
* Whether the properties contain a config. If any of the key or alternative
keys of the
* {@link ConfigProperty} exists in the properties, this method returns
{@code true}.
*
- * @param props Configs in {@link TypedProperties}
+ * @param props Configs in {@link Properties}
* @param configProperty Config to look up.
* @return {@code true} if exists; {@code false} otherwise.
*/
- public static boolean containsConfigProperty(TypedProperties props,
+ public static boolean containsConfigProperty(Properties props,
ConfigProperty<?>
configProperty) {
if (!props.containsKey(configProperty.key())) {
for (String alternative : configProperty.getAlternatives()) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index bed30d3a2538..77d85dc0f5b3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -257,6 +257,7 @@ public abstract class SchemaHandlerTestBase {
private static void setupMORTable(RecordMergeMode mergeMode, boolean
hasPrecombine, HoodieTableConfig hoodieTableConfig) {
when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+
when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
if (hasPrecombine) {
when(hoodieTableConfig.getPreCombineFieldsStr()).thenReturn(Option.of("timestamp"));
when(hoodieTableConfig.getPreCombineFields()).thenReturn(Collections.singletonList("timestamp"));
@@ -266,8 +267,6 @@ public abstract class SchemaHandlerTestBase {
}
if (mergeMode == CUSTOM) {
when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn("asdf");
- // NOTE: in this test custom doesn't have any meta cols because it is
more interesting of a test case
-
when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index a287092c88c3..9075228c7802 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -38,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
-import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -203,8 +203,8 @@ public class TestConfigUtils {
@Test
void testParseValidProperties() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "Ki", "Vi");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "Ki", "Vi");
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(1, result.size());
assertEquals("Vi", result.get("Ki"));
}
@@ -212,18 +212,18 @@ public class TestConfigUtils {
@Test
void testMissingKeyReturnsEmptyMap() {
TypedProperties props = new TypedProperties(); // no property set
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertTrue(result.isEmpty());
}
@Test
void testMultipleValidProperties() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key1", "value1");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key2", "value2");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key3", "value3");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key1", "value1");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key2", "value2");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key3", "value3");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(3, result.size());
assertEquals("value1", result.get("key1"));
assertEquals("value2", result.get("key2"));
@@ -233,11 +233,11 @@ public class TestConfigUtils {
@Test
void testPropertiesWithDifferentPrefixes() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "mergeKey", "mergeValue");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "mergeKey", "mergeValue");
props.setProperty("other.prefix.key", "otherValue");
- props.setProperty("hoodie.merge.custom.property.prefix",
"directPrefixValue");
+ props.setProperty("hoodie.merge.custom.property", "directPrefixValue");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(1, result.size());
assertEquals("mergeValue", result.get("mergeKey"));
}
@@ -245,9 +245,9 @@ public class TestConfigUtils {
@Test
void testPropertiesWithEmptyValues() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "emptyKey", "");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "emptyKey", "");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(1, result.size());
assertEquals("", result.get("emptyKey"));
}
@@ -255,11 +255,11 @@ public class TestConfigUtils {
@Test
void testPropertiesWithSpecialCharacters() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key.with.dots",
"value.with.dots");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key_with_underscores",
"value_with_underscores");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key-with-dashes",
"value-with-dashes");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key.with.dots",
"value.with.dots");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key_with_underscores",
"value_with_underscores");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key-with-dashes",
"value-with-dashes");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(3, result.size());
assertEquals("value.with.dots", result.get("key.with.dots"));
assertEquals("value_with_underscores", result.get("key_with_underscores"));
@@ -269,11 +269,11 @@ public class TestConfigUtils {
@Test
void testPropertiesWithNumericValues() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "intKey", "123");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "doubleKey", "123.45");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "booleanKey", "true");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "intKey", "123");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "doubleKey", "123.45");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "booleanKey", "true");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(3, result.size());
assertEquals("123", result.get("intKey"));
assertEquals("123.45", result.get("doubleKey"));
@@ -283,9 +283,9 @@ public class TestConfigUtils {
@Test
void testPropertiesWithWhitespace() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + " spacedKey ", "
spacedValue ");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " spacedKey ", "
spacedValue ");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(1, result.size());
assertEquals("spacedValue", result.get("spacedKey")); // Values should be
trimmed
}
@@ -293,10 +293,10 @@ public class TestConfigUtils {
@Test
void testPropertiesWithWhitespaceInKeysAndValues() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + " keyWithSpaces ", "
valueWithSpaces ");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "keyWithoutSpaces",
"valueWithoutSpaces");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " keyWithSpaces ", "
valueWithSpaces ");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "keyWithoutSpaces",
"valueWithoutSpaces");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(2, result.size());
assertEquals("valueWithSpaces", result.get("keyWithSpaces")); // Both key
and value should be trimmed
assertEquals("valueWithoutSpaces", result.get("keyWithoutSpaces"));
@@ -305,28 +305,28 @@ public class TestConfigUtils {
@Test
void testPropertiesWithExactPrefixMatch() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX, "exactPrefixValue");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "exactPrefixValue");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(0, result.size()); // Exact prefix match should not be
included as it has no suffix
}
@Test
void testPropertiesWithPrefixFollowedByDot() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX, "valueAfterDot");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "valueAfterDot");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(0, result.size()); // Empty key after trimming should be
filtered out
}
@Test
void testPropertiesWithWhitespaceOnlyKeys() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + " ",
"valueForWhitespaceKey");
- props.setProperty(MERGE_PROPERTIES_PREFIX + " \t \n ",
"valueForTabNewlineKey");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " ",
"valueForWhitespaceKey");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " \t \n ",
"valueForTabNewlineKey");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(0, result.size());
}
@@ -334,21 +334,21 @@ public class TestConfigUtils {
void testPropertiesWithNullKeys() {
TypedProperties props = new TypedProperties();
// Note: TypedProperties doesn't allow null keys, but we test the edge case
- props.setProperty(MERGE_PROPERTIES_PREFIX, "valueForNullKey");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "valueForNullKey");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(0, result.size()); // Empty key should be filtered out
}
@Test
void testPropertiesWithMixedValidAndInvalidKeys() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "validKey", "validValue");
- props.setProperty(MERGE_PROPERTIES_PREFIX + " ", "invalidValue1");
- props.setProperty(MERGE_PROPERTIES_PREFIX, "invalidValue2");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "anotherValidKey",
"anotherValidValue");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "validKey", "validValue");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " ", "invalidValue1");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX, "invalidValue2");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "anotherValidKey",
"anotherValidValue");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(2, result.size()); // Only valid keys should be included
assertEquals("validValue", result.get("validKey"));
assertEquals("anotherValidValue", result.get("anotherValidKey"));
@@ -357,10 +357,10 @@ public class TestConfigUtils {
@Test
void testPropertiesWithCaseSensitivity() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + "Key1", "Value1");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "key1", "value1");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "Key1", "Value1");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "key1", "value1");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(2, result.size());
assertEquals("Value1", result.get("Key1"));
assertEquals("value1", result.get("key1"));
@@ -369,10 +369,10 @@ public class TestConfigUtils {
@Test
void testPropertiesWithLeadingAndTrailingWhitespace() {
TypedProperties props = new TypedProperties();
- props.setProperty(MERGE_PROPERTIES_PREFIX + " leadingSpaceKey",
"trailingSpaceValue ");
- props.setProperty(MERGE_PROPERTIES_PREFIX + "trailingSpaceKey ", "
leadingSpaceValue");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + " leadingSpaceKey",
"trailingSpaceValue ");
+ props.setProperty(RECORD_MERGE_PROPERTY_PREFIX + "trailingSpaceKey ", "
leadingSpaceValue");
- Map<String, String> result = ConfigUtils.extractWithPrefix(props,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(props,
RECORD_MERGE_PROPERTY_PREFIX);
assertEquals(2, result.size());
assertEquals("trailingSpaceValue", result.get("leadingSpaceKey")); //
Trimmed
assertEquals("leadingSpaceValue", result.get("trailingSpaceKey")); //
Trimmed
@@ -380,7 +380,7 @@ public class TestConfigUtils {
@Test
void testNullProperties() {
- Map<String, String> result = ConfigUtils.extractWithPrefix(null,
MERGE_PROPERTIES_PREFIX);
+ Map<String, String> result = ConfigUtils.extractWithPrefix(null,
RECORD_MERGE_PROPERTY_PREFIX);
assertTrue(result.isEmpty());
}
}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 0369d0e16803..3a3015e1f2a4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -403,7 +403,7 @@ public class StreamerUtil {
* @return The correct merging behaviour: <merge_mode, payload_class,
merge_strategy_id>
*/
public static Triple<RecordMergeMode, String, String>
inferMergingBehavior(Configuration conf) {
- return HoodieTableConfig.inferCorrectMergingBehavior(
+ return HoodieTableConfig.inferMergingConfigsForWrites(
getMergeMode(conf), getPayloadClass(conf), getMergeStrategyId(conf),
OptionsResolver.getPreCombineField(conf), HoodieTableVersion.EIGHT);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 7d8b39bd5942..4ef8480ad5fc 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -147,8 +147,8 @@ public class TestFlinkWriteClients {
HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
- assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.CUSTOM));
- assertThat(tableConfig.getRecordMergeStrategyId(),
is(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID));
+ assertThat(tableConfig.getRecordMergeMode(),
is(RecordMergeMode.EVENT_TIME_ORDERING));
+ assertThat(tableConfig.getRecordMergeStrategyId(),
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
assertThat(tableConfig.getPayloadClass(),
is(PartialUpdateAvroPayload.class.getName()));
HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 8b37c89a3a6c..cfe9d2835ea7 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -21,8 +21,14 @@ package org.apache.hudi.common.table;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -61,12 +67,19 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.getRecordMergeStrategyId;
+import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
import static
org.apache.hudi.common.table.HoodieTableConfig.inferRecordMergeModeFromPayloadClass;
import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -298,7 +311,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
@Test
void testDefinedTableConfigs() {
List<ConfigProperty<?>> configProperties =
HoodieTableConfig.definedTableConfigs();
- assertEquals(40, configProperties.size());
+ assertEquals(41, configProperties.size());
configProperties.forEach(c -> {
assertNotNull(c);
assertFalse(c.doc().isEmpty());
@@ -318,8 +331,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
Properties props = new Properties();
props.setProperty(HoodieTableConfig.NAME.key(), "test-table");
// no merge props
- props.setProperty(HoodieTableConfig.MERGE_PROPERTIES_PREFIX + "key1",
"value1");
- props.setProperty(HoodieTableConfig.MERGE_PROPERTIES_PREFIX + "key2",
"value2");
+ props.setProperty(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + "key1",
"value1");
+ props.setProperty(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + "key2",
"value2");
// add some random property which does not match the prefix.
props.setProperty("key3", "value3");
@@ -331,7 +344,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
assertEquals(expectedProps, config.getTableMergeProperties());
}
- private static Stream<Arguments> argumentsForInferringRecordMergeMode() {
+ private static Stream<Arguments> testInferMergingConfigsForPreV9Table() {
String defaultPayload = DefaultHoodieRecordPayload.class.getName();
String overwritePayload = OverwriteWithLatestAvroPayload.class.getName();
String customPayload = "custom_payload";
@@ -475,11 +488,11 @@ class TestHoodieTableConfig extends
HoodieCommonTestHarness {
}
@ParameterizedTest
- @MethodSource("argumentsForInferringRecordMergeMode")
- void testInferMergeMode(RecordMergeMode inputMergeMode, String
inputPayloadClass,
- String inputMergeStrategy, String
orderingFieldName,
- String shouldThrowString, RecordMergeMode
outputMergeMode,
- String outputPayloadClass, String
outputMergeStrategy) throws IOException {
+ @MethodSource
+ void testInferMergingConfigsForPreV9Table(RecordMergeMode inputMergeMode,
String inputPayloadClass,
+ String inputMergeStrategy, String
orderingFieldName,
+ String shouldThrowString,
RecordMergeMode outputMergeMode,
+ String outputPayloadClass, String
outputMergeStrategy) throws IOException {
Arrays.stream(new HoodieTableVersion[] {HoodieTableVersion.EIGHT,
HoodieTableVersion.SIX})
.forEach(tableVersion -> {
boolean shouldThrow = "eight-only".equals(shouldThrowString)
@@ -496,12 +509,12 @@ class TestHoodieTableConfig extends
HoodieCommonTestHarness {
}
if (shouldThrow) {
assertThrows(IllegalArgumentException.class,
- () -> HoodieTableConfig.inferCorrectMergingBehavior(
+ () -> inferMergingConfigsForPreV9Table(
inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName,
tableVersion));
} else {
Triple<RecordMergeMode, String, String> inferredConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
+ inferMergingConfigsForPreV9Table(
inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName,
tableVersion);
assertEquals(expectedMergeMode, inferredConfigs.getLeft());
@@ -510,4 +523,205 @@ class TestHoodieTableConfig extends
HoodieCommonTestHarness {
}
});
}
+
+ private static Stream<Arguments> testInferMergingConfigsForV9TableCreation()
{
+ return Stream.of(
+ // Test case: Non-version 9 table should throw HoodieIOException
+ arguments(
+ "Non-version 9 table", EVENT_TIME_ORDERING, null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT,
+ 0, null, null, null, null, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and event time
ordering
+ arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING,
null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
+ 2, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and commit time
ordering
+ arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING,
null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+ 2, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and custom merge
mode
+ arguments("Version 9 with custom merge mode", CUSTOM, null,
CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+ 2, CUSTOM.name(), null, CUSTOM_MERGE_STRATEGY_UUID, null, null,
null, null, null),
+
+ // Test case: Version 9 table with custom payload class (not under
deprecation)
+ arguments("Version 9 with custom payload", null,
"com.example.CustomPayload", null, null, HoodieTableVersion.NINE,
+ 3, CUSTOM.name(), "com.example.CustomPayload",
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with event time based payload
(DefaultHoodieRecordPayload)
+ arguments("Version 9 with DefaultHoodieRecordPayload", null,
DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+
+ // Test case: Version 9 table with commit time based payload
(OverwriteWithLatestAvroPayload)
+ arguments("Version 9 with OverwriteWithLatestAvroPayload", null,
OverwriteWithLatestAvroPayload.class.getName(), null, null,
HoodieTableVersion.NINE,
+ 3, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+
+ // Test case: Version 9 table with PartialUpdateAvroPayload (should
set partial update mode)
+ arguments("Version 9 with PartialUpdateAvroPayload", null,
PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 4, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(),
PartialUpdateMode.IGNORE_DEFAULTS.name(),
+ null, null, null),
+
+ // Test case: Version 9 table with
OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
+ arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload",
null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null,
HoodieTableVersion.NINE,
+ 4, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+ PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
+
+ // Test case: Version 9 table with PostgresDebeziumAvroPayload (should
set partial update mode and custom properties)
+ arguments("Version 9 with PostgresDebeziumAvroPayload", null,
PostgresDebeziumAvroPayload.class.getName(), null, "ts",
HoodieTableVersion.NINE,
+ 5, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
PostgresDebeziumAvroPayload.class.getName(),
+ PartialUpdateMode.IGNORE_MARKERS.name(),
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+
+ // Test case: Version 9 table with AWSDmsAvroPayload (should set
custom delete properties)
+ arguments("Version 9 with AWSDmsAvroPayload", null,
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
+ 5, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null,
null, "Op", "D"),
+
+ // Test case: Version 9 table with EventTimeAvroPayload (event time
based payload)
+ arguments("Version 9 with EventTimeAvroPayload", null,
EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(),
null, null, null, null),
+
+ // Test case: Version 9 table with MySqlDebeziumAvroPayload (event
time based payload)
+ arguments("Version 9 with MySqlDebeziumAvroPayload", null,
MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(),
null, null, null, null)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void testInferMergingConfigsForV9TableCreation(String testName,
RecordMergeMode recordMergeMode, String payloadClassName,
+ String recordMergeStrategyId,
String orderingFieldName, HoodieTableVersion tableVersion,
+ int expectedConfigSize,
String expectedMergeMode, String expectedPayloadClass,
+ String
expectedMergeStrategyId, String expectedLegacyPayloadClass,
+ String
expectedPartialUpdateMode, String expectedDebeziumMarker,
+ String expectedDeleteKey,
String expectedDeleteMarker) {
+ if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+
assertExceptionWithInferMergingConfigsForV9TableCreation(recordMergeMode,
payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+ } else {
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ recordMergeMode, payloadClassName, recordMergeStrategyId,
orderingFieldName, tableVersion);
+
+ assertEquals(expectedConfigSize, configs.size(), "Config size mismatch
for: " + testName);
+ if (expectedMergeMode != null) {
+ assertEquals(expectedMergeMode, configs.get(RECORD_MERGE_MODE.key()),
+ "Merge mode mismatch for: " + testName);
+ } else {
+ assertFalse(configs.containsKey(RECORD_MERGE_MODE.key()), "Record
merge mode not expected to be set");
+ }
+ if (expectedPayloadClass != null) {
+ assertEquals(expectedPayloadClass,
configs.get(PAYLOAD_CLASS_NAME.key()),
+ "Payload class mismatch for: " + testName);
+ } else {
+ assertFalse(configs.containsKey(PAYLOAD_CLASS_NAME.key()), "Payload
class not expected to be set");
+ }
+ if (expectedMergeStrategyId != null) {
+ assertEquals(expectedMergeStrategyId,
configs.get(RECORD_MERGE_STRATEGY_ID.key()),
+ "Merge strategy ID mismatch for: " + testName);
+ } else {
+ assertFalse(configs.containsKey(RECORD_MERGE_STRATEGY_ID.key()),
"Record merge strategy id not expected to be set");
+ }
+
+ if (expectedLegacyPayloadClass != null) {
+ assertEquals(expectedLegacyPayloadClass,
configs.get(LEGACY_PAYLOAD_CLASS_NAME.key()),
+ "Legacy payload class mismatch for: " + testName);
+ } else {
+ assertFalse(configs.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key()),
"Legacy payload class not expected to be set");
+ }
+
+ if (expectedPartialUpdateMode != null) {
+ assertEquals(expectedPartialUpdateMode,
configs.get(HoodieTableConfig.PARTIAL_UPDATE_MODE.key()),
+ "Partial update mode mismatch for: " + testName);
+ } else {
+
assertFalse(configs.containsKey(HoodieTableConfig.PARTIAL_UPDATE_MODE.key()),
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() + " not expected to be set");
+ }
+
+ if (expectedDebeziumMarker != null) {
+ assertEquals(expectedDebeziumMarker, configs.get(
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+ "Debezium marker mismatch for: " + testName);
+ } else {
+
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+ "Custom merge property " +
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER + " not expected to be set");
+ }
+
+ if (expectedDeleteKey != null) {
+ assertEquals(expectedDeleteKey,
configs.get(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY),
+ "Delete key mismatch for: " + testName);
+ } else {
+
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ DELETE_KEY),
+ "Custom merge property " +
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + " not expected
to be set");
+ }
+
+ if (expectedDeleteMarker != null) {
+ assertEquals(expectedDeleteMarker,
configs.get(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER),
+ "Delete marker mismatch for: " + testName);
+ } else {
+
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ DELETE_MARKER),
+ "Custom merge property " +
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + " not
expected to be set");
+ }
+ }
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9WithInconsistentConfigs() {
+ // Test case: Inconsistent merge mode and strategy should throw exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
"ts", HoodieTableVersion.NINE);
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ COMMIT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
null, HoodieTableVersion.NINE);
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
HoodieTableVersion.NINE);
+ });
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9EdgeCases() {
+ // Test case: Empty string payload class should be treated as null
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts",
HoodieTableVersion.NINE);
+ assertEquals(2, configs.size());
+ assertEquals(EVENT_TIME_ORDERING.name(),
configs.get(RECORD_MERGE_MODE.key()));
+ assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+
+ // Test case: Non-version 9 table with all parameters should throw
+
assertExceptionWithInferMergingConfigsForV9TableCreation(EVENT_TIME_ORDERING,
DefaultHoodieRecordPayload.class.getName(),
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT);
+
+ // Test case: Version 9 table with null ordering field for event time
ordering should still work
+ configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
HoodieTableVersion.NINE);
+ assertEquals(2, configs.size());
+ assertEquals(EVENT_TIME_ORDERING.name(),
configs.get(RECORD_MERGE_MODE.key()));
+ assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9WithAllTableVersions() {
+ // Test that versions 9 and above return configs; lower versions throw
+ for (HoodieTableVersion version : HoodieTableVersion.values()) {
+ if (version.lesserThan(HoodieTableVersion.NINE)) {
+
assertExceptionWithInferMergingConfigsForV9TableCreation(EVENT_TIME_ORDERING,
DefaultHoodieRecordPayload.class.getName(),
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+ } else {
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+ EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+ assertEquals(3, configs.size(), "Table version 9 and above should
return 3 configs");
+ }
+ }
+ }
+
+ private void
assertExceptionWithInferMergingConfigsForV9TableCreation(RecordMergeMode
recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+ String
orderingFieldName,
+
HoodieTableVersion tableVersion) {
+ HoodieIOException ioException = assertThrows(HoodieIOException.class, ()
-> {
+ HoodieTableConfig.inferMergingConfigsForV9TableCreation(recordMergeMode,
payloadClassName,
+ recordMergeStrategyId, orderingFieldName, tableVersion);
+ });
+ assertEquals("Unsupported flow for table versions less than 9",
ioException.getMessage().toString());
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1ac3a3fb0f3c..f2d938ceeaed 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1125,10 +1125,11 @@ class HoodieSparkSqlWriterInternal {
HoodieTableVersion.fromVersionCode(
SparkConfigUtils.getStringWithAltKeys(mergedParams,
WRITE_TABLE_VERSION).toInt)
}
+ // Handle merge properties.
if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
||
!mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
||
!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
- val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+ val inferredMergeConfigs =
HoodieTableConfig.inferMergingConfigsForWrites(
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
null)),
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(),
""),
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index b4be1229605e..669642a644e2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -24,6 +24,7 @@ import
org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonCo
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord,
OverwriteWithLatestAvroPayload, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE,
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
@@ -216,6 +217,33 @@ object HoodieWriterUtils {
validateTableConfig(spark, params, tableConfig, false)
}
+ /**
+ * Chooses which table config key a writer's payload class key should be
+ * looked up under.
+ *
+ * RULE 1: When
+ * 1. table version is exactly 9, and
+ * 2. table config has a non-blank legacy payload class configured,
+ * then
+ * return legacy payload class key.
+ *
+ * Basic rule (also applies when the table config is null):
+ * return writer key.
+ *
+ * NOTE: this function does not inspect `key` itself; validateTableConfig
+ * only calls it when the writer key is the payload class key.
+ */
+ def getPayloadClassConfigKeyFromTableConfig(key: String, tableConfig:
HoodieConfig): String = {
+ if (tableConfig == null) {
+ key
+ } else {
+ if (tableConfig.getInt(HoodieTableConfig.VERSION) ==
HoodieTableVersion.NINE.versionCode()
+ && !StringUtils.isNullOrEmpty(tableConfig.getStringOrDefault(
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME,
StringUtils.EMPTY_STRING).trim)) {
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key
+ } else {
+ key
+ }
+ }
+ }
+ }
+
/**
* Detects conflicts between new parameters and existing table configurations
*/
@@ -227,9 +255,14 @@ object HoodieWriterUtils {
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
if (!shouldIgnoreConfig(key, value, params, tableConfig)) {
- val existingValue =
getStringFromTableConfigWithAlternatives(tableConfig, key)
+ val keyInTableConfig = if
(key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key)) {
+ getPayloadClassConfigKeyFromTableConfig(key, tableConfig)
+ } else {
+ key
+ }
+ val existingValue =
getStringFromTableConfigWithAlternatives(tableConfig, keyInTableConfig)
if (null != existingValue && !resolver(existingValue, value)) {
-
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(keyInTableConfig)}\n")
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index 959654ff7680..a8ec70d657eb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -17,20 +17,27 @@
package org.apache.hudi;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.util.JavaScalaConverters;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.Properties;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
class TestHoodieWriterUtils extends HoodieClientTestBase {
@@ -43,4 +50,39 @@ class TestHoodieWriterUtils extends HoodieClientTestBase {
properties.put(HoodieTableConfig.DATABASE_NAME.key(),
"databaseFromCatalog");
Assertions.assertDoesNotThrow(() ->
HoodieWriterUtils.validateTableConfig(sparkSession,
JavaScalaConverters.convertJavaPropertiesToScalaMap(properties), tableConfig));
}
+
+ @Test
+ void
testGetKeyInTableConfigTableVersion9PayloadClassKeyWithoutLegacyPayloadClass() {
+ // Nothing is set on this config (no table version, no legacy payload class), so the writer key is expected back
+ HoodieConfig config = new HoodieConfig();
+ String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+ Assertions.assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
result);
+ }
+
+ /**
+ * Supplies (table version, legacy payload class value to set, expected config key) cases
+ * for the parameterized test that shares this method's name.
+ */
+ private static Stream<Arguments>
testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass() {
+ // Returned directly: a local named "arguments" would share its name with the statically imported arguments(...) factory
+ return Stream.of(
+ // Version 8 with a legacy payload class set
+ arguments(HoodieTableVersion.EIGHT,
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.compaction.payload.class"),
+ // Version 9 with an empty legacy payload class
+ arguments(HoodieTableVersion.NINE,
+ "", "hoodie.compaction.payload.class"),
+ // Version 9 with a blank (whitespace-only) legacy payload class
+ arguments(HoodieTableVersion.NINE,
+ " ", "hoodie.compaction.payload.class"),
+ // Version 9 with a legacy payload class set: the legacy key is expected
+ arguments(HoodieTableVersion.NINE,
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.table.legacy.payload.class")
+ );
+ }
+
+ // @MethodSource without a name: JUnit uses the static factory method that shares this test method's name
+ @ParameterizedTest
+ @MethodSource()
+ void
testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass(
+ HoodieTableVersion tableVersion, String legacyPayloadToSet, String
expectedKey) {
+ // Build a table config carrying only the legacy payload class and the table version under test
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME,
legacyPayloadToSet);
+ config.setValue(HoodieTableConfig.VERSION,
String.valueOf(tableVersion.versionCode()));
+ String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+ Assertions.assertEquals(expectedKey, result);
+ }
}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 56507994fa69..bd4e62a070b7 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -772,7 +772,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable,
}
val mergeMode = if
(tableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
- val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+ val inferredMergeConfigs =
HoodieTableConfig.inferMergingConfigsForWrites(
tableConfig.getRecordMergeMode,
tableConfig.getPayloadClass,
tableConfig.getRecordMergeStrategyId,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
index c4014bc5719a..8913e2a2df65 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDecimalTypeDataWorkflow.scala
@@ -26,7 +26,6 @@ import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType,
StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 5dbe1faca544..a5b2b996d9f6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -126,7 +126,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
void testRegularMerging(RecordMergeMode mergeMode, PartialUpdateMode
updateMode, MergeStage stage) throws IOException {
if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
props.put(
- HoodieTableConfig.MERGE_PROPERTIES_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER,
IGNORE_MARKERS_VALUE);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index ecd3dd3f45f9..bc749a0c8428 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -34,7 +34,6 @@ import
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
@@ -88,7 +87,6 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -434,13 +432,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(testTable, true);
assertTrue(metadataWriter.isPresent());
- Triple<RecordMergeMode, String, String> inferredMergeConfs =
- HoodieTableConfig.inferCorrectMergingBehavior(
- writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
- writeConfig.getRecordMergeStrategyId(), String.join(",",
writeConfig.getPreCombineFields()),
- metaClient.getTableConfig().getTableVersion());
- HoodieTableConfig hoodieTableConfig =
- new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(),
inferredMergeConfs.getRight());
+ HoodieTableConfig hoodieTableConfig = new HoodieTableConfig(
+ this.storage, metaClient.getMetaPath(),
writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
writeConfig.getRecordMergeStrategyId());
assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
// Turn off metadata table
@@ -457,16 +450,11 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
Option metadataWriter2 = table2.getMetadataWriter(instant2);
assertFalse(metadataWriter2.isPresent());
- Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
- HoodieTableConfig.inferCorrectMergingBehavior(
- writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
- writeConfig2.getRecordMergeStrategyId(), String.join(",",
writeConfig2.getPreCombineFields()),
- metaClient.getTableConfig().getTableVersion());
HoodieTableConfig hoodieTableConfig2 =
new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
- inferredMergeConfs2.getLeft(),
- inferredMergeConfs2.getMiddle(),
- inferredMergeConfs2.getRight());
+ writeConfig2.getRecordMergeMode(),
+ writeConfig2.getPayloadClass(),
+ writeConfig2.getRecordMergeStrategyId());
assertEquals(Collections.emptySet(),
hoodieTableConfig2.getMetadataPartitions());
// Assert metadata table folder is deleted
assertFalse(metaClient.getStorage().exists(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 9176369a4d67..b76b8a611c49 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1558,8 +1558,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
}
}
- @Test
- def testMergerStrategySet(): Unit = {
+ @ParameterizedTest
+ @CsvSource(Array("8", "9"))
+ def testMergerStrategySet(tableVersion: String): Unit = {
val (writeOpts, _) = getWriterReaderOpts()
val input = recordsToStrings(dataGen.generateInserts("000", 1)).asScala
val inputDf= spark.read.json(spark.sparkContext.parallelize(input.toSeq,
1))
@@ -1570,6 +1571,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(),
mergerStrategyName)
.option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
RecordMergeMode.CUSTOM.name())
+ .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key, tableVersion)
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = createMetaClient(spark, basePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 647ee6868df5..3f6840fc6d5a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,27 +21,29 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.model.{AWSDmsAvroPayload, EventTimeAvroPayload,
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload,
PartialUpdateAvroPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{AWSDmsAvroPayload,
DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger,
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload,
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY,
DELETE_MARKER}
+import org.apache.hudi.common.model.debezium.{MySqlDebeziumAvroPayload,
PostgresDebeziumAvroPayload}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.SaveMode
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.scalatest.Assertions.assertThrows
class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
@ParameterizedTest
- @MethodSource(Array("provideParams"))
+ @MethodSource(Array("providePayloadClassTestCases"))
def testMergerBuiltinPayload(tableType: String,
- payloadClazz: String): Unit = {
+ payloadClazz: String,
+ expectedConfigs: Map[String, String]): Unit = {
val opts: Map[String, String] = Map(
- HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
- HoodieTableConfig.MERGE_PROPERTIES_PREFIX +
"hoodie.payload.delete.field" -> "Op",
- HoodieTableConfig.MERGE_PROPERTIES_PREFIX +
"hoodie.payload.delete.marker" -> "d")
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz)
val columns = Seq("ts", "key", "rider", "driver", "fare", "Op")
-
// 1. Add an insert.
val data = Seq(
(10, "1", "rider-A", "driver-A", 19.10, "i"),
@@ -59,15 +61,30 @@ class TestPayloadDeprecationFlow extends
SparkClientFunctionalTestHarness {
options(opts).
mode(SaveMode.Overwrite).
save(basePath)
+ // Verify table was created successfully
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+ val tableConfig = metaClient.getTableConfig
+ // Verify table version is 9
+ assertEquals(9, tableConfig.getTableVersion.versionCode())
+ // Verify expected configs are set correctly
+ expectedConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+ assertEquals(expectedValue, tableConfig.getString(key), s"Config $key
should be $expectedValue")
+ } else {
+ assertFalse(tableConfig.contains(key), s"Config $key should not be
present")
+ }
+ }
// 2. Add an update.
val firstUpdateData = Seq(
- (11, "1", "rider-X", "driver-X", 19.10, "d"),
+ (11, "1", "rider-X", "driver-X", 19.10, "D"),
(11, "2", "rider-Y", "driver-Y", 27.70, "u"))
val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
firstUpdate.write.format("hudi").
option(OPERATION.key(), "upsert").
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
- options(opts).
mode(SaveMode.Append).
save(basePath)
// 3. Add an update.
@@ -78,27 +95,41 @@ class TestPayloadDeprecationFlow extends
SparkClientFunctionalTestHarness {
val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns:
_*)
secondUpdate.write.format("hudi").
option(OPERATION.key(), "upsert").
- option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1").
- options(opts).
mode(SaveMode.Append).
save(basePath)
- // 4. Validate.
- val df = spark.read.format("hudi").options(opts).load(basePath)
+ // 4. Add a trivial update to trigger payload class mismatch.
+ val thirdUpdateData = Seq(
+ (12, "3", "rider-CC", "driver-CC", 33.90, "i"))
+ val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ assertThrows[HoodieException] {
+ thirdUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+ classOf[MySqlDebeziumAvroPayload].getName). // Position is
important.
+ mode(SaveMode.Append).
+ save(basePath)
+ }
+ }
+ // 5. Validate.
+ val df = spark.read.format("hudi").load(basePath)
val finalDf = df.select("ts", "key", "rider", "driver", "fare",
"Op").sort("key")
val expectedData = if
(!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
- if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
- || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)) {
+ if
(HoodieTableConfig.EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClazz)) {
Seq(
- (11, "1", "rider-X", "driver-X", 19.10, "d"),
+ (11, "1", "rider-X", "driver-X", 19.10, "D"),
(11, "2", "rider-Y", "driver-Y", 27.70, "u"),
(12, "3", "rider-CC", "driver-CC", 33.90, "i"),
(10, "4", "rider-D", "driver-D", 34.15, "i"),
(12, "5", "rider-EE", "driver-EE", 17.85, "i"))
} else {
Seq(
- (11, "1", "rider-X", "driver-X", 19.10, "d"),
+ (11, "1", "rider-X", "driver-X", 19.10, "D"),
(11, "2", "rider-Y", "driver-Y", 27.70, "u"),
(12, "3", "rider-CC", "driver-CC", 33.90, "i"),
(9, "4", "rider-DD", "driver-DD", 34.15, "i"),
@@ -120,13 +151,77 @@ class TestPayloadDeprecationFlow extends
SparkClientFunctionalTestHarness {
// TODO: Add COPY_ON_WRITE table type tests when write path is updated
accordingly.
object TestPayloadDeprecationFlow {
- def provideParams(): java.util.List[Arguments] = {
+ // Each case is (table type, payload class, table configs expected after table creation).
+ // Every expected config must sit inside the Map: anything after the Map becomes an extra
+ // argument that the three-parameter test never consumes, so it would not be asserted.
+ def providePayloadClassTestCases(): java.util.List[Arguments] = {
java.util.Arrays.asList(
- Arguments.of("MERGE_ON_READ",
classOf[OverwriteWithLatestAvroPayload].getName),
- Arguments.of("MERGE_ON_READ",
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName),
- Arguments.of("MERGE_ON_READ", classOf[PartialUpdateAvroPayload].getName),
- Arguments.of("MERGE_ON_READ", classOf[EventTimeAvroPayload].getName),
- Arguments.of("MERGE_ON_READ", classOf[AWSDmsAvroPayload].getName)
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[DefaultHoodieRecordPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[OverwriteWithLatestAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[OverwriteWithLatestAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+ )
+ ),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[PartialUpdateAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PartialUpdateAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[PostgresDebeziumAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PostgresDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ + HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER ->
"__debezium_unavailable_value")),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[AWSDmsAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[AWSDmsAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY -> "Op",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER -> "D")),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[MySqlDebeziumAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[MySqlDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[EventTimeAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[EventTimeAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+ )
+ ),
+ Arguments.of(
+ "MERGE_ON_READ",
+ classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+ Map(
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+ )
+ )
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 61df5fc76d17..5127924ac08f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -267,7 +267,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
{
deleteDf.cache()
deleteDf.write.format("hudi")
.options(hudiOpts)
- .option("hoodie.datasource.write.payload.class",
"org.apache.hudi.common.model.EmptyHoodieRecordPayload")
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
val prevDf = mergedDfList.last
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
index 2b359d483ba0..eac9a29e5d88 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
@@ -71,7 +71,6 @@ class TestMergeModeCommitTimeOrdering extends
HoodieSparkSqlTestBase {
Map(
HoodieTableConfig.VERSION.key -> tableVersion,
HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[OverwriteWithLatestAvroPayload].getName,
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key ->
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
}
val nonExistentConfigs = if (tableVersion.toInt == 6) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
index 572eaa82744b..7122183e249c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
@@ -78,7 +78,6 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
HoodieTableConfig.VERSION.key ->
String.valueOf(HoodieTableVersion.current().versionCode()),
HoodieTableConfig.PRECOMBINE_FIELDS.key -> "ts",
HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[DefaultHoodieRecordPayload].getName,
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key ->
EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
}
val nonExistentConfigs = if (tableVersion.toInt == 6) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index a5e9e56fb81d..0fc23f5af60b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -136,7 +136,6 @@ class TestRepairsProcedure extends
HoodieSparkProcedureTestBase {
val tableVersion = HoodieTableVersion.current().versionCode()
val expectedOutput =s"""
|[hoodie.archivelog.folder,archived,archive]
-
|[hoodie.compaction.payload.class,org.apache.hudi.common.model.DefaultHoodieRecordPayload,null]
|[hoodie.database.name,default,null]
|[hoodie.datasource.write.drop.partition.columns,false,false]
|[hoodie.datasource.write.hive_style_partitioning,true,null]
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 02b189e382f4..191308ad4ddc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -157,7 +157,7 @@ public class HoodieStreamer implements Serializable {
Option<TypedProperties> propsOverride,
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
this.properties = combineProperties(cfg, propsOverride,
jssc.hadoopConfiguration());
Triple<RecordMergeMode, String, String> mergingConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
+ HoodieTableConfig.inferMergingConfigsForWrites(
cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties,
HoodieWriteConfig.WRITE_TABLE_VERSION)));
cfg.recordMergeMode = mergingConfigs.getLeft();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7ba953907280..a5a72e395aa7 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -650,7 +650,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
}
cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
Triple<RecordMergeMode, String, String> mergeCfgs =
- HoodieTableConfig.inferCorrectMergingBehavior(
+ HoodieTableConfig.inferMergingConfigsForWrites(
cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
HoodieTableVersion.current());
cfg.recordMergeMode = mergeCfgs.getLeft();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index a5e0ad7d6831..f5148f3fcd85 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1858,6 +1858,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
assertRecordCount(1000, dataSetBasePath, sqlContext);
+ HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
+ assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath,
WriteOperationType.BULK_INSERT,
@@ -1865,9 +1867,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
- //now assert that hoodie.properties file now has updated payload class name
- HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc,
dataSetBasePath, false);
- assertEquals(metaClient.getTableConfig().getPayloadClass(),
DummyAvroPayload.class.getName());
+ // NOTE: Payload class cannot be updated, though the write can be executed
using different payload classes in the runtime.
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ assertEquals(metaClient.getTableConfig().getPayloadClass(),
DefaultHoodieRecordPayload.class.getName());
}
@Test
@@ -1884,6 +1886,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertEquals(metaClient.getTableConfig().getPayloadClass(),
PartialUpdateAvroPayload.class.getName());
}
+ @Disabled("To be fixed with HUDI-9714")
@Test
public void testPayloadClassUpdateWithCOWTable() throws Exception {
String dataSetBasePath = basePath + "/test_dataset_cow";
@@ -1900,7 +1903,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
props.load(inputStream);
}
- assertTrue(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
+ assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_MODE.key()));
assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key()));