danny0405 commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2271943944
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,16 +106,79 @@ public BufferedRecord<T> processUpdate(String recordKey,
BufferedRecord<T> previ
}
return null;
} else {
- T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
- T mergedRow = mergedRecord.getRecord();
- if (prevRow != null && prevRow != mergedRow) {
- mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
- readStats.incrementNumUpdates();
- } else if (prevRow == null) {
- mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
- readStats.incrementNumInserts();
+ return handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+ T mergedRow = mergedRecord.getRecord();
+ if (prevRow != null && prevRow != mergedRow) {
+ mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+ readStats.incrementNumUpdates();
+ } else if (prevRow == null) {
+ mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+ readStats.incrementNumInserts();
+ }
+ return mergedRecord.seal(readerContext);
+ }
+ }
+
+ class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final String payloadClass;
+ private final Properties properties;
+
+ public PayloadUpdateProcessor(HoodieReadStats readStats,
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ Properties properties, String payloadClass) {
+ super(readStats, readerContext, emitDeletes);
+ this.payloadClass = payloadClass;
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ if (previousRecord == null) {
+ // special case for payloads when there is no previous record
+ Schema recordSchema =
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+ GenericRecord record =
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(),
recordSchema);
+ HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
+ try {
+ if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+ return null;
+ } else {
+ Schema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
+ // If the record schema is different from the reader schema,
rewrite the record using the payload methods to ensure consistency with legacy
writer paths
+ hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties,
readerSchema).toIndexedRecord(readerSchema, properties)
+ .ifPresent(rewrittenRecord ->
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error processing record with payload
class: " + payloadClass, e);
+ }
+ }
+ return super.handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ class CustomMergerUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final HoodieRecordMerger merger;
+ private final TypedProperties properties;
+
+ CustomMergerUpdateProcessor(HoodieReadStats readStats,
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ TypedProperties properties) {
+ super(readStats, readerContext, emitDeletes);
+ this.merger = readerContext.getRecordMerger().get();
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ try {
+ if
(merger.shouldFlush(readerContext.getRecordContext().constructHoodieRecord(mergedRecord),
readerContext.getRecordContext().getSchemaFromBufferRecord(mergedRecord),
properties)) {
Review Comment:
We have discussed dropping the `shouldFlush` functionality for now; it is not well designed, and a fix that covers only log and base-file merging is incomplete. Let's drop it first and file a JIRA to track it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,16 +106,79 @@ public BufferedRecord<T> processUpdate(String recordKey,
BufferedRecord<T> previ
}
return null;
} else {
- T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
- T mergedRow = mergedRecord.getRecord();
- if (prevRow != null && prevRow != mergedRow) {
- mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
- readStats.incrementNumUpdates();
- } else if (prevRow == null) {
- mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
- readStats.incrementNumInserts();
+ return handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+ T mergedRow = mergedRecord.getRecord();
+ if (prevRow != null && prevRow != mergedRow) {
+ mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+ readStats.incrementNumUpdates();
+ } else if (prevRow == null) {
+ mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+ readStats.incrementNumInserts();
+ }
+ return mergedRecord.seal(readerContext);
+ }
+ }
+
+ class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final String payloadClass;
+ private final Properties properties;
+
+ public PayloadUpdateProcessor(HoodieReadStats readStats,
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ Properties properties, String payloadClass) {
+ super(readStats, readerContext, emitDeletes);
+ this.payloadClass = payloadClass;
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ if (previousRecord == null) {
+ // special case for payloads when there is no previous record
+ Schema recordSchema =
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+ GenericRecord record =
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(),
recordSchema);
+ HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
+ try {
+ if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+ return null;
+ } else {
+ Schema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
+ // If the record schema is different from the reader schema,
rewrite the record using the payload methods to ensure consistency with legacy
writer paths
+ hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties,
readerSchema).toIndexedRecord(readerSchema, properties)
+ .ifPresent(rewrittenRecord ->
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error processing record with payload
class: " + payloadClass, e);
+ }
+ }
+ return super.handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+ class CustomMergerUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+ private final HoodieRecordMerger merger;
+ private final TypedProperties properties;
+
+ CustomMergerUpdateProcessor(HoodieReadStats readStats,
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+ TypedProperties properties) {
+ super(readStats, readerContext, emitDeletes);
+ this.merger = readerContext.getRecordMerger().get();
+ this.properties = properties;
+ }
+
+ @Override
+ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T>
previousRecord, BufferedRecord<T> mergedRecord) {
+ try {
+ if
(merger.shouldFlush(readerContext.getRecordContext().constructHoodieRecord(mergedRecord),
readerContext.getRecordContext().getSchemaFromBufferRecord(mergedRecord),
properties)) {
Review Comment:
We have discussed dropping the `shouldFlush` functionality for now; it is not well designed, and a fix that covers only log and base-file merging is incomplete. Let's drop it first and file a JIRA to track it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]