HappenLee commented on code in PR #62589: URL: https://github.com/apache/doris/pull/62589#discussion_r3308916954
########## be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp: ########## @@ -0,0 +1,875 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/runtime_filter/runtime_filter_partition_pruner.h" + +#include <gen_cpp/PlanNodes_types.h> + +#include <optional> +#include <unordered_set> +#include <utility> + +#include "core/block/block.h" +#include "core/column/column.h" +#include "core/column/column_nullable.h" +#include "core/data_type/data_type_nullable.h" +#include "core/field.h" +#include "exprs/bloom_filter_func.h" +#include "exprs/hybrid_set.h" +#include "exprs/runtime_filter_expr.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "exprs/vliteral.h" +#include "exprs/vslot_ref.h" +#include "runtime/descriptors.h" + +namespace doris { + +// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size) +// Complexity is inflated by macro expansion for each PrimitiveType case. 
+Status ParsedPartitionBoundaries::parse( + const std::vector<TPartitionBoundary>& boundaries, + const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs) { + for (const auto& tb : boundaries) { + if (!tb.__isset.partition_id || !tb.__isset.slot_id) { + return Status::InternalError( + "Runtime filter partition boundary must contain partition_id and slot_id"); + } + SlotId slot_id = tb.slot_id; + + auto slot_it = slot_descs.find(slot_id); + if (slot_it == slot_descs.end()) { + return Status::InternalError( + "Runtime filter partition boundary references unknown slot_id={}, " + "partition_id={}", + slot_id, tb.partition_id); + } + SlotDescriptor* slot = slot_it->second; + // Reuse the slot's pre-built DataType: walking through VLiteral here + // would cost a `DataTypeFactory::create_data_type(node)` heap allocation + // and a one-row `ColumnConst` allocation per boundary endpoint. With + // thousands of partitions that dominates BuildTasksTime. + const DataTypePtr& slot_type = slot->type(); + PrimitiveType ptype = slot_type->get_primitive_type(); + int precision = cast_set<int>(slot_type->get_precision()); + int scale = cast_set<int>(slot_type->get_scale()); + bool is_nullable = slot->is_nullable(); + + // Store slot data type for potential projection use + _slot_data_types[slot_id] = slot_type; + + ParsedBoundary boundary; + boundary.partition_id = tb.partition_id; + boundary.slot_id = slot_id; + boundary.is_nullable = is_nullable; + + bool parsed_ok = false; + +#define BUILD_BOUNDARY_CVR(NAME) \ + case TYPE_##NAME: { \ + using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType; \ + bool is_list = tb.__isset.list_values && !tb.list_values.empty(); \ + bool is_range = tb.__isset.range_start || tb.__isset.range_end; \ + if (!is_list && !is_range) { \ + return Status::InternalError( \ + "Runtime filter partition boundary must be RANGE or LIST, " \ + "partition_id={}, slot_id={}", \ + tb.partition_id, slot_id); \ + } \ + ColumnValueRange<TYPE_##NAME> 
cvr(slot->col_name(), is_nullable, precision, scale); \ + /* Returns nullopt if `node` is a NULL literal; the caller then sets contain_null */ \ + /* on the CVR instead of trying to extract a typed value (which would dereference */ \ + /* a null data pointer for the non-string branch). */ \ + auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> { \ + if (node.node_type == TExprNodeType::NULL_LITERAL) { \ + return std::nullopt; \ + } \ + /* `Field` value is copied into the CVR by `add_fixed_value` / */ \ + /* `add_range` (both take CppType by const-ref / by value), so the */ \ + /* temporary `Field`'s lifetime ending at this expression's full-statement */ \ + /* boundary is safe -- including for `String` payloads. */ \ + Field field = slot_type->get_field(node); \ + return std::make_optional<CppType>(field.get<TYPE_##NAME>()); \ + }; \ + if (is_list) { \ + auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range( \ + is_nullable, precision, scale); \ + bool list_has_null = false; \ + bool list_has_value = false; \ + for (const auto& node : tb.list_values) { \ + auto parsed = parse_texpr_node(node); \ + if (!parsed) { \ + list_has_null = true; \ + continue; \ + } \ + static_cast<void>(empty_cvr.add_fixed_value(*parsed)); \ + list_has_value = true; \ + } \ + if (list_has_value) { \ + cvr.intersection(empty_cvr); \ + } \ + if (list_has_null && is_nullable) { \ + /* Track NULL membership on ParsedBoundary; calling */ \ + /* cvr.set_contain_null(true) here would invoke */ \ + /* set_empty_value_range() and discard the concrete fixed */ \ + /* values we just inserted, turning {NULL, v} into a */ \ + /* NULL-only boundary. 
*/ \ + boundary.contains_null = true; \ + if (!list_has_value) { \ + boundary.only_null = true; \ + } \ + } \ + } else { \ + if (tb.__isset.range_start) { \ + auto parsed = parse_texpr_node(tb.range_start); \ + if (parsed) { \ + static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed)); \ + } \ + } \ + if (tb.__isset.range_end) { \ + auto parsed = parse_texpr_node(tb.range_end); \ + if (parsed) { \ + /* Multi-column RANGE projection emits a CLOSED upper bound (see */ \ + /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */ \ + /* keeps the natural OPEN upper bound matching Doris semantics. */ \ + SQLFilterOp upper_op = \ + (tb.__isset.range_end_inclusive && tb.range_end_inclusive) \ + ? FILTER_LESS_OR_EQUAL \ + : FILTER_LESS; \ + static_cast<void>(cvr.add_range(upper_op, *parsed)); \ + } \ + } \ + } \ + boundary.boundary_cvr = std::move(cvr); \ + parsed_ok = true; \ + break; \ + } + + switch (ptype) { + BUILD_BOUNDARY_CVR(TINYINT) + BUILD_BOUNDARY_CVR(SMALLINT) + BUILD_BOUNDARY_CVR(INT) + BUILD_BOUNDARY_CVR(BIGINT) + BUILD_BOUNDARY_CVR(LARGEINT) + BUILD_BOUNDARY_CVR(FLOAT) + BUILD_BOUNDARY_CVR(DOUBLE) + BUILD_BOUNDARY_CVR(CHAR) + BUILD_BOUNDARY_CVR(DATE) + BUILD_BOUNDARY_CVR(DATETIME) + BUILD_BOUNDARY_CVR(DATEV2) + BUILD_BOUNDARY_CVR(DATETIMEV2) + BUILD_BOUNDARY_CVR(TIMESTAMPTZ) + BUILD_BOUNDARY_CVR(VARCHAR) + BUILD_BOUNDARY_CVR(STRING) + BUILD_BOUNDARY_CVR(DECIMAL32) + BUILD_BOUNDARY_CVR(DECIMAL64) + BUILD_BOUNDARY_CVR(DECIMAL128I) + BUILD_BOUNDARY_CVR(DECIMAL256) + BUILD_BOUNDARY_CVR(DECIMALV2) + BUILD_BOUNDARY_CVR(BOOLEAN) + BUILD_BOUNDARY_CVR(IPV4) + BUILD_BOUNDARY_CVR(IPV6) + default: + break; + } +#undef BUILD_BOUNDARY_CVR + + if (parsed_ok) { + _slot_to_boundaries[slot_id].push_back(std::move(boundary)); + } else { + return Status::InternalError( + "Runtime filter partition boundary has unsupported type, partition_id={}, " + "slot_id={}, type={}", + tb.partition_id, slot_id, slot_type->get_name()); + } + } + + // Count distinct 
partition IDs across all boundaries. + if (!_slot_to_boundaries.empty()) { + phmap::flat_hash_set<int64_t> all_partition_ids; + for (const auto& [_, slot_boundaries] : _slot_to_boundaries) { + for (const auto& pb : slot_boundaries) { + all_partition_ids.insert(pb.partition_id); + } + } + _total_partition_count = static_cast<int64_t>(all_partition_ids.size()); + } + return Status::OK(); +} +// NOLINTEND(readability-function-cognitive-complexity,readability-function-size) + +static const VSlotRef* find_unique_slot_ref(const VExpr* expr) { + if (!expr) { + return nullptr; + } + if (expr->is_slot_ref()) { + return assert_cast<const VSlotRef*>(expr); + } + const VSlotRef* found = nullptr; + for (const auto& child : expr->children()) { + const VSlotRef* c = find_unique_slot_ref(child.get()); + if (c) { + if (found) { + return nullptr; // multiple slot refs, can't handle + } + found = c; + } + } + return found; +} + +// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size) +Status ParsedPartitionBoundaries::get_or_compute_projected_boundaries( + int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id, + const std::unordered_map<int64_t, TTargetExprMonotonicity::type>& partition_directions, + VExprContext* ctx, std::shared_ptr<const std::vector<ParsedBoundary>>* output) const { + { + std::lock_guard<std::mutex> lock(_projected_boundaries_mutex); + auto it = _projected_boundaries_by_filter_id.find(filter_id); + if (it != _projected_boundaries_by_filter_id.end()) { + *output = it->second; + return Status::OK(); + } + } + + std::vector<ParsedBoundary> projected; + auto store_projected_boundaries = [&](std::vector<ParsedBoundary>&& boundaries) { + auto cached = std::make_shared<const std::vector<ParsedBoundary>>(std::move(boundaries)); + std::lock_guard<std::mutex> lock(_projected_boundaries_mutex); + auto [it, inserted] = _projected_boundaries_by_filter_id.emplace(filter_id, cached); + *output = inserted ? 
cached : it->second; + return Status::OK(); + }; + + auto slot_boundaries_it = _slot_to_boundaries.find(leaf_slot_id); + if (slot_boundaries_it == _slot_to_boundaries.end()) { + return store_projected_boundaries(std::move(projected)); + } + + const auto& orig_boundaries = slot_boundaries_it->second; + if (orig_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + std::unordered_set<int64_t> boundary_partition_ids; + boundary_partition_ids.reserve(orig_boundaries.size()); + for (const auto& boundary : orig_boundaries) { + boundary_partition_ids.insert(boundary.partition_id); + } + for (const auto& [partition_id, direction] : partition_directions) { + if (direction == TTargetExprMonotonicity::NON_MONOTONIC) { + return Status::InternalError( + "Runtime filter partition pruning received NON_MONOTONIC partition " + "metadata, filter_id={}, partition_id={}", + filter_id, partition_id); + } + if (!boundary_partition_ids.contains(partition_id)) { + return Status::InternalError( + "Runtime filter partition pruning received monotonicity for a partition " + "without boundary, filter_id={}, partition_id={}, slot_id={}", + filter_id, partition_id, leaf_slot_id); + } + } + + if (target_expr->is_slot_ref()) { + auto* slot_ref = assert_cast<VSlotRef*>(target_expr.get()); + if (slot_ref->slot_id() != leaf_slot_id) { + return Status::InternalError( + "Runtime filter partition pruning SlotRef target mismatch, filter_id={}, " + "target_slot_id={}, leaf_slot_id={}", + filter_id, slot_ref->slot_id(), leaf_slot_id); + } + std::vector<ParsedBoundary> slot_boundaries; + slot_boundaries.reserve(orig_boundaries.size()); + for (const auto& boundary : orig_boundaries) { + auto direction_it = partition_directions.find(boundary.partition_id); + if (direction_it == partition_directions.end()) { + continue; + } + if (direction_it->second != TTargetExprMonotonicity::MONOTONIC_INCREASING) { + return Status::InternalError( + "Runtime filter partition pruning SlotRef 
target must use increasing " + "monotonicity, filter_id={}, partition_id={}, monotonicity={}", + filter_id, boundary.partition_id, direction_it->second); + } + slot_boundaries.emplace_back(boundary); + } + return store_projected_boundaries(std::move(slot_boundaries)); + } + + std::vector<std::pair<size_t, TTargetExprMonotonicity::type>> selected_boundaries; + selected_boundaries.reserve(orig_boundaries.size()); + for (size_t i = 0; i < orig_boundaries.size(); ++i) { + auto it = partition_directions.find(orig_boundaries[i].partition_id); + if (it != partition_directions.end() && + it->second != TTargetExprMonotonicity::NON_MONOTONIC) { + selected_boundaries.emplace_back(i, it->second); + } + } + if (selected_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + std::vector<std::pair<size_t, TTargetExprMonotonicity::type>> projectable_boundaries; + projectable_boundaries.reserve(selected_boundaries.size()); + for (const auto& selected_boundary : selected_boundaries) { + const auto& boundary = orig_boundaries[selected_boundary.first]; + if (!boundary.only_null && !boundary.contains_null) { + projectable_boundaries.emplace_back(selected_boundary); + } + } + selected_boundaries.swap(projectable_boundaries); + if (selected_boundaries.empty()) { + return store_projected_boundaries(std::move(projected)); + } + + auto slot_type_it = _slot_data_types.find(leaf_slot_id); + if (slot_type_it == _slot_data_types.end()) { + return Status::InternalError( + "Runtime filter partition pruning target slot has no boundary type, filter_id={}, " + "slot_id={}, target_expr={}", + filter_id, leaf_slot_id, target_expr->expr_name()); + } + + // LIST partitions require regrouping projected values per partition and + // should not be marked as expression-prunable by FE yet. 
+ for (const auto& selected_boundary : selected_boundaries) { + bool is_list = false; + std::visit([&](const auto& cvr) { is_list = cvr.is_fixed_value_range(); }, + orig_boundaries[selected_boundary.first].boundary_cvr); + if (is_list) { + return Status::InternalError( + "Runtime filter partition pruning expression target does not support LIST " + "partition boundaries, filter_id={}, partition_id={}, target_expr={}", + filter_id, orig_boundaries[selected_boundary.first].partition_id, + target_expr->expr_name()); + } + } + + const DataTypePtr& input_type = slot_type_it->second; + bool input_is_nullable = input_type->is_nullable(); + DataTypePtr inner_type = input_is_nullable ? remove_nullable(input_type) : input_type; + PrimitiveType input_ptype = input_type->get_primitive_type(); + + size_t N = selected_boundaries.size(); + std::vector<bool> lo_open(N, false); + std::vector<bool> hi_open(N, false); + std::vector<size_t> lo_result_row(N, 0); + std::vector<size_t> hi_result_row(N, 0); + size_t lo_row_count = 0; + size_t hi_row_count = 0; + + // Build input blocks for lo and hi + Block lo_block; + Block hi_block; + + // Pad columns 0..leaf_column_id-1 with placeholder columns so the leaf + // VSlotRef (whose column_id is leaf_column_id) reads our typed column. + auto add_dummy_columns = [&](Block& block, size_t row_count) { + for (int col_idx = 0; col_idx < leaf_column_id; ++col_idx) { + auto col = ColumnUInt8::create(row_count, 0); + block.insert({std::move(col), std::make_shared<DataTypeUInt8>(), + fmt::format("dummy_{}", col_idx)}); + } + }; + + // Macro-dispatch on input PrimitiveType to build finite RANGE endpoint + // columns. Open endpoints are not executed through target_expr; they stay + // open after projection and swap sides for monotonic decreasing targets. 
+ bool input_built = false; +#define BUILD_INPUT_COLUMNS(INPUT_PT) \ + case TYPE_##INPUT_PT: { \ + using InCol = typename PrimitiveTypeTraits<TYPE_##INPUT_PT>::ColumnType; \ + MutableColumnPtr lo_inner_base = inner_type->create_column(); \ + MutableColumnPtr hi_inner_base = inner_type->create_column(); \ + auto* lo_inner = assert_cast<InCol*>(lo_inner_base.get()); \ + auto* hi_inner = assert_cast<InCol*>(hi_inner_base.get()); \ + lo_inner->reserve(N); \ + hi_inner->reserve(N); \ + auto lo_nulls = ColumnUInt8::create(); \ + auto hi_nulls = ColumnUInt8::create(); \ + lo_nulls->reserve(N); \ + hi_nulls->reserve(N); \ + for (size_t i = 0; i < N; ++i) { \ + const auto& boundary = orig_boundaries[selected_boundaries[i].first]; \ + const auto* cvr = \ + std::get_if<ColumnValueRange<TYPE_##INPUT_PT>>(&boundary.boundary_cvr); \ + DCHECK(cvr != nullptr); \ + DCHECK(!boundary.only_null); \ + DCHECK(!boundary.contains_null); \ + lo_open[i] = cvr->is_low_value_minimum(); \ + hi_open[i] = cvr->is_high_value_maximum(); \ + if (!lo_open[i]) { \ + lo_result_row[i] = lo_row_count++; \ + lo_inner->insert_value(cvr->get_range_min_value()); \ + lo_nulls->get_data().push_back(0); \ + } \ + if (!hi_open[i]) { \ + hi_result_row[i] = hi_row_count++; \ + hi_inner->insert_value(cvr->get_range_max_value()); \ + hi_nulls->get_data().push_back(0); \ + } \ + } \ + add_dummy_columns(lo_block, lo_row_count); \ + add_dummy_columns(hi_block, hi_row_count); \ + if (input_is_nullable) { \ + auto lo_col = ColumnNullable::create(std::move(lo_inner_base), std::move(lo_nulls)); \ + auto hi_col = ColumnNullable::create(std::move(hi_inner_base), std::move(hi_nulls)); \ + lo_block.insert({std::move(lo_col), input_type, "leaf_slot"}); \ + hi_block.insert({std::move(hi_col), input_type, "leaf_slot"}); \ + } else { \ + lo_block.insert({std::move(lo_inner_base), input_type, "leaf_slot"}); \ + hi_block.insert({std::move(hi_inner_base), input_type, "leaf_slot"}); \ + } \ + input_built = true; \ + break; \ + } + + 
switch (input_ptype) { + BUILD_INPUT_COLUMNS(TINYINT) + BUILD_INPUT_COLUMNS(SMALLINT) + BUILD_INPUT_COLUMNS(INT) + BUILD_INPUT_COLUMNS(BIGINT) + BUILD_INPUT_COLUMNS(LARGEINT) + BUILD_INPUT_COLUMNS(FLOAT) + BUILD_INPUT_COLUMNS(DOUBLE) + BUILD_INPUT_COLUMNS(CHAR) + BUILD_INPUT_COLUMNS(DATE) + BUILD_INPUT_COLUMNS(DATETIME) + BUILD_INPUT_COLUMNS(DATEV2) + BUILD_INPUT_COLUMNS(DATETIMEV2) + BUILD_INPUT_COLUMNS(TIMESTAMPTZ) + BUILD_INPUT_COLUMNS(VARCHAR) + BUILD_INPUT_COLUMNS(STRING) + BUILD_INPUT_COLUMNS(DECIMAL32) + BUILD_INPUT_COLUMNS(DECIMAL64) + BUILD_INPUT_COLUMNS(DECIMAL128I) + BUILD_INPUT_COLUMNS(DECIMAL256) + BUILD_INPUT_COLUMNS(DECIMALV2) + BUILD_INPUT_COLUMNS(BOOLEAN) + BUILD_INPUT_COLUMNS(IPV4) + BUILD_INPUT_COLUMNS(IPV6) + default: + break; + } +#undef BUILD_INPUT_COLUMNS + + if (!input_built) { + return Status::InternalError( + "Runtime filter partition pruning expression target has unsupported input type, " + "filter_id={}, input_type={}, target_expr={}", + filter_id, input_type->get_name(), target_expr->expr_name()); + } + + int lo_result_id = -1; + int hi_result_id = -1; + ColumnPtr lo_result_col; + ColumnPtr hi_result_col; + if (lo_row_count > 0) { + RETURN_IF_ERROR(target_expr->execute(ctx, &lo_block, &lo_result_id)); + if (lo_result_id < 0) { + return Status::InternalError( + "Runtime filter partition pruning failed to project lower boundary, " + "filter_id={}, target_expr={}", + filter_id, target_expr->expr_name()); + } + lo_result_col = lo_block.get_by_position(lo_result_id).column; + } + if (hi_row_count > 0) { + RETURN_IF_ERROR(target_expr->execute(ctx, &hi_block, &hi_result_id)); + if (hi_result_id < 0) { + return Status::InternalError( + "Runtime filter partition pruning failed to project upper boundary, " + "filter_id={}, target_expr={}", + filter_id, target_expr->expr_name()); + } + hi_result_col = hi_block.get_by_position(hi_result_id).column; + } + int out_precision = cast_set<int>(target_expr->data_type()->get_precision()); + int out_scale = 
cast_set<int>(target_expr->data_type()->get_scale()); + bool out_nullable = target_expr->data_type()->is_nullable(); + PrimitiveType output_ptype = target_expr->data_type()->get_primitive_type(); + + projected.reserve(N); + + bool output_built = false; +#define BUILD_PROJECTED_CVR(OUTPUT_PT) \ + case TYPE_##OUTPUT_PT: { \ + using OutputCppType = typename PrimitiveTypeTraits<TYPE_##OUTPUT_PT>::CppType; \ + auto projected_value = [](const ColumnPtr& column, size_t row) -> OutputCppType { \ + Field field = (*column)[row]; \ + return field.get<TYPE_##OUTPUT_PT>(); \ + }; \ + for (size_t i = 0; i < N; ++i) { \ + const auto& orig_boundary = orig_boundaries[selected_boundaries[i].first]; \ + TTargetExprMonotonicity::type direction = selected_boundaries[i].second; \ + if ((!lo_open[i] && lo_result_col->is_null_at(lo_result_row[i])) || \ + (!hi_open[i] && hi_result_col->is_null_at(hi_result_row[i]))) { \ + continue; \ + } \ + ColumnValueRange<TYPE_##OUTPUT_PT> cvr("", out_nullable, out_precision, out_scale); \ + if (direction == TTargetExprMonotonicity::MONOTONIC_DECREASING) { \ + if (!hi_open[i]) { \ + auto hi_projected = projected_value(hi_result_col, hi_result_row[i]); \ + static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, hi_projected)); \ + } \ + if (!lo_open[i]) { \ + auto lo_projected = projected_value(lo_result_col, lo_result_row[i]); \ + static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, lo_projected)); \ + } \ + } else { \ + if (!lo_open[i]) { \ + auto lo_projected = projected_value(lo_result_col, lo_result_row[i]); \ + static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, lo_projected)); \ + } \ + if (!hi_open[i]) { \ + auto hi_projected = projected_value(hi_result_col, hi_result_row[i]); \ + static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, hi_projected)); \ + } \ + } \ + ParsedBoundary projected_boundary; \ + projected_boundary.partition_id = orig_boundary.partition_id; \ + projected_boundary.slot_id = leaf_slot_id; \ + 
projected_boundary.is_nullable = out_nullable; \ + projected_boundary.only_null = orig_boundary.only_null; \ + projected_boundary.contains_null = orig_boundary.contains_null; \ + projected_boundary.boundary_cvr = std::move(cvr); \ + projected.emplace_back(std::move(projected_boundary)); \ + } \ + output_built = true; \ + break; \ + } + + switch (output_ptype) { + BUILD_PROJECTED_CVR(TINYINT) + BUILD_PROJECTED_CVR(SMALLINT) + BUILD_PROJECTED_CVR(INT) + BUILD_PROJECTED_CVR(BIGINT) + BUILD_PROJECTED_CVR(LARGEINT) + BUILD_PROJECTED_CVR(FLOAT) + BUILD_PROJECTED_CVR(DOUBLE) + BUILD_PROJECTED_CVR(CHAR) + BUILD_PROJECTED_CVR(DATE) + BUILD_PROJECTED_CVR(DATETIME) + BUILD_PROJECTED_CVR(DATEV2) + BUILD_PROJECTED_CVR(DATETIMEV2) + BUILD_PROJECTED_CVR(TIMESTAMPTZ) + BUILD_PROJECTED_CVR(VARCHAR) + BUILD_PROJECTED_CVR(STRING) + BUILD_PROJECTED_CVR(DECIMAL32) + BUILD_PROJECTED_CVR(DECIMAL64) + BUILD_PROJECTED_CVR(DECIMAL128I) + BUILD_PROJECTED_CVR(DECIMAL256) + BUILD_PROJECTED_CVR(DECIMALV2) + BUILD_PROJECTED_CVR(BOOLEAN) + BUILD_PROJECTED_CVR(IPV4) + BUILD_PROJECTED_CVR(IPV6) + default: + break; + } + +#undef BUILD_PROJECTED_CVR + + if (!output_built) { + return Status::InternalError( + "Runtime filter partition pruning expression target has unsupported output type, " + "filter_id={}, output_type={}, target_expr={}", + filter_id, target_expr->data_type()->get_name(), target_expr->expr_name()); + } + + return store_projected_boundaries(std::move(projected)); +} +// NOLINTEND(readability-function-cognitive-complexity,readability-function-size) + +static SQLFilterOp convert_opcode_to_filter_op(TExprOpcode::type op) { + switch (op) { + case TExprOpcode::LE: + return FILTER_LESS_OR_EQUAL; + case TExprOpcode::LT: + return FILTER_LESS; + case TExprOpcode::GE: + return FILTER_LARGER_OR_EQUAL; + case TExprOpcode::GT: + return FILTER_LARGER; + default: + return FILTER_IN; // sentinel: caller should skip + } +} + +// 
NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size) +void RuntimeFilterPartitionPruner::_try_prune_by_single_rf( + const std::vector<ParsedBoundary>& boundaries, const VExprSPtr& impl, + phmap::flat_hash_set<int64_t>& newly_pruned) { + // Pre-compute whether the RF "matches NULL" -- i.e. whether the RF would + // accept a row whose probe value is NULL. The signal differs by RF impl: + // * IN filter → HybridSet::contain_null() + // * Bloom filter → BloomFilterFuncBase::contain_null() + // * MinMax filter → encoded via the BinaryPredicate node_type: + // NULL_AWARE_BINARY_PRED means the build side had a + // NULL key and a null-safe equal join asked NULL to + // match NULL; plain BINARY_PRED never matches NULL. + // FilterBase::contain_null() already folds in `_null_aware`, so we only + // get a true result when the build side is actually null-aware AND + // produced a NULL value. + bool rf_contains_null = false; + if (auto hybrid_set = impl->get_set_func()) { + rf_contains_null = hybrid_set->contain_null(); + } else if (impl->node_type() == TExprNodeType::BLOOM_PRED) { + auto bloom = impl->get_bloom_filter_func(); + rf_contains_null = bloom && bloom->contain_null(); + } else if (impl->node_type() == TExprNodeType::NULL_AWARE_BINARY_PRED) { + // Min/Max RF built on a null-safe equal join. The literal child holds + // the min or max bound; the NULL semantic is conveyed by the node + // type itself (see create_vbin_predicate in runtime_filter/utils.cpp). + rf_contains_null = true; + } + + for (const auto& pb : boundaries) { + if (_pruned_partition_ids.contains(pb.partition_id) || + newly_pruned.contains(pb.partition_id)) { + continue; + } + + // NULL handling: + // A partition row whose key is NULL matches the RF iff `rf_contains_null`. + // - only_null partition (rows are exclusively NULL): prunable iff !rf_contains_null. + // - mixed (NULL + concrete values): if rf_contains_null, NULL rows alone + // prevent pruning. 
Otherwise NULL rows can never match, so we ignore + // the NULL portion and let the regular non-NULL intersection decide. + if (pb.only_null) { + if (!rf_contains_null) { + newly_pruned.insert(pb.partition_id); + } + continue; + } + if (pb.contains_null && rf_contains_null) { + continue; + } + + std::visit( + [&](const auto& boundary_cvr) { + using CvrType = std::decay_t<decltype(boundary_cvr)>; + using CppType = typename CvrType::CppType; + + auto hybrid_set = impl->get_set_func(); + if (hybrid_set) { Review Comment: 能不能把 for 循环放到外面?这个 runtime filter 的 range 只需要生成一次,后续每个 partition 都复用已生成的 range 来计算,而不是每个 partition 都重新算一次。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
