This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e502ce4dd64 branch-3.0: [Refactor]refactor log workload group when query arrives be #45034 (#46283)
e502ce4dd64 is described below
commit e502ce4dd64787857a54fe37d730505ad00eb079
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 2 18:32:13 2025 +0800
branch-3.0: [Refactor]refactor log workload group when query arrives be #45034 (#46283)
Cherry-picked from #45034
Co-authored-by: wangbo <[email protected]>
---
be/src/runtime/fragment_mgr.cpp | 33 +++++++++++++---------
be/src/runtime/load_channel.cpp | 2 +-
be/src/runtime/query_context.cpp | 3 +-
be/src/runtime/query_context.h | 2 +-
.../workload_group/workload_group_manager.cpp | 6 ++--
.../workload_group/workload_group_manager.h | 2 +-
6 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index af4c5ac29e9..87c165222fc 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -685,12 +685,25 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
}
if (!query_ctx) {
+ WorkloadGroupPtr workload_group_ptr = nullptr;
+ std::string wg_info_str = "Workload Group not set";
+ if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
+ uint64_t wg_id = params.workload_groups[0].id;
+ workload_group_ptr =
_exec_env->workload_group_mgr()->get_group(wg_id);
+ if (workload_group_ptr != nullptr) {
+ wg_info_str = workload_group_ptr->debug_string();
+ } else {
+ wg_info_str = "set wg but not find it in be";
+ }
+ }
+
// First time a fragment of a query arrived. print logs.
LOG(INFO) << "query_id: " << print_id(query_id) << ",
coord_addr: " << params.coord
<< ", total fragment num on current host: " <<
params.fragment_num_on_host
<< ", fe process uuid: " <<
params.query_options.fe_process_uuid
<< ", query type: " <<
params.query_options.query_type
- << ", report audit fe:" << params.current_connect_fe;
+ << ", report audit fe:" << params.current_connect_fe
+ << ", use wg:" << wg_info_str;
// This may be a first fragment request of the query.
// Create the query fragments context.
@@ -715,19 +728,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
_set_scan_concurrency(params, query_ctx.get());
- if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
- uint64_t tg_id = params.workload_groups[0].id;
- WorkloadGroupPtr workload_group_ptr =
-
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
- if (workload_group_ptr != nullptr) {
-
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
- print_id(query_id), tg_id);
- } else {
- LOG(WARNING) << "Query/load id: " <<
print_id(query_ctx->query_id())
- << "can't find its workload group " <<
tg_id;
- }
+ if (workload_group_ptr != nullptr) {
+ RETURN_IF_ERROR(workload_group_ptr->add_query(query_id,
query_ctx));
+ query_ctx->set_workload_group(workload_group_ptr);
+
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+ print_id(query_id), workload_group_ptr->id());
}
// There is some logic in query ctx's dctor, we could not
check if exists and delete the
// temp query ctx now. For example, the query id maybe removed
from workload group's queryset.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 4ff83ff93df..6156c21ce66 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -59,7 +59,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
fmt::format("(FromLoadChannel)Load#Id={}",
_load_id.to_string()));
if (wg_id > 0) {
WorkloadGroupPtr workload_group_ptr =
-
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id);
+
ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id);
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index f4d4256e66f..d5ceefb84b2 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -325,14 +325,13 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
}
}
-Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
+void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
_workload_group = tg;
// Should add query first, then the workload group will not be deleted.
// see task_group_manager::delete_workload_group_by_ids
_workload_group->add_mem_tracker_limiter(query_mem_tracker);
_workload_group->get_query_scheduler(&_task_scheduler,
&_scan_task_scheduler,
&_memtable_flush_pool,
&_remote_scan_task_scheduler);
- return Status::OK();
}
void QueryContext::add_fragment_profile(
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5c8578bd92e..1a039080a9a 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -139,7 +139,7 @@ public:
}
}
- Status set_workload_group(WorkloadGroupPtr& tg);
+ void set_workload_group(WorkloadGroupPtr& tg);
int execution_timeout() const {
return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 930a72eb25f..6f29d9f4df6 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -68,10 +68,10 @@ void WorkloadGroupMgr::get_related_workload_groups(
}
}
-WorkloadGroupPtr WorkloadGroupMgr::get_task_group_by_id(uint64_t tg_id) {
+WorkloadGroupPtr WorkloadGroupMgr::get_group(uint64_t wg_id) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- if (_workload_groups.find(tg_id) != _workload_groups.end()) {
- return _workload_groups.at(tg_id);
+ if (_workload_groups.find(wg_id) != _workload_groups.end()) {
+ return _workload_groups.at(wg_id);
}
return nullptr;
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 63562caaf7f..db2444e0bc1 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -48,7 +48,7 @@ public:
void delete_workload_group_by_ids(std::set<uint64_t> id_set);
- WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
+ WorkloadGroupPtr get_group(uint64_t wg_id);
void do_sweep();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]