BiteTheDDDDt commented on code in PR #62589: URL: https://github.com/apache/doris/pull/62589#discussion_r3310231274
########## be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp: ########## @@ -0,0 +1,875 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/runtime_filter/runtime_filter_partition_pruner.h" + +#include <gen_cpp/PlanNodes_types.h> + +#include <optional> +#include <unordered_set> +#include <utility> + +#include "core/block/block.h" +#include "core/column/column.h" +#include "core/column/column_nullable.h" +#include "core/data_type/data_type_nullable.h" +#include "core/field.h" +#include "exprs/bloom_filter_func.h" +#include "exprs/hybrid_set.h" +#include "exprs/runtime_filter_expr.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "exprs/vliteral.h" +#include "exprs/vslot_ref.h" +#include "runtime/descriptors.h" + +namespace doris { + +// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size) +// Complexity is inflated by macro expansion for each PrimitiveType case. 
+Status ParsedPartitionBoundaries::parse( + const std::vector<TPartitionBoundary>& boundaries, + const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs) { + for (const auto& tb : boundaries) { + if (!tb.__isset.partition_id || !tb.__isset.slot_id) { + return Status::InternalError( + "Runtime filter partition boundary must contain partition_id and slot_id"); + } + SlotId slot_id = tb.slot_id; + + auto slot_it = slot_descs.find(slot_id); + if (slot_it == slot_descs.end()) { + return Status::InternalError( + "Runtime filter partition boundary references unknown slot_id={}, " + "partition_id={}", + slot_id, tb.partition_id); + } + SlotDescriptor* slot = slot_it->second; + // Reuse the slot's pre-built DataType: walking through VLiteral here + // would cost a `DataTypeFactory::create_data_type(node)` heap allocation + // and a one-row `ColumnConst` allocation per boundary endpoint. With + // thousands of partitions that dominates BuildTasksTime. + const DataTypePtr& slot_type = slot->type(); + PrimitiveType ptype = slot_type->get_primitive_type(); + int precision = cast_set<int>(slot_type->get_precision()); + int scale = cast_set<int>(slot_type->get_scale()); + bool is_nullable = slot->is_nullable(); + + // Store slot data type for potential projection use + _slot_data_types[slot_id] = slot_type; + + ParsedBoundary boundary; + boundary.partition_id = tb.partition_id; + boundary.slot_id = slot_id; + boundary.is_nullable = is_nullable; + + bool parsed_ok = false; + +#define BUILD_BOUNDARY_CVR(NAME) \ + case TYPE_##NAME: { \ + using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType; \ + bool is_list = tb.__isset.list_values && !tb.list_values.empty(); \ + bool is_range = tb.__isset.range_start || tb.__isset.range_end; \ + if (!is_list && !is_range) { \ + return Status::InternalError( \ + "Runtime filter partition boundary must be RANGE or LIST, " \ + "partition_id={}, slot_id={}", \ + tb.partition_id, slot_id); \ + } \ + ColumnValueRange<TYPE_##NAME> 
cvr(slot->col_name(), is_nullable, precision, scale); \ + /* Returns nullopt if `node` is a NULL literal; the caller then sets contain_null */ \ + /* on the CVR instead of trying to extract a typed value (which would dereference */ \ + /* a null data pointer for the non-string branch). */ \ + auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> { \ + if (node.node_type == TExprNodeType::NULL_LITERAL) { \ + return std::nullopt; \ + } \ + /* `Field` value is copied into the CVR by `add_fixed_value` / */ \ + /* `add_range` (both take CppType by const-ref / by value), so the */ \ + /* temporary `Field`'s lifetime ending at this expression's full-statement */ \ + /* boundary is safe -- including for `String` payloads. */ \ + Field field = slot_type->get_field(node); \ + return std::make_optional<CppType>(field.get<TYPE_##NAME>()); \ + }; \ + if (is_list) { \ + auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range( \ + is_nullable, precision, scale); \ + bool list_has_null = false; \ + bool list_has_value = false; \ + for (const auto& node : tb.list_values) { \ + auto parsed = parse_texpr_node(node); \ + if (!parsed) { \ + list_has_null = true; \ + continue; \ + } \ + static_cast<void>(empty_cvr.add_fixed_value(*parsed)); \ + list_has_value = true; \ + } \ + if (list_has_value) { \ + cvr.intersection(empty_cvr); \ + } \ + if (list_has_null && is_nullable) { \ + /* Track NULL membership on ParsedBoundary; calling */ \ + /* cvr.set_contain_null(true) here would invoke */ \ + /* set_empty_value_range() and discard the concrete fixed */ \ + /* values we just inserted, turning {NULL, v} into a */ \ + /* NULL-only boundary. 
*/ \ + boundary.contains_null = true; \ + if (!list_has_value) { \ + boundary.only_null = true; \ + } \ + } \ + } else { \ + if (tb.__isset.range_start) { \ + auto parsed = parse_texpr_node(tb.range_start); \ + if (parsed) { \ + static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed)); \ + } \ + } \ + if (tb.__isset.range_end) { \ + auto parsed = parse_texpr_node(tb.range_end); \ + if (parsed) { \ + /* Multi-column RANGE projection emits a CLOSED upper bound (see */ \ + /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */ \ + /* keeps the natural OPEN upper bound matching Doris semantics. */ \ + SQLFilterOp upper_op = \ + (tb.__isset.range_end_inclusive && tb.range_end_inclusive) \ + ? FILTER_LESS_OR_EQUAL \ + : FILTER_LESS; \ + static_cast<void>(cvr.add_range(upper_op, *parsed)); \ + } \ + } \ + } \ + boundary.boundary_cvr = std::move(cvr); \ + parsed_ok = true; \ + break; \ + } + + switch (ptype) { + BUILD_BOUNDARY_CVR(TINYINT) + BUILD_BOUNDARY_CVR(SMALLINT) + BUILD_BOUNDARY_CVR(INT) + BUILD_BOUNDARY_CVR(BIGINT) + BUILD_BOUNDARY_CVR(LARGEINT) + BUILD_BOUNDARY_CVR(FLOAT) + BUILD_BOUNDARY_CVR(DOUBLE) + BUILD_BOUNDARY_CVR(CHAR) + BUILD_BOUNDARY_CVR(DATE) + BUILD_BOUNDARY_CVR(DATETIME) + BUILD_BOUNDARY_CVR(DATEV2) + BUILD_BOUNDARY_CVR(DATETIMEV2) + BUILD_BOUNDARY_CVR(TIMESTAMPTZ) + BUILD_BOUNDARY_CVR(VARCHAR) + BUILD_BOUNDARY_CVR(STRING) + BUILD_BOUNDARY_CVR(DECIMAL32) + BUILD_BOUNDARY_CVR(DECIMAL64) + BUILD_BOUNDARY_CVR(DECIMAL128I) + BUILD_BOUNDARY_CVR(DECIMAL256) + BUILD_BOUNDARY_CVR(DECIMALV2) + BUILD_BOUNDARY_CVR(BOOLEAN) + BUILD_BOUNDARY_CVR(IPV4) + BUILD_BOUNDARY_CVR(IPV6) + default: + break; + } +#undef BUILD_BOUNDARY_CVR + + if (parsed_ok) { + _slot_to_boundaries[slot_id].push_back(std::move(boundary)); + } else { + return Status::InternalError( + "Runtime filter partition boundary has unsupported type, partition_id={}, " + "slot_id={}, type={}", + tb.partition_id, slot_id, slot_type->get_name()); + } + } + + // Count distinct 
partition IDs across all boundaries. + if (!_slot_to_boundaries.empty()) { + phmap::flat_hash_set<int64_t> all_partition_ids; + for (const auto& [_, slot_boundaries] : _slot_to_boundaries) { + for (const auto& pb : slot_boundaries) { + all_partition_ids.insert(pb.partition_id); + } + } + _total_partition_count = static_cast<int64_t>(all_partition_ids.size()); + } + return Status::OK(); +} +// NOLINTEND(readability-function-cognitive-complexity,readability-function-size) + +static const VSlotRef* find_unique_slot_ref(const VExpr* expr) { + if (!expr) { + return nullptr; + } + if (expr->is_slot_ref()) { + return assert_cast<const VSlotRef*>(expr); + } + const VSlotRef* found = nullptr; + for (const auto& child : expr->children()) { + const VSlotRef* c = find_unique_slot_ref(child.get()); + if (c) { + if (found) { + return nullptr; // multiple slot refs, can't handle + } + found = c; + } + } + return found; +} + +// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size) +Status ParsedPartitionBoundaries::get_or_compute_projected_boundaries( + int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id, + const std::unordered_map<int64_t, TTargetExprMonotonicity::type>& partition_directions, + VExprContext* ctx, std::shared_ptr<const std::vector<ParsedBoundary>>* output) const { + { + std::lock_guard<std::mutex> lock(_projected_boundaries_mutex); + auto it = _projected_boundaries_by_filter_id.find(filter_id); + if (it != _projected_boundaries_by_filter_id.end()) { + *output = it->second; + return Status::OK(); + } + } + + std::vector<ParsedBoundary> projected; + auto store_projected_boundaries = [&](std::vector<ParsedBoundary>&& boundaries) { + auto cached = std::make_shared<const std::vector<ParsedBoundary>>(std::move(boundaries)); + std::lock_guard<std::mutex> lock(_projected_boundaries_mutex); + auto [it, inserted] = _projected_boundaries_by_filter_id.emplace(filter_id, cached); + *output = inserted ? 
cached : it->second; + return Status::OK(); + }; + + auto slot_boundaries_it = _slot_to_boundaries.find(leaf_slot_id); + if (slot_boundaries_it == _slot_to_boundaries.end()) { + return store_projected_boundaries(std::move(projected)); + } + + const auto& orig_boundaries = slot_boundaries_it->second; + if (orig_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + std::unordered_set<int64_t> boundary_partition_ids; + boundary_partition_ids.reserve(orig_boundaries.size()); + for (const auto& boundary : orig_boundaries) { + boundary_partition_ids.insert(boundary.partition_id); + } + for (const auto& [partition_id, direction] : partition_directions) { + if (direction == TTargetExprMonotonicity::NON_MONOTONIC) { + return Status::InternalError( + "Runtime filter partition pruning received NON_MONOTONIC partition " + "metadata, filter_id={}, partition_id={}", + filter_id, partition_id); + } + if (!boundary_partition_ids.contains(partition_id)) { + return Status::InternalError( + "Runtime filter partition pruning received monotonicity for a partition " + "without boundary, filter_id={}, partition_id={}, slot_id={}", + filter_id, partition_id, leaf_slot_id); + } + } + + if (target_expr->is_slot_ref()) { + auto* slot_ref = assert_cast<VSlotRef*>(target_expr.get()); + if (slot_ref->slot_id() != leaf_slot_id) { + return Status::InternalError( + "Runtime filter partition pruning SlotRef target mismatch, filter_id={}, " + "target_slot_id={}, leaf_slot_id={}", + filter_id, slot_ref->slot_id(), leaf_slot_id); + } + std::vector<ParsedBoundary> slot_boundaries; + slot_boundaries.reserve(orig_boundaries.size()); + for (const auto& boundary : orig_boundaries) { + auto direction_it = partition_directions.find(boundary.partition_id); + if (direction_it == partition_directions.end()) { + continue; + } + if (direction_it->second != TTargetExprMonotonicity::MONOTONIC_INCREASING) { + return Status::InternalError( + "Runtime filter partition pruning SlotRef 
target must use increasing " + "monotonicity, filter_id={}, partition_id={}, monotonicity={}", + filter_id, boundary.partition_id, direction_it->second); + } + slot_boundaries.emplace_back(boundary); + } + return store_projected_boundaries(std::move(slot_boundaries)); + } + + std::vector<std::pair<size_t, TTargetExprMonotonicity::type>> selected_boundaries; + selected_boundaries.reserve(orig_boundaries.size()); + for (size_t i = 0; i < orig_boundaries.size(); ++i) { + auto it = partition_directions.find(orig_boundaries[i].partition_id); + if (it != partition_directions.end() && + it->second != TTargetExprMonotonicity::NON_MONOTONIC) { + selected_boundaries.emplace_back(i, it->second); + } + } + if (selected_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + std::vector<std::pair<size_t, TTargetExprMonotonicity::type>> projectable_boundaries; + projectable_boundaries.reserve(selected_boundaries.size()); + for (const auto& selected_boundary : selected_boundaries) { + const auto& boundary = orig_boundaries[selected_boundary.first]; + if (!boundary.only_null && !boundary.contains_null) { + projectable_boundaries.emplace_back(selected_boundary); + } + } + selected_boundaries.swap(projectable_boundaries); + if (selected_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + auto slot_type_it = _slot_data_types.find(leaf_slot_id); + if (slot_type_it == _slot_data_types.end()) { + return Status::InternalError( + "Runtime filter partition pruning target slot has no boundary type, filter_id={}, " + "slot_id={}, target_expr={}", + filter_id, leaf_slot_id, target_expr->expr_name()); + } + + // LIST partitions require regrouping projected values per partition and + // should not be marked as expression-prunable by FE yet. 
+ for (const auto& selected_boundary : selected_boundaries) { + bool is_list = false; + std::visit([&](const auto& cvr) { is_list = cvr.is_fixed_value_range(); }, + orig_boundaries[selected_boundary.first].boundary_cvr); + if (is_list) { + return Status::InternalError( + "Runtime filter partition pruning expression target does not support LIST " Review Comment: 之前list没支持expr,现在顺便加上了 [English: LIST partitions previously did not support expression targets; that support has now been added along the way.] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
