nsivabalan commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2299386989
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -63,8 +63,15 @@ public interface HoodieRecordMerger extends Serializable {
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
    * This method takes only full records for merging.
+   *
+   * @param older Older record in terms of commit time ordering.
+   * @param oldSchema The schema of the older record.
+   * @param newer Newer record in terms of commit time ordering.
+   * @param newSchema The schema of the newer record.
+   * @param props The additional properties for the merging operation.
+   * @return The merged record and schema. The record is expected to be non-null. If the record represents a deletion, the operation must be set as {@link HoodieOperation#DELETE}.
Review Comment:
Thanks for the javadocs. Along similar lines, can we enhance the javadocs for the BufferedRecordMerger APIs as well, especially covering when Option.empty is expected to be returned?
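To make the associativity contract in the javadoc above concrete, here is a self-contained toy sketch. This is plain Java, not Hudi's actual API: the `Version` record and `merge` function are hypothetical stand-ins for a merger that keeps the version with the higher ordering value, which satisfies f(a, f(b, c)) = f(f(a, b), c).

```java
// Toy illustration of the associativity contract f(a, f(b, c)) = f(f(a, b), c).
// "Version" and "merge" are hypothetical, not Hudi classes: the merge simply
// keeps the version with the higher ordering value (ties go to the newer side).
public class AssociativitySketch {
  record Version(String key, long orderingValue) {}

  static Version merge(Version older, Version newer) {
    // Higher ordering value wins; on a tie the newer version is kept.
    return newer.orderingValue() >= older.orderingValue() ? newer : older;
  }

  public static void main(String[] args) {
    Version a = new Version("rk1", 1L);
    Version b = new Version("rk1", 3L);
    Version c = new Version("rk1", 2L);
    Version left = merge(a, merge(b, c));   // f(a, f(b, c))
    Version right = merge(merge(a, b), c);  // f(f(a, b), c)
    if (!left.equals(right)) {
      throw new AssertionError("merge is not associative");
    }
    System.out.println("associative: both orders keep orderingValue=" + left.orderingValue());
  }
}
```

A real implementation would of course also have to stay associative across schema evolution and delete handling, which is what the javadoc contract is pinning down.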
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -236,19 +236,19 @@ public boolean isEmptyNewRecords() {
return keyToNewRecords.isEmpty();
}
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema writerSchema) throws IOException {
     boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = isDeleteRecord(newRecord);
-      } else {
-        // the incoming record is dropped
-        return false;
+    if (oldRecord.getData() != combineRecord.getData()) {
+      // the incoming record is chosen
+      isDelete = isDeleteRecord(combineRecord);
+      if (!isDelete) {
+        updatedRecordsWritten++;
       }
-      updatedRecordsWritten++;
+    } else {
+      // the incoming record is dropped
Review Comment:
can you help me understand something in this context.
say we have rk1_v1 and a new incoming rk1_v2, where v2 has a lower ordering value.
So L 241 is false (because the old record matches the combined record).
Don't we need to write the old record to the new file? How come we are returning right away at L 249?
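For context, the scenario in this question can be sketched as a toy event-time merge. This is plain Java, not Hudi code; `pickByOrderingValue` and `Rec` are hypothetical. It shows why, when the incoming version loses on ordering value, the combined record is the old record, and that old record still has to make it into the new file version.

```java
// Toy sketch of the scenario above: rk1_v1 is already in the base file, and an
// incoming rk1_v2 arrives with a LOWER ordering value. Under event-time merging
// the old record wins; the incoming update is dropped, but the old record must
// still be copied into the rewritten base file.
public class OrderingSketch {
  record Rec(String key, String value, long orderingValue) {}

  // Hypothetical event-time merge: the higher ordering value wins,
  // regardless of which record arrived later.
  static Rec pickByOrderingValue(Rec old, Rec incoming) {
    return incoming.orderingValue() >= old.orderingValue() ? incoming : old;
  }

  public static void main(String[] args) {
    Rec v1 = new Rec("rk1", "v1", 10L); // already in the base file
    Rec v2 = new Rec("rk1", "v2", 5L);  // incoming, with a lower ordering value
    Rec combined = pickByOrderingValue(v1, v2);
    // combined IS v1: the incoming update is dropped, yet v1 still has to be
    // written out, since merge-on-write rewrites the whole base file.
    System.out.println("winner=" + combined.value()); // prints "winner=v1"
  }
}
```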
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +44,35 @@ public String getMergingStrategy() {
}
@Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
-    return combineAndGetUpdateValue(older, newer, oldSchema, newSchema, props)
-        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+  public Pair<HoodieRecord, Schema> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
Review Comment:
can you confirm we added UTs for this change?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -161,8 +160,7 @@ protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, Buffer
     HoodieRecord<T> reducedRecord = merged.map(bufferedRecord -> recordContext.constructHoodieRecord(bufferedRecord, next.getPartitionPath())).orElse(previous);
     boolean choosePrevious = merged.isEmpty();
     HoodieKey reducedKey = choosePrevious ? previous.getKey() : next.getKey();
-    HoodieOperation operation = choosePrevious ? previous.getOperation() : next.getOperation();
-    return reducedRecord.newInstance(reducedKey, operation);
+    return reducedRecord.newInstance(reducedKey);
Review Comment:
Are we not interested in the operation for the deduping use-case, and is that why we are not carrying the operation over to the reducedRecord?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -236,19 +236,19 @@ public boolean isEmptyNewRecords() {
return keyToNewRecords.isEmpty();
}
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema writerSchema) throws IOException {
     boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = isDeleteRecord(newRecord);
-      } else {
-        // the incoming record is dropped
-        return false;
+    if (oldRecord.getData() != combineRecord.getData()) {
+      // the incoming record is chosen
+      isDelete = isDeleteRecord(combineRecord);
+      if (!isDelete) {
+        updatedRecordsWritten++;
       }
-      updatedRecordsWritten++;
+    } else {
+      // the incoming record is dropped
Review Comment:
ok, I see the caller is handling the copying of the old record.
Is there any different handling we do at the caller end? Why not handle writing both the combined record and the old record here at L 251 only?
Maybe we have some valid reason; just trying to understand the reasoning.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -338,16 +334,15 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
   public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
     // TODO(HUDI-7843): decouple the merging logic from the merger
     // and use the record merge mode to control how to merge partial updates
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
+    Pair<HoodieRecord, Schema> mergedRecord = recordMerger.get().partialMerge(
         recordContext.constructHoodieRecord(olderRecord),
         recordContext.getSchemaFromBufferRecord(olderRecord),
         recordContext.constructHoodieRecord(newerRecord),
         recordContext.getSchemaFromBufferRecord(newerRecord),
         readerSchema, props);
-    if (mergedRecord.isPresent()
-        && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-      HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
-      if (!mergedRecord.get().getRight().equals(readerSchema)) {
-        hoodieRecord = hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema);
+    if (!mergedRecord.getLeft().isDelete(mergedRecord.getRight(), props)) {
+      HoodieRecord hoodieRecord = mergedRecord.getLeft();
+      if (!mergedRecord.getRight().equals(readerSchema)) {
+        hoodieRecord = hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.getRight(), null, readerSchema);
Review Comment:
I remember there was a discussion around retaining ordering values for delete records as well.
I assume we are not fixing that in this patch, right?
For example, L 349 does not carry over the ordering value, so I just wanted to confirm my understanding.
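The concern about defaulted ordering values on delete records can be illustrated with a toy sketch. This is plain Java with hypothetical types, not Hudi code: it shows how a tombstone emitted with a default ordering value, instead of the winning record's real one, can let a later, lower-ordered update incorrectly resurrect the row.

```java
// Toy sketch of why delete records should retain their ordering values.
// "Rec" and "merge" are hypothetical, not Hudi classes.
public class DeleteOrderingSketch {
  record Rec(String key, boolean isDelete, long orderingValue) {}

  // Event-time merge: higher ordering value wins (ties go to the newer side).
  static Rec merge(Rec older, Rec newer) {
    return newer.orderingValue() >= older.orderingValue() ? newer : older;
  }

  public static void main(String[] args) {
    // A delete that really "won" at ordering value 10, but was buffered
    // with a DEFAULT (0) ordering value instead of its real one.
    Rec deleteWithDefault = new Rec("rk1", true, 0L);
    // A late-arriving update with ordering value 5, i.e. older than the
    // delete's real ordering value of 10.
    Rec lateUpdate = new Rec("rk1", false, 5L);
    Rec result = merge(deleteWithDefault, lateUpdate);
    // The late update beats the defaulted tombstone (5 >= 0) and resurrects
    // the row, even though 5 < the delete's real ordering value of 10.
    System.out.println("isDelete=" + result.isDelete()); // prints "isDelete=false"
  }
}
```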
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
}
@Override
-  public Option<BufferedRecord<T>> deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
-    if (mergedRecordAndSchema.isEmpty()) {
-      // An empty Option indicates that the output represents a delete.
-      return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-    }
-    HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-    Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+  public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException {
+    Pair<HoodieRecord, Schema> mergedRecordAndSchema = getMergedRecord(existingRecord, newRecord, false);
+    HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+    Schema mergeResultSchema = mergedRecordAndSchema.getRight();
     // Special handling for SENTINEL record in Expression Payload. This is returned if the condition does not match.
     if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
       return Option.empty();
     }
-    T combinedRecordData = recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, props).get().getData());
+    Option<T> combinedRecordData = (mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord -> recordContext.convertAvroRecord(indexedRecord.getData())));
Review Comment:
minor: now that we have a separate merger for ExpressionPayload, do we still need L 459 to L 461?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -136,6 +136,7 @@ default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Sche
    *
    * <p> This interface is experimental and might be evolved in the future.
    **/
+  @Deprecated
Review Comment:
I vote to clean it up
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -338,16 +334,15 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord
   public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
     // TODO(HUDI-7843): decouple the merging logic from the merger
     // and use the record merge mode to control how to merge partial updates
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
+    Pair<HoodieRecord, Schema> mergedRecord = recordMerger.get().partialMerge(
Review Comment:
minor: Is it an intentional choice that we name it "combinedRecord" above and "mergedRecord" here? Why not use the same name everywhere?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]