yiguolei commented on code in PR #50817:
URL: https://github.com/apache/doris/pull/50817#discussion_r2128854213
##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -54,41 +56,78 @@ PausedQuery::PausedQuery(std::shared_ptr<ResourceContext>
resource_ctx, double c
WorkloadGroupMgr::~WorkloadGroupMgr() = default;
-WorkloadGroupMgr::WorkloadGroupMgr() {
- _dummy_workload_group = std::make_shared<DummyWorkloadGroup>();
-}
+WorkloadGroupMgr::WorkloadGroupMgr() = default;
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
- const WorkloadGroupInfo& workload_group_info) {
- {
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- if (LIKELY(_workload_groups.count(workload_group_info.id))) {
- auto workload_group = _workload_groups[workload_group_info.id];
- workload_group->check_and_update(workload_group_info);
- return workload_group;
+ const WorkloadGroupInfo& fe_wg_info) {
+ std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+ // 1. update internal wg's id
+ if (fe_wg_info.name == INTERNAL_NORMAL_WG_NAME) {
+ WorkloadGroupPtr wg_ptr = nullptr;
+ uint64_t old_wg_id = -1;
+ int before_wg_size = _workload_groups.size();
+ for (auto& wg_pair : _workload_groups) {
+ uint64_t wg_id = wg_pair.first;
+ WorkloadGroupPtr wg = wg_pair.second;
+ if (INTERNAL_NORMAL_WG_NAME == wg->name() && wg_id !=
fe_wg_info.id) {
+ wg_ptr = wg_pair.second;
+ old_wg_id = wg_id;
+ break;
+ }
+ }
+ if (wg_ptr) {
+ _workload_groups.erase(old_wg_id);
+ wg_ptr->set_id(fe_wg_info.id);
+ _workload_groups[wg_ptr->id()] = wg_ptr;
+ LOG(INFO) << "[topic_publish_wg] normal wg id changed, before: "
<< old_wg_id
+ << ", after:" << wg_ptr->id() << ", wg size:" <<
before_wg_size << ", "
+ << _workload_groups.size();
}
}
- auto new_task_group = std::make_shared<WorkloadGroup>(workload_group_info);
- std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
- if (_workload_groups.contains(workload_group_info.id)) {
- auto workload_group = _workload_groups[workload_group_info.id];
- workload_group->check_and_update(workload_group_info);
+ // 2. check and update wg
+ if (LIKELY(_workload_groups.count(fe_wg_info.id))) {
+ auto workload_group = _workload_groups[fe_wg_info.id];
+ workload_group->check_and_update(fe_wg_info);
return workload_group;
}
- _workload_groups[workload_group_info.id] = new_task_group;
+
+ auto new_task_group = std::make_shared<WorkloadGroup>(fe_wg_info);
+ _workload_groups[fe_wg_info.id] = new_task_group;
return new_task_group;
}
-WorkloadGroupPtr WorkloadGroupMgr::get_group(uint64_t wg_id) {
- if (wg_id == DUMMY_WORKLOAD_GROUP_ID) {
- return _dummy_workload_group;
+WorkloadGroupPtr WorkloadGroupMgr::get_group(std::vector<uint64_t>& id_list) {
+ WorkloadGroupPtr ret_wg = nullptr;
+ int wg_cout = 0;
+ {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto& wg_id : id_list) {
+ if (_workload_groups.find(wg_id) != _workload_groups.end()) {
+ wg_cout++;
+ ret_wg = _workload_groups.at(wg_id);
+ }
+ }
}
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- if (_workload_groups.find(wg_id) != _workload_groups.end()) {
- return _workload_groups.at(wg_id);
+
+ if (wg_cout > 1) {
+ LOG(ERROR) << "Unexpected error: find too much wg in BE";
Review Comment:
这里得加一个DCHECK, 让流水线里遇到这个问题就直接挂掉,能发现问题。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]