stayrascal commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r801702428



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -105,7 +105,7 @@ public static FlinkWriteHelper newInstance() {
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      boolean choosePrev = data1.equals(reducedData);
+      boolean choosePrev = data2.compareTo(data1) < 0;
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();

Review comment:
       Previously, `data2.preCombine(data1)` returned either `data1` or `data2`,
chosen by their `orderingVal`. But if we merge/combine `data1` and `data2` into a
new payload (`reducedData`), `data1.equals(reducedData)` is always false. To get
the `HoodieKey` and `HoodieOperation` for the new `HoodieRecord` built from
`reducedData`, we need to take them from whichever of `data1` and `data2` is the
latest, so `compareTo` replaces `#preCombine` for comparing their `orderingVal`.
   
   ```
   @Override
   public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
     return orderingVal.compareTo(oldValue.orderingVal);
   }
   ```
   
   ```
   @Test
   public void testCompareFunction() {
     GenericRecord record = new GenericData.Record(schema);
     record.put("id", "1");
     record.put("partition", "partition1");
     record.put("ts", 0L);
     record.put("_hoodie_is_deleted", false);
     record.put("city", "NY0");
     record.put("child", Arrays.asList("A"));

     // Same record, different ordering values (1 and 2).
     PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
     PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);

     // assertEquals takes (expected, actual).
     assertEquals(-1, payload1.compareTo(payload2));
     assertEquals(1, payload2.compareTo(payload1));
     assertEquals(0, payload1.compareTo(payload1));
   }
   ```
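
   To make the `choosePrev` reasoning concrete, here is a minimal standalone
   sketch that does not use any Hudi classes. `SketchPayload`,
   `choosePrevByEquals` and `choosePrevByCompare` are hypothetical names made up
   for illustration, and the sketch assumes the payload does not override
   `equals` and that merging returns a new object:

   ```java
   public class ChoosePrevSketch {

     /** Hypothetical stand-in for a payload with an ordering value; not a Hudi class. */
     static final class SketchPayload implements Comparable<SketchPayload> {
       final String value;
       final long orderingVal;

       SketchPayload(String value, long orderingVal) {
         this.value = value;
         this.orderingVal = orderingVal;
       }

       /** Assumed merge behaviour: always returns a NEW object combining both inputs. */
       SketchPayload preCombine(SketchPayload oldValue) {
         long latest = Math.max(orderingVal, oldValue.orderingVal);
         return new SketchPayload(oldValue.value + "+" + value, latest);
       }

       @Override
       public int compareTo(SketchPayload oldValue) {
         return Long.compare(orderingVal, oldValue.orderingVal);
       }
     }

     /** Old check: compares data1 with the merged result using default (identity) equals. */
     static boolean choosePrevByEquals(SketchPayload data1, SketchPayload data2) {
       SketchPayload reducedData = data2.preCombine(data1);
       return data1.equals(reducedData);
     }

     /** New check: compares only the ordering values of the two inputs. */
     static boolean choosePrevByCompare(SketchPayload data1, SketchPayload data2) {
       return data2.compareTo(data1) < 0;
     }

     public static void main(String[] args) {
       SketchPayload data1 = new SketchPayload("NY1", 2L); // newer record
       SketchPayload data2 = new SketchPayload("NY0", 1L); // older record

       System.out.println(choosePrevByEquals(data1, data2));  // false
       System.out.println(choosePrevByCompare(data1, data2)); // true
     }
   }
   ```

   With the merged payload, the `equals` check never selects `rec1`, even when
   `data1` carries the larger ordering value, while the `compareTo` check does.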



