nsivabalan commented on a change in pull request #4761:
URL: https://github.com/apache/hudi/pull/4761#discussion_r804981688



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -33,6 +39,24 @@
   private final long totalSize;
   private final long totalUncompressedSize;
 
+  public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
+      (oldColumnRange, newColumnRange) -> {
+        ValidationUtils.checkArgument(oldColumnRange.getColumnName().equals(newColumnRange.getColumnName()));

Review comment:
       Do we need to validate for every record? Can we move the validation one level up and avoid it here? Since this merge function is invoked for every record in an iterative manner, I am trying to see whether per-record validation is overkill.
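
   A rough sketch of what hoisting the check could look like (caller-side names like `ranges` are illustrative, not the actual code):
   ```java
   // Hypothetical caller: validate the column name once per group, then fold the
   // per-file ranges with a merge function that no longer re-checks names.
   ValidationUtils.checkArgument(
       ranges.stream().map(HoodieColumnRangeMetadata::getColumnName).distinct().count() == 1,
       "All ranges being merged must belong to the same column");
   HoodieColumnRangeMetadata<Comparable> merged = ranges.stream()
       .reduce(COLUMN_RANGE_MERGE_FUNCTION::apply)   // merge function without per-record validation
       .orElseThrow(IllegalStateException::new);
   ```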

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetadataRecordsGenerationParams implements Serializable {

Review comment:
       nit: can we add javadocs for this class?
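
   For example, something along these lines (wording is only a suggestion):
   ```java
   /**
    * Encapsulates the parameters used while generating records for the metadata table,
    * e.g. the data table meta client, the enabled metadata partition types, the bloom
    * filter type and the parallelism to use. Serializable so it can be shipped to executors.
    */
   public class MetadataRecordsGenerationParams implements Serializable {
     // ...
   }
   ```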

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -608,82 +583,126 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient dataMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
 
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
         }
         final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();

Review comment:
       Shouldn't we be throwing an exception here? How can a base file not have a bloom filter?
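
   i.e. something like this instead of silently skipping the file (assuming we want the metadata write to fail fast; HoodieMetadataException is just one candidate):
   ```java
   if (fileBloomFilter == null) {
     // A base file is expected to carry a bloom filter, so surface the problem instead of dropping it.
     throw new HoodieMetadataException("Failed to read bloom filter for " + appendedFilePath);
   }
   ```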

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +383,11 @@ private void processAppendResult(AppendResult result) {
       updateWriteStatus(stat, result);
     }
 
+    Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()

Review comment:
       We should populate this only if the metadata table and the metadata index are enabled, right? Or did we decide to serialize this info regardless? Computing these stats will definitely add to write latency, so I am trying to see whether we can avoid it when it is not required at all.
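
   For example, roughly (the config accessors below are illustrative, not necessarily the actual ones):
   ```java
   // Hypothetical guard: only collect and serialize per-column ranges when the
   // column stats index of the metadata table will actually consume them.
   if (config.isMetadataTableEnabled() && config.isMetadataColumnStatsIndexEnabled()) {
     Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
         ? stat.getRecordsStats().get().getStats() : new HashMap<>();
     // ... accumulate the ranges for the records appended in this batch ...
   }
   ```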

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -642,6 +633,13 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }
 
+  private MetadataRecordsGenerationParams getRecordsGenerationParams() {
+    return new MetadataRecordsGenerationParams(
+        dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+        dataWriteConfig.getBloomIndexParallelism(),

Review comment:
       I don't think we can use the data table's bloom index parallelism here. Maybe we can re-use the file listing parallelism, or introduce a new config.
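
   For example, a dedicated config along these lines (key, name and default are only a suggestion):
   ```java
   // Hypothetical config controlling the parallelism of metadata index record generation.
   public static final ConfigProperty<Integer> METADATA_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.metadata.index.generate.parallelism")
       .defaultValue(200)
       .withDocumentation("Parallelism used while generating bloom filter and column stats records for the metadata table.");
   ```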

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,48 +321,49 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
+        deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, StringUtils.EMPTY_STRING,
+        ByteBuffer.allocate(0), true));
   }
 
   /**
    * Convert clean metadata to column stats index records.
    *
-   * @param cleanMetadata     - Clean action metadata
-   * @param engineContext     - Engine context
-   * @param datasetMetaClient - data table meta client
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of column stats index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       HoodieTableMetaClient datasetMetaClient) {
+  public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             HoodieEngineContext engineContext,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });
 
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    return engineContext.flatMap(deleteFileList,
-        deleteFileInfo -> {
-          if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), datasetMetaClient,
-                latestColumns, true);
-          }
-          return Stream.empty();
-        }, 1).stream().collect(Collectors.toList());
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());

Review comment:
       Sorry, with getLatestColumns() I see the 2nd arg is isMetaIndexColumnStatsForAllColumns. Shouldn't we be setting it appropriately? Why let it default to false here?
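
   i.e. plumb the flag through, roughly (the accessor on the params object below is hypothetical):
   ```java
   // Hypothetical: derive the "index all columns" flag from the write config and
   // carry it on MetadataRecordsGenerationParams instead of hard-coding false.
   final List<String> columnsToIndex = getColumnsToIndex(
       recordsGenerationParams.getDataMetaClient(),
       recordsGenerationParams.isAllColumnStatsIndexEnabled());
   ```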

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -608,82 +583,126 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient dataMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
 
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
        }
         final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();
           }
           ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
           HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, bloomByteBuffer, false);
-          records.add(record);
-          fileReader.close();
+              partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+          return Stream.of(record);
         } catch (IOException e) {
           LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
         }
-      });
+        return Stream.empty();
+      }).iterator();
     });
-    return records;
+    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+
+    return allRecordsRDD;
   }
 
   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient datasetMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
        final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-          if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(),
+            columnsToIndex, Option.empty(), true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));

Review comment:
       Why are we using the bloom index parallelism for column stats record generation?
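
   e.g. with a parallelism dedicated to (or shared by) the metadata index record generation on the params object (accessor name below is illustrative):
   ```java
   final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
       engineContext.parallelize(partitionToAppendedFilesList,
           // Hypothetical accessor: parallelism meant for column stats / metadata index generation
           Math.max(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
   ```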

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -416,47 +412,26 @@ private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTim
                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
                                              Map<String, List<String>> partitionToDeletedFiles,
                                              Option<String> lastSyncTs) {
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
-          partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
-    });
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
+        partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)));
   }
 
   /**
    * Convert rollback action metadata to metadata table records.
    */
   public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(
-      HoodieEngineContext engineContext, List<MetadataPartitionType> enabledPartitionTypes,
-      HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata,
-      HoodieTableMetaClient dataMetaClient, String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
+      HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline,
+      HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams,
+      String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
     final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
 
     Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
     Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
     List<HoodieRecord> filesPartitionRecords = convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata,
         partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced);
     final HoodieData<HoodieRecord> rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1);
-    partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD);
-
-    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
-      final List<HoodieRecord> metadataBloomFilterRecords = convertFilesToBloomFilterRecords(
-          engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime);
-      if (!metadataBloomFilterRecords.isEmpty()) {
-        final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD);
-      }
-    }
-
-    if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
-      final List<HoodieRecord> metadataColumnStats = convertFilesToColumnStatsRecords(
-          engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime);
-      if (!metadataColumnStats.isEmpty()) {
-        final HoodieData<HoodieRecord> metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1);
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
-      }
-    }
-
-    return partitionToRecordsMap;
+    return getMetadataPartitionTypeHoodieDataMap(engineContext, recordsGenerationParams, instantTime,

Review comment:
       Maybe I see the reason why. We could have a callback and call into that, so that it stays consistent.
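
   Roughly along these lines (just a sketch of the callback idea; the extra parameter is hypothetical):
   ```java
   // Hypothetical: let getMetadataPartitionTypeHoodieDataMap own the FILES records too,
   // with the caller supplying them through a callback so the wiring lives in one place.
   return getMetadataPartitionTypeHoodieDataMap(engineContext, recordsGenerationParams, instantTime,
       partitionToDeletedFiles, partitionToAppendedFiles,
       () -> engineContext.parallelize(filesPartitionRecords, 1));   // FILES records supplier
   ```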

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -381,26 +374,29 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(

Review comment:
       Not sure why FILES is handled here while BLOOM_FILTERS and COLUMN_STATS are in getMetadataPartitionTypeHoodieDataMap. Can we also move the FILES record generation into getMetadataPartitionTypeHoodieDataMap?
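
   i.e. conceptually (sketch only; the exact signature of getMetadataPartitionTypeHoodieDataMap is whatever we settle on):
   ```java
   // Hypothetical: populate FILES inside the shared helper as well, so every
   // metadata partition type is produced from one place.
   partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD);
   // ... followed by BLOOM_FILTERS and COLUMN_STATS, as the helper does today ...
   return partitionToRecordsMap;
   ```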

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -847,45 +852,52 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient) {
+    return getColumnsToIndex(datasetMetaClient, false);
   }
 
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
+                                                                     List<String> columnsToIndex) {
+    Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> columnRangeMap = Option.empty();
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      columnRangeMap = Option.of(((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats());
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,

Review comment:
       I don't see much value in passing columnRangeMap to getColumnStats. We might as well do
   ```
   List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.get().values());
   return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
   ```
   here within the if block, and call getColumnStats() only in the else block.
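
   i.e. roughly this shape at the call site (sketch only; whether getColumnStats keeps its current argument list is up for discussion):
   ```java
   if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
     // Stats were already collected while writing the log blocks; convert them directly.
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
         new ArrayList<>(((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats().values());
     return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
   }
   // Otherwise fall back to reading the column ranges from the file itself.
   return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,
       Option.empty(), false);
   ```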




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

