This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f4401189aa29efb171ff9cbb016109d89b9ab3f
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Mar 13 15:24:56 2024 +0800

    [Bug](top-n) do not update topn filter when sort node and scan node are not 
in the… (#32159)
---
 be/src/olap/tablet_reader.cpp               |  2 --
 be/src/pipeline/exec/sort_sink_operator.cpp | 14 ++++++++------
 be/src/runtime/runtime_predicate.h          | 10 ++++++++--
 be/src/vec/exec/vsort_node.cpp              | 14 ++++++++------
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 1cecd56a82a..f0229431b7b 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -578,8 +578,6 @@ void 
TabletReader::_init_conditions_param_except_leafnode_of_andnode(
         for (int id : read_params.topn_filter_source_node_ids) {
             auto& runtime_predicate =
                     
read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
-            DCHECK(runtime_predicate.inited())
-                    << "runtime predicate not inited, source_node_id=" << id;
             runtime_predicate.set_tablet_schema(_tablet_schema);
         }
     }
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 657dd7d62d2..687332e1aec 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -154,13 +154,15 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block* in
         
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
         RETURN_IF_CANCELLED(state);
 
-        // update runtime predicate
         if (_use_topn_opt) {
-            vectorized::Field new_top = 
local_state._shared_state->sorter->get_top_value();
-            if (!new_top.is_null() && new_top != local_state.old_top) {
-                auto* query_ctx = state->get_query_ctx();
-                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top));
-                local_state.old_top = std::move(new_top);
+            auto& predicate = 
state->get_query_ctx()->get_runtime_predicate(_node_id);
+            if (predicate.need_update()) {
+                vectorized::Field new_top = 
local_state._shared_state->sorter->get_top_value();
+                if (!new_top.is_null() && new_top != local_state.old_top) {
+                    auto* query_ctx = state->get_query_ctx();
+                    
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top));
+                    local_state.old_top = std::move(new_top);
+                }
             }
         }
         if (!_reuse_mem) {
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index fcfc9db7021..4975b037201 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -46,13 +46,19 @@ public:
 
     Status init(PrimitiveType type, bool nulls_first, bool is_asc, const 
std::string& col_name);
 
-    bool inited() {
-        std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    bool inited() const {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
         return _inited;
     }
 
+    bool need_update() const {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        return _inited && _tablet_schema;
+    }
+
     void set_tablet_schema(TabletSchemaSPtr tablet_schema) {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        // when sort node and scan node are not in the same backend, predicate 
will not be initialized
         if (_tablet_schema || !_inited) {
             return;
         }
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index b142f011697..160690f7737 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -141,13 +141,15 @@ Status VSortNode::sink(RuntimeState* state, 
vectorized::Block* input_block, bool
         RETURN_IF_ERROR(_sorter->append_block(input_block));
         RETURN_IF_CANCELLED(state);
 
-        // update runtime predicate
         if (_use_topn_opt) {
-            Field new_top = _sorter->get_top_value();
-            if (!new_top.is_null() && new_top != old_top) {
-                auto* query_ctx = state->get_query_ctx();
-                
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top));
-                old_top = std::move(new_top);
+            auto& predicate = 
state->get_query_ctx()->get_runtime_predicate(_id);
+            if (predicate.need_update()) {
+                vectorized::Field new_top = _sorter->get_top_value();
+                if (!new_top.is_null() && new_top != old_top) {
+                    auto* query_ctx = state->get_query_ctx();
+                    
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top));
+                    old_top = std::move(new_top);
+                }
             }
         }
         if (!_reuse_mem) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to