This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2f4401189aa29efb171ff9cbb016109d89b9ab3f Author: Pxl <pxl...@qq.com> AuthorDate: Wed Mar 13 15:24:56 2024 +0800 [Bug](top-n) do not update topn filter when sort node and scan node are not in the same backend (#32159) --- be/src/olap/tablet_reader.cpp | 2 -- be/src/pipeline/exec/sort_sink_operator.cpp | 14 ++++++++------ be/src/runtime/runtime_predicate.h | 10 ++++++++-- be/src/vec/exec/vsort_node.cpp | 14 ++++++++------ 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 1cecd56a82a..f0229431b7b 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -578,8 +578,6 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( for (int id : read_params.topn_filter_source_node_ids) { auto& runtime_predicate = read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id); - DCHECK(runtime_predicate.inited()) - << "runtime predicate not inited, source_node_id=" << id; runtime_predicate.set_tablet_schema(_tablet_schema); } } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 657dd7d62d2..687332e1aec 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -154,13 +154,15 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size()); RETURN_IF_CANCELLED(state); - // update runtime predicate if (_use_topn_opt) { - vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); - if (!new_top.is_null() && new_top != local_state.old_top) { - auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top)); - local_state.old_top = std::move(new_top); + auto& predicate = state->get_query_ctx()->get_runtime_predicate(_node_id); + if (predicate.need_update()) { + 
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); + if (!new_top.is_null() && new_top != local_state.old_top) { + auto* query_ctx = state->get_query_ctx(); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top)); + local_state.old_top = std::move(new_top); + } } } if (!_reuse_mem) { diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index fcfc9db7021..4975b037201 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -46,13 +46,19 @@ public: Status init(PrimitiveType type, bool nulls_first, bool is_asc, const std::string& col_name); - bool inited() { - std::unique_lock<std::shared_mutex> wlock(_rwlock); + bool inited() const { + std::shared_lock<std::shared_mutex> rlock(_rwlock); return _inited; } + bool need_update() const { + std::shared_lock<std::shared_mutex> rlock(_rwlock); + return _inited && _tablet_schema; + } + void set_tablet_schema(TabletSchemaSPtr tablet_schema) { std::unique_lock<std::shared_mutex> wlock(_rwlock); + // when sort node and scan node are not in the same backend, predicate will not be initialized if (_tablet_schema || !_inited) { return; } diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index b142f011697..160690f7737 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -141,13 +141,15 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool RETURN_IF_ERROR(_sorter->append_block(input_block)); RETURN_IF_CANCELLED(state); - // update runtime predicate if (_use_topn_opt) { - Field new_top = _sorter->get_top_value(); - if (!new_top.is_null() && new_top != old_top) { - auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top)); - old_top = std::move(new_top); + auto& predicate = state->get_query_ctx()->get_runtime_predicate(_id); + if (predicate.need_update()) { + vectorized::Field 
new_top = _sorter->get_top_value(); + if (!new_top.is_null() && new_top != old_top) { + auto* query_ctx = state->get_query_ctx(); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top)); + old_top = std::move(new_top); + } } } if (!_reuse_mem) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org