This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new be7273da83e [refactor](executor)Refactor workload meta update to be
#26710
be7273da83e is described below
commit be7273da83e12bdc2d2c2d3d2acf9e05f20268dc
Author: wangbo <[email protected]>
AuthorDate: Sat Nov 18 11:19:38 2023 +0800
[refactor](executor)Refactor workload meta update to be #26710
---
be/src/agent/cgroup_cpu_ctl.cpp | 26 +++---
be/src/agent/cgroup_cpu_ctl.h | 3 +
be/src/agent/topic_listener.h | 2 +-
be/src/agent/topic_subscriber.cpp | 7 +-
be/src/agent/workload_group_listener.cpp | 40 +++++++--
be/src/agent/workload_group_listener.h | 2 +-
be/src/common/config.cpp | 4 +-
be/src/common/config.h | 4 +-
be/src/runtime/fragment_mgr.cpp | 53 ++++--------
be/src/runtime/task_group/task_group.cpp | 97 +++++++++++++++-------
be/src/runtime/task_group/task_group.h | 12 ++-
be/src/runtime/task_group/task_group_manager.cpp | 73 +++++++++++-----
be/src/runtime/task_group/task_group_manager.h | 14 +++-
.../common/publish/WorkloadGroupPublisher.java | 8 +-
.../resource/workloadgroup/WorkloadGroup.java | 47 ++++++++---
.../resource/workloadgroup/WorkloadGroupMgr.java | 7 --
.../workloadgroup/WorkloadGroupMgrTest.java | 16 ++--
gensrc/thrift/BackendService.thrift | 17 +++-
18 files changed, 277 insertions(+), 155 deletions(-)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index d16a32b7be5..ac09725cb23 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -25,14 +25,14 @@ Status CgroupCpuCtl::init() {
_doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
if (_doris_cgroup_cpu_path.empty()) {
LOG(INFO) << "doris cgroup cpu path is not specify, path=" <<
_doris_cgroup_cpu_path;
- return Status::InternalError("doris cgroup cpu path {} is not
specify.",
- _doris_cgroup_cpu_path);
+ return Status::InternalError<false>("doris cgroup cpu path {} is not
specify.",
+ _doris_cgroup_cpu_path);
}
if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
LOG(ERROR) << "doris cgroup cpu path not exists, path=" <<
_doris_cgroup_cpu_path;
- return Status::InternalError("doris cgroup cpu path {} not exists.",
- _doris_cgroup_cpu_path);
+ return Status::InternalError<false>("doris cgroup cpu path {} not
exists.",
+ _doris_cgroup_cpu_path);
}
if (_doris_cgroup_cpu_path.back() != '/') {
@@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() {
return Status::OK();
}
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t*
cpu_hard_limit) {
+ std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
+ *cpu_shares = this->_cpu_shares;
+ *cpu_hard_limit = this->_cpu_hard_limit;
+}
+
void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
if (!_init_succ) {
return;
@@ -72,14 +78,14 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string
file_path, int value, std::st
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
if (fd == -1) {
LOG(ERROR) << "open path failed, path=" << file_path;
- return Status::InternalError("open path failed, path={}", file_path);
+ return Status::InternalError<false>("open path failed, path={}",
file_path);
}
auto str = fmt::format("{}\n", value);
int ret = write(fd, str.c_str(), str.size());
if (ret == -1) {
LOG(ERROR) << msg << " write sys file failed";
- return Status::InternalError("{} write sys file failed", msg);
+ return Status::InternalError<false>("{} write sys file failed", msg);
}
LOG(INFO) << msg << " success";
return Status::OK();
@@ -94,8 +100,8 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" <<
_cgroup_v1_cpu_query_path;
- return Status::InternalError("cgroup v1 mkdir query failed, path=",
- _cgroup_v1_cpu_query_path);
+ return Status::InternalError<false>("cgroup v1 mkdir query failed,
path=",
+ _cgroup_v1_cpu_query_path);
}
}
@@ -105,8 +111,8 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" <<
_cgroup_v1_cpu_tg_path;
- return Status::InternalError("cgroup v1 mkdir workload group
failed, path=",
- _cgroup_v1_cpu_tg_path);
+ return Status::InternalError<false>("cgroup v1 mkdir workload
group failed, path=",
+ _cgroup_v1_cpu_tg_path);
}
}
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index b98e268da09..4d78ca82ab7 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -47,6 +47,9 @@ public:
void update_cpu_soft_limit(int cpu_shares);
+ // for log
+ void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
+
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string
msg, bool is_append);
diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h
index af99a78545b..40cf0ba2a21 100644
--- a/be/src/agent/topic_listener.h
+++ b/be/src/agent/topic_listener.h
@@ -25,6 +25,6 @@ class TopicListener {
public:
virtual ~TopicListener() {}
- virtual void handle_topic_info(const TPublishTopicRequest& topic_request)
= 0;
+ virtual void handle_topic_info(const std::vector<TopicInfo>&
topic_info_list) = 0;
};
} // namespace doris
diff --git a/be/src/agent/topic_subscriber.cpp
b/be/src/agent/topic_subscriber.cpp
index c3bcc29c623..c29533bf617 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -42,8 +42,11 @@ void TopicSubscriber::handle_topic_info(const
TPublishTopicRequest& topic_reques
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
- listener_pair.second->handle_topic_info(topic_request);
- LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+ if (topic_request.topic_map.find(listener_pair.first) !=
topic_request.topic_map.end()) {
+ listener_pair.second->handle_topic_info(
+ topic_request.topic_map.at(listener_pair.first));
+ LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+ }
}
}
} // namespace doris
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index bf27861c284..bc1d3294f6c 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -24,18 +24,44 @@
namespace doris {
-void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest&
topic_request) {
+void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>&
topic_info_list) {
std::set<uint64_t> current_wg_ids;
- for (const TopicInfo& topic_info : topic_request.topic_list) {
- if (topic_info.topic_type !=
doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+ for (const TopicInfo& topic_info : topic_info_list) {
+ if (!topic_info.__isset.workload_group_info) {
continue;
}
- int wg_id = 0;
- auto iter2 = topic_info.info_map.find("id");
- std::from_chars(iter2->second.c_str(), iter2->second.c_str() +
iter2->second.size(), wg_id);
+ // 1 parse topicinfo to group info
+ taskgroup::TaskGroupInfo task_group_info;
+ Status ret =
taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info,
+
&task_group_info);
+ if (!ret.ok()) {
+ LOG(INFO) << "parse topic info failed, tg_id=" <<
task_group_info.id
+ << ", reason:" << ret.to_string();
+ continue;
+ }
+ current_wg_ids.insert(task_group_info.id);
+
+ // 2 update task group
+ auto tg =
_exec_env->task_group_manager()->get_or_create_task_group(task_group_info);
+
+ // 3 set cpu soft hard limit switch
+ _exec_env->task_group_manager()->_enable_cpu_hard_limit.store(
+ task_group_info.enable_cpu_hard_limit);
+
+ // 4 create and update task scheduler
+ Status ret2 =
+
_exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info,
_exec_env);
+ if (!ret2.ok()) {
+ LOG(WARNING) << "upsert task sche failed, tg_id=" <<
task_group_info.id
+ << ", reason=" << ret2.to_string();
+ }
- current_wg_ids.insert(wg_id);
+ LOG(INFO) << "update task group success, tg info=" <<
tg->debug_string()
+ << ", enable_cpu_hard_limit="
+ << _exec_env->task_group_manager()->enable_cpu_hard_limit()
+ << ", cgroup cpu_shares=" <<
task_group_info.cgroup_cpu_shares
+ << ", cgroup cpu_hard_limit=" <<
task_group_info.cgroup_cpu_hard_limit;
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/agent/workload_group_listener.h
b/be/src/agent/workload_group_listener.h
index d31b1c4ef65..732f5752e44 100644
--- a/be/src/agent/workload_group_listener.h
+++ b/be/src/agent/workload_group_listener.h
@@ -29,7 +29,7 @@ public:
~WorkloadGroupListener() {}
WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
- void handle_topic_info(const TPublishTopicRequest& topic_request) override;
+ void handle_topic_info(const std::vector<TopicInfo>& topic_info_list)
override;
private:
ExecEnv* _exec_env;
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 870ba5c9b0a..5f867429bff 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1090,8 +1090,8 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
-DEFINE_String(doris_cgroup_cpu_path, "");
-DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
+DEFINE_mString(doris_cgroup_cpu_path, "");
+DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ac560d1f1c6..6b76d37387e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1163,8 +1163,8 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
DECLARE_mBool(exit_on_exception);
// cgroup
-DECLARE_String(doris_cgroup_cpu_path);
-DECLARE_Bool(enable_cgroup_cpu_soft_limit);
+DECLARE_mString(doris_cgroup_cpu_path);
+DECLARE_mBool(enable_cgroup_cpu_soft_limit);
// This config controls whether the s3 file writer would flush cache
asynchronously
DECLARE_Bool(enable_flush_file_cache_async);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a7a89bfd421..210de93a7ec 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -654,45 +654,28 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
- taskgroup::TaskGroupInfo task_group_info;
- auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
-
&task_group_info);
- if (status.ok()) {
- auto tg =
_exec_env->task_group_manager()->get_or_create_task_group(
- task_group_info);
- tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
- uint64_t tg_id = tg->id();
- std::string tg_name = tg->name();
- LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
- << " use task group: " << tg->debug_string()
- << " cpu_hard_limit: " <<
task_group_info.cpu_hard_limit
- << " cpu_share:" << task_group_info.cpu_share
- << " enable cgroup soft cpu:" <<
config::enable_cgroup_cpu_soft_limit;
- if (task_group_info.cpu_hard_limit > 0) {
- Status ret =
_exec_env->task_group_manager()->create_and_get_task_scheduler(
- tg_id, tg_name, task_group_info.cpu_hard_limit,
- task_group_info.cpu_share, _exec_env,
query_ctx.get());
- if (!ret.ok()) {
- LOG(INFO) << "workload group init failed "
- << ", name=" << tg_name << ", id=" <<
tg_id
- << ", reason=" << ret.to_string();
- }
+ uint64_t tg_id = params.workload_groups[0].id;
+ auto* tg_mgr = _exec_env->task_group_manager();
+ if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id))
{
+ std::stringstream ss;
+ ss << "Query/load id: " << print_id(query_ctx->query_id());
+ ss << " use task group " << task_group_ptr->debug_string();
+ if (tg_mgr->enable_cpu_soft_limit() &&
!config::enable_cgroup_cpu_soft_limit) {
+ query_ctx->set_task_group(task_group_ptr);
+ ss << ", cpu soft limit based doris sche";
} else {
- if (!config::enable_cgroup_cpu_soft_limit) {
- query_ctx->set_task_group(tg);
+ bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id,
query_ctx.get());
+ if (tg_mgr->enable_cpu_hard_limit()) {
+ ss << ", cpu hard limit based cgroup";
} else {
- Status ret =
-
_exec_env->task_group_manager()->create_and_get_task_scheduler(
- tg_id, tg_name,
task_group_info.cpu_hard_limit,
- task_group_info.cpu_share,
_exec_env, query_ctx.get());
- if (!ret.ok()) {
- LOG(INFO) << "workload group cpu soft limit
init failed "
- << ", name=" << tg_name << ", id="
<< tg_id
- << ", reason=" << ret.to_string();
- }
+ ss << ", cpu soft limit based cgroup";
+ }
+ if (!ret) {
+ ss << ", but cgroup init failed, scan or exec
fallback to no group";
}
}
- }
+ LOG(INFO) << ss.str();
+ } // else, query run with no group
} else {
VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id())
<< " does not use task group.";
diff --git a/be/src/runtime/task_group/task_group.cpp
b/be/src/runtime/task_group/task_group.cpp
index 37e4b9ae597..5878e38ed38 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -113,15 +113,16 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_cpu_share(tg_info.cpu_share),
_task_entity(this, "pipeline task entity"),
_local_scan_entity(this, "local scan entity"),
- _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
+ _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
+ _cpu_hard_limit(tg_info.cpu_hard_limit) {}
std::string TaskGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
return fmt::format(
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {},
enable_memory_overcommit = "
- "{}, version = {}]",
+ "{}, version = {}, cpu_hard_limit = {}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit,
TUnit::BYTES),
- _enable_memory_overcommit ? "true" : "false", _version);
+ _enable_memory_overcommit ? "true" : "false", _version,
cpu_hard_limit());
}
void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -142,6 +143,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo&
tg_info) {
_memory_limit = tg_info.memory_limit;
_enable_memory_overcommit = tg_info.enable_memory_overcommit;
_cpu_share = tg_info.cpu_share;
+ _cpu_hard_limit = tg_info.cpu_hard_limit;
} else {
return;
}
@@ -185,49 +187,80 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info)
const {
tg_info->version = _version;
}
-Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup&
resource_group,
- TaskGroupInfo* task_group_info) {
- if (UNLIKELY(!check_group_info(resource_group))) {
- std::stringstream ss;
- ss << "incomplete resource group parameters: ";
- resource_group.printTo(ss);
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
+Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo&
workload_group_info,
+ taskgroup::TaskGroupInfo*
task_group_info) {
+ // 1 id
+ int tg_id = 0;
+ if (workload_group_info.__isset.id) {
+ tg_id = workload_group_info.id;
+ } else {
+ return Status::InternalError<false>("workload group id is required");
}
+ task_group_info->id = tg_id;
- auto iter = resource_group.properties.find(CPU_SHARE);
- uint64_t share = 0;
- std::from_chars(iter->second.c_str(), iter->second.c_str() +
iter->second.size(), share);
+ // 2 name
+ std::string name = "INVALID_NAME";
+ if (workload_group_info.__isset.name) {
+ name = workload_group_info.name;
+ }
+ task_group_info->name = name;
+
+ // 3 version
+ int version = 0;
+ if (workload_group_info.__isset.version) {
+ version = workload_group_info.version;
+ } else {
+ return Status::InternalError<false>("workload group version is
required");
+ }
+ task_group_info->version = version;
- int cpu_hard_limit = 0;
- auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
- std::from_chars(iter2->second.c_str(), iter2->second.c_str() +
iter2->second.size(),
- cpu_hard_limit);
+ // 4 cpu_share
+ uint64_t cpu_share = 1024;
+ if (workload_group_info.__isset.cpu_share) {
+ cpu_share = workload_group_info.cpu_share;
+ }
+ task_group_info->cpu_share = cpu_share;
- task_group_info->id = resource_group.id;
- task_group_info->name = resource_group.name;
- task_group_info->version = resource_group.version;
- task_group_info->cpu_share = share;
+ // 5 cpu hard limit
+ int cpu_hard_limit = -1;
+ if (workload_group_info.__isset.cpu_hard_limit) {
+ cpu_hard_limit = workload_group_info.cpu_hard_limit;
+ }
task_group_info->cpu_hard_limit = cpu_hard_limit;
+ // 6 mem_limit
bool is_percent = true;
- auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
- auto mem_limit =
+ std::string mem_limit_str;
+ if (workload_group_info.__isset.mem_limit) {
+ mem_limit_str = workload_group_info.mem_limit;
+ } else {
+ return Status::InternalError<false>("workload group mem_limit is
required");
+ }
+ int64_t mem_limit =
ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(),
&is_percent);
if (UNLIKELY(mem_limit <= 0)) {
std::stringstream ss;
- ss << "parse memory limit from TPipelineWorkloadGroup error, " <<
MEMORY_LIMIT << ": "
- << mem_limit_str;
+ ss << "parse memory limit error, " << MEMORY_LIMIT << ": " <<
mem_limit_str;
LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
+ return Status::InternalError<false>("invalid value for {}, val={}",
MEMORY_LIMIT,
+ mem_limit);
}
task_group_info->memory_limit = mem_limit;
- auto enable_memory_overcommit_iter =
resource_group.properties.find(ENABLE_MEMORY_OVERCOMMIT);
- task_group_info->enable_memory_overcommit =
- enable_memory_overcommit_iter != resource_group.properties.end() &&
- enable_memory_overcommit_iter->second ==
- "true" /* fe guarantees it is 'true' or 'false' */;
+ // 7 mem overcommit
+ bool enable_memory_overcommit = true;
+ if (workload_group_info.__isset.enable_memory_overcommit) {
+ enable_memory_overcommit =
workload_group_info.enable_memory_overcommit;
+ }
+ task_group_info->enable_memory_overcommit = enable_memory_overcommit;
+
+ // 8 cpu soft limit or hard limit
+ bool enable_cpu_hard_limit = false;
+ if (workload_group_info.__isset.enable_cpu_hard_limit) {
+ enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit;
+ }
+ task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
+
return Status::OK();
}
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index a948bf53ec0..b3daf582575 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
@@ -106,6 +107,8 @@ public:
uint64_t cpu_share() const { return _cpu_share.load(); }
+ int cpu_hard_limit() const { return _cpu_hard_limit.load(); }
+
uint64_t id() const { return _id; }
std::string name() const { return _name; };
@@ -147,6 +150,7 @@ private:
TaskGroupPipelineTaskEntity _task_entity;
TaskGroupScanTaskEntity _local_scan_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
+ std::atomic<int> _cpu_hard_limit;
};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
@@ -159,9 +163,13 @@ struct TaskGroupInfo {
bool enable_memory_overcommit;
int64_t version;
int cpu_hard_limit;
+ bool enable_cpu_hard_limit;
+ // log cgroup cpu info
+ uint64_t cgroup_cpu_shares = 0;
+ uint64_t cgroup_cpu_hard_limit = 0;
- static Status parse_group_info(const TPipelineWorkloadGroup&
resource_group,
- TaskGroupInfo* task_group_info);
+ static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
+ taskgroup::TaskGroupInfo* task_group_info);
private:
static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index fb940069787..f37ed7ff711 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -59,10 +59,38 @@ void TaskGroupManager::get_resource_groups(const
std::function<bool(const TaskGr
}
}
-Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id,
std::string tg_name,
- int cpu_hard_limit, int
cpu_shares,
- ExecEnv* exec_env,
- QueryContext*
query_ctx_ptr) {
+TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ if (_task_groups.find(tg_id) != _task_groups.end()) {
+ return _task_groups.at(tg_id);
+ }
+ return nullptr;
+}
+
+bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id,
QueryContext* query_ctx_ptr) {
+ std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+ if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+ query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
+ } else {
+ return false;
+ }
+
+ if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+
query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get());
+ } else {
+ return false;
+ }
+ return true;
+}
+
+Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo*
tg_info,
+ ExecEnv* exec_env) {
+ uint64_t tg_id = tg_info->id;
+ std::string tg_name = tg_info->name;
+ int cpu_hard_limit = tg_info->cpu_hard_limit;
+ uint64_t cpu_shares = tg_info->cpu_share;
+ bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
@@ -73,7 +101,7 @@ Status
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
_cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
} else {
- return Status::Error<INTERNAL_ERROR, false>("cgroup init failed,
gid={}", tg_id);
+ return Status::InternalError<false>("cgroup init failed, gid={}",
tg_id);
}
}
@@ -92,8 +120,7 @@ Status
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
if (ret.ok()) {
_tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
} else {
- return Status::Error<INTERNAL_ERROR, false>("task scheduler start
failed, gid={}",
- tg_id);
+ return Status::InternalError<false>("task scheduler start failed,
gid={}", tg_id);
}
}
@@ -105,27 +132,27 @@ Status
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
if (ret.ok()) {
_tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
} else {
- return Status::Error<INTERNAL_ERROR, false>("scan scheduler start
failed, gid={}",
- tg_id);
+ return Status::InternalError<false>("scan scheduler start failed,
gid={}", tg_id);
}
}
- // step 4 set exec/scan task scheudler to query ctx
- pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
- query_ctx_ptr->set_task_scheduler(task_sche);
-
- vectorized::SimplifiedScanScheduler* scan_task_sche =
_tg_scan_sche_map.at(tg_id).get();
- query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
-
- // step 5 update cgroup cpu if needed
- if (cpu_hard_limit > 0) {
- _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
-
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+ // step 4 update cgroup cpu if needed
+ if (enable_cpu_hard_limit) {
+ if (cpu_hard_limit > 0) {
+ _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+ } else {
+ return Status::InternalError<false>("enable cpu hard limit but
value is illegal");
+ }
} else {
- _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
- _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
- CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+ if (config::enable_cgroup_cpu_soft_limit) {
+ _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
+ _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
+ CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+ }
}
+
_cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+
&(tg_info->cgroup_cpu_hard_limit));
return Status::OK();
}
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index cf44f535440..552cddfe9dc 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -51,14 +51,22 @@ public:
void get_resource_groups(const std::function<bool(const TaskGroupPtr&
ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
- Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name,
int cpu_hard_limit,
- int cpu_shares, ExecEnv* exec_env,
- QueryContext* query_ctx_ptr);
+ Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv*
exec_env);
void delete_task_group_by_ids(std::set<uint64_t> id_set);
+ TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
+
void stop();
+ std::atomic<bool> _enable_cpu_hard_limit = false;
+
+ bool enable_cpu_soft_limit() { return !_enable_cpu_hard_limit.load(); }
+
+ bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
+
+ bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext*
query_ctx_ptr);
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 2330700ce7b..6c5ce9e4c11 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -19,7 +19,7 @@ package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TopicInfo;
+import org.apache.doris.thrift.TTopicInfoType;
public class WorkloadGroupPublisher implements TopicPublisher {
@@ -31,9 +31,7 @@ public class WorkloadGroupPublisher implements TopicPublisher
{
@Override
public void getTopicInfo(TPublishTopicRequest req) {
- for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
- .getPublishTopicInfo()) {
- req.addToTopicList(topicInfo);
- }
+ req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
+ env.getWorkloadGroupMgr().getPublishTopicInfo());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 84a526aa7f0..53faa1a9133 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -18,6 +18,7 @@
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -25,7 +26,7 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
-import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
@@ -311,20 +312,44 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
}
public TPipelineWorkloadGroup toThrift() {
- //note(wb) we need add a new key-value to properties and then transfer
it to be, so need a copy here
- // see WorkloadGroupMgr.getWorkloadGroup
- HashMap<String, String> clonedHashMap = new HashMap<>();
- clonedHashMap.putAll(properties);
- return new
TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
+ return new TPipelineWorkloadGroup().setId(id);
}
public TopicInfo toTopicInfo() {
- HashMap<String, String> newHashMap = new HashMap<>();
- newHashMap.put("id", String.valueOf(id));
+ TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
+ tWorkloadGroupInfo.setId(id);
+ tWorkloadGroupInfo.setName(name);
+ tWorkloadGroupInfo.setVersion(version);
+
+ String cpuShareStr = properties.get(CPU_SHARE);
+ if (cpuShareStr != null) {
+ tWorkloadGroupInfo.setCpuShare(Long.valueOf(cpuShareStr));
+ }
+
+ String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT);
+ if (cpuHardLimitStr != null) {
+
tWorkloadGroupInfo.setCpuHardLimit(Integer.valueOf(cpuHardLimitStr));
+ }
+
+ String memLimitStr = properties.get(MEMORY_LIMIT);
+ if (memLimitStr != null) {
+ tWorkloadGroupInfo.setMemLimit(memLimitStr);
+ }
+ String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
+ if (memOvercommitStr != null) {
+
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
+ }
+ // enable_cpu_hard_limit = true, using cpu hard limit
+ // enable_cpu_hard_limit = false, using cpu soft limit
+ tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit);
+
+ if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) {
+ LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value not
illegal,"
+ + "id=" + id + ",name=" + name);
+ }
+
TopicInfo topicInfo = new TopicInfo();
- topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP);
- topicInfo.setInfoMap(newHashMap);
- topicInfo.setTopicKey(name);
+ topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 44c43c19fad..26b11f0cb85 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -125,13 +125,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
throw new UserException("Workload group " + groupName + " does
not exist");
}
workloadGroups.add(workloadGroup.toThrift());
- // note(wb) -1 to tell be no need to not use cpu hard limit
- int cpuHardLimitThriftVal = -1;
- if (Config.enable_cpu_hard_limit &&
workloadGroup.getCpuHardLimit() > 0) {
- cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
- }
-
workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
- String.valueOf(cpuHardLimitThriftVal));
context.setWorkloadGroupName(groupName);
} finally {
readUnlock();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 89062d87cda..e4501fd6463 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -30,7 +30,7 @@ import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TopicInfo;
import com.google.common.collect.Maps;
import mockit.Delegate;
@@ -169,11 +169,11 @@ public class WorkloadGroupMgrTest {
CreateWorkloadGroupStmt stmt1 = new CreateWorkloadGroupStmt(false,
name1, properties1);
workloadGroupMgr.createWorkloadGroup(stmt1);
context.getSessionVariable().setWorkloadGroup(name1);
- List<TPipelineWorkloadGroup> tWorkloadGroups1 =
workloadGroupMgr.getWorkloadGroup(context);
+ List<TopicInfo> tWorkloadGroups1 =
workloadGroupMgr.getPublishTopicInfo();
Assert.assertEquals(1, tWorkloadGroups1.size());
- TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups1.get(0);
- Assert.assertEquals(name1, tWorkloadGroup1.getName());
-
Assert.assertTrue(tWorkloadGroup1.getProperties().containsKey(WorkloadGroup.CPU_SHARE));
+ TopicInfo tWorkloadGroup1 = tWorkloadGroups1.get(0);
+ Assert.assertEquals(name1,
tWorkloadGroup1.getWorkloadGroupInfo().getName());
+ Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare()
== 10);
try {
context.getSessionVariable().setWorkloadGroup("g2");
@@ -242,9 +242,9 @@ public class WorkloadGroupMgrTest {
workloadGroupMgr.alterWorkloadGroup(stmt2);
context.getSessionVariable().setWorkloadGroup(name);
- List<TPipelineWorkloadGroup> tWorkloadGroups =
workloadGroupMgr.getWorkloadGroup(context);
+ List<TopicInfo> tWorkloadGroups =
workloadGroupMgr.getPublishTopicInfo();
Assert.assertEquals(1, tWorkloadGroups.size());
- TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups.get(0);
-
Assert.assertEquals(tWorkloadGroup1.getProperties().get(WorkloadGroup.CPU_SHARE),
"5");
+ TopicInfo tWorkloadGroup1 = tWorkloadGroups.get(0);
+ Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare()
== 5);
}
}
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index d35d6166d3b..1f2e8185a66 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -164,14 +164,23 @@ enum TTopicInfoType {
WORKLOAD_GROUP
}
+struct TWorkloadGroupInfo {
+ 1: optional i64 id
+ 2: optional string name
+ 3: optional i64 version
+ 4: optional i64 cpu_share
+ 5: optional i32 cpu_hard_limit
+ 6: optional string mem_limit
+ 7: optional bool enable_memory_overcommit
+ 8: optional bool enable_cpu_hard_limit
+}
+
struct TopicInfo {
- 1: optional string topic_key
- 2: required TTopicInfoType topic_type
- 3: optional map<string, string> info_map
+ 1: optional TWorkloadGroupInfo workload_group_info
}
struct TPublishTopicRequest {
- 1: required list<TopicInfo> topic_list
+ 1: required map<TTopicInfoType, list<TopicInfo>> topic_map
}
struct TPublishTopicResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]