yihua commented on code in PR #13532:
URL: https://github.com/apache/hudi/pull/13532#discussion_r2207653575
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -113,16 +113,11 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
@Override
public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) {
- if (newerRecord.isCommitTimeOrderingDelete()) {
- return Pair.of(true, newerRecord.getRecord());
- }
- Comparable newOrderingValue = newerRecord.getOrderingValue();
- Comparable oldOrderingValue = olderRecord.getOrderingValue();
- if (!olderRecord.isCommitTimeOrderingDelete()
- && oldOrderingValue.compareTo(newOrderingValue) > 0) {
+ if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
+ return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
+ } else {
Review Comment:
nit: could directly `return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());`
outside the `if` branch (dropping the `else`) for brevity.
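For reference, a minimal sketch of the early-return shape suggested above. It assumes `shouldKeepNewerRecord` mirrors the branches removed in this hunk: a commit-time ordering delete on either side keeps the newer record, and otherwise the newer record is kept unless the older one has a strictly greater ordering value. `Pair`, `BufferedRecord`, and the `long` ordering value are simplified, hypothetical stand-ins, not Hudi's actual classes.

```java
public class FinalMergeSketch {

  // Hypothetical minimal Pair, standing in for the Pair type used in the hunk.
  static final class Pair<L, R> {
    final L left;
    final R right;

    private Pair(L left, R right) {
      this.left = left;
      this.right = right;
    }

    static <L, R> Pair<L, R> of(L left, R right) {
      return new Pair<>(left, right);
    }
  }

  // Hypothetical, trimmed-down BufferedRecord; a long replaces the real Comparable ordering value.
  static final class BufferedRecord<T> {
    final long orderingValue;
    final T record;
    final boolean isDelete;
    final boolean isCommitTimeOrderingDelete;

    BufferedRecord(long orderingValue, T record, boolean isDelete, boolean isCommitTimeOrderingDelete) {
      this.orderingValue = orderingValue;
      this.record = record;
      this.isDelete = isDelete;
      this.isCommitTimeOrderingDelete = isCommitTimeOrderingDelete;
    }
  }

  // Assumption: reproduces the precedence of the branches removed in this hunk.
  static <T> boolean shouldKeepNewerRecord(BufferedRecord<T> older, BufferedRecord<T> newer) {
    if (newer.isCommitTimeOrderingDelete || older.isCommitTimeOrderingDelete) {
      return true;
    }
    // Ties go to the newer record; only a strictly greater older value wins.
    return older.orderingValue <= newer.orderingValue;
  }

  // The suggested shape: early return for the newer record, no else branch.
  static <T> Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) {
    if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
      return Pair.of(newerRecord.isDelete, newerRecord.record);
    }
    return Pair.of(olderRecord.isDelete, olderRecord.record);
  }
}
```

Behavior is unchanged from the `if`/`else` form; the only difference is one less level of nesting.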
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -113,16 +113,11 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
@Override
public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) {
Review Comment:
I suggest renaming these APIs in a separate PR for clarity:
- `deltaMerge` -> `mergeRecordsFromLogFiles`
- `finalMerge` -> `mergeRecordsFromBaseAndLogFiles`
@cshuo
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -113,16 +113,11 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
@Override
public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) {
Review Comment:
Also consider returning `BufferedRecord<T>` directly and letting the caller
extract the record when needed.
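A rough sketch of that shape, under simplified, hypothetical stand-ins (a `BufferedRecord` with a `long` ordering value, an ordering-only `shouldKeepNewerRecord` that ignores commit-time ordering deletes for brevity, and a `recordToEmit` caller): `finalMerge` returns the winning `BufferedRecord<T>`, and the caller decides whether it needs `isDelete()` or `getRecord()`.

```java
import java.util.Optional;

public class BufferedRecordResultSketch {

  // Hypothetical, trimmed-down BufferedRecord with a long ordering value.
  static final class BufferedRecord<T> {
    private final long orderingValue;
    private final T record;
    private final boolean isDelete;

    BufferedRecord(long orderingValue, T record, boolean isDelete) {
      this.orderingValue = orderingValue;
      this.record = record;
      this.isDelete = isDelete;
    }

    long getOrderingValue() {
      return orderingValue;
    }

    T getRecord() {
      return record;
    }

    boolean isDelete() {
      return isDelete;
    }
  }

  // Simplified precedence rule: ordering value only, ties go to the newer record.
  static <T> boolean shouldKeepNewerRecord(BufferedRecord<T> older, BufferedRecord<T> newer) {
    return older.getOrderingValue() <= newer.getOrderingValue();
  }

  // Returns the winning record itself instead of Pair<Boolean, T>.
  static <T> BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) {
    return shouldKeepNewerRecord(olderRecord, newerRecord) ? newerRecord : olderRecord;
  }

  // Hypothetical caller that extracts the payload only when it needs it.
  static <T> Optional<T> recordToEmit(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) {
    BufferedRecord<T> merged = finalMerge(olderRecord, newerRecord);
    return merged.isDelete() ? Optional.<T>empty() : Optional.ofNullable(merged.getRecord());
  }
}
```

Returning the whole `BufferedRecord` also keeps other metadata, such as the ordering value, available to callers without widening the `Pair`.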
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -228,7 +223,8 @@ public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> new
props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
- return Option.empty();
+ // An empty Option indicates that the output represents a delete.
+ return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), newRecord.getOrderingValue(), null, null, true));
Review Comment:
When the user indicates a delete through `Option.empty()`, should
`DEFAULT_ORDERING_VALUE` be used (as if no ordering value were provided with
`Option.empty()`)? This would guarantee that the delete is a commit-time-based
delete, so if an insert/update happens after this delete, the insert/update
is treated as an insert without considering the "ordering value" of the
delete indicated by `Option.empty()`. We can adopt this as the semantics. If
the user does want to provide deletes with an ordering value, a delete record
should be used instead: unlike `Option.empty()`, it can carry the ordering
value, so there is no overlap between these two types of deletes from the
custom merger.
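A minimal sketch of the proposed semantics, assuming a `long` sentinel for `DEFAULT_ORDERING_VALUE` and that a delete carrying that value counts as a commit-time ordering delete. `BufferedRecord`, `fromMergerOutput`, and the `Optional` standing in for `combinedRecordAndSchemaOpt` are hypothetical simplifications. Under these assumptions, a later insert wins over the `Option.empty()` delete regardless of its ordering value, while an explicit delete record with an ordering value still takes part in the comparison.

```java
import java.util.Optional;

public class EmptyOptionDeleteSketch {

  // Assumption: sentinel playing the role of DEFAULT_ORDERING_VALUE; the real constant may differ.
  static final long DEFAULT_ORDERING_VALUE = 0L;

  // Hypothetical, trimmed-down BufferedRecord with a long ordering value.
  static final class BufferedRecord<T> {
    final String recordKey;
    final long orderingValue;
    final T record;
    final boolean isDelete;

    BufferedRecord(String recordKey, long orderingValue, T record, boolean isDelete) {
      this.recordKey = recordKey;
      this.orderingValue = orderingValue;
      this.record = record;
      this.isDelete = isDelete;
    }

    // Assumption: a delete carrying the default ordering value is commit-time ordered.
    boolean isCommitTimeOrderingDelete() {
      return isDelete && orderingValue == DEFAULT_ORDERING_VALUE;
    }
  }

  // Hypothetical helper for the custom merger's output; `combined` stands in for combinedRecordAndSchemaOpt.
  static <T> BufferedRecord<T> fromMergerOutput(BufferedRecord<T> newRecord, Optional<T> combined) {
    if (!combined.isPresent()) {
      // Empty output means delete: drop the ordering value so it acts as a commit-time delete.
      return new BufferedRecord<>(newRecord.recordKey, DEFAULT_ORDERING_VALUE, null, true);
    }
    return new BufferedRecord<>(newRecord.recordKey, newRecord.orderingValue, combined.get(), false);
  }

  // Assumption: same precedence as the finalMerge branches removed earlier in this PR.
  static <T> boolean shouldKeepNewerRecord(BufferedRecord<T> older, BufferedRecord<T> newer) {
    if (newer.isCommitTimeOrderingDelete() || older.isCommitTimeOrderingDelete()) {
      return true;
    }
    return older.orderingValue <= newer.orderingValue;
  }
}
```

With this split, a merger that returns empty output never has to pick an ordering value for the delete, and one that needs ordering-aware deletes can return a delete record explicitly.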