the-other-tim-brown commented on code in PR #13544:
URL: https://github.com/apache/hudi/pull/13544#discussion_r2218622995


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -346,75 +346,43 @@ object LogFileIterator extends SparkAdapterSupport {
     val tablePath = tableState.tablePath
     val storage = HoodieStorageUtils.getStorage(tablePath, HadoopFSUtils.getStorageConf(hadoopConf))
 
-    if (HoodieTableMetadata.isMetadataTable(tablePath)) {
-      val metadataConfig = HoodieMetadataConfig.newBuilder()
-        .fromProperties(tableState.metadataConfig.getProps)
-        .withSpillableMapDir(hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+    val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+      .withStorage(storage)
+      .withBasePath(tablePath)
+      .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava)
+      .withReaderSchema(logSchema)
+      // NOTE: This part shall only be reached when at least one log is present in the file-group
+      //       entailing that table has to have at least one commit
+      .withLatestInstantTime(tableState.latestCommitTimestamp.get)
+      .withReverseReader(false)
+      .withInternalSchema(internalSchema)
+      .withBufferSize(
+        hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+          HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+      .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
+      .withSpillableMapBasePath(
+        hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
           FileIOUtils.getDefaultSpillableMapBasePath))
-        .enable(true).build()
-      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
-      val metadataTable = new HoodieBackedTableMetadata(
-        new HoodieLocalEngineContext(HadoopFSUtils.getStorageConf(hadoopConf)), storage, metadataConfig,
-        dataTableBasePath)
-
-      // We have to force full-scan for the MT log record reader, to make sure
-      // we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
-      // Bloom Filter) are in "point-lookup" mode
-      val forceFullScan = true
-
-      // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
-      //       of indirection among MT partitions)
-      val relativePartitionPath = getRelativePartitionPath(
-        new StoragePath(tablePath), partitionPath)
-
-      val logRecordReader =
-        metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)), toJavaOption(Some(tableState.latestCommitTimestamp.get)))
-          .getLeft
-
-      val recordList = closing(logRecordReader) {
-        logRecordReader.getRecords
-      }
-
-      mutable.HashMap(recordList.asScala.map(r => (r.getRecordKey, r)).toSeq: _*)
-    } else {
-      val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(storage)
-        .withBasePath(tablePath)
-        .withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava)
-        .withReaderSchema(logSchema)
-        // NOTE: This part shall only be reached when at least one log is present in the file-group
-        //       entailing that table has to have at least one commit
-        .withLatestInstantTime(tableState.latestCommitTimestamp.get)
-        .withReverseReader(false)
-        .withInternalSchema(internalSchema)
-        .withBufferSize(
-          hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
-            HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
-        .withSpillableMapBasePath(
-          hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
-            FileIOUtils.getDefaultSpillableMapBasePath))
-        .withDiskMapType(
-          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
-            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
-        .withBitCaskDiskMapCompressionEnabled(
-          hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
-      if (logFiles.nonEmpty) {
-        logRecordScannerBuilder.withPartition(getRelativePartitionPath(
-          new StoragePath(tableState.tablePath), logFiles.head.getPath.getParent))
-      }
+      .withDiskMapType(
+        hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+          HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+      .withBitCaskDiskMapCompressionEnabled(
+        hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+          HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+
+    if (logFiles.nonEmpty) {
+      logRecordScannerBuilder.withPartition(getRelativePartitionPath(
+        new StoragePath(tableState.tablePath), logFiles.head.getPath.getParent))
+    }
 
-      logRecordScannerBuilder.withRecordMerger(
-        HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergeImplClasses.asJava, tableState.recordMergeStrategyId))
+    logRecordScannerBuilder.withRecordMerger(
+      HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergeImplClasses.asJava, tableState.recordMergeStrategyId))
 
-      val scanner = logRecordScannerBuilder.build()
+    val scanner = logRecordScannerBuilder.build()
 
-      closing(scanner) {
-        // NOTE: We have to copy record-map (by default immutable copy is exposed)
-        mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
-      }
+    closing(scanner) {
+      // NOTE: We have to copy record-map (by default immutable copy is exposed)
+      mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)

Review Comment:
   Looks like it; we should be able to get rid of this reader path soon.
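   
   For anyone following along, the unified path this lands on is just the builder chain above, for data and metadata tables alike. A minimal sketch of that read path (a hypothetical standalone helper; `storage`, `tablePath`, `logFilePaths`, `logSchema`, and `latestCommitTime` stand in for the values `Iterators.scala` pulls from `tableState` and `hadoopConf`, and only builder calls visible in this diff are used):
   
   ```scala
   import scala.collection.JavaConverters._
   import scala.collection.mutable
   
   import org.apache.avro.Schema
   import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
   import org.apache.hudi.storage.HoodieStorage
   
   // Hypothetical helper; in Iterators.scala these inputs come from
   // tableState and hadoopConf rather than being passed in directly.
   def scanLogRecords(storage: HoodieStorage,
                      tablePath: String,
                      logFilePaths: Seq[String],
                      logSchema: Schema,
                      latestCommitTime: String) = {
     // One builder for both table types -- the metadata-table special case
     // removed in this diff is no longer needed.
     val scanner = HoodieMergedLogRecordScanner.newBuilder()
       .withStorage(storage)
       .withBasePath(tablePath)
       .withLogFilePaths(logFilePaths.asJava)
       .withReaderSchema(logSchema)
       .withLatestInstantTime(latestCommitTime)
       .withReverseReader(false)
       .build()
     try {
       // getRecords exposes an immutable view of the merged key -> record map,
       // so copy it out, mirroring the closing { ... } block above.
       mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
     } finally {
       scanner.close()
     }
   }
   ```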