This is an automated email from the ASF dual-hosted git repository. suxiaogang223 pushed a commit to branch codex/complex-column-predicate-stats-filtering in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6dbbb5435b07aa2608ed5ee184435c4296b756e2 Author: Socrates <[email protected]> AuthorDate: Thu Jun 4 01:40:01 2026 +0800 [feature](be) Support nested parquet struct pruning ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Support file-layer pruning for primitive leaf predicates under Parquet STRUCT columns in the new parquet reader. The change keeps row-level filtering on Expr/VExprContext, adds file-local nested predicate targets, merges filter-only nested projections, and resolves nested primitive leaves for statistics, dictionary, bloom, and page-index pruning. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Ran build-support/clang-format.sh for modified BE files. - Ran git diff --check. - Local BE UT did not start because macOS clang16 failed CMake compiler probe with ld: library c++ not found. Fedora build and UT will be run after push. - Behavior changed: No - Does this need documentation: Yes (updated internal design document) --- be/src/format/new_parquet/parquet_statistics.cpp | 77 ++- be/src/format/new_parquet/parquet_statistics.h | 4 + be/src/format/reader/column_mapper.cpp | 588 ++++++++++++++++++++- be/src/format/reader/column_mapper.h | 6 +- be/src/format/reader/file_reader.h | 3 + be/test/format/new_parquet/parquet_reader_test.cpp | 186 +++++++ ...complex-column-predicate-and-stats-filtering.md | 339 +++++++----- 7 files changed, 1032 insertions(+), 171 deletions(-) diff --git a/be/src/format/new_parquet/parquet_statistics.cpp b/be/src/format/new_parquet/parquet_statistics.cpp index 0130a8bbaff..f5821d0640c 100644 --- a/be/src/format/new_parquet/parquet_statistics.cpp +++ b/be/src/format/new_parquet/parquet_statistics.cpp @@ -288,11 +288,9 @@ ParquetRowGroupPruneReason BloomFilterPruneReason( if (bloom_filter_cache == nullptr || column_filter.predicates.empty()) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_filter.file_column_id, schema.size()); 
- const auto& column_schema = *schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.leaf_column_id < 0 || - !ParquetStatisticsUtils::BloomFilterSupported(column_schema)) { + const auto* column_schema = + ParquetStatisticsUtils::ResolvePredicateLeafSchema(schema, column_filter); + if (column_schema == nullptr || !ParquetStatisticsUtils::BloomFilterSupported(*column_schema)) { return ParquetRowGroupPruneReason::NONE; } for (const auto& column_predicate : column_filter.predicates) { @@ -301,11 +299,11 @@ ParquetRowGroupPruneReason BloomFilterPruneReason( } } auto* bloom_filter = - bloom_filter_cache->get(row_group_idx, column_schema.leaf_column_id, pruning_stats); + bloom_filter_cache->get(row_group_idx, column_schema->leaf_column_id, pruning_stats); if (bloom_filter == nullptr) { return ParquetRowGroupPruneReason::NONE; } - return ParquetStatisticsUtils::BloomFilterExcludes(column_schema, column_filter, *bloom_filter) + return ParquetStatisticsUtils::BloomFilterExcludes(*column_schema, column_filter, *bloom_filter) ? ParquetRowGroupPruneReason::BLOOM_FILTER : ParquetRowGroupPruneReason::NONE; } @@ -486,8 +484,41 @@ segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics return predicate_statistics; } +const ParquetColumnSchema* find_child_schema_by_field_id(const ParquetColumnSchema& column_schema, + int32_t field_id) { + const auto child_it = std::ranges::find_if( + column_schema.children, [&](const std::unique_ptr<ParquetColumnSchema>& child) { + return child != nullptr && child->field_id == field_id; + }); + return child_it == column_schema.children.end() ? 
nullptr : child_it->get(); +} + } // namespace +const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema( + const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, + const reader::FileColumnPredicateFilter& column_filter) { + if (column_filter.file_column_id < 0 || + column_filter.file_column_id >= static_cast<int>(schema.size())) { + return nullptr; + } + const ParquetColumnSchema* column_schema = schema[column_filter.file_column_id].get(); + if (column_schema == nullptr) { + return nullptr; + } + for (const auto child_field_id : column_filter.file_child_id_path) { + column_schema = find_child_schema_by_field_id(*column_schema, child_field_id); + if (column_schema == nullptr) { + return nullptr; + } + } + if (column_schema->kind != ParquetColumnSchemaKind::PRIMITIVE || + column_schema->leaf_column_id < 0 || column_schema->max_repetition_level > 0) { + return nullptr; + } + return column_schema; +} + ParquetColumnStatistics ParquetStatisticsUtils::TransformColumnStatistics( const ParquetColumnSchema& column_schema, const std::shared_ptr<::parquet::Statistics>& statistics) { @@ -561,28 +592,26 @@ ParquetRowGroupPruneReason ParquetStatisticsUtils::RowGroupPruneReason( if (column_filter.predicates.empty()) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_filter.file_column_id, schema.size()); - const auto& column_schema = *schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.leaf_column_id < 0) { + const auto* column_schema = ResolvePredicateLeafSchema(schema, column_filter); + if (column_schema == nullptr) { return ParquetRowGroupPruneReason::NONE; } - DCHECK_LT(column_schema.leaf_column_id, row_group.num_columns()); - auto column_chunk = row_group.ColumnChunk(column_schema.leaf_column_id); + DCHECK_LT(column_schema->leaf_column_id, row_group.num_columns()); + auto column_chunk = row_group.ColumnChunk(column_schema->leaf_column_id); if (column_chunk == 
nullptr) { return ParquetRowGroupPruneReason::NONE; } if (CheckStatistics(column_filter, - TransformColumnStatistics(column_schema, column_chunk->statistics()))) { + TransformColumnStatistics(*column_schema, column_chunk->statistics()))) { return ParquetRowGroupPruneReason::STATISTICS; } - if (!supports_dictionary_pruning(column_schema, *column_chunk, column_filter) || + if (!supports_dictionary_pruning(*column_schema, *column_chunk, column_filter) || !is_dictionary_encoded_chunk(*column_chunk)) { return ParquetRowGroupPruneReason::NONE; } OwnedDictionaryWords dict_words; - if (!read_dictionary_words(file_reader, row_group_idx, column_schema.leaf_column_id, - column_schema, &dict_words)) { + if (!read_dictionary_words(file_reader, row_group_idx, column_schema->leaf_column_id, + *column_schema, &dict_words)) { return ParquetRowGroupPruneReason::NONE; } for (const auto& column_predicate : column_filter.predicates) { @@ -883,19 +912,17 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex if (column_filter.predicates.empty()) { return false; } - DORIS_CHECK(column_filter.file_column_id >= 0); - DORIS_CHECK(column_filter.file_column_id < static_cast<int>(file_schema.size())); - const auto& column_schema = *file_schema[column_filter.file_column_id]; - if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || - column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0) { + const auto* column_schema = + ParquetStatisticsUtils::ResolvePredicateLeafSchema(file_schema, column_filter); + if (column_schema == nullptr || column_schema->descriptor == nullptr) { return false; } std::shared_ptr<::parquet::ColumnIndex> column_index; std::shared_ptr<::parquet::OffsetIndex> offset_index; try { - column_index = row_group->GetColumnIndex(column_schema.leaf_column_id); - offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id); + column_index = row_group->GetColumnIndex(column_schema->leaf_column_id); + offset_index = 
row_group->GetOffsetIndex(column_schema->leaf_column_id); } catch (const ::parquet::ParquetException&) { return false; } catch (const std::exception&) { @@ -910,7 +937,7 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex const auto page_count = offset_index->page_locations().size(); for (size_t page_idx = 0; page_idx < page_count; ++page_idx) { ParquetColumnStatistics page_statistics; - if (!build_page_statistics(column_index, column_schema, page_idx, &page_statistics)) { + if (!build_page_statistics(column_index, *column_schema, page_idx, &page_statistics)) { ranges->clear(); return false; } diff --git a/be/src/format/new_parquet/parquet_statistics.h b/be/src/format/new_parquet/parquet_statistics.h index b28a784316d..560a073636c 100644 --- a/be/src/format/new_parquet/parquet_statistics.h +++ b/be/src/format/new_parquet/parquet_statistics.h @@ -81,6 +81,10 @@ struct ParquetColumnStatistics { // 结构参考 DuckDB ParquetStatisticsUtils:先把 Parquet metadata 转成统一统计对象, // 再由 filter/predicate 判断是否可以裁剪。这里不理解 table/global schema。 struct ParquetStatisticsUtils { + static const ParquetColumnSchema* ResolvePredicateLeafSchema( + const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, + const reader::FileColumnPredicateFilter& column_filter); + static ParquetColumnStatistics TransformColumnStatistics( const ParquetColumnSchema& column_schema, const std::shared_ptr<::parquet::Statistics>& statistics); diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp index 3620428fecb..1efd255673b 100644 --- a/be/src/format/reader/column_mapper.cpp +++ b/be/src/format/reader/column_mapper.cpp @@ -20,6 +20,8 @@ #include <algorithm> #include <cstddef> #include <memory> +#include <optional> +#include <span> #include <utility> #include <vector> @@ -33,6 +35,7 @@ #include "format/reader/file_reader.h" #include "format/reader/schema_projection.h" #include "format/reader/table_reader.h" +#include 
"storage/predicate/predicate_creator.h" namespace doris::reader { @@ -43,6 +46,24 @@ struct FileSlotRewriteInfo { std::string file_column_name; }; +struct StructChildSelector { + bool by_name = true; + std::string name; + size_t ordinal = 0; +}; + +struct NestedStructPath { + int32_t root_table_column_id = -1; + std::vector<StructChildSelector> selectors; +}; + +struct NestedPredicateTargetInfo { + int32_t root_file_column_id = -1; + std::vector<int32_t> file_child_id_path; + std::string leaf_name; + DataTypePtr file_leaf_type; +}; + // A split-local literal produced by slot-literal predicate localization. // // TableColumnMapper currently rewrites VExpr trees in-place because VExpr has no generic deep @@ -161,6 +182,430 @@ static VExprSPtr original_table_literal(const VExprSPtr& literal_expr) { rewritten_literal->original_field()); } +static bool is_struct_element_expr(const VExprSPtr& expr) { + return expr != nullptr && expr->get_num_children() == 2 && + expr->fn().name.function_name == "struct_element"; +} + +static bool parse_struct_child_selector(const VExprSPtr& expr, StructChildSelector* selector) { + DORIS_CHECK(selector != nullptr); + if (expr == nullptr || !expr->is_literal()) { + return false; + } + const Field field = literal_field(expr); + switch (field.get_type()) { + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: + selector->by_name = true; + selector->name = std::string(field.as_string_view()); + return true; + case TYPE_BOOLEAN: + selector->by_name = false; + selector->ordinal = field.get<TYPE_BOOLEAN>() ? 
1 : 0; + return selector->ordinal > 0; + case TYPE_TINYINT: + selector->by_name = false; + if (field.get<TYPE_TINYINT>() <= 0) { + return false; + } + selector->ordinal = cast_set<size_t>(field.get<TYPE_TINYINT>()); + return true; + case TYPE_SMALLINT: + selector->by_name = false; + if (field.get<TYPE_SMALLINT>() <= 0) { + return false; + } + selector->ordinal = cast_set<size_t>(field.get<TYPE_SMALLINT>()); + return true; + case TYPE_INT: + selector->by_name = false; + if (field.get<TYPE_INT>() <= 0) { + return false; + } + selector->ordinal = cast_set<size_t>(field.get<TYPE_INT>()); + return true; + case TYPE_BIGINT: + selector->by_name = false; + if (field.get<TYPE_BIGINT>() <= 0) { + return false; + } + selector->ordinal = cast_set<size_t>(field.get<TYPE_BIGINT>()); + return true; + default: + return false; + } +} + +static bool extract_nested_struct_path(const VExprSPtr& expr, NestedStructPath* path) { + DORIS_CHECK(path != nullptr); + if (!is_struct_element_expr(expr)) { + return false; + } + + StructChildSelector selector; + if (!parse_struct_child_selector(expr->children()[1], &selector)) { + return false; + } + + const auto& parent = expr->children()[0]; + if (parent->is_slot_ref()) { + const auto* slot_ref = assert_cast<const VSlotRef*>(parent.get()); + path->root_table_column_id = slot_ref->slot_id(); + path->selectors.clear(); + path->selectors.push_back(std::move(selector)); + return true; + } + + if (!extract_nested_struct_path(parent, path)) { + return false; + } + path->selectors.push_back(std::move(selector)); + return true; +} + +static void collect_nested_struct_paths(const VExprSPtr& expr, + std::vector<NestedStructPath>* paths) { + DORIS_CHECK(paths != nullptr); + if (expr == nullptr) { + return; + } + NestedStructPath path; + if (extract_nested_struct_path(expr, &path)) { + paths->push_back(std::move(path)); + return; + } + for (const auto& child : expr->children()) { + collect_nested_struct_paths(child, paths); + } +} + +static const 
SchemaField* find_schema_child_by_field_id(const std::vector<SchemaField>& children, + int32_t field_id) { + const auto child_it = std::ranges::find_if( + children, [&](const SchemaField& child) { return child.id == field_id; }); + return child_it == children.end() ? nullptr : &*child_it; +} + +static const SchemaField* resolve_file_child(const std::vector<SchemaField>& children, + const StructChildSelector& selector) { + if (selector.by_name) { + const auto child_it = std::ranges::find_if( + children, [&](const SchemaField& child) { return child.name == selector.name; }); + return child_it == children.end() ? nullptr : &*child_it; + } + if (selector.ordinal == 0 || selector.ordinal > children.size()) { + return nullptr; + } + return &children[selector.ordinal - 1]; +} + +static Status build_filter_projection_path(const std::vector<SchemaField>& children, + std::span<const StructChildSelector> selectors, + FieldProjection* projection) { + DORIS_CHECK(projection != nullptr); + if (selectors.empty()) { + return Status::InvalidArgument("Nested struct selector path is empty"); + } + const auto* child = resolve_file_child(children, selectors.front()); + if (child == nullptr) { + return Status::OK(); + } + projection->field_id = child->id; + projection->project_all_children = selectors.size() == 1; + projection->children.clear(); + if (selectors.size() == 1) { + return Status::OK(); + } + if (child->children.empty() || + remove_nullable(child->type)->get_primitive_type() != TYPE_STRUCT) { + projection->field_id = -1; + return Status::OK(); + } + FieldProjection child_projection; + RETURN_IF_ERROR( + build_filter_projection_path(child->children, selectors.subspan(1), &child_projection)); + if (child_projection.field_id < 0) { + projection->field_id = -1; + return Status::OK(); + } + projection->children.push_back(std::move(child_projection)); + return Status::OK(); +} + +static const SchemaField* resolve_filter_schema_path(const std::vector<SchemaField>& children, + 
std::span<const StructChildSelector> selectors, + std::vector<int32_t>* file_child_id_path) { + DORIS_CHECK(file_child_id_path != nullptr); + if (selectors.empty()) { + return nullptr; + } + const auto* child = resolve_file_child(children, selectors.front()); + if (child == nullptr) { + return nullptr; + } + file_child_id_path->push_back(child->id); + if (selectors.size() == 1) { + return child; + } + if (child->children.empty() || + remove_nullable(child->type)->get_primitive_type() != TYPE_STRUCT) { + file_child_id_path->clear(); + return nullptr; + } + const auto* leaf = + resolve_filter_schema_path(child->children, selectors.subspan(1), file_child_id_path); + if (leaf == nullptr) { + file_child_id_path->clear(); + } + return leaf; +} + +static bool resolve_nested_predicate_target(const NestedStructPath& path, + const std::vector<ColumnMapping>& mappings, + NestedPredicateTargetInfo* target) { + DORIS_CHECK(target != nullptr); + if (path.selectors.empty()) { + return false; + } + const auto mapping_it = std::ranges::find_if(mappings, [&](const ColumnMapping& mapping) { + return mapping.table_column_id == path.root_table_column_id; + }); + if (mapping_it == mappings.end() || !mapping_it->field_id.has_value()) { + return false; + } + std::vector<int32_t> file_child_id_path; + const auto* leaf = resolve_filter_schema_path(mapping_it->original_file_children, + path.selectors, &file_child_id_path); + if (leaf == nullptr || leaf->type == nullptr || + is_complex_type(remove_nullable(leaf->type)->get_primitive_type())) { + return false; + } + + target->root_file_column_id = *mapping_it->field_id; + target->file_child_id_path = std::move(file_child_id_path); + target->leaf_name = leaf->name; + target->file_leaf_type = remove_nullable(leaf->type); + return true; +} + +static std::optional<PredicateType> to_column_predicate_type(TExprOpcode::type opcode) { + switch (opcode) { + case TExprOpcode::EQ: + return PredicateType::EQ; + case TExprOpcode::NE: + return 
PredicateType::NE; + case TExprOpcode::GT: + return PredicateType::GT; + case TExprOpcode::GE: + return PredicateType::GE; + case TExprOpcode::LT: + return PredicateType::LT; + case TExprOpcode::LE: + return PredicateType::LE; + default: + return std::nullopt; + } +} + +static TExprOpcode::type reverse_comparison_opcode(TExprOpcode::type opcode) { + switch (opcode) { + case TExprOpcode::GT: + return TExprOpcode::LT; + case TExprOpcode::GE: + return TExprOpcode::LE; + case TExprOpcode::LT: + return TExprOpcode::GT; + case TExprOpcode::LE: + return TExprOpcode::GE; + default: + return opcode; + } +} + +static std::shared_ptr<ColumnPredicate> create_comparison_column_predicate( + PredicateType predicate_type, uint32_t column_id, const std::string& column_name, + const DataTypePtr& data_type, const Field& value) { + switch (predicate_type) { + case PredicateType::EQ: + return create_comparison_predicate<PredicateType::EQ>(column_id, column_name, data_type, + value, false); + case PredicateType::NE: + return create_comparison_predicate<PredicateType::NE>(column_id, column_name, data_type, + value, false); + case PredicateType::GT: + return create_comparison_predicate<PredicateType::GT>(column_id, column_name, data_type, + value, false); + case PredicateType::GE: + return create_comparison_predicate<PredicateType::GE>(column_id, column_name, data_type, + value, false); + case PredicateType::LT: + return create_comparison_predicate<PredicateType::LT>(column_id, column_name, data_type, + value, false); + case PredicateType::LE: + return create_comparison_predicate<PredicateType::LE>(column_id, column_name, data_type, + value, false); + default: + return nullptr; + } +} + +static std::shared_ptr<ColumnPredicate> build_nested_comparison_predicate( + const VExprSPtr& literal_expr, TExprOpcode::type opcode, + const NestedPredicateTargetInfo& target) { + if (literal_expr == nullptr || !literal_expr->is_literal() || + target.file_leaf_type == nullptr) { + return nullptr; + } + 
const auto predicate_type = to_column_predicate_type(opcode); + if (!predicate_type.has_value()) { + return nullptr; + } + const auto original_literal = original_table_literal(literal_expr); + const Field original_field = literal_field(original_literal); + Field file_field; + try { + convert_field_to_type(original_field, *target.file_leaf_type, &file_field, + original_literal->data_type().get()); + } catch (const Exception&) { + return nullptr; + } + if (file_field.is_null()) { + return nullptr; + } + try { + return create_comparison_column_predicate( + *predicate_type, cast_set<uint32_t>(target.root_file_column_id), target.leaf_name, + target.file_leaf_type, file_field); + } catch (const Exception&) { + return nullptr; + } +} + +static bool extract_nested_binary_comparison_filter(const VExprSPtr& expr, + const std::vector<ColumnMapping>& mappings, + FileColumnPredicateFilter* column_filter) { + DORIS_CHECK(column_filter != nullptr); + if (!is_binary_comparison_predicate(expr)) { + return false; + } + NestedStructPath path; + VExprSPtr literal_expr; + TExprOpcode::type opcode = expr->op(); + if (extract_nested_struct_path(expr->children()[0], &path) && + expr->children()[1]->is_literal()) { + literal_expr = expr->children()[1]; + } else if (extract_nested_struct_path(expr->children()[1], &path) && + expr->children()[0]->is_literal()) { + literal_expr = expr->children()[0]; + opcode = reverse_comparison_opcode(opcode); + } else { + return false; + } + + NestedPredicateTargetInfo target; + if (!resolve_nested_predicate_target(path, mappings, &target)) { + return false; + } + auto predicate = build_nested_comparison_predicate(literal_expr, opcode, target); + if (predicate == nullptr) { + return false; + } + column_filter->file_column_id = target.root_file_column_id; + column_filter->file_child_id_path = std::move(target.file_child_id_path); + column_filter->predicates.push_back(std::move(predicate)); + return true; +} + +static void 
merge_column_predicate_filter(FileColumnPredicateFilter column_filter, + std::vector<FileColumnPredicateFilter>* filters) { + DORIS_CHECK(filters != nullptr); + auto existing_filter_it = std::ranges::find_if(*filters, [&](const auto& existing_filter) { + return existing_filter.file_column_id == column_filter.file_column_id && + existing_filter.file_child_id_path == column_filter.file_child_id_path; + }); + if (existing_filter_it == filters->end()) { + filters->push_back(std::move(column_filter)); + return; + } + existing_filter_it->predicates.insert(existing_filter_it->predicates.end(), + column_filter.predicates.begin(), + column_filter.predicates.end()); +} + +static void collect_nested_column_predicate_filters( + const VExprSPtr& expr, const std::vector<ColumnMapping>& mappings, + std::vector<FileColumnPredicateFilter>* filters) { + DORIS_CHECK(filters != nullptr); + if (expr == nullptr) { + return; + } + if (expr->node_type() == TExprNodeType::COMPOUND_PRED && + expr->op() == TExprOpcode::COMPOUND_AND) { + for (const auto& child : expr->children()) { + collect_nested_column_predicate_filters(child, mappings, filters); + } + return; + } + FileColumnPredicateFilter column_filter; + if (extract_nested_binary_comparison_filter(expr, mappings, &column_filter)) { + merge_column_predicate_filter(std::move(column_filter), filters); + } +} + +static void merge_field_projection(FieldProjection* target, const FieldProjection& source) { + DORIS_CHECK(target != nullptr); + DORIS_CHECK(target->field_id == source.field_id); + if (target->project_all_children) { + return; + } + if (source.project_all_children) { + target->project_all_children = true; + target->children.clear(); + return; + } + for (const auto& source_child : source.children) { + auto target_child_it = std::ranges::find_if( + target->children, + [&](const FieldProjection& c) { return c.field_id == source_child.field_id; }); + if (target_child_it == target->children.end()) { + 
target->children.push_back(source_child); + continue; + } + merge_field_projection(&*target_child_it, source_child); + } +} + +static Status build_projected_type_from_projection(const DataTypePtr& file_type, + const std::vector<SchemaField>& children, + const FieldProjection& projection, + DataTypePtr* projected_type) { + DORIS_CHECK(file_type != nullptr); + DORIS_CHECK(projected_type != nullptr); + if (projection.project_all_children || projection.children.empty()) { + *projected_type = file_type; + return Status::OK(); + } + + DataTypes child_types; + Strings child_names; + child_types.reserve(projection.children.size()); + child_names.reserve(projection.children.size()); + for (const auto& child_projection : projection.children) { + const auto* child = find_schema_child_by_field_id(children, child_projection.field_id); + if (child == nullptr) { + return Status::InvalidArgument("Invalid projected child field id {}", + child_projection.field_id); + } + DataTypePtr child_type; + RETURN_IF_ERROR(build_projected_type_from_projection(child->type, child->children, + child_projection, &child_type)); + child_types.push_back(std::move(child_type)); + child_names.push_back(child->name); + } + return rebuild_projected_type(file_type, child_types, child_names, projected_type); +} + static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr, const FileSlotRewriteInfo& rewrite_info) { DORIS_CHECK(literal_expr != nullptr); @@ -280,6 +725,24 @@ static VExprSPtr rewrite_table_expr_to_file_expr( if (rewrite_in_slot_literal_predicate(expr, table_column_to_file_slot)) { return expr; } + if (is_struct_element_expr(expr)) { + auto children = expr->children(); + if (children[0]->is_slot_ref()) { + const auto* slot_ref = assert_cast<const VSlotRef*>(children[0].get()); + const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); + if (rewrite_it != table_column_to_file_slot.end()) { + // struct_element must see the actual file struct layout. 
Casting the parent struct + // to the output projection can hide filter-only children such as `s.id` in + // `SELECT s.name WHERE s.id > 5`. + children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second); + expr->set_children(std::move(children)); + return expr; + } + } + children[0] = rewrite_table_expr_to_file_expr(children[0], table_column_to_file_slot); + expr->set_children(std::move(children)); + return expr; + } if (expr->is_slot_ref()) { const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get()); const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id()); @@ -404,7 +867,11 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { if (mapping == nullptr) { return Status::InvalidArgument("mapping is null"); } - DORIS_CHECK(is_complex_type(mapping->file_type->get_primitive_type())); + if (mapping->original_file_type == nullptr) { + mapping->original_file_type = mapping->file_type; + } + DORIS_CHECK( + is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())); DataTypes child_types; Strings child_names; child_types.reserve(mapping->child_mappings.size()); @@ -423,7 +890,7 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { return Status::NotSupported("Projection for complex column {} contains no file children", mapping->file_column_name); } - RETURN_IF_ERROR(build_projected_child_type(mapping->file_type, mapping->child_mappings, + RETURN_IF_ERROR(build_projected_child_type(mapping->original_file_type, mapping->child_mappings, &mapping->file_type)); mapping->is_trivial = mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); @@ -431,8 +898,46 @@ static Status rebuild_projected_file_type(ColumnMapping* mapping) { return Status::OK(); } +using FilterProjectionMap = std::map<int32_t, FieldProjection>; + +static Status apply_projection_to_mapping_file_type(const FieldProjection& projection, + ColumnMapping* mapping) { + DORIS_CHECK(mapping != nullptr); + 
if (mapping->original_file_type == nullptr) { + mapping->original_file_type = mapping->file_type; + } + if (mapping->original_file_type == nullptr || + !is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())) { + return Status::OK(); + } + DataTypePtr projected_type; + RETURN_IF_ERROR(build_projected_type_from_projection(mapping->original_file_type, + mapping->original_file_children, + projection, &projected_type)); + mapping->file_type = std::move(projected_type); + mapping->has_complex_projection = !projection.project_all_children; + mapping->is_trivial = + mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); + return Status::OK(); +} + +static Status merge_filter_projection(const FilterProjectionMap* filter_projections, + FieldProjection* projection) { + DORIS_CHECK(projection != nullptr); + if (filter_projections == nullptr) { + return Status::OK(); + } + const auto filter_projection_it = filter_projections->find(projection->field_id); + if (filter_projection_it == filter_projections->end()) { + return Status::OK(); + } + merge_field_projection(projection, filter_projection_it->second); + return Status::OK(); +} + static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapping, - std::vector<FieldProjection>* scan_columns) { + std::vector<FieldProjection>* scan_columns, + const FilterProjectionMap* filter_projections = nullptr) { auto file_column_id = mapping->field_id.value(); if (scan_columns == &file_request->non_predicate_columns && std::ranges::find_if(file_request->predicate_columns, [&](const FieldProjection& p) { @@ -454,11 +959,18 @@ static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapp } RETURN_IF_ERROR(build_complex_projection(*mapping, &projection)); } - if (std::ranges::find_if(scan_columns->begin(), scan_columns->end(), - [&](const FieldProjection& p) { - return p.field_id == file_column_id; - }) == scan_columns->end()) { + if (scan_columns 
== &file_request->predicate_columns) { + RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection)); + } + RETURN_IF_ERROR(apply_projection_to_mapping_file_type(projection, mapping)); + + auto existing_projection_it = std::ranges::find_if( + *scan_columns, [&](const FieldProjection& p) { return p.field_id == file_column_id; }); + if (existing_projection_it == scan_columns->end()) { scan_columns->push_back(std::move(projection)); + } else { + merge_field_projection(&*existing_projection_it, projection); + RETURN_IF_ERROR(apply_projection_to_mapping_file_type(*existing_projection_it, mapping)); } if (scan_columns == &file_request->predicate_columns) { file_request->non_predicate_columns.erase( @@ -470,6 +982,48 @@ static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapp return Status::OK(); } +static Status build_filter_projection_map(const std::vector<TableFilter>& table_filters, + std::vector<ColumnMapping>* mappings, + FilterProjectionMap* filter_projections) { + DORIS_CHECK(mappings != nullptr); + DORIS_CHECK(filter_projections != nullptr); + filter_projections->clear(); + for (const auto& table_filter : table_filters) { + if (table_filter.conjunct == nullptr) { + continue; + } + std::vector<NestedStructPath> paths; + collect_nested_struct_paths(table_filter.conjunct->root(), &paths); + for (const auto& path : paths) { + auto mapping_it = std::ranges::find_if(*mappings, [&](const ColumnMapping& mapping) { + return mapping.table_column_id == path.root_table_column_id; + }); + if (mapping_it == mappings->end() || !mapping_it->field_id.has_value() || + path.selectors.empty()) { + continue; + } + + FieldProjection child_projection; + RETURN_IF_ERROR(build_filter_projection_path(mapping_it->original_file_children, + path.selectors, &child_projection)); + if (child_projection.field_id < 0) { + continue; + } + + FieldProjection root_projection {.field_id = *mapping_it->field_id, + .project_all_children = false}; + 
root_projection.children.push_back(std::move(child_projection)); + auto filter_projection_it = filter_projections->find(root_projection.field_id); + if (filter_projection_it == filter_projections->end()) { + filter_projections->emplace(root_projection.field_id, std::move(root_projection)); + continue; + } + merge_field_projection(&filter_projection_it->second, root_projection); + } + } + return Status::OK(); +} + static void rebuild_projection(ColumnMapping* mapping, size_t block_position) { DORIS_CHECK(mapping->field_id.has_value()); if (mapping->is_trivial || mapping->has_complex_projection) { @@ -671,14 +1225,16 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table FileScanRequest* file_request) { // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和 // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。 + FilterProjectionMap filter_projections; + RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections)); for (const auto& table_filter : table_filters) { for (const auto table_column_id : filter_slot_ids(table_filter)) { auto* mapping = _find_mapping(table_column_id); if (mapping == nullptr || !mapping->field_id.has_value()) { continue; } - RETURN_IF_ERROR( - add_scan_column(file_request, mapping, &file_request->predicate_columns)); + RETURN_IF_ERROR(add_scan_column(file_request, mapping, &file_request->predicate_columns, + &filter_projections)); } } @@ -703,6 +1259,18 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table column_predicate_filter.predicates = predicates; file_request->column_predicate_filters.push_back(std::move(column_predicate_filter)); } + for (const auto& table_filter : table_filters) { + if (table_filter.conjunct == nullptr) { + continue; + } + std::vector<FileColumnPredicateFilter> nested_column_predicate_filters; + collect_nested_column_predicate_filters(table_filter.conjunct->root(), _mappings, + &nested_column_predicate_filters); + 
for (auto& column_predicate_filter : nested_column_predicate_filters) { + merge_column_predicate_filter(std::move(column_predicate_filter), + &file_request->column_predicate_filters); + } + } return Status::OK(); } @@ -729,6 +1297,8 @@ Status TableColumnMapper::_create_direct_mapping(const TableColumn& table_column } mapping->field_id = file_field.id; mapping->file_column_name = file_field.name; + mapping->original_file_type = file_field.type; + mapping->original_file_children = file_field.children; mapping->file_type = file_field.type; mapping->is_trivial = _is_same_type(mapping->table_type, mapping->file_type); mapping->child_mappings.clear(); diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h index c52605cd3c9..a70e246bcce 100644 --- a/be/src/format/reader/column_mapper.h +++ b/be/src/format/reader/column_mapper.h @@ -30,6 +30,7 @@ #include "core/data_type/data_type.h" #include "exprs/vexpr_fwd.h" #include "format/reader/expr/literal.h" +#include "format/reader/file_reader.h" namespace doris { class ColumnPredicate; @@ -39,9 +40,6 @@ namespace doris::reader { struct TableColumn; struct TableFilter; -struct SchemaField; -struct FileScanRequest; -struct FieldProjection; using TableColumnPredicates = std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>; @@ -67,6 +65,8 @@ struct ColumnMapping { // File-local field id for top-level columns, or child id for nested columns. std::optional<int32_t> field_id; std::string file_column_name; + DataTypePtr original_file_type; + std::vector<SchemaField> original_file_children; std::vector<int32_t> file_path; DataTypePtr file_type; DataTypePtr table_type; diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h index 315128f33e2..9d118ea56de 100644 --- a/be/src/format/reader/file_reader.h +++ b/be/src/format/reader/file_reader.h @@ -75,6 +75,9 @@ struct FieldProjection { // dictionary and bloom filter. Predicates must all belong to file_column_id. 
struct FileColumnPredicateFilter { ColumnId file_column_id = -1; + // File-local child field-id path under file_column_id. Empty means top-level scalar. + // The ids are Parquet/Doris file schema child ids, not table ids and not child ordinals. + std::vector<int32_t> file_child_id_path; std::vector<std::shared_ptr<ColumnPredicate>> predicates; }; diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 6c5cccbb27a..5e0fe23b2fe 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -36,6 +36,7 @@ #include "core/block/block.h" #include "core/column/column_string.h" #include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" @@ -48,6 +49,7 @@ #include "format/new_parquet/reader/column_reader.h" #include "format/reader/column_mapper.h" #include "format/reader/expr/delete_predicate.h" +#include "format/reader/expr/literal.h" #include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" #include "format/reader/table_reader.h" @@ -106,6 +108,39 @@ private: const std::string _expr_name = "Int32GreaterThanExpr"; }; +class TestFunctionExpr final : public VExpr { +public: + TestFunctionExpr(std::string function_name, DataTypePtr data_type, + TExprNodeType::type node_type = TExprNodeType::FUNCTION_CALL, + TExprOpcode::type opcode = TExprOpcode::INVALID_OPCODE) + : VExpr(std::move(data_type), false), _expr_name(std::move(function_name)) { + set_node_type(node_type); + _opcode = opcode; + TFunctionName fn_name; + fn_name.__set_function_name(_expr_name); + _fn.__set_name(fn_name); + } + + const std::string& expr_name() const override { return _expr_name; } + + Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, + size_t count, 
ColumnPtr& result_column) const override { + return Status::NotSupported("TestFunctionExpr is only used for mapper expression analysis"); + } + +private: + const std::string _expr_name; +}; + +VExprSPtr struct_element_expr(const VExprSPtr& parent, const DataTypePtr& child_type, + const std::string& child_name) { + auto expr = std::make_shared<TestFunctionExpr>("struct_element", make_nullable(child_type)); + expr->add_child(parent); + expr->add_child(TableLiteral::create_shared(std::make_shared<DataTypeString>(), + Field::create_field<TYPE_STRING>(child_name))); + return expr; +} + class Int32SumGreaterThanExpr final : public VExpr { public: Int32SumGreaterThanExpr(int left_column_id, int right_column_id, int32_t value) @@ -219,6 +254,27 @@ std::shared_ptr<arrow::Array> build_string_array(const std::vector<std::string>& return finish_array(&builder); } +std::shared_ptr<arrow::Array> build_struct_array(const std::vector<int32_t>& ids, + const std::vector<std::string>& names) { + auto struct_type = arrow::struct_({arrow::field("id", arrow::int32(), false), + arrow::field("name", arrow::utf8(), false)}); + std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders; + auto id_builder = std::make_unique<arrow::Int32Builder>(); + field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(id_builder))); + auto name_builder = std::make_unique<arrow::StringBuilder>(); + field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(name_builder))); + arrow::StructBuilder builder(struct_type, arrow::default_memory_pool(), + std::move(field_builders)); + auto* struct_id_builder = assert_cast<arrow::Int32Builder*>(builder.field_builder(0)); + auto* struct_name_builder = assert_cast<arrow::StringBuilder*>(builder.field_builder(1)); + for (size_t row = 0; row < ids.size(); ++row) { + EXPECT_TRUE(builder.Append().ok()); + EXPECT_TRUE(struct_id_builder->Append(ids[row]).ok()); + EXPECT_TRUE(struct_name_builder->Append(names[row]).ok()); + } + return 
finish_array(&builder); +} + void write_parquet_file(const std::string& file_path, int64_t row_group_size = ROW_COUNT) { auto schema = arrow::schema({ arrow::field("id", arrow::int32(), false), @@ -262,6 +318,28 @@ void write_int_pair_parquet_file(const std::string& file_path, int64_t row_group row_group_size, builder.build())); } +void write_struct_filter_parquet_file(const std::string& file_path) { + auto id_field = arrow::field("id", arrow::int32(), false); + auto name_field = arrow::field("name", arrow::utf8(), false); + auto struct_type = arrow::struct_({id_field, name_field}); + auto schema = arrow::schema({ + arrow::field("s", struct_type, false), + }); + auto table = arrow::Table::Make( + schema, {build_struct_array({1, 2, 10, 11}, {"one", "two", "ten", "eleven"})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 2, + builder.build())); +} + void write_dictionary_filter_parquet_file(const std::string& file_path) { auto schema = arrow::schema({ arrow::field("id", arrow::int32(), false), @@ -522,6 +600,78 @@ TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) { EXPECT_EQ(projected_type->get_element_name(0), "b"); } +TEST(TableColumnMapperTest, MergesStructFilterOnlyChildIntoPredicateProjection) { + auto a_type = std::make_shared<DataTypeInt32>(); + auto b_type = std::make_shared<DataTypeString>(); + reader::SchemaField a_field; + a_field.id = 0; + a_field.name = "a"; + a_field.type = a_type; + reader::SchemaField b_field; + b_field.id = 1; + b_field.name = "b"; + 
b_field.type = b_type; + reader::SchemaField struct_field; + struct_field.id = 0; + struct_field.name = "s"; + struct_field.type = + std::make_shared<DataTypeStruct>(DataTypes {a_type, b_type}, Strings {"a", "b"}); + struct_field.children = {a_field, b_field}; + + reader::TableColumn table_child; + table_child.id = 101; + table_child.name = "b"; + table_child.type = b_type; + reader::TableColumn table_column; + table_column.id = 100; + table_column.name = "s"; + table_column.type = std::make_shared<DataTypeStruct>(DataTypes {b_type}, Strings {"b"}); + table_column.children = {table_child}; + + const auto full_table_struct_type = + std::make_shared<DataTypeStruct>(DataTypes {a_type, b_type}, Strings {"a", "b"}); + auto filter_expr = std::make_shared<TestFunctionExpr>( + "gt", std::make_shared<DataTypeUInt8>(), TExprNodeType::BINARY_PRED, TExprOpcode::GT); + filter_expr->add_child(struct_element_expr( + TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a")); + filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field<TYPE_INT>(5))); + reader::TableFilter table_filter { + .conjunct = VExprContext::create_shared(filter_expr), + .slot_ids = {100}, + }; + + reader::TableColumnMapperOptions options; + options.mode = reader::TableColumnMappingMode::BY_NAME; + reader::TableColumnMapper mapper(options); + ASSERT_TRUE(mapper.create_mapping({table_column}, {}, {struct_field}).ok()); + + reader::FileScanRequest request; + ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, {table_column}, &request).ok()); + + EXPECT_TRUE(request.non_predicate_columns.empty()); + ASSERT_EQ(request.predicate_columns.size(), 1); + const auto& projection = request.predicate_columns[0]; + EXPECT_EQ(projection.field_id, 0); + ASSERT_FALSE(projection.project_all_children); + ASSERT_EQ(projection.children.size(), 2); + EXPECT_EQ(projection.children[0].field_id, 1); + EXPECT_EQ(projection.children[1].field_id, 0); + 
ASSERT_EQ(request.column_predicate_filters.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].file_column_id, 0); + EXPECT_EQ(request.column_predicate_filters[0].file_child_id_path, std::vector<int32_t>({0})); + ASSERT_EQ(request.column_predicate_filters[0].predicates.size(), 1); + EXPECT_EQ(request.column_predicate_filters[0].predicates[0]->type(), PredicateType::GT); + + ASSERT_EQ(mapper.mappings().size(), 1); + ASSERT_EQ(mapper.mappings()[0].child_mappings.size(), 1); + EXPECT_EQ(mapper.mappings()[0].child_mappings[0].file_column_name, "b"); + const auto* read_type = + assert_cast<const DataTypeStruct*>(mapper.mappings()[0].file_type.get()); + ASSERT_EQ(read_type->get_elements().size(), 2); + EXPECT_EQ(read_type->get_element_name(0), "b"); + EXPECT_EQ(read_type->get_element_name(1), "a"); +} + TEST(TableColumnMapperTest, CreatesComplexProjectionForMapValueStructChildren) { auto key_type = std::make_shared<DataTypeInt32>(); auto a_type = std::make_shared<DataTypeInt32>(); @@ -1161,6 +1311,42 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByDictionary) { EXPECT_EQ(values, std::vector<std::string>({"lm"})); } +TEST_F(NewParquetReaderTest, NestedStructPredicateFiltersRowGroupsByStatistics) { + write_struct_filter_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 2); + + std::vector<std::unique_ptr<parquet::ParquetColumnSchema>> file_schema; + auto schema_descriptor = parquet_file_reader->metadata()->schema(); + ASSERT_NE(schema_descriptor, nullptr); + ASSERT_TRUE(parquet::build_parquet_column_schema(*schema_descriptor, &file_schema).ok()); + ASSERT_EQ(file_schema.size(), 1); + ASSERT_EQ(file_schema[0]->children.size(), 2); + ASSERT_EQ(file_schema[0]->children[0]->name, "id"); + + reader::FileScanRequest request; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 0; + 
column_filter.file_child_id_path = {0}; + auto id_type = std::make_shared<DataTypeInt32>(); + column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>( + 0, "id", id_type, Field::create_field<TYPE_INT>(5), false)); + request.column_predicate_filters.push_back(std::move(column_filter)); + + parquet::RowGroupScanPlan plan; + parquet::ParquetScanRange scan_range; + ASSERT_TRUE(parquet::plan_parquet_row_groups(*parquet_file_reader->metadata(), + parquet_file_reader.get(), file_schema, request, + scan_range, false, &plan) + .ok()); + ASSERT_EQ(plan.row_groups.size(), 1); + EXPECT_EQ(plan.row_groups[0].row_group_id, 1); + EXPECT_EQ(plan.pruning_stats.total_row_groups, 2); + EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); + EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_statistics, 1); + EXPECT_EQ(plan.pruning_stats.filtered_group_rows, 2); +} + TEST_F(NewParquetReaderTest, PlannerNarrowsRowRangesByPageIndex) { write_page_index_filter_parquet_file(_file_path); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); diff --git a/docs/complex-column-predicate-and-stats-filtering.md b/docs/complex-column-predicate-and-stats-filtering.md index c05ba5fb7a0..448149ddea4 100644 --- a/docs/complex-column-predicate-and-stats-filtering.md +++ b/docs/complex-column-predicate-and-stats-filtering.md @@ -1,201 +1,272 @@ # 复杂类型谓词过滤和统计信息过滤实现方案 -本文分析当前实现的限制,并以 DuckDB 为参考提出实现方案。 +本文聚焦 `STRUCT` 内 primitive leaf 的谓词过滤和 Parquet file-layer pruning。目标是参考 DuckDB 的 nested filter 语义,同时保持 Doris 当前 new parquet reader 的 block 布局和 `Expr` 行级过滤原则。 -## 1. 谓词过滤:嵌套字段无法下推到 Parquet 层 +## 1. 
DuckDB 参考模型 -### 1.1 问题根因 +DuckDB 的核心不是把 `struct_extract(s, 'id')` 改写成一个普通 leaf slot,而是在 filter 中保留 nested 结构。 -`WHERE s.id > 5` 的 VExpr 树是 `binary_predicate(GT, struct_element(VSlotRef(s), 'id'), literal(5))`。`VSlotRef` 的 `slot_id` 是父 struct `s` 的 table_column_id(记为 X),不是子字段 `id` 的 table_column_id(记为 Y,存储在 `child_mappings` 中)。 +### 1.1 StructFilter -`localize_filters()` 在以下三步中丢失了嵌套信息: +DuckDB 使用 `StructFilter` 表示 `s.id > 5`: +```text +StructFilter( + child_idx = id 在当前 struct 中的位置, + child_name = "id", + child_filter = ConstantFilter(GT, 5) +) ``` -build_file_slot_rewrite_map() - → 只遍历 _mappings(顶级列),不遍历 child_mappings - → 子字段 Y 不在映射表中 - -rewrite_table_expr_to_file_expr() 中的 fast path - → find_slot_rewrite_info 寻找 slot_ref 的 slot_id - → struct_element(VSlotRef(s)) 不是裸 VSlotRef,不会被 fast path 匹配 - → 回退到递归重写子节点 - → VSlotRef(s) 被找到并重写为 file block slot - → 但 struct_element 函数调用本身不会被重写 - -localize_filters() Phase 1 - → filter_slot_ids() 收集的是 VSlotRef 的 slot_id = X(父 struct) - → _find_mapping(X) 找到父 struct 的映射 ✓ - → add_scan_column 把父 struct 加入 predicate_columns - → 但子字段 Y 未被单独加入 → 无法享受统计信息剪枝 + +多层嵌套 `s.a.b > 5` 会递归包装成: + +```text +StructFilter(a_idx, StructFilter(b_idx, ConstantFilter(GT, 5))) ``` -**结论**:VExpr 表达式最终能在 file block 上正确求值(因为 struct_element 运行时会从 struct column 中提取子字段),但无法分解为子字段级别的优化(单列统计剪枝、literal 类型推断、子字段级 predicate_column 标记)。 +关键行为: -### 1.2 DuckDB 的解决方案:StructFilter +- `StructFilter::CheckStatistics()` 先从 struct stats 中取 child stats,再递归调用 child filter。 +- `StructFilter::ToExpression()` 可以还原成 `struct_extract` 表达式,用于运行时过滤。 +- `MultiFileColumnMapper::TryCastTableFilter()` 通过 `MultiFileIndexMapping.child_mapping` 把 global child index 递归重映射为 local child index。 -DuckDB 在优化器阶段(`FilterCombiner`)将 `struct_extract(s, 'id') > 5` 转换为: +参考: -``` -StructFilter( - child_idx = id 在 struct 中的位置, - child_name = "id", - child_filter = ConstantFilter(GT, 5) -) +- `duckdb/src/planner/filter/struct_filter.cpp` +-
`duckdb/src/common/multi_file/multi_file_column_mapper.cpp` + +### 1.2 Parquet reader / statistics + +DuckDB 的 Parquet reader 按 logical schema 构造 reader tree: + +- root 是 `StructColumnReader`。 +- `STRUCT` child 可以只为 selected child 创建 reader,未选 child reader 为空。 +- `LIST`/`MAP` 仍作为 nested reader,非 primitive leaf 不直接参与普通 min/max pruning。 + +统计信息处理方式: + +- primitive schema 直接读取对应 `column_index` 的 ColumnChunk stats。 +- `STRUCT` 没有自己的 Parquet leaf stats,但 DuckDB 会递归构造 struct child stats。 +- `LIST`/`MAP`/`ARRAY` 返回不支持。 +- row group pruning 时,拿 selected column reader 的 stats,再调用 table filter 的 `CheckStatistics()`。如果 filter 是 `StructFilter`,就递归检查 child stats。 +- bloom filter 只在 selected reader 是非 nested primitive column 时应用。 + +参考: + +- `duckdb/extension/parquet/parquet_reader.cpp` +- `duckdb/extension/parquet/parquet_statistics.cpp` +- `duckdb/extension/parquet/reader/struct_column_reader.cpp` + +## 2. Doris 当前约束 + +Doris new parquet reader 和 DuckDB 的内部布局不同: + +- `FileScanRequest::column_positions` 是 top-level `file_column_id -> file-local block_position`。 +- `FieldProjection.children` 表示 top-level complex column 内部的 nested projection。 +- file block 中 `STRUCT` 是一个 `ColumnStruct`,不是每个 child 一个独立 block slot。 +- `TableReader` materialize struct child 时从 `ColumnStruct::get_column_ptr(child_idx)` 取 child column。 + +因此不能把 `struct_element(VSlotRef(parent), 'id')` 改写为 child `VSlotRef`,除非先改变 file block layout 和 materialization contract。本方案不做这种改变。 + +当前原则仍然是: + +- 行级过滤使用 `Expr` / `VExprContext`。 +- `ColumnPredicate` 只用于 file-layer pruning:row group statistics、dictionary、bloom filter、page index。 +- 所有 pruning 只能在确定不会漏读时生效;无法解析 nested target、类型不匹配、schema 缺失或 repeated 语义不明确时直接保留 row group/page。 + +## 3. 目标模型 + +Doris 应对齐 DuckDB 的 nested filter 语义,但使用适合当前代码的表示: + +```cpp +struct FileNestedPredicateTarget { + // Top-level file column id.
For scalar top-level predicates this is also the leaf column id + // resolver entry. + ColumnId file_column_id = -1; + + // File-local child field-id path under file_column_id. Empty means top-level scalar. + // Example: s.id -> [id_file_field_id], s.a.b -> [a_file_field_id, b_file_field_id]. + // This is not table column id and not child ordinal. + std::vector<int32_t> file_child_id_path; +}; ``` -这个 `StructFilter` 是一个递归包装器。对于多层嵌套 `s.a.b > 5`,会产生: +`FileColumnPredicateFilter` 继续表达“一个 file-local target 上的一组 `ColumnPredicate`”,但 target 需要从 top-level 扩展到 nested leaf: + +```cpp +struct FileColumnPredicateFilter { + FileNestedPredicateTarget target; + std::vector<std::shared_ptr<ColumnPredicate>> predicates; +}; ``` -StructFilter(a_idx, StructFilter(b_idx, ConstantFilter(GT, 5))) + +兼容实现可以先保留现有 `file_column_id` 字段,再新增 `file_child_id_path`,但文档语义必须明确:path 是 file-local child field id path。 + +## 4. 谓词过滤实现方案 + +### 4.1 行级 Expr localization + +`WHERE s.id > 5` 的 VExpr 形态仍保留为: + +```text +binary_predicate(GT, struct_element(VSlotRef(s), 'id'), literal(5)) ``` -`StructFilter` 在三层发挥作用: +localization 只把 `VSlotRef(s)` 从 table slot 改写成 file block 中 top-level struct slot。`struct_element` 函数本身不替换成 child slot。 -| 层 | 行为 | -|---|---| -| **统计信息剪枝** | `CheckStatistics(stats)` 递归提取子字段的 `BaseStatistics`,调用 `child_filter->CheckStatistics(child_stats)` | -| **Filter 重映射**(MultiFileColumnMapper) | `TryCastTableFilter` 通过 `MultiFileIndexMapping.child_mapping` 将 global child_idx 重新映射为 local child_idx | -| **运行时过滤** | `FilterSelection(STRUCT_EXTRACT)` 从 struct vector 中提取子 vector,递归应用 `child_filter` | +需要做的事: -**在 Parquet reader 层**,DuckDB 把 struct 的每个子字段当作独立的列 reader。`root_reader.GetChildReader(column_id)` 可以拿到子字段的 reader,`Filter()` 时直接作用于 leaf column vector,StructFilter 在 `ApplyFilter` → `FilterSelection` 中被展开。 +1. `build_file_slot_rewrite_map()` 继续只为 top-level file block slot 建映射。 +2. `rewrite_table_expr_to_file_expr()` 递归改写 `struct_element` 内部的 parent `VSlotRef`。 +3. 
literal 类型重写可以新增 nested fast path,但只用于把 literal 转成 file child type,不改变表达式的 slot 形态。 -### 1.3 Doris 的实现方案 +### 4.2 filter-only nested projection -Doris 不需要完全照搬 DuckDB 的 StructFilter。因为 Doris 的 VExpr 已经能在 file block 上执行 `struct_element`,过滤的执行层不是问题。需要补齐的是**让 file reader 能感知到子字段级别的列依赖**。 +当前更关键的问题不是行级表达式无法执行,而是 filter 引用的 nested child 可能不在输出 projection 中。 -#### Step 1: 扩展 `build_file_slot_rewrite_map` 遍历 child_mappings +例如: -当前函数只遍历 `_mappings`(顶级列)。需要改为同时遍历 `child_mappings`,将子字段的 `table_column_id` → `FileSlotRewriteInfo` 加入映射。子字段的 `block_position` 不同于父 struct——struct 在 file block 中是一个列(`ColumnStruct`),子字段通过 `ColumnStruct::get_column(child_idx)` 访问。 +```sql +SELECT s.name FROM t WHERE s.id > 5; +``` -**关键问题**:子字段没有独立的 block_position。struct_element 在运行时从 struct 列中提取。所以子字段的 rewrite info 不能直接指向独立的 file block slot。 +这里 `s.name` 是输出 child,`s.id` 是 filter-only child。File reader 应读取同一个 top-level `s`,但 nested projection 应包含 `name` 和 `id`。不能把 `id` 当作独立 block slot,也不能因为输出只需要 `name` 而漏读 `id`。 -**替代方案**:不重写 slot_ref,而是**重写 struct_element 函数**。将 `struct_element(VSlotRef(struct_slot), 'field_name')` 替换为 `VSlotRef(child_slot)`,前提是子字段在 file block 中有独立位置。当前 Doris 的 struct 在 file block 中是展开的(每个投影子字段都有独立的 column),所以子字段确实有独立 block_position。 +实现要求: -这意味着需要在 `rewrite_table_expr_to_file_expr` 中新增一个 fast path:匹配 `struct_element(VSlotRef(parent_slot), field_name)`,找到对应 `ColumnMapping.child_mappings`,替换为子字段的 `VSlotRef`(使用子字段在 file block 中的 block_position)。 +- 从 table filters 中识别 `struct_element(VSlotRef(parent), literal child_name/index)` 链。 +- 将 filter 需要的 nested child path 合并到 `ColumnMapping.child_mappings` 或等价的 `FieldProjection.children`。 +- 同一 top-level complex column 的 output child 和 predicate child 需要去重合并。 +- 如果无法解析 nested path,退回读取 parent struct 的必要范围,保证行级 Expr 可以执行。 -#### Step 2: 扩展 `localize_filters` Phase 1 遍历子字段 +这一步对齐 DuckDB 的 selected child reader 思路,但落在 Doris 的 `FieldProjection` 上。 -当前 `filter_slot_ids()` 只收集裸 VSlotRef 的 slot_id。需要扩展为:对于 `struct_element(VSlotRef(parent), 
field_name)` 形式的表达式,收集子字段的 table_column_id。 +### 4.3 nested path 识别范围 -然后在 Phase 1 中,对子字段也调用 `_find_mapping`(需要扩展为递归查找 `child_mappings`),将其加入 `predicate_columns`。 +第一阶段只识别稳定且可控的模式: -#### Step 3: literal 类型推断适配 +- `struct_element(VSlotRef(parent), literal_name_or_index)` +- 多层嵌套的连续 `struct_element(struct_element(...), ...)` +- 比较谓词中的常量 literal,用于后续 pruning target 构造 -`find_slot_rewrite_info` 当前要求 binary_predicate 的直接 child 是 VSlotRef(或 Cast(VSlotRef))。对于 `struct_element(...) > 5`,第一个 child 是 function call。需要新增一个 fast path:识别 `struct_element(VSlotRef(parent), literal(name)) > literal` 模式,提取子字段的 file_type 并重写 literal。 +暂不识别: -### 1.4 影响范围 +- `LIST`/`MAP` 元素访问 +- 动态 field name +- 非 deterministic 表达式 +- 需要 row-level 计算才能确定 child path 的表达式 -| 文件 | 改动 | -|---|---| -| `column_mapper.cpp` | `build_file_slot_rewrite_map` 递归遍历 child_mappings;`_find_mapping` 扩展为递归查找 child_mappings;新增 `struct_element` fast path | -| `table_reader.h` | `_build_table_filters_from_conjuncts` 扩展 slot_id 收集,识别 struct_element 内部引用的子字段 | -| 无需改动 parquet reader 内部 | 谓词的运行时执行路径不变 | +## 5. 当前实现状态 ---- +### 5.1 行级 Expr localization -## 2. 
统计信息过滤:复杂类型叶子列无法参与剪枝 +已实现: -### 2.1 问题根因 +- `struct_element(VSlotRef(parent), literal child)` 链可以被识别为 nested path。 +- 行级表达式仍保留 `struct_element(file_struct_slot, field)` 形态,只改写 parent slot 到 file-local top-level block slot。 +- 不把 struct child 注册为独立 block slot,也不把 `struct_element` 改成 child `VSlotRef`。 -`parquet_statistics.cpp` 中有 4 处 `kind != PRIMITIVE` 检查,会在遇到复杂类型时直接跳过全部剪枝: +### 5.2 filter-only nested projection -| 函数 | 行号 | 跳过的剪枝类型 | -|---|---|---| -| `RowGroupPruneReason` | 566 | Min/max + Dictionary | -| `BloomFilterPruneReason` | 293 | Bloom filter | -| `select_ranges_for_filter` | 889 | Page index | -| `supports_dictionary_pruning` | 363 | Dictionary(间接) | +已实现: -根因是统计剪枝函数使用 `file_schema[column_filter.file_column_id]`(平铺索引),而 `file_column_id` 是顶级列索引(struct/list/map 的索引,其 `leaf_column_id = -1`)。 +- filter 引用的 struct child 会合并到同一个 top-level complex column 的 `FieldProjection.children`。 +- output child 顺序保持优先,filter-only child 追加到 read projection。 +- filter-only child 不加入 `ColumnMapping.child_mappings`,避免 table output materialization 把它当作输出字段。 +- `ColumnMapping` 保存 `original_file_type` / `original_file_children`,重复创建 split-local request 时可以从原始 file schema 重建 read projection。 -**DuckDB 的做法**:每个 leaf `ParquetColumnSchema` 有自己的 `column_index`(等价于 Doris 的 `leaf_column_id`),直接指向 `RowGroup.ColumnChunk[column_index]`。`PrepareRowGroupBuffer` 通过 `root_reader.GetChildReader(column_id)` 获取叶子 reader,`column_reader.Stats()` 返回该叶子列的统计信息。`StructColumnReader` 只是中间节点,其 `Stats()` 递归聚合子字段统计。 +### 5.3 nested file-layer pruning target -Doris 的 `leaf_column_id` 已经存在于每个叶子 `ParquetColumnSchema` 中。需要补齐的是:让统计剪枝函数能够从顶级列索引 + 子路径 → 定位叶子 `ParquetColumnSchema`。 +已实现: -### 2.2 实现方案 +- `FileColumnPredicateFilter` 保留 `file_column_id`,新增 `file_child_id_path`。 +- `file_child_id_path` 是 top-level file column 下的 file-local child field id path,不是 table id,也不是 ordinal。 +- mapper 会从 AND 语义下的 `struct_element(...) 
op literal` / `literal op struct_element(...)` 构造 nested file-layer pruning hint。 +- 不从 OR/NOT/任意函数子树中提取 pruning predicate,避免把非必要条件当成必需条件裁剪。 +- literal 转换到 file leaf type 失败、path 解析失败、leaf 不是 primitive 时,不生成 pruning hint。 -#### Step 1: 在 `FileColumnPredicateFilter` 中增加子字段路径 +### 5.4 Parquet leaf resolver and pruning -```cpp -struct FileColumnPredicateFilter { - ColumnId file_column_id = -1; // 顶级列索引(不变) - std::vector<int> child_field_path; // 新增:struct/map 内部子字段的 field_id 路径 - std::vector<std::shared_ptr<ColumnPredicate>> predicates; -}; -``` +已实现: + +- `ParquetStatisticsUtils::ResolvePredicateLeafSchema()` 统一解析 top-level 或 nested target。 +- 解析结果必须是 primitive leaf、`leaf_column_id >= 0` 且 `max_repetition_level == 0`。 +- row group min/max statistics 使用 resolved leaf schema。 +- dictionary pruning 使用 resolved leaf schema 和 leaf `ColumnChunk`,仍保持 string-like、dictionary-encoded、EQ/IN_LIST 限制。 +- bloom filter 使用 resolved leaf schema,仍保持 supported primitive type、EQ/IN_LIST/null 相关限制。 +- page index 使用 resolved leaf schema,只允许 non-repeated primitive leaf;LIST/MAP/repeated leaf 直接跳过 page range pruning。 + +## 6. 
统计信息 / pruning 设计约束 -`child_field_path` 为空表示过滤顶级原始列(现有行为不变)。非空表示过滤复杂类型内部的叶子列: -- `s.id` → `child_field_path = [0]`(struct 的第 0 个子字段 id) -- `m.value` → 对于 MAP,key_value 是第 0 个子(struct),value 是 struct 的第 1 个子 → `child_field_path = [0, 1]` +### 6.1 leaf schema resolver -#### Step 2: 新增 schema tree 中的叶子定位函数 +当前 resolver: ```cpp -// parquet_column_schema.h -// 沿 child_field_path 从顶级列定位到叶子 ParquetColumnSchema -const ParquetColumnSchema* resolve_leaf_schema( - const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, - ColumnId file_column_id, - const std::vector<int>& child_field_path); +const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema( + const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, + const reader::FileColumnPredicateFilter& column_filter); ``` -实现沿 `children` 逐级递进,返回叶子 `ParquetColumnSchema*`。 +解析规则: -#### Step 3: 修改 4 处 PRIMITIVE 检查 +1. `column_filter.file_column_id` 必须是合法 top-level file schema id。 +2. `column_filter.file_child_id_path` 为空时,target schema 就是 top-level schema。 +3. path 非空时,逐层在 `ParquetColumnSchema::children` 中按 file-local `field_id` 匹配。 +4. 最终 schema 必须是 primitive leaf,且 `leaf_column_id >= 0`。 +5.
本轮只允许 `max_repetition_level == 0` 的 leaf。任何 LIST/MAP/repeated path 直接不剪枝。 -将 `schema[column_filter.file_column_id]` 替换为 `resolve_leaf_schema(schema, column_filter.file_column_id, column_filter.child_field_path)`。解析成功后,`leaf_column_id`、`descriptor`、`type` 均可直接使用,后续统计逻辑无需修改。 +这相当于 Doris 版本的 DuckDB `StructFilter::CheckStatistics()`:不是在 filter 对象里递归拿 child stats,而是在 pruning 层先把 nested target 解析到 Parquet leaf schema,再复用现有 primitive pruning 逻辑。 -```cpp -// 之前 -const auto& column_schema = *schema[column_filter.file_column_id]; -if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE) { return NONE; } - -// 之后 -const auto* leaf_schema = resolve_leaf_schema(schema, column_filter.file_column_id, - column_filter.child_field_path); -if (leaf_schema == nullptr || leaf_schema->kind != ParquetColumnSchemaKind::PRIMITIVE - || leaf_schema->leaf_column_id < 0) { return NONE; } -// 使用 leaf_schema 替代 column_schema -``` +### 6.2 支持范围 + +第一阶段支持: + +- top-level primitive column 的现有 pruning。 +- `STRUCT` / nested `STRUCT` 下的 primitive leaf: + - min/max statistics row group pruning。 + - string-like dictionary pruning。 + - supported primitive bloom filter pruning。 + - page index row range pruning。 + +第一阶段不支持: + +- `LIST` element predicate。 +- `MAP` key/value predicate。 +- repeated primitive / repeated group 的 leaf pruning。 +- page index pruning on repeated leaf。 +- complex child schema change 的完整 pruning 语义。 -#### Step 4: 在 `localize_filters` 中填充 `child_field_path` +### 6.3 pruning 类型处理 -当 `_find_mapping` 查找到的是子字段(在 child_mappings 中)时,沿 ColumnMapping 树向上回溯,构建 `child_field_path`。 +#### Row group min/max -### 2.3 限制 +将 `schema[column_filter.file_column_id]` 替换为 `ParquetStatisticsUtils::ResolvePredicateLeafSchema()` 的结果。对 resolved leaf 调用现有 `TransformColumnStatistics()` 和 `CheckStatistics()`。 -以下场景的统计剪枝仍然不可用(当前和改为后均不可用,不在此方案范围): +#### Dictionary -- **LIST 元素**:`list[0] > 5` 形式的过滤不适用于行组级统计剪枝(元素级别的 min/max 不代表行组级别的 min/max) -- **MAP key/value**:`m['k'] > 5` 同理——映射到特定 key 的 value 没有行组级聚合统计 -- **嵌套
LIST**:`List<List<INT>>` 中内层元素的统计没有行组级意义 +dictionary pruning 只对 resolved primitive leaf 生效。现有 string-like 限制保持不变。无法读取 dictionary page 或 predicate 不支持时保留 row group。 -### 2.4 影响范围 +#### Bloom filter -| 文件 | 改动 | -|---|---| -| `file_reader.h` | `FileColumnPredicateFilter` 增加 `child_field_path` 字段 | -| `parquet_column_schema.h/.cpp` | 新增 `resolve_leaf_schema()` | -| `parquet_statistics.cpp` | 4 处 PRIMITIVE 检查改为 `resolve_leaf_schema()` + 叶子判断 | -| `column_mapper.cpp` | `localize_filters` 中为子字段 filter 填充 `child_field_path` | -| `parquet_reader.cpp` | `open()` 中的列验证适配含有 child_field_path 的 filter | +DuckDB 只对非 nested primitive reader 应用 bloom filter。Doris 当前通过 resolved leaf 接入 nested struct primitive leaf,但仍只处理 Arrow adapter 已支持且 predicate 可安全转换的 primitive 类型。不确定时保留 row group。 ---- +#### Page index -## 3. 实施顺序 +page index 对 repeated leaf 的 row range 语义复杂。本轮只允许 non-repeated primitive leaf。`STRUCT` 下 non-repeated primitive leaf 可以复用现有 page index range 逻辑;LIST/MAP/repeated leaf 直接跳过。 -建议分两个独立 PR: +## 7. 后续工作 -**PR 1: 谓词过滤**(Filter localization for nested fields) -- Step 1: `build_file_slot_rewrite_map` 递归 child_mappings -- Step 2: `localize_filters` 识别 struct_element,收集子字段 slot_id 并匹配 child_mappings -- Step 3: `struct_element` → 子字段 VSlotRef fast path -- 测试:`SELECT * FROM t WHERE s.id > 5` 验证谓词列被正确标记 +- 扩展 UT 覆盖 nested struct 多层 path、反向比较、OR 不提取、缺失 child 不下推。 +- 增加 nested string leaf dictionary pruning、nested page index pruning、nested bloom pruning 的真实 parquet fixture。 +- 支持从 `IN_PRED` 的 `struct_element(...) 
IN (...)` 构造 nested `IN_LIST` pruning hint。 +- schema change 场景下,把 table nested path 到 file nested path 的 mapping 入口收敛到 mapper,不让 file reader 理解 table/global schema。 +- LIST/MAP/repeated leaf 只有在 Dremel row semantics 和 row-range 语义明确后再接入 pruning。 -**PR 2: 统计信息过滤**(Statistics pruning for nested leaf columns) -- Step 1: `FileColumnPredicateFilter` 增加 `child_field_path` -- Step 2: `resolve_leaf_schema()` -- Step 3: 4 处 PRIMITIVE 检查改为路径解析 -- Step 4: `localize_filters` 填充 `child_field_path` -- 测试:验证 `s.id > 5` 能正确剪枝(准备只有 2 个 row group 的 parquet 文件,一个 `s.id` 全 < 5,一个全 > 5,验证前者被剪枝) +## 8. 需要避免的实现 -两个 PR 独立:PR 1 不依赖 PR 2(filter localization 可在没有统计剪枝时工作),但 PR 2 依赖 PR 1 的 `child_mappings` 遍历能力。 +- 不要把 struct child 注册成独立 `column_positions` block slot。 +- 不要把 `struct_element(...)` 改写成 child `VSlotRef`。 +- 不要把 `ColumnPredicate` 用于行级过滤。 +- 不要对 LIST/MAP/repeated leaf 做 row group/page pruning,除非后续有明确的 Dremel row semantics 证明。 +- 不要新增语义不清的 `child_field_path`。如果新增 path 字段,必须明确它是 file-local child field id path。 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
