github-actions[bot] commented on code in PR #16775: URL: https://github.com/apache/doris/pull/16775#discussion_r1106701569
########## be/src/util/priority_work_stealing_thread_pool.cpp: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/priority_work_stealing_thread_pool.hpp" + +#include <cstdio> +#include <mutex> +#include <utility> + +#include "util/blocking_priority_queue.hpp" +#include "util/thread.h" + +namespace doris { +PriorityWorkStealingThreadPool::PriorityWorkStealingThreadPool(uint32_t num_threads, + uint32_t num_queues, + uint32_t queue_size, + std::string name) + : PriorityThreadPool(0, 0, std::move(name)) { Review Comment: warning: passing result of std::move() as a const reference argument; no move will actually happen [performance-move-const-arg] ```suggestion : PriorityThreadPool(0, 0, name) { ``` ########## be/src/util/priority_work_stealing_thread_pool.hpp: ########## @@ -37,109 +39,31 @@ // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size, - const std::string& name) - : PriorityThreadPool(0, 0, name) { - DCHECK_GT(num_queues, 0); - DCHECK_GE(num_threads, num_queues); - // init _work_queues first because the work thread needs it - for (int i = 0; i < num_queues; ++i) { - _work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size)); - } - for (int i = 0; i < num_threads; ++i) { - _threads.create_thread(std::bind<void>( - std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), this, i)); - } - } + std::string name); + virtual ~PriorityWorkStealingThreadPool(); - virtual ~PriorityWorkStealingThreadPool() { - shutdown(); - join(); - } - - // Blocking operation that puts a work item on the queue. If the queue is full, blocks - // until there is capacity available. - // - // 'work' is copied into the work queue, but may be referenced at any time in the - // future. Therefore the caller needs to ensure that any data referenced by work (if T - // is, e.g., a pointer type) remains valid until work has been processed, and it's up to - // the caller to provide their own signalling mechanism to detect this (or to wait until - // after DrainAndshutdown returns). - // - // Returns true if the work item was successfully added to the queue, false otherwise - // (which typically means that the thread pool has already been shut down). - bool offer(Task task) override { return _work_queues[task.queue_id]->blocking_put(task); } - - bool offer(WorkFunction func) override { - PriorityThreadPool::Task task = {0, func, 0}; - return _work_queues[task.queue_id]->blocking_put(task); - } + bool offer(Task task) override; + bool offer(WorkFunction func) override; // Shuts the thread pool down, causing the work queue to cease accepting offered work // and the worker threads to terminate once they have processed their current work item. // Returns once the shutdown flag has been set, does not wait for the threads to // terminate. - void shutdown() override { - PriorityThreadPool::shutdown(); - for (auto work_queue : _work_queues) { - work_queue->shutdown(); - } - } - - uint32_t get_queue_size() const override { - uint32_t size = 0; - for (auto work_queue : _work_queues) { - size += work_queue->get_size(); - } - return size; - } + void shutdown() override; // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. // Any work Offer()'ed during DrainAndshutdown may or may not be processed. - void drain_and_shutdown() override { - { - std::unique_lock l(_lock); - while (get_queue_size() != 0) { - _empty_cv.wait(l); - } - } - shutdown(); - join(); - } + void drain_and_shutdown(); Review Comment: warning: 'drain_and_shutdown' overrides a member function but is not marked 'override' [clang-diagnostic-inconsistent-missing-override] ```cpp void drain_and_shutdown(); ^ ``` **be/src/util/priority_thread_pool.hpp:104:** overridden virtual function is here ```cpp virtual void drain_and_shutdown() { ^ ``` ########## be/src/util/priority_work_stealing_thread_pool.hpp: ########## @@ -37,109 +39,31 @@ // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size, - const std::string& name) - : PriorityThreadPool(0, 0, name) { - DCHECK_GT(num_queues, 0); - DCHECK_GE(num_threads, num_queues); - // init _work_queues first because the work thread needs it - for (int i = 0; i < num_queues; ++i) { - _work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size)); - } - for (int i = 0; i < num_threads; ++i) { - _threads.create_thread(std::bind<void>( - std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), this, i)); - } - } + std::string name); + virtual ~PriorityWorkStealingThreadPool(); - virtual ~PriorityWorkStealingThreadPool() { - shutdown(); - join(); - } - - // Blocking operation that puts a work item on the queue. If the queue is full, blocks - // until there is capacity available. - // - // 'work' is copied into the work queue, but may be referenced at any time in the - // future. Therefore the caller needs to ensure that any data referenced by work (if T - // is, e.g., a pointer type) remains valid until work has been processed, and it's up to - // the caller to provide their own signalling mechanism to detect this (or to wait until - // after DrainAndshutdown returns). - // - // Returns true if the work item was successfully added to the queue, false otherwise - // (which typically means that the thread pool has already been shut down). - bool offer(Task task) override { return _work_queues[task.queue_id]->blocking_put(task); } - - bool offer(WorkFunction func) override { - PriorityThreadPool::Task task = {0, func, 0}; - return _work_queues[task.queue_id]->blocking_put(task); - } + bool offer(Task task) override; + bool offer(WorkFunction func) override; // Shuts the thread pool down, causing the work queue to cease accepting offered work // and the worker threads to terminate once they have processed their current work item. // Returns once the shutdown flag has been set, does not wait for the threads to // terminate. - void shutdown() override { - PriorityThreadPool::shutdown(); - for (auto work_queue : _work_queues) { - work_queue->shutdown(); - } - } - - uint32_t get_queue_size() const override { - uint32_t size = 0; - for (auto work_queue : _work_queues) { - size += work_queue->get_size(); - } - return size; - } + void shutdown() override; // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. // Any work Offer()'ed during DrainAndshutdown may or may not be processed. - void drain_and_shutdown() override { - { - std::unique_lock l(_lock); - while (get_queue_size() != 0) { - _empty_cv.wait(l); - } - } - shutdown(); - join(); - } + void drain_and_shutdown(); Review Comment: warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override] ```suggestion void drain_and_shutdown() override; ``` ########## be/src/util/priority_work_stealing_thread_pool.hpp: ########## @@ -37,109 +39,31 @@ // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size, - const std::string& name) - : PriorityThreadPool(0, 0, name) { - DCHECK_GT(num_queues, 0); - DCHECK_GE(num_threads, num_queues); - // init _work_queues first because the work thread needs it - for (int i = 0; i < num_queues; ++i) { - _work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size)); - } - for (int i = 0; i < num_threads; ++i) { - _threads.create_thread(std::bind<void>( - std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), this, i)); - } - } + std::string name); + virtual ~PriorityWorkStealingThreadPool(); - virtual ~PriorityWorkStealingThreadPool() { - shutdown(); - join(); - } - - // Blocking operation that puts a work item on the queue. If the queue is full, blocks - // until there is capacity available. - // - // 'work' is copied into the work queue, but may be referenced at any time in the - // future. Therefore the caller needs to ensure that any data referenced by work (if T - // is, e.g., a pointer type) remains valid until work has been processed, and it's up to - // the caller to provide their own signalling mechanism to detect this (or to wait until - // after DrainAndshutdown returns). - // - // Returns true if the work item was successfully added to the queue, false otherwise - // (which typically means that the thread pool has already been shut down). - bool offer(Task task) override { return _work_queues[task.queue_id]->blocking_put(task); } - - bool offer(WorkFunction func) override { - PriorityThreadPool::Task task = {0, func, 0}; - return _work_queues[task.queue_id]->blocking_put(task); - } + bool offer(Task task) override; + bool offer(WorkFunction func) override; // Shuts the thread pool down, causing the work queue to cease accepting offered work // and the worker threads to terminate once they have processed their current work item. // Returns once the shutdown flag has been set, does not wait for the threads to // terminate. - void shutdown() override { - PriorityThreadPool::shutdown(); - for (auto work_queue : _work_queues) { - work_queue->shutdown(); - } - } - - uint32_t get_queue_size() const override { - uint32_t size = 0; - for (auto work_queue : _work_queues) { - size += work_queue->get_size(); - } - return size; - } + void shutdown() override; // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. // Any work Offer()'ed during DrainAndshutdown may or may not be processed. - void drain_and_shutdown() override { - { - std::unique_lock l(_lock); - while (get_queue_size() != 0) { - _empty_cv.wait(l); - } - } - shutdown(); - join(); - } + void drain_and_shutdown(); + + uint32_t get_queue_size() const override; private: // Driver method for each thread in the pool. Continues to read work from the queue // until the pool is shutdown. - void work_thread(int thread_id) { - auto queue_id = thread_id % _work_queues.size(); - auto steal_queue_id = (queue_id + 1) % _work_queues.size(); - while (!is_shutdown()) { - Task task; - // avoid blocking get - bool is_other_queues_empty = true; - // steal work in round-robin if nothing to do - while (_work_queues[queue_id]->get_size() == 0 && queue_id != steal_queue_id && - !is_shutdown()) { - if (_work_queues[steal_queue_id]->non_blocking_get(&task)) { - is_other_queues_empty = false; - task.work_function(); - } - steal_queue_id = (steal_queue_id + 1) % _work_queues.size(); - } - if (queue_id == steal_queue_id) { - steal_queue_id = (steal_queue_id + 1) % _work_queues.size(); - } - if (is_other_queues_empty && - _work_queues[queue_id]->blocking_get( - &task, config::doris_blocking_priority_queue_wait_timeout_ms)) { - task.work_function(); - } - if (_work_queues[queue_id]->get_size() == 0) { - _empty_cv.notify_all(); - } - } - } + void work_thread(int thread_id); +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` **be/src/util/priority_work_stealing_thread_pool.hpp:60:** previously declared here ```cpp private: ^ ``` ########## be/src/util/priority_work_stealing_thread_pool.hpp: ########## @@ -37,109 +39,31 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool { // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size, - const std::string& name) - : PriorityThreadPool(0, 0, name) { - DCHECK_GT(num_queues, 0); - DCHECK_GE(num_threads, num_queues); - // init _work_queues first because the work thread needs it - for (int i = 0; i < num_queues; ++i) { - _work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size)); - } - for (int i = 0; i < num_threads; ++i) { - _threads.create_thread(std::bind<void>( - std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), this, i)); - } - } + std::string name); + virtual ~PriorityWorkStealingThreadPool(); Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion ~PriorityWorkStealingThreadPool() override; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org