the-other-tim-brown commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2249371306


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -414,27 +420,34 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
       Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
-      //the record was deleted
-      return Option.empty();
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    Option<BufferedRecord<R>> mergeResult = merge(
+        incoming, existing, writeSchemaWithMetaFields, existingSchema, incomingRecordContext, orderingFieldNames, recordMerger, config.getProps());
+    HoodieRecord<R> result;
+    if (mergeResult.isPresent()) {
+      if (mergeResult.get().isDelete()) {
+        //the record was deleted
+        return Option.empty();
+      }
+      result = existingRecordContext.constructHoodieRecord(mergeResult.get());
+    } else {
+      result = existing;
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
     if (result.getData().equals(HoodieRecord.SENTINEL)) {
      //the record did not match any merge case and should not be modified
       return Option.of(result);
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) result.getData());
+    String partitionPath = keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.get().getRecord(), writeSchemaWithMetaFields));

Review Comment:
   In the future, if we use Spark or Flink records on the write path, we would use the schema to translate the row to Avro, so we can leverage this existing keyGenerator code that works on Avro records.
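   
   A minimal sketch of that idea, assuming a `RecordContext` implementation exists for the engine's row type. The helper name and signature below are illustrative, not part of this PR; only `convertToAvroRecord`, `getRecord`, and `getPartitionPath` come from the diff above:
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   // Hudi imports (BufferedRecord, RecordContext, BaseKeyGenerator) elided for brevity.
   
   // Hypothetical helper: route an engine-native record (Spark InternalRow,
   // Flink RowData, Avro, ...) through the existing Avro-based key generator.
   static <R> String partitionPathOf(
       BufferedRecord<R> merged,
       RecordContext<R> recordContext,
       Schema writeSchemaWithMetaFields,
       BaseKeyGenerator keyGenerator) {
     // Translate the engine row to an Avro GenericRecord using the write schema.
     GenericRecord avro = recordContext.convertToAvroRecord(merged.getRecord(), writeSchemaWithMetaFields);
     // Reuse the existing Avro-based key generator unchanged.
     return keyGenerator.getPartitionPath(avro);
   }
   ```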


