lokeshj1703 commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2221841788


##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupWithMultipleOrderingFields(String logDataBlockFormat) throws Exception {
+    RecordMergeMode recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+    writeConfigs.put("hoodie.datasource.write.table.type", HoodieTableType.MERGE_ON_READ.name());
+    // Use two precombine values - combination of timestamp and rider
+    String orderingValues = "timestamp,rider";
+    writeConfigs.put("hoodie.datasource.write.precombine.field", 
orderingValues);
+    writeConfigs.put("hoodie.payload.ordering.field", orderingValues);
+
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+      // Initial commit. rider column gets value of rider-002
+      List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), true, 0, recordMergeMode,
+          initialRecords, initialRecords);
+
+      // The updates have rider values of rider-001 while the existing records have rider values of rider-002

Review Comment:
   Addressed



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType tableType) throws Exception {
+    String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+    HoodieDeltaStreamer.Config cfg =
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.tableType = tableType.name();
+    cfg.filterDupes = true;
+    cfg.sourceOrderingFields = "timestamp,rider";
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    // Set record merge mode to event time ordering
+    cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name()));
+    //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
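
For reference, sourceOrderingFields takes a comma-separated list. A minimal sketch of how a value such as "timestamp,rider" would typically be split into individual field names (illustrative only; Hudi's actual parsing may differ):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class OrderingFieldsSketch {
      // Split "timestamp,rider" into ["timestamp", "rider"], ignoring stray whitespace.
      static List<String> parseOrderingFields(String config) {
        return Arrays.stream(config.split(","))
            .map(String::trim)
            .filter(f -> !f.isEmpty())
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        System.out.println(parseOrderingFields("timestamp,rider")); // [timestamp, rider]
      }
    }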

Review Comment:
   Addressed



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java:
##########
@@ -41,6 +41,7 @@ public class TestDataSource extends AbstractBaseTestSource {
 
  private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);
   public static boolean returnEmptyBatch = false;
+  public static Option<String> recordInstantTime = Option.empty();
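
Because recordInstantTime is mutable static state shared across tests, a safe usage pattern is to pin it around the sync under test and always restore the default. A minimal sketch, assuming the hudi-utilities test classpath; withPinnedInstantTime is a hypothetical helper, not part of the PR:

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.utilities.sources.TestDataSource;

    final class InstantTimePinning {
      // Hypothetical helper: pin the instant time for generated records and
      // restore the default afterwards so the value cannot leak into other tests.
      static void withPinnedInstantTime(String instantTime, Runnable syncUnderTest) {
        TestDataSource.recordInstantTime = Option.of(instantTime);
        try {
          syncUnderTest.run(); // e.g. new HoodieStreamer(cfg, jsc).sync()
        } finally {
          TestDataSource.recordInstantTime = Option.empty();
        }
      }
    }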

Review Comment:
   Addressed



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType tableType) throws Exception {
+    String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+    HoodieDeltaStreamer.Config cfg =
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.tableType = tableType.name();
+    cfg.filterDupes = true;
+    cfg.sourceOrderingFields = "timestamp,rider";
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    // Set record merge mode to event time ordering
+    cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name()));
+    //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
+    TestDataSource.recordInstantTime = Option.of("002");
+    new HoodieStreamer(cfg, jsc).sync();
+
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+    // Generate new updates with lower recordInstantTime so that updates are rejected
+    TestDataSource.recordInstantTime = Option.of("001");
+    runStreamSync(cfg, false, 50, WriteOperationType.UPSERT);
+    int numInserts = 25;
+    // TestDataSource generates 500 inserts, 450 updates and 50 deletes
+    assertRecordCount(1025, tableBasePath, sqlContext); // if filter dupes is not enabled, we should be expecting 3000 records here.
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
+    // Filter records with rider-001 value and deduct the number of inserts
+    long numUpdates = sparkSession.read().format("hudi").load(tableBasePath).filter("rider = 'rider-001'").count()
+        - numInserts;
+    // There should be no updates since ordering value rider-001 is lower than existing record ordering value rider-002
+    assertEquals(0, numUpdates);
+
+    // Generate new updates with lower recordInstantTime so that updates are rejected
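
The rejection check above restates the test's own query: rows with rider-001 can only come from the fresh inserts, so any surplus over numInserts counts updates that slipped past the merge. A sketch of that assertion, using the same variables already in scope in the test:

    // Rows with rider-001 can only come from the new inserts; any surplus would
    // mean an update with a lower ordering value was incorrectly applied.
    long rider001Rows = sparkSession.read().format("hudi")
        .load(tableBasePath)
        .filter("rider = 'rider-001'")
        .count();
    assertEquals(0, rider001Rows - numInserts,
        "updates with a lower ordering value must be rejected");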

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
