github-actions[bot] commented on code in PR #16775:
URL: https://github.com/apache/doris/pull/16775#discussion_r1106701569


##########
be/src/util/priority_work_stealing_thread_pool.cpp:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/priority_work_stealing_thread_pool.hpp"
+
+#include <cstdio>
+#include <mutex>
+#include <utility>
+
+#include "util/blocking_priority_queue.hpp"
+#include "util/thread.h"
+
+namespace doris {
+PriorityWorkStealingThreadPool::PriorityWorkStealingThreadPool(uint32_t num_threads,
+                                                               uint32_t num_queues,
+                                                               uint32_t queue_size,
+                                                               std::string name)
+        : PriorityThreadPool(0, 0, std::move(name)) {

Review Comment:
   warning: passing result of std::move() as a const reference argument; no move will actually happen [performance-move-const-arg]
   
   ```suggestion
           : PriorityThreadPool(0, 0, name) {
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -37,109 +39,31 @@
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, 
uint32_t queue_size,
-                                   const std::string& name)
-            : PriorityThreadPool(0, 0, name) {
-        DCHECK_GT(num_queues, 0);
-        DCHECK_GE(num_threads, num_queues);
-        // init _work_queues first because the work thread needs it
-        for (int i = 0; i < num_queues; ++i) {
-            
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
-        }
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(std::bind<void>(
-                    std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), 
this, i));
-        }
-    }
+                                   std::string name);
+    virtual ~PriorityWorkStealingThreadPool();
 
-    virtual ~PriorityWorkStealingThreadPool() {
-        shutdown();
-        join();
-    }
-
-    // Blocking operation that puts a work item on the queue. If the queue is 
full, blocks
-    // until there is capacity available.
-    //
-    // 'work' is copied into the work queue, but may be referenced at any time 
in the
-    // future. Therefore the caller needs to ensure that any data referenced 
by work (if T
-    // is, e.g., a pointer type) remains valid until work has been processed, 
and it's up to
-    // the caller to provide their own signalling mechanism to detect this (or 
to wait until
-    // after DrainAndshutdown returns).
-    //
-    // Returns true if the work item was successfully added to the queue, 
false otherwise
-    // (which typically means that the thread pool has already been shut down).
-    bool offer(Task task) override { return 
_work_queues[task.queue_id]->blocking_put(task); }
-
-    bool offer(WorkFunction func) override {
-        PriorityThreadPool::Task task = {0, func, 0};
-        return _work_queues[task.queue_id]->blocking_put(task);
-    }
+    bool offer(Task task) override;
+    bool offer(WorkFunction func) override;
 
     // Shuts the thread pool down, causing the work queue to cease accepting 
offered work
     // and the worker threads to terminate once they have processed their 
current work item.
     // Returns once the shutdown flag has been set, does not wait for the 
threads to
     // terminate.
-    void shutdown() override {
-        PriorityThreadPool::shutdown();
-        for (auto work_queue : _work_queues) {
-            work_queue->shutdown();
-        }
-    }
-
-    uint32_t get_queue_size() const override {
-        uint32_t size = 0;
-        for (auto work_queue : _work_queues) {
-            size += work_queue->get_size();
-        }
-        return size;
-    }
+    void shutdown() override;
 
     // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
     // threads and Join to wait until they are finished.
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
-    void drain_and_shutdown() override {
-        {
-            std::unique_lock l(_lock);
-            while (get_queue_size() != 0) {
-                _empty_cv.wait(l);
-            }
-        }
-        shutdown();
-        join();
-    }
+    void drain_and_shutdown();

Review Comment:
   warning: 'drain_and_shutdown' overrides a member function but is not marked 'override' [clang-diagnostic-inconsistent-missing-override]
   ```cpp
       void drain_and_shutdown();
            ^
   ```
   **be/src/util/priority_thread_pool.hpp:104:** overridden virtual function is here
   ```cpp
       virtual void drain_and_shutdown() {
                    ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -37,109 +39,31 @@
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, 
uint32_t queue_size,
-                                   const std::string& name)
-            : PriorityThreadPool(0, 0, name) {
-        DCHECK_GT(num_queues, 0);
-        DCHECK_GE(num_threads, num_queues);
-        // init _work_queues first because the work thread needs it
-        for (int i = 0; i < num_queues; ++i) {
-            
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
-        }
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(std::bind<void>(
-                    std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), 
this, i));
-        }
-    }
+                                   std::string name);
+    virtual ~PriorityWorkStealingThreadPool();
 
-    virtual ~PriorityWorkStealingThreadPool() {
-        shutdown();
-        join();
-    }
-
-    // Blocking operation that puts a work item on the queue. If the queue is 
full, blocks
-    // until there is capacity available.
-    //
-    // 'work' is copied into the work queue, but may be referenced at any time 
in the
-    // future. Therefore the caller needs to ensure that any data referenced 
by work (if T
-    // is, e.g., a pointer type) remains valid until work has been processed, 
and it's up to
-    // the caller to provide their own signalling mechanism to detect this (or 
to wait until
-    // after DrainAndshutdown returns).
-    //
-    // Returns true if the work item was successfully added to the queue, 
false otherwise
-    // (which typically means that the thread pool has already been shut down).
-    bool offer(Task task) override { return 
_work_queues[task.queue_id]->blocking_put(task); }
-
-    bool offer(WorkFunction func) override {
-        PriorityThreadPool::Task task = {0, func, 0};
-        return _work_queues[task.queue_id]->blocking_put(task);
-    }
+    bool offer(Task task) override;
+    bool offer(WorkFunction func) override;
 
     // Shuts the thread pool down, causing the work queue to cease accepting 
offered work
     // and the worker threads to terminate once they have processed their 
current work item.
     // Returns once the shutdown flag has been set, does not wait for the 
threads to
     // terminate.
-    void shutdown() override {
-        PriorityThreadPool::shutdown();
-        for (auto work_queue : _work_queues) {
-            work_queue->shutdown();
-        }
-    }
-
-    uint32_t get_queue_size() const override {
-        uint32_t size = 0;
-        for (auto work_queue : _work_queues) {
-            size += work_queue->get_size();
-        }
-        return size;
-    }
+    void shutdown() override;
 
     // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
     // threads and Join to wait until they are finished.
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
-    void drain_and_shutdown() override {
-        {
-            std::unique_lock l(_lock);
-            while (get_queue_size() != 0) {
-                _empty_cv.wait(l);
-            }
-        }
-        shutdown();
-        join();
-    }
+    void drain_and_shutdown();

Review Comment:
   warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override]
   
   ```suggestion
       void drain_and_shutdown() override;
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -37,109 +39,31 @@
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, 
uint32_t queue_size,
-                                   const std::string& name)
-            : PriorityThreadPool(0, 0, name) {
-        DCHECK_GT(num_queues, 0);
-        DCHECK_GE(num_threads, num_queues);
-        // init _work_queues first because the work thread needs it
-        for (int i = 0; i < num_queues; ++i) {
-            
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
-        }
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(std::bind<void>(
-                    std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), 
this, i));
-        }
-    }
+                                   std::string name);
+    virtual ~PriorityWorkStealingThreadPool();
 
-    virtual ~PriorityWorkStealingThreadPool() {
-        shutdown();
-        join();
-    }
-
-    // Blocking operation that puts a work item on the queue. If the queue is 
full, blocks
-    // until there is capacity available.
-    //
-    // 'work' is copied into the work queue, but may be referenced at any time 
in the
-    // future. Therefore the caller needs to ensure that any data referenced 
by work (if T
-    // is, e.g., a pointer type) remains valid until work has been processed, 
and it's up to
-    // the caller to provide their own signalling mechanism to detect this (or 
to wait until
-    // after DrainAndshutdown returns).
-    //
-    // Returns true if the work item was successfully added to the queue, 
false otherwise
-    // (which typically means that the thread pool has already been shut down).
-    bool offer(Task task) override { return 
_work_queues[task.queue_id]->blocking_put(task); }
-
-    bool offer(WorkFunction func) override {
-        PriorityThreadPool::Task task = {0, func, 0};
-        return _work_queues[task.queue_id]->blocking_put(task);
-    }
+    bool offer(Task task) override;
+    bool offer(WorkFunction func) override;
 
     // Shuts the thread pool down, causing the work queue to cease accepting 
offered work
     // and the worker threads to terminate once they have processed their 
current work item.
     // Returns once the shutdown flag has been set, does not wait for the 
threads to
     // terminate.
-    void shutdown() override {
-        PriorityThreadPool::shutdown();
-        for (auto work_queue : _work_queues) {
-            work_queue->shutdown();
-        }
-    }
-
-    uint32_t get_queue_size() const override {
-        uint32_t size = 0;
-        for (auto work_queue : _work_queues) {
-            size += work_queue->get_size();
-        }
-        return size;
-    }
+    void shutdown() override;
 
     // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
     // threads and Join to wait until they are finished.
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
-    void drain_and_shutdown() override {
-        {
-            std::unique_lock l(_lock);
-            while (get_queue_size() != 0) {
-                _empty_cv.wait(l);
-            }
-        }
-        shutdown();
-        join();
-    }
+    void drain_and_shutdown();
+
+    uint32_t get_queue_size() const override;
 
 private:
     // Driver method for each thread in the pool. Continues to read work from 
the queue
     // until the pool is shutdown.
-    void work_thread(int thread_id) {
-        auto queue_id = thread_id % _work_queues.size();
-        auto steal_queue_id = (queue_id + 1) % _work_queues.size();
-        while (!is_shutdown()) {
-            Task task;
-            // avoid blocking get
-            bool is_other_queues_empty = true;
-            // steal work in round-robin if nothing to do
-            while (_work_queues[queue_id]->get_size() == 0 && queue_id != 
steal_queue_id &&
-                   !is_shutdown()) {
-                if (_work_queues[steal_queue_id]->non_blocking_get(&task)) {
-                    is_other_queues_empty = false;
-                    task.work_function();
-                }
-                steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
-            }
-            if (queue_id == steal_queue_id) {
-                steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
-            }
-            if (is_other_queues_empty &&
-                _work_queues[queue_id]->blocking_get(
-                        &task, 
config::doris_blocking_priority_queue_wait_timeout_ms)) {
-                task.work_function();
-            }
-            if (_work_queues[queue_id]->get_size() == 0) {
-                _empty_cv.notify_all();
-            }
-        }
-    }
+    void work_thread(int thread_id);
 
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/util/priority_work_stealing_thread_pool.hpp:60:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -37,109 +39,31 @@ class PriorityWorkStealingThreadPool : public 
PriorityThreadPool {
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, 
uint32_t queue_size,
-                                   const std::string& name)
-            : PriorityThreadPool(0, 0, name) {
-        DCHECK_GT(num_queues, 0);
-        DCHECK_GE(num_threads, num_queues);
-        // init _work_queues first because the work thread needs it
-        for (int i = 0; i < num_queues; ++i) {
-            
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
-        }
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(std::bind<void>(
-                    std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), 
this, i));
-        }
-    }
+                                   std::string name);
+    virtual ~PriorityWorkStealingThreadPool();

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
       ~PriorityWorkStealingThreadPool() override;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to