lokeshj1703 commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2221841788
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void
testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet"})
+ public void testReadFileGroupWithMultipleOrderingFields(String
logDataBlockFormat) throws Exception {
+ RecordMergeMode recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
+ writeConfigs.put("hoodie.datasource.write.table.type",
HoodieTableType.MERGE_ON_READ.name());
+ // Use two precombine values - combination of timestamp and rider
+ String orderingValues = "timestamp,rider";
+ writeConfigs.put("hoodie.datasource.write.precombine.field",
orderingValues);
+ writeConfigs.put("hoodie.payload.ordering.field", orderingValues);
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
+ // Initial commit. rider column gets value of rider-002
+ List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), true, 0, recordMergeMode,
+ initialRecords, initialRecords);
+
+ // The updates have rider values as rider-001 and the existing records
have rider values as rider-002
Review Comment:
Addressed
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType
tableType) throws Exception {
+ String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+ HoodieDeltaStreamer.Config cfg =
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.tableType = tableType.name();
+ cfg.filterDupes = true;
+ cfg.sourceOrderingFields = "timestamp,rider";
+ cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ cfg.recordMergeStrategyId =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+ // Set record merge mode to event time ordering
+ cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+ //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
Review Comment:
Addressed
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java:
##########
@@ -41,6 +41,7 @@ public class TestDataSource extends AbstractBaseTestSource {
private static final Logger LOG =
LoggerFactory.getLogger(TestDataSource.class);
public static boolean returnEmptyBatch = false;
+ public static Option<String> recordInstantTime = Option.empty();
Review Comment:
Addressed
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType
tableType) throws Exception {
+ String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+ HoodieDeltaStreamer.Config cfg =
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.tableType = tableType.name();
+ cfg.filterDupes = true;
+ cfg.sourceOrderingFields = "timestamp,rider";
+ cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ cfg.recordMergeStrategyId =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+ // Set record merge mode to event time ordering
+ cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+ //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
+ TestDataSource.recordInstantTime = Option.of("002");
+ new HoodieStreamer(cfg, jsc).sync();
+
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+ // Generate new updates with lower recordInstantTime so that updates are
rejected
+ TestDataSource.recordInstantTime = Option.of("001");
+ runStreamSync(cfg, false, 50, WriteOperationType.UPSERT);
+ int numInserts = 25;
+ // TestDataSource generates 500 inserts, 450 updates and 50 deletes
+ assertRecordCount(1025, tableBasePath, sqlContext); // if filter dupes is
not enabled, we should be expecting 3000 records here.
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
+ // Filter records with rider-001 value and deduct the number of inserts
+ long numUpdates =
sparkSession.read().format("hudi").load(tableBasePath).filter("rider =
'rider-001'").count()
+ - numInserts;
+ // There should be no updates since ordering value rider-001 is lower than
existing record ordering value rider-002
+ assertEquals(0, numUpdates);
+
+ // Generate new updates with lower recordInstantTime so that updates are
rejected
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]