xushiyan commented on code in PR #5627:
URL: https://github.com/apache/hudi/pull/5627#discussion_r906919277
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -131,25 +131,35 @@ public String getFieldName() {
*/
private HoodieOperation operation;
+ /**
+ * For purposes of preCombining.
+ */
+ private Comparable orderingVal;
+
public HoodieRecord(HoodieKey key, T data) {
- this(key, data, null);
+ this(key, data, null, null);
}
- public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
+ public HoodieRecord(HoodieKey key, T data, Comparable orderingVal) {
+ this(key, data, null, orderingVal);
+ }
+
+  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable orderingVal) {
Review Comment:
use `Comparable<?>` to avoid the raw-type IDE warning?
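For illustration, a minimal sketch of the suggestion (class and field names here are stand-ins, not the actual `HoodieRecord`): declaring the field with a wildcard, `Comparable<?>`, instead of the raw `Comparable` silences the "raw use of parameterized class" warning without changing behavior:

```java
// Stand-in sketch, not the actual HoodieRecord. Using Comparable<?>
// (unbounded wildcard) instead of the raw type Comparable avoids the
// "raw use of parameterized class" IDE warning.
public class RecordSketch {
  private final Comparable<?> orderingVal;

  public RecordSketch(Comparable<?> orderingVal) {
    this.orderingVal = orderingVal;
  }

  public Comparable<?> getOrderingValue() {
    return orderingVal;
  }
}
```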
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java:
##########
@@ -81,6 +83,27 @@ public static <T extends HoodieRecordPayload> T loadPayload(String recordPayload
}
}
+ /**
+ * Instantiate a given class with a record merge.
+ */
+ public static HoodieMerge loadHoodieMerge(String mergeClass) {
Review Comment:
we should actually keep this and the `loadPayload` method out of `ReflectionUtils`, which is meant only for generic reflection helpers. Suggest moving them to a `HoodieRecord`-specific util class.
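One way to structure that split, sketched with hypothetical stand-in names (neither `ReflectionHelper` nor `RecordUtils` exists in this PR): the generic instantiation logic stays in the generic helper, and the record-specific loader lives in its own util class that merely delegates:

```java
// Sketch only: all names here are hypothetical stand-ins for the
// suggested refactor, not actual Hudi classes.

// Generic reflection stays in a generic helper ...
final class ReflectionHelper {
  static Object loadClass(String clazzName) {
    try {
      return Class.forName(clazzName).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not load class " + clazzName, e);
    }
  }
}

interface Merge {
  String combine(String older, String newer);
}

final class DefaultMerge implements Merge {
  public String combine(String older, String newer) {
    return newer; // keep the latest value
  }
}

// ... while record-specific loading lives in its own util class.
final class RecordUtils {
  static Merge loadMerge(String mergeClass) {
    return (Merge) ReflectionHelper.loadClass(mergeClass);
  }
}
```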
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -169,15 +179,17 @@ public HoodieOperation getOperation() {
return operation;
}
+ public Comparable getOrderingValue() {
Review Comment:
ditto
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -78,6 +79,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
+ private HoodieMerge hoodieMerge;
Review Comment:
can this be final?
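For illustration, assuming the field is assigned exactly once in the constructor (a sketch with stand-in names, not the actual scanner), marking it `final` documents that invariant and lets the compiler enforce it:

```java
// Stand-in sketch: a field that is assigned only in the constructor can
// be declared final, which makes the single-assignment guarantee explicit.
public class ScannerSketch {
  private final String merge; // final: the compiler rejects any reassignment

  public ScannerSketch(String merge) {
    this.merge = merge;
  }

  public String getMerge() {
    return merge;
  }
}
```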
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -303,7 +305,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
  private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
    // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
    //       on the record from the Delta Log
-   toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
+   if (hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps).isPresent) {
+     toScalaOption(hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps)
Review Comment:
can we avoid calling `combineAndGetUpdateValue()` twice here?
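The usual fix, sketched here in Java with stand-in types (the PR code is Scala, but the pattern is the same): invoke the potentially expensive combine step once, bind the result to a local, and branch on that local instead of re-invoking for the `isPresent` check:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in sketch: call the combine step once and reuse the result,
// rather than calling it once for isPresent() and again for the value.
final class MergeOnce {
  static final AtomicInteger calls = new AtomicInteger();

  static Optional<String> combineAndGetUpdateValue(String cur, String next) {
    calls.incrementAndGet(); // count invocations to demonstrate the point
    return next.isEmpty() ? Optional.empty() : Optional.of(next);
  }

  static Optional<String> merge(String cur, String next) {
    // Single invocation; branch or map on `combined` from here on.
    Optional<String> combined = combineAndGetUpdateValue(cur, next);
    return combined;
  }
}
```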
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]