the-other-tim-brown commented on code in PR #13532:
URL: https://github.com/apache/hudi/pull/13532#discussion_r2193671286
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -228,7 +223,8 @@ public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> new
props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
- return Option.empty();
+ // An empty Option indicates that the output represents a delete.
Review Comment:
An empty Option is treated as a delete in the
[finalMerge](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java#L257)
logic, so I'm updating this logic to match.
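To make the convention concrete, here is a minimal sketch of how a merge result can be interpreted. It rests on a few assumptions: `java.util.Optional` stands in for Hudi's `Option`, and the hypothetical `MergeOutcome` type stands in for the `Pair<Boolean, T>` that `finalMerge` returns, with the boolean read as "is a delete". Under this reading, an empty merger result means "delete this key". It does not mean "keep the existing record".

```java
import java.util.Optional;

// Hypothetical stand-in for the (isDelete, record) pair returned by finalMerge.
final class MergeOutcome<T> {
  final boolean isDelete;
  final T record; // null when the outcome is a delete

  MergeOutcome(boolean isDelete, T record) {
    this.isDelete = isDelete;
    this.record = record;
  }
}

final class FinalMergeSketch {
  // Translates a merger result into a final outcome. An empty result means the
  // merger decided the key should be deleted, so no payload is carried forward.
  static <T> MergeOutcome<T> toOutcome(Optional<T> mergedRecord) {
    if (!mergedRecord.isPresent()) {
      return new MergeOutcome<>(true, null);
    }
    return new MergeOutcome<>(false, mergedRecord.get());
  }
}
```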
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -287,9 +283,12 @@ public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> new
if (combinedRecordData != existingRecord.getRecord()) {
Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get();
return Option.of(BufferedRecord.forRecordWithContext(combinedRecordData, combinedRecordAndSchema.getRight(), readerContext, orderingFieldName, false));
+ } else {
+ return Option.empty();
}
}
- return Option.empty();
+ // An empty Option indicates that the output represents a delete.
+ return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), newRecord.getOrderingValue(), null, null, true));
Review Comment:
Similarly, this empty Option is translated to a delete
[here](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java#L309).
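One way to read this hunk is that `deltaMergeNonDeleteRecord` has three possible outcomes. The sketch below models them under several assumptions. `Rec` is a hypothetical, simplified stand-in for `BufferedRecord`, `Optional` stands in for Hudi's `Option`, and `combined` is the merger's output. Returning `Optional.empty()` from the delta merge is read here as "leave the buffered record unchanged". Because of that, a merger-produced delete has to come back as an explicit delete marker rather than as an empty result.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for BufferedRecord.
final class Rec {
  final String key;
  final Comparable<?> orderingValue;
  final String payload;    // null for deletes
  final boolean isDelete;

  Rec(String key, Comparable<?> orderingValue, String payload, boolean isDelete) {
    this.key = key;
    this.orderingValue = orderingValue;
    this.payload = payload;
    this.isDelete = isDelete;
  }
}

final class DeltaMergeSketch {
  // combined: the merger's output; empty means the merger produced a delete.
  // Returns Optional.empty() only when the buffered record should stay as-is.
  static Optional<Rec> deltaMerge(Rec newRecord, Rec existing, Optional<String> combined) {
    if (combined.isPresent()) {
      // Identity check, mirroring the diff: did the merger hand back the existing payload object?
      if (combined.get() != existing.payload) {
        return Optional.of(new Rec(newRecord.key, newRecord.orderingValue, combined.get(), false));
      }
      return Optional.empty(); // existing record wins, nothing to update
    }
    // Merger output is empty: emit an explicit delete marker, because an empty
    // Optional would be read by the caller as "no change".
    return Optional.of(new Rec(newRecord.key, newRecord.orderingValue, null, true));
  }
}
```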
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -429,8 +428,9 @@ private static <T> Option<DeleteRecord> deltaMergeDeleteRecord(DeleteRecord dele
}
private static <T> boolean shouldKeepNewerRecord(BufferedRecord<T> oldRecord, BufferedRecord<T> newRecord) {
- if (newRecord.isCommitTimeOrderingDelete()) {
- // handle records coming from DELETE statements(the orderingVal is constant 0)
+ if (newRecord.isCommitTimeOrderingDelete() || oldRecord.isCommitTimeOrderingDelete()) {
Review Comment:
Previously, if the oldRecord was a commitTimeOrderingDelete, its ordering value would be 0 (an int), so the comparison at line 436 would fail.
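Here is a minimal sketch of how the old check can fail. It assumes, as the comment above states, that a commit-time-ordering delete carries the int ordering value 0, and that regular records carry ordering values of another type, such as long. `OrderedRecord` and both methods are hypothetical simplifications, not Hudi's code. The sketch also assumes that ties keep the newer record. With raw `Comparable`, `Long.compareTo` receives an `Integer` and throws `ClassCastException`. Short-circuiting on the delete flag of either record means the mismatched types are never compared.

```java
// Hypothetical, simplified stand-in for the ordering state of BufferedRecord.
@SuppressWarnings("rawtypes")
final class OrderedRecord {
  final Comparable orderingValue; // raw type: ordering values may differ in type
  final boolean commitTimeOrderingDelete;

  OrderedRecord(Comparable orderingValue, boolean commitTimeOrderingDelete) {
    this.orderingValue = orderingValue;
    this.commitTimeOrderingDelete = commitTimeOrderingDelete;
  }
}

final class OrderingSketch {
  // Before: only a delete on the new side skips the comparison.
  @SuppressWarnings("unchecked")
  static boolean shouldKeepNewerRecordBefore(OrderedRecord oldRecord, OrderedRecord newRecord) {
    if (newRecord.commitTimeOrderingDelete) {
      return true;
    }
    // Throws ClassCastException if, e.g., a Long is compared against the Integer 0.
    return newRecord.orderingValue.compareTo(oldRecord.orderingValue) >= 0;
  }

  // After: a delete on either side skips the comparison and keeps the newer record.
  @SuppressWarnings("unchecked")
  static boolean shouldKeepNewerRecordAfter(OrderedRecord oldRecord, OrderedRecord newRecord) {
    if (newRecord.commitTimeOrderingDelete || oldRecord.commitTimeOrderingDelete) {
      return true;
    }
    return newRecord.orderingValue.compareTo(oldRecord.orderingValue) >= 0;
  }
}
```

Checking the flag rather than normalizing the 0 to the table's ordering type keeps the fix independent of which type the ordering field happens to use.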
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -113,16 +113,11 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
@Override
public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) {
- if (newerRecord.isCommitTimeOrderingDelete()) {
- return Pair.of(true, newerRecord.getRecord());
- }
- Comparable newOrderingValue = newerRecord.getOrderingValue();
- Comparable oldOrderingValue = olderRecord.getOrderingValue();
- if (!olderRecord.isCommitTimeOrderingDelete()
Review Comment:
This check has now been moved into `shouldKeepNewerRecord`.
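Once the delete handling lives in `shouldKeepNewerRecord`, `finalMerge` can in principle reduce to picking the winning record. The sketch below is a hypothetical simplification and not the PR's method body, which is truncated in this notification. `Candidate` stands in for `BufferedRecord`, and `Map.Entry` stands in for Hudi's `Pair`. The sketch assumes the result is an (isDelete, record) pair for whichever record wins.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical, simplified stand-in for BufferedRecord.
@SuppressWarnings("rawtypes")
final class Candidate<T> {
  final T record;                         // null for deletes
  final Comparable orderingValue;
  final boolean isDelete;
  final boolean commitTimeOrderingDelete;

  Candidate(T record, Comparable orderingValue, boolean isDelete, boolean commitTimeOrderingDelete) {
    this.record = record;
    this.orderingValue = orderingValue;
    this.isDelete = isDelete;
    this.commitTimeOrderingDelete = commitTimeOrderingDelete;
  }
}

final class FinalMergeDelegationSketch {
  // A commit-time-ordering delete on either side skips the ordering comparison.
  @SuppressWarnings("unchecked")
  static <T> boolean shouldKeepNewerRecord(Candidate<T> older, Candidate<T> newer) {
    if (newer.commitTimeOrderingDelete || older.commitTimeOrderingDelete) {
      return true;
    }
    return newer.orderingValue.compareTo(older.orderingValue) >= 0;
  }

  // Returns (isDelete, record) for the winning record; no delete checks needed here.
  static <T> Map.Entry<Boolean, T> finalMerge(Candidate<T> older, Candidate<T> newer) {
    Candidate<T> winner = shouldKeepNewerRecord(older, newer) ? newer : older;
    return new AbstractMap.SimpleImmutableEntry<>(winner.isDelete, winner.record);
  }
}
```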
--
This is an automated message from the Apache Git Service.