lokeshj1703 commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2265949724
##########
hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java:
##########
@@ -45,6 +45,7 @@
public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
public static final String OP_FIELD = "Op";
+ public static final String D_VALUE = "D";
Review Comment:
Nit: rename `D_VALUE` to `DELETE_OPERATION_VALUE` so the constant's purpose is clear.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java:
##########
@@ -128,6 +128,7 @@ void testRecordMergeConfigForCommitTimeOrdering(boolean useLegacyConfig) throws
assertThat(tableConfig.getRecordMergeStrategyId(),
is(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID));
assertThat(tableConfig.getPayloadClass(),
is(OverwriteWithLatestAvroPayload.class.getName()));
+
Review Comment:
This added blank line looks like an unintended change; please revert it.
##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java:
##########
@@ -43,4 +48,67 @@ void validateTableConfig() throws IOException {
properties.put(HoodieTableConfig.DATABASE_NAME.key(),
"databaseFromCatalog");
Assertions.assertDoesNotThrow(() ->
HoodieWriterUtils.validateTableConfig(sparkSession,
JavaScalaConverters.convertJavaPropertiesToScalaMap(properties), tableConfig));
}
+
+ @Test
Review Comment:
Can we combine these test cases into a single test, or at least into fewer tests?
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java:
##########
@@ -510,4 +522,166 @@ void testInferMergeMode(RecordMergeMode inputMergeMode, String inputPayloadClass
}
});
}
+
+ private static Stream<Arguments>
argumentsForInferMergingConfigsForVersion9() {
+ return Stream.of(
+ // Test case: Non-version 9 table should return empty configs
+ arguments(
+ "Non-version 9 table", EVENT_TIME_ORDERING, null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT,
+ 0, null, null, null, null, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and event time
ordering
+ arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING,
null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
+ 2, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and commit time
ordering
+ arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING,
null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+ 2, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with null payload class and custom merge
mode
+ arguments("Version 9 with custom merge mode", CUSTOM, null,
CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+ 2, CUSTOM.name(), null, CUSTOM_MERGE_STRATEGY_UUID, null, null,
null, null, null),
+
+ // Test case: Version 9 table with custom payload class (not under
deprecation)
+ arguments("Version 9 with custom payload", null,
"com.example.CustomPayload", null, null, HoodieTableVersion.NINE,
+ 3, CUSTOM.name(), "com.example.CustomPayload",
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+ // Test case: Version 9 table with event time based payload
(DefaultHoodieRecordPayload)
+ arguments("Version 9 with DefaultHoodieRecordPayload", null,
DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+
+ // Test case: Version 9 table with commit time based payload
(OverwriteWithLatestAvroPayload)
+ arguments("Version 9 with OverwriteWithLatestAvroPayload", null,
OverwriteWithLatestAvroPayload.class.getName(), null, null,
HoodieTableVersion.NINE,
+ 3, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+
+ // Test case: Version 9 table with PartialUpdateAvroPayload (should
set partial update mode)
+ arguments("Version 9 with PartialUpdateAvroPayload", null,
PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 4, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(),
PartialUpdateMode.IGNORE_DEFAULTS.name(),
+ null, null, null),
+
+ // Test case: Version 9 table with
OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
+ arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload",
null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null,
HoodieTableVersion.NINE,
+ 4, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+ PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
+
+ // Test case: Version 9 table with PostgresDebeziumAvroPayload (should
set partial update mode and custom properties)
+ arguments("Version 9 with PostgresDebeziumAvroPayload", null,
PostgresDebeziumAvroPayload.class.getName(), null, "ts",
HoodieTableVersion.NINE,
+ 5, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
PostgresDebeziumAvroPayload.class.getName(),
+ PartialUpdateMode.IGNORE_MARKERS.name(),
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+
+ // Test case: Version 9 table with AWSDmsAvroPayload (should set
custom delete properties)
+ arguments("Version 9 with AWSDmsAvroPayload", null,
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
+ 5, COMMIT_TIME_ORDERING.name(), null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null,
null, "Op", "D"),
+
+ // Test case: Version 9 table with EventTimeAvroPayload (event time
based payload)
+ arguments("Version 9 with EventTimeAvroPayload", null,
EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(),
null, null, null, null),
+
+ // Test case: Version 9 table with MySqlDebeziumAvroPayload (event
time based payload)
+ arguments("Version 9 with MySqlDebeziumAvroPayload", null,
MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+ 3, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(),
null, null, null, null)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("argumentsForInferMergingConfigsForVersion9")
+ void testInferMergingConfigsForVersion9(String testName, RecordMergeMode
recordMergeMode, String payloadClassName,
+ String recordMergeStrategyId, String
orderingFieldName, HoodieTableVersion tableVersion,
+ int expectedConfigSize, String
expectedMergeMode, String expectedPayloadClass,
+ String expectedMergeStrategyId,
String expectedLegacyPayloadClass,
+ String expectedPartialUpdateMode,
String expectedDebeziumMarker,
+ String expectedDeleteKey, String
expectedDeleteMarker) {
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForVersion9(
+ recordMergeMode, payloadClassName, recordMergeStrategyId,
orderingFieldName, tableVersion);
+
+ assertEquals(expectedConfigSize, configs.size(), "Config size mismatch
for: " + testName);
+ if (expectedMergeMode != null) {
+ assertEquals(expectedMergeMode, configs.get(RECORD_MERGE_MODE.key()),
+ "Merge mode mismatch for: " + testName);
+ }
+ if (expectedPayloadClass != null) {
+ assertEquals(expectedPayloadClass, configs.get(PAYLOAD_CLASS_NAME.key()),
+ "Payload class mismatch for: " + testName);
+ }
+ if (expectedMergeStrategyId != null) {
+ assertEquals(expectedMergeStrategyId,
configs.get(RECORD_MERGE_STRATEGY_ID.key()),
+ "Merge strategy ID mismatch for: " + testName);
+ }
+ if (expectedLegacyPayloadClass != null) {
+ assertEquals(expectedLegacyPayloadClass,
configs.get(LEGACY_PAYLOAD_CLASS_NAME.key()),
+ "Legacy payload class mismatch for: " + testName);
+ }
+ if (expectedPartialUpdateMode != null) {
+ assertEquals(expectedPartialUpdateMode,
configs.get(HoodieTableConfig.PARTIAL_UPDATE_MODE.key()),
+ "Partial update mode mismatch for: " + testName);
+ }
+ if (expectedDebeziumMarker != null) {
+ assertEquals(expectedDebeziumMarker, configs.get(
+ HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+ "Debezium marker mismatch for: " + testName);
+ }
+ if (expectedDeleteKey != null) {
+ assertEquals(expectedDeleteKey,
configs.get(HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY),
+ "Delete key mismatch for: " + testName);
+ }
+ if (expectedDeleteMarker != null) {
+ assertEquals(expectedDeleteMarker,
configs.get(HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER),
+ "Delete marker mismatch for: " + testName);
+ }
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9WithInconsistentConfigs() {
+ // Test case: Inconsistent merge mode and strategy should throw exception
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForVersion9(
+ EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
"ts", HoodieTableVersion.NINE);
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForVersion9(
+ COMMIT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
null, HoodieTableVersion.NINE);
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieTableConfig.inferMergingConfigsForVersion9(
+ CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
HoodieTableVersion.NINE);
+ });
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9EdgeCases() {
+ // Test case: Empty string payload class should be treated as null
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForVersion9(
+ EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts",
HoodieTableVersion.NINE);
+ assertEquals(2, configs.size());
+ assertEquals(EVENT_TIME_ORDERING.name(),
configs.get(RECORD_MERGE_MODE.key()));
+ assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+
+ // Test case: Non-version 9 table with all parameters should return empty
configs
+ configs = HoodieTableConfig.inferMergingConfigsForVersion9(
+ EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT);
+ assertEquals(0, configs.size());
+
+ // Test case: Version 9 table with null ordering field for event time
ordering should still work
+ configs = HoodieTableConfig.inferMergingConfigsForVersion9(
+ EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
HoodieTableVersion.NINE);
+ assertEquals(2, configs.size());
+ assertEquals(EVENT_TIME_ORDERING.name(),
configs.get(RECORD_MERGE_MODE.key()));
+ assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+ }
+
+ @Test
+ void testInferMergingConfigsForVersion9WithAllTableVersions() {
+ // Test that only version 9 returns configs, others return empty
+ for (HoodieTableVersion version : HoodieTableVersion.values()) {
+ Map<String, String> configs =
HoodieTableConfig.inferMergingConfigsForVersion9(
+ EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+ EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+ if (version == HoodieTableVersion.NINE) {
+ assertTrue(configs.size() > 0, "Version 9 should return configs");
+ } else {
+ assertEquals(0, configs.size(), "Non-version 9 should return empty
configs");
Review Comment:
Should this API throw an error for tables that are not version 9, instead of silently returning empty configs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]