This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new dedb56134b (threadpool) threadpool schedules does not work right on concurrent token (#12369)
dedb56134b is described below

commit dedb56134bd23663751bc09c27569a5e81823add
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Thu Sep 8 09:10:13 2022 +0800

    (threadpool) threadpool schedules does not work right on concurrent token (#12369)
    
    Assume there is a concurrent thread token whose concurrency is 2. The 1st
    submit on the token is submitted to the thread pool, while the 2nd is not
    submitted because the thread pool is busy. The token's active_threads is
    then 1, so the thread pool does not schedule the token.
    
    The patch fixes the problem.
---
 be/src/util/threadpool.cpp       | 19 ++++++++++++------
 be/test/util/threadpool_test.cpp | 42 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index f7f194f735..de4615dd90 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -587,19 +587,26 @@ void ThreadPool::dispatch_thread() {
                state == ThreadPoolToken::State::QUIESCING);
         --token->_active_threads;
         --token->_num_submitted_tasks;
+
+        // handle shutdown && idle
         if (token->_active_threads == 0) {
             if (state == ThreadPoolToken::State::QUIESCING) {
                 DCHECK(token->_entries.empty());
                 token->transition(ThreadPoolToken::State::QUIESCED);
             } else if (token->_entries.empty()) {
                 token->transition(ThreadPoolToken::State::IDLE);
-            } else if (token->mode() == ExecutionMode::SERIAL) {
-                _queue.emplace_back(token);
-                ++token->_num_submitted_tasks;
-                --token->_num_unsubmitted_tasks;
             }
-        } else if (token->mode() == ExecutionMode::CONCURRENT && 
token->_num_submitted_tasks < token->_max_concurrency
-                && token->_num_unsubmitted_tasks > 0) {
+        }
+
+        // We decrease _num_submitted_tasks holding lock, so the following 
DCHECK works.
+        DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
+
+        // If token->state is running and there are unsubmitted tasks in the 
token, we put
+        // the token back.
+        if (token->_num_unsubmitted_tasks > 0 && state == 
ThreadPoolToken::State::RUNNING) {
+            // SERIAL: if _entries is not empty, then num_unsubmitted_tasks 
must be greater than 0.
+            // CONCURRENT: we have to check _num_unsubmitted_tasks because 
there may be at least 2 
+            // threads are running for the token.
             _queue.emplace_back(token);
             ++token->_num_submitted_tasks;
             --token->_num_unsubmitted_tasks;
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index a87d2b68c4..0dec77d5c7 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -894,6 +894,48 @@ TEST_F(ThreadPoolTest, 
TestThreadPoolDynamicAdjustMaximumMinimum) {
     ASSERT_EQ(0, _pool->num_threads());
 }
 
+TEST_F(ThreadPoolTest, TestThreadTokenSerial) {
+    std::unique_ptr<ThreadPool> thread_pool;
+    ThreadPoolBuilder("my_pool")
+            .set_min_threads(0)
+            .set_max_threads(1)
+            .set_max_queue_size(10)
+            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+            .build(&thread_pool);
+
+    std::unique_ptr<ThreadPoolToken> token1 =
+            thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL, 2);
+    token1->submit_func(std::bind(&MyFunc, 0, 1));
+    std::cout << "after submit 1" << std::endl;
+    token1->wait();
+    ASSERT_EQ(0, token1->num_tasks());
+    for (int i = 0; i < 10; i++) {
+        token1->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 1" << std::endl;
+    token1->wait();
+    ASSERT_EQ(0, token1->num_tasks());
+} 
+
+TEST_F(ThreadPoolTest, TestThreadTokenConcurrent) {
+    std::unique_ptr<ThreadPool> thread_pool;
+    ThreadPoolBuilder("my_pool")
+            .set_min_threads(0)
+            .set_max_threads(1)
+            .set_max_queue_size(10)
+            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+            .build(&thread_pool);
+
+    std::unique_ptr<ThreadPoolToken> token1 =
+            thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 2);
+    for (int i = 0; i < 10; i++) {
+        token1->submit_func(std::bind(&MyFunc, i, 1));
+    }
+    std::cout << "after submit 1" << std::endl;
+    token1->wait();
+    ASSERT_EQ(0, token1->num_tasks());
+} 
+
 } // namespace doris
 
 int main(int argc, char* argv[]) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to