danny0405 commented on code in PR #13544:
URL: https://github.com/apache/hudi/pull/13544#discussion_r2218370762
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -346,75 +346,43 @@ object LogFileIterator extends SparkAdapterSupport {
val tablePath = tableState.tablePath
val storage = HoodieStorageUtils.getStorage(tablePath,
HadoopFSUtils.getStorageConf(hadoopConf))
- if (HoodieTableMetadata.isMetadataTable(tablePath)) {
- val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(tableState.metadataConfig.getProps)
-
.withSpillableMapDir(hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+ val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+ .withStorage(storage)
+ .withBasePath(tablePath)
+ .withLogFilePaths(logFiles.map(logFile =>
logFile.getPath.toString).asJava)
+ .withReaderSchema(logSchema)
+ // NOTE: This part shall only be reached when at least one log is
present in the file-group
+ // entailing that table has to have at least one commit
+ .withLatestInstantTime(tableState.latestCommitTimestamp.get)
+ .withReverseReader(false)
+ .withInternalSchema(internalSchema)
+ .withBufferSize(
+ hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
+ .withSpillableMapBasePath(
+ hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
FileIOUtils.getDefaultSpillableMapBasePath))
- .enable(true).build()
- val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
- val metadataTable = new HoodieBackedTableMetadata(
- new
HoodieLocalEngineContext(HadoopFSUtils.getStorageConf(hadoopConf)), storage,
metadataConfig,
- dataTableBasePath)
-
- // We have to force full-scan for the MT log record reader, to make sure
- // we can iterate over all of the partitions, since by default some of
the partitions (Column Stats,
- // Bloom Filter) are in "point-lookup" mode
- val forceFullScan = true
-
- // NOTE: In case of Metadata Table partition path equates to partition
name (since there's just one level
- // of indirection among MT partitions)
- val relativePartitionPath = getRelativePartitionPath(
- new StoragePath(tablePath), partitionPath)
-
- val logRecordReader =
- metadataTable.getLogRecordScanner(logFiles.asJava,
relativePartitionPath, toJavaOption(Some(forceFullScan)),
toJavaOption(Some(tableState.latestCommitTimestamp.get)))
- .getLeft
-
- val recordList = closing(logRecordReader) {
- logRecordReader.getRecords
- }
-
- mutable.HashMap(recordList.asScala.map(r => (r.getRecordKey, r)).toSeq:
_*)
- } else {
- val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(tablePath)
- .withLogFilePaths(logFiles.map(logFile =>
logFile.getPath.toString).asJava)
- .withReaderSchema(logSchema)
- // NOTE: This part shall only be reached when at least one log is
present in the file-group
- // entailing that table has to have at least one commit
- .withLatestInstantTime(tableState.latestCommitTimestamp.get)
- .withReverseReader(false)
- .withInternalSchema(internalSchema)
- .withBufferSize(
-
hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
- HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
- .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
- .withSpillableMapBasePath(
- hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
- FileIOUtils.getDefaultSpillableMapBasePath))
- .withDiskMapType(
- hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
- HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
- .withBitCaskDiskMapCompressionEnabled(
-
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
- if (logFiles.nonEmpty) {
- logRecordScannerBuilder.withPartition(getRelativePartitionPath(
- new StoragePath(tableState.tablePath),
logFiles.head.getPath.getParent))
- }
+ .withDiskMapType(
+ hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+ HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+ .withBitCaskDiskMapCompressionEnabled(
+
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+
+ if (logFiles.nonEmpty) {
+ logRecordScannerBuilder.withPartition(getRelativePartitionPath(
+ new StoragePath(tableState.tablePath),
logFiles.head.getPath.getParent))
+ }
- logRecordScannerBuilder.withRecordMerger(
- HoodieRecordUtils.createRecordMerger(tableState.tablePath,
EngineType.SPARK, tableState.recordMergeImplClasses.asJava,
tableState.recordMergeStrategyId))
+ logRecordScannerBuilder.withRecordMerger(
+ HoodieRecordUtils.createRecordMerger(tableState.tablePath,
EngineType.SPARK, tableState.recordMergeImplClasses.asJava,
tableState.recordMergeStrategyId))
- val scanner = logRecordScannerBuilder.build()
+ val scanner = logRecordScannerBuilder.build()
- closing(scanner) {
- // NOTE: We have to copy record-map (by default immutable copy is
exposed)
- mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
- }
+ closing(scanner) {
+ // NOTE: We have to copy record-map (by default immutable copy is
exposed)
+ mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
Review Comment:
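Not related to this change, but is the spillable map being copied into an in-memory
hashmap here? That would load every record onto the heap and defeat the purpose of spilling to disk.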
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]