nsivabalan commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2249160326


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,77 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(), table.getConfig().getRecordMerger().getRecordType(), table.getConfig().getProps())

Review Comment:
   Minor: how about `getReaderContextFactoryForWrites`?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java:
##########
@@ -84,4 +85,12 @@ public Comparable<?> getOrderingValue() {
   public byte[] getRecordBytes() {
     return recordBytes;
   }
+
+  @Override
+  public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties properties) throws IOException {

Review Comment:
   Should we move this to `BaseAvroPayload`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -414,27 +420,34 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
       Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
-      //the record was deleted
-      return Option.empty();
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    Option<BufferedRecord<R>> mergeResult = merge(
+        incoming, existing, writeSchemaWithMetaFields, existingSchema, incomingRecordContext, orderingFieldNames, recordMerger, config.getProps());
+    HoodieRecord<R> result;
+    if (mergeResult.isPresent()) {
+      if (mergeResult.get().isDelete()) {
+        //the record was deleted
+        return Option.empty();
+      }
+      result = existingRecordContext.constructHoodieRecord(mergeResult.get());
+    } else {
+      result = existing;
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
     if (result.getData().equals(HoodieRecord.SENTINEL)) {
       //the record did not match and merge case and should not be modified
       return Option.of(result);
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) result.getData());
+    String partitionPath = keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.get().getRecord(), writeSchemaWithMetaFields));

Review Comment:
   In the `AvroRecordContext.convertToAvroRecord(record, schema)` impl, the schema arg is never used.
   I'm not sure where else we use this API from recordContext, but thought I'd point it out.
   Here, to fetch the partition path, it may not matter, but it could if we are using it elsewhere.
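
To illustrate why an ignored schema argument can matter, here is a minimal sketch. It uses plain maps and a list of field names as hypothetical stand-ins for records and schemas; `IgnoredSchemaSketch` and both converter methods are made up for illustration and are not Hudi APIs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IgnoredSchemaSketch {
  // Hypothetical converter that ignores its schema argument and returns the record unchanged.
  static Map<String, Object> convertIgnoringSchema(Map<String, Object> record, List<String> schemaFields) {
    return record;
  }

  // Hypothetical converter that honors the schema by projecting the record onto its fields.
  static Map<String, Object> convertWithSchema(Map<String, Object> record, List<String> schemaFields) {
    Map<String, Object> projected = new LinkedHashMap<>();
    for (String field : schemaFields) {
      projected.put(field, record.get(field)); // fields absent from the record become null
    }
    return projected;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", 1);
    record.put("region", "us");
    // Illustrative target schema that includes a meta field the record does not carry.
    List<String> schemaWithMetaFields = Arrays.asList("_hoodie_record_key", "id", "region");

    System.out.println(convertIgnoringSchema(record, schemaWithMetaFields).keySet());
    System.out.println(convertWithSchema(record, schemaWithMetaFields).keySet());
  }
}
```

In this sketch a caller that only reads `region` sees no difference between the two converters, while a caller that expects the result to match the passed schema would.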
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -414,27 +420,34 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
       Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
-      //the record was deleted
-      return Option.empty();
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    Option<BufferedRecord<R>> mergeResult = merge(

Review Comment:
   Don't we need to call `finalMerge` here?
   `merge()` invokes `deltaMerge` internally.
   In the case of dedup, I agree we can call `deltaMerge`, but for the global index, shouldn't we be calling `finalMerge`?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,36 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
         .filter(p -> p.getRight().isPresent())
         .map(p -> Pair.of(p.getRight().get().getPartitionPath(), p.getRight().get().getFileId()))
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+    // define the buffered record merger.
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
+    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+    RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+    incomingRecordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(), false);
+    readerContext.initRecordMerger(config.getProps());
+    // Create a reader context for the existing records. In the case of merge-into commands, the incoming records
+    // can be using an expression payload so here we rely on the table's configured payload class if it is required.
+    ReaderContextFactory<R> readerContextFactoryForExistingRecords = (ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
+    RecordContext<R> existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable);
-
-    final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+    HoodieData<HoodieRecord<R>> existingRecords =
+        getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactoryForExistingRecords);
+    List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), hoodieTable.getConfig().getProps(), hoodieTable.getMetaClient());
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, config.getPayloadClass(), null,
+        String.join(",", orderingFieldNames), hoodieTable.getMetaClient().getTableConfig().getTableVersion()).getLeft();
+    BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,

Review Comment:
   We cannot blindly set `enablePartialMerging` to false here, right?
   What if this is part of a MERGE INTO (MIT) command with partial encoding?
   Then the incoming record could have a partial set of columns, while the existing record contains all data columns.
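
To make the concern concrete, here is a minimal sketch that uses plain maps as hypothetical stand-ins for records. `PartialMergeSketch` and its `merge` method are illustrative only and are not Hudi APIs; the boolean flag merely mirrors the `enablePartialMerging` idea from the comment above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartialMergeSketch {
  // Hypothetical merge over column-name -> value maps; not how Hudi implements merging.
  static Map<String, Object> merge(Map<String, Object> existing,
                                   Map<String, Object> incoming,
                                   boolean enablePartialMerging) {
    if (!enablePartialMerging) {
      // Full-record semantics: the incoming record replaces the existing one wholesale.
      return new LinkedHashMap<>(incoming);
    }
    // Partial semantics: start from the existing record and overlay only the incoming columns.
    Map<String, Object> merged = new LinkedHashMap<>(existing);
    merged.putAll(incoming);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> existing = new LinkedHashMap<>();
    existing.put("id", 1);
    existing.put("name", "a");
    existing.put("price", 10);

    // Incoming record carries only a subset of columns, as a partial update would.
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("id", 1);
    incoming.put("price", 12);

    System.out.println(merge(existing, incoming, false));
    System.out.println(merge(existing, incoming, true));
  }
}
```

With the flag off, the merged row in this sketch loses `name`; with it on, `name` is carried over from the existing row and only `price` is updated.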



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,77 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(), table.getConfig().getRecordMerger().getRecordType(), table.getConfig().getProps())
+            .getContext();
+    readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(), false);
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, table.getConfig().getPayloadClass(), null,

Review Comment:
   Why is the merge mode set to null?
   Shouldn't we fetch it from the table config and set it here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -445,40 +458,47 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
+      BufferedRecordMerger<R> recordMerger,
+      Option<BaseKeyGenerator> expressionPayloadKeygen,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     if (expressionPayloadKeygen.isPresent()) {
       return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, existing, writeSchema,
-          existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get());
+          existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext, orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have them
       HoodieRecord incomingPrepended = incoming
           .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-      // after prepend the meta fields, convert the record back to the original payload
-      HoodieRecord incomingWithMetaFields = incomingPrepended
-          .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
-      Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-          .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
+      Option<BufferedRecord<R>> mergeResult = merge(

Review Comment:
   I see you are adding support to recordContext to fetch the record key for incoming records using
   ```
   incomingRecordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(), false);
   ```
   below. Can we do the same here?
   This prepending rewrites the entire record with the new schema, right?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
