the-other-tim-brown commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2266753079


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -75,13 +85,60 @@
 public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends 
HoodieWriteMergeHandle<T, I, K, O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(FileGroupReaderBasedMergeHandle.class);
 
-  private final HoodieReaderContext<T> readerContext;
-  private final CompactionOperation operation;
+  private final Option<CompactionOperation> operation;
   private final String maxInstantTime;
+  private HoodieReaderContext<T> readerContext;
   private HoodieReadStats readStats;
-  private final HoodieRecord.HoodieRecordType recordType;
-  private final Option<HoodieCDCLogger> cdcLogger;
+  private HoodieRecord.HoodieRecordType recordType;
+  private Option<HoodieCDCLogger> cdcLogger;
+  private Option<RecordLevelIndexCallback> recordIndexCallbackOpt;
+  private Option<SecondaryIndexCallback> secondaryIndexCallbackOpt;
+  private final boolean isCompaction;
+  private final TypedProperties props;
+  private Iterator<HoodieRecord> incomingRecordsItr;
 
+  /**
+   * Constructor for Copy-On-Write (COW) merge path.
+   * Takes in a base path and an iterator of records to be merged with that 
file.
+   * @param config instance of {@link HoodieWriteConfig} to use.
+   * @param instantTime instant time of the current commit.
+   * @param hoodieTable instance of {@link HoodieTable} being updated.
+   * @param recordItr iterator of records to be merged with the file.
+   * @param partitionPath partition path of the base file.
+   * @param fileId file ID of the base file.
+   * @param taskContextSupplier instance of {@link TaskContextSupplier} to use.
+   * @param keyGeneratorOpt optional instance of {@link BaseKeyGenerator} to 
use for extracting keys from records.
+   * @param readerContext instance of {@link HoodieReaderContext} to use while 
merging for accessing fields and transforming records.
+   */
+  public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                         Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                         TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt, 
HoodieReaderContext<T> readerContext) {

Review Comment:
   The issue is that the merge handles are created on the executors in Spark, so 
   `hoodieTable.getContext` will always return a local engine context instead of 
   a Spark engine context when one is required.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -267,26 +393,141 @@ private static class CDCCallback<T> implements 
BaseFileUpdateCallback<T> {
     }
 
     @Override
-    public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
+    public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, 
BufferedRecord<T> mergedRecord) {
       cdcLogger.put(recordKey, convertOutput(previousRecord), 
Option.of(convertOutput(mergedRecord)));
-
     }
 
     @Override
-    public void onInsert(String recordKey, T newRecord) {
+    public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
       cdcLogger.put(recordKey, null, Option.of(convertOutput(newRecord)));
-
     }
 
     @Override
-    public void onDelete(String recordKey, T previousRecord) {
+    public void onDelete(String recordKey, BufferedRecord<T> previousRecord, 
HoodieOperation hoodieOperation) {
       cdcLogger.put(recordKey, convertOutput(previousRecord), Option.empty());
-
     }
 
-    private GenericRecord convertOutput(T record) {
-      T convertedRecord = outputConverter.get().map(converter -> record == 
null ? null : converter.apply(record)).orElse(record);
+    private GenericRecord convertOutput(BufferedRecord<T> record) {
+      T convertedRecord = outputConverter.get().map(converter -> record == 
null ? null : converter.apply(record.getRecord())).orElse(record.getRecord());
       return convertedRecord == null ? null : 
readerContext.getRecordContext().convertToAvroRecord(convertedRecord, 
requestedSchema.get());
     }
   }
+
+  private static class RecordLevelIndexCallback<T> implements 
BaseFileUpdateCallback<T> {
+    private final WriteStatus writeStatus;
+    private final HoodieRecordLocation fileRecordLocation;
+    private final String partitionPath;
+
+    public RecordLevelIndexCallback(WriteStatus writeStatus, 
HoodieRecordLocation fileRecordLocation, String partitionPath) {
+      this.writeStatus = writeStatus;
+      this.fileRecordLocation = fileRecordLocation;
+      this.partitionPath = partitionPath;
+    }
+
+    @Override
+    public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, 
BufferedRecord<T> mergedRecord) {
+      writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, 
partitionPath, fileRecordLocation, fileRecordLocation, 
mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));
+    }
+
+    @Override
+    public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
+      writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, 
partitionPath, null, fileRecordLocation, newRecord.getHoodieOperation() == 
HoodieOperation.UPDATE_BEFORE));

Review Comment:
   It's always false today, but do we want to keep this in case some future 
   scenario makes it true?



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -139,15 +139,16 @@ public abstract <I, K, V> List<V> reduceByKey(
   /**
    * Returns reader context factory for write operations in the table.
    *
-   * @param metaClient Table meta client
-   * @param recordType Record type
-   * @param properties Typed properties
+   * @param metaClient            Table meta client
+   * @param recordType            Record type
+   * @param properties            Typed properties
+   * @param outputsCustomPayloads Whether the reader context factory should 
output custom payloads. Final merging of records before writes does not require 
custom payloads.
    */
   public ReaderContextFactory<?> 
getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient, 
HoodieRecord.HoodieRecordType recordType,
-                                                                 
TypedProperties properties) {
+                                                                 
TypedProperties properties, boolean outputsCustomPayloads) {

Review Comment:
   I didn't find a good way right now. This flag really represents two 
   different stages of the writer path: the dedupe/indexing stages and the final 
   write. In the final write, we never want to use the payload-based records, 
   since we just want the final indexed representation of the record.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SecondaryIndexStreamingTracker.java:
##########
@@ -226,6 +228,81 @@ static <T> void trackSecondaryIndexStats(@Nullable 
HoodieKey hoodieKey, Option<H
     });
   }
 
+  /**
+   * The utility function used by Merge Handle to generate secondary index 
stats for the corresponding record.
+   * It considers the new merged version of the record and compares it with 
the older version of the record to generate
+   * secondary index stats.
+   *
+   * @param hoodieKey                 The hoodie key
+   * @param combinedRecordOpt         New record merged with the old record
+   * @param oldRecord                 The old record
+   * @param isDelete                  Whether the record is a DELETE
+   * @param writeStatus               The Write status
+   * @param secondaryIndexDefns       Definitions for secondary index which 
need to be updated
+   */
+  static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, 
Option<BufferedRecord<T>> combinedRecordOpt, @Nullable BufferedRecord<T> 
oldRecord, boolean isDelete,

Review Comment:
   This method mirrors the one above it but operates directly on 
   `BufferedRecord` instead of converting to `HoodieRecord`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -267,26 +393,141 @@ private static class CDCCallback<T> implements 
BaseFileUpdateCallback<T> {
     }
 
     @Override
-    public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
+    public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, 
BufferedRecord<T> mergedRecord) {
       cdcLogger.put(recordKey, convertOutput(previousRecord), 
Option.of(convertOutput(mergedRecord)));
-
     }
 
     @Override
-    public void onInsert(String recordKey, T newRecord) {
+    public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
       cdcLogger.put(recordKey, null, Option.of(convertOutput(newRecord)));
-
     }
 
     @Override
-    public void onDelete(String recordKey, T previousRecord) {
+    public void onDelete(String recordKey, BufferedRecord<T> previousRecord, 
HoodieOperation hoodieOperation) {
       cdcLogger.put(recordKey, convertOutput(previousRecord), Option.empty());
-
     }
 
-    private GenericRecord convertOutput(T record) {
-      T convertedRecord = outputConverter.get().map(converter -> record == 
null ? null : converter.apply(record)).orElse(record);
+    private GenericRecord convertOutput(BufferedRecord<T> record) {
+      T convertedRecord = outputConverter.get().map(converter -> record == 
null ? null : converter.apply(record.getRecord())).orElse(record.getRecord());
       return convertedRecord == null ? null : 
readerContext.getRecordContext().convertToAvroRecord(convertedRecord, 
requestedSchema.get());
     }
   }
+
+  private static class RecordLevelIndexCallback<T> implements 
BaseFileUpdateCallback<T> {
+    private final WriteStatus writeStatus;
+    private final HoodieRecordLocation fileRecordLocation;
+    private final String partitionPath;
+
+    public RecordLevelIndexCallback(WriteStatus writeStatus, 
HoodieRecordLocation fileRecordLocation, String partitionPath) {
+      this.writeStatus = writeStatus;
+      this.fileRecordLocation = fileRecordLocation;
+      this.partitionPath = partitionPath;
+    }
+
+    @Override
+    public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, 
BufferedRecord<T> mergedRecord) {
+      writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey, 
partitionPath, fileRecordLocation, fileRecordLocation, 
mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));

Review Comment:
   In the current code, the write status will still be updated with this record 
   delegate even when `ignoreIndexUpdate` is true. This keeps parity with the 
   old system, but I am not sure of the context for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to