This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d926041cc3c03160545785c729c1143e05115885
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Tue Sep 10 11:00:56 2024 +0800

    [fix](scanner) Fix deadlock when scanner submit failed (#40495)
    
    A deadlock occurs when submitting a scanner to the scheduler fails.
    
    The pstack output looks like this:
    ```txt
    Thread 2012 (Thread 0x7f87363fb700 (LWP 4179707) "Pipe_normal [wo"):
    #0  0x00007f8b8f3dc82d in __lll_lock_wait () from /lib64/libpthread.so.0
    #1  0x00007f8b8f3d5ad9 in pthread_mutex_lock () from /lib64/libpthread.so.0
    #2  0x000055b20f333e7a in __gthread_mutex_lock (__mutex=0x7f8733d960a8) at 
/mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/x86_64-linux-gnu/c++/11/bits/gthr-default
    .h:749
    #3  std::mutex::lock (this=0x7f8733d960a8) at 
/mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:100
    #4  std::lock_guard<std::mutex>::lock_guard (__m=..., this=<optimized out>) 
at 
/mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:229
    #5  doris::vectorized::ScannerContext::append_block_to_queue 
(this=<optimized out>, scan_task=...) at 
/mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:234
    #6  0x000055b20f32c0f9 in doris::vectorized::ScannerScheduler::submit 
(this=<optimized out>, ctx=..., scan_task=...) at 
/mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:209
    #7  0x000055b20f3338fc in 
doris::vectorized::ScannerContext::submit_scan_task 
(this=this@entry=0x7f8733d96010, scan_task=...) at 
/mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:217
    #8  0x000055b20f3346cd in 
doris::vectorized::ScannerContext::get_block_from_queue (this=0x7f8733d96010, 
state=<optimized out>, block=0x7f871f728de0, eos=0x7f871abce470, id=<optimized 
out>) at 
/mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:290
    #9  0x000055b214cb4f13 in 
doris::pipeline::ScanOperatorX<doris::pipeline::OlapScanLocalState>::get_block 
(this=<optimized out>, state=0x7f872f0eb400, block=0x7f8b8f3dc82d 
<__lll_lock_wait+29>, eos=0x7f871abce470) at 
/mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.cpp:1292
    #10 0x000055b2142b5772 in 
doris::pipeline::ScanOperatorX<doris::pipeline::OlapScanLocalState>::get_block_after_projects
 (this=0x80, state=0x0, block=0x7f8b8f3dc82d <__lll_lock_wait+29>, 
eos=0x7f8733d960a8) at 
/mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.h:363
    #11 0x000055b2142e7880 in 
doris::pipeline::StatefulOperatorX<doris::pipeline::StreamingAggLocalState>::get_block
 (this=0x7f871f9bee00, state=0x7f872f0eb400, block=0x7f8716d49060, 
eos=0x7f87363f4937) at 
/mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/operator.cpp:587
    ```
    The deadlock happens in the following code path:
    ```cpp
    Status ScannerContext::get_block_from_queue {
         std::unique_lock l(_transfer_lock);
         ...
         if (scan_task->is_eos()) {
         ...
         } else {
              // resubmit current running scanner to read the next block
             submit_scan_task(scan_task);
         }
    }
    
    ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
         _scanner_scheduler->submit(shared_from_this(), scan_task);
    }
    
    void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                                  std::shared_ptr<ScanTask> scan_task) {
        ...
        if (auto ret = sumbit_task(); !ret) {
            scan_task->set_status(Status::InternalError(
                    "Failed to submit scanner to scanner pool reason:" + 
std::string(ret.msg()) +
                    "|type:" + std::to_string(type)));
            ctx->append_block_to_queue(scan_task);
            return;
        }
    }
    
    void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> 
scan_task) {
        ...
        std::lock_guard<std::mutex> l(_transfer_lock);
        ...
    }
    ```
    Since std::mutex in C++ is not reentrant, the thread that already holds
    `_transfer_lock` deadlocks with itself when it tries to lock it again.
    
    This PR fixes the problem by making `ScannerScheduler::submit` return a
    Status instead of appending the failed task to the ScannerContext. The
    caller then decides whether to resubmit the scanner or abort the
    execution of the query.
---
 be/src/vec/exec/scan/scanner_context.cpp   | 44 ++++++++++++++++++++++--------
 be/src/vec/exec/scan/scanner_context.h     |  4 +--
 be/src/vec/exec/scan/scanner_scheduler.cpp | 31 +++++++++++----------
 be/src/vec/exec/scan/scanner_scheduler.h   |  2 +-
 4 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index bab11616c77..52838d7cf46 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -141,7 +141,7 @@ Status ScannerContext::init() {
     for (int i = 0; i < _max_thread_num; ++i) {
         std::weak_ptr<ScannerDelegate> next_scanner;
         if (_scanners.try_dequeue(next_scanner)) {
-            submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+            
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
             _num_running_scanners++;
         }
     }
@@ -181,10 +181,10 @@ bool ScannerContext::empty_in_queue(int id) {
     return _blocks_queue.empty();
 }
 
-void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
+Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
     _scanner_sched_counter->update(1);
     _num_scheduled_scanners++;
-    _scanner_scheduler->submit(shared_from_this(), scan_task);
+    return _scanner_scheduler->submit(shared_from_this(), scan_task);
 }
 
 void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> 
scan_task) {
@@ -232,10 +232,15 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         auto scan_task = _blocks_queue.front();
         DCHECK(scan_task);
 
+        // The abnormal status of scanner may come from the execution of the 
scanner itself,
+        // or come from the scanner scheduler, such as TooManyTasks.
         if (!scan_task->status_ok()) {
+            // TODO: If the scanner status is TooManyTasks, maybe we can retry 
the scanner after a while.
+            _process_status = scan_task->get_status();
             _set_scanner_done();
-            return scan_task->get_status();
+            return _process_status;
         }
+
         if (!scan_task->cached_blocks.empty()) {
             auto [current_block, block_size] = 
std::move(scan_task->cached_blocks.front());
             scan_task->cached_blocks.pop_front();
@@ -248,13 +253,20 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
             block->swap(*current_block);
             return_free_block(std::move(current_block));
         } else {
+            // This scan task do not have any cached blocks.
             _blocks_queue.pop_front();
-            if (scan_task->is_eos()) { // current scanner is finished, and no 
more data to read
+            // current scanner is finished, and no more data to read
+            if (scan_task->is_eos()) {
                 _num_finished_scanners++;
                 std::weak_ptr<ScannerDelegate> next_scanner;
                 // submit one of the remaining scanners
                 if (_scanners.try_dequeue(next_scanner)) {
-                    submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+                    auto submit_status = 
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+                    if (!submit_status.ok()) {
+                        _process_status = submit_status;
+                        _set_scanner_done();
+                        return _process_status;
+                    }
                 } else {
                     // no more scanner to be scheduled
                     // `_free_blocks` serve all running scanners, maybe it's 
too large for the remaining scanners
@@ -270,11 +282,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
                 }
             } else {
                 // resubmit current running scanner to read the next block
-                submit_scan_task(scan_task);
+                Status submit_status = submit_scan_task(scan_task);
+                if (!submit_status.ok()) {
+                    _process_status = submit_status;
+                    _set_scanner_done();
+                    return _process_status;
+                }
             }
         }
         // scale up
-        _try_to_scale_up();
+        RETURN_IF_ERROR(_try_to_scale_up());
     }
 
     if (_num_finished_scanners == _all_scanners.size() && 
_blocks_queue.empty()) {
@@ -289,7 +306,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
     return Status::OK();
 }
 
-void ScannerContext::_try_to_scale_up() {
+Status ScannerContext::_try_to_scale_up() {
     // Four criteria to determine whether to increase the parallelism of the 
scanners
     // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
     // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get 
blocks
@@ -306,7 +323,7 @@ void ScannerContext::_try_to_scale_up() {
             // when _last_wait_duration_ratio > 0, it has scaled up before.
             // we need to determine if the scale-up is effective:
             // the wait duration ratio after last scaling up should less than 
80% of `_last_wait_duration_ratio`
-            return;
+            return Status::OK();
         }
 
         bool is_scale_up = false;
@@ -322,7 +339,10 @@ void ScannerContext::_try_to_scale_up() {
             // get enough memory to launch one more scanner.
             std::weak_ptr<ScannerDelegate> scale_up_scanner;
             if (_scanners.try_dequeue(scale_up_scanner)) {
-                submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
+                // Just return error to caller.
+                // Because _try_to_scale_up is called under _transfer_lock 
locked, if we add the scanner
+                // to the block queue, we will get a deadlock.
+                
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)));
                 _num_running_scanners++;
                 _scale_up_scanners_counter->update(1);
                 is_scale_up = true;
@@ -337,6 +357,8 @@ void ScannerContext::_try_to_scale_up() {
             _total_wait_block_time = 0;
         }
     }
+
+    return Status::OK();
 }
 
 Status ScannerContext::validate_block_schema(Block* block) {
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index d97fc731fe5..972ec3a6b30 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -137,7 +137,7 @@ public:
     // set the next scanned block to `ScanTask::current_block`
     // set the error state to `ScanTask::status`
     // set the `eos` to `ScanTask::eos` if there is no more data in current 
scanner
-    void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
+    Status submit_scan_task(std::shared_ptr<ScanTask> scan_task);
 
     // append the running scanner and its cached block to `_blocks_queue`
     void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
@@ -184,7 +184,7 @@ protected:
     /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough 
memory to scale up
     /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
     void _set_scanner_done();
-    void _try_to_scale_up();
+    Status _try_to_scale_up();
 
     RuntimeState* _state = nullptr;
     pipeline::ScanLocalStateBase* _local_state = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 351f5d4e275..7f868fba5a6 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -119,23 +119,23 @@ Status ScannerScheduler::init(ExecEnv* env) {
     return Status::OK();
 }
 
-void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
-                              std::shared_ptr<ScanTask> scan_task) {
+Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
+                                std::shared_ptr<ScanTask> scan_task) {
     scan_task->last_submit_time = GetCurrentTimeNanos();
     if (ctx->done()) {
-        return;
+        return Status::OK();
     }
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
         LOG(INFO) << "could not lock task execution context, query " << 
ctx->debug_string()
                   << " maybe finished";
-        return;
+        return Status::OK();
     }
 
     if (ctx->thread_token != nullptr) {
         std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
         if (scanner_delegate == nullptr) {
-            return;
+            return Status::OK();
         }
 
         scanner_delegate->_scanner->start_wait_worker_timer();
@@ -152,13 +152,12 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         });
         if (!s.ok()) {
             scan_task->set_status(s);
-            ctx->append_block_to_queue(scan_task);
-            return;
+            return s;
         }
     } else {
         std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
         if (scanner_delegate == nullptr) {
-            return;
+            return Status::OK();
         }
 
         scanner_delegate->_scanner->start_wait_worker_timer();
@@ -186,14 +185,18 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
             return scan_sched->submit_scan_task(simple_scan_task);
         };
 
-        if (auto ret = sumbit_task(); !ret) {
-            scan_task->set_status(Status::InternalError(
-                    "Failed to submit scanner to scanner pool reason:" + 
std::string(ret.msg()) +
-                    "|type:" + std::to_string(type)));
-            ctx->append_block_to_queue(scan_task);
-            return;
+        Status submit_status = sumbit_task();
+        if (!submit_status.ok()) {
+            // User will see TooManyTasks error. It looks like a more 
reasonable error.
+            Status scan_task_status = Status::TooManyTasks(
+                    "Failed to submit scanner to scanner pool reason:" +
+                    std::string(submit_status.msg()) + "|type:" + 
std::to_string(type));
+            scan_task->set_status(scan_task_status);
+            return scan_task_status;
         }
     }
+
+    return Status::OK();
 }
 
 std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index ddc61396e23..439291f2107 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -57,7 +57,7 @@ public:
 
     [[nodiscard]] Status init(ExecEnv* env);
 
-    void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> 
scan_task);
+    Status submit(std::shared_ptr<ScannerContext> ctx, 
std::shared_ptr<ScanTask> scan_task);
 
     void stop();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to