danny0405 commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2308914341


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -449,95 +383,36 @@ protected Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> o
    * based on {@code CUSTOM} merge mode and a given record payload class.
    */
   private static class CustomPayloadRecordMerger<T> extends 
BaseCustomMerger<T> {
-    private final String[] orderingFieldNames;
     protected final String payloadClass;
 
     public CustomPayloadRecordMerger(
         RecordContext<T> recordContext,
         Option<HoodieRecordMerger> recordMerger,
-        List<String> orderingFieldNames,
         String payloadClass,
         Schema readerSchema,
         TypedProperties props) {
       super(recordContext, recordMerger, readerSchema, props);
-      this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
       this.payloadClass = payloadClass;
+      props.setProperty(HoodieAvroRecordMerger.PAYLOAD_CLASS_PROP, 
payloadClass);
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
-      // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
-      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
-        return Option.empty();
-      }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      BufferedRecord<T> mergedRecord = getMergedRecord(existingRecord, 
newRecord, false);
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
-        // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+      if (mergedRecord.getRecord() != existingRecord.getRecord()) {
+        return Option.of(mergedRecord);
       }
       return Option.empty();
     }
 
     @Override
-    public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
-      if (mergedRecordAndSchema.isEmpty()) {
-        return BufferedRecords.createDelete(newerRecord.getRecordKey());
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
-      // Special handling for SENTINEL record in Expression Payload
-      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
-        return olderRecord;
-      }
-      if (!mergedRecord.isDelete(mergeResultSchema, props)) {
-        IndexedRecord indexedRecord = (IndexedRecord) mergedRecord.getData();
-        return BufferedRecords.fromEngineRecord(
-            recordContext.convertAvroRecord(indexedRecord), mergeResultSchema, 
recordContext, orderingFieldNames, newerRecord.getRecordKey(), false);
-      }
-      return BufferedRecords.createDelete(newerRecord.getRecordKey());
+    public BufferedRecord<T> mergeRecords(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
+      return getMergedRecord(olderRecord, newerRecord, true);
     }
 
-    protected Pair<HoodieRecord, HoodieRecord> 
getDeltaMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
-      HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext, 
olderRecord, payloadClass);
-      HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord, payloadClass);
-      return Pair.of(oldHoodieRecord, newHoodieRecord);
-    }
-
-    protected Pair<HoodieRecord, HoodieRecord> 
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
-      return getDeltaMergeRecords(olderRecord, newerRecord);
-    }
-
-    protected Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord, 
boolean isFinalMerge) throws IOException {
-      Pair<HoodieRecord, HoodieRecord> records = isFinalMerge ? 
getFinalMergeRecords(olderRecord, newerRecord) : 
getDeltaMergeRecords(olderRecord, newerRecord);
-      return recordMerger.merge(records.getLeft(), 
getSchemaForAvroPayloadMerge(olderRecord), records.getRight(), 
getSchemaForAvroPayloadMerge(newerRecord), props);
-    }
-
-    protected HoodieRecord constructHoodieAvroRecord(RecordContext<T> 
recordContext, BufferedRecord<T> bufferedRecord, String payloadClass) {
-      GenericRecord record = null;
-      if (!bufferedRecord.isDelete()) {
-        Schema recordSchema = 
recordContext.getSchemaFromBufferRecord(bufferedRecord);
-        record = recordContext.convertToAvroRecord(bufferedRecord.getRecord(), 
recordSchema);
-      }
-      HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
-      return new HoodieAvroRecord<>(hoodieKey,
-          HoodieRecordUtils.loadPayload(payloadClass, record, 
bufferedRecord.getOrderingValue()), null);
-    }
-
-    protected Schema getSchemaForAvroPayloadMerge(BufferedRecord<T> 
bufferedRecord) {
-      if (bufferedRecord.getSchemaId() == null) {
-        return readerSchema;
-      }
-      return recordContext.getSchemaFromBufferRecord(bufferedRecord);
+    protected BufferedRecord<T> getMergedRecord(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord, boolean isFinalMerge) throws IOException {

Review Comment:
   If the `isFinalMerge` parameter is no longer needed, we can just get rid of this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to