liutang123 commented on code in PR #17615:
URL: https://github.com/apache/doris/pull/17615#discussion_r1136643695


##########
be/src/pipeline/task_queue.cpp:
##########
@@ -131,21 +132,151 @@ Status WorkTaskQueue::push(PipelineTask* task) {
 
 ////////////////// TaskQueue ////////////
 
-void TaskQueue::close() {
+
+////////////////// Resource Group ////////
+
+bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
+        const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& 
rhs_ptr) const {
+    int64_t lhs_val = lhs_ptr->vruntime_ns();
+    int64_t rhs_val = rhs_ptr->vruntime_ns();
+    return lhs_val < rhs_val;
+}
+
+TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : 
TaskQueue(core_size) {}
+
+TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
+
+void TaskGroupTaskQueue::close() {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    _closed = true;
+    _wait_task.notify_all();
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
+    return _push_back<false>(task);
+}
+
+Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+    return _push_back<true>(task);
+}
+
+template <bool from_executor>
+Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
+    auto* entry = task->get_task_group()->task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    entry->push_back(task);
+    if (_groups.find(entry) == _groups.end()) {
+        _enqueue_task_group<from_executor>(entry);
+    }
+    _wait_task.notify_one();
+    return Status::OK();
+}
+
+// TODO pipeline support steal
+PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    taskgroup::TGEntityPtr entry = nullptr;
+    while (entry == nullptr) {
+        if (_closed) {
+            return nullptr;
+        }
+        if (_groups.empty()) {
+            _wait_task.wait(lock);
+        } else {
+            entry = _next_ts_entity();
+            if (!entry) {
+                _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+            }
+        }
+    }
+    DCHECK(entry->task_size() > 0);
+    if (entry->task_size() == 1) {
+        _dequeue_task_group(entry);
+    }
+    return entry->take();
+}
+
+template <bool from_worker>
+void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr ts_entity) 
{
+    _total_cpu_share += ts_entity->cpu_share();
+    if constexpr (!from_worker) {
+        auto* min_entity = _min_ts_entity.load();
+        if (!min_entity) {
+            int64_t new_vruntime_ns = min_entity->vruntime_ns() - 
_ideal_runtime_ns(ts_entity) / 2;
+            if (new_vruntime_ns > ts_entity->vruntime_ns()) {
+                ts_entity->adjust_vruntime_ns(new_vruntime_ns);
+            }
+        }
+    }
+    _groups.emplace(ts_entity);
+    _update_min_rg();
+}
+
+void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr ts_entity) 
{
+    _total_cpu_share -= ts_entity->cpu_share();
+    _groups.erase(ts_entity);
+    _update_min_rg();
+}
+
+void TaskGroupTaskQueue::_update_min_rg() {
+    auto* min_entity = _next_ts_entity();

Review Comment:
   `TGEntityPtr` is `TaskGroupEntity*`, why use auto instead of auto*?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to