the-other-tim-brown commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2191223528


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     if (!same) {
       // clear up the beforeImageRecords
       beforeImageRecords.clear()
-      val iter = loadFileSlice(fileSlice)
-      iter.foreach { row =>
-        val key = getRecordKey(row)
-        // Due to the reuse buffer mechanism of Spark serialization,
-        // we have to copy the serialized result if we need to retain its 
reference
-        beforeImageRecords.put(key, serialize(row, copy = true))
+      val iter = loadFileSliceWithKeys(fileSlice)
+      iter.foreach { tuple =>
+        beforeImageRecords.put(tuple._1, tuple._2)
       }
       // reset beforeImageFiles
       beforeImageFiles.clear()
       beforeImageFiles.append(files: _*)
     }
   }
 
+  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, 
InternalRow)] = {
+    val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext).map(internalRow => {
+      val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+      (recordKey, internalRow)
+    })
+  }
+
   private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val baseFileInfo = 
storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty,
-      baseFileInfo.getPath,
-      0,
-      baseFileInfo.getLength
-    )
-    val logFiles = fileSlice.getLogFiles
-      .sorted(HoodieLogFile.getLogFileComparator)
-      .collect(Collectors.toList[HoodieLogFile])
-      .asScala.toList
-      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-    if (logFiles.isEmpty) {
-      // no log files, just load the base parquet file
-      parquetReader(basePartitionedFile)
-    } else {
-      // use [[RecordMergingFileIterator]] to load both the base file and log 
files
-      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), 
logFiles)
-      new RecordMergingFileIterator(
-        morSplit,
-        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-        originTableSchema,
-        originTableSchema,
-        tableState,
-        conf.unwrapAs(classOf[Configuration]))
-    }
+    val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext)
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: 
SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+      .withReaderContext(readerContext)
+      .withHoodieTableMetaClient(metaClient)
+      .withFileSlice(fileSlice)
+      .withDataSchema(avroSchema)
+      .withRequestedSchema(avroSchema)
+      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withProps(readerProperties)
+      .withLatestCommitTime(split.changes.last.getInstant)
+      .build()
+    
CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+  }
+
+  private def loadLogFile(logFile: HoodieLogFile, instant: String): 
Iterator[BufferedRecord[InternalRow]] = {
+    val partitionPath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath, 
logFile.getPath.getParent)
+    val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    readerContext.setLatestCommitTime(instant)
+    readerContext.setHasBootstrapBaseFile(false)
+    readerContext.setHasLogFiles(true)
+    readerContext.initRecordMerger(readerProperties)
+    readerContext.setSchemaHandler(
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, 
avroSchema,
+        Option.empty(), metaClient.getTableConfig, readerProperties))
+    val recordBuffer = new 
KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+      readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+      Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true, 
Option.empty())
+
+    HoodieMergedLogRecordReader.newBuilder[InternalRow]
+      .withStorage(metaClient.getStorage)
+      .withHoodieReaderContext(readerContext)
+      .withLogFiles(Collections.singletonList(logFile))
+      .withReverseReader(false)
+      
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+      .withPartition(partitionPath)
+      .withAllowInflightInstants(true)
+      .withMetaClient(metaClient)
+      .withAllowInflightInstants(true)

Review Comment:
   Removed this duplicate `.withAllowInflightInstants(true)` call; the builder already sets it two lines earlier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to