yihua commented on code in PR #13670:
URL: https://github.com/apache/hudi/pull/13670#discussion_r2250418873
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
     assertEquals(2, readStats.getNumUpdates());
   }
+  @Test
+  void readWithEventTimeOrderingWithRecords() throws IOException {
Review Comment:
Could these new tests be revised to directly construct
`StreamingFileGroupRecordBufferLoader` and call
`StreamingFileGroupRecordBufferLoader#getRecordBuffer` for testing instead of
mimicking the behavior of calling
`HoodieFileGroupRecordBuffer#processNextDataRecord`?
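To illustrate the shape of this suggestion, here is a minimal, self-contained sketch. The classes below (`StreamingLoader`, `RecordBuffer`) are hypothetical stand-ins, not Hudi's real API: the point is only that the test drives the loader's public entry point (the analogue of `StreamingFileGroupRecordBufferLoader#getRecordBuffer`) and lets production code do the record feeding, instead of the test re-implementing the feeding loop by calling `processNextDataRecord` per record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins, NOT Hudi's real classes -- a sketch of the
// "test through the loader entry point" pattern suggested above.
public class LoaderBasedTestSketch {
  // Stand-in for a record buffer that accumulates log records by key.
  static final class RecordBuffer {
    final Map<String, String> records = new LinkedHashMap<>();
    void processNextDataRecord(String record, String key) {
      records.put(key, record);
    }
  }

  // Stand-in for StreamingFileGroupRecordBufferLoader: it owns the
  // feeding logic, so the test no longer has to mimic it.
  static final class StreamingLoader {
    RecordBuffer getRecordBuffer(Map<String, String> logRecords) {
      RecordBuffer buffer = new RecordBuffer();
      // The production code path performs the iteration.
      logRecords.forEach((k, v) -> buffer.processNextDataRecord(v, k));
      return buffer;
    }
  }

  public static void main(String[] args) {
    Map<String, String> logRecords = new LinkedHashMap<>();
    logRecords.put("r1", "r1-update");
    logRecords.put("r2", "r2-update");
    // The test exercises only the public entry point and inspects the result.
    RecordBuffer buffer = new StreamingLoader().getRecordBuffer(logRecords);
    System.out.println("buffered=" + buffer.records.size());
  }
}
```

Testing through the entry point keeps the test valid even if the buffer's internal feeding protocol changes.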
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
     assertEquals(2, readStats.getNumUpdates());
   }
+  @Test
+  void readWithEventTimeOrderingWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"record_key"}));
+    StorageConfiguration<?> storageConfiguration = mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    Map<Serializable, BufferedRecord> inputRecords = convertToBufferedRecordsMap(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[]{"ts"});
+    inputRecords.putAll(convertToBufferedRecordsMapForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), false));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"), properties);
+
+    fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1, testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    inputRecords.entrySet().forEach(kv -> {
+      try {
+        fileGroupRecordBuffer.processNextDataRecord(kv.getValue(), kv.getKey());
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process next data ", e);
+      }
+    });
+
+    List<IndexedRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+    // update for 4 is ignored due to lower ordering value.
+    // record5 is deleted.
+    // delete for 6 is ignored due to lower ordering value.
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4, testRecord6), actualRecords);
+    assertEquals(0, readStats.getNumInserts());
Review Comment:
Add a case where `readStats.getNumInserts()` is positive in
`TestKeyBasedFileGroupRecordBuffer`, e.g., by changing this test?
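For context on why this test only ever sees `numInserts == 0`: every log record's key also appears in the base file, so each merge is counted as an update (or delete). A log record whose key is absent from the base file is what surfaces as an insert. The sketch below is a self-contained illustration of that counting semantics with hypothetical stand-in classes, not Hudi's actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of update-vs-insert accounting during a file group merge.
public class InsertStatSketch {
  static final class ReadStats {
    long numInserts;
    long numUpdates;
  }

  // Stand-in for the buffer's merge of base-file records with log records.
  static List<String> merge(List<String> baseKeys, Map<String, String> logRecords, ReadStats stats) {
    Set<String> base = new LinkedHashSet<>(baseKeys);
    List<String> out = new ArrayList<>();
    for (String key : base) {
      if (logRecords.containsKey(key)) {
        stats.numUpdates++;          // key present in base file -> update
        out.add(logRecords.get(key));
      } else {
        out.add(key);                // unchanged base record
      }
    }
    for (Map.Entry<String, String> e : logRecords.entrySet()) {
      if (!base.contains(e.getKey())) {
        stats.numInserts++;          // key only in the log -> insert
        out.add(e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ReadStats stats = new ReadStats();
    Map<String, String> logRecords = new LinkedHashMap<>();
    logRecords.put("r2", "r2-update");
    logRecords.put("r7", "r7-new"); // no base-file counterpart -> insert
    merge(Arrays.asList("r1", "r2"), logRecords, stats);
    System.out.println("inserts=" + stats.numInserts + " updates=" + stats.numUpdates);
  }
}
```

So in the real test, adding a log record (e.g. a hypothetical `testRecord7`) with a record key not present in the base file iterator should make `readStats.getNumInserts()` positive.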
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]