This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1114a8cd882 [Bug](join) return EOF when join build sink is awakened by downstream source (#47380)
1114a8cd882 is described below

commit 1114a8cd8822f7c212857d5541346bed461bbbe3
Author: Pxl <x...@selectdb.com>
AuthorDate: Sun Jan 26 15:05:07 2025 +0800

    [Bug](join) return EOF when join build sink is awakened by downstream source (#47380)
    
    ### What problem does this PR solve?
    1. Return EOF when the join build sink is awakened by the downstream source,
    so that HashJoinBuildSinkLocalState::close does not hit an error (see the
    sketch after the stack trace below).
    
    ![QQ_1737641060365](https://github.com/user-attachments/assets/8b8ddc15-7616-45ca-8afa-8895df21b52c)
    2. Add WakeUpEarlyReason to the profile.
    3. Add debug point `Pipeline::make_all_runnable.sleep` to reproduce the
    problem in a regression test:
    ```text
    Exception in inverted_index_p0/ssb_unique_sql_zstd/sql/q4.3.sql:
    java.lang.IllegalStateException: exceptions : exception : errCode = 2, detailMessage = (127.0.0.1)[INTERNAL_ERROR]rf process meet error: [E6] bf not inited and not ignored/disabled, rf: RuntimeFilter: (id = 0, type = bloomfilter, is_broadcast: true, ignored: false, disabled: false, build_bf_cardinality: true, dependency: none, synced_size: -1, has_local_target: true, has_remote_target: false, error_msg: []
      0#  doris::Exception::Exception(int, std::basic_string_view<char, std::char_traits<char> > const&) at /root/doris/be/src/common/exception.cpp:29
      1#  doris::Exception::Exception<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >(int, std::basic_string_view<char, std::char_traits<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
      2#  doris::IRuntimeFilter::signal() at /root/doris/be/src/exprs/runtime_filter.cpp:610
      3#  doris::IRuntimeFilter::publish(doris::RuntimeState*, bool)::$_1::operator()(std::shared_ptr<doris::RuntimePredicateWrapper>, bool, unsigned long) const at /root/doris/be/src/exprs/runtime_filter.cpp:0
      4#  doris::IRuntimeFilter::publish(doris::RuntimeState*, bool) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
      5#  doris::VRuntimeFilterSlots::publish(doris::RuntimeState*, bool) at /root/doris/be/src/exprs/runtime_filter_slots.h:0
      6#  doris::pipeline::HashJoinBuildSinkLocalState::close(doris::RuntimeState*, doris::Status) at /root/doris/be/src/pipeline/exec/hashjoin_build_sink.cpp:173
      7#  doris::pipeline::DataSinkOperatorXBase::close(doris::RuntimeState*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
      8#  doris::pipeline::PipelineTask::close(doris::Status, bool) at /root/doris/be/src/common/status.h:390
      9#  doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
      10# doris::pipeline::TaskScheduler::_do_work(int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
      11# doris::ThreadPool::dispatch_thread() at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/move.h:206
      12# doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:563
      13# ?
      14# __clone
    ```
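    A minimal, self-contained sketch of the control flow this fix introduces (the types and function below are simplified stand-ins, not the real Doris classes; the actual change is in the diff that follows): an instance that shares the hash table instead of building it returns EOF when it is woken up before the shared context has been signaled, so HashJoinBuildSinkLocalState::close never tries to publish an unbuilt runtime filter.
    ```cpp
    // Simplified sketch only: Status and SharedHashTableContext here are stand-ins,
    // not the actual Doris types; see the real patch in hashjoin_build_sink.cpp below.
    #include <iostream>

    enum class Status { OK, EndOfFile };

    struct SharedHashTableContext {
        bool signaled = false; // set by the instance that actually built the hash table
    };

    // Stand-in for the sink() path of an instance that waits for a shared hash table.
    Status sink_on_sharing_instance(const SharedHashTableContext& ctx) {
        if (!ctx.signaled) {
            // Woken up by the downstream source before the build finished (short circuit).
            // In the real code, returning EOF marks the task as wake_up_early, so close()
            // skips publishing the runtime filter instead of hitting the "bf not inited"
            // error shown in the stack trace above.
            return Status::EndOfFile;
        }
        return Status::OK; // hash table is ready, proceed normally
    }

    int main() {
        SharedHashTableContext ctx; // signaled == false: simulates the early wake-up
        std::cout << (sink_on_sharing_instance(ctx) == Status::EndOfFile
                              ? "EOF -> task marked wake_up_early, publish skipped\n"
                              : "proceed with probe\n");
        return 0;
    }
    ```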
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  14 ++--
 be/src/pipeline/pipeline.cpp                       |   8 +++
 be/src/vec/runtime/shared_hash_table_controller.h  |   1 -
 .../join/test_slow_close/test_slow_close.out       | Bin 0 -> 114 bytes
 .../join/test_slow_close/test_slow_close.groovy    |  78 +++++++++++++++++++++
 5 files changed, 91 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 47025aa62f7..e53ebca2304 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -172,20 +172,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         SCOPED_TIMER(_publish_runtime_filter_timer);
         RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
     } catch (Exception& e) {
-        bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
-                                               !p._shared_hash_table_context->complete_build_stage;
         bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                    p._shared_hashtable_controller &&
                                                    !p._shared_hash_table_context->signaled;
 
         return Status::InternalError(
                 "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
-                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
+                "{}, _finish_dependency: {}, "
                 "blocked_by_shared_hash_table_signal: "
                 "{}",
                 e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
-                _finish_dependency->debug_string(), blocked_by_complete_build_stage,
-                blocked_by_shared_hash_table_signal);
+                _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal);
     }
     return Base::close(state, exec_status);
 }
@@ -557,7 +554,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                 local_state.process_build_block(state, (*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
-            _shared_hash_table_context->complete_build_stage = true;
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = local_state._shared_state->arena;
             _shared_hash_table_context->hash_table_variants =
@@ -569,12 +565,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                     local_state._shared_state->build_indexes_null;
             local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
         }
-    } else if (!local_state._should_build_hash_table &&
-               _shared_hash_table_context->complete_build_stage) {
+    } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
         // the instance which is not build hash table, it's should wait the signal of hash table build finished.
-        // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit,
+        // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit
+        // return eof will make task marked as wake_up_early
         if (!_shared_hash_table_context->signaled) {
             return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
         }
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 1292edea9ee..2dd0394d2ae 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -112,6 +112,14 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
 }
 
 void Pipeline::make_all_runnable() {
+    DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+        auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "Pipeline::make_all_runnable", "pipeline_id", 0);
+        if (pipeline_id == id()) {
+            sleep(10);
+        }
+    });
+
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index ff9ad4d0ef4..ad4c7341120 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -69,7 +69,6 @@ struct SharedHashTableContext {
     std::map<int, RuntimeFilterContextSPtr> runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
-    std::atomic<bool> complete_build_stage = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000..cb92be84e47
Binary files /dev/null and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000..8d1c33ff923
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+    sql "set disable_join_reorder=true;"
+    sql "set runtime_filter_type='bloom_filter';"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set ignore_runtime_filter_ids='1,2';"
+    sql "set enable_runtime_filter_prune=false;"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+        """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
+    """
+    
+    sql """
+    insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+    insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+    }
+}

