This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 5719995 [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781) 5719995 is described below commit 57199955d6b1f0b8db747876d0b7f4f4bc318235 Author: weizuo93 <wei...@apache.org> AuthorDate: Thu Sep 2 10:01:44 2021 +0800 [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781) * adjust thread number of compaction thread pool dynamically Co-authored-by: weizuo <wei...@xiaomi.com> --- be/src/common/config.h | 2 +- be/src/olap/olap_server.cpp | 24 ++++++++++++ be/src/util/threadpool.cpp | 76 ++++++++++++++++++++++++++++++-------- be/src/util/threadpool.h | 28 +++++++++++++- be/test/util/threadpool_test.cpp | 79 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 190 insertions(+), 19 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index eb42572..d31f25a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -280,7 +280,7 @@ CONF_mInt32(cumulative_compaction_skip_window_seconds, "30"); CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min // This config can be set to limit thread number in compaction thread pool. -CONF_Int32(max_compaction_threads, "10"); +CONF_mInt32(max_compaction_threads, "10"); // Thread count to do tablet meta checkpoint, -1 means use the data directories count. CONF_Int32(max_meta_checkpoint_threads, "-1"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index bad51dc..851e807 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -333,6 +333,30 @@ void StorageEngine::_compaction_tasks_producer_callback() { int64_t interval = config::generate_compaction_tasks_min_interval_ms; do { if (!config::disable_auto_compaction) { + VLOG_CRITICAL << "compaction thread pool. num_threads: " << _compaction_thread_pool->num_threads() + << ", num_threads_pending_start: " << _compaction_thread_pool->num_threads_pending_start() + << ", num_active_threads: " << _compaction_thread_pool->num_active_threads() + << ", max_threads: " << _compaction_thread_pool->max_threads() + << ", min_threads: " << _compaction_thread_pool->min_threads() + << ", num_total_queued_tasks: " << _compaction_thread_pool->get_queue_size(); + + if(_compaction_thread_pool->max_threads() != config::max_compaction_threads) { + int old_max_threads = _compaction_thread_pool->max_threads(); + Status status = _compaction_thread_pool->set_max_threads(config::max_compaction_threads); + if (status.ok()) { + LOG(INFO) << "update compaction thread pool max_threads from " + << old_max_threads << " to " << config::max_compaction_threads; + } + } + if(_compaction_thread_pool->min_threads() != config::max_compaction_threads) { + int old_min_threads = _compaction_thread_pool->min_threads(); + Status status = _compaction_thread_pool->set_min_threads(config::max_compaction_threads); + if (status.ok()) { + LOG(INFO) << "update compaction thread pool min_threads from " + << old_min_threads << " to " << config::max_compaction_threads; + } + } + bool check_score = false; int64_t cur_time = UnixMillis(); if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index b06a494..f7f194f 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -472,7 +472,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token } // If we failed to create a thread, but there are still some other // worker threads, log a warning message and continue. - LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string(); + LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string(); } } @@ -509,9 +509,6 @@ void ThreadPool::dispatch_thread() { DCHECK_GT(_num_threads_pending_start, 0); _num_threads++; _num_threads_pending_start--; - // If we are one of the first '_min_threads' to start, we must be - // a "permanent" thread. - bool permanent = _num_threads <= _min_threads; // Owned by this worker thread and added/removed from _idle_threads as needed. IdleThread me(&_lock); @@ -523,6 +520,10 @@ void ThreadPool::dispatch_thread() { break; } + if (_num_threads + _num_threads_pending_start > _max_threads) { + break; + } + if (_queue.empty()) { // There's no work to do, let's go idle. // @@ -536,21 +537,17 @@ void ThreadPool::dispatch_thread() { _idle_threads.erase(_idle_threads.iterator_to(me)); } }); - if (permanent) { - me.not_empty.wait(); - } else { - if (!me.not_empty.wait_for(_idle_timeout)) { - // After much investigation, it appears that pthread condition variables have - // a weird behavior in which they can return ETIMEDOUT from timed_wait even if - // another thread did in fact signal. Apparently after a timeout there is some - // brief period during which another thread may actually grab the internal mutex - // protecting the state, signal, and release again before we get the mutex. So, - // we'll recheck the empty queue case regardless. - if (_queue.empty()) { + if (!me.not_empty.wait_for(_idle_timeout)) { + // After much investigation, it appears that pthread condition variables have + // a weird behavior in which they can return ETIMEDOUT from timed_wait even if + // another thread did in fact signal. Apparently after a timeout there is some + // brief period during which another thread may actually grab the internal mutex + // protecting the state, signal, and release again before we get the mutex. So, + // we'll recheck the empty queue case regardless. + if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) { VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after " << _idle_timeout.ToMilliseconds() << "ms of idle time."; break; - } } } continue; @@ -645,6 +642,53 @@ void ThreadPool::check_not_pool_thread_unlocked() { } } +Status ThreadPool::set_min_threads(int min_threads) { + MutexLock unique_lock(&_lock); + if (min_threads > _max_threads) { + // min threads can not be set greater than max threads + return Status::InternalError("set thread pool min_threads failed"); + } + + _min_threads = min_threads; + if (min_threads > _num_threads + _num_threads_pending_start) { + int addition_threads = min_threads - _num_threads - _num_threads_pending_start; + _num_threads_pending_start += addition_threads; + for (int i = 0; i < addition_threads; i++) { + Status status = create_thread(); + if (!status.ok()) { + _num_threads_pending_start--; + LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string(); + return status; + } + } + } + return Status::OK(); +} + +Status ThreadPool::set_max_threads(int max_threads) { + MutexLock unique_lock(&_lock); + if (_min_threads > max_threads) { + // max threads can not be set less than min threads + return Status::InternalError("set thread pool max_threads failed"); + } + + _max_threads = max_threads; + if (_max_threads > _num_threads + _num_threads_pending_start) { + int addition_threads = _max_threads - _num_threads - _num_threads_pending_start; + addition_threads = std::min(addition_threads, _total_queued_tasks); + _num_threads_pending_start += addition_threads; + for (int i = 0; i < addition_threads; i++) { + Status status = create_thread(); + if (!status.ok()) { + _num_threads_pending_start--; + LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string(); + return status; + } + } + } + return Status::OK(); +} + std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { return o << ThreadPoolToken::state_to_string(s); } diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 8d2f26e..ca813bf 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -27,6 +27,7 @@ #include <unordered_set> #include <utility> +#include "common/atomic.h" #include "common/status.h" #include "gutil/ref_counted.h" #include "util/condition_variable.h" @@ -179,6 +180,9 @@ public: // Returns true if the pool reached the idle state, false otherwise. bool wait_for(const MonoDelta& delta); + Status set_min_threads(int min_threads); + Status set_max_threads(int max_threads); + // Allocates a new token for use in token-based task submission. All tokens // must be destroyed before their ThreadPool is destroyed. // @@ -199,6 +203,26 @@ public: return _num_threads + _num_threads_pending_start; } + int max_threads() const { + MutexLock l(&_lock); + return _max_threads; + } + + int min_threads() const { + MutexLock l(&_lock); + return _min_threads; + } + + int num_threads_pending_start() const { + MutexLock l(&_lock); + return _num_threads_pending_start; + } + + int num_active_threads() const { + MutexLock l(&_lock); + return _active_threads; + } + int get_queue_size() const { MutexLock l(&_lock); return _total_queued_tasks; @@ -241,8 +265,8 @@ private: void release_token(ThreadPoolToken* t); const std::string _name; - const int _min_threads; - const int _max_threads; + int _min_threads; + int _max_threads; const int _max_queue_size; const MonoDelta _idle_timeout; diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index b8ebebd..ef3d908 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -795,6 +795,85 @@ TEST_F(ThreadPoolTest, TestNormal) { ASSERT_EQ(0, token5->num_tasks()); } +TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(3) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(1))) + .ok()); + + ASSERT_EQ(3, _pool->min_threads()); + ASSERT_EQ(3, _pool->max_threads()); + ASSERT_EQ(3, _pool->num_threads()); + + ASSERT_TRUE(!_pool->set_min_threads(4).ok()); + ASSERT_TRUE(!_pool->set_max_threads(2).ok()); + + ASSERT_TRUE(_pool->set_min_threads(2).ok()); + ASSERT_EQ(2, _pool->min_threads()); + ASSERT_TRUE(_pool->set_max_threads(4).ok()); + ASSERT_EQ(4, _pool->max_threads()); + + ASSERT_TRUE(_pool->set_min_threads(3).ok()); + ASSERT_EQ(3, _pool->min_threads()); + ASSERT_TRUE(_pool->set_max_threads(3).ok()); + ASSERT_EQ(3, _pool->max_threads()); + + CountDownLatch latch_1(1); + CountDownLatch latch_2(1); + CountDownLatch latch_3(1); + CountDownLatch latch_4(1); + CountDownLatch latch_5(1); + CountDownLatch latch_6(1); + CountDownLatch latch_7(1); + CountDownLatch latch_8(1); + CountDownLatch latch_9(1); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_1)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_2)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_3)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_4)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_5)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_6)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_7)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + ASSERT_TRUE(_pool->set_max_threads(4).ok()); + ASSERT_EQ(4, _pool->max_threads()); + ASSERT_EQ(4, _pool->num_threads()); + ASSERT_TRUE(_pool->set_max_threads(5).ok()); + ASSERT_EQ(5, _pool->max_threads()); + ASSERT_EQ(5, _pool->num_threads()); + ASSERT_TRUE(_pool->set_max_threads(6).ok()); + ASSERT_EQ(6, _pool->max_threads()); + ASSERT_EQ(6, _pool->num_threads()); + ASSERT_TRUE(_pool->set_max_threads(4).ok()); + ASSERT_EQ(4, _pool->max_threads()); + latch_1.count_down(); + latch_2.count_down(); + latch_3.count_down(); + SleepFor(MonoDelta::FromMilliseconds(500)); + ASSERT_EQ(4, _pool->num_threads()); + + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_8)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_9)).ok()); + ASSERT_EQ(4, _pool->num_threads()); + + ASSERT_TRUE(_pool->set_min_threads(2).ok()); + ASSERT_EQ(2, _pool->min_threads()); + + latch_4.count_down(); + latch_5.count_down(); + latch_6.count_down(); + latch_7.count_down(); + latch_8.count_down(); + latch_9.count_down(); + SleepFor(MonoDelta::FromMilliseconds(500)); + ASSERT_EQ(2, _pool->num_threads()); + + _pool->wait(); + _pool->shutdown(); + ASSERT_EQ(0, _pool->num_threads()); +} + } // namespace doris int main(int argc, char* argv[]) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org