lokeshj1703 commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2265949724


##########
hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java:
##########
@@ -45,6 +45,7 @@
 public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
 
   public static final String OP_FIELD = "Op";
+  public static final String D_VALUE = "D";

Review Comment:
   NIT: Rename to DELETE_OPERATION_VALUE



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java:
##########
@@ -128,6 +128,7 @@ void testRecordMergeConfigForCommitTimeOrdering(boolean 
useLegacyConfig) throws
     assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID));
     assertThat(tableConfig.getPayloadClass(), 
is(OverwriteWithLatestAvroPayload.class.getName()));
 
+

Review Comment:
   unintended change



##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java:
##########
@@ -43,4 +48,67 @@ void validateTableConfig() throws IOException {
     properties.put(HoodieTableConfig.DATABASE_NAME.key(), 
"databaseFromCatalog");
     Assertions.assertDoesNotThrow(() -> 
HoodieWriterUtils.validateTableConfig(sparkSession, 
JavaScalaConverters.convertJavaPropertiesToScalaMap(properties), tableConfig));
   }
+
+  @Test

Review Comment:
   Can we club these testcases in a single test or fewer tests?



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java:
##########
@@ -510,4 +522,166 @@ void testInferMergeMode(RecordMergeMode inputMergeMode, 
String inputPayloadClass
           }
         });
   }
+
+  private static Stream<Arguments> 
argumentsForInferMergingConfigsForVersion9() {
+    return Stream.of(
+        // Test case: Non-version 9 table should return empty configs
+        arguments(
+            "Non-version 9 table", EVENT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT,
+            0, null, null, null, null, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and event time 
ordering
+        arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING, 
null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
+            2, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and commit time 
ordering
+        arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING, 
null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+            2, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with null payload class and custom merge 
mode
+        arguments("Version 9 with custom merge mode", CUSTOM, null, 
CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
+            2, CUSTOM.name(), null, CUSTOM_MERGE_STRATEGY_UUID, null, null, 
null, null, null),
+
+        // Test case: Version 9 table with custom payload class (not under 
deprecation)
+        arguments("Version 9 with custom payload", null, 
"com.example.CustomPayload", null, null, HoodieTableVersion.NINE,
+            3, CUSTOM.name(), "com.example.CustomPayload", 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+
+        // Test case: Version 9 table with event time based payload 
(DefaultHoodieRecordPayload)
+        arguments("Version 9 with DefaultHoodieRecordPayload", null, 
DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+
+        // Test case: Version 9 table with commit time based payload 
(OverwriteWithLatestAvroPayload)
+        arguments("Version 9 with OverwriteWithLatestAvroPayload", null, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
+            3, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+
+        // Test case: Version 9 table with PartialUpdateAvroPayload (should 
set partial update mode)
+        arguments("Version 9 with PartialUpdateAvroPayload", null, 
PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            4, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(), 
PartialUpdateMode.IGNORE_DEFAULTS.name(),
+            null, null, null),
+
+        // Test case: Version 9 table with 
OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
+        arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload", 
null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
+            4, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+            PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
+
+        // Test case: Version 9 table with PostgresDebeziumAvroPayload (should 
set partial update mode and custom properties)
+        arguments("Version 9 with PostgresDebeziumAvroPayload", null, 
PostgresDebeziumAvroPayload.class.getName(), null, "ts", 
HoodieTableVersion.NINE,
+            5, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
PostgresDebeziumAvroPayload.class.getName(),
+            PartialUpdateMode.IGNORE_MARKERS.name(), 
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+
+        // Test case: Version 9 table with AWSDmsAvroPayload (should set 
custom delete properties)
+        arguments("Version 9 with AWSDmsAvroPayload", null, 
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
+            5, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null, 
null, "Op", "D"),
+
+        // Test case: Version 9 table with EventTimeAvroPayload (event time 
based payload)
+        arguments("Version 9 with EventTimeAvroPayload", null, 
EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(), 
null, null, null, null),
+
+        // Test case: Version 9 table with MySqlDebeziumAvroPayload (event 
time based payload)
+        arguments("Version 9 with MySqlDebeziumAvroPayload", null, 
MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
+            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(), 
null, null, null, null)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForInferMergingConfigsForVersion9")
+  void testInferMergingConfigsForVersion9(String testName, RecordMergeMode 
recordMergeMode, String payloadClassName,
+                                          String recordMergeStrategyId, String 
orderingFieldName, HoodieTableVersion tableVersion,
+                                          int expectedConfigSize, String 
expectedMergeMode, String expectedPayloadClass,
+                                          String expectedMergeStrategyId, 
String expectedLegacyPayloadClass,
+                                          String expectedPartialUpdateMode, 
String expectedDebeziumMarker,
+                                          String expectedDeleteKey, String 
expectedDeleteMarker) {
+    Map<String, String> configs = 
HoodieTableConfig.inferMergingConfigsForVersion9(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, 
orderingFieldName, tableVersion);
+
+    assertEquals(expectedConfigSize, configs.size(), "Config size mismatch 
for: " + testName);
+    if (expectedMergeMode != null) {
+      assertEquals(expectedMergeMode, configs.get(RECORD_MERGE_MODE.key()),
+          "Merge mode mismatch for: " + testName);
+    }
+    if (expectedPayloadClass != null) {
+      assertEquals(expectedPayloadClass, configs.get(PAYLOAD_CLASS_NAME.key()),
+          "Payload class mismatch for: " + testName);
+    }
+    if (expectedMergeStrategyId != null) {
+      assertEquals(expectedMergeStrategyId, 
configs.get(RECORD_MERGE_STRATEGY_ID.key()),
+          "Merge strategy ID mismatch for: " + testName);
+    }
+    if (expectedLegacyPayloadClass != null) {
+      assertEquals(expectedLegacyPayloadClass, 
configs.get(LEGACY_PAYLOAD_CLASS_NAME.key()),
+          "Legacy payload class mismatch for: " + testName);
+    }
+    if (expectedPartialUpdateMode != null) {
+      assertEquals(expectedPartialUpdateMode, 
configs.get(HoodieTableConfig.PARTIAL_UPDATE_MODE.key()),
+          "Partial update mode mismatch for: " + testName);
+    }
+    if (expectedDebeziumMarker != null) {
+      assertEquals(expectedDebeziumMarker, configs.get(
+          HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+          "Debezium marker mismatch for: " + testName);
+    }
+    if (expectedDeleteKey != null) {
+      assertEquals(expectedDeleteKey, 
configs.get(HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY),
+          "Delete key mismatch for: " + testName);
+    }
+    if (expectedDeleteMarker != null) {
+      assertEquals(expectedDeleteMarker, 
configs.get(HoodieTableConfig.MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER),
+          "Delete marker mismatch for: " + testName);
+    }
+  }
+
+  @Test
+  void testInferMergingConfigsForVersion9WithInconsistentConfigs() {
+    // Test case: Inconsistent merge mode and strategy should throw exception
+    assertThrows(IllegalArgumentException.class, () -> {
+      HoodieTableConfig.inferMergingConfigsForVersion9(
+          EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
"ts", HoodieTableVersion.NINE);
+    });
+    assertThrows(IllegalArgumentException.class, () -> {
+      HoodieTableConfig.inferMergingConfigsForVersion9(
+          COMMIT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
null, HoodieTableVersion.NINE);
+    });
+    assertThrows(IllegalArgumentException.class, () -> {
+      HoodieTableConfig.inferMergingConfigsForVersion9(
+          CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, 
HoodieTableVersion.NINE);
+    });
+  }
+
+  @Test
+  void testInferMergingConfigsForVersion9EdgeCases() {
+    // Test case: Empty string payload class should be treated as null
+    Map<String, String> configs = 
HoodieTableConfig.inferMergingConfigsForVersion9(
+        EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", 
HoodieTableVersion.NINE);
+    assertEquals(2, configs.size());
+    assertEquals(EVENT_TIME_ORDERING.name(), 
configs.get(RECORD_MERGE_MODE.key()));
+    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+
+    // Test case: Non-version 9 table with all parameters should return empty 
configs
+    configs = HoodieTableConfig.inferMergingConfigsForVersion9(
+        EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
+        EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.EIGHT);
+    assertEquals(0, configs.size());
+
+    // Test case: Version 9 table with null ordering field for event time 
ordering should still work
+    configs = HoodieTableConfig.inferMergingConfigsForVersion9(
+        EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, 
HoodieTableVersion.NINE);
+    assertEquals(2, configs.size());
+    assertEquals(EVENT_TIME_ORDERING.name(), 
configs.get(RECORD_MERGE_MODE.key()));
+    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
+  }
+
+  @Test
+  void testInferMergingConfigsForVersion9WithAllTableVersions() {
+    // Test that only version 9 returns configs, others return empty
+    for (HoodieTableVersion version : HoodieTableVersion.values()) {
+      Map<String, String> configs = 
HoodieTableConfig.inferMergingConfigsForVersion9(
+          EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(), 
+          EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
+      if (version == HoodieTableVersion.NINE) {
+        assertTrue(configs.size() > 0, "Version 9 should return configs");
+      } else {
+        assertEquals(0, configs.size(), "Non-version 9 should return empty 
configs");

Review Comment:
   Should we throw an error in non version 9 for this API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to