Copilot commented on code in PR #61356: URL: https://github.com/apache/doris/pull/61356#discussion_r2937911969
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetKvMetadata.java: ########## @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_kv_metadata */ +public class ParquetKvMetadata extends TableValuedFunction { + public ParquetKvMetadata(Properties properties) { + super("parquet_kv_metadata", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_kv_metadata"); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { Review Comment: `getTVFProperties().getMap()` returns the underlying properties map, which (from the parser) is an `ImmutableMap`. Mutating it via `arguments.put("mode", ...)` will throw `UnsupportedOperationException` at runtime. Create a mutable copy (e.g., `new HashMap<>(getTVFProperties().getMap())`) before adding/overriding entries. ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,394 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.datasource.property.storage.LocalProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. + * Currently works in two modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ Review Comment: The class-level Javadoc says "Currently works in two modes" but the implementation supports 5 distinct modes (`parquet_metadata`, `parquet_schema`, `parquet_file_metadata`, `parquet_kv_metadata`, `parquet_bloom_probe`). Update the comment to match to avoid misleading readers. ########## be/src/vec/exec/format/table/parquet_metadata_reader.cpp: ########## @@ -0,0 +1,880 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/parquet_metadata_reader.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <array> +#include <cctype> +#include <cstring> +#include <memory> +#include <optional> +#include <unordered_map> +#include <utility> + +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/hdfs_builder.h" +#include "io/io_common.h" +#include "runtime/runtime_state.h" +#include "util/string_util.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/string_view.h" +#include "vec/core/block.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/parquet_thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_file_metadata.h" +#include "vec/exec/format/table/parquet_utils.h" + +namespace doris::vectorized { + +using namespace parquet_utils; + +class ParquetMetadataReader::ModeHandler { +public: + explicit ModeHandler(RuntimeState* state) : _state(state) {} + virtual ~ModeHandler() = default; + + virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0; + virtual Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) = 0; + +protected: + RuntimeState* _state = nullptr; + + static std::unordered_map<std::string, int> _build_name_to_pos_map( + const std::vector<SlotDescriptor*>& slots) { + std::unordered_map<std::string, int> name_to_pos; + name_to_pos.reserve(slots.size()); + for (size_t i = 0; i < slots.size(); ++i) { + name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i)); + } + return name_to_pos; + } + + template <size_t N> + static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos, + const std::array<const char*, N>& column_names, + std::array<int, N>* slot_pos) { + slot_pos->fill(-1); + for (size_t i = 0; i < column_names.size(); ++i) { + auto it = name_to_pos.find(column_names[i]); + if (it != name_to_pos.end()) { + (*slot_pos)[i] = it->second; + } + } + } +}; + +class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + RETURN_IF_ERROR(_append_schema_node(path, field, columns)); + } + return Status::OK(); + } + +private: + std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {}; + + static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) { + switch (type) { + case tparquet::FieldRepetitionType::REQUIRED: + return "REQUIRED"; + case tparquet::FieldRepetitionType::OPTIONAL: + return "OPTIONAL"; + case tparquet::FieldRepetitionType::REPEATED: + return "REPEATED"; + default: + return "UNKNOWN"; + } + } + + Status _append_schema_node(const std::string& path, const FieldSchema& field, + std::vector<MutableColumnPtr>& columns) { + auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(SCHEMA_FILE_NAME, insert_string, path); + insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name); + + if (field.parquet_schema.__isset.type) { + insert_if_requested(SCHEMA_TYPE, insert_string, + physical_type_to_string(field.parquet_schema.type)); + } else { + insert_if_requested(SCHEMA_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.type_length) { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64, + static_cast<Int64>(field.parquet_schema.type_length)); + } else { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null); + } + + if (field.parquet_schema.__isset.repetition_type) { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string, + _repetition_type_to_string(field.parquet_schema.repetition_type)); + } else { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null); + } + + int64_t num_children = field.parquet_schema.__isset.num_children + ? static_cast<int64_t>(field.parquet_schema.num_children) + : 0; + insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast<Int64>(num_children)); + + if (field.parquet_schema.__isset.converted_type) { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string, + converted_type_to_string(field.parquet_schema.converted_type)); + } else { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.scale) { + insert_if_requested(SCHEMA_SCALE, insert_int64, + static_cast<Int64>(field.parquet_schema.scale)); + } else { + insert_if_requested(SCHEMA_SCALE, insert_null); + } + + if (field.parquet_schema.__isset.precision) { + insert_if_requested(SCHEMA_PRECISION, insert_int64, + static_cast<Int64>(field.parquet_schema.precision)); + } else { + insert_if_requested(SCHEMA_PRECISION, insert_null); + } + + if (field.parquet_schema.__isset.field_id) { + insert_if_requested(SCHEMA_FIELD_ID, insert_int64, + static_cast<Int64>(field.parquet_schema.field_id)); + } else { + insert_if_requested(SCHEMA_FIELD_ID, insert_null); + } + + std::string logical = logical_type_to_string(field.parquet_schema); + if (logical.empty()) { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null); + } else { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical); + } + + for (const auto& child : field.children) { + RETURN_IF_ERROR(_append_schema_node(path, child, columns)); + } + return Status::OK(); + } +}; + +class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + std::unordered_map<std::string, const FieldSchema*> path_map; + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + build_path_map(field, "", &path_map); + } + + const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA]; + bool has_kv_map = false; + Field kv_map_field; + if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata && + !thrift_meta.key_value_metadata.empty()) { + Array keys; + Array values; + keys.reserve(thrift_meta.key_value_metadata.size()); + values.reserve(thrift_meta.key_value_metadata.size()); + for (const auto& kv : thrift_meta.key_value_metadata) { + keys.emplace_back(Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.key))); + if (kv.__isset.value) { + values.emplace_back( + Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.value))); + } else { + values.emplace_back(Field {}); + } + } + Map map_value; + map_value.reserve(2); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(keys))); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(values))); + kv_map_field = Field::create_field<TYPE_MAP>(std::move(map_value)); + has_kv_map = true; + } + + for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) { + const auto& row_group = thrift_meta.row_groups[rg_index]; + Int64 row_group_num_rows = static_cast<Int64>(row_group.num_rows); + Int64 row_group_num_columns = static_cast<Int64>(row_group.columns.size()); + Int64 row_group_bytes = static_cast<Int64>(row_group.total_byte_size); + Int64 row_group_compressed_bytes = 0; + if (row_group.__isset.total_compressed_size) { + row_group_compressed_bytes = static_cast<Int64>(row_group.total_compressed_size); + } else { + for (const auto& col_chunk : row_group.columns) { + if (!col_chunk.__isset.meta_data) { + continue; + } + row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size; + } + } + + for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) { + const auto& column_chunk = row_group.columns[col_idx]; + if (!column_chunk.__isset.meta_data) { + continue; + } + const auto& column_meta = column_chunk.meta_data; + std::string path_in_schema = join_path(column_meta.path_in_schema); + const FieldSchema* schema_field = nullptr; + auto it = path_map.find(path_in_schema); + if (it != path_map.end()) { + schema_field = it->second; + } + + auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(META_FILE_NAME, insert_string, + column_chunk.__isset.file_path ? column_chunk.file_path : path); + insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast<Int64>(rg_index)); + insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows); + insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64, + row_group_num_columns); + insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes); + insert_if_requested(META_COLUMN_ID, insert_int64, static_cast<Int64>(col_idx)); + + // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present. + // Fall back to the first page (dictionary/data) offset to provide a useful value. + Int64 file_offset = static_cast<Int64>(column_chunk.file_offset); + if (file_offset == 0) { + if (column_meta.__isset.dictionary_page_offset) { + file_offset = static_cast<Int64>(column_meta.dictionary_page_offset); + } else { + file_offset = static_cast<Int64>(column_meta.data_page_offset); + } + } + insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset); + insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values); + insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema); + insert_if_requested(META_TYPE, insert_string, + physical_type_to_string(column_meta.type)); + + if (column_meta.__isset.statistics) { + static const cctz::time_zone kUtc0 = cctz::utc_time_zone(); + const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0; + + const auto& stats = column_meta.statistics; + + if (stats.__isset.min) { + insert_if_requested(META_STATS_MIN, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.min, ctz)); + } else { + insert_if_requested(META_STATS_MIN, insert_null); + } + if (stats.__isset.max) { + insert_if_requested(META_STATS_MAX, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.max, ctz)); + } else { + insert_if_requested(META_STATS_MAX, insert_null); + } + + if (stats.__isset.null_count) { + insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count); + } else { + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + } + if (stats.__isset.distinct_count) { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64, + stats.distinct_count); + } else { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + } + + // Prefer min_value/max_value, but fall back to deprecated min/max so the column + // is still populated for older files. + std::string encoded_min_value; + std::string encoded_max_value; + bool has_min_value = false; + bool has_max_value = false; + if (stats.__isset.min_value) { + encoded_min_value = stats.min_value; + has_min_value = true; + } else if (stats.__isset.min) { + encoded_min_value = stats.min; + has_min_value = true; + } + if (stats.__isset.max_value) { + encoded_max_value = stats.max_value; + has_max_value = true; + } else if (stats.__isset.max) { + encoded_max_value = stats.max; + has_max_value = true; + } + if (has_min_value) { + insert_if_requested(META_STATS_MIN_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_min_value, ctz)); + } else { + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + } + if (has_max_value) { + insert_if_requested(META_STATS_MAX_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_max_value, ctz)); + } else { + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + } + + if (stats.__isset.is_min_value_exact) { + insert_if_requested(META_MIN_IS_EXACT, insert_bool, + stats.is_min_value_exact); + } else { + insert_if_requested(META_MIN_IS_EXACT, insert_null); + } + if (stats.__isset.is_max_value_exact) { + insert_if_requested(META_MAX_IS_EXACT, insert_bool, + stats.is_max_value_exact); + } else { + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + } else { + insert_if_requested(META_STATS_MIN, insert_null); + insert_if_requested(META_STATS_MAX, insert_null); + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + insert_if_requested(META_MIN_IS_EXACT, insert_null); + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + + insert_if_requested(META_COMPRESSION, insert_string, + compression_to_string(column_meta.codec)); + insert_if_requested(META_ENCODINGS, insert_string, + encodings_to_string(column_meta.encodings)); + + if (column_meta.__isset.index_page_offset) { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64, + column_meta.index_page_offset); + } else { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null); + } + if (column_meta.__isset.dictionary_page_offset) { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64, + column_meta.dictionary_page_offset); + } else { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null); + } + insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64, + column_meta.data_page_offset); + + insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64, + column_meta.total_compressed_size); + insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64, + column_meta.total_uncompressed_size); + + if (kv_pos >= 0) { + if (has_kv_map) { + columns[kv_pos]->insert(kv_map_field); + } else { + insert_null(columns[kv_pos]); + } + } + + if (column_meta.__isset.bloom_filter_offset) { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64, + column_meta.bloom_filter_offset); + } else { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null); + } + if (column_meta.__isset.bloom_filter_length) { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64, + static_cast<Int64>(column_meta.bloom_filter_length)); + } else { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null); + } + + insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64, + row_group_compressed_bytes); + } + } + return Status::OK(); + } + +private: + std::array<int, META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + + auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(FILE_META_FILE_NAME, insert_string, path); + if (thrift_meta.__isset.created_by) { + insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by); + } else { + insert_if_requested(FILE_META_CREATED_BY, insert_null); + } + insert_if_requested(FILE_META_NUM_ROWS, insert_int64, + static_cast<Int64>(thrift_meta.num_rows)); + insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64, + static_cast<Int64>(thrift_meta.row_groups.size())); + insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64, + static_cast<Int64>(thrift_meta.version)); + if (thrift_meta.__isset.encryption_algorithm) { + const auto& algo = thrift_meta.encryption_algorithm; + std::string algo_name; + if (algo.__isset.AES_GCM_V1) { + algo_name = "AES_GCM_V1"; + } else if (algo.__isset.AES_GCM_CTR_V1) { + algo_name = "AES_GCM_CTR_V1"; + } + if (!algo_name.empty()) { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name); + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + if (thrift_meta.__isset.footer_signing_key_metadata) { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string, + thrift_meta.footer_signing_key_metadata); + } else { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null); + } + return Status::OK(); + } + +private: + std::array<int, FILE_META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) { + return Status::OK(); + } + + auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + for (const auto& kv : thrift_meta.key_value_metadata) { + insert_if_requested(KV_FILE_NAME, insert_string, path); + insert_if_requested(KV_KEY, insert_string, kv.key); + if (kv.__isset.value) { + insert_if_requested(KV_VALUE, insert_string, kv.value); + } else { + insert_if_requested(KV_VALUE, insert_null); + } + } + return Status::OK(); + } + +private: + std::array<int, KV_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type, + std::map<std::string, std::string> properties, std::string column, + std::string literal) + : ModeHandler(state), + _file_type(file_type), + _properties(std::move(properties)), + _column(std::move(column)), + _literal(std::move(literal)) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const FieldSchema* schema = metadata->schema().get_column(_column); + if (schema == nullptr) { + return Status::InvalidArgument( + fmt::format("Column '{}' not found for parquet_bloom_probe", _column)); + } + int parquet_col_id = schema->physical_column_index; + PrimitiveType primitive_type = _get_primitive(schema->data_type); + if (!ParquetPredicate::bloom_filter_supported(primitive_type)) { + return Status::InvalidArgument( + fmt::format("Column '{}' type {} does not support parquet bloom filter probe", + _column, primitive_type)); + } + + std::string encoded_literal; + RETURN_IF_ERROR( + _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal)); + + io::FileSystemProperties system_properties; + system_properties.system_type = _file_type; + system_properties.properties = _properties; + io::FileDescription file_desc; + file_desc.path = path; + io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader( + system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr)); + io::IOContext io_ctx; + + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) { + if (parquet_col_id < 0 || + parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) { + return Status::InvalidArgument(fmt::format( + "Invalid column index {} for parquet_bloom_probe", parquet_col_id)); + } + const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id]; + std::optional<bool> excludes; + if (column_chunk.__isset.meta_data && + column_chunk.meta_data.__isset.bloom_filter_offset) { + ParquetPredicate::ColumnStat stat; + auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader, + &io_ctx, &stat); + if (st.ok() && stat.bloom_filter) { + bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(), + encoded_literal.size()); + excludes = !might_contain; + } + } + _emit_row(path, static_cast<Int64>(rg_idx), excludes, columns); + } + return Status::OK(); + } + +private: + std::array<int, BLOOM_COLUMN_COUNT> _slot_pos {}; + TFileType::type _file_type; + std::map<std::string, std::string> _properties; + std::string _column; + std::string _literal; + + PrimitiveType _get_primitive(const DataTypePtr& type) const { + if (auto nullable = typeid_cast<const DataTypeNullable*>(type.get())) { + return nullable->get_nested_type()->get_primitive_type(); + } + return type->get_primitive_type(); + } + + Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type, + const std::string& literal, std::string* out) const { + try { + switch (physical_type) { + case tparquet::Type::INT32: { + int64_t v = std::stoll(literal); + int32_t v32 = static_cast<int32_t>(v); + out->assign(reinterpret_cast<const char*>(&v32), sizeof(int32_t)); + return Status::OK(); + } + case tparquet::Type::INT64: { + int64_t v = std::stoll(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(int64_t)); + return Status::OK(); + } + case tparquet::Type::FLOAT: { + float v = std::stof(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(float)); + return Status::OK(); + } + case tparquet::Type::DOUBLE: { + double v = std::stod(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(double)); + return Status::OK(); + } + case tparquet::Type::BYTE_ARRAY: { + // For string/blob, use raw bytes from the literal. + *out = literal; + return Status::OK(); + } + default: + break; + } + } catch (const std::exception& e) { + return Status::InvalidArgument(fmt::format( + "Failed to parse literal '{}' for parquet bloom probe: {}", literal, e.what())); + } + return Status::NotSupported( + fmt::format("Physical type {} for column '{}' not supported in parquet_bloom_probe", + physical_type, _column)); + } + + void _emit_row(const std::string& path, Int64 row_group_id, std::optional<bool> excludes, + std::vector<MutableColumnPtr>& columns) { + if (_slot_pos[BLOOM_FILE_NAME] >= 0) { + insert_string(columns[_slot_pos[BLOOM_FILE_NAME]], path); + } + if (_slot_pos[BLOOM_ROW_GROUP_ID] >= 0) { + insert_int32(columns[_slot_pos[BLOOM_ROW_GROUP_ID]], static_cast<Int32>(row_group_id)); + } + if (_slot_pos[BLOOM_EXCLUDES] >= 0) { + int32_t excludes_val = -1; // -1: no bloom filter present + if (excludes.has_value()) { + excludes_val = excludes.value() ? 1 : 0; + } + insert_int32(columns[_slot_pos[BLOOM_EXCLUDES]], excludes_val); + } + } +}; + +ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots, + RuntimeState* state, RuntimeProfile* profile, + TMetaScanRange scan_range) + : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) { + (void)profile; +} + +ParquetMetadataReader::~ParquetMetadataReader() = default; + +Status ParquetMetadataReader::init_reader() { + RETURN_IF_ERROR(_init_from_scan_range(_scan_range)); + if (_mode_type == Mode::SCHEMA) { + _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state); + } else if (_mode_type == Mode::FILE_METADATA) { + _mode_handler = std::make_unique<ParquetFileMetadataModeHandler>(_state); + } else if (_mode_type == Mode::KEY_VALUE_METADATA) { + _mode_handler = std::make_unique<ParquetKeyValueModeHandler>(_state); + } else if (_mode_type == Mode::BLOOM_PROBE) { + _mode_handler = std::make_unique<ParquetBloomProbeModeHandler>( + _state, _file_type, _properties, _bloom_column, _bloom_literal); + } else { + _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state); + } + _mode_handler->init_slot_pos_map(_slots); + return Status::OK(); +} + +Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) { + if (!scan_range.__isset.parquet_params) { + return Status::InvalidArgument( + "Missing parquet parameters for parquet_meta table function"); + } + const TParquetMetadataParams& params = scan_range.parquet_params; + std::vector<std::string> resolved_paths; + if (scan_range.__isset.serialized_splits && !scan_range.serialized_splits.empty()) { + resolved_paths.assign(scan_range.serialized_splits.begin(), + scan_range.serialized_splits.end()); + } else if (params.__isset.paths && !params.paths.empty()) { + resolved_paths.assign(params.paths.begin(), params.paths.end()); + } else { + return Status::InvalidArgument("Property 'path' must be set for parquet_meta"); Review Comment: The error message refers to `Property 'path'` but the thrift field (and FE) uses `paths`/`serialized_splits`. Consider aligning the message to the actual parameter name(s) to avoid confusing users when this is triggered. ########## be/src/vec/exec/format/table/parquet_metadata_reader.cpp: ########## @@ -0,0 +1,880 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/parquet_metadata_reader.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <array> +#include <cctype> +#include <cstring> +#include <memory> +#include <optional> +#include <unordered_map> +#include <utility> + +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/hdfs_builder.h" +#include "io/io_common.h" +#include "runtime/runtime_state.h" +#include "util/string_util.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/string_view.h" +#include "vec/core/block.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/parquet_thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_file_metadata.h" +#include "vec/exec/format/table/parquet_utils.h" + +namespace doris::vectorized { + +using namespace parquet_utils; + +class ParquetMetadataReader::ModeHandler { +public: + explicit ModeHandler(RuntimeState* state) : _state(state) {} + virtual ~ModeHandler() = default; + + virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0; + virtual Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) = 0; + +protected: + RuntimeState* _state = nullptr; + + static std::unordered_map<std::string, int> _build_name_to_pos_map( + const std::vector<SlotDescriptor*>& slots) { + std::unordered_map<std::string, int> name_to_pos; + name_to_pos.reserve(slots.size()); + for (size_t i = 0; i < slots.size(); ++i) { + name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i)); + } + return name_to_pos; + } + + template <size_t N> + static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos, + const std::array<const char*, N>& column_names, + std::array<int, N>* slot_pos) { + slot_pos->fill(-1); + for (size_t i = 0; i < column_names.size(); ++i) { + auto it = name_to_pos.find(column_names[i]); + if (it != name_to_pos.end()) { + (*slot_pos)[i] = it->second; + } + } + } +}; + +class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + RETURN_IF_ERROR(_append_schema_node(path, field, columns)); + } + return Status::OK(); + } + +private: + std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {}; + + static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) { + switch (type) { + case tparquet::FieldRepetitionType::REQUIRED: + return "REQUIRED"; + case tparquet::FieldRepetitionType::OPTIONAL: + return "OPTIONAL"; + case tparquet::FieldRepetitionType::REPEATED: + return "REPEATED"; + default: + return "UNKNOWN"; + } + } + + Status _append_schema_node(const std::string& path, const FieldSchema& field, + std::vector<MutableColumnPtr>& columns) { + auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(SCHEMA_FILE_NAME, insert_string, path); + insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name); + + if (field.parquet_schema.__isset.type) { + insert_if_requested(SCHEMA_TYPE, insert_string, + physical_type_to_string(field.parquet_schema.type)); + } else { + insert_if_requested(SCHEMA_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.type_length) { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64, + static_cast<Int64>(field.parquet_schema.type_length)); + } else { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null); + } + + if (field.parquet_schema.__isset.repetition_type) { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string, + _repetition_type_to_string(field.parquet_schema.repetition_type)); + } else { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null); + } + + int64_t num_children = field.parquet_schema.__isset.num_children + ? static_cast<int64_t>(field.parquet_schema.num_children) + : 0; + insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast<Int64>(num_children)); + + if (field.parquet_schema.__isset.converted_type) { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string, + converted_type_to_string(field.parquet_schema.converted_type)); + } else { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.scale) { + insert_if_requested(SCHEMA_SCALE, insert_int64, + static_cast<Int64>(field.parquet_schema.scale)); + } else { + insert_if_requested(SCHEMA_SCALE, insert_null); + } + + if (field.parquet_schema.__isset.precision) { + insert_if_requested(SCHEMA_PRECISION, insert_int64, + static_cast<Int64>(field.parquet_schema.precision)); + } else { + insert_if_requested(SCHEMA_PRECISION, insert_null); + } + + if (field.parquet_schema.__isset.field_id) { + insert_if_requested(SCHEMA_FIELD_ID, insert_int64, + static_cast<Int64>(field.parquet_schema.field_id)); + } else { + insert_if_requested(SCHEMA_FIELD_ID, insert_null); + } + + std::string logical = logical_type_to_string(field.parquet_schema); + if (logical.empty()) { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null); + } else { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical); + } + + for (const auto& child : field.children) { + RETURN_IF_ERROR(_append_schema_node(path, child, columns)); + } + return Status::OK(); + } +}; + +class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + std::unordered_map<std::string, const FieldSchema*> path_map; + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + build_path_map(field, "", &path_map); + } + + const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA]; + bool has_kv_map = false; + Field kv_map_field; + if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata && + !thrift_meta.key_value_metadata.empty()) { + Array keys; + Array values; + keys.reserve(thrift_meta.key_value_metadata.size()); + values.reserve(thrift_meta.key_value_metadata.size()); + for (const auto& kv : thrift_meta.key_value_metadata) { + keys.emplace_back(Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.key))); + if (kv.__isset.value) { + values.emplace_back( + Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.value))); + } else { + values.emplace_back(Field {}); + } + } + Map map_value; + map_value.reserve(2); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(keys))); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(values))); + kv_map_field = Field::create_field<TYPE_MAP>(std::move(map_value)); + has_kv_map = true; + } + + for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) { + const auto& row_group = thrift_meta.row_groups[rg_index]; + Int64 row_group_num_rows = static_cast<Int64>(row_group.num_rows); + Int64 row_group_num_columns = static_cast<Int64>(row_group.columns.size()); + Int64 row_group_bytes = static_cast<Int64>(row_group.total_byte_size); + Int64 row_group_compressed_bytes = 0; + if (row_group.__isset.total_compressed_size) { + row_group_compressed_bytes = static_cast<Int64>(row_group.total_compressed_size); + } else { + for (const auto& col_chunk : row_group.columns) { + if (!col_chunk.__isset.meta_data) { + continue; + } + row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size; + } + } + + for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) { + const auto& column_chunk = row_group.columns[col_idx]; + if (!column_chunk.__isset.meta_data) { + continue; + } + const auto& column_meta = column_chunk.meta_data; + std::string path_in_schema = join_path(column_meta.path_in_schema); + const FieldSchema* schema_field = nullptr; + auto it = path_map.find(path_in_schema); + if (it != path_map.end()) { + schema_field = it->second; + } + + auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(META_FILE_NAME, insert_string, + column_chunk.__isset.file_path ? column_chunk.file_path : path); + insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast<Int64>(rg_index)); + insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows); + insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64, + row_group_num_columns); + insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes); + insert_if_requested(META_COLUMN_ID, insert_int64, static_cast<Int64>(col_idx)); + + // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present. + // Fall back to the first page (dictionary/data) offset to provide a useful value. + Int64 file_offset = static_cast<Int64>(column_chunk.file_offset); + if (file_offset == 0) { + if (column_meta.__isset.dictionary_page_offset) { + file_offset = static_cast<Int64>(column_meta.dictionary_page_offset); + } else { + file_offset = static_cast<Int64>(column_meta.data_page_offset); + } + } + insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset); + insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values); + insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema); + insert_if_requested(META_TYPE, insert_string, + physical_type_to_string(column_meta.type)); + + if (column_meta.__isset.statistics) { + static const cctz::time_zone kUtc0 = cctz::utc_time_zone(); + const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0; + + const auto& stats = column_meta.statistics; + + if (stats.__isset.min) { + insert_if_requested(META_STATS_MIN, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.min, ctz)); + } else { + insert_if_requested(META_STATS_MIN, insert_null); + } + if (stats.__isset.max) { + insert_if_requested(META_STATS_MAX, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.max, ctz)); + } else { + insert_if_requested(META_STATS_MAX, insert_null); + } + + if (stats.__isset.null_count) { + insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count); + } else { + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + } + if (stats.__isset.distinct_count) { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64, + stats.distinct_count); + } else { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + } + + // Prefer min_value/max_value, but fall back to deprecated min/max so the column + // is still populated for older files. + std::string encoded_min_value; + std::string encoded_max_value; + bool has_min_value = false; + bool has_max_value = false; + if (stats.__isset.min_value) { + encoded_min_value = stats.min_value; + has_min_value = true; + } else if (stats.__isset.min) { + encoded_min_value = stats.min; + has_min_value = true; + } + if (stats.__isset.max_value) { + encoded_max_value = stats.max_value; + has_max_value = true; + } else if (stats.__isset.max) { + encoded_max_value = stats.max; + has_max_value = true; + } + if (has_min_value) { + insert_if_requested(META_STATS_MIN_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_min_value, ctz)); + } else { + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + } + if (has_max_value) { + insert_if_requested(META_STATS_MAX_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_max_value, ctz)); + } else { + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + } + + if (stats.__isset.is_min_value_exact) { + insert_if_requested(META_MIN_IS_EXACT, insert_bool, + stats.is_min_value_exact); + } else { + insert_if_requested(META_MIN_IS_EXACT, insert_null); + } + if (stats.__isset.is_max_value_exact) { + insert_if_requested(META_MAX_IS_EXACT, insert_bool, + stats.is_max_value_exact); + } else { + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + } else { + insert_if_requested(META_STATS_MIN, insert_null); + insert_if_requested(META_STATS_MAX, insert_null); + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + insert_if_requested(META_MIN_IS_EXACT, insert_null); + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + + insert_if_requested(META_COMPRESSION, insert_string, + compression_to_string(column_meta.codec)); + insert_if_requested(META_ENCODINGS, insert_string, + encodings_to_string(column_meta.encodings)); + + if (column_meta.__isset.index_page_offset) { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64, + column_meta.index_page_offset); + } else { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null); + } + if (column_meta.__isset.dictionary_page_offset) { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64, + column_meta.dictionary_page_offset); + } else { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null); + } + insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64, + column_meta.data_page_offset); + + insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64, + column_meta.total_compressed_size); + insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64, + column_meta.total_uncompressed_size); + + if (kv_pos >= 0) { + if (has_kv_map) { + columns[kv_pos]->insert(kv_map_field); + } else { + insert_null(columns[kv_pos]); + } + } + + if (column_meta.__isset.bloom_filter_offset) { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64, + column_meta.bloom_filter_offset); + } else { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null); + } + if (column_meta.__isset.bloom_filter_length) { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64, + static_cast<Int64>(column_meta.bloom_filter_length)); + } else { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null); + } + + insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64, + row_group_compressed_bytes); + } + } + return Status::OK(); + } + +private: + std::array<int, META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + + auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(FILE_META_FILE_NAME, insert_string, path); + if (thrift_meta.__isset.created_by) { + insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by); + } else { + insert_if_requested(FILE_META_CREATED_BY, insert_null); + } + insert_if_requested(FILE_META_NUM_ROWS, insert_int64, + static_cast<Int64>(thrift_meta.num_rows)); + insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64, + static_cast<Int64>(thrift_meta.row_groups.size())); + insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64, + static_cast<Int64>(thrift_meta.version)); + if (thrift_meta.__isset.encryption_algorithm) { + const auto& algo = thrift_meta.encryption_algorithm; + std::string algo_name; + if (algo.__isset.AES_GCM_V1) { + algo_name = "AES_GCM_V1"; + } else if (algo.__isset.AES_GCM_CTR_V1) { + algo_name = "AES_GCM_CTR_V1"; + } + if (!algo_name.empty()) { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name); + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + if (thrift_meta.__isset.footer_signing_key_metadata) { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string, + thrift_meta.footer_signing_key_metadata); + } else { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null); + } + return Status::OK(); + } + +private: + std::array<int, FILE_META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) { + return Status::OK(); + } + + auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + for (const auto& kv : thrift_meta.key_value_metadata) { + insert_if_requested(KV_FILE_NAME, insert_string, path); + insert_if_requested(KV_KEY, insert_string, kv.key); + if (kv.__isset.value) { + insert_if_requested(KV_VALUE, insert_string, kv.value); + } else { + insert_if_requested(KV_VALUE, insert_null); + } + } + return Status::OK(); + } + +private: + std::array<int, KV_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type, + std::map<std::string, std::string> properties, std::string column, + std::string literal) + : ModeHandler(state), + _file_type(file_type), + _properties(std::move(properties)), + _column(std::move(column)), + _literal(std::move(literal)) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const FieldSchema* schema = metadata->schema().get_column(_column); + if (schema == nullptr) { + return Status::InvalidArgument( + fmt::format("Column '{}' not found for parquet_bloom_probe", _column)); + } + int parquet_col_id = schema->physical_column_index; + PrimitiveType primitive_type = _get_primitive(schema->data_type); + if (!ParquetPredicate::bloom_filter_supported(primitive_type)) { + return Status::InvalidArgument( + fmt::format("Column '{}' type {} does not support parquet bloom filter probe", + _column, primitive_type)); + } + + std::string encoded_literal; + RETURN_IF_ERROR( + _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal)); + + io::FileSystemProperties system_properties; + system_properties.system_type = _file_type; + system_properties.properties = _properties; + io::FileDescription file_desc; + file_desc.path = path; + io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader( + system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr)); + io::IOContext io_ctx; + + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) { + if (parquet_col_id < 0 || + parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) { + return Status::InvalidArgument(fmt::format( + "Invalid column index {} for parquet_bloom_probe", parquet_col_id)); + } + const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id]; + std::optional<bool> excludes; + if (column_chunk.__isset.meta_data && + column_chunk.meta_data.__isset.bloom_filter_offset) { + ParquetPredicate::ColumnStat stat; + auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader, + &io_ctx, &stat); + if (st.ok() && stat.bloom_filter) { + bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(), + encoded_literal.size()); + excludes = !might_contain; + } + } + _emit_row(path, static_cast<Int64>(rg_idx), excludes, columns); + } + return Status::OK(); + } + +private: + std::array<int, BLOOM_COLUMN_COUNT> _slot_pos {}; + TFileType::type _file_type; + std::map<std::string, std::string> _properties; + std::string _column; + std::string _literal; + + PrimitiveType _get_primitive(const DataTypePtr& type) const { + if (auto nullable = typeid_cast<const DataTypeNullable*>(type.get())) { + return nullable->get_nested_type()->get_primitive_type(); + } + return type->get_primitive_type(); + } + + Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type, + const std::string& literal, std::string* out) const { + try { + switch (physical_type) { + case tparquet::Type::INT32: { + int64_t v = std::stoll(literal); + int32_t v32 = static_cast<int32_t>(v); + out->assign(reinterpret_cast<const char*>(&v32), sizeof(int32_t)); + return Status::OK(); + } Review Comment: In `_encode_literal()` the INT32 branch parses into `int64_t` and then `static_cast<int32_t>` without any range validation. Values outside the int32 range will silently wrap, producing incorrect bloom-probe results; it would be better to validate bounds and return `InvalidArgument` on overflow. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetBloomProbe.java: ########## @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_bloom_probe */ +public class ParquetBloomProbe extends TableValuedFunction { + public ParquetBloomProbe(Properties properties) { + super("parquet_bloom_probe", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_bloom_probe"); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { Review Comment: `getTVFProperties().getMap()` is typically an `ImmutableMap` (see parser property handling). Mutating it with `arguments.put("mode", ...)` can fail at runtime. Use a mutable copy before setting `mode` and constructing `ParquetMetadataTableValuedFunction`. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ParquetFileMetadata.java: ########## @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.ParquetMetadataTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** parquet_file_metadata */ +public class ParquetFileMetadata extends TableValuedFunction { + public ParquetFileMetadata(Properties properties) { + super("parquet_file_metadata", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + arguments.put("mode", "parquet_file_metadata"); + return new ParquetMetadataTableValuedFunction(arguments); + } catch (Throwable t) { Review Comment: `getTVFProperties().getMap()` is backed by an `ImmutableMap` created by the parser, so calling `arguments.put("mode", ...)` can throw `UnsupportedOperationException`. Copy to a mutable map before modifying and passing to `ParquetMetadataTableValuedFunction`. ########## be/src/vec/exec/format/table/parquet_metadata_reader.cpp: ########## @@ -0,0 +1,880 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/parquet_metadata_reader.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <array> +#include <cctype> +#include <cstring> +#include <memory> +#include <optional> +#include <unordered_map> +#include <utility> + +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/hdfs_builder.h" +#include "io/io_common.h" +#include "runtime/runtime_state.h" +#include "util/string_util.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/string_view.h" +#include "vec/core/block.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/parquet_thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_file_metadata.h" +#include "vec/exec/format/table/parquet_utils.h" + +namespace doris::vectorized { + +using namespace parquet_utils; + +class ParquetMetadataReader::ModeHandler { +public: + explicit ModeHandler(RuntimeState* state) : _state(state) {} + virtual ~ModeHandler() = default; + + virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0; + virtual Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) = 0; + +protected: + RuntimeState* _state = nullptr; + + static std::unordered_map<std::string, int> _build_name_to_pos_map( + const std::vector<SlotDescriptor*>& slots) { + std::unordered_map<std::string, int> name_to_pos; + name_to_pos.reserve(slots.size()); + for (size_t i = 0; i < slots.size(); ++i) { + name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i)); + } + return name_to_pos; + } + + template <size_t N> + static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos, + const std::array<const char*, N>& column_names, + std::array<int, N>* slot_pos) { + slot_pos->fill(-1); + for (size_t i = 0; i < column_names.size(); ++i) { + auto it = name_to_pos.find(column_names[i]); + if (it != name_to_pos.end()) { + (*slot_pos)[i] = it->second; + } + } + } +}; + +class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + RETURN_IF_ERROR(_append_schema_node(path, field, columns)); + } + return Status::OK(); + } + +private: + std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {}; + + static std::string _repetition_type_to_string(tparquet::FieldRepetitionType::type type) { + switch (type) { + case tparquet::FieldRepetitionType::REQUIRED: + return "REQUIRED"; + case tparquet::FieldRepetitionType::OPTIONAL: + return "OPTIONAL"; + case tparquet::FieldRepetitionType::REPEATED: + return "REPEATED"; + default: + return "UNKNOWN"; + } + } + + Status _append_schema_node(const std::string& path, const FieldSchema& field, + std::vector<MutableColumnPtr>& columns) { + auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(SCHEMA_FILE_NAME, insert_string, path); + insert_if_requested(SCHEMA_NAME, insert_string, field.parquet_schema.name); + + if (field.parquet_schema.__isset.type) { + insert_if_requested(SCHEMA_TYPE, insert_string, + physical_type_to_string(field.parquet_schema.type)); + } else { + insert_if_requested(SCHEMA_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.type_length) { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int64, + static_cast<Int64>(field.parquet_schema.type_length)); + } else { + insert_if_requested(SCHEMA_TYPE_LENGTH, insert_null); + } + + if (field.parquet_schema.__isset.repetition_type) { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_string, + _repetition_type_to_string(field.parquet_schema.repetition_type)); + } else { + insert_if_requested(SCHEMA_REPETITION_TYPE, insert_null); + } + + int64_t num_children = field.parquet_schema.__isset.num_children + ? static_cast<int64_t>(field.parquet_schema.num_children) + : 0; + insert_if_requested(SCHEMA_NUM_CHILDREN, insert_int64, static_cast<Int64>(num_children)); + + if (field.parquet_schema.__isset.converted_type) { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_string, + converted_type_to_string(field.parquet_schema.converted_type)); + } else { + insert_if_requested(SCHEMA_CONVERTED_TYPE, insert_null); + } + + if (field.parquet_schema.__isset.scale) { + insert_if_requested(SCHEMA_SCALE, insert_int64, + static_cast<Int64>(field.parquet_schema.scale)); + } else { + insert_if_requested(SCHEMA_SCALE, insert_null); + } + + if (field.parquet_schema.__isset.precision) { + insert_if_requested(SCHEMA_PRECISION, insert_int64, + static_cast<Int64>(field.parquet_schema.precision)); + } else { + insert_if_requested(SCHEMA_PRECISION, insert_null); + } + + if (field.parquet_schema.__isset.field_id) { + insert_if_requested(SCHEMA_FIELD_ID, insert_int64, + static_cast<Int64>(field.parquet_schema.field_id)); + } else { + insert_if_requested(SCHEMA_FIELD_ID, insert_null); + } + + std::string logical = logical_type_to_string(field.parquet_schema); + if (logical.empty()) { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_null); + } else { + insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string, logical); + } + + for (const auto& child : field.children) { + RETURN_IF_ERROR(_append_schema_node(path, child, columns)); + } + return Status::OK(); + } +}; + +class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + std::unordered_map<std::string, const FieldSchema*> path_map; + const auto& fields = metadata->schema().get_fields_schema(); + for (const auto& field : fields) { + build_path_map(field, "", &path_map); + } + + const int kv_pos = _slot_pos[META_KEY_VALUE_METADATA]; + bool has_kv_map = false; + Field kv_map_field; + if (kv_pos >= 0 && thrift_meta.__isset.key_value_metadata && + !thrift_meta.key_value_metadata.empty()) { + Array keys; + Array values; + keys.reserve(thrift_meta.key_value_metadata.size()); + values.reserve(thrift_meta.key_value_metadata.size()); + for (const auto& kv : thrift_meta.key_value_metadata) { + keys.emplace_back(Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.key))); + if (kv.__isset.value) { + values.emplace_back( + Field::create_field<TYPE_VARBINARY>(doris::StringView(kv.value))); + } else { + values.emplace_back(Field {}); + } + } + Map map_value; + map_value.reserve(2); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(keys))); + map_value.emplace_back(Field::create_field<TYPE_ARRAY>(std::move(values))); + kv_map_field = Field::create_field<TYPE_MAP>(std::move(map_value)); + has_kv_map = true; + } + + for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) { + const auto& row_group = thrift_meta.row_groups[rg_index]; + Int64 row_group_num_rows = static_cast<Int64>(row_group.num_rows); + Int64 row_group_num_columns = static_cast<Int64>(row_group.columns.size()); + Int64 row_group_bytes = static_cast<Int64>(row_group.total_byte_size); + Int64 row_group_compressed_bytes = 0; + if (row_group.__isset.total_compressed_size) { + row_group_compressed_bytes = static_cast<Int64>(row_group.total_compressed_size); + } else { + for (const auto& col_chunk : row_group.columns) { + if (!col_chunk.__isset.meta_data) { + continue; + } + row_group_compressed_bytes += col_chunk.meta_data.total_compressed_size; + } + } + + for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) { + const auto& column_chunk = row_group.columns[col_idx]; + if (!column_chunk.__isset.meta_data) { + continue; + } + const auto& column_meta = column_chunk.meta_data; + std::string path_in_schema = join_path(column_meta.path_in_schema); + const FieldSchema* schema_field = nullptr; + auto it = path_map.find(path_in_schema); + if (it != path_map.end()) { + schema_field = it->second; + } + + auto insert_if_requested = [&](MetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(META_FILE_NAME, insert_string, + column_chunk.__isset.file_path ? column_chunk.file_path : path); + insert_if_requested(META_ROW_GROUP_ID, insert_int64, static_cast<Int64>(rg_index)); + insert_if_requested(META_ROW_GROUP_NUM_ROWS, insert_int64, row_group_num_rows); + insert_if_requested(META_ROW_GROUP_NUM_COLUMNS, insert_int64, + row_group_num_columns); + insert_if_requested(META_ROW_GROUP_BYTES, insert_int64, row_group_bytes); + insert_if_requested(META_COLUMN_ID, insert_int64, static_cast<Int64>(col_idx)); + + // `ColumnChunk.file_offset` is deprecated and can be 0 even when page offsets are present. + // Fall back to the first page (dictionary/data) offset to provide a useful value. + Int64 file_offset = static_cast<Int64>(column_chunk.file_offset); + if (file_offset == 0) { + if (column_meta.__isset.dictionary_page_offset) { + file_offset = static_cast<Int64>(column_meta.dictionary_page_offset); + } else { + file_offset = static_cast<Int64>(column_meta.data_page_offset); + } + } + insert_if_requested(META_FILE_OFFSET, insert_int64, file_offset); + insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values); + insert_if_requested(META_PATH_IN_SCHEMA, insert_string, path_in_schema); + insert_if_requested(META_TYPE, insert_string, + physical_type_to_string(column_meta.type)); + + if (column_meta.__isset.statistics) { + static const cctz::time_zone kUtc0 = cctz::utc_time_zone(); + const cctz::time_zone& ctz = _state != nullptr ? _state->timezone_obj() : kUtc0; + + const auto& stats = column_meta.statistics; + + if (stats.__isset.min) { + insert_if_requested(META_STATS_MIN, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.min, ctz)); + } else { + insert_if_requested(META_STATS_MIN, insert_null); + } + if (stats.__isset.max) { + insert_if_requested(META_STATS_MAX, insert_string, + decode_statistics_value(schema_field, column_meta.type, + stats.max, ctz)); + } else { + insert_if_requested(META_STATS_MAX, insert_null); + } + + if (stats.__isset.null_count) { + insert_if_requested(META_STATS_NULL_COUNT, insert_int64, stats.null_count); + } else { + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + } + if (stats.__isset.distinct_count) { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_int64, + stats.distinct_count); + } else { + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + } + + // Prefer min_value/max_value, but fall back to deprecated min/max so the column + // is still populated for older files. + std::string encoded_min_value; + std::string encoded_max_value; + bool has_min_value = false; + bool has_max_value = false; + if (stats.__isset.min_value) { + encoded_min_value = stats.min_value; + has_min_value = true; + } else if (stats.__isset.min) { + encoded_min_value = stats.min; + has_min_value = true; + } + if (stats.__isset.max_value) { + encoded_max_value = stats.max_value; + has_max_value = true; + } else if (stats.__isset.max) { + encoded_max_value = stats.max; + has_max_value = true; + } + if (has_min_value) { + insert_if_requested(META_STATS_MIN_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_min_value, ctz)); + } else { + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + } + if (has_max_value) { + insert_if_requested(META_STATS_MAX_VALUE, insert_string, + decode_statistics_value(schema_field, column_meta.type, + encoded_max_value, ctz)); + } else { + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + } + + if (stats.__isset.is_min_value_exact) { + insert_if_requested(META_MIN_IS_EXACT, insert_bool, + stats.is_min_value_exact); + } else { + insert_if_requested(META_MIN_IS_EXACT, insert_null); + } + if (stats.__isset.is_max_value_exact) { + insert_if_requested(META_MAX_IS_EXACT, insert_bool, + stats.is_max_value_exact); + } else { + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + } else { + insert_if_requested(META_STATS_MIN, insert_null); + insert_if_requested(META_STATS_MAX, insert_null); + insert_if_requested(META_STATS_NULL_COUNT, insert_null); + insert_if_requested(META_STATS_DISTINCT_COUNT, insert_null); + insert_if_requested(META_STATS_MIN_VALUE, insert_null); + insert_if_requested(META_STATS_MAX_VALUE, insert_null); + insert_if_requested(META_MIN_IS_EXACT, insert_null); + insert_if_requested(META_MAX_IS_EXACT, insert_null); + } + + insert_if_requested(META_COMPRESSION, insert_string, + compression_to_string(column_meta.codec)); + insert_if_requested(META_ENCODINGS, insert_string, + encodings_to_string(column_meta.encodings)); + + if (column_meta.__isset.index_page_offset) { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64, + column_meta.index_page_offset); + } else { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null); + } + if (column_meta.__isset.dictionary_page_offset) { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64, + column_meta.dictionary_page_offset); + } else { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null); + } + insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64, + column_meta.data_page_offset); + + insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64, + column_meta.total_compressed_size); + insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64, + column_meta.total_uncompressed_size); + + if (kv_pos >= 0) { + if (has_kv_map) { + columns[kv_pos]->insert(kv_map_field); + } else { + insert_null(columns[kv_pos]); + } + } + + if (column_meta.__isset.bloom_filter_offset) { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_int64, + column_meta.bloom_filter_offset); + } else { + insert_if_requested(META_BLOOM_FILTER_OFFSET, insert_null); + } + if (column_meta.__isset.bloom_filter_length) { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_int64, + static_cast<Int64>(column_meta.bloom_filter_length)); + } else { + insert_if_requested(META_BLOOM_FILTER_LENGTH, insert_null); + } + + insert_if_requested(META_ROW_GROUP_COMPRESSED_BYTES, insert_int64, + row_group_compressed_bytes); + } + } + return Status::OK(); + } + +private: + std::array<int, META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetFileMetadataModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetFileMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kFileMetadataColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + + auto insert_if_requested = [&](FileMetadataColumnIndex idx, auto&& inserter, + auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + insert_if_requested(FILE_META_FILE_NAME, insert_string, path); + if (thrift_meta.__isset.created_by) { + insert_if_requested(FILE_META_CREATED_BY, insert_string, thrift_meta.created_by); + } else { + insert_if_requested(FILE_META_CREATED_BY, insert_null); + } + insert_if_requested(FILE_META_NUM_ROWS, insert_int64, + static_cast<Int64>(thrift_meta.num_rows)); + insert_if_requested(FILE_META_NUM_ROW_GROUPS, insert_int64, + static_cast<Int64>(thrift_meta.row_groups.size())); + insert_if_requested(FILE_META_FORMAT_VERSION, insert_int64, + static_cast<Int64>(thrift_meta.version)); + if (thrift_meta.__isset.encryption_algorithm) { + const auto& algo = thrift_meta.encryption_algorithm; + std::string algo_name; + if (algo.__isset.AES_GCM_V1) { + algo_name = "AES_GCM_V1"; + } else if (algo.__isset.AES_GCM_CTR_V1) { + algo_name = "AES_GCM_CTR_V1"; + } + if (!algo_name.empty()) { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_string, algo_name); + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + } else { + insert_if_requested(FILE_META_ENCRYPTION_ALGORITHM, insert_null); + } + if (thrift_meta.__isset.footer_signing_key_metadata) { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_string, + thrift_meta.footer_signing_key_metadata); + } else { + insert_if_requested(FILE_META_FOOTER_SIGNING_KEY_METADATA, insert_null); + } + return Status::OK(); + } + +private: + std::array<int, FILE_META_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetKeyValueModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + explicit ParquetKeyValueModeHandler(RuntimeState* state) : ModeHandler(state) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kKeyValueColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (!thrift_meta.__isset.key_value_metadata || thrift_meta.key_value_metadata.empty()) { + return Status::OK(); + } + + auto insert_if_requested = [&](KeyValueColumnIndex idx, auto&& inserter, auto&&... args) { + int pos = _slot_pos[idx]; + if (pos >= 0) { + inserter(columns[pos], std::forward<decltype(args)>(args)...); + } + }; + + for (const auto& kv : thrift_meta.key_value_metadata) { + insert_if_requested(KV_FILE_NAME, insert_string, path); + insert_if_requested(KV_KEY, insert_string, kv.key); + if (kv.__isset.value) { + insert_if_requested(KV_VALUE, insert_string, kv.value); + } else { + insert_if_requested(KV_VALUE, insert_null); + } + } + return Status::OK(); + } + +private: + std::array<int, KV_COLUMN_COUNT> _slot_pos {}; +}; + +class ParquetBloomProbeModeHandler final : public ParquetMetadataReader::ModeHandler { +public: + ParquetBloomProbeModeHandler(RuntimeState* state, TFileType::type file_type, + std::map<std::string, std::string> properties, std::string column, + std::string literal) + : ModeHandler(state), + _file_type(file_type), + _properties(std::move(properties)), + _column(std::move(column)), + _literal(std::move(literal)) {} + + void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override { + const auto& name_to_pos = _build_name_to_pos_map(slots); + _init_slot_pos_map(name_to_pos, kBloomProbeColumnNames, &_slot_pos); + } + + Status append_rows(const std::string& path, FileMetaData* metadata, + std::vector<MutableColumnPtr>& columns) override { + const FieldSchema* schema = metadata->schema().get_column(_column); + if (schema == nullptr) { + return Status::InvalidArgument( + fmt::format("Column '{}' not found for parquet_bloom_probe", _column)); + } + int parquet_col_id = schema->physical_column_index; + PrimitiveType primitive_type = _get_primitive(schema->data_type); + if (!ParquetPredicate::bloom_filter_supported(primitive_type)) { + return Status::InvalidArgument( + fmt::format("Column '{}' type {} does not support parquet bloom filter probe", + _column, primitive_type)); + } + + std::string encoded_literal; + RETURN_IF_ERROR( + _encode_literal(schema->physical_type, primitive_type, _literal, &encoded_literal)); + + io::FileSystemProperties system_properties; + system_properties.system_type = _file_type; + system_properties.properties = _properties; + io::FileDescription file_desc; + file_desc.path = path; + io::FileReaderSPtr file_reader = DORIS_TRY(FileFactory::create_file_reader( + system_properties, file_desc, io::FileReaderOptions::DEFAULT, nullptr)); + io::IOContext io_ctx; + + const tparquet::FileMetaData& thrift_meta = metadata->to_thrift(); + if (thrift_meta.row_groups.empty()) { + return Status::OK(); + } + + for (size_t rg_idx = 0; rg_idx < thrift_meta.row_groups.size(); ++rg_idx) { + if (parquet_col_id < 0 || + parquet_col_id >= thrift_meta.row_groups[rg_idx].columns.size()) { + return Status::InvalidArgument(fmt::format( + "Invalid column index {} for parquet_bloom_probe", parquet_col_id)); + } + const auto& column_chunk = thrift_meta.row_groups[rg_idx].columns[parquet_col_id]; + std::optional<bool> excludes; + if (column_chunk.__isset.meta_data && + column_chunk.meta_data.__isset.bloom_filter_offset) { + ParquetPredicate::ColumnStat stat; + auto st = ParquetPredicate::read_bloom_filter(column_chunk.meta_data, file_reader, + &io_ctx, &stat); + if (st.ok() && stat.bloom_filter) { + bool might_contain = stat.bloom_filter->test_bytes(encoded_literal.data(), + encoded_literal.size()); + excludes = !might_contain; + } + } + _emit_row(path, static_cast<Int64>(rg_idx), excludes, columns); + } + return Status::OK(); + } + +private: + std::array<int, BLOOM_COLUMN_COUNT> _slot_pos {}; + TFileType::type _file_type; + std::map<std::string, std::string> _properties; + std::string _column; + std::string _literal; + + PrimitiveType _get_primitive(const DataTypePtr& type) const { + if (auto nullable = typeid_cast<const DataTypeNullable*>(type.get())) { + return nullable->get_nested_type()->get_primitive_type(); + } + return type->get_primitive_type(); + } + + Status _encode_literal(tparquet::Type::type physical_type, PrimitiveType primitive_type, + const std::string& literal, std::string* out) const { + try { + switch (physical_type) { + case tparquet::Type::INT32: { + int64_t v = std::stoll(literal); + int32_t v32 = static_cast<int32_t>(v); + out->assign(reinterpret_cast<const char*>(&v32), sizeof(int32_t)); + return Status::OK(); + } + case tparquet::Type::INT64: { + int64_t v = std::stoll(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(int64_t)); + return Status::OK(); + } + case tparquet::Type::FLOAT: { + float v = std::stof(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(float)); + return Status::OK(); + } + case tparquet::Type::DOUBLE: { + double v = std::stod(literal); + out->assign(reinterpret_cast<const char*>(&v), sizeof(double)); + return Status::OK(); + } + case tparquet::Type::BYTE_ARRAY: { + // For string/blob, use raw bytes from the literal. + *out = literal; + return Status::OK(); + } + default: + break; + } + } catch (const std::exception& e) { + return Status::InvalidArgument(fmt::format( + "Failed to parse literal '{}' for parquet bloom probe: {}", literal, e.what())); + } + return Status::NotSupported( + fmt::format("Physical type {} for column '{}' not supported in parquet_bloom_probe", + physical_type, _column)); + } + + void _emit_row(const std::string& path, Int64 row_group_id, std::optional<bool> excludes, + std::vector<MutableColumnPtr>& columns) { + if (_slot_pos[BLOOM_FILE_NAME] >= 0) { + insert_string(columns[_slot_pos[BLOOM_FILE_NAME]], path); + } + if (_slot_pos[BLOOM_ROW_GROUP_ID] >= 0) { + insert_int32(columns[_slot_pos[BLOOM_ROW_GROUP_ID]], static_cast<Int32>(row_group_id)); + } + if (_slot_pos[BLOOM_EXCLUDES] >= 0) { + int32_t excludes_val = -1; // -1: no bloom filter present + if (excludes.has_value()) { + excludes_val = excludes.value() ? 1 : 0; + } + insert_int32(columns[_slot_pos[BLOOM_EXCLUDES]], excludes_val); + } + } +}; + +ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots, + RuntimeState* state, RuntimeProfile* profile, + TMetaScanRange scan_range) + : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) { + (void)profile; +} + +ParquetMetadataReader::~ParquetMetadataReader() = default; + +Status ParquetMetadataReader::init_reader() { + RETURN_IF_ERROR(_init_from_scan_range(_scan_range)); + if (_mode_type == Mode::SCHEMA) { + _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state); + } else if (_mode_type == Mode::FILE_METADATA) { + _mode_handler = std::make_unique<ParquetFileMetadataModeHandler>(_state); + } else if (_mode_type == Mode::KEY_VALUE_METADATA) { + _mode_handler = std::make_unique<ParquetKeyValueModeHandler>(_state); + } else if (_mode_type == Mode::BLOOM_PROBE) { + _mode_handler = std::make_unique<ParquetBloomProbeModeHandler>( + _state, _file_type, _properties, _bloom_column, _bloom_literal); + } else { + _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state); + } + _mode_handler->init_slot_pos_map(_slots); + return Status::OK(); +} + +Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) { + if (!scan_range.__isset.parquet_params) { + return Status::InvalidArgument( + "Missing parquet parameters for parquet_meta table function"); + } + const TParquetMetadataParams& params = scan_range.parquet_params; + std::vector<std::string> resolved_paths; + if (scan_range.__isset.serialized_splits && !scan_range.serialized_splits.empty()) { + resolved_paths.assign(scan_range.serialized_splits.begin(), + scan_range.serialized_splits.end()); + } else if (params.__isset.paths && !params.paths.empty()) { + resolved_paths.assign(params.paths.begin(), params.paths.end()); + } else { + return Status::InvalidArgument("Property 'path' must be set for parquet_meta"); + } + _paths.swap(resolved_paths); + + if (params.__isset.mode) { + _mode = params.mode; + } else { + _mode = MODE_METADATA; // default + } + + if (params.__isset.file_type) { + _file_type = params.file_type; + } else { + return Status::InvalidArgument("Property 'file_type' must be set for parquet_metadata"); Review Comment: This error says `parquet_metadata` but the TVF name is `parquet_meta` (and other messages in the file use `parquet_meta`). Consider using consistent wording here for easier troubleshooting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
