This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 95591ce49a [refactor](cv)wait on condition variable more gently (#12620)
95591ce49a is described below

commit 95591ce49a3215d3223748ed219b2441481b4678
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Tue Nov 8 08:40:31 2022 +0800

    [refactor](cv)wait on condition variable more gently (#12620)
---
 be/src/agent/task_worker_pool.cpp        |  85 ++++++++-----------
 be/src/util/blocking_priority_queue.hpp  | 135 ++++++++++++-------------
 be/src/util/blocking_queue.hpp           |  58 +++++--------
 be/src/util/threadpool.cpp               |  17 ++--
 be/src/vec/exec/scan/scanner_context.cpp |  15 ++--
 5 files changed, 127 insertions(+), 183 deletions(-)
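
The change is the same throughout: hand-rolled wait loops of the form
"while (!condition) cv.wait(lock);" are replaced with the predicate overload
"cv.wait(lock, predicate)", which re-evaluates the predicate under the lock after
every wakeup and so handles spurious wakeups without an explicit loop. A minimal
sketch of the pattern, using hypothetical names (the mutex, task queue and stop
flag below are illustrative, not Doris identifiers):

    #include <condition_variable>
    #include <mutex>
    #include <queue>

    std::mutex mtx;
    std::condition_variable cv;
    std::queue<int> tasks;
    bool stop = false;

    // Hypothetical worker: blocks until there is a task or the pool is stopped.
    bool next_task(int* out) {
        std::unique_lock<std::mutex> lock(mtx);
        // Equivalent to: while (!stop && tasks.empty()) cv.wait(lock);
        cv.wait(lock, [] { return stop || !tasks.empty(); });
        if (stop) {
            return false;
        }
        *out = tasks.front();
        tasks.pop();
        return true;
    }
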
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 5e482ef90f..3f5ca98021 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -341,9 +341,8 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
         TCreateTabletReq create_tablet_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -416,9 +415,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
         TDropTabletReq drop_tablet_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -469,9 +467,8 @@ void TaskWorkerPool::_alter_tablet_worker_thread_callback() {
         TAgentTaskRequest agent_task_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -592,9 +589,8 @@ void TaskWorkerPool::_push_worker_thread_callback() {
         int32_t index = 0;
         do {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -664,9 +660,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
         TPublishVersionRequest publish_version_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -773,9 +768,8 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
         TClearTransactionTaskRequest clear_transaction_task_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -826,9 +820,8 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
         TUpdateTabletMetaInfoReq update_tablet_meta_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -902,9 +895,8 @@ void TaskWorkerPool::_clone_worker_thread_callback() {

         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -952,9 +944,8 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
         TStorageMediumMigrateReq storage_medium_migrate_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1054,9 +1045,8 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
         TCheckConsistencyReq check_consistency_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1248,9 +1238,8 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
         TUploadReq upload_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1300,9 +1289,8 @@ void TaskWorkerPool::_download_worker_thread_callback() {
         TDownloadReq download_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1353,9 +1341,8 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
         TSnapshotRequest snapshot_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1413,9 +1400,8 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
         TReleaseSnapshotRequest release_snapshot_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1463,9 +1449,8 @@ void TaskWorkerPool::_move_dir_thread_callback() {
         TMoveDirReq move_dir_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1571,9 +1556,8 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {

         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1673,9 +1657,8 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
         TGetStoragePolicy get_storage_policy_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
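
Every hunk above applies the same mechanical rewrite, and the new predicate is just
the negation of the old loop condition, so a worker thread wakes under exactly the
same circumstances as before; only the explicit loop is gone. Spelled out with the
identifiers from the patch:

    // Old form: keep sleeping while the pool is running and there is nothing to do.
    while (_is_work && _tasks.empty()) {
        _worker_thread_condition_variable.wait(worker_thread_lock);
    }

    // New form: wake once that condition no longer holds. By De Morgan,
    // !(_is_work && _tasks.empty()) is the predicate used in the diff:
    _worker_thread_condition_variable.wait(
            worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
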
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 8c264e57dd..29060613c8 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -49,101 +49,90 @@ public:
     // -- timeout_ms: 0 means wait indefinitely
     bool blocking_get(T* out, uint32_t timeout_ms = 0) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (true) {
-            if (!_queue.empty()) {
-                // 定期提高队列中残留的任务优先级
-                // 保证优先级较低的大查询不至于完全饿死
-                if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
-                    std::priority_queue<T> tmp_queue;
-                    while (!_queue.empty()) {
-                        T v = _queue.top();
-                        _queue.pop();
-                        ++v;
-                        tmp_queue.push(v);
-                    }
-                    swap(_queue, tmp_queue);
-                    _upgrade_counter = 0;
+        bool wait_successful = false;
+        if (timeout_ms > 0) {
+            wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms),
+                                               [this] { return _shutdown || !_queue.empty(); });
+        } else {
+            _get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); });
+            wait_successful = true;
+        }
+        _total_get_wait_time += timer.elapsed_time();
+        if (wait_successful) {
+            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
+                std::priority_queue<T> tmp_queue;
+                while (!_queue.empty()) {
+                    T v = _queue.top();
+                    _queue.pop();
+                    ++v;
+                    tmp_queue.push(v);
                 }
+                swap(_queue, tmp_queue);
+                _upgrade_counter = 0;
+            }
+            if (!_queue.empty()) {
                 *out = _queue.top();
                 _queue.pop();
                 ++_upgrade_counter;
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
                 _put_cv.notify_one();
                 return true;
-            }
-            if (_shutdown) {
-                return false;
-            }
-
-            timer.start();
-            if (timeout_ms != 0) {
-                if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
-                    std::cv_status::timeout) {
-                    return false;
-                }
             } else {
-                _get_cv.wait(unique_lock);
+                assert(_shutdown);
+                return false;
             }
-            timer.stop();
+        } else {
+            //time out
+            assert(!_shutdown);
+            return false;
         }
     }

     bool non_blocking_get(T* out) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-        while (true) {
-            if (!_queue.empty()) {
-                // 定期提高队列中残留的任务优先级
-                // 保证优先级较低的大查询不至于完全饿死
-                if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
-                    std::priority_queue<T> tmp_queue;
-                    while (!_queue.empty()) {
-                        T v = _queue.top();
-                        _queue.pop();
-                        ++v;
-                        tmp_queue.push(v);
-                    }
-                    swap(_queue, tmp_queue);
-                    _upgrade_counter = 0;
+        if (!_queue.empty()) {
+            // 定期提高队列中残留的任务优先级
+            // 保证优先级较低的大查询不至于完全饿死
+            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
+                std::priority_queue<T> tmp_queue;
+                while (!_queue.empty()) {
+                    T v = _queue.top();
+                    _queue.pop();
+                    ++v;
+                    tmp_queue.push(v);
                 }
-                *out = _queue.top();
-                _queue.pop();
-                ++_upgrade_counter;
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
-                _put_cv.notify_one();
-                return true;
+                swap(_queue, tmp_queue);
+                _upgrade_counter = 0;
             }
-            if (_shutdown) {
-                return false;
-            }
-            return false;
+            *out = _queue.top();
+            _queue.pop();
+            ++_upgrade_counter;
+            _total_get_wait_time += timer.elapsed_time();
+            _put_cv.notify_one();
+            return true;
         }
+
+        return false;
     }

     // Puts an element into the queue, waiting indefinitely until there is space.
     // If the queue is shut down, returns false.
     bool blocking_put(const T& val) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (_queue.size() >= _max_element && !_shutdown) {
-            timer.start();
-            _put_cv.wait(unique_lock);
-            timer.stop();
-        }
+        _put_cv.wait(unique_lock, [this] { return _shutdown || _queue.size() < _max_element; });
         _total_put_wait_time += timer.elapsed_time();
+
         if (_shutdown) {
             return false;
         }
-        DCHECK_LT(_queue.size(), _max_element);
         _queue.push(val);
-        unique_lock.unlock();
         _get_cv.notify_one();
         return true;
     }
@@ -151,7 +140,7 @@ public:
     // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
     void shutdown() {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::lock_guard<std::mutex> l(_lock);
             _shutdown = true;
         }
         _get_cv.notify_all();
@@ -159,24 +148,18 @@ public:
     }

     uint32_t get_size() const {
-        std::unique_lock<std::mutex> l(_lock);
+        std::lock_guard<std::mutex> l(_lock);
         return _queue.size();
     }

     // Returns the total amount of time threads have blocked in blocking_get.
-    uint64_t total_get_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_get_wait_time;
-    }
+    uint64_t total_get_wait_time() const { return _total_get_wait_time; }

     // Returns the total amount of time threads have blocked in blocking_put.
-    uint64_t total_put_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_put_wait_time;
-    }
+    uint64_t total_put_wait_time() const { return _total_put_wait_time; }

 private:
-    std::atomic<bool> _shutdown;
+    bool _shutdown;
     const int _max_element;
     std::condition_variable _get_cv; // 'get' callers wait on this
     std::condition_variable _put_cv; // 'put' callers wait on this
@@ -184,8 +167,8 @@ private:
     mutable std::mutex _lock;
     std::priority_queue<T> _queue;
     int _upgrade_counter;
-    uint64_t _total_get_wait_time;
-    uint64_t _total_put_wait_time;
+    std::atomic<uint64_t> _total_get_wait_time;
+    std::atomic<uint64_t> _total_put_wait_time;
 };

 } // namespace doris
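
The subtle part of the new blocking_get is the timed path: the predicate overload of
wait_for returns false only when the timeout expires with the predicate still false,
so a true return means either shutdown or an available element, and the caller still
has to tell those apart. A minimal sketch of that contract, using illustrative names
rather than the queue's real members:

    #include <chrono>
    #include <condition_variable>
    #include <deque>
    #include <mutex>

    std::mutex mtx;
    std::condition_variable cv;
    std::deque<int> items;
    bool is_shutdown = false;

    // Returns true only when an item was actually taken.
    bool timed_get(int* out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx);
        // false -> timed out with the predicate still false
        // true  -> woken with the predicate true (item available or shutdown)
        bool woke = cv.wait_for(lock, timeout, [] { return is_shutdown || !items.empty(); });
        if (!woke || items.empty()) {
            return false;
        }
        *out = items.front();
        items.pop_front();
        return true;
    }
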
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 7ba33c37f1..dff6911edf 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -22,6 +22,7 @@

 #include <unistd.h>

+#include <atomic>
 #include <condition_variable>
 #include <list>
 #include <mutex>
@@ -47,25 +48,19 @@ public:
     // are no more elements available.
     bool blocking_get(T* out) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (true) {
-            if (!_list.empty()) {
-                *out = _list.front();
-                _list.pop_front();
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
-                _put_cv.notify_one();
-                return true;
-            }
-
-            if (_shutdown) {
-                return false;
-            }
-
-            timer.start();
-            _get_cv.wait(unique_lock);
-            timer.stop();
+        _get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); });
+        _total_get_wait_time += timer.elapsed_time();
+
+        if (!_list.empty()) {
+            *out = _list.front();
+            _list.pop_front();
+            _put_cv.notify_one();
+            return true;
+        } else {
+            assert(_shutdown);
+            return false;
         }
     }
@@ -104,23 +99,16 @@ public:
     // If the queue is shut down, returns false.
     bool blocking_put(const T& val) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (_list.size() >= _max_elements && !_shutdown) {
-            timer.start();
-            _put_cv.wait(unique_lock);
-            timer.stop();
-        }
-
+        _put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; });
         _total_put_wait_time += timer.elapsed_time();

         if (_shutdown) {
             return false;
         }
-        DCHECK_LT(_list.size(), _max_elements);
         _list.push_back(val);
-        unique_lock.unlock();
         _get_cv.notify_one();
         return true;
     }
@@ -137,21 +125,15 @@ public:
     }

     uint32_t get_size() const {
-        std::unique_lock<std::mutex> l(_lock);
+        std::lock_guard<std::mutex> l(_lock);
         return _list.size();
     }

     // Returns the total amount of time threads have blocked in BlockingGet.
-    uint64_t total_get_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_get_wait_time;
-    }
+    uint64_t total_get_wait_time() const { return _total_get_wait_time; }

     // Returns the total amount of time threads have blocked in BlockingPut.
-    uint64_t total_put_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_put_wait_time;
-    }
+    uint64_t total_put_wait_time() const { return _total_put_wait_time; }

 private:
     uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const {
@@ -167,8 +149,8 @@ private:
     // _lock guards access to _list, total_get_wait_time, and total_put_wait_time
     mutable std::mutex _lock;
     std::list<T> _list;
-    uint64_t _total_get_wait_time;
-    uint64_t _total_put_wait_time;
+    std::atomic<uint64_t> _total_get_wait_time;
+    std::atomic<uint64_t> _total_put_wait_time;
 };

 } // namespace doris
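
The wait-time counters in both queues also change from plain uint64_t guarded by
_lock to std::atomic<uint64_t>, which is what lets total_get_wait_time() and
total_put_wait_time() drop their lock_guard. A minimal sketch of that accounting
pattern (names are illustrative):

    #include <atomic>
    #include <cstdint>

    std::atomic<uint64_t> total_wait_ns{0};

    void record_wait(uint64_t elapsed_ns) {
        // Writers still run under the queue lock; the atomic only exists so that
        // readers can observe the running total without taking that lock.
        total_wait_ns += elapsed_ns;
    }

    uint64_t wait_time_ns() {
        return total_wait_ns.load(std::memory_order_relaxed);
    }
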
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 881984ba38..76bc7e2171 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -148,9 +148,7 @@ void ThreadPoolToken::shutdown() {
     case State::QUIESCING:
         // The token is already quiescing. Just wait for a worker thread to
        // switch it to QUIESCED.
-        while (state() != State::QUIESCED) {
-            _not_running_cond.wait(l);
-        }
+        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
         break;
     default:
         break;
@@ -160,9 +158,7 @@
 void ThreadPoolToken::wait() {
     std::unique_lock<std::mutex> l(_pool->_lock);
     _pool->check_not_pool_thread_unlocked();
-    while (is_active()) {
-        _not_running_cond.wait(l);
-    }
+    _not_running_cond.wait(l, [this]() { return !is_active(); });
 }

 void ThreadPoolToken::transition(State new_state) {
@@ -320,9 +316,8 @@ void ThreadPool::shutdown() {
         _idle_threads.front().not_empty.notify_one();
         _idle_threads.pop_front();
     }
-    while (_num_threads + _num_threads_pending_start > 0) {
-        _no_threads_cond.wait(l);
-    }
+
+    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });

     // All the threads have exited. Check the state of each token.
     for (auto* t : _tokens) {
@@ -465,9 +460,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
 void ThreadPool::wait() {
     std::unique_lock<std::mutex> l(_lock);
     check_not_pool_thread_unlocked();
-    while (_total_queued_tasks > 0 || _active_threads > 0) {
-        _idle_cond.wait(l);
-    }
+    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
 }

 void ThreadPool::dispatch_thread() {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 45e106039f..7f6cd847bb 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -116,13 +116,16 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
         _state->exec_env()->scanner_scheduler()->submit(this);
     }
     // Wait for block from queue
-    while (_process_status.ok() && !_is_finished && blocks_queue.empty()) {
-        if (_state->is_cancelled()) {
-            _process_status = Status::Cancelled("cancelled");
-            break;
-        }
+    {
         SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
-        _blocks_queue_added_cv.wait_for(l, std::chrono::seconds(1));
+        _blocks_queue_added_cv.wait(l, [this]() {
+            return !blocks_queue.empty() || _is_finished || !_process_status.ok() ||
+                   _state->is_cancelled();
+        });
+    }
+
+    if (_state->is_cancelled()) {
+        _process_status = Status::Cancelled("cancelled");
     }

     if (!_process_status.ok()) {
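
The scanner change folds several exit conditions into a single predicate and then
works out the reason after the wait returns, checking cancellation first. A minimal
sketch of that shape with hypothetical state (the real code consults RuntimeState
and a Status rather than plain bools):

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <utility>

    std::mutex mtx;
    std::condition_variable cv;
    std::deque<std::string> blocks;
    bool finished = false;
    bool cancelled = false;

    // Waits until a block arrives, the stream finishes, or the query is cancelled.
    bool get_block(std::string* out) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !blocks.empty() || finished || cancelled; });
        if (cancelled) {
            return false;
        }
        if (!blocks.empty()) {
            *out = std::move(blocks.front());
            blocks.pop_front();
            return true;
        }
        return false; // finished and drained
    }
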
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org