nsivabalan commented on a change in pull request #4761:
URL: https://github.com/apache/hudi/pull/4761#discussion_r804981688
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -33,6 +39,24 @@
private final long totalSize;
private final long totalUncompressedSize;
+  public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
+      (oldColumnRange, newColumnRange) -> {
+        ValidationUtils.checkArgument(oldColumnRange.getColumnName().equals(newColumnRange.getColumnName()));
Review comment:
Do we need to validate for every record? Maybe we can move the validation one
level up and avoid it here. Since this is called for every record in an
iterative manner, I'm trying to see if it is overkill.
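For example, the column-name check could run once on the full column sets before the merge loop starts, so the merge function can assume matching names. Rough sketch only; the map variable names here are made up:
```java
// Illustrative sketch: validate once, up front, instead of inside the per-record merge function.
ValidationUtils.checkArgument(oldColumnRanges.keySet().equals(newColumnRanges.keySet()),
    "Both sides must carry ranges for the same set of columns");

// The per-column merge can then skip the name check entirely.
oldColumnRanges.forEach((colName, oldRange) ->
    mergedRanges.put(colName, COLUMN_RANGE_MERGE_FUNCTION.apply(oldRange, newColumnRanges.get(colName))));
```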
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetadataRecordsGenerationParams implements Serializable {
Review comment:
Please add Javadocs for this class.
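Something along these lines would be enough (wording is just a suggestion):
```java
/**
 * Serializable bundle of the parameters needed while generating metadata table records,
 * e.g. the data table meta client, the enabled metadata partition types, the bloom filter
 * type and the parallelism to use, so they can be passed around (and shipped to executors)
 * as a single unit.
 */
public class MetadataRecordsGenerationParams implements Serializable {
```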
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -608,82 +583,126 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }

   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient dataMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);

-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
         }
         final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();
Review comment:
Shouldn't we be throwing an exception here? How can a base file not have a
bloom filter?
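i.e., fail fast instead of silently dropping the record; a rough sketch only (the exception type here is just an example):
```java
if (fileBloomFilter == null) {
  // Illustrative: surface the problem instead of logging and skipping.
  throw new HoodieException("Failed to read bloom filter for base file " + appendedFilePath);
}
```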
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -339,6 +383,11 @@ private void processAppendResult(AppendResult result) {
updateWriteStatus(stat, result);
}
+    Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
Review comment:
We should populate this only if the metadata table and the metadata index are
enabled, right? Or did we decide to serialize this info regardless? Computing
these stats will definitely add to write latency, so I'm trying to see if we
can avoid it when it's not required at all.
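For instance, the collection could be guarded by the write config so the per-record bookkeeping is skipped entirely when it is not needed; a rough sketch, and the config accessor names are illustrative:
```java
// Illustrative: only build per-column ranges when column stats indexing is actually on.
if (config.isMetadataTableEnabled() && config.isMetadataColumnStatsIndexEnabled()) {
  Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
      ? stat.getRecordsStats().get().getStats() : new HashMap<>();
  // ... accumulate per-record ranges into columnRangeMap only inside this branch
}
```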
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -642,6 +633,13 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }

+  private MetadataRecordsGenerationParams getRecordsGenerationParams() {
+    return new MetadataRecordsGenerationParams(
+        dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+        dataWriteConfig.getBloomIndexParallelism(),
Review comment:
I don't think we can use the data table's bloom index parallelism here. Maybe
we can reuse the file listing parallelism or introduce a new config.
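If a new config is the direction, a sketch of what it might look like (the key name and default here are made up):
```java
public static final ConfigProperty<Integer> METADATA_INDEX_PARALLELISM = ConfigProperty
    .key("hoodie.metadata.index.generate.parallelism")
    .defaultValue(200)
    .withDocumentation("Parallelism to use while generating bloom filter and column stats "
        + "records for the metadata table.");
```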
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -329,48 +321,49 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
       });
     });

-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
+        deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, StringUtils.EMPTY_STRING,
+        ByteBuffer.allocate(0), true));
   }

   /**
    * Convert clean metadata to column stats index records.
    *
-   * @param cleanMetadata     - Clean action metadata
-   * @param engineContext     - Engine context
-   * @param datasetMetaClient - data table meta client
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
    * @return List of column stats index records for the clean metadata
    */
-  public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       HoodieTableMetaClient datasetMetaClient) {
+  public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             HoodieEngineContext engineContext,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });

-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    return engineContext.flatMap(deleteFileList,
-        deleteFileInfo -> {
-          if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), datasetMetaClient,
-                latestColumns, true);
-          }
-          return Stream.empty();
-        }, 1).stream().collect(Collectors.toList());
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());
Review comment:
Sorry, with getLatestColumns() I see the 2nd arg is
isMetaIndexColumnStatsForAllColumns. Shouldn't we be setting it appropriately?
Why let it be false here?
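i.e. thread the flag through rather than defaulting it; roughly something like this (the accessor on the params object is hypothetical):
```java
final List<String> columnsToIndex = getColumnsToIndex(
    recordsGenerationParams.getDataMetaClient(),
    recordsGenerationParams.isAllColumnStatsIndexEnabled()); // hypothetical accessor for the flag
```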
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -608,82 +583,126 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }

   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient dataMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
+        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);

-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
         }
         final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath);
+        final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();
           }
           ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
           HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, bloomByteBuffer, false);
-          records.add(record);
-          fileReader.close();
+              partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+          return Stream.of(record);
         } catch (IOException e) {
           LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
         }
-      });
+        return Stream.empty();
+      }).iterator();
     });
-    return records;
+    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+
+    return allRecordsRDD;
   }

   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     HoodieTableMetaClient datasetMetaClient,
-                                                                     Map<String, List<String>> partitionToDeletedFiles,
-                                                                     Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          Map<String, List<String>> partitionToDeletedFiles,
+                                                                          Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
        final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
-          if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(),
+            columnsToIndex, Option.empty(), true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
Review comment:
Why are we using the bloom index parallelism for column stats generation?
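If the params object grew a dedicated setting for this, it could read roughly like the following (the accessor name is hypothetical):
```java
final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(
    partitionToAppendedFilesList,
    Math.max(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
```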
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -416,47 +412,26 @@ private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTim
                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
                                              Map<String, List<String>> partitionToDeletedFiles,
                                              Option<String> lastSyncTs) {
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
-          partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
-    });
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
+        partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)));
   }

   /**
    * Convert rollback action metadata to metadata table records.
    */
   public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(
-      HoodieEngineContext engineContext, List<MetadataPartitionType> enabledPartitionTypes,
-      HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata,
-      HoodieTableMetaClient dataMetaClient, String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
+      HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline,
+      HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams,
+      String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
     final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
     Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
     Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
     List<HoodieRecord> filesPartitionRecords = convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata,
         partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced);
     final HoodieData<HoodieRecord> rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1);
-    partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD);
-
-    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
-      final List<HoodieRecord> metadataBloomFilterRecords = convertFilesToBloomFilterRecords(
-          engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime);
-      if (!metadataBloomFilterRecords.isEmpty()) {
-        final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD);
-      }
-    }
-
-    if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
-      final List<HoodieRecord> metadataColumnStats = convertFilesToColumnStatsRecords(
-          engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime);
-      if (!metadataColumnStats.isEmpty()) {
-        final HoodieData<HoodieRecord> metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1);
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
-      }
-    }
-
-    return partitionToRecordsMap;
+    return getMetadataPartitionTypeHoodieDataMap(engineContext, recordsGenerationParams, instantTime,
Review comment:
Maybe I see the reason why. We could have a callback and call into that so
that it stays consistent.
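One possible shape, purely illustrative (this supplier does not exist in the PR):
```java
// Illustrative only: generate the FILES partition records through a callback so the
// common helper can wire up every partition type in one place.
Supplier<HoodieData<HoodieRecord>> filesPartitionRecordsSupplier =
    () -> engineContext.parallelize(filesPartitionRecords, 1);
```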
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -381,26 +374,29 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
Review comment:
Not sure why FILES is handled here while BLOOM_FILTERS and COLUMN_STATS are
handled in getMetadataPartitionTypeHoodieDataMap. Can we also move the FILES
record generation into getMetadataPartitionTypeHoodieDataMap?
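e.g., the helper could populate FILES alongside the other partition types; a rough sketch of the extra branch (the accessor on the params object is illustrative):
```java
// Inside getMetadataPartitionTypeHoodieDataMap (sketch): treat FILES like the other partitions.
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.FILES)) {
  partitionToRecordsMap.put(MetadataPartitionType.FILES,
      engineContext.parallelize(filesPartitionRecords, 1));
}
```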
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -847,45 +852,52 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }

-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient) {
+    return getColumnsToIndex(datasetMetaClient, false);
   }

   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
+                                                                     List<String> columnsToIndex) {
+    Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> columnRangeMap = Option.empty();
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      columnRangeMap = Option.of(((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats());
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,
Review comment:
I don't see much value in passing columnRangeMap to getColumnStats(). We might as well do
```
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.get().values());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
```
here within the if block, and call getColumnStats() only in the else block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]