the-other-tim-brown commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2271597279


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +99,58 @@ public BufferedRecord<T> processUpdate(String recordKey, 
BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+      T mergedRow = mergedRecord.getRecord();
+      if (prevRow != null && prevRow != mergedRow) {
+        mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+        readStats.incrementNumUpdates();
+      } else if (prevRow == null) {
+        mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+        readStats.incrementNumInserts();
+      }
+      return mergedRecord.seal(readerContext);
+    }
+  }
+
+  class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+    private final String payloadClass;
+    private final Properties properties;
+
+    public PayloadUpdateProcessor(HoodieReadStats readStats, 
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+                                  Properties properties, String payloadClass) {
+      super(readStats, readerContext, emitDeletes);
+      this.payloadClass = payloadClass;
+      this.properties = properties;
+    }
+
+    @Override
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      if (previousRecord == null) {
+        // special case for payloads when there is no previous record
+        Schema recordSchema = 
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+        GenericRecord record = 
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), 
recordSchema);
+        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
+        try {
+          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+            return null;
+          } else {
+            Schema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();
+            // If the record schema is different from the reader schema, 
rewrite the record using the payload methods to ensure consistency with legacy 
writer paths
+            if (!readerSchema.equals(recordSchema)) {

Review Comment:
   Checking the number of fields will not be enough to guarantee safety. This 
case is currently limited to the Payload-based mergers, where there is an update 
in the incoming records and no record in the base file for that key, so it 
should not be very common.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to