This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new ccc3728 [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102) ccc3728 is described below commit ccc3728002533978cb35b3b4c22cb4d0ef087347 Author: Alexey Kudinkin <ale...@infinilake.com> AuthorDate: Thu Mar 24 05:07:38 2022 -0700 [HUDI-3684] Fixing NPE in `ParquetUtils` (#5102) * Make sure nulls are properly handled in `HoodieColumnRangeMetadata` --- .../common/model/HoodieColumnRangeMetadata.java | 58 ++++++++++++++++++---- .../org/apache/hudi/common/util/ParquetUtils.java | 29 ++++++++--- .../hudi/metadata/HoodieTableMetadataUtil.java | 9 ++-- .../utilities/HoodieMetadataTableValidator.java | 4 +- 4 files changed, 75 insertions(+), 25 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index d098c4f..2afbd19 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,19 +18,27 @@ package org.apache.hudi.common.model; +import javax.annotation.Nullable; import java.io.Serializable; -import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import java.util.function.BiFunction; +import java.util.stream.Stream; /** - * Hoodie Range metadata. + * Hoodie metadata for the column range of data stored in columnar format (like Parquet) + * + * NOTE: {@link Comparable} is used as raw-type so that we can handle polymorphism, where + * caller apriori is not aware of the type {@link HoodieColumnRangeMetadata} is + * associated with */ -public class HoodieColumnRangeMetadata<T> implements Serializable { +@SuppressWarnings("rawtype") +public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializable { private final String filePath; private final String columnName; + @Nullable private final T minValue; + @Nullable private final T maxValue; private final long nullCount; private final long valueCount; @@ -38,21 +46,30 @@ public class HoodieColumnRangeMetadata<T> implements Serializable { private final long totalUncompressedSize; public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION = - (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>( + (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>( newColumnRange.getFilePath(), newColumnRange.getColumnName(), - (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) - .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), - (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) - .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), + (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .filter(Objects::nonNull) + .min(Comparator.naturalOrder()) + .orElse(null), + (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .filter(Objects::nonNull) + .max(Comparator.naturalOrder()).orElse(null), oldColumnRange.getNullCount() + newColumnRange.getNullCount(), oldColumnRange.getValueCount() + newColumnRange.getValueCount(), oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(), oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize() ); - public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, - final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) { + private HoodieColumnRangeMetadata(String filePath, + String columnName, + @Nullable T minValue, + @Nullable T maxValue, + long nullCount, + long valueCount, + long totalSize, + long totalUncompressedSize) { this.filePath = filePath; this.columnName = columnName; this.minValue = minValue; @@ -71,10 +88,12 @@ public class HoodieColumnRangeMetadata<T> implements Serializable { return this.columnName; } + @Nullable public T getMinValue() { return this.minValue; } + @Nullable public T getMaxValue() { return this.maxValue; } @@ -133,6 +152,23 @@ public class HoodieColumnRangeMetadata<T> implements Serializable { + '}'; } + public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> create(String filePath, + String columnName, + @Nullable T minValue, + @Nullable T maxValue, + long nullCount, + long valueCount, + long totalSize, + long totalUncompressedSize) { + return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize); + } + + @SuppressWarnings("rawtype") + public static HoodieColumnRangeMetadata<Comparable> stub(String filePath, + String columnName) { + return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1); + } + /** * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index. */ @@ -144,6 +180,6 @@ public class HoodieColumnRangeMetadata<T> implements Serializable { public static final String TOTAL_SIZE = "total_size"; public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"; - private Stats() { } + private Stats() {} } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index e74f4f7..c0f7aab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -288,18 +289,27 @@ public class ParquetUtils extends BaseFileUtils { /** * Parse min/max statistics stored in parquet footers for all columns. */ + @SuppressWarnings("rawtype") public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata( @Nonnull Configuration conf, @Nonnull Path parquetFilePath, @Nonnull List<String> cols ) { ParquetMetadata metadata = readMetadata(conf, parquetFilePath); + + // NOTE: This collector has to have fully specialized generic type params since + // Java 1.8 struggles to infer them + Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector = + Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName); + // Collect stats from all individual Parquet blocks - Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential() - .flatMap(blockMetaData -> blockMetaData.getColumns().stream() - .filter(f -> cols.contains(f.getPath().toDotString())) + Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = + (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential() + .flatMap(blockMetaData -> + blockMetaData.getColumns().stream() + .filter(f -> cols.contains(f.getPath().toDotString())) .map(columnChunkMetaData -> - new HoodieColumnRangeMetadata<Comparable>( + HoodieColumnRangeMetadata.<Comparable>create( parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), convertToNativeJavaType( @@ -312,7 +322,8 @@ public class ParquetUtils extends BaseFileUtils { columnChunkMetaData.getValueCount(), columnChunkMetaData.getTotalSize(), columnChunkMetaData.getTotalUncompressedSize())) - ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); + ) + .collect(groupingByCollector); // Combine those into file-level statistics // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer @@ -360,7 +371,7 @@ public class ParquetUtils extends BaseFileUtils { maxValue = one.getMaxValue(); } - return new HoodieColumnRangeMetadata<T>( + return HoodieColumnRangeMetadata.create( one.getFilePath(), one.getColumnName(), minValue, maxValue, one.getNullCount() + another.getNullCount(), @@ -369,7 +380,11 @@ public class ParquetUtils extends BaseFileUtils { one.getTotalUncompressedSize() + another.getTotalUncompressedSize()); } - private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { + private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) { + if (val == null) { + return null; + } + if (primitiveType.getOriginalType() == OriginalType.DECIMAL) { return extractDecimal(val, primitiveType.getDecimalMetadata()); } else if (primitiveType.getOriginalType() == OriginalType.DATE) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 4390e87..4d6c602 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -960,8 +960,7 @@ public class HoodieTableMetadataUtil { } else { // TODO we should delete records instead of stubbing them columnRangeMetadataList = - columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata<Comparable>(fileName, - entry, null, null, 0, 0, 0, 0)) + columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) .collect(Collectors.toList()); } return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted); @@ -1012,11 +1011,11 @@ public class HoodieTableMetadataUtil { Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap, Map<String, Map<String, Object>> columnToStats) { Map<String, Object> columnStats = columnToStats.get(field.name()); - HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>( + HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create( filePath, field.name(), - String.valueOf(columnStats.get(MIN)), - String.valueOf(columnStats.get(MAX)), + (Comparable) String.valueOf(columnStats.get(MIN)), + (Comparable) String.valueOf(columnStats.get(MAX)), Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()), diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 832d942..af0c100 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -780,7 +780,7 @@ public class HoodieMetadataTableValidator implements Serializable { return allColumnNameList.stream() .flatMap(columnName -> tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream() - .map(stats -> new HoodieColumnRangeMetadata<>( + .map(stats -> HoodieColumnRangeMetadata.create( stats.getFileName(), columnName, stats.getMinValue(), @@ -799,7 +799,7 @@ public class HoodieMetadataTableValidator implements Serializable { metaClient.getHadoopConf(), new Path(new Path(metaClient.getBasePath(), partitionPath), filename), allColumnNameList).stream()) - .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>( + .map(rangeMetadata -> HoodieColumnRangeMetadata.create( rangeMetadata.getFilePath(), rangeMetadata.getColumnName(), // Note: here we ignore the type in the validation,