levy5307 commented on code in PR #17615: URL: https://github.com/apache/doris/pull/17615#discussion_r1138280225
########## be/src/pipeline/task_queue.cpp: ########## @@ -131,21 +132,174 @@ Status WorkTaskQueue::push(PipelineTask* task) { ////////////////// TaskQueue //////////// -void TaskQueue::close() { + +////////////////// Resource Group //////// + +bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( + const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const { + int64_t lhs_val = lhs_ptr->vruntime_ns(); + int64_t rhs_val = rhs_ptr->vruntime_ns(); + if (lhs_val != rhs_val) { + return lhs_val < rhs_val; + } else { + auto l_share = lhs_ptr->cpu_share(); + auto r_share = rhs_ptr->cpu_share(); + if (l_share != r_share) { + return l_share < r_share; + } else { + return lhs_ptr < rhs_ptr; + } + } +} + +TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : TaskQueue(core_size) {} + +TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; + +void TaskGroupTaskQueue::close() { + std::unique_lock<std::mutex> lock(_rs_mutex); + _closed = true; + _wait_task.notify_all(); +} + +Status TaskGroupTaskQueue::push_back(PipelineTask* task) { + return _push_back<false>(task); +} + +Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) { + return _push_back<true>(task); +} + +template <bool from_executor> +Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { + auto* entity = task->get_task_group()->task_entity(); + std::unique_lock<std::mutex> lock(_rs_mutex); + entity->push_back(task); + if (_groups.find(entity) == _groups.end()) { + _enqueue_task_group<from_executor>(entity); + } + _wait_task.notify_one(); + return Status::OK(); +} + +// TODO pipeline support steal +PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { + std::unique_lock<std::mutex> lock(_rs_mutex); + taskgroup::TGEntityPtr entity = nullptr; + while (entity == nullptr) { + if (_closed) { + return nullptr; + } + if (_groups.empty()) { + _wait_task.wait(lock); + } else { + entity = _next_ts_entity(); + if (!entity) { + _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS)); + }  + } + } + DCHECK(entity->task_size() > 0); + if (entity->task_size() == 1) { + _dequeue_task_group(entity); + } + return entity->take(); +} + +template <bool from_worker> +void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr ts_entity) { + _total_cpu_share += ts_entity->cpu_share(); + if constexpr (!from_worker) { + auto old_v_ns = ts_entity->vruntime_ns(); + auto* min_entity = _min_tg_entity.load(); + if (min_entity) { + int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(ts_entity) / 2; Review Comment: Could you add an explanation to show why we should adjust vruntime here? ########## be/src/pipeline/task_queue.h: ########## @@ -79,36 +81,99 @@ class WorkTaskQueue { int _compute_level(PipelineTask* task); }; -// Need consider NUMA architecture class TaskQueue { public: - explicit TaskQueue(size_t core_size) : _core_size(core_size), _closed(false) { - _async_queue.reset(new WorkTaskQueue[core_size]); - } + TaskQueue(size_t core_size) : _core_size(core_size) {} + virtual ~TaskQueue(); + virtual void close() = 0; + // Get the task by core id. + // TODO: To think the logic is useful? 
+ virtual PipelineTask* take(size_t core_id) = 0; - ~TaskQueue() = default; + // push from scheduler + virtual Status push_back(PipelineTask* task) = 0; - void close(); + virtual Status push_back(PipelineTask* task, size_t core_id) = 0; + + virtual void update_statistics(PipelineTask* task, int64_t time_spent) {} + + int cores() const { return _core_size; } + +protected: + size_t _core_size; + static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; +}; + +class TaskGroupTaskQueue : public TaskQueue { +public: + explicit TaskGroupTaskQueue(size_t); + ~TaskGroupTaskQueue() override; + + void close() override; + + PipelineTask* take(size_t core_id) override; + + // from TaskScheduler or BlockedTaskScheduler + Status push_back(PipelineTask* task) override; + + // from worker + Status push_back(PipelineTask* task, size_t core_id) override; + + void update_statistics(PipelineTask* task, int64_t time_spent) override; + +private: + template <bool from_executor> + Status _push_back(PipelineTask* task); + template <bool from_worker> + void _enqueue_task_group(taskgroup::TGEntityPtr); + void _dequeue_task_group(taskgroup::TGEntityPtr); + taskgroup::TGEntityPtr _next_ts_entity(); + int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr ts_entity) const; + void _update_min_tg(); + + static constexpr int64_t SCHEDULE_PERIOD_PER_WG_NS = 100'000'000; + + // Like cfs rb tree in sched_entity + struct TaskGroupSchedEntityComparator { + bool operator()(const taskgroup::TGEntityPtr&, const taskgroup::TGEntityPtr&) const; + }; + using ResourceGroupSet = std::set<taskgroup::TGEntityPtr, TaskGroupSchedEntityComparator>; + ResourceGroupSet _groups; Review Comment: It's better to name it `group_entities` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org