This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1ec70afd0 [Feature](executor)Workload Group support Non-Pipeline 
Execution (#30164)
2b1ec70afd0 is described below

commit 2b1ec70afd045ed421fcb346c1597d3e4931840b
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Jan 22 20:25:28 2024 +0800

    [Feature](executor)Workload Group support Non-Pipeline Execution (#30164)
---
 be/src/agent/workload_group_listener.cpp           |  6 ++-
 be/src/common/config.cpp                           |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  7 +--
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  5 +-
 be/src/runtime/fragment_mgr.cpp                    | 61 +++++++++-------------
 be/src/runtime/query_context.cpp                   | 25 +++++++++
 be/src/runtime/query_context.h                     | 13 +++--
 be/src/runtime/task_group/task_group_manager.cpp   | 57 ++++++++++++++++----
 be/src/runtime/task_group/task_group_manager.h     |  9 ++--
 .../workload_sched_policy_mgr.cpp                  |  1 -
 be/src/vec/exec/scan/scanner_context.h             |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  3 ++
 .../java/org/apache/doris/qe/StmtExecutor.java     |  2 +-
 gensrc/thrift/PaloInternalService.thrift           | 16 +++---
 14 files changed, 132 insertions(+), 77 deletions(-)

diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index 6ea7c28669c..237d6c77274 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -26,6 +26,7 @@ namespace doris {
 
 void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& 
topic_info_list) {
     std::set<uint64_t> current_wg_ids;
+    bool is_set_cgroup_path = config::doris_cgroup_cpu_path != "";
     for (const TopicInfo& topic_info : topic_info_list) {
         if (!topic_info.__isset.workload_group_info) {
             continue;
@@ -52,7 +53,7 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
         // 4 create and update task scheduler
         Status ret2 = 
_exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
                                                                                
 _exec_env);
-        if (!ret2.ok()) {
+        if (is_set_cgroup_path && !ret2.ok()) {
             LOG(INFO) << "upsert task sche failed, tg_id=" << 
task_group_info.id
                       << ", reason=" << ret2.to_string();
         }
@@ -63,7 +64,8 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                   << ", cgroup cpu_shares=" << 
task_group_info.cgroup_cpu_shares
                   << ", cgroup cpu_hard_limit=" << 
task_group_info.cgroup_cpu_hard_limit
                   << ", enable_cgroup_cpu_soft_limit="
-                  << (config::enable_cgroup_cpu_soft_limit ? "true" : "false");
+                  << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
+                  << ", is set cgroup path=" << (is_set_cgroup_path ? "true" : 
"flase");
     }
 
     _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5468ac29a02..271ad72410d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1133,7 +1133,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_mString(doris_cgroup_cpu_path, "");
-DEFINE_mBool(enable_cgroup_cpu_soft_limit, "false");
+DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 538a2ce1bdb..909039b23fb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -718,12 +718,7 @@ Status PipelineFragmentContext::submit() {
 
     int submit_tasks = 0;
     Status st;
-    auto* scheduler = _exec_env->pipeline_task_scheduler();
-    if (_query_ctx->get_task_scheduler()) {
-        scheduler = _query_ctx->get_task_scheduler();
-    } else if (_task_group_entity && 
_query_ctx->use_task_group_for_cpu_limit.load()) {
-        scheduler = _exec_env->pipeline_task_group_scheduler();
-    }
+    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         st = scheduler->schedule_task(task.get());
         if (!st) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a44db667450..4a16b97b2f3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1219,10 +1219,7 @@ Status PipelineXFragmentContext::submit() {
 
     int submit_tasks = 0;
     Status st;
-    auto* scheduler = _exec_env->pipeline_task_scheduler();
-    if (_task_group_entity) {
-        scheduler = _exec_env->pipeline_task_group_scheduler();
-    }
+    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
             st = scheduler->schedule_task(t.get());
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0fbaacb4fa2..a76c7687f02 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -677,41 +677,27 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         query_ctx->register_memory_statistics();
         query_ctx->register_cpu_statistics();
 
+        bool is_pipeline = false;
         if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
-            if (params.__isset.workload_groups && 
!params.workload_groups.empty()) {
-                uint64_t tg_id = params.workload_groups[0].id;
-                auto* tg_mgr = _exec_env->task_group_manager();
-                if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) 
{
-                    
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                    // set task group to queryctx for memory tracker can be 
removed, see QueryContext's destructor
-                    query_ctx->set_task_group(task_group_ptr);
-                    stringstream ss;
-                    ss << "Query/load id: " << print_id(query_ctx->query_id())
-                       << ", use task group:" << task_group_ptr->debug_string()
-                       << ", enable cpu hard limit:"
-                       << (tg_mgr->enable_cpu_hard_limit() ? "true" : "false");
-                    bool ret = false;
-                    if (tg_mgr->enable_cgroup()) {
-                        ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id, 
query_ctx.get());
-                        if (ret) {
-                            ss << ", use cgroup for cpu limit.";
-                        } else {
-                            ss << ", not found cgroup sche, no limit for cpu.";
-                        }
-                    } else {
-                        ss << ", use doris sche for cpu limit.";
-                        query_ctx->use_task_group_for_cpu_limit.store(true);
-                    }
-                    LOG(INFO) << ss.str();
-                    
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
-                            print_id(query_id), tg_id);
-                } else {
-                    VLOG_DEBUG << "Query/load id: " << 
print_id(query_ctx->query_id())
-                               << " no task group found, does not use task 
group.";
-                }
-            } else {
-                VLOG_DEBUG << "Query/load id: " << 
print_id(query_ctx->query_id())
-                           << " does not use task group.";
+            is_pipeline = true;
+        }
+
+        if (params.__isset.workload_groups && !params.workload_groups.empty()) 
{
+            uint64_t tg_id = params.workload_groups[0].id;
+            auto* tg_mgr = _exec_env->task_group_manager();
+            if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+                
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+                // set task group to queryctx for memory tracker can be 
removed, see QueryContext's destructor
+                query_ctx->set_task_group(task_group_ptr);
+                
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
+                                                                               
  tg_id);
+                query_ctx->set_query_scheduler(tg_id);
+
+                LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
+                          << ", use task group: " << 
task_group_ptr->debug_string()
+                          << ", is pipeline: " << ((int)is_pipeline)
+                          << ", enable cgroup soft limit: "
+                          << ((int)config::enable_cgroup_cpu_soft_limit);
             }
         }
 
@@ -795,7 +781,12 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
                 std::make_pair(params.params.fragment_instance_id, 
fragment_executor));
         _cv.notify_all();
     }
-    auto st = _thread_pool->submit_func(
+
+    auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool();
+    if (!current_thread_pool) {
+        current_thread_pool = _thread_pool.get();
+    }
+    auto st = current_thread_pool->submit_func(
             [this, fragment_executor, cb] { _exec_actual(fragment_executor, 
cb); });
     if (!st.ok()) {
         {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index fffb5ad57a9..a70bf6695ac 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -20,6 +20,7 @@
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_x/dependency.h"
 #include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/task_group/task_group_manager.h"
 
 namespace doris {
 
@@ -152,4 +153,28 @@ void QueryContext::register_cpu_statistics() {
     }
 }
 
+void QueryContext::set_query_scheduler(uint64_t tg_id) {
+    auto* tg_mgr = _exec_env->task_group_manager();
+    tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler,
+                                &_non_pipe_thread_pool);
+}
+
+doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
+    if (!config::enable_cgroup_cpu_soft_limit) {
+        return _exec_env->pipeline_task_group_scheduler();
+    } else if (_task_scheduler) {
+        return _task_scheduler;
+    } else {
+        return _exec_env->pipeline_task_scheduler();
+    }
+}
+
+ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
+    if (_task_group) {
+        return _non_pipe_thread_pool;
+    } else {
+        return nullptr;
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dd24206c415..d5a8f12cee1 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -218,6 +218,12 @@ public:
 
     std::shared_ptr<QueryStatistics> get_cpu_statistics() { return 
_cpu_statistics; }
 
+    void set_query_scheduler(uint64_t wg_id);
+
+    doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
+
+    ThreadPool* get_non_pipe_exec_thread_pool();
+
 public:
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
@@ -247,8 +253,6 @@ public:
     // only for file scan node
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
-    std::atomic<bool> use_task_group_for_cpu_limit = false;
-
 private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
@@ -272,7 +276,7 @@ private:
     std::shared_ptr<vectorized::SharedScannerController> 
_shared_scanner_controller;
     vectorized::RuntimePredicate _runtime_predicate;
 
-    taskgroup::TaskGroupPtr _task_group;
+    taskgroup::TaskGroupPtr _task_group = nullptr;
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
     const TQueryOptions _query_options;
 
@@ -281,8 +285,9 @@ private:
     // to report the real message if failed.
     Status _exec_status = Status::OK();
 
-    pipeline::TaskScheduler* _task_scheduler = nullptr;
+    doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
+    ThreadPool* _non_pipe_thread_pool = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index cbd7b5f73f6..74694baa9fc 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -67,18 +67,25 @@ TaskGroupPtr 
TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
     return nullptr;
 }
 
-bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, 
QueryContext* query_ctx_ptr) {
-    std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
+void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
+                                           doris::pipeline::TaskScheduler** 
exec_sched,
+                                           
vectorized::SimplifiedScanScheduler** scan_sched,
+                                           ThreadPool** non_pipe_thread_pool) {
+    std::shared_lock<std::shared_mutex> r_lock(_task_scheduler_lock);
     auto tg_sche_it = _tg_sche_map.find(tg_id);
     if (tg_sche_it != _tg_sche_map.end()) {
-        query_ctx_ptr->set_task_scheduler(tg_sche_it->second.get());
-        auto _tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
-        if (_tg_scan_sche_it != _tg_scan_sche_map.end()) {
-            
query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_it->second.get());
-            return true;
-        }
+        *exec_sched = tg_sche_it->second.get();
+    }
+
+    auto tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
+    if (tg_scan_sche_it != _tg_scan_sche_map.end()) {
+        *scan_sched = tg_scan_sche_it->second.get();
+    }
+
+    auto non_pipe_thread_pool_iter = _non_pipe_thread_pool_map.find(tg_id);
+    if (non_pipe_thread_pool_iter != _non_pipe_thread_pool_map.end()) {
+        *non_pipe_thread_pool = non_pipe_thread_pool_iter->second.get();
     }
-    return false;
 }
 
 Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* 
tg_info,
@@ -135,7 +142,23 @@ Status 
TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
         }
     }
 
-    // step 4 update cgroup cpu if needed
+    // step 4: init non-pipe scheduler
+    if (_non_pipe_thread_pool_map.find(tg_id) == 
_non_pipe_thread_pool_map.end()) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+                           .set_min_threads(1)
+                           
.set_max_threads(config::fragment_pool_thread_num_max)
+                           
.set_max_queue_size(config::fragment_pool_queue_size)
+                           .set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
+                           .build(&thread_pool);
+        if (!ret.ok()) {
+            LOG(INFO) << "create non-pipline thread pool failed";
+        } else {
+            _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
+        }
+    }
+
+    // step 5: update cgroup cpu if needed
     if (enable_cpu_hard_limit) {
         if (cpu_hard_limit > 0) {
             _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
@@ -160,6 +183,7 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     // stop task sche may cost some time, so it should not be locked
     std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
     std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
+    std::set<ThreadPool*> non_pip_thread_pool_to_del;
     std::set<uint64_t> deleted_tg_ids;
     {
         std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
@@ -177,6 +201,13 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
                 scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
             }
         }
+        for (auto iter = _non_pipe_thread_pool_map.begin(); iter != 
_non_pipe_thread_pool_map.end();
+             iter++) {
+            uint64_t tg_id = iter->first;
+            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+                
non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+            }
+        }
     }
     // 1 stop all threads
     for (auto* ptr1 : task_sche_to_del) {
@@ -185,6 +216,9 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     for (auto* ptr2 : scan_task_sche_to_del) {
         ptr2->stop();
     }
+    for (auto& ptr3 : non_pip_thread_pool_to_del) {
+        ptr3->shutdown();
+    }
     // 2 release resource in memory
     {
         std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
@@ -242,6 +276,9 @@ void TaskGroupManager::stop() {
     for (auto& task_sche : _tg_scan_sche_map) {
         task_sche.second->stop();
     }
+    for (auto& no_pip_sche : _non_pipe_thread_pool_map) {
+        no_pip_sche.second->shutdown();
+    }
 }
 
 } // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 08968b6fe99..a7ccb52f00e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -65,11 +65,9 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
-    bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* 
query_ctx_ptr);
-
-    // currently cgroup both support cpu soft limit and cpu hard limit
-    // doris task group only support cpu soft limit
-    bool enable_cgroup() { return enable_cpu_hard_limit() || 
config::enable_cgroup_cpu_soft_limit; }
+    void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** 
exec_sched,
+                             vectorized::SimplifiedScanScheduler** scan_sched,
+                             ThreadPool** non_pipe_thread_pool);
 
 private:
     std::shared_mutex _group_mutex;
@@ -81,6 +79,7 @@ private:
     std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> 
_tg_sche_map;
     std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
_tg_scan_sche_map;
     std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
+    std::map<uint64_t, std::unique_ptr<ThreadPool>> _non_pipe_thread_pool_map;
 
     std::shared_mutex _init_cg_ctl_lock;
     std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index 2398bff465c..731dd0c8661 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -81,7 +81,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
         if (list.size() == 0) {
             continue;
         }
-        LOG(INFO) << "[workload_schedule] get query list size=" << list.size();
 
         for (int i = 0; i < list.size(); i++) {
             WorkloadQueryInfo* query_info_ptr = &(list[i]);
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 1dd9966b162..28aec83d6a2 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -253,7 +253,7 @@ protected:
     const int64_t _max_bytes_in_queue;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
-    SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu 
hard limit
+    SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
     // List "scanners" saves all "unfinished" scanners.
     // The scanner scheduler will pop scanners from this list, run scanner,
     // and then if the scanner is not finished, will be pushed back to this 
list.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 586a27ee103..8581980da2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3639,6 +3639,9 @@ public class Coordinator implements CoordInterface {
                 params.params.setPerNodeScanRanges(scanRanges);
                 params.params.setPerExchNumSenders(perExchNumSenders);
 
+                if (tWorkloadGroups != null) {
+                    params.setWorkloadGroups(tWorkloadGroups);
+                }
                 params.params.setDestinations(destinations);
                 params.params.setSenderId(i);
                 params.params.setNumSenders(instanceExecParams.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 767d03f43d7..8b72ce3eb2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1519,7 +1519,7 @@ public class StmtExecutor {
             coordBase = new PointQueryExec(planner, analyzer);
         } else {
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
-            if (Config.enable_workload_group && 
context.sessionVariable.getEnablePipelineEngine()) {
+            if (Config.enable_workload_group) {
                 
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
             } else {
                 context.setWorkloadGroupName("");
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index fb9a1888f6c..10f70bd7f86 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -400,6 +400,13 @@ struct TGlobalDict {
   2: optional map<i32, i32> slot_dicts // map from slot id to column dict id, 
because 2 or more column may share the dict
 }
 
+struct TPipelineWorkloadGroup {
+  1: optional i64 id
+  2: optional string name
+  3: optional map<string, string> properties
+  4: optional i64 version
+}
+
 // ExecPlanFragment
 struct TExecPlanFragmentParams {
   1: required PaloInternalServiceVersion protocol_version
@@ -483,6 +490,8 @@ struct TExecPlanFragmentParams {
 
   29: optional i64 content_length
 
+  30: optional list<TPipelineWorkloadGroup> workload_groups
+
   // For cloud
   1000: optional bool is_mow_table;
 }
@@ -670,13 +679,6 @@ struct TPipelineInstanceParams {
   7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
 }
 
-struct TPipelineWorkloadGroup {
-  1: optional i64 id
-  2: optional string name
-  3: optional map<string, string> properties
-  4: optional i64 version
-}
-
 // ExecPlanFragment
 struct TPipelineFragmentParams {
   1: required PaloInternalServiceVersion protocol_version


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to