the-other-tim-brown commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2273299236
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,16 +106,79 @@ public BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T> previ
}
return null;
} else {
- T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
- T mergedRow = mergedRecord.getRecord();
- if (prevRow != null && prevRow != mergedRow) {
- mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
- readStats.incrementNumUpdates();
- } else if (prevRow == null) {
- mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
- readStats.incrementNumInserts();
+ return handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
+ T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+ T mergedRow = mergedRecord.getRecord();
+ if (prevRow != null && prevRow != mergedRow) {
+ mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+ readStats.incrementNumUpdates();
+ } else if (prevRow == null) {
+ mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+ readStats.incrementNumInserts();
+ }
+ return mergedRecord.seal(readerContext);
+ }
+ }
+
+ class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final String payloadClass;
+ private final Properties properties;
+
+ public PayloadUpdateProcessor(HoodieReadStats readStats, HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ Properties properties, String payloadClass) {
+ super(readStats, readerContext, emitDeletes);
+ this.payloadClass = payloadClass;
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
+ if (previousRecord == null) {
+ // special case for payloads when there is no previous record
+ Schema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+ GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
+ HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
+ try {
+ if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+ return null;
+ } else {
+ Schema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
+ // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
+ hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
+ .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error processing record with payload class: " + payloadClass, e);
+ }
+ }
+ return super.handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ class CustomMergerUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final HoodieRecordMerger merger;
+ private final TypedProperties properties;
+
+ CustomMergerUpdateProcessor(HoodieReadStats readStats, HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ TypedProperties properties) {
+ super(readStats, readerContext, emitDeletes);
+ this.merger = readerContext.getRecordMerger().get();
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
+ try {
+ if (merger.shouldFlush(readerContext.getRecordContext().constructHoodieRecord(mergedRecord), readerContext.getRecordContext().getSchemaFromBufferRecord(mergedRecord), properties)) {
Review Comment:
What should we do about the failing tests? Override the write handle to use
the old class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]