yihua commented on code in PR #13670:
URL: https://github.com/apache/hudi/pull/13670#discussion_r2251890881
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java:
##########
@@ -36,33 +39,40 @@
*/
public class InputSplit {
private final Option<HoodieBaseFile> baseFileOption;
+ private final Either<Stream<HoodieLogFile>, Iterator<BufferedRecord>>
recordsToMerge;
Review Comment:
Revert this to `Option<Iterator<BufferedRecord>>` if the decision is to keep
both the record iterator and a list of log files as members?
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +153,50 @@ void readWithEventTimeOrderingAndDeleteBlock() throws
IOException {
assertEquals(2, readStats.getNumUpdates());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException {
Review Comment:
Same for these tests too, better to have a separate test class for the
streaming record buffer loader?
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java:
##########
@@ -58,31 +70,92 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
private final TestRecord testRecord6 = new TestRecord("6", 0);
private final TestRecord testRecord6Update = new TestRecord("6", 1);
+ private final IndexedRecord testIndexedRecord1 = createTestRecord("1", 1,
1L);
+ private final IndexedRecord testIndexedRecord2 = createTestRecord("2", 1,
1L);
+ private final IndexedRecord testIndexedRecord2Update = createTestRecord("2",
1, 2L);
+ private final IndexedRecord testIndexedRecord3 = createTestRecord("3", 1,
1L);
+ private final IndexedRecord testIndexedRecord4 = createTestRecord("4", 2,
2L);
+ private final IndexedRecord testIndexedRecord4LowerOrdering =
createTestRecord("4", 2, 1L);
+ private final IndexedRecord testIndexedRecord5 = createTestRecord("5", 1,
1L);
+ private final IndexedRecord testRecord5DeleteByCustomMarker =
createTestRecord("5", 3, 2L);
+ private final IndexedRecord testIndexedRecord6 = createTestRecord("6", 1,
5L);
+ private final IndexedRecord testIndexedRecord6Update = createTestRecord("6",
2, 10L);
+
@Test
void readBaseFileAndLogFile() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieReaderContext<TestRecord> mockReaderContext =
mock(HoodieReaderContext.class, RETURNS_DEEP_STUBS);
+
SortedKeyBasedFileGroupRecordBuffer<TestRecord> fileGroupRecordBuffer =
buildSortedKeyBasedFileGroupRecordBuffer(mockReaderContext, readStats);
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord2,
testRecord3, testRecord5).iterator()));
HoodieDataBlock dataBlock = mock(HoodieDataBlock.class);
when(dataBlock.getSchema()).thenReturn(HoodieTestDataGenerator.AVRO_SCHEMA);
-
when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord6,
testRecord4, testRecord1, testRecord6Update, testRecord2Update).iterator()));
+ when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(
+ ClosableIterator.wrap(Arrays.asList(testRecord6, testRecord4,
testRecord1, testRecord6Update, testRecord2Update).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")});
fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
-
List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4,
testRecord5, testRecord6Update), actualRecords);
assertEquals(3, readStats.getNumInserts());
assertEquals(1, readStats.getNumUpdates());
assertEquals(1, readStats.getNumDeletes());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException {
Review Comment:
```suggestion
void readWithStreamingRecordBufferLoaderAndEventTimeOrdering() throws
IOException {
```
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java:
##########
@@ -58,31 +70,92 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
private final TestRecord testRecord6 = new TestRecord("6", 0);
private final TestRecord testRecord6Update = new TestRecord("6", 1);
+ private final IndexedRecord testIndexedRecord1 = createTestRecord("1", 1,
1L);
+ private final IndexedRecord testIndexedRecord2 = createTestRecord("2", 1,
1L);
+ private final IndexedRecord testIndexedRecord2Update = createTestRecord("2",
1, 2L);
+ private final IndexedRecord testIndexedRecord3 = createTestRecord("3", 1,
1L);
+ private final IndexedRecord testIndexedRecord4 = createTestRecord("4", 2,
2L);
+ private final IndexedRecord testIndexedRecord4LowerOrdering =
createTestRecord("4", 2, 1L);
+ private final IndexedRecord testIndexedRecord5 = createTestRecord("5", 1,
1L);
+ private final IndexedRecord testRecord5DeleteByCustomMarker =
createTestRecord("5", 3, 2L);
+ private final IndexedRecord testIndexedRecord6 = createTestRecord("6", 1,
5L);
+ private final IndexedRecord testIndexedRecord6Update = createTestRecord("6",
2, 10L);
+
@Test
void readBaseFileAndLogFile() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieReaderContext<TestRecord> mockReaderContext =
mock(HoodieReaderContext.class, RETURNS_DEEP_STUBS);
+
SortedKeyBasedFileGroupRecordBuffer<TestRecord> fileGroupRecordBuffer =
buildSortedKeyBasedFileGroupRecordBuffer(mockReaderContext, readStats);
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord2,
testRecord3, testRecord5).iterator()));
HoodieDataBlock dataBlock = mock(HoodieDataBlock.class);
when(dataBlock.getSchema()).thenReturn(HoodieTestDataGenerator.AVRO_SCHEMA);
-
when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord6,
testRecord4, testRecord1, testRecord6Update, testRecord2Update).iterator()));
+ when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(
+ ClosableIterator.wrap(Arrays.asList(testRecord6, testRecord4,
testRecord1, testRecord6Update, testRecord2Update).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")});
fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
-
List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4,
testRecord5, testRecord6Update), actualRecords);
assertEquals(3, readStats.getNumInserts());
assertEquals(1, readStats.getNumUpdates());
assertEquals(1, readStats.getNumDeletes());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException {
Review Comment:
Should this test be moved to a new class
`TestStreamingFileGroupRecordBufferLoader`? Then the method can simply be
named `readWithEventTimeOrdering`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java:
##########
@@ -36,33 +39,40 @@
*/
public class InputSplit {
private final Option<HoodieBaseFile> baseFileOption;
+ private final Either<Stream<HoodieLogFile>, Iterator<BufferedRecord>>
recordsToMerge;
private final List<HoodieLogFile> logFiles;
+ private final Option<Iterator<BufferedRecord>> recordIterator;
private final String partitionPath;
// Byte offset to start reading from the base file
private final long start;
// Length of bytes to read from the base file
private final long length;
- InputSplit(Option<HoodieBaseFile> baseFileOption, Stream<HoodieLogFile>
logFiles, String partitionPath, long start, long length) {
+ InputSplit(Option<HoodieBaseFile> baseFileOption,
+ Either<Stream<HoodieLogFile>, Iterator<BufferedRecord>>
recordsToMerge,
+ String partitionPath, long start, long length) {
this.baseFileOption = baseFileOption;
- this.logFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator())
- .filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
- .collect(Collectors.toList());
+ this.recordsToMerge = recordsToMerge;
+ if (recordsToMerge.isLeft()) {
+ this.logFiles =
recordsToMerge.asLeft().sorted(HoodieLogFile.getLogFileComparator())
+ .filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList());
+ this.recordIterator = Option.empty();
+ } else {
+ this.logFiles = Collections.emptyList();
+ this.recordIterator = Option.of(recordsToMerge.asRight());
+ }
this.partitionPath = partitionPath;
this.start = start;
this.length = length;
}
- static InputSplit fromFileSlice(FileSlice fileSlice, long start, long
length) {
- return new InputSplit(fileSlice.getBaseFile(), fileSlice.getLogFiles(),
fileSlice.getPartitionPath(),
- start, length);
- }
-
public Option<HoodieBaseFile> getBaseFileOption() {
return baseFileOption;
}
public List<HoodieLogFile> getLogFiles() {
+ ValidationUtils.checkArgument(recordIterator.isEmpty(),"Log files are not
initialized");
Review Comment:
```suggestion
ValidationUtils.checkArgument(recordIterator.isEmpty(), "Log files are
not initialized");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]