the-other-tim-brown commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2191223528
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
if (!same) {
// clear up the beforeImageRecords
beforeImageRecords.clear()
- val iter = loadFileSlice(fileSlice)
- iter.foreach { row =>
- val key = getRecordKey(row)
- // Due to the reuse buffer mechanism of Spark serialization,
- // we have to copy the serialized result if we need to retain its
reference
- beforeImageRecords.put(key, serialize(row, copy = true))
+ val iter = loadFileSliceWithKeys(fileSlice)
+ iter.foreach { tuple =>
+ beforeImageRecords.put(tuple._1, tuple._2)
}
// reset beforeImageFiles
beforeImageFiles.clear()
beforeImageFiles.append(files: _*)
}
}
+ private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
+ val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+ conf, metaClient.getTableConfig)
+ loadFileSlice(fileSlice, readerContext).map(internalRow => {
+ val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+ (recordKey, internalRow)
+ })
+ }
+
private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
- val baseFileInfo = storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
- val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
- InternalRow.empty,
- baseFileInfo.getPath,
- 0,
- baseFileInfo.getLength
- )
- val logFiles = fileSlice.getLogFiles
- .sorted(HoodieLogFile.getLogFileComparator)
- .collect(Collectors.toList[HoodieLogFile])
- .asScala.toList
- .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
- if (logFiles.isEmpty) {
- // no log files, just load the base parquet file
- parquetReader(basePartitionedFile)
- } else {
- // use [[RecordMergingFileIterator]] to load both the base file and log files
- val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), logFiles)
- new RecordMergingFileIterator(
- morSplit,
- BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
- originTableSchema,
- originTableSchema,
- tableState,
- conf.unwrapAs(classOf[Configuration]))
- }
+ val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+ conf, metaClient.getTableConfig)
+ loadFileSlice(fileSlice, readerContext)
+ }
+
+ private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+ val fileGroupReader = HoodieFileGroupReader.newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(metaClient)
+ .withFileSlice(fileSlice)
+ .withDataSchema(avroSchema)
+ .withRequestedSchema(avroSchema)
+ .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+ .withProps(readerProperties)
+ .withLatestCommitTime(split.changes.last.getInstant)
+ .build()
+ CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+ }
+
+ private def loadLogFile(logFile: HoodieLogFile, instant: String):
Iterator[BufferedRecord[InternalRow]] = {
+ val partitionPath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath,
logFile.getPath.getParent)
+ val readerContext = new
SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+ conf, metaClient.getTableConfig)
+ readerContext.setLatestCommitTime(instant)
+ readerContext.setHasBootstrapBaseFile(false)
+ readerContext.setHasLogFiles(true)
+ readerContext.initRecordMerger(readerProperties)
+ readerContext.setSchemaHandler(
+ new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema,
avroSchema,
+ Option.empty(), metaClient.getTableConfig, readerProperties))
+ val recordBuffer = new
KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+ readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+ Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true,
Option.empty())
+
+ HoodieMergedLogRecordReader.newBuilder[InternalRow]
+ .withStorage(metaClient.getStorage)
+ .withHoodieReaderContext(readerContext)
+ .withLogFiles(Collections.singletonList(logFile))
+ .withReverseReader(false)
+
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+ .withPartition(partitionPath)
+ .withAllowInflightInstants(true)
+ .withMetaClient(metaClient)
+ .withAllowInflightInstants(true)
Review Comment:
Removed this duplicate `.withAllowInflightInstants(true)` call; it is already set a few lines above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]