codope commented on a change in pull request #4761:
URL: https://github.com/apache/hudi/pull/4761#discussion_r808202079



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -608,82 +583,126 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient dataMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
 
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
        }
        final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
          if (fileBloomFilter == null) {
            LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();
          }
          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, bloomByteBuffer, false);
-          records.add(record);
-          fileReader.close();
+              partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+          return Stream.of(record);
        } catch (IOException e) {
          LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
        }
-      });
+        return Stream.empty();
+      }).iterator();
    });
-    return records;
+    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+
+    return allRecordsRDD;
   }
 
   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient datasetMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
        final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-          if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(),
+            columnsToIndex, Option.empty(), true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));

Review comment:
       Added a config for col stats parallelism.
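
       For illustration only, not part of the diff above: a rough sketch of how a dedicated
       column stats parallelism could be used in convertFilesToColumnStatsRecords instead of
       the bloom index setting. The getter name getColumnStatsIndexParallelism() is an
       assumption for the sketch, not taken from this change; the surrounding variables come
       from the diff.

         // Hypothetical sketch: distribute the per-partition file lists using a
         // column-stats-specific parallelism rather than getBloomIndexParallelism().
         // getColumnStatsIndexParallelism() is an assumed getter, not confirmed by this PR.
         final int colStatsParallelism = recordsGenerationParams.getColumnStatsIndexParallelism();

         final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
             engineContext.parallelize(partitionToDeletedFilesList,
                 Math.max(partitionToDeletedFilesList.size(), colStatsParallelism));

         final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
             engineContext.parallelize(partitionToAppendedFilesList,
                 Math.max(partitionToAppendedFilesList.size(), colStatsParallelism));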



