This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new dedb56134b (threadpool) threadpool schedules does not work right on concurrent token (#12369) dedb56134b is described below commit dedb56134bd23663751bc09c27569a5e81823add Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Thu Sep 8 09:10:13 2022 +0800 (threadpool) threadpool schedules does not work right on concurrent token (#12369) Assume there is a concurrent thread token whose concurrency is 2, and the 1st task submitted on the token is handed to the thread pool while the 2nd is not submitted because the pool is busy. The token's active_threads is then 1, and the thread pool does not schedule the token again. This patch fixes the problem. --- be/src/util/threadpool.cpp | 19 ++++++++++++------ be/test/util/threadpool_test.cpp | 42 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index f7f194f735..de4615dd90 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -587,19 +587,26 @@ void ThreadPool::dispatch_thread() { state == ThreadPoolToken::State::QUIESCING); --token->_active_threads; --token->_num_submitted_tasks; + + // handle shutdown && idle if (token->_active_threads == 0) { if (state == ThreadPoolToken::State::QUIESCING) { DCHECK(token->_entries.empty()); token->transition(ThreadPoolToken::State::QUIESCED); } else if (token->_entries.empty()) { token->transition(ThreadPoolToken::State::IDLE); - } else if (token->mode() == ExecutionMode::SERIAL) { - _queue.emplace_back(token); - ++token->_num_submitted_tasks; - --token->_num_unsubmitted_tasks; } - } else if (token->mode() == ExecutionMode::CONCURRENT && token->_num_submitted_tasks < token->_max_concurrency - && token->_num_unsubmitted_tasks > 0) { + } + + // We decrease _num_submitted_tasks holding lock, so the following DCHECK works. 
+ DCHECK(token->_num_submitted_tasks < token->_max_concurrency); + + // If token->state is running and there are unsubmitted tasks in the token, we put + // the token back. + if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) { + // SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0. + // CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2 + // threads are running for the token. _queue.emplace_back(token); ++token->_num_submitted_tasks; --token->_num_unsubmitted_tasks; diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index a87d2b68c4..0dec77d5c7 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -894,6 +894,48 @@ TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) { ASSERT_EQ(0, _pool->num_threads()); } +TEST_F(ThreadPoolTest, TestThreadTokenSerial) { + std::unique_ptr<ThreadPool> thread_pool; + ThreadPoolBuilder("my_pool") + .set_min_threads(0) + .set_max_threads(1) + .set_max_queue_size(10) + .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) + .build(&thread_pool); + + std::unique_ptr<ThreadPoolToken> token1 = + thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL, 2); + token1->submit_func(std::bind(&MyFunc, 0, 1)); + std::cout << "after submit 1" << std::endl; + token1->wait(); + ASSERT_EQ(0, token1->num_tasks()); + for (int i = 0; i < 10; i++) { + token1->submit_func(std::bind(&MyFunc, i, 1)); + } + std::cout << "after submit 1" << std::endl; + token1->wait(); + ASSERT_EQ(0, token1->num_tasks()); +} + +TEST_F(ThreadPoolTest, TestThreadTokenConcurrent) { + std::unique_ptr<ThreadPool> thread_pool; + ThreadPoolBuilder("my_pool") + .set_min_threads(0) + .set_max_threads(1) + .set_max_queue_size(10) + .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) + .build(&thread_pool); + + std::unique_ptr<ThreadPoolToken> token1 = + 
thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 2); + for (int i = 0; i < 10; i++) { + token1->submit_func(std::bind(&MyFunc, i, 1)); + } + std::cout << "after submit 1" << std::endl; + token1->wait(); + ASSERT_EQ(0, token1->num_tasks()); +} + } // namespace doris int main(int argc, char* argv[]) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org