danny0405 commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2265614815


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +99,58 @@ public BufferedRecord<T> processUpdate(String recordKey, 
BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+      T mergedRow = mergedRecord.getRecord();
+      if (prevRow != null && prevRow != mergedRow) {
+        mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+        readStats.incrementNumUpdates();
+      } else if (prevRow == null) {
+        mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+        readStats.incrementNumInserts();
+      }
+      return mergedRecord.seal(readerContext);
+    }
+  }
+
+  class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+    private final String payloadClass;
+    private final Properties properties;
+
+    public PayloadUpdateProcessor(HoodieReadStats readStats, 
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+                                  Properties properties, String payloadClass) {
+      super(readStats, readerContext, emitDeletes);
+      this.payloadClass = payloadClass;
+      this.properties = properties;
+    }
+
+    @Override
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      if (previousRecord == null) {
+        // special case for payloads when there is no previous record
+        Schema recordSchema = 
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+        GenericRecord record = 
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), 
recordSchema);
+        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
+        try {
+          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+            return null;
+          } else {
+            Schema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();
+            // If the record schema is different from the reader schema, 
rewrite the record using the payload methods to ensure consistency with legacy 
writer paths
+            if (!readerSchema.equals(recordSchema)) {

Review Comment:
   This could be super costly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to