nsivabalan commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2185713172


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -109,18 +112,23 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage
       long start, long length, boolean shouldUseRecordPosition) {
     this(readerContext, storage, tablePath, latestCommitTime, fileSlice, dataSchema,
         requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, start, length,
-        shouldUseRecordPosition, false, false, false);
+        shouldUseRecordPosition, false, false, false, Option.empty());
   }
 
   private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath,
                                 String latestCommitTime, FileSlice fileSlice, Schema dataSchema, Schema requestedSchema,
                                 Option<InternalSchema> internalSchemaOpt, HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
-                                long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete, boolean sortOutput) {
+                                long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete, boolean sortOutput,
+                                Option<FileGroupUpdateCallback<T>> updateCallback) {
     this.readerContext = readerContext;
+    this.fileGroupUpdateCallback = updateCallback;
     this.metaClient = hoodieTableMetaClient;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
-    this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    this.logFiles = fileSlice.getLogFiles()

Review Comment:
   This doesn't really need to be fixed in this patch, 
   but can we add a separate API to `FileSlice` that exposes only the CDC log 
   files, e.g. `getCDClogFiles()`, so that `getLogFiles()` returns only non-CDC 
   files to all callers? 
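
   A minimal sketch of the split suggested above, assuming CDC log files can be 
   recognized by a file-name suffix. `FileSliceSketch`, its accessors, and the 
   `-cdc` suffix value are hypothetical stand-ins for illustration, not the 
   actual `FileSlice` API. 

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for FileSlice; not the real Hudi class.
class FileSliceSketch {
  // Assumed suffix marking CDC log files (illustrative value).
  static final String CDC_LOGFILE_SUFFIX = "-cdc";

  private final List<String> allLogFileNames = new ArrayList<>();

  void addLogFile(String fileName) {
    allLogFileNames.add(fileName);
  }

  // Data log files only, so callers no longer filter out CDC files themselves.
  List<String> getLogFiles() {
    return allLogFileNames.stream()
        .filter(name -> !name.endsWith(CDC_LOGFILE_SUFFIX))
        .collect(Collectors.toList());
  }

  // CDC log files only, exposed through a separate accessor.
  List<String> getCdcLogFiles() {
    return allLogFileNames.stream()
        .filter(name -> name.endsWith(CDC_LOGFILE_SUFFIX))
        .collect(Collectors.toList());
  }
}
```

   With such a split, the `filterNot(...CDC_LOGFILE_SUFFIX)` style of filtering 
   at call sites would presumably become unnecessary. 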
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));

Review Comment:
   Can you help me understand where we were applying the schema conversion 
   prior to this patch? 
   It looks like with this patch, even if `callbackOption` is empty, we are doing 
   extra work here that was not happening before. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);

Review Comment:
   Orthogonal point. 
   It can be confusing for devs what is actually stored in `BufferedRecord` as 
   the record: whether it is a `HoodieRecord` or the raw record representation, 
   e.g. `InternalRow` in the case of Spark, `GenericRecord` in the case of Avro, 
   etc. 
   
   Many places name the method `getRecord()`, which might add to the confusion. 
   Not sure if we can give it a better name. 
   Just a thought.
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -402,10 +375,8 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
             && currentCDCFileSplit.getBeforeFileSlice.isPresent)
           loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
           val absLogPath = new StoragePath(basePath, currentCDCFileSplit.getCdcFiles.get(0))
-          val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(storage.getPathInfo(absLogPath))))
-          val logFileIterator = new LogFileIterator(

Review Comment:
   The existing `LogFileIterator` in Iterators.scala accounts for 
   `isDeleteOperation`. 
   Can you remind me whether FileGroupReader/FileGroupRecordBuffer accounts for 
   it as well? I did not see any explicit checks in 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -598,11 +638,13 @@ protected boolean hasNextLogRecord() {
     while (logRecordIterator.hasNext()) {
       BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
       if (!nextRecordInfo.isDelete()) {
-        nextRecord = nextRecordInfo.getRecord();
+        BufferedRecord<T> convertedBufferedRecord = applyOutputSchemaConversion(nextRecordInfo);
+        nextRecord = convertedBufferedRecord.getRecord();
+        callbackOption.ifPresent(callback -> callback.onInsert(readerContext.constructHoodieRecord(convertedBufferedRecord)));
         readStats.incrementNumInserts();
         return true;
       } else if (emitDelete) {
-        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+        nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey()));

Review Comment:
   Why is no callback invoked here?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     if (!same) {
       // clear up the beforeImageRecords
       beforeImageRecords.clear()
-      val iter = loadFileSlice(fileSlice)
-      iter.foreach { row =>
-        val key = getRecordKey(row)
-        // Due to the reuse buffer mechanism of Spark serialization,
-        // we have to copy the serialized result if we need to retain its reference
-        beforeImageRecords.put(key, serialize(row, copy = true))
+      val iter = loadFileSliceWithKeys(fileSlice)
+      iter.foreach { tuple =>
+        beforeImageRecords.put(tuple._1, tuple._2)
       }
       // reset beforeImageFiles
       beforeImageFiles.clear()
       beforeImageFiles.append(files: _*)
     }
   }
 
+  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext).map(internalRow => {
+      val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+      (recordKey, internalRow)
+    })
+  }
+
   private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val baseFileInfo = storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty,
-      baseFileInfo.getPath,
-      0,
-      baseFileInfo.getLength
-    )
-    val logFiles = fileSlice.getLogFiles
-      .sorted(HoodieLogFile.getLogFileComparator)
-      .collect(Collectors.toList[HoodieLogFile])
-      .asScala.toList
-      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-    if (logFiles.isEmpty) {
-      // no log files, just load the base parquet file
-      parquetReader(basePartitionedFile)
-    } else {
-      // use [[RecordMergingFileIterator]] to load both the base file and log files
-      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), logFiles)
-      new RecordMergingFileIterator(
-        morSplit,
-        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-        originTableSchema,
-        originTableSchema,
-        tableState,
-        conf.unwrapAs(classOf[Configuration]))
-    }
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext)
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+      .withReaderContext(readerContext)
+      .withHoodieTableMetaClient(metaClient)
+      .withFileSlice(fileSlice)
+      .withDataSchema(avroSchema)
+      .withRequestedSchema(avroSchema)
+      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withProps(readerProperties)
+      .withLatestCommitTime(split.changes.last.getInstant)
+      .build()
+    CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+  }
+
+  private def loadLogFile(logFile: HoodieLogFile, instant: String): Iterator[BufferedRecord[InternalRow]] = {
+    val partitionPath = FSUtils.getRelativePartitionPath(metaClient.getBasePath, logFile.getPath.getParent)
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    readerContext.setLatestCommitTime(instant)
+    readerContext.setHasBootstrapBaseFile(false)
+    readerContext.setHasLogFiles(true)
+    readerContext.initRecordMerger(readerProperties)
+    readerContext.setSchemaHandler(
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig, readerProperties))
+    val recordBuffer = new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+      readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+      Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true, Option.empty())
+
+    HoodieMergedLogRecordReader.newBuilder[InternalRow]
+      .withStorage(metaClient.getStorage)
+      .withHoodieReaderContext(readerContext)
+      .withLogFiles(Collections.singletonList(logFile))
+      .withReverseReader(false)
+      .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+      .withPartition(partitionPath)
+      .withAllowInflightInstants(true)
+      .withMetaClient(metaClient)
+      .withAllowInflightInstants(true)

Review Comment:
   `withAllowInflightInstants` is expected to be used only for MDT (metadata 
   table) record generation for writes.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));

Review Comment:
   Won't we get just `HoodieKey`s from the delete log block for deletes? Trying 
   to understand how applying the output schema conversion works here. Do we 
   just inject default values for all the data columns?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));
+          return nextRecord != null;
+        } else {
+          return false;
+        }
       }
     }
 
     // Inserts
-    nextRecord = readerContext.seal(baseRecord);
+    nextRecord = readerContext.seal(applyOutputSchemaConversion(baseRecord));
     readStats.incrementNumInserts();
     return true;
   }
 
+  /**
+   * Applies the final output schema conversion to the buffered record if required. This ensures the records match the requested schema.
+   * @param bufferedRecord the buffered record to convert
+   * @return a new buffered record with the converted record and the proper schema ID set
+   */
+  protected BufferedRecord<T> applyOutputSchemaConversion(BufferedRecord<T> bufferedRecord) {
+    if (bufferedRecord.getRecord() != null && outputConverter.isPresent()) {
+      return new BufferedRecord<>(bufferedRecord.getRecordKey(), bufferedRecord.getOrderingValue(),
+          outputConverter.get().apply(bufferedRecord.getRecord()), readerContext.encodeAvroSchema(readerContext.getSchemaHandler().getRequestedSchema()), bufferedRecord.isDelete());

Review Comment:
   Minor. 
   `readerContext.encodeAvroSchema(readerContext.getSchemaHandler().getRequestedSchema())` 
   will be the same for the entire file group. We could initialize it once in 
   the constructor and use it here rather than calling `encodeAvroSchema` for 
   every record. 
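
   A rough sketch of the caching idea, under the assumption that the requested 
   schema does not change while a file group is read. `CachedSchemaIdBuffer`, 
   `encodeSchema`, and the string-based records are hypothetical stand-ins, not 
   the real `FileGroupRecordBuffer` or `encodeAvroSchema`. 

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the record buffer; names are illustrative only.
class CachedSchemaIdBuffer {
  // Counts how often the (assumed costly) encoding runs.
  static int encodeCalls = 0;

  // Stand-in for readerContext.encodeAvroSchema(requestedSchema).
  static int encodeSchema(String schema) {
    encodeCalls++;
    return schema.hashCode();
  }

  // Computed once in the constructor and reused for every record.
  private final int requestedSchemaId;
  private final UnaryOperator<String> outputConverter;

  CachedSchemaIdBuffer(String requestedSchema, UnaryOperator<String> outputConverter) {
    this.requestedSchemaId = encodeSchema(requestedSchema);
    this.outputConverter = outputConverter;
  }

  // Per-record path: uses the cached ID instead of re-encoding the schema.
  String convert(String record) {
    return requestedSchemaId + ":" + outputConverter.apply(record);
  }
}
```

   However many records pass through `convert`, the encoding runs once per 
   buffer instance in this sketch. 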



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));

Review Comment:
   Oh, it's happening at L573 and L599 as well.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     if (!same) {
       // clear up the beforeImageRecords
       beforeImageRecords.clear()
-      val iter = loadFileSlice(fileSlice)
-      iter.foreach { row =>
-        val key = getRecordKey(row)
-        // Due to the reuse buffer mechanism of Spark serialization,
-        // we have to copy the serialized result if we need to retain its reference
-        beforeImageRecords.put(key, serialize(row, copy = true))
+      val iter = loadFileSliceWithKeys(fileSlice)
+      iter.foreach { tuple =>
+        beforeImageRecords.put(tuple._1, tuple._2)
       }
       // reset beforeImageFiles
       beforeImageFiles.clear()
       beforeImageFiles.append(files: _*)
     }
   }
 
+  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext).map(internalRow => {
+      val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+      (recordKey, internalRow)
+    })
+  }
+
   private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val baseFileInfo = storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty,
-      baseFileInfo.getPath,
-      0,
-      baseFileInfo.getLength
-    )
-    val logFiles = fileSlice.getLogFiles
-      .sorted(HoodieLogFile.getLogFileComparator)
-      .collect(Collectors.toList[HoodieLogFile])
-      .asScala.toList
-      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-    if (logFiles.isEmpty) {
-      // no log files, just load the base parquet file
-      parquetReader(basePartitionedFile)
-    } else {
-      // use [[RecordMergingFileIterator]] to load both the base file and log files
-      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), logFiles)
-      new RecordMergingFileIterator(
-        morSplit,
-        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-        originTableSchema,
-        originTableSchema,
-        tableState,
-        conf.unwrapAs(classOf[Configuration]))
-    }
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext)
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+      .withReaderContext(readerContext)
+      .withHoodieTableMetaClient(metaClient)
+      .withFileSlice(fileSlice)
+      .withDataSchema(avroSchema)
+      .withRequestedSchema(avroSchema)
+      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withProps(readerProperties)
+      .withLatestCommitTime(split.changes.last.getInstant)
+      .build()
+    CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+  }
+
+  private def loadLogFile(logFile: HoodieLogFile, instant: String): Iterator[BufferedRecord[InternalRow]] = {
+    val partitionPath = FSUtils.getRelativePartitionPath(metaClient.getBasePath, logFile.getPath.getParent)
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    readerContext.setLatestCommitTime(instant)
+    readerContext.setHasBootstrapBaseFile(false)
+    readerContext.setHasLogFiles(true)
+    readerContext.initRecordMerger(readerProperties)
+    readerContext.setSchemaHandler(
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig, readerProperties))
+    val recordBuffer = new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+      readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+      Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true, Option.empty())
+
+    HoodieMergedLogRecordReader.newBuilder[InternalRow]
+      .withStorage(metaClient.getStorage)
+      .withHoodieReaderContext(readerContext)
+      .withLogFiles(Collections.singletonList(logFile))
+      .withReverseReader(false)
+      .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+      .withPartition(partitionPath)
+      .withAllowInflightInstants(true)
+      .withMetaClient(metaClient)
+      .withAllowInflightInstants(true)

Review Comment:
   Repeated calls to `withAllowInflightInstants`. 
   Also, why do we need to set this to true? 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -598,11 +638,13 @@ protected boolean hasNextLogRecord() {
     while (logRecordIterator.hasNext()) {
       BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
       if (!nextRecordInfo.isDelete()) {
-        nextRecord = nextRecordInfo.getRecord();
+        BufferedRecord<T> convertedBufferedRecord = applyOutputSchemaConversion(nextRecordInfo);
+        nextRecord = convertedBufferedRecord.getRecord();
+        callbackOption.ifPresent(callback -> callback.onInsert(readerContext.constructHoodieRecord(convertedBufferedRecord)));
         readStats.incrementNumInserts();
         return true;
       } else if (emitDelete) {
-        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+        nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey()));

Review Comment:
   Because in L567 to L602 we would already have invoked the callbacks for 
   updates and deletes, the only pending items are inserts from the logs, so 
   L643 alone would suffice. 
   Is my understanding right?
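
   To make the question concrete, here is a toy model of that reading of the 
   flow. It is only a sketch of the understanding stated above, not the actual 
   `FileGroupRecordBuffer` logic: `MergeCallbackSketch` is hypothetical, records 
   are plain strings, and a `null` log value stands for a delete. 

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a key-based merge; a null log value stands for a delete.
class MergeCallbackSketch {
  final List<String> events = new ArrayList<>();

  void merge(Map<String, String> base, Map<String, String> log) {
    Map<String, String> remaining = new LinkedHashMap<>(log);
    // Phase 1: base records matched against log records yield updates and deletes.
    for (Map.Entry<String, String> b : base.entrySet()) {
      if (!remaining.containsKey(b.getKey())) {
        continue; // untouched base record, no callback
      }
      String logValue = remaining.remove(b.getKey());
      events.add(logValue == null ? "delete:" + b.getKey() : "update:" + b.getKey());
    }
    // Phase 2: leftover log records have no base record, so only inserts are reported.
    for (Map.Entry<String, String> l : remaining.entrySet()) {
      if (l.getValue() != null) {
        events.add("insert:" + l.getKey());
      }
    }
  }
}
```

   In this model a delete for a key that never existed in the base file 
   produces no event, which is the case the question is about. 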



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupUpdateCallback.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+public interface FileGroupUpdateCallback<T> {

Review Comment:
   Javadocs please.
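
   Something along these lines, as a sketch only. The method shapes follow the 
   `onUpdate`/`onInsert`/`onDelete` calls visible in the hunks above, but the 
   doc wording is a guess at the intended semantics, and `UpdateCallbackSketch` 
   and `RecordingCallback` are hypothetical names with a generic `R` standing 
   in for `HoodieRecord<T>`. 

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Callback notified of record-level changes observed while a file group is read.
 * Sketch only: R stands in for the record type handed to the callback.
 *
 * @param <R> record representation passed to the callback
 */
interface UpdateCallbackSketch<R> {
  /**
   * Called when a log record changes an existing base record.
   *
   * @param previous the record from the base file
   * @param incoming the record from the log file
   * @param merged   the result of merging the two
   */
  void onUpdate(R previous, R incoming, R merged);

  /**
   * Called when a log record has no matching base record.
   *
   * @param inserted the newly inserted record
   */
  void onInsert(R inserted);

  /**
   * Called when a log record deletes an existing base record.
   *
   * @param previous the record from the base file
   * @param incoming the delete record from the log file
   */
  void onDelete(R previous, R incoming);
}

// Example implementation that only records which callbacks fired.
class RecordingCallback implements UpdateCallbackSketch<String> {
  final List<String> events = new ArrayList<>();

  @Override
  public void onUpdate(String previous, String incoming, String merged) {
    events.add("update");
  }

  @Override
  public void onInsert(String inserted) {
    events.add("insert");
  }

  @Override
  public void onDelete(String previous, String incoming) {
    events.add("delete");
  }
}
```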



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
