xushiyan commented on code in PR #5627:
URL: https://github.com/apache/hudi/pull/5627#discussion_r901041090


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -81,9 +83,10 @@ public I combineOnCondition(
    */
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    return deduplicateRecords(records, table.getIndex(), parallelism);
+    HoodieMerge hoodieMerge = ReflectionUtils.loadHoodieMerge(table.getConfig().getMergeClass());

Review Comment:
   Ditto



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -103,6 +105,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    hoodieMerge = ReflectionUtils.loadHoodieMerge(config.getMergeClass());

Review Comment:
   Reflection is costly, and this constructor runs for every write handle. Can we avoid reloading the merge class on each instantiation?
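
   One possible direction, as a rough sketch only: cache the reflectively created instance per class name so the reflective lookup happens once per JVM. `CachedReflectionLoader` and `loadCached` are hypothetical names, not existing Hudi APIs, and this assumes the merge implementation is stateless and safe to share across handles.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not part of Hudi): keeps one instance per class name. */
final class CachedReflectionLoader {
  // Assumption: cached instances are stateless, so sharing them across callers is safe.
  private static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

  private CachedReflectionLoader() {}

  /** Instantiates className reflectively on first use, then returns the cached instance. */
  static Object loadCached(String className) {
    return INSTANCES.computeIfAbsent(className, name -> {
      try {
        return Class.forName(name).getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Could not instantiate " + name, e);
      }
    });
  }

  public static void main(String[] args) {
    Object first = loadCached("java.lang.Object");
    Object second = loadCached("java.lang.Object");
    // Both calls return the same cached instance.
    System.out.println(first == second); // prints "true"
  }
}
```

   If the implementation carries per-handle state, caching the `Class` or its constructor instead of the instance would be the safer variant.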



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -49,19 +50,17 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
 
   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge hoodieMerge) {
     boolean isIndexingGlobal = index.isGlobal();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
-      // If index used is global, then records are expected to differ in their partitionPath
+      // If index used is global,x then records are expected to differ in their partitionPath

Review Comment:
   Typo?



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java:
##########
@@ -87,20 +88,21 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge hoodieMerge) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
 
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
-      @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
+      @SuppressWarnings("unchecked")
+      final HoodieRecord reducedData =  hoodieMerge.preCombine(rec1, rec2);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      boolean choosePrev = rec1 == reducedRec;
+      boolean choosePrev = rec1 == reducedData;

Review Comment:
   reducedRec sounds more precise. Why rename?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -154,6 +155,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty

Review Comment:
   This duplicates the other config?
   



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java:
##########
@@ -65,11 +67,12 @@ public List<HoodieRecord<T>> deduplicateRecords(
 
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1);
+      HoodieRecord reducedRecord =  hoodieMerge.preCombine(rec1,rec2);
+
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
+      return (HoodieRecord<T>) new HoodieAvroRecord(reducedRecord);

Review Comment:
   Why not use the newInstance API?



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -181,6 +181,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()

Review Comment:
   An existing bug? Maybe file a separate JIRA and land the fix in master first?
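
   For context, a small standalone Java sketch of the `java.nio.ByteBuffer` behavior in question: a bulk `get` advances the position, so a later reader of the same buffer sees nothing unless the position is reset. `ByteBufferReadDemo` and its methods are illustrative names only; reading through `duplicate()` is an alternative shown for comparison, not what this PR does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative only: shows how a bulk get moves a ByteBuffer's position. */
final class ByteBufferReadDemo {
  private ByteBufferReadDemo() {}

  /** Copies the remaining bytes; afterwards the buffer's position equals its limit. */
  static byte[] readConsuming(ByteBuffer b) {
    byte[] bytes = new byte[b.remaining()];
    b.get(bytes);
    return bytes;
  }

  /** Copies the remaining bytes through a duplicate, leaving the caller's position untouched. */
  static byte[] readPreserving(ByteBuffer b) {
    ByteBuffer view = b.duplicate(); // shares content, has its own position and limit
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap("hudi".getBytes(StandardCharsets.UTF_8));
    readConsuming(buf);
    System.out.println(buf.remaining()); // prints 0: the buffer now looks empty
    buf.rewind();                        // sets position back to 0
    System.out.println(buf.remaining()); // prints 4
    readPreserving(buf);
    System.out.println(buf.remaining()); // prints 4: position was never moved
  }
}
```

   Note that `rewind()` sets the position to 0, which matches the original position only when the buffer started at 0.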



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]